使用 Cloud Tasks 將函式排入佇列


工作佇列函式利用 Google Cloud Tasks 幫助您的應用程式執行耗時、會耗用大量資源或頻寬受限 也可以在主要應用程式流程之外,以非同步方式處理工作

舉例來說,假設您想為一組大型的映像檔建立備份 檔案。為了成為 負責該 API 的使用者,您必須遵守其頻率限制。此外, 這類長時間執行的工作可能會因逾時和 記憶體用量限制

如要降低這種複雜性,您可以撰寫設定基本工作的工作佇列函式 工作選項 (例如 scheduleTimedispatchDeadline),然後握手 執行函式。Cloud Tasks 環境經過特別設計,可確保有效的壅塞控管機制,以及 針對這些作業重試政策。

Cloud Functions for Firebase v3.20.1 以上版本的 Firebase SDK 可以互通操作 搭配 Firebase Admin SDK v10.2.0 以上版本支援工作佇列函式。

使用 Firebase 的工作佇列函式可能會產生下列費用: 正在處理 Cloud Tasks。詳情請見 Cloud Tasks 定價 瞭解詳情

建立工作佇列函式

如要使用工作佇列函式,請按照以下工作流程操作:

  1. 使用 Cloud Functions 適用的 Firebase SDK 編寫工作佇列函式。
  2. 透過 HTTP 要求觸發函式來測試函式。
  3. 使用 Firebase CLI 部署函式。部署工作時 佇列函式。CLI 將會建立 Cloud Tasks 中的工作佇列 使用原始碼中指定的選項 (頻率限制與重試)。
  4. 將工作新增至新建的工作佇列,並傳送參數進行設定 執行排程這時只要編寫程式碼 並部署至 Cloud Functions for Firebase。

寫入工作佇列函式

本節的程式碼範例是以 備份服務,將 NASA 的 天文圖片。 如要開始,請匯入必要的模組:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

使用 onTaskDispatchedon_task_dispatched 供工作佇列函式使用編寫工作佇列函式時 您可以設定每個佇列的重試和頻率限制設定。

設定工作佇列函式

工作佇列函式提供一組強大的組態設定 精確控制工作佇列的頻率限制與重試行為:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5:工作佇列中的每項工作會自動執行 最多重試 5 次。這有助於減少網路等暫時性錯誤 錯誤,或對相依外部服務的暫時性服務中斷。

  • retryConfig.minBackoffSeconds=60:每項工作至少重試 60 秒 並不會每次嘗試產生這會在每次嘗試之間提供較大的緩衝區 這樣我們就不會急速用完 5 次重試次數。

  • rateLimits.maxConcurrentDispatch=6:最多將 6 項工作分派給 時間。這有助於確保向基礎 功能,並協助減少有效執行個體和冷啟動的數量。

測試工作佇列函式

Firebase 本機模擬器套件中的工作佇列函式會以簡易的方式公開 HTTP 函式如要測試模擬工作函式,請傳送 HTTP POST 範例:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

部署工作佇列函式

使用 Firebase CLI 部署工作佇列函式:

$ firebase deploy --only functions:backupapod

首次部署工作佇列函式時,CLI 會建立 Cloud Tasks 中的工作佇列,並提供選項 (頻率限制和重試) 您在原始碼中指定的版本

如果您在部署函式時遇到權限錯誤,請確認 適當的身分與存取權管理角色 會指派給執行 Deployment 指令的使用者

將工作佇列函式排入佇列

可使用信任的應用程式,將工作佇列函式新增至 Cloud Tasks 例如 Cloud Functions for Firebase 適用於 Python 的 Node.js 或 Google Cloud 程式庫 如果您是第一次使用 Admin SDK,請參閱 如要開始使用,請將 Firebase 新增至伺服器

一般流程會建立新的工作,並排入 ,並設定工作的設定:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • 程式碼範例會嘗試 為第 N 項工作把第 N 分鐘的延遲時間建立關聯這個 則會觸發大約每分鐘 1 項任務請注意,您也可以使用 scheduleTime (Node.js) 或 schedule_time (Python) 在特定時間觸發工作的 Cloud Tasks。

  • 程式碼範例會設定時間上限 Cloud Tasks 會等待 待工作完成Cloud Tasks 會重試 才會執行 佇列設定,或是直到這個期限為止。在本範例中 佇列已設為重試工作最多 5 次,但工作 如果整個程序 (包括重試) 會自動取消 需要超過 5 分鐘

擷取並加入目標 URI

基於 Cloud Tasks 建立驗證權杖的方式 要求基礎工作佇列函式時,您必須指定 將工作排入佇列時的函式 Cloud Run 網址。三 建議您以程式輔助的方式擷取函式的網址,如 示範:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

疑難排解

開啟 Cloud Tasks 記錄功能

Cloud Tasks 中的記錄檔含有實用的診斷資訊,例如 工作相關要求的狀態。根據預設 Cloud Tasks 有大量記錄檔,因此已將其關閉 可能會在專案中產生錯誤建議您啟用偵錯記錄檔 而您在積極開發工作佇列函式並進行偵錯時。詳情請見 啟用中 記錄

身分與存取權管理權限

將工作加入佇列時,或PERMISSION DENIED Cloud Tasks 會嘗試叫用您的工作佇列函式。請確認 專案具有下列 IAM 繫結:

  • 這個身分可用於將工作排入 Cloud Tasks 需求 cloudtasks.tasks.create IAM 權限。

    在範例中,這是 App Engine 預設服務帳戶

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • 透過這個身分將工作排入 Cloud Tasks 需要權限 使用與 Cloud Tasks 中工作相關聯的服務帳戶。

    在範例中,這是 App Engine 預設服務帳戶

請參閱 Google Cloud IAM 說明文件 瞭解如何新增 App Engine 預設服務帳戶 做為 App Engine 預設服務帳戶的使用者。

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker