こんにちは!梶尾です!
今回Cloud Composerについて構築してみましたので、その作業をまとめておくことにしました。
バージョンは現在の最新であるcomposer-3-airflow-2.10.5-build.3で構築します。
Cloud Composerとは
Google Cloud Composerは、Apache Airflowをベースにしたフルマネージドのワークフロー オーケストレーション サービスです。
Google Cloud Platform上のリソースを管理でき、
Airflowをベースにしたマネージドサービスなので、インフラ管理の手間なく、ワークフローを簡単に自動化できます。
https://cloud.google.com/composer/docs/composer-3/composer-overview?hl=ja
Cloud Composer環境の作成
APIの有効化
1.Google Cloudにログインします。
2.検索バーに「Cloud Composer」と入力し、Composerを選択します。

3.有効にするをクリックします。

環境の作成
1.環境の作成をクリックし、Composer3をクリックします。

2.環境の作成画面が表示されます。
今回は名前、場所、イメージのバージョン、サービスアカウントを入力し、
それ以外はデフォルトで作成を行いました。

3.作成をクリックします。

4.緑のチェックマークが表示されると作成完了です。
作成までには10分程度かかります。

Cloud Composerの画面とAirflowの画面について
Composerの名前をクリックすると環境の詳細画面が表示されます。

Composerの画面では、各環境のヘルス状態などをモニタリング、ログ確認、環境構成の再設定、環境変数の設定、PYPIパッケージの設定等を行うことができます。

上部のAIRFLOW UIを開くをクリックすると、Airflowの画面が表示されます。

Airflowの画面では、DAGの管理や環境変数・共通設定が行え、
DAGの管理は主にワークフロー(タスク)の可視化、タスクの実行ログ確認、タスクの再実行などが行えます。

DAGファイルとSQLファイルの作成
DAGとはDirected Acyclic Graph(有向非巡回グラフ)の略で、ワークフローの各タスクとその依存関係を表すグラフです。
DAGファイルは、Cloud Composerでワークフローを定義するためのファイルで、Pythonで記述します。
タスクとその実行順序を記述することで、複雑なワークフローを簡単に管理することができます。
今回は以下の通り「bigquery_update_dag.py」と「query.sql」ファイルを作成しました。
bigquery_update_dag.py
処理内容はquery.sqlファイルを実行するタスクを、毎日13時にスケジュール実行します。
import datetime
from datetime import timedelta
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
# SQLファイル格納パス
SQLFILE = (
"/sql/query.sql"
)
# BigQueryInsertJobOperatorの実行ロケーション
LOCATION = "asia-northeast1"
# DAGデフォルトオプション
default_dag_args = {
"owner": "DAG作成者名",
"retries": 1, # タスク失敗時のリトライ回数
"retry_delay": datetime.timedelta(minutes=1), # リトライ間隔
}
with models.DAG(
"BigQueryUpdateDAG", # DAG名
description="BigQueryへのUpdate処理をスケジュール実行するDAG", # DAGの概要
default_args=default_dag_args, # DAGデフォルトオプション
tags=["test"], # DAGに付けるタグ名
start_date=datetime.datetime(2025, 5, 24), # 実行日の前日を指定
schedule="00 4 * * *", # cron式でUTC(協定世界時)で記載する
dagrun_timeout=timedelta(seconds=36000), # DAGのタイムアウト時間
is_paused_upon_creation=True, # DAGを初めてアップロードした際にPause状態にする
catchup=False, # 開始日から未実行分のタスクを遡って実行する
max_active_runs=1, # DAGの最大同時実行数
template_searchpath=["/home/airflow/gcs/data"], # DAGのテンプレートサーチパス
) as dag:
# BigQueryにSQLファイルを実行する
bigquery_update_task = BigQueryInsertJobOperator(
task_id="bigquery_update_task", # タスクID
configuration={
"query": {
"query": SQLFILE,
"useLegacySql": False,
},
"queryParameters": None,
},
location=LOCATION,
dag=dag,
)
# タスク実行順序
(
bigquery_update_task
)
query.sql
処理内容はtestTableのdaysカラムを+1します。
UPDATE
`testDataset.testTable`
SET
days = days + 1
WHERE
ID = "001";
DAGファイルとSQLファイルの配置
作成したDAGファイルとSQLファイルを所定のフォルダに配置します。
1.Composerの環境の詳細画面でDAG フォルダを開くをクリックします。

2.Composerバケットのdagsフォルダが表示されますので、フォルダ直下にDAGファイルをアップロードします。

3.dataフォルダを開きます。

4.sqlフォルダを作成します。

5.SQLファイルをアップロードします。

DAGの実行
Airflowの画面からDAGを有効化・実行します。
1.Airflowの画面を開き、BigQueryUpdateDAGをクリックします。

2.左上のトグルをONにします。
ONにしていると、毎日13時にDAGがスケジュール実行されます。

3.DAGで指定したstart_dateの日付から次回の実行時間を過ぎている場合は自動実行されますが、
過ぎていない場合(今回の例では5/25 13時より前の場合)は実行されません。
手動実行する方法は右上のTriggerDAGボタンをクリックします。

4.実行履歴は左ペインにグラフで表示されます。
画面内に各色の説明が表示されていますが一例として、
黄緑色は実行中、赤色はエラー、緑色は正常終了で表示されます。
また、タスク毎にも実行結果が表示されます。

まとめ
今回はCloud Composer環境の作成から簡単なDAGのスケジュール実行を紹介いたしました。
Cloud Composerは、Google Cloud Platform上のBigQuery、Cloud Storageなどのサービスとの連携もスムーズなので、データパイプラインを効率的に構築することができます。
ワークフローの可視化やスケジューリング機能も充実しているため、是非触ってみてください。
Google および Google Cloud Platform™ service は Google LLC の商標であり、この記事は Google によって承認されたり、Google と提携したりするものではありません。
コメント