こんにちは。松井です。
みなさんは、DAGの実行完了後に後続で別のDAGを実行するというように、DAG間に依存関係を持たせたいと思ったことはありませんか?
本記事では、TriggerDagRunOperatorやExternalTaskSensorを用いたDAG間の依存関係の作り方を紹介します。
本記事でわかること
- DAG間の依存関係の作り方
環境およびAirFlowのバージョンについて
本記事では、Cloud Composer環境(メジャーバージョン2)の
AirFlowバージョン2.7.3を前提としています。
それ以外のバージョンではパラメーターなどが異なる場合があります。
TriggerDagRunOperatorについて
TriggerDagRunOperatorは、指定したDAGをキックします。
親DAGから複数の子DAGをキックしたい場合などはこちらを使用します。
TriggerDagRunOperatorの使い方
下記コードではTriggerDagRunOperatorのパラメーターとして、
タスクIDとトリガーDAGIDの2つを指定しています。
# 親DAG
import datetime
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
DAG_ID = "trigger_test_dag"
with models.DAG(
DAG_ID,
schedule=None,
max_active_runs=1,
start_date=datetime.datetime(2024, 12, 10),
) as dag:
# 子DAGをキック
trigger_b = TriggerDagRunOperator(
task_id="trigger_b",
trigger_dag_id="trigger_test_dag_b",
)
complete = BashOperator(
task_id="complete",
bash_command="echo complete",
)
trigger_b >> complete
上記を実行すると、子DAGとなる「trigger_test_dag_b」が即時実行された後、
実行結果を待たずに後続タスクが実行されます。
TriggerDagRunOperatorのその他パラメーターについて
execution_dateを指定することで子DAGの実行時間を指定できたり、
wait_for_completion(デフォルト=False)をTrueにすることで、
子DAGの完了を待ってから親DAGの後続処理を実行するといったことも可能です。
パラメーターの詳細については下記リンクを参照してください。
https://airflow.apache.org/docs/apache-airflow/2.7.3/_api/airflow/operators/trigger_dagrun/index.html#airflow.operators.trigger_dagrun.TriggerDagRunOperator
ExternalTaskSensorについて
ExternalTaskSensorは指定したDAG内のタスク完了を待ちます。
子DAGが複数の親DAGの完了を待つ場合などはこちらを使用します。
ExternalTaskSensorの使い方
下記コードではExternalTaskSensorののパラメーターとして、
タスクID・センサー対象DAGID・タスクIDの3つを指定しています。
# 親DAG
import datetime
from airflow import models
from airflow.operators.bash import BashOperator
DAG_ID = "trigger_test_dag"
with models.DAG(
DAG_ID,
schedule="20 1 * * *",
max_active_runs=1,
start_date=datetime.datetime(2025, 5, 19),
catchup=False,
) as dag:
complete = BashOperator(
task_id="complete",
bash_command="echo complete",
)
complete
# 子DAG
import datetime
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
DAG_ID = "trigger_test_dag_b"
with models.DAG(
DAG_ID,
schedule="20 1 * * *",
max_active_runs=1,
start_date=datetime.datetime(2025, 5, 19),
catchup=False,
) as dag:
# 別DAGの指定タスク完了まで待機
wait_task = ExternalTaskSensor(
task_id="wait_task",
external_dag_id="trigger_test_dag", # 完了待ちのDAG_ID
external_task_id="complete", # 完了待ちのtask_id
)
wait_complete = BashOperator(
task_id="wait_complete",
bash_command='echo complete',
)
wait_task >> wait_complete
上記がスケジューラーで同時実行されると、子DAGはwait_taskで親DAG内のcompleteタスクが成功するのを待ちます。
親DAGのcompleteタスク成功後、子DAGの後続処理が実行されます。
ExternalTaskSensor使用の注意点について
親DAGと子DAGの実行開始時間が同じであれば気にする必要は無いですが、
違う場合はexecution_deltaやexecution_date_fnパラメーターを指定して調整する必要があります。
例えば、上記コードの子DAGの実行開始時間を親DAGの30分後にしたい場合は、
下記のようにexecution_deltaを指定する必要があります。
with models.DAG(
DAG_ID,
schedule="50 1 * * *",
max_active_runs=1,
start_date=datetime.datetime(2025, 5, 19),
catchup=False,
) as dag:
# 別DAGの完了まで待機
wait_task = ExternalTaskSensor(
task_id="wait_task",
external_dag_id="trigger_test_dag", # 完了待ちのDAG_ID
external_task_id="complete", # 完了待ちのtask_id
execution_delta=datetime.timedelta(minutes=30),
)
ExternalTaskSensorのその他パラメーターについて
親と子DAGの実行開始時間が違う場合に調整を行うためのパラメーター以外にも、
タスクグループや複数のタスクを指定するパラメーターや、
親DAGがどのようなステータスで終了した場合、
成功・スキップ・失敗とするか?を指定するようなパラメーターなどもあります。
パラメーターの詳細については下記リンクを参照してください。
https://airflow.apache.org/docs/apache-airflow/2.7.3/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor
親DAGと子DAGの数が1:1の場合の使い分け
親DAGと子DAGの数が1:1の場合はどちらを使用しても問題はありませんが、
どちらを使用するかでできることが変わってきます。
下記にどのようなケースでどちらを使用すべきかをまとめましたので、
用途に合った方を使用してください。
特に、子DAGをキックしたいだけであれば、
DAGの処理待ちによるワーカーを使用しないかつ、
シンプルに記述できるためTriggerDagRunOperatorを使用するのがおすすめです。
TriggerDagRunOperatorを使用した方が良いケース
- シンプルに子DAGをキックしたい。
- 子DAGを単体でも動かしたい。
- 子DAGを他のDAGからもキックしたい。
- 子DAGの実行結果次第で親DAGの後続処理を実施するか判定したい。
ExternalTaskSensorを使用した方が良いケース
- 親DAGと子DAGは必ずセットで動かしたい。
- 子DAGの実行時間を明確に指定したい。
- 親DAGの指定タスクの実行結果次第で子DAGの後続処理を実施するか判定したい。
- 親DAGの指定タスクの実行を待っている間に並列で何か処理を行いたい。
まとめ
いかがでしたでしょうか。
DAGの依存関係が作成できれば、
依存関係や実行時間を考えて細かくスケジュールを考えたり、
ひとつのDAGに色々な処理を詰め込み過ぎたりといったことを減らすことができます。
是非、業務で活用してみてください。
他にも、Pub/SubやCloudFunctionとの組み合わせで、DTSの実行やBigQueryとの依存関係を作成することも可能ですが。こちらは、別記事で取り上げたいと思っています。
Google および Google Cloud Platform™ service は Google LLC の商標であり、この記事は Google によって承認されたり、Google と提携したりするものではありません。
コメント