Enqueue functions with Cloud Tasks


Task queue functions take advantage of Google Cloud Tasks to help your app run time-consuming, resource-intensive, or bandwidth-limited tasks asynchronously, outside your main application flow.

For example, imagine that you want to create backups of a large set of image files that are currently hosted on an API with a rate limit. To be a responsible consumer of that API, you need to respect its rate limits. In addition, this kind of long-running job could be vulnerable to failure due to timeouts and memory limits.

To mitigate this complexity, you can write a task queue function and hand tasks for it off to a queue in Cloud Tasks, setting basic task options like scheduleTime and dispatchDeadline. The Cloud Tasks environment is designed specifically to ensure effective congestion control and retry policies for these kinds of operations.

The Firebase SDK for Cloud Functions for Firebase v3.20.1 and higher interoperates with Firebase Admin SDK v10.2.0 and higher to support task queue functions.

Using task queue functions with Firebase can result in charges for Cloud Tasks processing. See Cloud Tasks pricing for more information.

Create task queue functions

To use task queue functions, follow this workflow:

  1. Write a task queue function using the Firebase SDK for Cloud Functions.
  2. Test your function by triggering it with an HTTP request.
  3. Deploy your function with the Firebase CLI. When deploying your task queue function for the first time, the CLI will create a task queue in Cloud Tasks with options (rate limiting and retry) specified in your source code.
  4. Add tasks to the newly created task queue, passing along parameters to set up an execution schedule if needed. You can achieve this by writing the code using the Admin SDK and deploying it to Cloud Functions for Firebase.

Write task queue functions

Code samples in this section are based on an app that sets up a service that backs up all images from NASA's Astronomy Picture of the Day. To get started, import the required modules:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Use onTaskDispatched (Node.js) or on_task_dispatched (Python) to write task queue functions. When writing a task queue function, you can set per-queue retry and rate-limiting configuration.

Configure task queue functions

Task queue functions come with a powerful set of configuration settings to precisely control rate limits and retry behavior of a task queue:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {
      // Task body omitted here: back up the APOD image for the date in req.data.
    });

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=6))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Each task in the task queue is attempted at most 5 times (the initial attempt plus up to 4 automatic retries). This helps mitigate transient errors like network errors or a temporary service disruption of a dependent, external service.

  • retryConfig.minBackoffSeconds=60: Each task waits at least 60 seconds between attempts. This provides a large buffer between attempts so that a task doesn't exhaust its 5 attempts too quickly.

  • rateLimits.maxConcurrentDispatches=6: At most 6 tasks are dispatched at a given time. This helps ensure a steady stream of requests to the underlying function and helps reduce the number of active instances and cold starts.
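To see how minBackoffSeconds shapes the retry schedule, the following Python sketch approximates the delay before each retry. It assumes the backoff model that Cloud Tasks documents for queue retry settings: the interval starts at the minimum backoff, doubles a fixed number of times (max doublings), then grows linearly, and is capped at a maximum backoff. The function name retry_delays is hypothetical, and the maximum backoff and max-doublings values in the example are illustrative assumptions rather than values this sample sets.

```python
def retry_delays(min_backoff: float, max_backoff: float,
                 max_doublings: int, max_attempts: int) -> list:
    """Approximate the wait (in seconds) before each retry of a failing task.

    Assumed Cloud Tasks model: the interval starts at min_backoff, doubles
    max_doublings times, then grows linearly by min_backoff * 2**max_doublings
    per retry, and never exceeds max_backoff. max_attempts counts the first
    attempt, so there are max_attempts - 1 retries.
    """
    delays = []
    linear_step = min_backoff * 2 ** max_doublings
    for retry in range(max_attempts - 1):
        if retry <= max_doublings:
            # Exponential phase: min_backoff, 2x, 4x, ...
            interval = min_backoff * 2 ** retry
        else:
            # Linear phase: add one linear_step per additional retry.
            interval = linear_step * (1 + retry - max_doublings)
        delays.append(min(interval, max_backoff))
    return delays


if __name__ == "__main__":
    # maxAttempts=5 and minBackoffSeconds=60 come from this page; the 3600 s
    # cap and 16 doublings are assumed values for illustration only.
    print(retry_delays(60, 3600, 16, 5))
```

Because maxAttempts counts the first attempt, maxAttempts=5 yields four retry delays; with these assumed settings they are 60, 120, 240, and 480 seconds.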

Test task queue functions

Task queue functions in the Firebase Local Emulator Suite are exposed as simple HTTP functions. You can test an emulated task function by sending an HTTP POST request with a JSON data payload:

 # start the Local Emulator Suite
 firebase emulators:start

 # in another terminal, trigger the emulated task queue function:
 # an HTTP POST request with a JSON body, sent to the function URL,
 # carrying JSON-encoded data
 curl \
  -X POST \
  -H "content-type: application/json" \
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \
  -d '{"data": { ... some data .... }}'
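If you prefer to script emulator tests, the same request can be built with Python's standard library. This is a minimal sketch: build_emulator_task_request is a hypothetical helper, the project ID is made up, and port 5001 is assumed to be the Functions emulator's default port; adjust it to match your emulator configuration.

```python
import json
import urllib.request


def build_emulator_task_request(project_id: str, region: str, name: str,
                                data: dict, port: int = 5001) -> urllib.request.Request:
    """Build (but do not send) a POST request for an emulated task queue function.

    The body wraps the task payload in a top-level "data" field, matching the
    curl example. Port 5001 is an assumed default for the Functions emulator.
    """
    url = f"http://localhost:{port}/{project_id}/{region}/{name}"
    body = json.dumps({"data": data}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"content-type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Hypothetical project ID; the date payload mirrors the backup sample.
    req = build_emulator_task_request("demo-project", "us-central1", "backupapod",
                                      {"date": "2023-01-01"})
    print(req.get_method(), req.full_url)
    # With the emulator running, send it with: urllib.request.urlopen(req)
```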

Deploy task queue functions

Deploy the task queue function using the Firebase CLI:

$ firebase deploy --only functions:backupapod

When deploying a task queue function for the first time, the CLI creates a task queue in Cloud Tasks with options (rate limiting and retry) specified in your source code.

If you encounter permissions errors when deploying functions, make sure that the appropriate IAM roles are assigned to the user running the deployment commands.

Enqueue task queue functions

Task queue functions can be enqueued in Cloud Tasks from a trusted server environment like Cloud Functions for Firebase using the Firebase Admin SDK for Node.js or Python. If you're new to the Admin SDKs, see Add Firebase to a server to get started.

A typical flow creates a new task, enqueues it in Cloud Tasks, and sets the configuration for the task:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i < BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay batch N by N hours.
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
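  • The sample code spreads out task execution by grouping tasks into batches of HOURLY_BATCH_SIZE and delaying the Nth batch by N hours, which translates to triggering roughly HOURLY_BATCH_SIZE tasks per hour. The Node.js sample sets a relative delay with scheduleDelaySeconds, while the Python sample sets an absolute time with schedule_time. Use scheduleTime (Node.js) or schedule_time (Python) whenever you want Cloud Tasks to trigger a task at a specific time.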

  • The sample code sets the dispatch deadline (dispatchDeadlineSeconds in Node.js, dispatch_deadline_seconds in Python) to 5 minutes. This is the maximum amount of time Cloud Tasks waits for a single attempt to complete. If the function does not respond within the deadline, Cloud Tasks cancels the request, counts the attempt as failed, and retries the task according to the queue's retry configuration; in the sample, each task is attempted at most 5 times.
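The batching arithmetic in the enqueue samples can be checked in isolation. BACKUP_COUNT, HOURLY_BATCH_SIZE, and BACKUP_START_DATE are defined elsewhere in the full sample and are not shown on this page, so the sketch below takes them as parameters and uses hypothetical values. plan_backup_tasks is an illustrative helper, not part of any SDK; it returns the date payload and the delay in seconds that each task would be enqueued with, following the Python sample's range(BACKUP_COUNT) loop.

```python
from datetime import date, timedelta


def plan_backup_tasks(start: date, backup_count: int,
                      hourly_batch_size: int) -> list:
    """Return (date string, delay in seconds) for each backup task.

    Task i belongs to batch i // hourly_batch_size, and batch N is
    delayed by N hours, as in the enqueue samples.
    """
    plan = []
    for i in range(backup_count):
        batch = i // hourly_batch_size
        delay_seconds = batch * 60 * 60
        # One backup per day starting at `start`, formatted as YYYY-MM-DD.
        backup_date = (start + timedelta(days=i)).isoformat()
        plan.append((backup_date, delay_seconds))
    return plan


if __name__ == "__main__":
    # Hypothetical values: 5 days of backups, 2 tasks per hourly batch.
    for backup_date, delay in plan_backup_tasks(date(2023, 1, 1), 5, 2):
        print(backup_date, delay)
```

With 5 tasks and a batch size of 2, the delays are 0, 0, 3600, 3600, and 7200 seconds, so at most two tasks are scheduled to start in any given hour.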

Retrieve and include the target URI

Due to the way Cloud Tasks creates authentication tokens to authenticate requests to the underlying task queue functions, you must specify the Cloud Run URL of the function when enqueuing tasks. We recommend that you programmatically retrieve the URL for your function as demonstrated below:

Node.js

// GoogleAuth client, created on first use by getFunctionUrl.
let auth;

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retrieve uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1.value) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
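    response = authed_session.get(url)
    # Fail fast on HTTP errors (for example, a 403) instead of a KeyError below.
    response.raise_for_status()
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url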
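If you want to unit-test the URL lookup without calling Google Cloud, one option is to separate building the API URL from reading serviceConfig.uri out of the response. The sketch below is a hypothetical refactoring of get_function_url: function_resource_url and extract_service_uri are illustrative names, and the resource dictionary in the example contains only the field this page reads, with a made-up Cloud Run URI.

```python
def function_resource_url(project_id: str, name: str,
                          location: str = "us-central1") -> str:
    """Build the Cloud Functions v2beta API URL used by get_function_url."""
    return ("https://cloudfunctions.googleapis.com/v2beta/"
            f"projects/{project_id}/locations/{location}/functions/{name}")


def extract_service_uri(resource: dict, url: str) -> str:
    """Return serviceConfig.uri from a function resource.

    Raises ValueError when the field is missing, mirroring the error
    handling of the Node.js getFunctionUrl sample.
    """
    uri = (resource.get("serviceConfig") or {}).get("uri")
    if not uri:
        raise ValueError(f"Unable to retrieve uri for function at {url}")
    return uri


if __name__ == "__main__":
    url = function_resource_url("demo-project", "backupapod")
    # Hypothetical response body; a real one contains many more fields.
    resource = {"serviceConfig": {"uri": "https://backupapod-example-uc.a.run.app"}}
    print(extract_service_uri(resource, url))
```

Inside get_function_url, the last lines could then become a call such as extract_service_uri(response.json(), url).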

Troubleshooting

Turn on Cloud Tasks logging

Logs from Cloud Tasks contain useful diagnostic information like the status of the request associated with a task. By default, logs from Cloud Tasks are turned off because of the large volume of logs they can potentially generate on your project. We recommend that you turn on the debug logs while you are actively developing and debugging your task queue functions. See Turning on logging.

IAM Permissions

You may see PERMISSION_DENIED errors when enqueueing tasks or when Cloud Tasks tries to invoke your task queue functions. Ensure that your project has the following IAM bindings:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • The identity used to enqueue tasks to Cloud Tasks needs permission to use the service account associated with a task in Cloud Tasks.

    In the sample, this is the App Engine default service account.

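See the Google Cloud IAM documentation for instructions on how to add the App Engine default service account as a user of itself, that is, how to grant it the Service Account User role (roles/iam.serviceAccountUser) on the App Engine default service account.

The identity that Cloud Tasks uses to call your task queue function also needs permission to invoke it. In the sample, this is again the App Engine default service account: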

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker