Funções de enfileiramento com o Cloud Tasks


As funções de fila de tarefas aproveitam o Google Cloud Tasks para ajudar seu aplicativo a executar tarefas demoradas, que consomem muitos recursos ou com largura de banda limitada de forma assíncrona, fora do fluxo principal do aplicativo.

Por exemplo, imagine que você deseja criar backups de um grande conjunto de arquivos de imagem atualmente hospedados em uma API com limite de taxa. Para ser um consumidor responsável dessa API, você precisa respeitar seus limites de taxas. Além disso, esse tipo de trabalho de longa duração pode ser vulnerável a falhas devido a tempos limite e limites de memória.

Para atenuar essa complexidade, você pode escrever uma função de fila de tarefas que defina opções básicas de tarefas, como scheduleTime e dispatchDeadline , e depois passar a função para uma fila no Cloud Tasks. O ambiente Cloud Tasks foi projetado especificamente para garantir controle eficaz de congestionamento e políticas de repetição para esses tipos de operações.

O SDK do Firebase para Cloud Functions para Firebase v3.20.1 e superior interopera com o SDK Admin do Firebase v10.2.0 e superior para oferecer suporte a funções de fila de tarefas.

O uso de funções de fila de tarefas com o Firebase pode resultar em cobranças pelo processamento do Cloud Tasks. Consulte preços do Cloud Tasks para obter mais informações.

Criar funções de fila de tarefas

Para usar funções de fila de tarefas, siga este fluxo de trabalho:

  1. Escreva uma função de fila de tarefas usando o SDK do Firebase para Cloud Functions.
  2. Teste sua função acionando-a com uma solicitação HTTP.
  3. Implante sua função com a CLI do Firebase. Ao implantar sua função de fila de tarefas pela primeira vez, a CLI criará uma fila de tarefas no Cloud Tasks com opções (limitação de taxa e nova tentativa) especificadas em seu código-fonte.
  4. Adicione tarefas à fila de tarefas recém-criada, passando parâmetros para configurar um cronograma de execução, se necessário. Você pode conseguir isso escrevendo o código usando o SDK Admin e implantando-o no Cloud Functions para Firebase.

Escrever funções de fila de tarefas

Os exemplos de código nesta seção são baseados em um aplicativo que configura um serviço que faz backup de todas as imagens da Imagem Astronômica do Dia da NASA. Para começar, importe os módulos necessários:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Pitão

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Use onTaskDispatched ou on_task_dispatched para funções de fila de tarefas. Ao escrever uma função de fila de tarefas, você pode definir novas tentativas por fila e configuração de limitação de taxa.

Configurar funções de fila de tarefas

As funções de fila de tarefas vêm com um poderoso conjunto de definições de configuração para controlar com precisão os limites de taxa e repetir o comportamento de uma fila de tarefas:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Pitão

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : cada tarefa na fila de tarefas é repetida automaticamente até 5 vezes. Isso ajuda a mitigar erros transitórios, como erros de rede ou interrupção temporária de um serviço externo dependente.

  • retryConfig.minBackoffSeconds=60 : Cada tarefa é repetida com pelo menos 60 segundos de intervalo entre cada tentativa. Isso fornece um grande buffer entre cada tentativa, para que não tenhamos pressa em esgotar as cinco novas tentativas muito rapidamente.

  • rateLimits.maxConcurrentDispatch=6 : No máximo 6 tarefas são despachadas em um determinado momento. Isso ajuda a garantir um fluxo constante de solicitações para a função subjacente e ajuda a reduzir o número de instâncias ativas e inicializações a frio.

Testar funções de fila de tarefas

As funções de fila de tarefas no Firebase Local Emulator Suite são expostas como funções HTTP simples. Você pode testar uma função de tarefa emulada enviando uma solicitação HTTP POST com uma carga de dados JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Implantar funções de fila de tarefas

Implante a função de fila de tarefas usando a CLI do Firebase:

$ firebase deploy --only functions:backupapod

Ao implantar uma função de fila de tarefas pela primeira vez, a CLI cria uma fila de tarefas no Cloud Tasks com opções (limitação de taxa e nova tentativa) especificadas em seu código-fonte.

Se você encontrar erros de permissão ao implantar funções, certifique-se de que as funções apropriadas do IAM estejam atribuídas ao usuário que executa os comandos de implantação.

Enfileirar funções de fila de tarefas

As funções de fila de tarefas podem ser enfileiradas no Cloud Tasks a partir de um ambiente de servidor confiável, como o Cloud Functions para Firebase, usando o SDK Admin do Firebase para Node.js ou as bibliotecas do Google Cloud para Python. Se você é novo nos SDKs Admin, consulte Adicionar o Firebase a um servidor para começar.

Um fluxo típico cria uma nova tarefa, coloca-a na fila do Cloud Tasks e define a configuração da tarefa:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Pitão

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    tasks_client = tasks_v2.CloudTasksClient()
    task_queue = tasks_client.queue_path(params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1,
                                         "backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task = tasks_v2.Task(http_request={
            "http_method": tasks_v2.HttpMethod.POST,
            "url": target_uri,
            "headers": {
                "Content-type": "application/json"
            },
            "body": json.dumps(body).encode()
        },
                             schedule_time=schedule_time)
        tasks_client.create_task(parent=task_queue, task=task)

    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • O código de exemplo tenta distribuir a execução de tarefas associando um atraso de enésimos minutos para a enésima tarefa. Isso se traduz no acionamento de aproximadamente 1 tarefa/minuto. Observe que você também pode usar scheduleTime (Node.js) ou schedule_time (Python) se quiser que o Cloud Tasks acione uma tarefa em um horário específico.

  • O código de exemplo define o tempo máximo que o Cloud Tasks aguardará a conclusão de uma tarefa. O Cloud Tasks tentará novamente a tarefa após a configuração de nova tentativa da fila ou até que esse prazo seja atingido. No exemplo, a fila está configurada para repetir a tarefa até 5 vezes, mas a tarefa será automaticamente cancelada se todo o processo (incluindo novas tentativas) demorar mais de 5 minutos.

Recuperar e incluir o URI de destino

Devido à forma como o Cloud Tasks cria tokens de autenticação para autenticar solicitações para as funções de fila de tarefas subjacentes, você deve especificar o URL do Cloud Run da função ao enfileirar tarefas. Recomendamos que você recupere programaticamente o URL da sua função, conforme demonstrado abaixo:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Pitão

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Solução de problemas

Ativar a geração de registros do Cloud Tasks

Os logs do Cloud Tasks contêm informações de diagnóstico úteis, como o status da solicitação associada a uma tarefa. Por padrão, os logs do Cloud Tasks estão desativados devido ao grande volume de logs que ele pode gerar no seu projeto. Recomendamos que você ative os logs de depuração enquanto estiver desenvolvendo e depurando ativamente suas funções de fila de tarefas. Consulte Ativando o registro em log .

Permissões IAM

Você pode ver erros PERMISSION DENIED ao enfileirar tarefas ou quando o Cloud Tasks tenta invocar suas funções de fila de tarefas. Certifique-se de que seu projeto tenha as seguintes vinculações do IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • A identidade usada para enfileirar tarefas no Cloud Tasks precisa de permissão para usar a conta de serviço associada a uma tarefa no Cloud Tasks.

    No exemplo, esta é a conta de serviço padrão do App Engine .

Consulte a documentação do Google Cloud IAM para obter instruções sobre como adicionar a conta de serviço padrão do App Engine como usuário da conta de serviço padrão do App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker