Enqueue-Funktionen mit Cloud Tasks


Aufgabenwarteschlangenfunktionen nutzen Google Cloud Tasks , um Ihrer App dabei zu helfen, zeitaufwändige, ressourcenintensive oder bandbreitenbeschränkte Aufgaben asynchron außerhalb Ihres Hauptanwendungsablaufs auszuführen.

Stellen Sie sich beispielsweise vor, Sie möchten Backups einer großen Menge an Bilddateien erstellen, die derzeit auf einer API mit einer Ratenbegrenzung gehostet werden. Um ein verantwortungsbewusster Nutzer dieser API zu sein, müssen Sie deren Ratenbeschränkungen respektieren. Darüber hinaus kann es bei dieser Art von Langzeitjobs aufgrund von Zeitüberschreitungen und Speicherbeschränkungen zu Fehlern kommen.

Um diese Komplexität zu verringern, können Sie eine Aufgabenwarteschlangenfunktion schreiben, die grundlegende Aufgabenoptionen wie scheduleTime “ und dispatchDeadline festlegt und die Funktion dann an eine Warteschlange in Cloud Tasks übergibt. Die Cloud Tasks-Umgebung wurde speziell entwickelt, um eine effektive Überlastungskontrolle und Wiederholungsrichtlinien für diese Art von Vorgängen sicherzustellen.

Das Firebase SDK für Cloud Functions für Firebase v3.20.1 und höher arbeitet mit Firebase Admin SDK v10.2.0 und höher zusammen, um Aufgabenwarteschlangenfunktionen zu unterstützen.

Die Verwendung von Aufgabenwarteschlangenfunktionen mit Firebase kann zu Gebühren für die Verarbeitung von Cloud-Aufgaben führen. Weitere Informationen finden Sie unter Cloud Tasks-Preise .

Erstellen Sie Aufgabenwarteschlangenfunktionen

Um Aufgabenwarteschlangenfunktionen zu verwenden, befolgen Sie diesen Workflow:

  1. Schreiben Sie eine Aufgabenwarteschlangenfunktion mit dem Firebase SDK für Cloud Functions.
  2. Testen Sie Ihre Funktion, indem Sie sie mit einer HTTP-Anfrage auslösen.
  3. Stellen Sie Ihre Funktion mit der Firebase-CLI bereit. Wenn Sie Ihre Aufgabenwarteschlangenfunktion zum ersten Mal bereitstellen, erstellt die CLI eine Aufgabenwarteschlange in Cloud Tasks mit den in Ihrem Quellcode angegebenen Optionen (Ratenbegrenzung und Wiederholung).
  4. Fügen Sie Aufgaben zur neu erstellten Aufgabenwarteschlange hinzu und übergeben Sie bei Bedarf Parameter, um einen Ausführungsplan einzurichten. Sie können dies erreichen, indem Sie den Code mit dem Admin SDK schreiben und ihn in Cloud Functions für Firebase bereitstellen.

Aufgabenwarteschlangenfunktionen schreiben

Die Codebeispiele in diesem Abschnitt basieren auf einer App, die einen Dienst einrichtet, der alle Bilder des Astronomy Picture of the Day der NASA sichert. Importieren Sie zunächst die erforderlichen Module:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Verwenden Sie onTaskDispatched oder on_task_dispatched für Aufgabenwarteschlangenfunktionen. Beim Schreiben einer Aufgabenwarteschlangenfunktion können Sie Wiederholungen pro Warteschlange und eine ratenbegrenzende Konfiguration festlegen.

Konfigurieren Sie Aufgabenwarteschlangenfunktionen

Aufgabenwarteschlangenfunktionen umfassen leistungsstarke Konfigurationseinstellungen zur präzisen Steuerung von Ratenbegrenzungen und Wiederholungsverhalten einer Aufgabenwarteschlange:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : Jede Aufgabe in der Aufgabenwarteschlange wird automatisch bis zu fünf Mal wiederholt. Dies trägt dazu bei, vorübergehende Fehler wie Netzwerkfehler oder vorübergehende Dienstunterbrechungen eines abhängigen externen Dienstes zu mildern.

  • retryConfig.minBackoffSeconds=60 : Jede Aufgabe wird im Abstand von mindestens 60 Sekunden nach jedem Versuch erneut versucht. Dies sorgt für einen großen Puffer zwischen den einzelnen Versuchen, sodass wir die fünf Wiederholungsversuche nicht zu schnell erschöpfen.

  • rateLimits.maxConcurrentDispatch=6 : Höchstens 6 Aufgaben werden gleichzeitig versendet. Dies trägt dazu bei, einen stetigen Strom von Anfragen an die zugrunde liegende Funktion sicherzustellen und trägt dazu bei, die Anzahl aktiver Instanzen und Kaltstarts zu reduzieren.

Aufgabenwarteschlangenfunktionen testen

Aufgabenwarteschlangenfunktionen in der Firebase Local Emulator Suite werden als einfache HTTP-Funktionen bereitgestellt. Sie können eine emulierte Aufgabenfunktion testen, indem Sie eine HTTP-POST-Anfrage mit einer JSON-Datennutzlast senden:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Stellen Sie Aufgabenwarteschlangenfunktionen bereit

Stellen Sie die Aufgabenwarteschlangenfunktion mithilfe der Firebase-CLI bereit:

$ firebase deploy --only functions:backupapod

Wenn Sie eine Aufgabenwarteschlangenfunktion zum ersten Mal bereitstellen, erstellt die CLI eine Aufgabenwarteschlange in Cloud Tasks mit den in Ihrem Quellcode angegebenen Optionen (Ratenbegrenzung und Wiederholung).

Wenn beim Bereitstellen von Funktionen Berechtigungsfehler auftreten, stellen Sie sicher, dass dem Benutzer, der die Bereitstellungsbefehle ausführt, die entsprechenden IAM-Rollen zugewiesen sind.

Aufgabenwarteschlangenfunktionen in die Warteschlange stellen

Aufgabenwarteschlangenfunktionen können in Cloud Tasks aus einer vertrauenswürdigen Serverumgebung wie Cloud Functions für Firebase mithilfe des Firebase Admin SDK für Node.js oder Google Cloud-Bibliotheken für Python in die Warteschlange gestellt werden. Wenn Sie mit den Admin-SDKs noch nicht vertraut sind, finden Sie weitere Informationen unter Hinzufügen von Firebase zu einem Server .

Ein typischer Ablauf erstellt eine neue Aufgabe, stellt sie in Cloud Tasks ein und legt die Konfiguration für die Aufgabe fest:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Der Beispielcode versucht, die Ausführung von Aufgaben zu verteilen, indem er der N-ten Aufgabe eine Verzögerung von N-ten Minuten zuordnet. Dies entspricht dem Auslösen von ca. 1 Aufgabe/Minute. Beachten Sie, dass Sie auch scheduleTime (Node.js) oder schedule_time (Python) verwenden können, wenn Sie möchten, dass Cloud Tasks eine Aufgabe zu einem bestimmten Zeitpunkt auslöst.

  • Der Beispielcode legt die maximale Zeitspanne fest, die Cloud Tasks auf den Abschluss einer Aufgabe wartet. Cloud Tasks wiederholt die Aufgabe nach der Wiederholungskonfiguration der Warteschlange oder bis diese Frist erreicht ist. Im Beispiel ist die Warteschlange so konfiguriert, dass sie die Aufgabe bis zu fünf Mal wiederholt. Die Aufgabe wird jedoch automatisch abgebrochen, wenn der gesamte Vorgang (einschließlich Wiederholungsversuchen) mehr als fünf Minuten dauert.

Rufen Sie den Ziel-URI ab und fügen Sie ihn ein

Aufgrund der Art und Weise, wie Cloud Tasks Authentifizierungstoken erstellt, um Anforderungen an die zugrunde liegenden Aufgabenwarteschlangenfunktionen zu authentifizieren, müssen Sie beim Einreihen von Aufgaben die Cloud Run-URL der Funktion angeben. Wir empfehlen Ihnen, die URL für Ihre Funktion programmgesteuert abzurufen, wie unten gezeigt:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Fehlerbehebung

Aktivieren Sie die Cloud Tasks-Protokollierung

Protokolle von Cloud Tasks enthalten nützliche Diagnoseinformationen wie den Status der mit einer Aufgabe verknüpften Anfrage. Standardmäßig sind Protokolle von Cloud Tasks deaktiviert, da es potenziell eine große Menge an Protokollen für Ihr Projekt generieren kann. Wir empfehlen Ihnen, die Debug-Protokolle zu aktivieren, während Sie Ihre Aufgabenwarteschlangenfunktionen aktiv entwickeln und debuggen. Siehe Protokollierung aktivieren .

IAM-Berechtigungen

Beim Einreihen PERMISSION DENIED angezeigt. Stellen Sie sicher, dass Ihr Projekt über die folgenden IAM-Bindungen verfügt:

  • Die Identität, die zum Einreihen von Aufgaben in Cloud Tasks verwendet wird, benötigt die IAM-Berechtigung cloudtasks.tasks.create .

    Im Beispiel ist dies das App Engine-Standarddienstkonto

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Die Identität, die zum Einreihen von Aufgaben in Cloud Tasks verwendet wird, benötigt die Berechtigung zur Verwendung des Dienstkontos, das einer Aufgabe in Cloud Tasks zugeordnet ist.

    Im Beispiel ist dies das App Engine-Standarddienstkonto .

Anweisungen zum Hinzufügen des App Engine-Standarddienstkontos als Benutzer des App Engine-Standarddienstkontos finden Sie in der Dokumentation zu Google Cloud IAM .

  • Die Identität, die zum Auslösen der Aufgabenwarteschlangenfunktion verwendet wird, benötigt die Berechtigung cloudfunctions.functions.invoke .

    Im Beispiel ist dies das App Engine-Standarddienstkonto

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker