توابع را با وظایف Cloud در صف قرار دهید


توابع صف وظایف از Google Cloud Tasks استفاده می‌کنند تا به برنامه شما کمک کنند تا کارهای زمان‌بر، منابع فشرده یا با پهنای باند محدود را به‌طور ناهمزمان، خارج از جریان برنامه اصلی‌تان اجرا کند.

به عنوان مثال، تصور کنید که می‌خواهید از مجموعه بزرگی از فایل‌های تصویری که در حال حاضر روی یک API با محدودیت نرخ میزبانی می‌شوند، نسخه پشتیبان تهیه کنید. برای اینکه یک مصرف کننده مسئول آن API باشید، باید به محدودیت های نرخ آن ها احترام بگذارید. بعلاوه، این نوع کار طولانی مدت می تواند به دلیل وقفه های زمانی و محدودیت حافظه در معرض شکست قرار گیرد.

برای کاهش این پیچیدگی، می‌توانید یک تابع صف کار بنویسید که گزینه‌های اصلی کار مانند scheduleTime و dispatchDeadline را تنظیم می‌کند و سپس تابع را به یک صف در Cloud Tasks تحویل می‌دهد. محیط Cloud Tasks به طور خاص برای اطمینان از کنترل تراکم مؤثر و سیاست‌های امتحان مجدد برای این نوع عملیات طراحی شده است.

Firebase SDK for Cloud Functions برای Firebase نسخه 3.20.1 و بالاتر با Firebase Admin SDK v10.2.0 و بالاتر برای پشتیبانی از توابع صف وظایف، تعامل دارد.

استفاده از توابع صف وظایف با Firebase می‌تواند منجر به هزینه‌هایی برای پردازش Cloud Tasks شود. برای اطلاعات بیشتر به قیمت گذاری Cloud Tasks مراجعه کنید.

ایجاد توابع صف وظایف

برای استفاده از توابع صف کار، این گردش کار را دنبال کنید:

  1. با استفاده از Firebase SDK for Cloud Function، یک تابع صف کار بنویسید.
  2. عملکرد خود را با فعال کردن آن با یک درخواست HTTP آزمایش کنید.
  3. تابع خود را با Firebase CLI مستقر کنید. هنگامی که برای اولین بار عملکرد صف وظیفه خود را به کار می گیرید، CLI یک صف کار در Cloud Tasks با گزینه هایی (محدود کردن نرخ و تلاش مجدد) که در کد منبع شما مشخص شده است ایجاد می کند.
  4. وظایف را به صف وظایف جدید ایجاد شده اضافه کنید و در صورت نیاز، پارامترها را برای تنظیم یک برنامه زمانبندی اجرا ارسال کنید. می‌توانید با نوشتن کد با استفاده از Admin SDK و استقرار آن در Cloud Functions برای Firebase به این هدف برسید.

توابع صف وظایف را بنویسید

نمونه‌های کد در این بخش بر اساس برنامه‌ای است که سرویسی را راه‌اندازی می‌کند که از همه تصاویر عکس روز نجوم ناسا نسخه پشتیبان تهیه می‌کند. برای شروع، ماژول های مورد نیاز را وارد کنید:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

پایتون

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

برای توابع صف وظایف از onTaskDispatched یا on_task_dispatched استفاده کنید. هنگام نوشتن یک تابع صف کار، می توانید پیکربندی مجدد در هر صف و محدودیت سرعت را تنظیم کنید.

پیکربندی توابع صف وظایف

توابع صف کار با مجموعه ای قدرتمند از تنظیمات پیکربندی برای کنترل دقیق محدودیت های نرخ و رفتار مجدد یک صف کار ارائه می شوند:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

پایتون

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : هر کار در صف وظایف به طور خودکار تا 5 بار تکرار می شود. این به کاهش خطاهای گذرا مانند خطاهای شبکه یا اختلال موقت سرویس یک سرویس خارجی وابسته کمک می کند.

  • retryConfig.minBackoffSeconds=60 : هر کار حداقل با فاصله 60 ثانیه از هر تلاش دوباره امتحان می شود. این یک بافر بزرگ بین هر تلاش ایجاد می کند، بنابراین ما عجله نداریم که 5 تلاش مجدد را خیلی سریع تمام کنیم.

  • rateLimits.maxConcurrentDispatch=6 : حداکثر 6 کار در یک زمان معین ارسال می شود. این به اطمینان از جریان ثابت درخواست‌ها به تابع اصلی کمک می‌کند و به کاهش تعداد نمونه‌های فعال و شروع سرد کمک می‌کند.

تست توابع صف وظایف

توابع صف وظایف در مجموعه شبیه ساز محلی Firebase به عنوان توابع ساده HTTP در معرض دید قرار می گیرند. می توانید یک تابع وظیفه شبیه سازی شده را با ارسال یک درخواست HTTP POST با یک بار داده JSON آزمایش کنید:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

استقرار توابع صف وظایف

با استفاده از Firebase CLI تابع صف وظایف را اجرا کنید:

$ firebase deploy --only functions:backupapod

هنگام استقرار یک تابع صف کار برای اولین بار، CLI یک صف کار در Cloud Tasks با گزینه هایی (محدود کردن نرخ و امتحان مجدد) که در کد منبع شما مشخص شده است ایجاد می کند.

اگر هنگام استقرار توابع با خطاهای مجوز مواجه شدید، مطمئن شوید که نقش های IAM مناسب به کاربری که دستورات استقرار را اجرا می کند اختصاص داده شده است.

توابع صف وظایف را در صف قرار دهید

توابع صف وظایف را می توان در Cloud Tasks از یک محیط سرور قابل اعتماد مانند Cloud Functions برای Firebase با استفاده از Firebase Admin SDK برای Node.js یا کتابخانه های Google Cloud برای Python قرار داد. اگر با Admin SDK ها تازه کار هستید، برای شروع به افزودن Firebase به سرور مراجعه کنید.

یک جریان معمولی یک کار جدید ایجاد می‌کند، آن را در Cloud Tasks قرار می‌دهد و پیکربندی کار را تنظیم می‌کند:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

پایتون

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • کد نمونه سعی می‌کند تا اجرای وظایف را با مرتبط کردن یک تاخیر دقیقه نهم برای کار نهم گسترش دهد. به این معنی است که ~ 1 کار در دقیقه راه اندازی می شود. توجه داشته باشید که اگر می‌خواهید Cloud Tasks یک کار را در زمان خاصی فعال کند، می‌توانید از scheduleTime (Node.js) یا schedule_time (Python) نیز استفاده کنید.

  • کد نمونه حداکثر زمانی را تعیین می‌کند که Cloud Tasks برای تکمیل یک کار منتظر بماند. Cloud Tasks پس از پیکربندی مجدد صف یا تا زمانی که به این مهلت رسیده باشد، کار را دوباره امتحان می کند. در نمونه، صف به گونه ای پیکربندی شده است که کار را تا 5 بار دوباره امتحان کنید، اما اگر کل فرآیند (از جمله تلاش های مجدد) بیش از 5 دقیقه طول بکشد، کار به طور خودکار لغو می شود.

URI هدف را بازیابی و اضافه کنید

با توجه به روشی که Cloud Tasks توکن‌های احراز هویت را برای احراز هویت درخواست‌ها به توابع صف وظایف زیربنایی ایجاد می‌کند، باید هنگام قرار دادن وظایف، URL اجرای Cloud تابع را مشخص کنید. توصیه می کنیم همانطور که در زیر نشان داده شده است URL تابع خود را به صورت برنامه نویسی بازیابی کنید:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

پایتون

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

عیب یابی

ثبت وظایف Cloud را روشن کنید

گزارش‌های مربوط به Cloud Tasks حاوی اطلاعات تشخیصی مفیدی مانند وضعیت درخواست مرتبط با یک کار هستند. به‌طور پیش‌فرض، گزارش‌های Cloud Tasks به دلیل حجم زیاد گزارش‌هایی که می‌تواند در پروژه شما ایجاد کند، غیرفعال می‌شوند. توصیه می کنیم در حالی که به طور فعال در حال توسعه و اشکال زدایی عملکردهای صف وظایف خود هستید، گزارش های اشکال زدایی را روشن کنید. به روشن کردن ورود به سیستم مراجعه کنید.

مجوزهای IAM

هنگام قرار دادن وظایف یا زمانی که Cloud Tasks سعی می کند توابع صف وظایف شما را فراخوانی کند، ممکن است خطاهای PERMISSION DENIED را مشاهده کنید. اطمینان حاصل کنید که پروژه شما دارای الزامات IAM زیر است:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • هویتی که برای ردیف کردن وظایف در Cloud Tasks استفاده می‌شود برای استفاده از حساب سرویس مرتبط با یک کار در Cloud Tasks به مجوز نیاز دارد.

    در نمونه، این حساب سرویس پیش‌فرض App Engine است.

برای دستورالعمل‌هایی درباره نحوه افزودن حساب سرویس پیش‌فرض App Engine به‌عنوان کاربر حساب خدمات پیش‌فرض App Engine، به اسناد Google Cloud IAM مراجعه کنید.

  • هویت مورد استفاده برای راه اندازی تابع صف وظایف به مجوز cloudfunctions.functions.invoke نیاز دارد.

    در نمونه، این حساب سرویس پیش‌فرض App Engine است

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker