Enfileirar funções com o Cloud Tasks


As funções de fila de tarefas usam o Google Cloud Tasks para ajudar seu app a executar tarefas demoradas, que consomem muitos recursos ou limitadas por largura de banda de maneira assíncrona e fora do fluxo principal do aplicativo.

Por exemplo, imagine que você quer criar backups de um grande conjunto de arquivos de imagem hospedados atualmente em uma API com um limite de taxa. Para fazer o consumo responsável dessa API, precisamos respeitar esses limites. Além disso, esse tipo de job de longa duração pode ser vulnerável a falhas devido a tempos limite e limites de memória.

Para atenuar essa complexidade, grave uma função de fila de tarefas que defina opções básicas de tarefa, como scheduleTime e dispatchDeadline e a distribua para uma fila no Cloud Tasks. O ambiente do Cloud Tasks foi projetado especificamente para garantir um controle de congestionamento e políticas de repetição eficientes para esses tipos de operações.

O SDK do Firebase para o Cloud Functions para Firebase v3.20.1 e versões posteriores interopera com o SDK Admin do Firebase v10.2.0 e versões mais recentes para oferecer suporte a funções da fila de tarefas.

O uso de funções da fila de tarefas com o Firebase pode resultar em cobranças pelo processamento do Cloud Tasks. Consulte Preços do Cloud Tasks para mais informações.

Como criar funções da fila de tarefas

Para usar as funções da fila de tarefas, siga este fluxo de trabalho:

  1. Grave uma função da fila de tarefas usando o SDK do Firebase para Cloud Functions.
  2. Teste suas funções localmente usando o Pacote de emuladores locais do Firebase.
  3. Implante a função com a CLI do Firebase. Ao implantar a função de fila de tarefas pela primeira vez, a CLI vai criar uma fila de tarefas no Cloud Tasks com opções (limitação de taxa e nova tentativa) especificadas no código-fonte.
  4. Adicione tarefas à fila de tarefas recém-criada, transmitindo parâmetros para configurar uma programação de execução. Para isso, escreva o código usando o SDK Admin e implante-o no Cloud Functions para Firebase.

Como criar funções da fila de tarefas

Os exemplos de código desta seção são baseados em um app que configura um serviço de backup de todas as imagens do site de fotos astronômicas diárias (em inglês) da NASA: Para começar, importe os módulos necessários:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
const HttpsError = functions.https.HttpsError;

Python (pré-lançamento)

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Use onTaskDispatched ou on_task_dispatched para funções da fila de tarefas. Ao escrever uma função da fila de tarefas, você pode definir uma configuração de limitação de taxa e nova tentativa por fila.

Configuração da fila de tarefas

As funções da fila de tarefas têm um conjunto avançado de configurações para controlar com precisão os limites de taxa e o comportamento de repetição de uma fila de tarefas:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python (pré-lançamento)

@tasks_fn.on_task_dispatched(
    retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
    rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: cada tarefa na fila é repetida automaticamente até cinco vezes. Isso nos ajuda a reduzir falhas transitórias, como erros de rede, ou a interrupção temporária de um serviço externo e dependente.

  • retryConfig.minBackoffSeconds=60: cada tarefa é executada novamente após um intervalo mínimo de 60 segundos. Com isso, temos um grande buffer entre cada tentativa, e não há pressa para realizar as cinco tentativas muito rapidamente.

  • rateLimits.maxConcurrentDispatch=6: são enviadas, no máximo, seis tarefas em um determinado momento. Isso ajuda a garantir um fluxo estável de solicitações para a função, além de reduzir o número de instâncias ativas e inicializações a frio.

Como testar funções da fila de tarefas usando o Pacote de emuladores locais do Firebase

As funções da fila de tarefas no Pacote de emuladores locais do Firebase são expostas como funções HTTP simples. Para testar uma função de tarefa emulada, envie uma solicitação HTTP POST com um payload de dados json:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Como implantar a função da fila de tarefas

Implante a função da fila de tarefas usando a CLI do Firebase:

$ firebase deploy --only functions:backupapod

Ao implantar a função de fila de tarefas pela primeira vez, a CLI cria uma fila de tarefas no Cloud Tasks com opções (limitação de taxa e nova tentativa) especificadas no código-fonte.

Se você encontrar erros de permissão ao implantar funções, verifique se os papéis do IAM apropriados estão atribuídos ao usuário que executa os comandos de implantação.

Enfileirar a função

As funções da fila de tarefas podem ser enfileiradas no Cloud Tasks a partir de um ambiente de servidor confiável, como o Cloud Functions para Firebase, usando o SDK Admin do Firebase para Node.js ou as bibliotecas do Google Cloud para Python. Se você não tiver experiência com os SDKs Admin, consulte Adicionar o Firebase a um servidor para começar.

Um fluxo típico cria uma nova tarefa, a enfileira no Cloud Tasks e define a configuração dela:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python (pré-lançamento)

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    tasks_client = tasks_v2.CloudTasksClient()
    task_queue = tasks_client.queue_path(
        params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1, "backupapod"
    )
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task = tasks_v2.Task(
            http_request={
                "http_method": tasks_v2.HttpMethod.POST,
                "url": target_uri,
                "headers": {"Content-type": "application/json"},
                "body": json.dumps(body).encode(),
            },
            schedule_time=schedule_time,
        )
        tasks_client.create_task(parent=task_queue, task=task)

    return https_fn.Response(
        status=200, response=f"Enqueued {BACKUP_COUNT} tasks"
    )


  • O exemplo de código tenta distribuir a execução das tarefas associando um atraso de N minutos à Nª tarefa. Isso se traduz no acionamento de aproximadamente 1 tarefa/minuto. Observe que você também pode usar scheduleTime (Node.js) ou schedule_time (Python) se quiser que o Cloud Tasks acione uma tarefa em um horário específico.

  • O código de amostra define o tempo máximo que o Cloud Tasks aguardará a conclusão de uma tarefa. O Cloud Tasks vai tentar realizar a tarefa novamente após a configuração da nova tentativa da fila ou até que esse prazo seja atingido. No exemplo, a fila é configurada para tentar realizar a tarefa novamente até cinco vezes, mas a tarefa é cancelada automaticamente se todo o processo (incluindo novas tentativas) levar mais de cinco minutos.

Recuperar e incluir o URI de destino

O Cloud Tasks cria um token de autenticação para autenticar solicitações para a função da fila de tarefas. Sendo assim, você precisa especificar o URL do Cloud Run da função ao enfileirar uma tarefa. Recomendamos que você recupere de maneira programática o URL da função, conforme demonstrado abaixo:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python (pré-lançamento)

def get_function_url(
    name: str, location: str = SupportedRegion.US_CENTRAL1
) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    authed_session = AuthorizedSession(credentials)
    url = (
        "https://cloudfunctions.googleapis.com/v2beta/"
        + f"projects/{project_id}/locations/{location}/functions/{name}"
    )
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url


Solução de problemas

Ativar a geração de registros do Cloud Tasks

Os registros do Cloud Tasks têm informações de diagnóstico úteis, como o status da solicitação associada a uma tarefa. Por padrão, os registros do Cloud Tasks são desativados devido ao grande volume de registros que podem ser gerados no projeto. Recomendamos ativar os registros de depuração enquanto você desenvolve e depura ativamente as funções da fila de tarefas. Consulte Como ativar a geração de registros.

Permissões do IAM

É possível ver erros PERMISSION DENIED ao enfileirar tarefas ou quando o Cloud Tasks tenta invocar as funções da fila de tarefas. Verifique se o projeto tem as seguintes vinculações do IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • A identidade usada para enfileirar tarefas para o Cloud Tasks precisa de permissão para usar a conta de serviço associada a uma tarefa no Cloud Tasks.

    No exemplo, a conta de serviço padrão do App Engine.

Consulte a documentação do Google Cloud IAM para ver instruções sobre como adicionar a conta de serviço padrão do App Engine como um usuário da conta de serviço padrão do App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker