Airflowのschedule関連について - 11/01/2022

Airflowのschedule関連について

Airflow


概要

Airflowのscheule関連についてしっかりと理解できていなかったので、DataPipelines with ApacheAirflowのCAPTER3「Scheduling in Airflow」を参考にしつつ雑にまとめてみた。

TL;DR

schedule_intervalについて

「None」の場合、UIもしくはAPIからの実行のみがトリガーとなる。

dag = DAG(
    dag_id="unscheduled",
    start_date=dt.datetime(2019, 1, 1),
    schedule_interval=None
)

「Cron」を使用することができる。

dag = DAG(
    dag_id="use_cron",
    start_date=dt.datetime(2019, 1, 1),
    schedule_interval="0 0 * * *",
)

「@daily」などのairflowが提供しているmacroも使用することができる。

dag = DAG(
    dag_id="use_macro",
    start_date=dt.datetime(2019, 1, 1),
    schedule_interval="@daily"
)

「start_date」から「schedule_interval」を足した時間(start + interval)が初回実行になるため。

スクリーンショット 2022-08-07 15.58.34.png

「timedeltaインスタンス」を渡して使用することもできる。

これによってCronなどで実現できない「n日間隔で実行する」などのschedulingができる。

import datetime at dt

dag = DAG(
    dag_id="use_timedelta",
    schedule_interval=dt.timedelta(day=3),
    start_date=dt.datetime(year=2019, month=1, day=1),
)

「start_date=2019,1,1からschedule_interval=3日後(2019,1,3)分の実行となる」

「end_date」を使用して最終実行時間を指定することもできる。

dag = DAG(
    dag_id="use_enddate",
    start_date=dt.datetime(2019, 1, 1),
    schedule_interval="@daily",
    end_date=dt.datetime(2019, 1, 5)
)

日付関連のJinja Templateについて

Operatorに直接書くことができる。

BashOperator(
    task_id="fetch_events",
    bash_command={
        "mkdir -p /data/events && "
        "start_date={{ds}}&end_date={{next_ds}}"
    }
)

引数に渡すこともできる。

def _function_test(**context):
    """function test"""
    print(f"{context['templates_dict']['start_date']}"
          f"{context['templates_dict']['end_date']}")

fetch_events = PythonOperator(
    task_id="fetch_events",
    python_callable=_function_test,
    templates_dict={
        "start_date": "start_date={{ds}}",
        "end_date": "end_date={{next_ds}}"
    }
)

interval-basedな日付の取得の仕方をするので注意

Backfillについて

defaultで有効になっているが無効にしたい場合は「catchup」をFalseに

dag = DAG(
    dag_id="catchup_test",
    schedule_interval="@daily",
    start_date=dt.datetime(year=2019, month=1, day=1),
    catchup=False
)
catchup_by_default = False

share on twitter suggests change