AirFlowのDAGの基本的な作成方法
AirFlow は DAG の中に複数の Task を定義して処理を組み込むことができる。タスクを定義するには AirFlow のオペレータを 使用することで簡単に定義することができる。ここではBashOperator
とDummyOperator
を利用して DAG を作成する手順を解説していく。 尚、本記述は AirFlow は AWS にある MWAA で実装されている Ver.2.0.2
を想定する。
DAG の各種
Operator
は AirFlow のバージョンによって若干異なるので詳細は公式ページの確認をおすすめします。 公式ページ
DAG ファイルの作成
まず DAG を作成するには任意の名前でpython
ファイルを作成する。 DAG ファイルは AirFlow の DAG フォルダーに配置する。
DAG の定義
DAG ファイルを作成したら、ソースコードを記述していく、まずは import ブロックを記述して最低限のimport
の宣言を行う。
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from datetime import timedelta, datetime
その後 DAG の各パラメータの定義を行う。今回はrun_this
とrun_this_last
の Task を定義して >>
ビットシフト演算子で依存関係を定義する。
# args Operatorで使用するパラメーターのデフォルト値を定義する
# *dictで定義すること。
# 今回はownerのみを定義
args = {
'owner': 'airflow',
}
# DAGの定義フォーマット
dag = DAG(
dag_id='example_DAG', # DAGのID名
description='DAGのサンプル', # DAGの説明
default_args=args, # Operatorのデフォルト
schedule_interval='10 10 * * *', # 実行スケジュール cron形式で定義 毎日10時10分に実行
start_date=datetime(2023, 2, 15, 0, 0, tzinfo=None), # daterime形式で記述 2023-2-15 00:00に以降にスケジュールが有効
dagrun_timeout=timedelta(minutes=60), # DAGの実行タイムアウト時間をtimedeltaで定義 60分でタイムアウト
tags=['example', 'example2'], # TAG名を定義 listで定義して複数設定可能
params={"example_key": "example_value"}, # Operatorで使用する Taskのパラメータをdict型で定義
)
Task の定義
DAG の定義が終われば次に Task の定義をしていく。今回はBashOperator
とDummyOperator
を使用して Task を定義する。
with dag(
# [START howto_operator_bash]
run_this = BashOperator(
task_id='run_this', # Task_IDを定義
bash_command='echo 1', # BashOperatorで実行するコマンドを記述
)
# [END howto_operator_bash]
run_this_last = DummyOperator(
task_id='run_this_last', # Task_IDを定義
)
)
# Task_IDの依存関係を`>>`で定義
run_this >> run_this_last
このように DAG を定義して AirFlow を起動すると以下のように AirFlow の UI 画面で、DAG がインポートされる。
AirFlow からインポートされた DAG のコードも確認することができる。
最後に
今回は基本的な DAG の作成方法について解説してきたが、次回は Task 定義で使用している。Operator について解説していく。
参照
DAG 定義のパラメーターについてや概要はこちらから
AirFlow の環境構築方法(インストール方法)はこちらから