הפונקציות של תור המשימות מנצלות את Cloud Tasks של Google כדי לעזור לאפליקציה להריץ משימות שדורשות זמן רב, משימות שצורכות הרבה משאבים או משימות עם מגבלת רוחב פס באופן אסינכרוני, מחוץ לתהליך העיבוד הראשי של האפליקציה.
לדוגמה, נניח שאתם רוצים ליצור גיבויים של קבוצה גדולה של קובצי תמונות שמתארחים כרגע ב-API עם הגבלת קצב שליחה. כדי להיות צרכנים אחראים של ה-API הזה, עליכם לפעול בהתאם למגבלות הקצב שלו. בנוסף, משימות ארוכות כאלה עלולות להיכשל בגלל זמן קצוב לתפוגה ומגבלות זיכרון.
כדי לצמצם את המורכבות הזו, אפשר לכתוב פונקציה בתור משימות שמגדירה אפשרויות בסיסיות של משימות כמו scheduleTime
ו-dispatchDeadline
, ולאחר מכן להעביר את הפונקציה לתור ב-Cloud Tasks. הסביבה של Cloud Tasks תוכננה במיוחד כדי להבטיח מדיניות אפקטיבית לבקרה על עומסים ולניסיון חוזר לביצוע פעולות כאלה.
Firebase SDK עבור Cloud Functions for Firebase מגרסה 3.20.1 ואילך פועל בשילוב עם Firebase Admin SDK מגרסה 10.2.0 ואילך כדי לתמוך בפונקציות של תור המשימות.
שימוש בפונקציות של תור משימות עם Firebase עלול לגרום לחיוב על עיבוד Cloud Tasks. למידע נוסף, ראו תמחור של Cloud Tasks.
יצירת פונקציות של רשימת משימות
כדי להשתמש בפונקציות של תור המשימות, פועלים לפי תהליך העבודה הבא:
- כתיבת פונקציה של תור המשימות באמצעות ה-SDK Firebase של Cloud Functions.
- כדי לבדוק את הפונקציה, מפעילים אותה באמצעות בקשת HTTP.
- פורסים את הפונקציה באמצעות ה-CLI של Firebase. כשפורסים את הפונקציה של תור המשימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וניסיון חוזר) שצוינו בקוד המקור.
- מוסיפים משימות לתור המשימות החדש שנוצר, ומעבירים פרמטרים כדי להגדיר לוח זמנים לביצוע לפי הצורך. כדי לעשות זאת, כותבים את הקוד באמצעות Admin SDK ופורסים אותו ב-Cloud Functions for Firebase.
כתיבת פונקציות של תור משימות
דוגמאות הקוד בקטע הזה מבוססות על אפליקציה שמגדירה שירות שמגבה את כל התמונות מתמונת האסטרונומיה של היום של נאס"א. כדי להתחיל, מייבאים את המודולים הנדרשים:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
משתמשים ב-onTaskDispatched
או ב-on_task_dispatched
לפונקציות של תור המשימות. כשכותבים פונקציה בתור 'תור משימות', אפשר להגדיר ניסיונות חוזרים לכל תור ותצורה להגבלת קצב של יצירת משימות.
הגדרת פונקציות של תור משימות
פונקציות של תור משימות מגיעות עם קבוצה חזקה של הגדרות אישיות, שמאפשרות לשלוט במדויק במגבלות הקצב ובהתנהגות של ניסיונות חוזרים של תור משימות:
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: כל משימה בתור המשימות תנסה שוב באופן אוטומטי עד 5 פעמים. כך אפשר לצמצם שגיאות חולפות כמו שגיאות רשת או שיבושים זמניים בשירות של שירות חיצוני תלוי.retryConfig.minBackoffSeconds=60
: המערכת תנסה שוב כל משימה לפחות 60 שניות אחרי כל ניסיון. כך נוצר מאגר גדול בין כל ניסיון, כדי שלא נמהר להשתמש בכל 5 הניסיונות החוזרים מהר מדי.rateLimits.maxConcurrentDispatch=6
: נשלחות עד 6 משימות בכל רגע נתון. כך אפשר להבטיח זרם יציב של בקשות לפונקציה הבסיסית, ולצמצם את מספר המכונות הפעילות והפעלות במצב התחלתי (cold start).
בדיקת פונקציות של תור משימות
ברוב המקרים, המהדר של Cloud Functions הוא הדרך הטובה ביותר לבדוק פונקציות של תורים של משימות. במשאבי העזרה של חבילת האמולטור מוסבר איך לבצע אינסטרומנטציה של האפליקציה לאמולציה של פונקציות בתור משימות.
בנוסף, פונקציות ה-functions_sdk של תור המשימות מוצגות כפונקציות HTTP פשוטות ב-Firebase Local Emulator Suite. כדי לבדוק פונקציית משימה שעברה הדמיה, שולחים בקשת HTTP POST עם מטען נתונים בפורמט JSON:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
פריסת פונקציות של תור משימות
פורסים את הפונקציה של תור המשימות באמצעות CLI של Firebase:
$ firebase deploy --only functions:backupapod
כשפורסים פונקציה של תור משימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וביצוע ניסיון חוזר) שצוינו בקוד המקור.
אם נתקלתם בשגיאות הרשאות בזמן הפריסה של פונקציות, ודאו שהתפקידי ה-IAM המתאימים הוקצו למשתמש שמריץ את פקודות הפריסה.
הוספת פונקציות של תור משימות לתור
אפשר להוסיף פונקציות של תור משימות ל-Cloud Tasks מסביבת שרת מהימנה כמו Cloud Functions for Firebase באמצעות Firebase Admin SDK ל-Node.js או ספריות Google Cloud ל-Python. אם אתם משתמשים חדשים ב-Admin SDK, תוכלו להיעזר במאמר הוספת Firebase לשרת כדי להתחיל.
תהליך אופייני יוצר משימה חדשה, מוסיף אותה לתור ל-Cloud Tasks ומגדיר את ההגדרות של המשימה:
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
הקוד לדוגמה מנסה לפזר את ביצוע המשימות על ידי שיוך עיכוב של N דקות למשימה ה-N. כלומר, הפעלה של כ-1 משימה לדקה. שימו לב שאפשר גם להשתמש ב-
scheduleTime
(Node.js) או ב-schedule_time
(Python) אם רוצים ש-Cloud Tasks יפעיל משימה בזמן מסוים.בקוד לדוגמה מוגדר משך הזמן המקסימלי ש-Cloud Tasks ימתין להשלמת המשימה. Cloud Tasks ינסה לבצע שוב את המשימה אחרי הגדרת התור לניסיון חוזר או עד לתאריך היעד הזה. בדוגמה, התור מוגדר לנסות שוב את המשימה עד 5 פעמים, אבל המשימה מבוטלת באופן אוטומטי אם התהליך כולו (כולל ניסיונות חוזרים) נמשך יותר מ-5 דקות.
אחזור של ה-URI של היעד וכללתו
בגלל האופן שבו Cloud Tasks יוצר אסימוני אימות כדי לאמת בקשות לפונקציות של תור המשימות הבסיסי, צריך לציין את כתובת ה-URL של הפונקציה ב-Cloud Run כשמקדישים משימות. מומלץ לאחזר את כתובת ה-URL של הפונקציה באופן פרוגרמטי, כפי שמתואר בהמשך:
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
פתרון בעיות
Cloud Tasks">הפעלת הרישום ביומן של Cloud Tasks
היומנים מ-Cloud Tasks מכילים מידע שימושי לאבחון, כמו סטטוס הבקשה שמשויכת למשימה. כברירת מחדל, יומנים מ-Cloud Tasks מושבתים בגלל הכמות הגדולה של יומנים שהם עלולים ליצור בפרויקט. מומלץ להפעיל את יומני ניפוי הבאגים בזמן פיתוח הפונקציות של תור המשימות וניפוי באגים. איך מפעילים את הרישום ביומן
הרשאות IAM
יכול להיות שתראו שגיאות מסוג PERMISSION DENIED
כשאתם מוסיפים משימות לתור או כש-Cloud Tasks מנסה להפעיל את הפונקציות של תור המשימות. מוודאים שהפרויקט כולל את קישורי ה-IAM הבאים:
הזהות שמשמשת להעברת משימות לתור ל-Cloud Tasks צריכה להיות הרשאת
cloudtasks.tasks.create
ב-IAM.בדוגמה, זהו חשבון השירות שמוגדר כברירת מחדל App Engine
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
לזהות שמשמשת להוספת משימות לתור ב-Cloud Tasks צריכה להיות הרשאה להשתמש בחשבון השירות שמשויך למשימה ב-Cloud Tasks.
בדוגמה, זהו חשבון השירות שמוגדר כברירת מחדל App Engine.
הוראות להוספת חשבון השירות App Engine שמוגדר כברירת מחדל כמשתמש בחשבון השירות App Engine שמוגדר כברירת מחדל מפורטות במסמכי העזרה של Google Cloud IAM.
הזהות ששימשה להפעלת פונקציית תור המשימות צריכה את ההרשאה
cloudfunctions.functions.invoke
.בדוגמה, זהו חשבון השירות שמוגדר כברירת מחדל App Engine
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker