Mettre des fonctions en file d'attente avec Cloud Tasks


Les fonctions de file d'attente de tâches exploitent Cloud Tasks de Google pour aider votre application à exécuter des tâches fastidieuses, gourmandes en ressources ou limitées en bande passante de manière asynchrone, en dehors de votre flux d'application principal.

Imaginons que vous souhaitiez créer des sauvegardes d'un grand ensemble de fichiers image actuellement hébergés sur une API avec une limite de débit. Pour être un consommateur responsable de cette API, vous devez respecter ses limites de débit. De plus, ce type de tâche de longue durée peut être sujet à des échecs en raison des délais avant expiration et des limites de mémoire.

Pour atténuer cette complexité, vous pouvez écrire une fonction de file d'attente de tâches qui définit des options de tâche de base telles que scheduleTime et dispatchDeadline, puis transmet la fonction à une file d'attente dans Cloud Tasks. L'environnement Cloud Tasks est conçu spécifiquement pour assurer un contrôle efficace de la congestion et des stratégies de nouvelle tentative pour ces types d'opérations.

Le SDK Firebase pour Cloud Functions for Firebase v3.20.1 et versions ultérieures interopéra avec Firebase Admin SDK v10.2.0 et versions ultérieures pour prendre en charge les fonctions de file d'attente de tâches.

L'utilisation de fonctions de file d'attente de tâches avec Firebase peut entraîner des frais de traitement Cloud Tasks. Pour en savoir plus, consultez la page Tarifs Cloud Tasks.

Créer des fonctions de file d'attente de tâches

Pour utiliser les fonctions de file d'attente de tâches, suivez ce workflow:

  1. Écrivez une fonction de file d'attente de tâches à l'aide du SDK Firebase pour Cloud Functions.
  2. Testez votre fonction en la déclenchant avec une requête HTTP.
  3. Déployez votre fonction avec la CLI Firebase. Lorsque vous déployez votre fonction de file d'attente de tâches pour la première fois, la CLI crée une file d'attente de tâches dans Cloud Tasks avec des options (limitation de débit et nouvelle tentative) spécifiées dans votre code source.
  4. Ajoutez des tâches à la file d'attente de tâches que vous venez de créer, en transmettant des paramètres pour configurer un calendrier d'exécution si nécessaire. Pour ce faire, écrivez le code à l'aide de Admin SDK et déployez-le sur Cloud Functions for Firebase.

Écrire des fonctions de file d'attente des tâches

Les exemples de code de cette section sont basés sur une application qui configure un service qui sauvegarde toutes les images de l'image astronomique du jour de la NASA. Pour commencer, importez les modules requis:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Utilisez onTaskDispatched ou on_task_dispatched pour les fonctions de file d'attente de tâches. Lorsque vous écrivez une fonction de file d'attente de tâches, vous pouvez définir la configuration de la nouvelle tentative et de la limitation de débit par file d'attente.

Configurer les fonctions de file d'attente de tâches

Les fonctions de file d'attente de tâches sont fournies avec un ensemble puissant de paramètres de configuration pour contrôler précisément les limites de débit et le comportement de nouvelle tentative d'une file d'attente de tâches:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: chaque tâche de la file d'attente de tâches est automatiquement réessayée jusqu'à cinq fois. Cela permet d'atténuer les erreurs temporaires, telles que les erreurs réseau ou la perturbation temporaire du service d'un service externe dépendant.

  • retryConfig.minBackoffSeconds=60: chaque tâche est réessayée au moins 60 secondes après chaque tentative. Cela fournit un grand tampon entre chaque tentative afin de ne pas épuiser trop rapidement les cinq tentatives.

  • rateLimits.maxConcurrentDispatch=6: Au maximum six tâches sont distribuées à un moment donné. Cela permet de garantir un flux constant de requêtes vers la fonction sous-jacente et de réduire le nombre d'instances actives et de démarrages à froid.

Tester les fonctions de file d'attente des tâches

Dans la plupart des cas, l'émulateur Cloud Functions est le meilleur moyen de tester les fonctions de file d'attente de tâches. Consultez la documentation de la suite d'émulateurs pour découvrir comment instrumenter votre application pour l'émulation des fonctions de file d'attente de tâches.

En outre, les fonctions_sdk de la file d'attente de tâches sont exposées en tant que fonctions HTTP simples dans Firebase Local Emulator Suite. Vous pouvez tester une fonction de tâche émulée en envoyant une requête HTTP POST avec une charge utile de données JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Déployer des fonctions de file d'attente de tâches

Déployez la fonction de file d'attente de tâches à l'aide de la CLI Firebase:

$ firebase deploy --only functions:backupapod

Lorsque vous déployez une fonction de file d'attente de tâches pour la première fois, la CLI crée une file d'attente de tâches dans Cloud Tasks avec des options (limitation du débit et nouvelle tentative) spécifiées dans votre code source.

Si vous rencontrez des erreurs d'autorisation lors du déploiement de fonctions, assurez-vous que les rôles IAM appropriés sont attribués à l'utilisateur qui exécute les commandes de déploiement.

Mettre en file d'attente des fonctions de file d'attente de tâches

Les fonctions de file d'attente de tâches peuvent être mises en file d'attente dans Cloud Tasks à partir d'un environnement serveur approuvé tel que Cloud Functions for Firebase à l'aide de Firebase Admin SDK pour Node.js ou des bibliothèques Google Cloud pour Python. Si vous ne connaissez pas les Admin SDK, consultez Ajouter Firebase à un serveur pour commencer.

Un flux typique crée une tâche, l'ajoute à la file d'attente dans Cloud Tasks et définit la configuration de la tâche:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • L'exemple de code tente de répartir l'exécution des tâches en associant un délai de N minutes à la N ième tâche. Cela se traduit par le déclenchement d'environ une tâche par minute. Notez que vous pouvez également utiliser scheduleTime (Node.js) ou schedule_time (Python) si vous souhaitez que Cloud Tasks déclenche une tâche à un moment spécifique.

  • L'exemple de code définit la durée maximale d'attente de Cloud Tasks pour qu'une tâche soit terminée. Cloud Tasks relancera la tâche conformément à la configuration de la file d'attente ou jusqu'à ce que cette échéance soit atteinte. Dans l'exemple, la file d'attente est configurée pour réessayer la tâche jusqu'à cinq fois, mais la tâche est automatiquement annulée si l'ensemble du processus (y compris les tentatives de nouvelle tentative) prend plus de cinq minutes.

Récupérer et inclure l'URI cible

En raison de la façon dont Cloud Tasks crée des jetons d'authentification pour authentifier les requêtes adressées aux fonctions de file d'attente de tâches sous-jacentes, vous devez spécifier l'URL Cloud Run de la fonction lorsque vous mettez en file d'attente des tâches. Nous vous recommandons de récupérer l'URL de votre fonction par programmation, comme indiqué ci-dessous:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Dépannage

"Cloud Tasks">Activer la journalisation Cloud Tasks

Les journaux de Cloud Tasks contiennent des informations de diagnostic utiles, telles que l'état de la requête associée à une tâche. Par défaut, les journaux de Cloud Tasks sont désactivés en raison du grand volume de journaux qu'ils peuvent potentiellement générer dans votre projet. Nous vous recommandons d'activer les journaux de débogage lorsque vous développez et déboguez activement vos fonctions de file d'attente de tâches. Consultez la section Activer la journalisation.

Autorisations IAM

Des erreurs PERMISSION DENIED peuvent s'afficher lorsque vous mettez en file d'attente des tâches ou lorsque Cloud Tasks tente d'appeler vos fonctions de file d'attente de tâches. Assurez-vous que votre projet dispose des liaisons IAM suivantes:

  • L'identité utilisée pour mettre des tâches en file d'attente dans Cloud Tasks nécessite une autorisation IAM cloudtasks.tasks.create.

    Dans l'exemple, il s'agit du compte de service par défaut App Engine.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • L'identité utilisée pour mettre en file d'attente des tâches dans Cloud Tasks doit disposer de l'autorisation d'utiliser le compte de service associé à une tâche dans Cloud Tasks.

    Dans l'exemple, il s'agit du compte de service par défaut App Engine.

Pour savoir comment ajouter le compte de service par défaut App Engine en tant qu'utilisateur du compte de service par défaut App Engine, consultez la documentation Google Cloud IAM.

  • L'identité utilisée pour déclencher la fonction de file d'attente de tâches nécessite l'autorisation cloudfunctions.functions.invoke.

    Dans l'exemple, il s'agit du compte de service par défaut App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker