Kolejkowanie funkcji za pomocą Cloud Tasks


Funkcje kolejki zadań wykorzystują możliwości Google Zadania w Cloud aby pomóc w działaniu aplikacji, które zajmują dużo czasu bądź inne zasoby lub usługi o ograniczonej przepustowości. asynchronicznie poza głównym przepływem aplikacji.

Załóżmy na przykład, że chcesz utworzyć kopie zapasowe dużego zestawu obrazów pliki, które są obecnie hostowane w interfejsie API z limitem liczby żądań. Jeśli chcesz być odpowiedzialnym konsumentem tego interfejsu API, należy przestrzegać związanych z nim limitów liczby żądań. Dodatkowo tego rodzaju długotrwałe zadanie może być podatne na awarie z powodu przekroczenia limitów czasu i i ograniczenia pamięci.

Aby ograniczyć tę złożoność, możesz napisać funkcję kolejki zadań, która ustawi podstawowe opcje zadania, takie jak scheduleTime i dispatchDeadline, a następnie przekaż do kolejki w Cloud Tasks. Zadania Cloud zaprojektowano tak, aby skutecznie kontrolować zatory zasad ponawiania w przypadku tego typu operacji.

Pakiet SDK Firebase dla Cloud Functions dla Firebase w wersji 3.20.1 i nowszych współdziała ze sobą z pakietem Firebase Admin SDK w wersji 10.2.0 lub nowszej, aby obsługiwać funkcje kolejki zadań.

Korzystanie z funkcji kolejki zadań w Firebase może spowodować naliczenie opłat za Przetwarzanie w Cloud Tasks. Zobacz Cennik Cloud Tasks .

Tworzenie funkcji kolejki zadań

Aby użyć funkcji kolejki zadań, wykonaj ten przepływ pracy:

  1. Napisz funkcję kolejki zadań za pomocą pakietu SDK Firebase dla Cloud Functions.
  2. Przetestuj funkcję przez aktywowanie jej za pomocą żądania HTTP.
  3. wdrożyć funkcję za pomocą interfejsu wiersza poleceń Firebase, Podczas wdrażania zadania kolejki po raz pierwszy, interfejs wiersza poleceń utworzy kolejka zadań w Cloud Tasks z opcjami (ograniczenie liczby żądań i ponowienie próby) określonymi w kodzie źródłowym.
  4. Do nowo utworzonej kolejki zadań możesz dodawać zadania i przekazywać parametry, aby je skonfigurować w razie potrzeby harmonogram wykonywania. Aby to zrobić, napisz kod za pomocą pakietu Admin SDK i wdrożyć go w Cloud Functions dla Firebase.

Zapisywanie funkcji kolejki zadań

Przykładowe fragmenty kodu w tej sekcji są oparte na aplikacji, która ustawia tworzy usługę tworzenia kopii zapasowych wszystkich zdjęć z NASA Zdjęcie dnia poświęcone astronomii. Aby rozpocząć, zaimportuj wymagane moduły:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Użyj formatu onTaskDispatched lub on_task_dispatched na potrzeby funkcji kolejek zadań. Podczas zapisywania funkcji kolejki zadań możesz ustawić ponawianie prób w kolejce i ograniczanie liczby żądań.

Konfigurowanie funkcji kolejki zadań

Funkcje kolejki zadań mają rozbudowany zestaw ustawień konfiguracji precyzyjnego kontrolowania limitów liczby żądań i ponownego działania kolejki zadań:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: każde zadanie w kolejce zadań jest automatycznie ponawiać próby maksymalnie 5 razy. Pomaga to ograniczyć błędy przejściowe, takie jak sieć lub tymczasowe zakłócenia w działaniu zależnej usługi zewnętrznej.

  • retryConfig.minBackoffSeconds=60: każde zadanie jest powtarzane przez co najmniej 60 sekund niezależnie od każdej próby. Powoduje to duży bufor pomiędzy każdą próbą aby nie zużywać ich zbyt szybko.

  • rateLimits.maxConcurrentDispatch=6: maksymalnie 6 zadań jest wysyłanych w jednym miejscu danego czasu. Pomaga to zapewnić stały napływ żądań do źródła i pomaga zmniejszyć liczbę aktywnych instancji oraz uruchomień „na zimno”.

Testowanie funkcji kolejki zadań

Funkcje kolejki zadań w Pakiecie emulatorów lokalnych Firebase są przedstawiane jako proste funkcji HTTP. Możesz przetestować emulowaną funkcję zadania, wysyłając żądanie HTTP POST żądanie z ładunkiem danych JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Wdrażanie funkcji kolejki zadań

Wdróż funkcję kolejki zadań za pomocą interfejsu wiersza poleceń Firebase:

$ firebase deploy --only functions:backupapod

Przy wdrażaniu funkcji kolejki zadań po raz pierwszy interfejs wiersza poleceń tworzy kolejka zadań w Cloud Tasks z opcjami (ograniczanie liczby żądań i ponawianie prób) podany w kodzie źródłowym.

Jeśli podczas wdrażania funkcji wystąpią błędy uprawnień, upewnij się, że odpowiednie role uprawnień są przypisane do użytkownika wykonującego polecenia wdrożeniowe.

Kolejkowanie funkcji kolejki zadań

Funkcje kolejki zadań można dodać do kolejki w Cloud Tasks z zaufanego środowiska serwera, np. Cloud Functions dla Firebase, które korzysta z pakietu Firebase Admin SDK Node.js lub Google Cloud dla Pythona. Jeśli nie znasz jeszcze pakietów Admin SDK, zobacz Aby rozpocząć, dodaj Firebase do serwera.

Typowy przepływ tworzy nowe zadanie, dodaje je do kolejki Cloud Tasks i ustawia jego konfigurację:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Przykładowy kod próbuje rozłożyć wykonanie zadania przez powiązanie opóźnienia n-tego minuty z n-tym zadaniem. Ten przekłada się na wyzwalanie ~ 1 zadanie/min. Pamiętaj, że możesz też użyć scheduleTime (Node.js) lub schedule_time (Python), jeśli chcesz Cloud Tasks, aby aktywować zadanie w określonym czasie.

  • Przykładowy kod ustawia maksymalny czas Cloud Tasks zaczeka na wykonanie danego zadania. Cloud Tasks spróbuje ponownie zadanie po ponownej próbie konfiguracji kolejki lub do upłynięcia tego terminu. W próbce kolejka jest skonfigurowana tak, aby ponawiać zadanie maksymalnie 5 razy, ale zadanie jest automatycznie anulowane, jeśli cały proces (w tym ponowna próba) trwa to dłużej niż 5 minut.

Pobieranie i uwzględnianie docelowego identyfikatora URI

Ze względu na sposób, w jaki Cloud Tasks tworzy tokeny uwierzytelniania do uwierzytelniania do bazowych funkcji kolejki zadań, musisz określić Adres URL funkcji w Cloud Run podczas dodawania zadań do kolejki. Śr zalecamy programowe pobieranie adresu URL dla funkcji poniżej:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Rozwiązywanie problemów

Włącz logowanie w Cloud Tasks

Logi z Cloud Tasks zawierają przydatne informacje diagnostyczne, takie jak stanu żądania powiązanego z zadaniem. Domyślnie logi z Usługa Cloud Tasks jest wyłączona ze względu na dużą liczbę logów, które może obsłużyć co możesz potencjalnie wygenerować w swoim projekcie. Zalecamy włączenie dzienników debugowania podczas aktywnego rozwijania i debugowania funkcji kolejki zadań. Zobacz Włączam .

Uprawnienia

Podczas dodawania zadań do kolejki lub podczas dodawania do kolejki mogą wystąpić PERMISSION DENIED błędy Cloud Tasks próbuje wywołać funkcje kolejki zadań. Upewnij się, że atrybuty projekt ma te powiązania uprawnień:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Tożsamość używana do kolejkowania zadań w Cloud Tasks wymaga uprawnień aby używać konta usługi powiązanego z zadaniem w Cloud Tasks.

    W przykładzie jest to domyślne konto usługi App Engine.

Zobacz dokumentację Google Cloud IAM. instrukcje dodawania domyślnego konta usługi App Engine jako użytkownik domyślnego konta usługi App Engine.

  • Tożsamość używana do aktywowania funkcji kolejki zadań wymaga Uprawnienie cloudfunctions.functions.invoke.

    W przykładzie jest to domyślne konto usługi App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker