見出し

AirFlowのDAG 定義パラメータ解説編

   2023年02月16日     12分で読めます

AirFlow での DAG の作成時に定義するパラメータについて解説していこうと思う。 DAG は Python で AirFlow ライブラリーを使用することで簡単に DAG を記述することができる。 以下のサンプルコードでは AirFlow の’DAG’クラスを import して使用し、DAG クラスのインスタンス生成時にパラメーターを定義して DAG を生成する。

尚、本記述は AirFlow は AWS にある MWAA で実装されている Ver.2.0.2 を想定する。

DAG のパラメータは AirFlow のバージョンによって若干異なるので詳細は公式ページの確認をおすすめします。 公式ページ

AirFlowDAG画面

from datetime import timedelta
from airflow import DAG

args = {
    'owner': 'airflow',

}

with DAG(
    dag_id='example_DAG',
    description='DAGのサンプル',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 2, 15, 0, 0, tzinfo=None),
    end_date=datetime(2025, 2, 15, 0, 0, tzinfo=None),
    default_args=args,
    params={"example_key": "example_value"},,
    concurrency=1,
    max_active_runs=3,
    dagrun_timeout=timedelta(minutes=60),
    sla_miss_callback='[user def]',
    default_view='tree',
    orientation='LR',
    catchup=False,
    on_failure_callback='[user def]',
    on_success_callback='[user def]',
    access_control={'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}},
    is_paused_upon_creation=False,
    tags=['example', 'example2'],
    params={"example_key": "example_value"},

) as dag:

DAG のパラーメーター

上記サンプルで定義しているパラメーターについて解説していく。

dag_id (str)


定義必須

DAG の ID。 英数字、ダッシュ、ドット、およびアンダースコア (すべて ASCII) のみで構成する必要があります。AirFlow の UI 上で表示される DAG 名にあたるものがこの DAG_ID となるので識別しやすい命名規則で定義するのが好ましいです。

description (str)


定義推奨

DAG の説明。 AirFlow の UI 上に表示されるので、この部分で詳細な内容を説明することが好ましいです。(日本語使用可)

schedule_interval (datetime.timedelta 又は dateutil.relativedelta.relativedelta 又は str that acts as a cron expression)


定義必須

DAG の実行頻度(実行周期)を定義します。この timedelta オブジェクトは、最新のタスク インスタンスの execution_date に追加され、次のスケジュールを把握します。schedule_intervalの記述方法は複数あるため後述で説明します。私は 汎用性が高いためcronで定義しています。 尚 DAG をトリガー実行したい場合はnoneと定義

cronでの定義方法

これ以外にもcronだともっと複雑なスケジュールを定義できる。 cronの定義ついては WikiPedia を参照することをお勧めする。 WikiPedi:cron について

Preset意味cron
noneスケジュールしないで「外部トリガー」のみで DAG を実行 
@hourly一度だけスケジュールする0 * * * *
@daily1 時間に 1 回、開始時に実行する0 0 * * *
@weekly週に一度、日曜日の朝の深夜に走る0 0 * * 0
@monthly月に一度、月の最初の日の深夜に実行0 0 1 * *
@yearly年に一度、1 月<>日の深夜に開催0 0 1 1 *
@once一度だけスケジュールする0 0 1 1 *
@once一度だけスケジュールする 
@once一度だけスケジュールする 

start_date (datetime.datetime)


定義必須

スケジューラが実行周期に従い DAG の実行を開始する日時を定義します。未来日を設定するとその日になるまで実行されません。※こちらは必須項目です。

end_date (datetime.datetime)


基本的に定義不要

スケジューラが実行周期の DAG の実行を停止する日時を定義します。未来日を定義しその日をすぎると DAG が実行されなくなります。(未定義の場合 DAG は永久に動き続けます。) 対象 DAG の廃止が決まっている場合など、事前に設定しておけば手動で当日停止作業をする必要がないなどのメリットがあります。

template_searchpath (str or list[str])


定義不要

このリスト (非相対) は、jinja がテンプレートを検索する path を定義します。 ただし順序が重要な点とです。 jinja/airflow には、デフォルトで DAG ファイルのパスが含まれているので、カスタムで定義する場合には DAG ファイルのパスも定義しておく必要があります。 jinjaについては別途纏めてみようと思います。

template_undefined (jinja2.undefined)


基本的に定義不要

jinjaのテンプレートパスtemplate_searchpathした場合、テンプレート未定義型の設定をする必要がある。定義についてはjinjaライブラリーのについて確認することが好ましい。

user_defined_macros (dict)


定義不要

jinja テンプレートで公開されるマクロの辞書。 たとえば、この引数に渡すと、この DAG に関連するすべての jinja テンプレートを使用できるようになります。 ここで任意のタイプのオブジェクトを渡すことができることに注意してください.dict(foo='bar') と dict で定義すると呼び出せる``しかし、DAG 内で定義する場合普通に変数を使えば良いので基本使用しない。

user_defined_filters (dict)


基本的に定義不要

jinja テンプレートで公開されるフィルターのディクショナリ。 例 DAG.dict(hello=lambda name: 'Hello %s' % name)world 私もこの定義について完全には内容は理解できていないのでjinjaライブラリーのについて確認することが好ましい。

default_args (dict)


定義推奨

DAG 内の Operator を初期化するときにコンストラクターのキーワードのパラメーターを設定できる。つまり、DAG 内で定義する TASK のオペレーターの引数をdictで定義することができるので複数の TASK を作るとき難度も同じ定義を書かなくてよくなる。尚、各オペレーターで別途パラメータを定義した場合そちらが優先される。 つまり、Operatorで定義するパラメータのデフォルト値の設定

params (dict)


定義任意

テンプレートでアクセスできる DAG レベルのパラメータのディクショナリで、params の下にnamespaceがあります。 これらのパラメーターは、Task レベルでオーバーライドできるので使いまわすパラメータは定義すると便利である。

concurrency (int)


基本定義不要

同時に実行できるタスク インスタンスの数を定義できる。 基本定期に定義不要の項目ではあるが、並列タスクが多すぎると executerworkerを占有してしまい他の DAG の実行時間に影響する場合がある場合 同時に実行できるタスクを制限することで対策したり、AirFlow の実行環境リソース不足の 回避策としても使える。

max_active_runs (int)


定義推奨

DAG の実行最大数を設定できる。基本的には1と定義しておくのが良いと思う。1以上の値を使う用途としては、過去のスケジュールを遡って実行する際、1 スケジュールづつ再実行すると効率が悪い場合や、実行頻度が短く、前のスケジュール DAG が実行中の時でも実行周期が来れば次のスケジュールが実行されるようにする場合2~3など定義しておくとよいと思う。

dagrun_timeout (datetime.timedelta)


定義推奨

DAG の実行のタイムアウトを定義できる。想定の実行時間より時間がかかっている場合、処理を停止し直ちにアラートを出す必要がある場合定義しておくことで、リソース確保等が期待できる。

sla_miss_callback (types.FunctionType)


定義任意

タスクが定義された SLA を満たしていない場合に呼び出されます。例)SLA タイムアウトを報告するときに呼び出す。基本的に関数名を定義すれば使用できる。 用途としては、タイムアウトした場合、webhook を使用してアラートを飛ばしたり、タイムアウトの場合の必要な処理を実行したい場合等にあらかじめ、作成した関数を実行するようにできる。

SLA とは Service Level Agreement の略であると思われる。

default_view (str)


定義任意

AirFlow の DAG 一覧画面で DAG を選択したときに一番最初に表示される画面を設定できる。tree, graph, duration, gantt, landing_timesの中から文字列で定義する。定義しない場合TreeView が表示される。

orientation (str)


定義任意

グラフ ビューで DAG 方向を指定できる。LR,TB,RL,BTの中から文字列で定義する。 LR=左から右,TB=上から下,RL=右から左,BT=下から上 未定義の場合 LR(右から左)が適応される。

catchup (bool)


定義任意

前回実行時点まで遡って実行するかどうか。 未定義の場合Trueとなり遡って実行されます。 例えば新たに DAG を作成し、DAG のstart_dateをデプロイする 2 日前に定義してデプロイして スケジュールを有効にするとstart_dateの日付に遡って DAG が実行される。 公式でも解説しているのでわかりにくい場合にはリンクのコードサンプルを参照することを進めます。 catchup について-AirFlow 公式

on_failure_callback (callable)


定義任意

DAG の処理が失敗したときに呼び出される関数名を定義できる。この関数には、context(dict)が 1 つのパラメータとして渡されます。

on_success_callback (callable)


定義任意

DAG の処理が成功したときに呼び出される関数名を定義できる。この関数には、context(dict)が 1 つのパラメータとして渡されます。

access_control (dict)


定義任意

DAG のアクセスレベル許可を指定できる。 複数のユーザーで横断して AirFloww を使用する場合 roll 分けしておくと外部から DAG を編集されるリスクが減るというメリットがある。 例: {'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}

is_paused_upon_creation (bool or None)


定義不要

初めて作成するときに DAG を一時停止するかどうかを指定します。 DAG が既に存在する場合、このフラグは無視されます。 このオプションのパラメーターが指定されていない場合、グローバル構成設定が使用されます。 Trueで新規 DAG 作成に一時停止される。

jinja_environment_kwargs (dict)


基本定義不要

テンプレートのレンダリングのために Jinja 環境に渡される追加の構成オプション 例: Jinja がテンプレート文字列から末尾の改行を削除しないようにする

DAG(dag_id='my-dag',
    jinja_environment_kwargs={
        'keep_trailing_newline': True,
        # some other jinja2 Environment options here
    }
)
参照: Jinja 環境のドキュメント

tags (List[str])


定義推奨

AirFlow の UI 上で DAGS をフィルタリングするのに役立つタグのリスト。

DAGサンプル

DAGのSampleファイルのリンクをここに置いておきます。 AirFlowのDAGフォルダーに置くことでDAG取り込まれUI画面で見ることができるようになります。

サンプルファイル

参照

AirFlowの環境構築に関してはこちらを参照してみてください。