Funzioni di accodamento con Cloud Tasks


Le funzioni della coda delle attività sfruttano Google Cloud Tasks per aiutare la tua app a eseguire attività dispendiose in termini di tempo, risorse o con larghezza di banda limitata in modo asincrono, al di fuori del flusso dell'applicazione principale.

Ad esempio, immagina di voler creare backup di un ampio set di file di immagine attualmente ospitati su un'API con un limite di velocità. Per essere un consumatore responsabile di tale API, è necessario rispettare i limiti di tariffa. Inoltre, questo tipo di lavoro di lunga durata potrebbe essere vulnerabile a errori dovuti a timeout e limiti di memoria.

Per mitigare questa complessità, puoi scrivere una funzione della coda delle attività che imposta le opzioni di base delle attività come scheduleTime e dispatchDeadline e quindi trasmette la funzione a una coda in Cloud Tasks. L'ambiente Cloud Tasks è progettato specificamente per garantire un controllo efficace della congestione e criteri di ripetizione dei tentativi per questo tipo di operazioni.

Firebase SDK for Cloud Functions per Firebase v3.20.1 e versioni successive interagisce con Firebase Admin SDK v10.2.0 e versioni successive per supportare le funzioni della coda delle attività.

L'utilizzo delle funzioni della coda delle attività con Firebase può comportare addebiti per l'elaborazione di Cloud Tasks. Per ulteriori informazioni, consulta i prezzi di Cloud Tasks .

Creare funzioni della coda delle attività

Per utilizzare le funzioni della coda delle attività, seguire questo flusso di lavoro:

  1. Scrivi una funzione della coda di attività utilizzando Firebase SDK for Cloud Functions.
  2. Metti alla prova la tua funzione attivandola con una richiesta HTTP.
  3. Distribuisci la tua funzione con la CLI Firebase. Quando distribuisci la funzione della coda di attività per la prima volta, la CLI creerà una coda di attività in Cloud Tasks con le opzioni (limitazione della velocità e nuovi tentativi) specificate nel codice sorgente.
  4. Aggiungi attività alla coda delle attività appena creata, trasmettendo parametri per impostare un programma di esecuzione, se necessario. Puoi ottenere questo risultato scrivendo il codice utilizzando Admin SDK e distribuendolo a Cloud Functions for Firebase.

Scrivere funzioni della coda delle attività

Gli esempi di codice in questa sezione si basano su un'app che configura un servizio che esegue il backup di tutte le immagini dell'Astronomy Picture of the Day della NASA. Per iniziare, importa i moduli richiesti:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Pitone

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Utilizzare onTaskDispatched o on_task_dispatched per le funzioni della coda delle attività. Quando si scrive una funzione della coda di attività è possibile impostare nuovi tentativi per coda e una configurazione di limitazione della velocità.

Configurare le funzioni della coda delle attività

Le funzioni della coda di attività sono dotate di un potente set di impostazioni di configurazione per controllare con precisione i limiti di velocità e il comportamento dei tentativi di una coda di attività:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Pitone

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : ogni attività nella coda delle attività viene ritentata automaticamente fino a 5 volte. Ciò aiuta a mitigare gli errori temporanei come errori di rete o interruzioni temporanee del servizio di un servizio esterno dipendente.

  • retryConfig.minBackoffSeconds=60 : ogni attività viene ritentata ad almeno 60 secondi di distanza da ogni tentativo. Ciò fornisce un ampio buffer tra ogni tentativo, quindi non abbiamo fretta di esaurire i 5 tentativi troppo rapidamente.

  • rateLimits.maxConcurrentDispatch=6 : vengono inviate al massimo 6 attività in un dato momento. Ciò aiuta a garantire un flusso costante di richieste alla funzione sottostante e aiuta a ridurre il numero di istanze attive e di avvii a freddo.

Testare le funzioni della coda delle attività

Le funzioni della coda delle attività nella Firebase Local Emulator Suite sono esposte come semplici funzioni HTTP. Puoi testare una funzione di attività emulata inviando una richiesta HTTP POST con un payload di dati JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Distribuire le funzioni della coda delle attività

Distribuisci la funzione coda attività utilizzando la CLI Firebase:

$ firebase deploy --only functions:backupapod

Quando distribuisci una funzione della coda di attività per la prima volta, la CLI crea una coda di attività in Cloud Tasks con le opzioni (limitazione della velocità e nuovi tentativi) specificate nel codice sorgente.

Se riscontri errori di autorizzazione durante la distribuzione delle funzioni, assicurati che i ruoli IAM appropriati siano assegnati all'utente che esegue i comandi di distribuzione.

Accoda le funzioni della coda delle attività

Le funzioni della coda delle attività possono essere accodate in Cloud Tasks da un ambiente server attendibile come Cloud Functions for Firebase utilizzando Firebase Admin SDK per Node.js o le librerie Google Cloud per Python. Se non hai familiarità con gli SDK di amministrazione, vedi Aggiungere Firebase a un server per iniziare.

Un flusso tipico crea una nuova attività, la accoda in Cloud Tasks e imposta la configurazione per l'attività:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Pitone

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Il codice di esempio tenta di distribuire l'esecuzione delle attività associando un ritardo di Nesimi minuti per l'Nesima attività. Ciò si traduce nell'attivazione di ~ 1 attività/minuto. Tieni presente che puoi anche utilizzare scheduleTime (Node.js) o schedule_time (Python) se desideri che Cloud Tasks attivi un'attività in un momento specifico.

  • Il codice di esempio imposta il tempo massimo di attesa di Cloud Tasks prima del completamento di un'attività. Cloud Tasks ritenterà l'attività dopo la configurazione dei tentativi della coda o fino al raggiungimento di tale scadenza. Nell'esempio, la coda è configurata per ritentare l'attività fino a 5 volte, ma l'attività viene automaticamente annullata se l'intero processo (compresi i tentativi di ripetizione) impiega più di 5 minuti.

Recupera e includi l'URI di destinazione

A causa del modo in cui Cloud Tasks crea token di autenticazione per autenticare le richieste alle funzioni della coda delle attività sottostanti, devi specificare l'URL Cloud Run della funzione quando accodi le attività. Ti consigliamo di recuperare a livello di codice l'URL per la tua funzione come dimostrato di seguito:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Pitone

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Risoluzione dei problemi

Attiva la registrazione di Cloud Tasks

I log di Cloud Tasks contengono informazioni diagnostiche utili come lo stato della richiesta associata a un'attività. Per impostazione predefinita, i log di Cloud Tasks sono disattivati ​​a causa dell'elevato volume di log che può potenzialmente generare nel tuo progetto. Ti consigliamo di attivare i registri di debug mentre sviluppi attivamente ed esegui il debug delle funzioni della coda delle attività. Vedere Attivazione della registrazione .

Autorizzazioni IAM

Potresti visualizzare errori PERMISSION DENIED durante l'accodamento delle attività o quando Cloud Tasks tenta di richiamare le funzioni della coda delle attività. Assicurati che il tuo progetto disponga dei seguenti collegamenti IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • L'identità utilizzata per accodare le attività a Cloud Tasks necessita dell'autorizzazione per utilizzare l'account di servizio associato a un'attività in Cloud Tasks.

    Nell'esempio, questo è l' account di servizio predefinito di App Engine .

Consulta la documentazione di Google Cloud IAM per istruzioni su come aggiungere l'account di servizio predefinito di App Engine come utente dell'account di servizio predefinito di App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker