タスクキュー関数は、Google Cloud Tasks を利用して、時間がかかるタスク、リソースを大量に消費するタスク、または帯域幅に制限があるタスクを、メイン アプリケーション フローの外部で非同期に実行できるようにします。
たとえば、現在ホストされている大量の画像ファイルについて、レート制限のある API でバックアップを作成するとします。責任ある利用者として行動するには、その API のレート制限を尊重する必要があります。また、この種類の長時間実行ジョブは、タイムアウトやメモリ制限によって失敗するおそれもあります。
こうした複雑さを軽減するために、scheduleTime や dispatchDeadline などの基本的なタスク オプションが設定されたタスクキュー関数を作成してから、Cloud Tasks のキューにこの関数を渡します。Cloud Tasks 環境は、このようなオペレーションがあった場合に、データの渋滞を効果的に制御し、ポリシーを再試行するように作られています。
Firebase SDK for Cloud Functions for Firebase v3.20.1 以降は、Firebase Admin SDK v10.2.0 以降との連携によって、タスクキュー関数をサポートします。
Firebase でタスクキュー関数を使用すると、Cloud Tasks の処理に対して課金される場合があります。詳細は、Cloud Tasks の料金をご覧ください。
タスクキュー関数を作成する
タスクキュー関数を使用するには、次のワークフローに従います。
- Firebase SDK for Cloud Functions を使用してタスクキュー関数を記述します。
- 関数を HTTP リクエストでトリガーしてテストします。
- Firebase CLI を使用して関数をデプロイします。タスクキュー関数を初めてデプロイするときに、ソースコード内で指定されているオプション(レート制限と再試行)が設定されたタスクキューが Cloud Tasks に作成されます。
- 新しく作成されたタスクキューにタスクを追加し、必要に応じてパラメータを渡して実行スケジュールを設定します。これを行うには、Admin SDK を使用してコードを記述し、Cloud Functions for Firebase にデプロイします。
タスクキュー関数を記述する
このセクションのコードサンプルは、NASA の Astronomy Picture of the Day(今日の天文写真)のすべての画像をバックアップするサービスを設定するアプリをベースにしています。始めるには、まず必要なモジュールをインポートします。
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");
// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
タスクキュー関数には onTaskDispatched または on_task_dispatched を使用します。タスクキュー関数を記述するときに、キューごとに再試行とレート制限を構成できます。
タスクキュー関数を構成する
タスクキュー関数には、レート制限と再試行を正確に制御するための強力な設定機能があります。
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5: タスクキュー内の各タスクは、最大 5 回まで自動的に再試行されます。これにより、ネットワーク エラーや、依存している外部サービスの一時的な停止といった、一時的なエラーの影響を軽減できます。retryConfig.minBackoffSeconds=60: 各タスクは、それぞれ 60 秒以上の間隔で再試行されます。これにより、各試行の間に大きなバッファができるため、5 回の再試行を短時間で使い切らずにすみます。rateLimits.maxConcurrentDispatch=6: 同時にディスパッチされるタスクが 6 個までに制限されます。これにより、基盤となる関数に安定してリクエストが流れるようになり、アクティブなインスタンスの数とコールド スタートの数を低減できます。
タスクキュー関数をテストする
ほとんどの場合、タスクキュー関数をテストするには Cloud Functions エミュレータが最適な方法です。タスクキュー関数のエミュレーション用にアプリを計測化する方法については、Emulator Suite のドキュメントをご覧ください。
また、タスクキュー関数は、Firebase Local Emulator Suite でシンプルな HTTP 関数として公開されます。 JSON データ ペイロードを使用して HTTP POST リクエストを送信することで、エミュレートされたタスク関数をテストできます。
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
タスクキュー関数をデプロイする
Firebase を使用してタスクキュー関数をデプロイします。
$ firebase deploy --only functions:backupapod
タスクキュー関数を初めてデプロイするときに、ソースコード内で指定されているオプション(レート制限と再試行)が設定されたタスクキューが Cloud Tasks に作成されます。
関数のデプロイ時に権限エラーが発生した場合は、デプロイ コマンドを実行するユーザーに適切な IAM ロールが割り当てられていることを確認します。
タスクキュー関数をキューに追加する
Node.js 用の Firebase Admin SDK または Python 用 Google Cloud ライブラリを使用して、Cloud Functions for Firebase などの信頼できるサーバー環境から Cloud Tasks のキューにタスクキュー関数を追加できます。Admin SDK を初めて使用する場合は、サーバーに Firebase を追加してスタートするをご覧ください。
一般的なフローでは、新しいタスクを作成し、Cloud Tasks のキューに入れて、タスクの構成を設定します。
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
このサンプルコードでは、N 番目のタスクに N 分の遅延を適用して、タスクの実行を分散しようとします。これにより、1 分あたり約 1 個のタスクがトリガーされることになります。
scheduleTime(Node.js)またはschedule_time(Python)を使用して、Cloud Tasks で特定の時間にタスクをトリガーすることもできます。このサンプルコードでは、タスクが完了するまでに Cloud Tasks が待機する最長時間を設定します。Cloud Tasks は、キューの再試行の構成に従ってタスクを再試行しますが、再試行はこの制限時間に達するまで行われます。このサンプルでは、キューはタスクを 5 回まで再試行するように構成されていますが、プロセス全体(再試行を含む)に 5 分以上かかる場合、タスクは自動的にキャンセルされます。
トラブルシューティング
Cloud Tasks のロギングを有効にする
Cloud Tasks のログには、タスクに関連付けられたリクエストのステータスなど、有用な診断情報が含まれます。プロジェクトで大量のログが生成される可能性があるため、Cloud Tasks のログはデフォルトでオフになっています。タスクキュー関数の開発とデバッグを積極的に進めている間は、デバッグログをオンにすることをおすすめします。ロギングをオンにするをご覧ください。
IAM 権限
キューにタスクを追加するとき、または Cloud Tasks がタスクキュー関数を呼び出そうとするときに、PERMISSION DENIED エラーが発生することがあります。プロジェクトに次の IAM バインディングがあることを確認してください。
Cloud Tasks でタスクをキューに追加する際に使用する ID には、
cloudtasks.tasks.createIAM 権限が必要です。このサンプルでは、これは App Engine のデフォルトのサービス アカウントです。
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
Cloud Tasks でタスクをキューに追加する際に使用する ID には、Cloud Tasks のタスクに関連付けられたサービス アカウントを使用するための権限が必要です。
このサンプルでは、これは App Engine のデフォルトのサービス アカウントです。
App Engine のデフォルトのサービス アカウントのユーザーとして App Engine のデフォルトのサービス アカウントを追加する方法については、Google Cloud IAM のドキュメントをご覧ください。
タスクキュー関数をトリガーする際に使用する ID には、
cloudfunctions.functions.invoke権限が必要です。このサンプルでは、これは App Engine のデフォルトのサービス アカウントです。
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker