見出し

AirflowでDAG間の依存関係を定義する

   2023年02月18日     5分で読めます

Airflow で DAG 間の依存関係を定義するには external_task_sensor を使用して観測している DAG が終了したら後続の Task を実行させることができる。 実際には Airflow で DAG 同士の依存関係を定義する方法は 2 つある。1 つは TriggerDagRunOperator を使う方法と、 ExternalTaskSensor を使う方法だ。 今回はexternal_task_sensorについて詳しく解説していこうと思う。

尚、本記述は AirFlow は AWS にある MWAA で実装されている Ver.2.0.2 を想定する。

DAG の各種Operatorは AirFlow のバージョンによって若干異なるので詳細は公式ページの確認をおすすめします。 公式ページ

external_task_sensor の定義

external_task_sensorは別の DAG または別の DAG 内のタスクが完了するまで待機します。 external_task_sensorBaseOperatorを継承しているので通常の Operator と同様に定義できる。 BaseOperatorのパラメータについては別で解説していこうと思う。

from airflow.sensors.external_task import ExternalTaskSensor

Check_Task = ExternalTaskSensor(
    task_id="Check_Task",
    external_dag_id=external_dag_id,  # チェックするDAGのID
    external_task_id=external_task_id,  # チェックするTaskのID
    execution_delta=timedelta(minute=10),  # チェックするDAGとの実行時間差
    timeout=600,  # タイムアウトの時間10分
    allowed_states=["success"], # 成功判定するステータス
    failed_states=["failed", "skipped"], エラー判定するステータス
    check_existence=True,
)

サンプルファイル

external_task_sensor のパラメーター

上記サンプルで定義しているパラメーターについて解説していく。

定義必須

task_id(str)

定義必須

BaseOperatorから継承したパラメータ Task_id を定義する。

external_dag_id (str)

定義必須

クリアする必要がある依存タスクを含む dag_id。

external_task_id (str)

定義必須

クリアする必要がある依存タスクの task_id。

allowed_states (反復可能)

定義任意

許可された状態の反復可能、デフォルトは[‘success’]

failed_states (反復可能)

定義任意

失敗または許可されていない状態の反復可能、デフォルトは None

execution_delta (Optional[datetime.timedelta])

定義任意

前回の実行との時差 既定値は、現在のタスクまたは DAG と同じ execution_date です。 昨日の場合は、[positive!] datetime.timedelta(days=1) を使用します。いずれもexecution_deltaまたは execution_date_fnを渡すことができます。ただし両方ではありません。

execution_date_fn (オプション[呼び出し可能])

定義任意

現在の実行日を最初の日付として受け取る関数 位置指定引数と、オプションで任意の数のキーワード引数を コンテキストディクショナリであり、クエリに必要な実行日を返します。 execution_deltaまたはexecution_date_fnのいずれかを外部タスクセンサーに渡すことができます。 しかし、両方ではありません。

check_existence (bool) – 外部タスクが存在するかどうかを確認するには True に設定します ( external_task_id が None でない) か、待機する DAG が存在するかどうかを確認します ( external_task_id なし)、外部タスクの場合はすぐに待機を停止します または DAG が存在しません (既定値: False)

execution_deltaの定義で注意する点

execution_delta ([datetime.timedelta])のパラメーター定義する際に気を付けないといけない。 執者もexecution_deltaでは一度はまった経験があるので備忘録のため解説しておく。 execution_deltaで設定する値はDAGクリアする必要のあるDAGの``の時間になるように定義する必要があります。 例えばDAG_Aが10時00分実行でDAG_Bが11時00分実行でDAB_Aのクリアを定義しないといけない場合、execution_delta=timedelta(minute=60)となる つまり、チェックするDAGとの実行時間差を設定する必要がある。この時時間差はぴったりに定義しないとexternal_task_sensorがうまく動作しないので注意が必要です。

終わりに

今回はexternal_task_sensorについてまとめてみた。 特にDAG間の依存関係をすることは実践ではよくあることので使い方をマスターしておこう。

関連記事

DAG 定義のパラメーターについてや概要はこちらから

AirFlow の環境構築方法(インストール方法)はこちらから

参照