توابع صف وظایف از Google Cloud Tasks استفاده میکنند تا به برنامه شما کمک کنند تا کارهای زمانبر، منابع فشرده یا با پهنای باند محدود را بهطور ناهمزمان، خارج از جریان برنامه اصلیتان اجرا کند.
به عنوان مثال، تصور کنید که میخواهید از مجموعه بزرگی از فایلهای تصویری که در حال حاضر روی یک API با محدودیت نرخ میزبانی میشوند، نسخه پشتیبان تهیه کنید. برای اینکه یک مصرف کننده مسئول آن API باشید، باید به محدودیت های نرخ آن ها احترام بگذارید. بعلاوه، این نوع کار طولانی مدت می تواند به دلیل وقفه های زمانی و محدودیت حافظه در معرض شکست قرار گیرد.
برای کاهش این پیچیدگی، میتوانید یک تابع صف کار بنویسید که گزینههای اصلی کار مانند scheduleTime
و dispatchDeadline
را تنظیم میکند و سپس تابع را به یک صف در Cloud Tasks تحویل میدهد. محیط Cloud Tasks به طور خاص برای اطمینان از کنترل تراکم مؤثر و سیاستهای امتحان مجدد برای این نوع عملیات طراحی شده است.
Firebase SDK for Cloud Functions for Firebase نسخه 3.20.1 و بالاتر با Firebase Admin SDK v10.2.0 و بالاتر برای پشتیبانی از توابع صف وظایف، تعامل دارد.
استفاده از توابع صف وظایف با Firebase میتواند منجر به هزینههایی برای پردازش Cloud Tasks شود. برای اطلاعات بیشتر به قیمت گذاری Cloud Tasks مراجعه کنید.
ایجاد توابع صف وظایف
برای استفاده از توابع صف کار، این گردش کار را دنبال کنید:
- با استفاده از Firebase SDK for Cloud Functions یک تابع صف کار بنویسید.
- عملکرد خود را با فعال کردن آن با یک درخواست HTTP آزمایش کنید.
- تابع خود را با Firebase CLI مستقر کنید. هنگامی که برای اولین بار عملکرد صف وظیفه خود را به کار می گیرید، CLI یک صف کار در Cloud Tasks با گزینه هایی (محدود کردن نرخ و تلاش مجدد) که در کد منبع شما مشخص شده است ایجاد می کند.
- وظایف را به صف وظایف جدید ایجاد شده اضافه کنید و در صورت نیاز، پارامترها را برای تنظیم یک برنامه زمانبندی اجرا ارسال کنید. میتوانید با نوشتن کد با استفاده از Admin SDK و استقرار آن در Cloud Functions for Firebase به این هدف برسید.
توابع صف وظایف را بنویسید
نمونههای کد در این بخش بر اساس برنامهای است که سرویسی را راهاندازی میکند که از همه تصاویر عکس روز نجوم ناسا نسخه پشتیبان تهیه میکند. برای شروع، ماژول های مورد نیاز را وارد کنید:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
پایتون
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
برای توابع صف وظایف onTaskDispatched
یا on_task_dispatched
استفاده کنید. هنگام نوشتن یک تابع صف کار، می توانید پیکربندی مجدد در هر صف و محدودیت سرعت را تنظیم کنید.
پیکربندی توابع صف وظایف
توابع صف کار با مجموعه ای قدرتمند از تنظیمات پیکربندی برای کنترل دقیق محدودیت های نرخ و رفتار مجدد یک صف کار ارائه می شوند:
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
پایتون
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: هر کار در صف وظایف به طور خودکار تا 5 بار تکرار می شود. این به کاهش خطاهای گذرا مانند خطاهای شبکه یا اختلال موقت سرویس یک سرویس خارجی وابسته کمک می کند.retryConfig.minBackoffSeconds=60
: هر کار حداقل با فاصله 60 ثانیه از هر تلاش دوباره امتحان می شود. این یک بافر بزرگ بین هر تلاش ایجاد می کند، بنابراین ما عجله نداریم که 5 تلاش مجدد را خیلی سریع تمام کنیم.rateLimits.maxConcurrentDispatch=6
: حداکثر 6 کار در یک زمان معین ارسال می شود. این به اطمینان از جریان ثابت درخواستها به تابع اصلی کمک میکند و به کاهش تعداد نمونههای فعال و شروع سرد کمک میکند.
تست توابع صف وظایف
در بیشتر موارد، شبیه ساز Cloud Functions بهترین راه برای آزمایش توابع صف وظایف است. به مستندات Emulator Suite مراجعه کنید تا بیاموزید چگونه برنامه خود را برای شبیهسازی توابع صف کار ابزارسازی کنید .
علاوه بر این، task queue functions_sdk به عنوان توابع ساده HTTP در Firebase Local Emulator Suite نمایش داده می شود. می توانید یک تابع وظیفه شبیه سازی شده را با ارسال یک درخواست HTTP POST با یک بار داده JSON آزمایش کنید:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
استقرار توابع صف وظایف
با استفاده از Firebase CLI تابع صف وظایف را اجرا کنید:
$ firebase deploy --only functions:backupapod
هنگام استقرار یک تابع صف کار برای اولین بار، CLI یک صف کار در Cloud Tasks با گزینه هایی (محدود کردن نرخ و امتحان مجدد) که در کد منبع شما مشخص شده است ایجاد می کند.
اگر هنگام استقرار توابع با خطاهای مجوز مواجه شدید، مطمئن شوید که نقش های IAM مناسب به کاربری که دستورات استقرار را اجرا می کند اختصاص داده شده است.
توابع صف وظایف را در صف قرار دهید
توابع صف وظایف را می توان در Cloud Tasks از یک محیط سرور قابل اعتماد مانند Cloud Functions for Firebase با استفاده از Firebase Admin SDK برای Node.js یا کتابخانه های Google Cloud برای Python قرار داد. اگر با Admin SDK جدید هستید، برای شروع به افزودن Firebase به سرور مراجعه کنید.
یک جریان معمولی یک کار جدید ایجاد می کند، آن را در Cloud Tasks قرار می دهد و پیکربندی کار را تنظیم می کند:
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
پایتون
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
کد نمونه سعی میکند تا اجرای وظایف را با مرتبط کردن یک تاخیر دقیقه نهم برای کار نهم گسترش دهد. به این معنی است که ~ 1 کار در دقیقه راه اندازی می شود. توجه داشته باشید که اگر میخواهید Cloud Tasks یک کار را در زمان خاصی فعال کند، میتوانید از
scheduleTime
(Node.js) یاschedule_time
(Python) نیز استفاده کنید.کد نمونه حداکثر زمانی را تعیین میکند که Cloud Tasks برای تکمیل یک کار منتظر بماند. Cloud Tasks پس از پیکربندی مجدد صف یا تا زمانی که به این مهلت رسیده باشد، کار را دوباره امتحان می کند. در نمونه، صف به گونه ای پیکربندی شده است که کار را تا 5 بار دوباره امتحان کنید، اما اگر کل فرآیند (از جمله تلاش های مجدد) بیش از 5 دقیقه طول بکشد، کار به طور خودکار لغو می شود.
URI هدف را بازیابی و اضافه کنید
با توجه به روشی که Cloud Tasks توکنهای احراز هویت را برای احراز هویت درخواستها به توابع صف وظایف زیربنایی ایجاد میکند، باید هنگام قرار دادن وظایف، URL اجرای Cloud تابع را مشخص کنید. توصیه می کنیم همانطور که در زیر نشان داده شده است URL تابع خود را به صورت برنامه نویسی بازیابی کنید:
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
پایتون
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
عیب یابی
Cloud Tasks"> ثبت Cloud Tasks روشن کنید
گزارشهای مربوط به Cloud Tasks حاوی اطلاعات تشخیصی مفیدی مانند وضعیت درخواست مرتبط با یک کار هستند. بهطور پیشفرض، گزارشهای Cloud Tasks به دلیل حجم زیاد گزارشهایی که میتواند در پروژه شما ایجاد کند، غیرفعال میشوند. توصیه می کنیم در حالی که به طور فعال در حال توسعه و اشکال زدایی عملکردهای صف وظایف خود هستید، گزارش های اشکال زدایی را روشن کنید. به روشن کردن ورود به سیستم مراجعه کنید.
مجوزهای IAM
هنگام قرار دادن وظایف یا زمانی که Cloud Tasks سعی می کند توابع صف وظایف شما را فراخوانی کند، ممکن است خطاهای PERMISSION DENIED
را مشاهده کنید. اطمینان حاصل کنید که پروژه شما دارای الزامات IAM زیر است:
هویت مورد استفاده برای قرار دادن وظایف در Cloud Tasks به مجوز
cloudtasks.tasks.create
IAM نیاز دارد.در نمونه، این حساب سرویس پیشفرض App Engine است
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
هویتی که برای ردیف کردن وظایف در Cloud Tasks استفاده میشود برای استفاده از حساب سرویس مرتبط با یک کار در Cloud Tasks به مجوز نیاز دارد.
در نمونه، این حساب سرویس پیشفرض App Engine است.
برای دستورالعملهایی درباره نحوه افزودن حساب سرویس پیشفرض App Engine بهعنوان کاربر حساب خدمات پیشفرض App Engine ، به اسناد Google Cloud IAM مراجعه کنید.
هویت مورد استفاده برای راه اندازی تابع صف وظایف به مجوز
cloudfunctions.functions.invoke
نیاز دارد.در نمونه، این حساب سرویس پیشفرض App Engine است
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker