Poner en cola funciones con Cloud Tasks


Las funciones de cola de tareas aprovechan Google Cloud Tasks para ayudar a que su aplicación ejecute tareas que requieren mucho tiempo, recursos o ancho de banda limitado de forma asincrónica, fuera del flujo principal de su aplicación.

Por ejemplo, imagine que desea crear copias de seguridad de un gran conjunto de archivos de imágenes que actualmente están alojados en una API con un límite de velocidad. Para ser un consumidor responsable de esa API, debes respetar sus límites de tarifas. Además, este tipo de trabajo de larga duración podría ser vulnerable a fallas debido a tiempos de espera y límites de memoria.

Para mitigar esta complejidad, puede escribir una función de cola de tareas que establezca opciones básicas de tareas como scheduleTime y dispatchDeadline , y luego entregue la función a una cola en Cloud Tasks. El entorno de Cloud Tasks está diseñado específicamente para garantizar un control de congestión eficaz y políticas de reintento para este tipo de operaciones.

El SDK de Firebase para Cloud Functions para Firebase v3.20.1 y versiones posteriores interopera con el SDK de Firebase Admin v10.2.0 y versiones posteriores para admitir funciones de cola de tareas.

El uso de funciones de cola de tareas con Firebase puede generar cargos por el procesamiento de Cloud Tasks. Consulte los precios de Cloud Tasks para obtener más información.

Crear funciones de cola de tareas

Para utilizar las funciones de la cola de tareas, siga este flujo de trabajo:

  1. Escriba una función de cola de tareas utilizando el SDK de Firebase para Cloud Functions.
  2. Pruebe su función activándola con una solicitud HTTP.
  3. Implemente su función con Firebase CLI. Al implementar su función de cola de tareas por primera vez, la CLI creará una cola de tareas en Cloud Tasks con opciones (limitación de velocidad y reintento) especificadas en su código fuente.
  4. Agregue tareas a la cola de tareas recién creada, pasando parámetros para configurar un cronograma de ejecución si es necesario. Puede lograr esto escribiendo el código usando el SDK de administración e implementándolo en Cloud Functions para Firebase.

Escribir funciones de cola de tareas

Los ejemplos de código de esta sección se basan en una aplicación que configura un servicio que realiza una copia de seguridad de todas las imágenes de la Imagen astronómica del día de la NASA. Para comenzar, importe los módulos necesarios:

Nodo.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Pitón

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Utilice onTaskDispatched o on_task_dispatched para funciones de cola de tareas. Al escribir una función de cola de tareas, puede establecer un reintento por cola y una configuración de limitación de velocidad.

Configurar funciones de cola de tareas

Las funciones de la cola de tareas vienen con un potente conjunto de ajustes de configuración para controlar con precisión los límites de velocidad y el comportamiento de reintento de una cola de tareas:

Nodo.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Pitón

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : cada tarea en la cola de tareas se reintenta automáticamente hasta 5 veces. Esto ayuda a mitigar errores transitorios, como errores de red o interrupciones temporales del servicio de un servicio externo dependiente.

  • retryConfig.minBackoffSeconds=60 : cada tarea se reintenta con al menos 60 segundos de diferencia entre cada intento. Esto proporciona un gran margen entre cada intento para que no nos apresuremos a agotar los 5 reintentos demasiado rápido.

  • rateLimits.maxConcurrentDispatch=6 : se envían como máximo 6 tareas en un momento dado. Esto ayuda a garantizar un flujo constante de solicitudes a la función subyacente y ayuda a reducir la cantidad de instancias activas y arranques en frío.

Probar funciones de cola de tareas

Las funciones de la cola de tareas en Firebase Local Emulator Suite se exponen como funciones HTTP simples. Puede probar una función de tarea emulada enviando una solicitud HTTP POST con una carga útil de datos JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Implementar funciones de cola de tareas

Implemente la función de cola de tareas mediante Firebase CLI:

$ firebase deploy --only functions:backupapod

Al implementar una función de cola de tareas por primera vez, la CLI crea una cola de tareas en Cloud Tasks con opciones (limitación de velocidad y reintento) especificadas en su código fuente.

Si encuentra errores de permisos al implementar funciones, asegúrese de que se asignen las funciones de IAM adecuadas al usuario que ejecuta los comandos de implementación.

Poner en cola funciones de cola de tareas

Las funciones de la cola de tareas se pueden poner en cola en Cloud Tasks desde un entorno de servidor confiable como Cloud Functions para Firebase usando el SDK de Firebase Admin para Node.js o las bibliotecas de Google Cloud para Python. Si eres nuevo en los SDK de administración, consulta Agregar Firebase a un servidor para comenzar.

Un flujo típico crea una nueva tarea, la pone en cola en Cloud Tasks y establece la configuración de la tarea:

Nodo.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Pitón

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    tasks_client = tasks_v2.CloudTasksClient()
    task_queue = tasks_client.queue_path(params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1,
                                         "backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task = tasks_v2.Task(http_request={
            "http_method": tasks_v2.HttpMethod.POST,
            "url": target_uri,
            "headers": {
                "Content-type": "application/json"
            },
            "body": json.dumps(body).encode()
        },
                             schedule_time=schedule_time)
        tasks_client.create_task(parent=task_queue, task=task)

    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • El código de muestra intenta distribuir la ejecución de tareas asociando un retraso de enésimo minuto para la enésima tarea. Esto se traduce en activar ~ 1 tarea/minuto. Tenga en cuenta que también puede usar scheduleTime (Node.js) o schedule_time (Python) si desea que Cloud Tasks active una tarea en un momento específico.

  • El código de muestra establece la cantidad máxima de tiempo que Cloud Tasks esperará hasta que se complete una tarea. Cloud Tasks reintentará la tarea después de la configuración de reintento de la cola o hasta que se alcance esta fecha límite. En el ejemplo, la cola está configurada para reintentar la tarea hasta 5 veces, pero la tarea se cancela automáticamente si todo el proceso (incluidos los reintentos) tarda más de 5 minutos.

Recuperar e incluir el URI de destino

Debido a la forma en que Cloud Tasks crea tokens de autenticación para autenticar solicitudes en las funciones de la cola de tareas subyacentes, debes especificar la URL de Cloud Run de la función al poner en cola las tareas. Le recomendamos que recupere mediante programación la URL de su función como se muestra a continuación:

Nodo.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Pitón

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Solución de problemas

Activar el registro de tareas en la nube

Los registros de Cloud Tasks contienen información de diagnóstico útil, como el estado de la solicitud asociada con una tarea. De forma predeterminada, los registros de Cloud Tasks están desactivados debido al gran volumen de registros que potencialmente puede generar en su proyecto. Le recomendamos que active los registros de depuración mientras desarrolla y depura activamente las funciones de la cola de tareas. Consulte Activar el registro .

Permisos de IAM

Es posible que vea errores PERMISSION DENIED al poner en cola tareas o cuando Cloud Tasks intenta invocar sus funciones de cola de tareas. Asegúrese de que su proyecto tenga los siguientes enlaces de IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

Consulte la documentación de Google Cloud IAM para obtener instrucciones sobre cómo agregar la cuenta de servicio predeterminada de App Engine como usuario de la cuenta de servicio predeterminada de App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker