使用 Cloud Tasks 將函數排入隊列


任務佇列函數利用 Google Cloud Tasks來協助您的應用程式在主應用程式流程之外非同步運行耗時、資源密集或頻寬有限的任務。

例如,假設您想要建立目前託管在具有速率限制的 API 上的大量映像檔的備份。為了成為該 API 的負責任的消費者,您需要尊重他們的速率限制。另外,這種長時間運行的作業可能容易因逾時和記憶體限製而失敗。

為了減輕這種複雜性,您可以編寫一個任務佇列函數來設定基本任務選項,例如scheduleTimedispatchDeadline ,然後將函數交給 Cloud Tasks 中的佇列。 Cloud Tasks 環境專門設計用於確保此類操作的有效擁塞控制和重試策略。

Firebase SDK for Cloud Functions for Firebase v3.20.1 及更高版本可與 Firebase Admin SDK v10.2.0 及更高版本互通,以支援任務佇列功能。

將任務佇列函數與 Firebase 結合使用可能會產生 Cloud Tasks 處理費用。有關更多信息,請參閱雲任務定價

建立任務隊列函數

若要使用任務佇列功能,請遵循以下工作流程:

  1. 使用 Firebase SDK for Cloud Functions 編寫任務佇列函數。
  2. 透過使用 HTTP 請求觸發函數來測試您的函數。
  3. 使用 Firebase CLI 部署您的函數。首次部署任務佇列功能時,CLI 將在 Cloud Tasks 中建立一個任務佇列,並使用原始程式碼中指定的選項(速率限制和重試)。
  4. 將任務新增至新建立的任務佇列中,並根據需要傳遞參數以設定執行計劃。您可以透過使用 Admin SDK 編寫程式碼並將其部署到 Cloud Functions for Firebase 來實現此目的。

編寫任務佇列函數

本節中的程式碼範例基於一個應用程序,該應用程式設定了一項服務,該服務可備份 NASA每日天文圖片中的所有影像。首先,導入所需的模組:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

onTaskDispatchedon_task_dispatched用於任務佇列函數。編寫任務佇列函數時,您可以設定每個佇列的重試和速率限製配置。

配置任務隊列功能

任務佇列函數附帶一組強大的配置設置,可以精確控制任務佇列的速率限制和重試行為:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 :任務佇列中的每個任務最多自動重試 5 次。這有助於緩解暫時性錯誤,例如網路錯誤或依賴的外部服務的臨時服務中斷。

  • retryConfig.minBackoffSeconds=60 :每個任務在每次嘗試之間至少間隔 60 秒重試。這在每次嘗試之間提供了很大的緩衝區,因此我們不會急於太快耗盡 5 次重試嘗試。

  • rateLimits.maxConcurrentDispatch=6 :給定時間最多調度 6 個任務。這有助於確保對底層功能的穩定請求流,並有助於減少活動實例和冷啟動的數量。

測試任務隊列功能

Firebase 本機模擬器套件中的任務佇列函數公開為簡單的 HTTP 函數。您可以透過傳送帶有 JSON 資料負載的 HTTP POST 請求來測試模擬任務函數:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

部署任務佇列功能

使用 Firebase CLI 部署任務佇列功能:

$ firebase deploy --only functions:backupapod

首次部署任務佇列功能時,CLI 在 Cloud Tasks 中建立一個任務佇列,並使用原始程式碼中指定的選項(速率限制和重試)。

如果您在部署函數時遇到權限錯誤,請確保將適當的IAM 角色指派給執行部署命令的使用者。

Enqueue任務佇列函數

可以使用適用於 Node.js 的 Firebase 管理 SDK 或適用於 Python 的 Google Cloud 函式庫,將任務佇列函數從受信任的伺服器環境(例如 Cloud Functions for Firebase)排入 Cloud Tasks 中。如果您不熟悉 Admin SDK,請參閱將 Firebase 新增至伺服器以開始使用。

典型的流程會建立一個新任務,將其排入 Cloud Tasks 中,並設定該任務的配置:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    tasks_client = tasks_v2.CloudTasksClient()
    task_queue = tasks_client.queue_path(params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1,
                                         "backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task = tasks_v2.Task(http_request={
            "http_method": tasks_v2.HttpMethod.POST,
            "url": target_uri,
            "headers": {
                "Content-type": "application/json"
            },
            "body": json.dumps(body).encode()
        },
                             schedule_time=schedule_time)
        tasks_client.create_task(parent=task_queue, task=task)

    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • 範例程式碼嘗試透過為第 N 個任務關聯第 N 分鐘的延遲來分散任務的執行。這意味著每分鐘觸發約 1 個任務。請注意,如果您希望 Cloud Tasks 在特定時間觸發任務,您也可以使用scheduleTime (Node.js) 或schedule_time (Python)。

  • 範例程式碼設定 Cloud Tasks 等待任務完成的最長時間。 Cloud Tasks 將在佇列的重試配置之後重試任務,或直到達到此截止日期。在範例中,佇列配置為最多重試任務 5 次,但如果整個過程(包括重試嘗試)花費超過 5 分鐘,任務將自動取消。

檢索並包含目標 URI

由於 Cloud Tasks 會建立身分驗證令牌以對底層任務佇列函數的請求進行驗證的方式,您必須在對任務進行排隊時指定該函數的 Cloud Run URL。我們建議您以程式設計方式檢索函數的 URL,如下所示:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

故障排除

開啟 Cloud Tasks 日誌記錄

來自 Cloud Tasks 的日誌包含有用的診斷訊息,例如與任務關聯的請求的狀態。預設情況下,Cloud Tasks 的日誌處於關閉狀態,因為它可能會在您的專案中產生大量日誌。我們建議您在積極開發和偵錯任務佇列功能時開啟偵錯日誌。請參閱開啟日誌記錄

IAM 權限

將任務排隊或 Cloud Tasks 嘗試呼叫任務佇列函數時,您可能會看到PERMISSION DENIED錯誤。確保您的專案具有以下 IAM 綁定:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • 用於將任務排隊到 Cloud Tasks 的身份需要擁有使用與 Cloud Tasks 中的任務關聯的服務帳號的權限。

    在範例中,這是App Engine 預設服務帳戶

請參閱Google Cloud IAM 文檔,以了解有關如何將 App Engine 預設服務帳戶新增為 App Engine 預設服務帳戶的使用者的說明。

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker