AirFlowについて
Airflow とは、2014 年に Airbnb 社が開発したオープンソースであり、2016 年より Apache 財団となる。開発言語は Python で、ワークフローエンジンに該当するワークフロー管理ツールです。類似のもので言うと digdag や argo などが該当します。
Airflow は、予め決められた順序を基に、処理を実行するワークフローを Python プログラムで作成する。また、スケジュールや監視を行う事が可能。
ワークフローはタスクの有向非巡回グラフ(DAG)を作成する事により、タスクを実行する。 具体的には python でファイルを実装して処理順番を指定していきます。
DAG とは
DAG とは、Directed acyclic graph の略であって、有向非巡回グラフを指します。方向性があり後戻りをしない特徴があるので、複数に分岐した後で元に戻らなければ良いという特徴があります。
重要なのは、エッジ(線)のベクトルが有向で、さらに一度通ったノード(点)には戻ってこれないという非巡回性の特性を持つ事である。
アーキテクチャ
Apache Airflow を理解するために、アーキテクチャを理解していく。
Apache AirFlow Component
AirFlow には以下のコンポーネントで構成されている。
Webserver:UI 部
DAG とタスクの動作を検査、トリガー、デバッグするための便利なユーザー インターフェイスを提供するScheduler:スケジュール部
スケジュールされたワークフローのトリガーと、実行するための Executor へのタスクの送信の両方を処理する。- Executer:実行部
実行中のタスクを処理するエグゼキュータ。デフォルトの Airflow インストールでは、スケジューラ内ですべてを実行しますが、ほとんどの本番環境に適したエグゼキュータは、実際にはタスクの実行をワーカーにプッシュします。 - DAG Directory
Scheduler と Executer (およびエグゼキューターが持つすべてのワーカー) によって読み取られる DAG ファイルのフォルダー
Apache AirFlow プログラムアーキテクチャ
Airflow のプログラムは以下のように大きく3つに区分される。
- DAG : どのような順序で実行されるかを記述したプログラム
- Operator : プログラムを実行するためのテンプレート
- Task : 実行する処理
Task の定義に Operator が記述されており、Python で処理を実行するための PythonOperator Bash 実行のための BashOperator、他各種 RDBMS や Hive、AWS や GCP など 様々なサービスの API をコールする Operator が用意されている。 これらを様々なオペレーターを使用することで様々なタスクをスピーディーにくみ上げることができる。
その他機能
その他 Airflow の機能については以下が存在する。
- Connection:各種データストアへの接続情報を管理
- Hook:Connection を使ってデータストアにアクセスしたり、データを load/dump するためのメソッド
- Pools:タスクの並列数を管理
- Queue:Celery のような、外部のキューイングシステムをジョブキューとして利用可能
- Branching:DAG 中での条件分岐を実現
- SLA:一定時間内に成功しなかった task を管理者にメール通知
UI の見方
ここでは Airflow で使用する主要の 2 画面について解説します。
DAGs 画面
DAG の有効スイッチ
DAG スケジュール、DAG トリガーを有効にするスイッチ。クリックで確認なく ON、OFF が切り替わるので注意が必要。TAG 名
DAG ファイルの parameter で TAG 名を設定しておくことで TAG 名が表示される。
TAG の命名ルールを整理しておくと DAG の整理等が行いやすくなる。DAG 名
DAG ファイルのパラメターで設定したdag_id
で設定した ID が表示される。
DAG のファイル名ではないので注意が必要。オーナー名
DAG ファイルの parameter でオーナー名を設定して置くことでオーナー名が表示される。複数のチームで Airflow を使用する場合、所有者(チーム)を把握できるようになる。- 累積実行結果
実行結果(累積)が出力される。 - 実行スケジュール
DAG ファイルの parameter でSchedule_interval
を設定しておくことで実行周期が表示される。
実行周期の表記方法についてはcron表記方法、またはプリセットで表記される。 - 直近の Task の実行状況
現在実行している task または、直近の task の実行結果を確認することができる。 DAG 操作
左から手動実行
・ステータス更新
・DAGの物理削除
の操作を行うことができる。- DAG 名検索
DAG 名入力して複数ある DAG の中から対象の DAG を探すことができる。 - TAG 名検索
TAG 名入力して複数ある DAG の中から対象の DAG を探すことができる。
DAG 詳細画面(Tree View)
DAG の有効スイッチ
DAG スケジュール、DAG トリガーを有効にするスイッチ。クリックで確認なく ON、OFF が切り替わるので注意が必要。DAG 名
DAG ファイルのパラメターで設定したdag_id
で設定した ID が表示される。
DAG のファイル名ではないので注意が必要。グラフビュー
task の依存関係をグラフ形式で表示することができる。- タスクの実行間隔
タスクの実行間隔を分析する画面に移動 - タスクの試行時間
- タスクの終了時間
タスクの実行結果のガントチャート画面
DAG の設定値確認画面
DAG を定義しているソースコード
- task の実行ステータスについての説明欄
- Tree View で見る task の実行結果の抽出期間の設定
- Tree View で実行結果を確認することができる。
DAG の作成
AirFlow の DAG を作成するためには.\opt\airflow\dags
へ DAG ファイルの置く必要がある。今回は以下の Sample ファイルを自身の環境の dags フォルダーへ格納しよう。
Sample_DAG ファイル
ファイルを設置後(30 秒後ほど)に AirFlow の画面を更新すると画面上で DAG が追加されていることが確認できる。
Tree ビューで実際の Tree 構造と Sample のソースコードを確認しながら記述方法について予習しておくとよいだろう。 ※AirFlow の環境構築についてはこちら
また、DAG の記述 Sample は公式で様々なケースモデルが提供されているので こちらを参考にすると良いだろう。
公式 Sample 集
参考
DAGについてはこちらもお勧め。