Task queue functions take advantage of Google Cloud Tasks to help your app run time-consuming, resource-intensive, or bandwidth-limited tasks asynchronously, outside your main application flow.
For example, imagine that you want to create backups of a large set of image files that are currently hosted on an API with a rate limit. To be a responsible consumer of that API, you need to respect its rate limits. In addition, this kind of long-running job is vulnerable to failure due to timeouts and memory limits.
To mitigate this complexity, you can write a task queue function that sets basic task options like scheduleTime and dispatchDeadline, and then hands the task off to a queue in Cloud Tasks. The Cloud Tasks environment is designed specifically to ensure effective congestion control and retry policies for these kinds of operations.
The Firebase SDK for Cloud Functions v3.20.1 and higher interoperates with the Firebase Admin SDK v10.2.0 and higher to support task queue functions.
Using task queue functions with Firebase can result in charges for Cloud Tasks processing. See Cloud Tasks pricing for more information.
Create task queue functions
To use task queue functions, follow this workflow (a minimal end-to-end sketch follows the list):
- Write a task queue function using the Firebase SDK for Cloud Functions.
- Test your function by triggering it with an HTTP request.
- Deploy your function with the Firebase CLI. When deploying your task queue function for the first time, the CLI will create a task queue in Cloud Tasks with options (rate limiting and retry) specified in your source code.
- Add tasks to the newly created task queue, passing along parameters to set up an execution schedule if needed. You can achieve this by writing the code using the Admin SDK and deploying it to Cloud Functions for Firebase.
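The following is a rough, minimal sketch of this flow in Node.js; the function names, queue options, and payload are placeholders rather than part of the sample app described below, which walks through each step in full.
Node.js
// Minimal sketch of the workflow above; names and payload are illustrative.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {initializeApp} = require("firebase-admin/app");

initializeApp();

// Step 1: a task queue function that processes one unit of work.
exports.mytaskqueue = onTaskDispatched(
    {retryConfig: {maxAttempts: 3}, rateLimits: {maxConcurrentDispatches: 5}},
    async (req) => {
      console.log("Processing task with payload:", req.data);
    });

// Step 4: a trusted environment (here, another function) enqueues work.
// For 2nd gen functions, also pass the function's Cloud Run URL via the
// "uri" option; see "Retrieve and include the target URI" below.
exports.enqueuework = onRequest(async (_req, res) => {
  const queue = getFunctions().taskQueue("mytaskqueue");
  await queue.enqueue({item: "example"}, {scheduleDelaySeconds: 60});
  res.sendStatus(200);
});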
Write task queue functions
Code samples in this section are based on an app that sets up a service to back up all images from NASA's Astronomy Picture of the Day. To get started, import the required modules:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Use onTaskDispatched (Node.js) or on_task_dispatched (Python) for task queue functions. When writing a task queue function, you can set per-queue retry and rate-limiting configuration.
Configure task queue functions
Task queue functions come with a powerful set of configuration settings to precisely control rate limits and retry behavior of a task queue:
Node.js
exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    },
    async (req) => {
      // Handler body (fetching and backing up the day's APOD image) omitted here.
    });
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
- retryConfig.maxAttempts=5: Each task in the task queue is automatically retried up to 5 times. This helps mitigate transient errors like network errors or temporary service disruption of a dependent, external service.
- retryConfig.minBackoffSeconds=60: Each retry attempt is spaced at least 60 seconds apart. This provides a large buffer between attempts so that the 5 retries are not exhausted too quickly.
- rateLimits.maxConcurrentDispatches=6: At most 6 tasks are dispatched at a given time. This helps ensure a steady stream of requests to the underlying function and helps reduce the number of active instances and cold starts.
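These retry settings take effect only when an attempt fails, which for a task queue function means the handler throws an error (or returns a rejected promise). The following is a hedged sketch using the onTaskDispatched import shown earlier; fetchApod is a hypothetical helper, not part of the sample app.
Node.js
// Illustrative only: throwing marks the attempt as failed, and Cloud Tasks
// retries the task according to the queue's retryConfig.
exports.mytask = onTaskDispatched(
    {retryConfig: {maxAttempts: 5, minBackoffSeconds: 60}},
    async (req) => {
      const resp = await fetchApod(req.data.date); // hypothetical helper
      if (!resp.ok) {
        throw new Error(`Upstream API returned ${resp.status}`);
      }
    });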
Test task queue functions
In most cases, the Cloud Functions emulator is the best way to test task queue functions. See the Emulator Suite documentation to learn how to instrument your app for task queue functions emulation.
Additionally, task queue functions are exposed as simple HTTP functions in the Firebase Local Emulator Suite. You can test an emulated task function by sending an HTTP POST request with a JSON data payload:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST \ # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Deploy task queue functions
Deploy the task queue function using the Firebase CLI:
$ firebase deploy --only functions:backupapod
When deploying a task queue function for the first time, the CLI creates a task queue in Cloud Tasks with options (rate limiting and retry) specified in your source code.
If you encounter permissions errors when deploying functions, make sure that the appropriate IAM roles are assigned to the user running the deployment commands.
Enqueue task queue functions
Task queue functions can be enqueued in Cloud Tasks from a trusted server environment like Cloud Functions for Firebase using the Firebase Admin SDK for Node.js or Google Cloud libraries for Python. If you're new to the Admin SDKs, see Add Firebase to a server to get started.
A typical flow creates a new task, enqueues it in Cloud Tasks, and sets the configuration for the task:
Node.js
exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i < BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
The sample code spreads out task execution by grouping tasks into batches of HOURLY_BATCH_SIZE and delaying the Nth batch by N hours, so roughly one batch of backup tasks is dispatched each hour. Note that you can also use scheduleTime (Node.js) or schedule_time (Python) if you want Cloud Tasks to trigger a task at a specific time.
The sample code also sets the maximum amount of time Cloud Tasks will wait for a task to complete. Cloud Tasks retries the task following the retry configuration of the queue or until this deadline is reached. In the sample, the queue is configured to retry the task up to 5 times, but the task is automatically cancelled if the whole process (including retry attempts) takes more than 5 minutes.
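For illustration, here is a hedged Node.js sketch of enqueuing a single task at an absolute time rather than with a relative delay; the payload and timestamp are placeholders, and the code would run inside an async function such as enqueuebackuptasks above.
Node.js
// Illustrative only: schedule a task for a specific time via scheduleTime.
const queue = getFunctions().taskQueue("backupapod");
await queue.enqueue(
    {date: "1995-06-17"}, // placeholder payload
    {
      scheduleTime: new Date("2030-01-01T09:00:00Z"), // absolute dispatch time
      dispatchDeadlineSeconds: 60 * 5, // 5-minute completion deadline
      uri: await getFunctionUrl("backupapod"),
    },
);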
Retrieve and include the target URI
Due to the way Cloud Tasks creates authentication tokens to authenticate requests to the underlying task queue functions, you must specify the Cloud Run URL of the function when enqueuing tasks. We recommend that you programmatically retrieve the URL for your function as demonstrated below:
Node.js
// Lazily initialized client for the Cloud Functions API (module scope).
let auth;

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;
  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retrieve uri for function at ${url}`);
  }
  return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url
Troubleshooting
Turn on Cloud Tasks logging
Logs from Cloud Tasks contain useful diagnostic information like the status of the request associated with a task. By default, logs from Cloud Tasks are turned off due to the large volume of logs it can potentially generate on your project. We recommend you turn on the debug logs while you are actively developing and debugging your task queue functions. See Turning on logging.
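For example, assuming the auto-created queue shares the function's name (backupapod in this sample), you could raise the queue's log sampling ratio with the gcloud CLI; verify the queue name and flag against the Cloud Tasks documentation:
# Log every task dispatch for the backupapod queue (sampling ratio 1.0)
gcloud tasks queues update backupapod \
  --location=us-central1 \
  --log-sampling-ratio=1.0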
IAM Permissions
You may see PERMISSION DENIED errors when enqueueing tasks or when Cloud Tasks tries to invoke your task queue functions. Ensure that your project has the following IAM bindings:
The identity used to enqueue tasks to Cloud Tasks needs the cloudtasks.tasks.create IAM permission. In the sample, this is the App Engine default service account:
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
The identity used to enqueue tasks to Cloud Tasks needs permission to use the service account associated with a task in Cloud Tasks.
In the sample, this is the App Engine default service account.
See Google Cloud IAM documentation for instructions on how to add the App Engine default service account as a user of the App Engine default service account.
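As a hedged example, the binding could be granted with gcloud by making the App Engine default service account a service account user of itself; confirm the exact command and role against the IAM documentation:
gcloud iam service-accounts add-iam-policy-binding \
  ${PROJECT_ID}@appspot.gserviceaccount.com \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/iam.serviceAccountUser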
The identity used to trigger the task queue function needs the cloudfunctions.functions.invoke permission. In the sample, this is the App Engine default service account:
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker