Cloud Tasks ile işlevleri sıraya ekleme


Görev sırası işlevleri, uygulamanızın ana uygulama akışınızın dışında zaman alan, kaynak yoğun veya bant genişliği sınırlı görevleri eşzamansız olarak çalıştırmasına yardımcı olmak için Google Cloud Tasks'tan yararlanır.

Örneğin, şu anda hız sınırlaması olan bir API'de barındırılan büyük bir resim dosyası grubunun yedeklerini oluşturmak istediğinizi varsayalım. Bu API'nin sorumlu bir tüketicisi olmak için hız sınırlarına uymanız gerekir. Ayrıca bu tür uzun süreli işlerde, zaman aşımları ve bellek sınırları nedeniyle hata oluşabilir.

Bu karmaşıklığı azaltmak için scheduleTime ve dispatchDeadline gibi temel görev seçeneklerini ayarlayan bir görev sırası işlevi yazabilir ve ardından işlevi Cloud Tasks'ta bir sıraya iletebilirsiniz. Cloud Tasks ortamı, bu tür işlemler için etkin tıkanıklık kontrolü ve yeniden deneme politikaları sağlamak üzere özel olarak tasarlanmıştır.

Cloud Functions for Firebase v3.20.1 ve sonraki sürümleri için Firebase SDK'sı, görev sırası işlevlerini desteklemek için Firebase Admin SDK v10.2.0 ve sonraki sürümleriyle birlikte çalışır.

Firebase ile görev sırası işlevlerinin kullanılması, Cloud Tasks için ücret alınmasına neden olabilir. Daha fazla bilgi için Cloud Tasks fiyatlandırmasına göz atın.

Görev sırası işlevleri oluşturma

Görev sırası işlevlerini kullanmak için şu iş akışını izleyin:

  1. Cloud Functions için Firebase SDK'sını kullanarak görev sırası işlevi yazın.
  2. İşlevinizi bir HTTP isteğiyle tetikleyerek test edin.
  3. Firebase CLI ile işlevinizi dağıtın. Görev sırası işlevinizi ilk kez dağıtırken CLI, Cloud Tasks'te kaynak kodunuzda belirtilen seçenekler (hız sınırlaması ve yeniden deneme) ile bir görev sırası oluşturur.
  4. Gerekirse bir yürütme planı oluşturmak için parametreleri göndererek yeni oluşturulan görev sırasına görevleri ekleyin. Bunun için Admin SDK'yı kullanarak kodu yazıp Cloud Functions for Firebase'e dağıtabilirsiniz.

Görev sırası işlevlerini yazma

Bu bölümdeki kod örnekleri, NASA'nın Astronomi Günün Günü resmindeki tüm resimleri yedekleyen bir hizmeti kuran bir uygulamaya dayanır. Başlamak için gerekli modülleri içe aktarın:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Görev sırası işlevleri için onTaskDispatched veya on_task_dispatched kullanın. Bir görev sırası işlevi yazarken sıra başına yeniden deneme ve hız sınırlama yapılandırması ayarlayabilirsiniz.

Görev sırası işlevlerini yapılandırma

Görev sırası işlevleri, hız sınırlarını hassas bir şekilde kontrol etmek ve bir görev sırasının yeniden deneme davranışını korumak için güçlü bir dizi yapılandırma ayarıyla birlikte sunulur:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Görev sırasındaki her görev otomatik olarak en fazla 5 kez yeniden denenir. Bu, ağ hataları veya bağımlı, harici bir hizmetin geçici hizmet kesintisi gibi geçici hataları azaltmaya yardımcı olur.

  • retryConfig.minBackoffSeconds=60: Her görev, her denemeden sonra en az 60 saniye sonra yeniden denenir. Böylece her deneme arasında geniş bir arabellek sağlanır. Böylece 5 yeniden deneme girişimini çok hızlı tüketmek için acele etmiyoruz.

  • rateLimits.maxConcurrentDispatch=6: Belirli bir anda en fazla 6 görev dağıtılır. Bu, temel işleve düzenli bir istek akışı sağlanmasına ve etkin örnekler ile baştan başlatma sayısının azaltılmasına yardımcı olur.

Görev sırası işlevlerini test etme

Firebase Local Emulator Suite'teki görev sırası işlevleri, basit HTTP işlevleri olarak gösterilir. JSON veri yükü içeren bir HTTP POST isteği göndererek emüle edilmiş bir görev işlevini test edebilirsiniz:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Görev sırası işlevlerini dağıtma

Firebase CLI kullanarak görev sırası işlevini dağıtın:

$ firebase deploy --only functions:backupapod

Bir görev sırası işlevini ilk kez dağıtırken CLI, Cloud Tasks'ta kaynak kodunuzda belirtilen seçenekler (hız sınırlama ve yeniden deneme) ile bir görev sırası oluşturur.

İşlevleri dağıtırken izin hatalarıyla karşılaşırsanız dağıtım komutlarını çalıştıran kullanıcıya uygun IAM rollerinin atandığından emin olun.

Görev sırası işlevlerini sıraya alma

Görev sırası işlevleri, Node.js için Firebase Admin SDK veya Python için Google Cloud kitaplıkları kullanılarak Cloud Functions for Firebase gibi güvenilir bir sunucu ortamından Cloud Tasks'te sıraya alınabilir. Yönetici SDK'larını kullanmaya yeni başladıysanız başlamak için Firebase'i bir sunucuya ekleme başlıklı makaleyi inceleyin.

Tipik bir akış yeni bir görev oluşturur, bu görevi Cloud Tasks'ta sıraya koyar ve görevin yapılandırmasını ayarlar:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Örnek kod, N. görev için N. dakikalık gecikmeyi ilişkilendirerek görevlerin yürütülmesini yaymaya çalışır. Bu, dakikada yaklaşık 1 görev tetikleme anlamına gelir. Cloud Tasks'ın bir görevi belirli bir zamanda tetiklemesini istiyorsanız scheduleTime (Node.js) veya schedule_time (Python) komutlarını da kullanabilirsiniz.

  • Örnek kod, Cloud Tasks'ın bir görevi tamamlamak için bekleyeceği maksimum süreyi belirler. Cloud Tasks, sıranın yeniden deneme yapılandırmasından sonra veya bu son tarihe ulaşılana kadar görevi yeniden dener. Bu örnekte sıra, görevi en fazla 5 kez yeniden deneyecek şekilde yapılandırılmıştır ancak tüm işlem (yeniden deneme denemeleri dahil) 5 dakikadan uzun sürerse görev otomatik olarak iptal edilir.

Hedef URI'yı alın ve dahil edin

Cloud Tasks'ın temel görev sırası işlevlerine yönelik isteklerin kimliğini doğrulamak için kimlik doğrulama jetonları oluşturma şeklinden dolayı, görevleri sıraya alırken işlevin Cloud Run URL'sini belirtmeniz gerekir. İşlevinizin URL'sini aşağıda gösterildiği gibi programlı bir şekilde almanızı öneririz:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Sorun giderme

Cloud Tasks günlük kaydını etkinleştirin

Cloud Tasks'tan alınan günlükler, bir görevle ilişkili isteğin durumu gibi yararlı teşhis bilgileri içerir. Cloud Tasks'ın projenizde oluşturabileceği günlük hacminin yüksek olması nedeniyle, Cloud Tasks'tan alınan günlükler varsayılan olarak devre dışıdır. Görev sırası işlevlerinizi aktif bir şekilde geliştirirken ve hata ayıklarken hata ayıklama günlüklerini açmanızı öneririz. Günlük kaydını açma bölümüne bakın.

IAM İzinleri

Görevleri sıraya alırken veya Cloud Tasks görev sırası işlevlerinizi çağırmaya çalışırken PERMISSION DENIED hataları görebilirsiniz. Projenizin aşağıdaki IAM bağlamalarına sahip olduğundan emin olun:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Cloud Tasks'te görevleri sıraya koymak için kullanılan kimliğin, Cloud Tasks içindeki bir görevle ilişkili hizmet hesabını kullanmak için izne ihtiyacı vardır.

    Bu örnekte, bu App Engine varsayılan hizmet hesabı'dır.

App Engine varsayılan hizmet hesabını App Engine varsayılan hizmet hesabı kullanıcısı olarak ekleme talimatları için Google Cloud IAM belgelerini inceleyin.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker