見出し

AirFlowのDAGの基本的な作成方法

   2023年02月17日     4分で読めます

AirFlow は DAG の中に複数の Task を定義して処理を組み込むことができる。タスクを定義するには AirFlow のオペレータを 使用することで簡単に定義することができる。ここではBashOperatorDummyOperatorを利用して DAG を作成する手順を解説していく。 尚、本記述は AirFlow は AWS にある MWAA で実装されている Ver.2.0.2 を想定する。

DAG の各種Operatorは AirFlow のバージョンによって若干異なるので詳細は公式ページの確認をおすすめします。 公式ページ

DAG ファイルの作成

まず DAG を作成するには任意の名前でpythonファイルを作成する。 DAG ファイルは AirFlow の DAG フォルダーに配置する。

AirFlow_DAGの置き場所

DAG の定義

DAG ファイルを作成したら、ソースコードを記述していく、まずは import ブロックを記述して最低限のimportの宣言を行う。

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from datetime import timedelta, datetime

その後 DAG の各パラメータの定義を行う。今回はrun_thisrun_this_lastの Task を定義して >> ビットシフト演算子で依存関係を定義する。

# args Operatorで使用するパラメーターのデフォルト値を定義する
# *dictで定義すること。
# 今回はownerのみを定義
args = {
    'owner': 'airflow',
}

# DAGの定義フォーマット
dag = DAG(
    dag_id='example_DAG',  # DAGのID名
    description='DAGのサンプル',  # DAGの説明
    default_args=args,  # Operatorのデフォルト
    schedule_interval='10 10 * * *',  # 実行スケジュール cron形式で定義 毎日10時10分に実行
    start_date=datetime(2023, 2, 15, 0, 0, tzinfo=None),  # daterime形式で記述 2023-2-15 00:00に以降にスケジュールが有効
    dagrun_timeout=timedelta(minutes=60),  # DAGの実行タイムアウト時間をtimedeltaで定義 60分でタイムアウト
    tags=['example', 'example2'],  # TAG名を定義 listで定義して複数設定可能
    params={"example_key": "example_value"},  # Operatorで使用する Taskのパラメータをdict型で定義
)

Task の定義

DAG の定義が終われば次に Task の定義をしていく。今回はBashOperatorDummyOperatorを使用して Task を定義する。

with dag(
    # [START howto_operator_bash]
    run_this = BashOperator(
        task_id='run_this',  # Task_IDを定義
        bash_command='echo 1',  # BashOperatorで実行するコマンドを記述

    )
    # [END howto_operator_bash]

    run_this_last = DummyOperator(
        task_id='run_this_last',  # Task_IDを定義
    )
)
# Task_IDの依存関係を`>>`で定義
run_this >> run_this_last

このように DAG を定義して AirFlow を起動すると以下のように AirFlow の UI 画面で、DAG がインポートされる。

AirFlow_UI画面

AirFlow からインポートされた DAG のコードも確認することができる。

AirFlow_UI画面2

最後に

今回は基本的な DAG の作成方法について解説してきたが、次回は Task 定義で使用している。Operator について解説していく。

参照

DAG 定義のパラメーターについてや概要はこちらから

AirFlow の環境構築方法(インストール方法)はこちらから