הוספת פונקציות לתור באמצעות Cloud Tasks


הפונקציות של תור המשימות מנצלות את היתרונות של Google Cloud Tasks כדי לעזור לאפליקציה שלכם להריץ באופן אסינכרוני משימות שצורכות זמן, צריכת משאבים או רוחב פס באופן אסינכרוני, מחוץ לתהליך הראשי של האפליקציה.

לדוגמה, נניח שאתם רוצים ליצור גיבויים של קבוצה גדולה של קובצי תמונות שמתארחים כרגע ב-API עם הגבלת קצב שליחה. כדי להיות צרכנים אחראיים של אותו API, עליכם לפעול בהתאם להגבלות הקצב של יצירת הבקשות. בנוסף, משימות ארוכות כאלה עלולות להיכשל בגלל זמן קצוב לתפוגה ומגבלות זיכרון.

כדי לצמצם את המורכבות הזו, אפשר לכתוב פונקציה של תור משימות שמגדירה אפשרויות משימות בסיסיות כמו scheduleTime ו-dispatchDeadline, ואז מעבירה את הפונקציה לתור ב-Cloud Tasks. הסביבה Cloud Tasks תוכננה במיוחד כדי להבטיח בקרה יעילה על עומסים וכללי מדיניות חוזרים לפעולות מהסוג הזה.

כדי לתמוך בפונקציות של 'תור משימות', Firebase SDK for Cloud Functions for Firebase מגרסה 3.20.1 ואילך פועל באופן הדדי עם Firebase Admin SDK בגרסה 10.2.0 ואילך.

שימוש בפונקציות של תור משימות עם Firebase עלול לגרום לחיוב על עיבוד Cloud Tasks. למידע נוסף, ראו תמחור Cloud Tasks.

יצירת פונקציות בתור המשימות

כדי להשתמש בפונקציות של תור המשימות, פועלים לפי תהליך העבודה הבא:

  1. כותבים פונקציה של תור משימות באמצעות ה-SDK של Firebase ל-Cloud Functions.
  2. כדי לבדוק את הפונקציה, מפעילים אותה באמצעות בקשת HTTP.
  3. פורסים את הפונקציה באמצעות ה-CLI Firebase. כשפורסים את הפונקציה של תור המשימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וניסיון חוזר) שצוינו בקוד המקור.
  4. מוסיפים משימות לתור המשימות החדש שנוצר, ומעבירים פרמטרים כדי להגדיר לוח זמנים לביצוע אם צריך. כדי לעשות זאת, כותבים את הקוד באמצעות Admin SDK ופורסים אותו ב-Cloud Functions for Firebase.

כתיבת פונקציות של תור משימות

דוגמאות הקוד בקטע הזה מבוססות על אפליקציה שמגדירה שירות לגיבוי כל התמונות מ-Astronomy Picture of the Day של NASA. כדי להתחיל, מייבאים את המודולים הנדרשים:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

משתמשים ב-onTaskDispatched או ב-on_task_dispatched לפונקציות של תור המשימות. כשכותבים פונקציה של תור משימות, אפשר להגדיר הגדרות של ניסיונות חוזרים לכל תור והגבלת קצב.

הגדרת פונקציות של תור משימות

פונקציות של תור משימות מגיעות עם קבוצה חזקה של הגדרות אישיות, שמאפשרות לשלוט במדויק במגבלות הקצב ובהתנהגות של ניסיונות חוזרים של תור משימות:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: כל משימה בתור המשימות תנסה שוב באופן אוטומטי עד 5 פעמים. כך אפשר לצמצם שגיאות זמניות, כמו שגיאות רשת או שיבושים זמניים בשירות של שירות חיצוני תלוי.

  • retryConfig.minBackoffSeconds=60: המערכת תנסה שוב כל משימה לפחות 60 שניות אחרי כל ניסיון. כך נוצר מאגר גדול בין כל ניסיון, כדי שלא נמהר להשתמש בכל 5 הניסיונות החוזרים מהר מדי.

  • rateLimits.maxConcurrentDispatch=6: נשלחות עד 6 משימות בכל רגע נתון. כך אפשר להבטיח זרם יציב של בקשות לפונקציה הבסיסית, ולצמצם את מספר המכונות הפעילות והפעלות במצב התחלתי (cold start).

בדיקת הפונקציות בתור המשימה

ברוב המקרים, המהדר של Cloud Functions הוא הדרך הטובה ביותר לבדוק פונקציות של תורים של משימות. במשאבי העזרה של חבילת האמולטור מוסבר איך לבצע אינסטרומנטציה של האפליקציה לאמולציה של פונקציות בתור משימות.

בנוסף, פונקציות ה-functions_sdk של תור המשימות מוצגות כפונקציות HTTP פשוטות ב-Firebase Local Emulator Suite. כדי לבדוק פונקציית משימה שעברה הדמיה, שולחים בקשת HTTP POST עם מטען נתונים בפורמט JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

פריסת פונקציות של תור משימות

פורסים את הפונקציה של תור המשימות באמצעות CLI של Firebase:

$ firebase deploy --only functions:backupapod

כשפורסים פונקציה של תור משימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וביצוע ניסיון חוזר) שצוינו בקוד המקור.

אם נתקלים בשגיאות בהרשאות בפריסה של פונקציות, צריך לוודא שתפקידי ה-IAM המתאימים מוקצים למשתמש שמריצים את פקודות הפריסה.

הוספת פונקציות לתור המשימה

אפשר להוסיף פונקציות של תור משימות ל-Cloud Tasks מסביבת שרת מהימנה כמו Cloud Functions for Firebase באמצעות Firebase Admin SDK ל-Node.js או ספריות Google Cloud ל-Python. אם זו הפעם הראשונה שאתם משתמשים ב-Admin SDK, כדאי לעיין במאמר הוספת Firebase לשרת כדי להתחיל.

תהליך טיפוסי יוצר משימה חדשה, מוסיפה אותה לתור ב-Cloud Tasks ומגדיר את ההגדרות של המשימה:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • הקוד לדוגמה מנסה לפזר את ביצוע המשימות על ידי שיוך השהיה של N דקות למשימה Nth. כלומר, הפעלה של כ-1 משימה לדקה. הערה: אפשר גם להשתמש ב-scheduleTime (Node.js) או ב-schedule_time (Python) אם רוצים ש-Cloud Tasks תפעיל משימה בשעה ספציפית.

  • בקוד לדוגמה מוגדר משך הזמן המקסימלי ש-Cloud Tasks ימתין להשלמת המשימה. Cloud Tasks ינסה לבצע שוב את המשימה אחרי הגדרת התור לניסיון חוזר או עד לתאריך היעד הזה. בדוגמה, התור מוגדר לנסות שוב את המשימה עד 5 פעמים, אבל המשימה מבוטלת באופן אוטומטי אם התהליך כולו (כולל ניסיונות חוזרים) נמשך יותר מ-5 דקות.

אחזור והכללה של URI היעד

בגלל האופן שבו Cloud Tasks יוצר אסימוני אימות כדי לאמת בקשות לפונקציות של תור המשימות הבסיסי, צריך לציין את כתובת ה-URL של הפונקציה ב-Cloud Run כשמקדישים משימות. מומלץ לאחזר את כתובת ה-URL של הפונקציה באופן פרוגרמטי, כפי שמתואר בהמשך:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

פתרון בעיות

Cloud Tasks">הפעלת הרישום ביומן של Cloud Tasks

היומנים מ-Cloud Tasks מכילים מידע שימושי לאבחון, כמו סטטוס הבקשה שמשויכת למשימה. כברירת מחדל, יומנים מ-Cloud Tasks מושבתים בגלל כמות היומנים הגדולה שיכולה ליצור בפרויקט שלך. מומלץ להפעיל את יומני ניפוי הבאגים בזמן הפיתוח והניפוי של פונקציות תור המשימות. הפעלת הרישום ביומן

הרשאות IAM

יכול להיות שתראו שגיאות מסוג PERMISSION DENIED כשאתם מוסיפים משימות לתור או כש-Cloud Tasks מנסה להפעיל את הפונקציות של תור המשימות. מוודאים שהפרויקט כולל את קישורי ה-IAM הבאים:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

הוראות להוספת חשבון השירות App Engine שמוגדר כברירת מחדל כמשתמש בחשבון השירות App Engine שמוגדר כברירת מחדל מפורטות במסמכי העזרה של Google Cloud IAM.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker