Cloud Tasks ile işlevleri sıraya alma


Görev sırası işlevleri, uygulamanızın zaman alıcı, yoğun kaynak kullanan veya bant genişliği sınırlı görevleri ana uygulama akışınızın dışında eşzamansız olarak çalıştırmasına yardımcı olmak için Google Cloud Tasks'den yararlanır.

Örneğin, şu anda hız sınırı olan bir API'de barındırılan büyük bir resim dosyası grubunun yedeklerini oluşturmak istediğinizi varsayalım. Bu API'nin sorumlu bir kullanıcısı olmak için hız sınırlamalarına uymanız gerekir. Ayrıca, bu tür uzun süreli işler zaman aşımları ve bellek sınırları nedeniyle başarısızlığa uğrayabilir.

Bu karmaşıklığı azaltmak için scheduleTime ve dispatchDeadline gibi temel görev seçeneklerini belirleyen bir görev sırası işlevi yazabilir ve ardından işlevi Cloud Tasks içindeki bir sıraya alabilirsiniz. Cloud Tasks ortamı, bu tür işlemler için etkili tıkanıklık kontrolü ve yeniden deneme politikaları sağlamak üzere özel olarak tasarlanmıştır.

Cloud Functions for Firebase v3.20.1 ve sonraki sürümler için Firebase SDK'sı, görev sırası işlevlerini desteklemek amacıyla Firebase Admin SDK v10.2.0 ve sonraki sürümlerle birlikte çalışır.

Görev kuyruğu işlevlerini Firebase ile kullanmak, işleme ücretlerine yol açabilir. Daha fazla bilgi için Cloud Tasks fiyatlandırmasına göz atın.

Görev sırası işlevleri oluşturma

Görev sırası işlevlerini kullanmak için şu iş akışını izleyin:

  1. Cloud Functions için Firebase SDK'sını kullanarak bir görev sırası işlevi yazın.
  2. İşlevinizi bir HTTP isteğiyle tetikleyerek test edin.
  3. İşlevinizi Firebase CLI ile dağıtın. Görev kuyruğu işlevinizi ilk kez dağıtırken CLI, kaynak kodunuzda belirtilen seçenekleri (hız sınırlaması ve yeniden deneme) içeren bir görev kuyruğu oluşturur.
  4. Gerekirse yeni oluşturulan görev kuyruğuna görevler ekleyin ve gerekirse yürütme planı oluşturmak için parametreler iletin. Bunu, Admin SDK'ü kullanarak kodu yazıp Cloud Functions for Firebase'e dağıtarak yapabilirsiniz.

Görev sırası işlevleri yazma

Bu bölümdeki kod örnekleri, NASA'nın Günün Astronomi Resmi'ndeki tüm görüntüleri yedekleyen bir hizmeti kuran bir uygulamaya dayanmaktadır. Başlamak için gerekli modülleri içe aktarın:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Görev sırası işlevleri için onTaskDispatched veya on_task_dispatched kullanın. Görev sırası işlevi yazarken sıra başına yeniden deneme ve hız sınırlayıcı yapılandırma ayarlayabilirsiniz.

Görev sırası işlevlerini yapılandırma

Görev sırası işlevleri, hız sınırlarını hassas bir şekilde kontrol etmek ve görev sırasının yeniden deneme davranışını hassas bir şekilde denetlemek için bir dizi güçlü yapılandırma ayarıyla birlikte gelir:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Görev kuyruğundaki her görev otomatik olarak en fazla 5 kez yeniden denenir. Bu sayede ağ hataları veya bağımlı, harici bir hizmetin geçici hizmet kesintisi gibi geçici hataları en aza indirebilirsiniz.

  • retryConfig.minBackoffSeconds=60: Her görev, her deneme arasında en az 60 saniye bekledikten sonra tekrar denenir. Bu sayede, 5 yeniden deneme denemesini çok hızlı bir şekilde tüketmemek için her deneme arasında büyük bir tampon sağlanır.

  • rateLimits.maxConcurrentDispatch=6: Belirli bir zamanda en fazla 6 görev gönderilir. Bu, temel işleve sabit bir istek akışı sağlamaya yardımcı olur ve etkin örneklerin ve baştan başlatma işlemlerinin sayısını azaltır.

Görev sırası işlevlerini test etme

Çoğu durumda, görev kuyruğu işlevlerini test etmenin en iyi yolu Cloud Functions emülatörüdür. Uygulamanızı görev sırası işlevleri emülasyonu için nasıl donatacağınızı öğrenmek üzere Emulator Suite dokümanlarını inceleyin.

Ayrıca, görev kuyruğu functions_sdk, Firebase Local Emulator Suite içinde basit HTTP işlevleri olarak gösterilir. JSON veri yükü içeren bir HTTP POST isteği göndererek taklit edilmiş bir görev işlevini test edebilirsiniz:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Görev sırası işlevlerini dağıtma

Firebase KSA'yı kullanarak görev sırası işlevini dağıtın:

$ firebase deploy --only functions:backupapod

CLI, bir görev kuyruğu işlevini ilk kez dağıtırken kaynak kodunuzda belirtilen seçenekleri (hız sınırlaması ve yeniden deneme) kullanarak Cloud Tasks içinde bir görev kuyruğu oluşturur.

İşlevleri dağıtırken izin hatalarıyla karşılaşırsanız dağıtım komutlarını çalıştıran kullanıcıya uygun IAM rollerinin atandığından emin olun.

Görev sırası işlevlerini sıraya ekleme

Görev sırası işlevleri, Node.js için Firebase Admin SDK veya Python için Google Cloud kitaplıkları kullanılarak Cloud Functions for Firebase gibi güvenilir bir sunucu ortamından Cloud Tasks'e eklenebilir. Admin SDK'lerde yeniyseniz başlamak için Firebase'i bir sunucuya ekleme başlıklı makaleyi inceleyin.

Tipik bir akış, yeni bir görev oluşturur, bu görevi Cloud Tasks'te sıraya ekler ve görevin yapılandırmasını ayarlar:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Örnek kod, N. görev için N. dakikalık bir gecikme ilişkilendirerek görevlerin yürütülmesini dağıtmaya çalışır. Bu, dakika başına yaklaşık 1 görev tetiklenmesi anlamına gelir. Bir görevi belirli bir zamanda tetiklemek isterseniz scheduleTime (Node.js) veya schedule_time (Python) kullanabileceğinizi de unutmayın.Cloud Tasks

  • Örnek kod, Cloud Tasks'nin bir görevin tamamlanmasını beklediği maksimum süreyi belirler. Cloud Tasks, sıranın yeniden deneme yapılandırmasını takip ederek veya bu son tarihe kadar görevi yeniden dener. Örnekte, sıra, görevi 5 defaya kadar yeniden deneyecek şekilde yapılandırılmıştır ancak tüm işlem (yeniden deneme denemeleri dahil) 5 dakikadan uzun sürerse görev otomatik olarak iptal edilir.

Hedef URI'yi alıp dahil etme

Cloud Tasks, temel görev sırası işlevlerine yapılan istekleri doğrulamak için kimlik doğrulama jetonları oluşturma şekli nedeniyle, görevleri sıraya eklerken işlevin Cloud Run URL'sini belirtmeniz gerekir. Aşağıda gösterildiği gibi işlevinizin URL'sini programatik olarak almanızı öneririz:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Sorun giderme

Cloud Tasks">Cloud Tasks günlük kaydını etkinleştirme

Cloud Tasks kaynaklı günlükler, bir görevle ilişkili isteğin durumu gibi yararlı teşhis bilgileri içerir. Cloud Tasks kaynaklı günlükler, projenizde oluşturabileceği büyük miktardaki günlükler nedeniyle varsayılan olarak devre dışıdır. Görev sırası işlevlerinizi aktif olarak geliştirirken ve hata ayıklarken hata ayıklama günlüklerini açmanızı öneririz. Günlüğe kaydetmeyi etkinleştirme başlıklı makaleyi inceleyin.

IAM İzinleri

Görevleri sıraya eklerken veya Cloud Tasks görev sırası işlevlerinizi çağırmaya çalışırken PERMISSION DENIED hataları görebilirsiniz. Projenizde aşağıdaki IAM bağlamalarının bulunduğundan emin olun:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

App Engine varsayılan hizmet hesabını, App Engine varsayılan hizmet hesabının kullanıcısı olarak ekleme talimatları için Google Cloud IAM belgelerine bakın.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker