تستفيد دوال قائمة انتظار المهام من Cloud Tasks لمساعدة تطبيقك في تنفيذ المهام غير المتزامنة التي تستغرق وقتًا طويلاً أو تستهلك الكثير من الموارد أو النطاق الترددي، وذلك خارج مسار التطبيق الرئيسي.
على سبيل المثال، لنفترض أنّك تريد إنشاء نُسخ احتياطية من مجموعة كبيرة من ملفات الصور المستضافة حاليًا على واجهة برمجة تطبيقات مع حدّ أقصى لعدد الطلبات. ولكي تكون مستهلكًا مسؤولاً لواجهة برمجة التطبيقات هذه، عليك الالتزام بحدود المعدّل. بالإضافة إلى ذلك، قد يكون هذا النوع من المهام التي تستغرق وقتًا طويلاً عرضةً لحدوث أخطاء بسبب انتهاء المهلة وحدود الذاكرة.
للتخفيف من هذه التعقيدات، يمكنك كتابة دالة قائمة انتظار مهام تحدد خيارات المهام الأساسية، مثل scheduleTime
وdispatchDeadline
، ثم تسلّم الدالة إلى قائمة انتظار في Cloud Tasks. تم تصميم بيئة Cloud Tasks خصيصًا لضمان التحكّم الفعّال في الازدحام وسياسات إعادة المحاولة لهذه الأنواع من العمليات.
تتكامل حزمة تطوير البرامج (SDK) من Firebase للإصدار Cloud Functions for Firebase 3.20.1 والإصدارات الأحدث مع الإصدار Firebase Admin SDK 10.2.0 والإصدارات الأحدث لدعم وظائف قائمة انتظار المهام.
يمكن أن يؤدي استخدام دوال قائمة انتظار المهام مع Firebase إلى فرض رسوم على Cloud Tasks المعالجة. يمكنك الاطّلاع على Cloud Tasks الأسعار لمزيد من المعلومات.
إنشاء دوال قائمة انتظار المهام
لاستخدام دوالّ قائمة انتظار المهام، اتّبِع سير العمل التالي:
- اكتب دالة قائمة مهام باستخدام حزمة تطوير البرامج (SDK) الخاصة بـ Firebase من أجل Cloud Functions.
- اختبِر الدالة عن طريق تشغيلها باستخدام طلب HTTP.
- نفِّذ وظيفتك باستخدام واجهة سطر الأوامر Firebase. عند نشر دالة قائمة انتظار المهام للمرة الأولى، ستنشئ واجهة سطر الأوامر قائمة انتظار مهام في Cloud Tasks باستخدام الخيارات (الحدّ من المعدّل وإعادة المحاولة) المحدّدة في الرمز المصدر.
- أضِف مهامًا إلى قائمة المهام التي تم إنشاؤها حديثًا، مع تمرير المَعلمات لإعداد جدول زمني للتنفيذ إذا لزم الأمر. يمكنك إجراء ذلك من خلال كتابة الرمز باستخدام Admin SDK ونشره على Cloud Functions for Firebase.
كتابة دوال قائمة انتظار المهام
تستند نماذج الرموز البرمجية الواردة في هذا القسم إلى تطبيق يضبط خدمة تحتفظ بنسخة احتياطية من جميع الصور من صورة اليوم الفلكية التي تقدّمها وكالة ناسا. للبدء، استورِد الوحدات المطلوبة:
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
استخدِم onTaskDispatched
أو on_task_dispatched
لوظائف قائمة انتظار المهام. عند كتابة دالة قائمة انتظار المهام، يمكنك ضبط إعدادات إعادة المحاولة والحدّ الأقصى للمعدّل لكل قائمة انتظار.
ضبط وظائف قائمة انتظار المهام
تتضمّن وظائف قائمة انتظار المهام مجموعة فعّالة من إعدادات الضبط للتحكّم بدقة في حدود المعدّل وسلوك إعادة المحاولة لقائمة انتظار المهام:
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: تتم إعادة محاولة تنفيذ كل مهمة في قائمة انتظار المهام تلقائيًا 5 مرات كحد أقصى. يساعد ذلك في الحدّ من الأخطاء المؤقتة، مثل أخطاء الشبكة أو الانقطاع المؤقت للخدمة الخارجية التابعة.
retryConfig.minBackoffSeconds=60
: تتم إعادة محاولة تنفيذ كل مهمة بعد 60 ثانية على الأقل من كل محاولة. ويوفّر ذلك مساحة كبيرة بين كل محاولة، ما يتيح لنا عدم استنفاد محاولات إعادة الإرسال الخمس بسرعة كبيرة.
rateLimits.maxConcurrentDispatch=6
: يتم إرسال 6 مهام على الأكثر في وقت معيّن. يساعد ذلك في ضمان تدفّق ثابت للطلبات إلى الدالة الأساسية ويساعد في تقليل عدد المثيلات النشطة وعمليات التشغيل على البارد.
اختبار دوال قائمة انتظار المهام
في معظم الحالات، يكون محاكي Cloud Functions هو أفضل طريقة لاختبار وظائف قائمة انتظار المهام. راجِع مستندات Emulator Suite لمعرفة كيفية إعداد تطبيقك لمحاكاة وظائف قائمة انتظار المهام.
بالإضافة إلى ذلك، يتم عرض وظائف Task Queue في حزمة SDK الخاصة بـ Cloud Functions على أنّها وظائف HTTP بسيطة في Firebase Local Emulator Suite. يمكنك اختبار وظيفة مهمة محاكية من خلال إرسال طلب HTTP POST مع حمولة بيانات JSON:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
نشر دوال قائمة انتظار المهام
يمكنك نشر دالة قائمة انتظار المهام باستخدام واجهة سطر الأوامر Firebase:
$ firebase deploy --only functions:backupapod
عند نشر دالة قائمة انتظار المهام للمرة الأولى، تنشئ واجهة سطر الأوامر قائمة انتظار مهام في Cloud Tasks مع خيارات (الحدّ من المعدّل وإعادة المحاولة) محدّدة في الرمز المصدر.
إذا واجهت أخطاء في الأذونات عند نشر الدوال، تأكَّد من منح أدوار إدارة الهوية وإمكانية الوصول (IAM) المناسبة للمستخدم الذي ينفّذ أوامر النشر.
إضافة وظائف قائمة انتظار المهام إلى قائمة الانتظار
يمكن وضع دوال قائمة المهام في قائمة الانتظار في Cloud Tasks من بيئة خادم موثوقة، مثل Cloud Functions for Firebase، باستخدام Firebase Admin SDK لـ Node.js أو مكتبات Google Cloud الخاصة بلغة Python. إذا كنت جديدًا على Admin SDK، اطّلِع على إضافة Firebase إلى خادم للبدء.
ينشئ مسار نموذجي مهمة جديدة ويضعها في قائمة الانتظار في Cloud Tasks ويضبط إعدادات المهمة:
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
يحاول الرمز النموذجي توزيع تنفيذ المهام من خلال ربط تأخير لمدة N دقيقة بالمهمة رقم N. ويعني ذلك تشغيل مهمة واحدة تقريبًا في الدقيقة. يُرجى العِلم أنّه يمكنك أيضًا استخدام
scheduleTime
(Node.js) أوschedule_time
(Python) إذا أردت أن يؤدي Cloud Tasks إلى تشغيل مهمة في وقت محدّد.يحدّد نموذج الرمز الحد الأقصى للمدة الزمنية التي سينتظرها Cloud Tasks إلى حين اكتمال مهمة. ستعيد Cloud Tasks محاولة تنفيذ المهمة وفقًا لإعدادات إعادة المحاولة في قائمة الانتظار أو إلى أن يحين هذا الموعد النهائي. في المثال، تم ضبط قائمة الانتظار على إعادة محاولة تنفيذ المهمة 5 مرات كحد أقصى، ولكن يتم إلغاء المهمة تلقائيًا إذا استغرقت العملية بأكملها (بما في ذلك محاولات إعادة التنفيذ) أكثر من 5 دقائق.
استرداد معرّف الموارد المنتظم (URI) المستهدَف وتضمينه
نظرًا للطريقة التي تنشئ بها Cloud Tasks رموز المصادقة لمصادقة الطلبات إلى وظائف قائمة انتظار المهام الأساسية، يجب تحديد عنوان URL لوظيفة Cloud Run عند إضافة المهام إلى قائمة الانتظار. ننصحك باسترداد عنوان URL الخاص بالدالة آليًا كما هو موضّح أدناه:
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
تحديد المشاكل وحلّها
تفعيل تسجيل Cloud Tasks
تحتوي السجلّات من Cloud Tasks على معلومات تشخيصية مفيدة، مثل حالة الطلب المرتبط بمهمة. يتم إيقاف السجلات من Cloud Tasks تلقائيًا بسبب العدد الكبير من السجلات التي يمكن أن تنشئها في مشروعك. ننصحك بتفعيل سجلّات تصحيح الأخطاء أثناء تطوير وظائف قائمة المهام وتصحيح أخطائها بشكل نشط. اطّلِع على تفعيل التسجيل.
أذونات "إدارة الهوية وإمكانية الوصول"
قد تظهر أخطاء PERMISSION DENIED
عند إضافة مهام إلى قائمة الانتظار أو عندما تحاول Cloud Tasks استدعاء دوال قائمة انتظار المهام. تأكَّد من أنّ مشروعك يتضمّن عمليات الربط التالية في إدارة الهوية وإمكانية الوصول (IAM):
يجب أن يكون لدى الهوية المستخدَمة لإضافة المهام إلى قائمة الانتظار في Cloud Tasks إذن
cloudtasks.tasks.create
في خدمة "إدارة الهوية وإمكانية الوصول".في النموذج، هذا هو حساب الخدمة التلقائي App Engine
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
يجب أن يكون لدى الهوية المستخدَمة لإضافة المهام إلى قائمة الانتظار في Cloud Tasks إذن باستخدام حساب الخدمة المرتبط بمهمة في Cloud Tasks.
في النموذج، هذا هو حساب الخدمة التلقائي App Engine.
راجِع مستندات Google Cloud IAM للحصول على تعليمات حول كيفية إضافة حساب الخدمة التلقائي App Engine كمستخدم لحساب الخدمة التلقائي App Engine.
يجب أن يكون لدى الهوية المستخدَمة لتفعيل وظيفة قائمة انتظار المهام إذن
cloudfunctions.functions.invoke
.في النموذج، هذا هو حساب الخدمة التلقائي App Engine
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker