Cloud Tasks で関数をキューに追加する


タスクキュー関数は、Google Cloud Tasks を利用して、時間がかかるタスク、リソースを大量に消費するタスク、または帯域幅に制限があるタスクを、メイン アプリケーション フローの外部で非同期に実行できるようにします。

たとえば、現在ホストされている大量の画像ファイルについて、レート制限のある API でバックアップを作成するとします。責任ある利用者として行動するには、その API のレート制限を尊重する必要があります。また、この種の長時間実行ジョブは、タイムアウトやメモリ制限によって失敗するおそれもあります。

こうした複雑さを軽減するために、scheduleTimedispatchDeadline などの基本的なタスク オプションが設定されたタスクキュー関数を作成してから、Cloud Tasks のキューにこの関数を渡します。Cloud Tasks 環境は、このようなオペレーションがあった場合に、データの渋滞を効果的に制御し、ポリシーを再試行するように作られています。

Firebase SDK for Cloud Functions for Firebase v3.20.1 以降では、Firebase Admin SDK v10.2.0 以降との連携によって、タスクキュー関数をサポートします。

Firebase でタスクキュー関数を使用すると、Cloud Tasks の処理に対して課金される場合があります。詳細は、Cloud Tasks の料金をご覧ください。

タスクキュー関数を作成する

タスクキュー関数を使用するには、次のワークフローに従います。

  1. Firebase SDK for Cloud Functions を使用してタスクキュー関数を記述します。
  2. HTTP リクエストで関数をトリガーして、関数をテストします。
  3. Firebase CLI を使用して関数をデプロイします。タスクキュー関数を初めてデプロイするときに、ソースコード内で指定されているオプション(レート制限と再試行)が設定されたタスクキューが Cloud Tasks に作成されます。
  4. 新しく作成されたタスクキューにタスクを追加し、必要に応じてパラメータを渡して実行スケジュールを設定します。これを行うには、Admin SDK を使用してコードを記述し、Cloud Functions for Firebase にデプロイします。

タスクキュー関数を作成する

このセクションのコードサンプルは、NASA の Astronomy Picture of the Day(今日の天文写真)のすべての画像をバックアップするサービスを設定するアプリをベースにしています。まず、必要なモジュールをインポートします。

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

タスクキュー関数には onTaskDispatched または on_task_dispatched を使用します。タスクキュー関数を記述するときに、キューごとに再試行とレート制限を構成できます。

タスクキュー関数を構成する

タスクキュー関数には、レート制限と再試行を正確に制御するための強力な設定機能があります。

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: タスクキュー内の各タスクは、最大 5 回まで自動的に再試行されます。これにより、ネットワーク エラーや、依存している外部サービスの一時的な停止といった、一時的なエラーの影響を軽減できます。

  • retryConfig.minBackoffSeconds=60: 各タスクは、それぞれ 60 秒以上の間隔で再試行されます。これにより、各試行の間に大きなバッファができるため、5 回の再試行を短時間で使い切らずにすみます。

  • rateLimits.maxConcurrentDispatch=6: 同時にディスパッチされるタスクが 6 個までに制限されます。これにより、基盤となる関数に安定してリクエストが流れるようになり、アクティブなインスタンスの数とコールド スタートの数を低減できます。

タスクキュー関数をテストする

ほとんどの場合、タスクキュー関数をテストするには Cloud Functions エミュレータが最適です。タスクキュー関数のエミュレーション用にアプリをインストルメント化する方法については、Emulator Suite のドキュメントをご覧ください。

また、タスクキュー functions_sdk は、Firebase Local Emulator Suite でシンプルな HTTP 関数として公開されます。 JSON データ ペイロードを使用して HTTP POST リクエストを送信することで、エミュレートされたタスク関数をテストできます。

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

タスクキュー関数をデプロイする

Firebase を使用してタスクキュー関数をデプロイします。

$ firebase deploy --only functions:backupapod

タスクキュー関数を初めてデプロイするときに、ソースコード内で指定されているオプション(レート制限と再試行)が設定されたタスクキューが Cloud Tasks に作成されます。

関数のデプロイ時に権限エラーが発生した場合は、デプロイ コマンドを実行するユーザーに適切な IAM ロールが割り当てられていることを確認します。

タスクキュー関数をキューに追加する

Node.js 用の Firebase Admin SDK または Python 用 Google Cloud ライブラリを使用して、Cloud Functions for Firebase などの信頼できるサーバー環境から Cloud Tasks のキューにタスクキュー関数を追加できます。Admin SDK を初めて使用する場合は、サーバーに Firebase を追加するをご覧ください。

一般的なフローでは、新しいタスクを作成し、Cloud Tasks のキューに入れて、タスクの構成を設定します。

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • このサンプルコードでは、N 番目のタスクに N 分の遅延を適用して、タスクの実行を分散しようとします。これにより、1 分あたり約 1 個のタスクがトリガーされることになります。scheduleTime(Node.js)または schedule_time(Python)を使用して、Cloud Tasks で特定の時間にタスクをトリガーすることもできます。

  • このサンプルコードでは、タスクが完了するまでに Cloud Tasks が待機する最長時間を設定します。Cloud Tasks は、キューの再試行の構成に従ってタスクを再試行しますが、再試行はこの制限時間に達するまで行われます。このサンプルでは、キューはタスクを 5 回まで再試行するように構成されていますが、プロセス全体(再試行を含む)に 5 分以上かかる場合、タスクは自動的にキャンセルされます。

ターゲット URI を取得して含める

Cloud Tasks は、認証トークンを作成して基となるタスクキュー関数へのリクエストを認証するため、タスクをキューに登録する際に関数の Cloud Run URL を指定する必要があります。関数の URL は、次に示すようにプログラムで取得することをおすすめします。

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

トラブルシューティング

Cloud Tasks">Cloud Tasks のロギングをオンにする

Cloud Tasks のログには、タスクに関連付けられたリクエストのステータスなど、有用な診断情報が含まれます。プロジェクトで大量のログが生成される可能性があるため、Cloud Tasks のログはデフォルトでオフになっています。タスクキュー関数の開発とデバッグを積極的に進めている間は、デバッグログをオンにすることをおすすめします。ロギングをオンにするをご覧ください。

IAM 権限

キューにタスクを追加するとき、または Cloud Tasks がタスクキュー関数を呼び出すときに、PERMISSION DENIED エラーが発生することがあります。プロジェクトに次の IAM バインディングがあることを確認してください。

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Cloud Tasks でタスクをキューに追加する際に使用する ID には、Cloud Tasks のタスクに関連付けられたサービス アカウントを使用するための権限が必要です。

    このサンプルでは、これは App Engine のデフォルトのサービス アカウントです。

App Engine のデフォルトのサービス アカウントのユーザーとして App Engine のデフォルトのサービス アカウントを追加する方法については、Google Cloud IAM のドキュメントをご覧ください。

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker