使用 Cloud Tasks 將函式排入佇列


工作佇列函式利用 Google Cloud Tasks,讓應用程式在主要應用程式流程外,以非同步方式執行耗時、需要大量資源或有限頻寬的工作。

例如,假設您想為目前託管於 API 且設有頻率限制的大型圖片檔建立備份,為成為負責任的 API 使用者,您必須遵守頻率限制。此外,這類長時間執行的工作可能會因逾時和記憶體限製而有安全漏洞。

如要降低這個複雜度,您可以編寫工作佇列函式,用於設定 scheduleTimedispatchDeadline 等基本工作選項,然後將函式交給 Cloud Tasks 中的佇列處理。Cloud Tasks 環境是專門用來確保這類作業的有效壅塞控制與重試政策。

Cloud Functions for Firebase v3.20.1 以上版本的 Firebase SDK 可與 Firebase Admin SDK 10.2.0 以上版本互通,以支援工作佇列函式。

使用 Firebase 的工作佇列函式可能會產生 Cloud Tasks 處理費用。詳情請參閱 Cloud Tasks 定價

建立工作佇列函式

如要使用工作佇列函式,請按照以下工作流程操作:

  1. 使用 Cloud Functions 適用的 Firebase SDK 編寫工作佇列函式。
  2. 透過 HTTP 要求觸發函式來測試函式。
  3. 使用 Firebase CLI 部署函式。首次部署工作佇列函式時,CLI 會在 Cloud Tasks 中建立工作佇列,並提供原始碼中指定的選項 (頻率限制和重試)。
  4. 將工作新增至新建立的工作佇列,視需要傳送參數以設定執行排程。方法是使用 Admin SDK 編寫程式碼,並將程式碼部署至 Cloud Functions for Firebase。

寫入工作佇列函式

本節的程式碼範例是以應用程式為基礎,該應用程式設定的服務來備份 NASA 的天文圖片中的所有圖片。如要開始,請匯入必要的模組:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

對於工作佇列函式,請使用 onTaskDispatchedon_task_dispatched。編寫工作佇列函式時,您可以設定每個佇列的重試和頻率限制設定。

設定工作佇列函式

工作佇列函式提供一組強大的配置設定,可精確控制工作佇列的頻率限制與重試行為:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5:工作佇列中的每個工作最多會自動重試 5 次。這有助於減少暫時性錯誤 (例如網路錯誤或相依外部服務發生暫時性中斷情形)。

  • retryConfig.minBackoffSeconds=60:每次嘗試後,每次工作都會重試至少 60 秒。這會在每次嘗試時提供較長的緩衝區,因此我們不會急著衝動 5 次重試嘗試。

  • rateLimits.maxConcurrentDispatch=6:在特定時間最多分派 6 項工作。這有助於確保向基礎函式發出的穩定串流要求,並減少運作中的執行個體數量和冷啟動數量。

測試工作佇列函式

Firebase 本機模擬器套件中的工作佇列函式會以簡單的 HTTP 函式公開。您可以利用 JSON 資料酬載傳送 HTTP POST 要求,藉此測試模擬工作函式:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

部署工作佇列函式

使用 Firebase CLI 部署工作佇列函式:

$ firebase deploy --only functions:backupapod

首次部署工作佇列函式時,CLI 會在 Cloud Tasks 中建立工作佇列,並提供原始碼中指定的選項 (頻率限制與重試)。

如果您在部署函式時遇到權限錯誤,請確認已將適當的 IAM 角色指派給執行部署指令的使用者。

將工作佇列函式排入佇列

您可以使用 Node.js 適用的 Firebase Admin SDK,或 Python 適用的 Google Cloud 程式庫,將 Cloud Tasks 等信任的伺服器環境 (例如 Cloud Functions for Firebase) 中的工作佇列函式排入佇列。如果您是第一次使用 Admin SDK,請參閱「將 Firebase 新增至伺服器」一文,瞭解如何開始使用。

一般流程會建立新工作、將其排入 Cloud Tasks,以及設定任務設定:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • 這個程式碼範例會將第 N 分鐘的延遲時間與第 N 分鐘建立關聯,以嘗試分散工作的執行作業。這會轉譯為每分鐘觸發約 1 項工作。請注意,如果您希望 Cloud Tasks 在特定時間觸發工作,也可以使用 scheduleTime (Node.js) 或 schedule_time (Python)。

  • 程式碼範例會設定 Cloud Tasks 等待工作完成的時間上限。Cloud Tasks 會在佇列重試設定後重試工作,或到此期限為止。在範例中,佇列設定為重試工作最多 5 次,但如果整個程序 (包括重試) 超過 5 分鐘,工作就會自動取消。

擷取並加入目標 URI

基於 Cloud Tasks 建立驗證權杖的方式,以驗證對基礎工作佇列函式的要求,您必須在將工作排入佇列時指定函式的 Cloud Run 網址。建議您透過程式輔助方式擷取函式的網址,如下所示:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

疑難排解

開啟 Cloud Tasks 記錄功能

Cloud Tasks 中的記錄會包含實用的診斷資訊,例如與工作相關聯的要求狀態。Cloud Tasks 預設會在專案中產生大量記錄檔,因此預設關閉記錄。建議您在積極開發工作佇列函式並進行偵錯時,先開啟偵錯記錄檔。請參閱「開啟記錄功能」一節。

身分與存取權管理權限

在將工作排入佇列,或 Cloud Tasks 嘗試叫用工作佇列函式時,您可能會看到 PERMISSION DENIED 錯誤。請確認您的專案具有下列 IAM 繫結:

  • 用來將工作排入 Cloud Tasks 的身分必須具備 cloudtasks.tasks.create IAM 權限。

    在範例中,這是 App Engine 預設服務帳戶

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • 用來將工作排入 Cloud Tasks 的身分必須具備權限,才能使用與 Cloud Tasks 中工作相關聯的服務帳戶。

    在範例中,這是 App Engine 預設服務帳戶

請參閱 Google Cloud IAM 說明文件,瞭解如何將 App Engine 預設服務帳戶新增為 App Engine 預設服務帳戶的使用者。

  • 用於觸發工作佇列函式的身分必須具備 cloudfunctions.functions.invoke 權限。

    在範例中,這是 App Engine 預設服務帳戶

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker