获取我们在 Firebase 峰会上发布的所有信息,了解 Firebase 可如何帮助您加快应用开发速度并满怀信心地运行应用。了解详情

使用 Cloud Tasks 将函数排入队列

使用集合让一切井井有条 根据您的偏好保存内容并对其进行分类。

任务队列函数利用 Google Cloud Tasks帮助您的应用在主应用程序流之外异步运行耗时、资源密集或带宽受限的任务。

例如,假设您想为当前托管在具有速率限制的 API 上的大量图像文件创建备份。为了成为该 API 的负责任的消费者,您需要遵守他们的速率限制。此外,由于超时和内存限制,这种长时间运行的作业可能容易失败。

为了减轻这种复杂性,您可以编写一个任务队列函数来设置基本任务选项,例如scheduleTimedispatchDeadline ,然后将该函数交给 Cloud Tasks 中的一个队列。 Cloud Tasks 环境专门设计用于确保对此类操作进行有效的拥塞控制和重试策略。

Firebase SDK for Cloud Functions for Firebase v3.20.1 及更高版本与 Firebase Admin SDK v10.2.0 及更高版本互操作以支持任务队列功能。

将任务队列函数与 Firebase 一起使用可能会导致 Cloud Tasks 处理费用。有关更多信息,请参阅Cloud Tasks 定价

创建任务队列函数

要使用任务队列功能,请遵循以下工作流程:

  1. 使用 Firebase SDK for Cloud Functions 编写任务队列函数。
  2. 使用 Firebase Local Emulator Suite 测试您的功能。
  3. 使用 Firebase CLI 部署您的函数。首次部署任务队列功能时,CLI 将在 Cloud Tasks 中创建一个任务队列,并在您的源代码中指定选项(速率限制和重试)。
  4. 将任务添加到新创建的任务队列中,并在需要时传递参数以设置执行计划。您可以通过使用 Admin SDK 编写代码并将其部署到 Cloud Functions for Firebase 来实现这一点。

编写任务队列函数

使用onTaskDispatched开始编写任务队列函数。编写任务队列函数的一个重要部分是设置每个队列的重试和速率限制配置。此页面中的代码示例基于一个应用程序,该应用程序设置了一项服务,该服务备份来自 NASA 的每日天文图片的所有图像:

任务队列配置

任务队列函数带有一组强大的配置设置,可以精确控制任务队列的速率限制和重试行为:

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {
  • retryConfig.maxAttempts=5 :任务队列中的每个任务最多自动重试5次。这有助于减少暂时性错误,例如网络错误或依赖的外部服务的临时服务中断。
  • retryConfig.minBackoffSeconds=60 :每个任务在每次尝试后至少重试 60 秒。这在每次尝试之间提供了一个很大的缓冲区,因此我们不会急于过快地耗尽 5 次重试尝试。
  • rateLimits.maxConcurrentDispatch=6 :在给定时间最多分派 6 个任务。这有助于确保对底层功能的稳定请求流,并有助于减少活动实例和冷启动的数量。

使用 Firebase Local Emulator Suite 测试任务队列功能

Firebase Local Emulator Suite 中的任务队列函数公开为简单的 HTTP 函数。您可以通过发送带有 json 数据负载的 HTTP POST 请求来测试模拟任务函数:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

部署任务队列功能

使用 Firebase CLI 部署任务队列功能:

$ firebase deploy --only functions:backupapod

首次部署任务队列功能时,CLI 会在 Cloud Tasks 中创建一个任务队列,并在您的源代码中指定选项(速率限制和重试)。

如果您在部署函数时遇到权限错误,请确保将适当的IAM 角色分配给运行部署命令的用户。

入队函数

可以使用适用于 Node.js 的 Firebase Admin SDK 从受信任的服务器环境(例如适用于 Firebase 的 Cloud Functions)将任务队列函数排入 Cloud Tasks 中。如果您不熟悉 Admin SDK,请参阅将 Firebase 添加到服务器以开始使用。

在典型的流程中,Admin SDK 创建一个新任务,将其放入 Cloud Tasks 队列,并设置任务的配置:

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });
  • scheduleDelaySeconds :示例代码尝试通过为第 N 个任务关联第 N 分钟的延迟来分散任务的执行。这转化为触发 ~ 1 任务/分钟。请注意,如果您希望 Cloud Tasks 在特定时间触发任务,也可以使用scheduleTime
  • dispatchDeadlineSeconds :Cloud Tasks 等待任务完成的最长时间。 Cloud Tasks 将在队列的重试配置之后或直到达到此截止日期之前重试任务。在示例中,队列配置为最多重试任务 5 次,但如果整个过程(包括重试尝试)花费的时间超过 5 分钟,任务将自动取消。

检索并包含目标 URI

由于 Cloud Functions for Firebase(第 2 代)不支持确定性 HTTP URL,您必须手动检索目标 URI 并将其包含在每个排队的任务中。您还可以通过编程方式检索函数的 URL,如下所示:

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

故障排除

打开 Cloud Tasks 日志记录

来自 Cloud Tasks 的日志包含有用的诊断信息,例如与任务关联的请求的状态。默认情况下,来自 Cloud Tasks 的日志被关闭,因为它可能会在您的项目中生成大量日志。我们建议您在积极开发和调试任务队列功能时打开调试日志。请参阅打开日志记录

IAM 权限

在对任务进行排队或 Cloud Tasks 尝试调用您的任务队列函数时,您可能会看到PERMISSION DENIED错误。确保您的项目具有以下 IAM 绑定:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • 用于将任务排队到 Cloud Tasks 的身份需要使用与 Cloud Tasks 中的任务关联的服务帐户的权限。

    在示例中,这是App Engine 默认服务帐户

有关如何将 App Engine 默认服务帐户添加为 App Engine 默认服务帐户用户的说明,请参阅Google Cloud IAM 文档

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker