הוספת פונקציות לתור באמצעות Cloud Tasks

הפונקציות של תור המשימות מתבססות על Google Cloud Tasks כדי לעזור לאפליקציה להריץ משימות אסינכרוניות מחוץ לזרימת האפליקציה הראשית. משימות כאלה צורכות הרבה זמן ומשאבים, או שהן מוגבלות ברוחב הפס.

לדוגמה, נניח שרוצים ליצור גיבויים של קבוצה גדולה של קובצי תמונות שמתארחים כרגע ב-API עם הגבלת קצב. כדי להשתמש ב-API בצורה אחראית, צריך לכבד את הגבלות הקצב שלו. בנוסף, עבודות כאלה שפועלות לאורך זמן עלולות להיכשל בגלל פסק זמן ומגבלות זיכרון.

כדי לצמצם את המורכבות הזו, אפשר לכתוב פונקציית תור משימות שמגדירה אפשרויות בסיסיות למשימות כמו scheduleTime ו-dispatchDeadline, ואז מעבירה את הפונקציה לתור ב-Cloud Tasks. סביבת Cloud Tasks מיועדת במיוחד כדי להבטיח בקרה יעילה על עומס וכללי מדיניות לניסיון חוזר עבור סוגי הפעולות האלה.

‫Firebase SDK ל-Cloud Functions for Firebase מגרסה 3.20.1 ואילך פועל בשילוב עם Firebase Admin SDK מגרסה 10.2.0 ואילך כדי לתמוך בפונקציות של תור משימות.

שימוש בפונקציות של תור משימות עם Firebase עלול להוביל לחיובים על עיבוד Cloud Tasks. מידע נוסף זמין במאמר בנושא Cloud Tasks תמחור.

יצירת פונקציות של תור משימות

כדי להשתמש בפונקציות של תור משימות, פועלים לפי תהליך העבודה הבא:

  1. כתיבת פונקציה של תור משימות באמצעות Firebase SDK ל-Cloud Functions.
  2. בודקים את הפונקציה על ידי הפעלת הטריגר שלה באמצעות בקשת HTTP.
  3. פורסים את הפונקציה באמצעות Firebase CLI. כשפורסים את הפונקציה של תור המשימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם האפשרויות (הגבלת קצב וניסיון חוזר) שצוינו בקוד המקור.
  4. מוסיפים משימות לתור המשימות החדש, ומעבירים פרמטרים כדי להגדיר לוח זמנים להפעלה אם צריך. כדי לעשות את זה, כותבים את הקוד באמצעות Admin SDK ומפרסים אותו ב-Cloud Functions for Firebase.

כתיבת פונקציות של תור משימות

דוגמאות הקוד בקטע הזה מבוססות על אפליקציה שמגדירה שירות לגיבוי כל התמונות מ-Astronomy Picture of the Day של נאס"א. כדי להתחיל, מייבאים את המודולים הנדרשים:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");

// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

אפשר להשתמש בפונקציות של תור המשימות onTaskDispatched או on_task_dispatched. כשכותבים פונקציה של תור משימות, אפשר להגדיר ניסיון חוזר לכל תור והגבלת קצב של יצירת בקשות.

הגדרת פונקציות של תור משימות

פונקציות של תור משימות מגיעות עם קבוצה עוצמתית של הגדרות לשליטה מדויקת במגבלות הקצב ובהתנהגות הניסיון החוזר של תור משימות:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(
    retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
    rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: כל משימה בתור המשימות מנסה לבצע מחדש באופן אוטומטי עד 5 פעמים. זה עוזר לצמצם שגיאות זמניות כמו שגיאות ברשת או שיבוש זמני בשירות של שירות חיצוני שתלוי בו.

  • retryConfig.minBackoffSeconds=60: כל משימה מנסים להפעיל מחדש לפחות 60 שניות אחרי כל ניסיון. כך נוצר מרווח גדול בין כל ניסיון, כדי שלא נמהר למצות את 5 הניסיונות החוזרים מהר מדי.

  • rateLimits.maxConcurrentDispatch=6: לכל היותר 6 משימות נשלחות בכל זמן נתון. כך אפשר להבטיח זרם קבוע של בקשות לפונקציה הבסיסית, ולצמצם את מספר המקרים הפעילים וההפעלות במצב התחלתי (cold start).

בדיקת פונקציות של תור משימות

ברוב המקרים, Cloud Functionsהאמולטור הוא הדרך הכי טובה לבדוק פונקציות של תור משימות. במאמרי העזרה של כלים לאמולטור מוסבר איך לבצע אינסטרומנטציה באפליקציה לאמולציה של פונקציות של תור משימות.

בנוסף, פונקציות של תור משימות נחשפות כפונקציות HTTP פשוטות ב-Firebase Local Emulator Suite. אפשר לבדוק פונקציית משימה שנוצרה באמצעות אמולטור על ידי שליחת בקשת HTTP POST עם מטען נתונים של JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

פריסת פונקציות של תור משימות

פורסים את הפונקציה של תור המשימות באמצעות Firebase CLI:

$ firebase deploy --only functions:backupapod

כשמפעילים פונקציה של תור משימות בפעם הראשונה, ממשק ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וניסיון חוזר) שצוינו בקוד המקור.

אם נתקלתם בשגיאות הרשאה כשפרסתם פונקציות, ודאו שתפקידי ה-IAM המתאימים הוקצו למשתמש שמריץ את פקודות הפריסה.

הוספה לתור של פונקציות של תור משימות

אפשר להוסיף פונקציות של תור משימות לתור ב-Cloud Tasks מסביבת שרת מהימנה כמו Cloud Functions for Firebase באמצעות Firebase Admin SDK ל-Node.js או ספריות Google Cloud ל-Python. אם אתם חדשים ב-Admin SDK, כדאי לקרוא את המאמר הוספת Firebase לשרת כדי להתחיל.

בתהליך רגיל נוצרת משימה חדשה, היא מתווספת לתור ב-Cloud Tasks ומוגדרת:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(
            schedule_time=schedule_time,
            dispatch_deadline_seconds=dispatch_deadline_seconds,
            uri=target_uri,
        )
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • הקוד לדוגמה מנסה לפזר את הביצוע של המשימות על ידי שיוך של עיכוב של N דקות למשימה ה-N. המשמעות היא הפעלה של משימה אחת בדקה. שימו לב שאפשר גם להשתמש ב-scheduleTime (Node.js) או ב-schedule_time (Python) אם רוצים ש-Cloud Tasks יפעיל משימה בשעה ספציפית.

  • קוד הדוגמה מגדיר את משך הזמן המקסימלי ש-Cloud Tasks ימתין עד להשלמת משימה. ‫Cloud Tasks ינסה שוב לבצע את המשימה בהתאם להגדרת הניסיון החוזר של התור או עד שיגיע הדדליין הזה. בדוגמה, התור מוגדר לנסות לבצע את המשימה עד 5 פעמים, אבל המשימה מבוטלת אוטומטית אם התהליך כולו (כולל ניסיונות חוזרים) נמשך יותר מ-5 דקות.

פתרון בעיות

הפעלת רישום ביומן של Cloud Tasks

יומנים מ-Cloud Tasks מכילים מידע אבחוני שימושי כמו הסטטוס של הבקשה שמשויכת למשימה. כברירת מחדל, יומנים מ-Cloud Tasks מושבתים בגלל נפח היומנים הגדול שהם יכולים ליצור בפרויקט. מומלץ להפעיל את יומני הניפוי באגים בזמן שאתם מפתחים ומנפים באגים בפונקציות של תור המשימות. איך מפעילים את הרישום ביומן

הרשאות IAM

יכול להיות שתראו שגיאות PERMISSION DENIED כשמוסיפים משימות לתור או כש-Cloud Tasks מנסה להפעיל את הפונקציות של תור המשימות. חשוב לוודא שלפרויקט יש את ההרשאות הבאות של IAM:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

הוראות להוספת חשבון השירות שמוגדר כברירת מחדל [App Engine] כמשתמש בחשבון השירות שמוגדר כברירת מחדל [App Engine] זמינות במאמרי העזרה של Google Cloud IAM.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker