תור פונקציות עם Cloud Tasks


פונקציות תור המשימות מנצלות את היתרונות של Google Cloud Tasks כדי לעזור לאפליקציה שלך להריץ משימות גוזלות זמן, עתירות משאבים או מוגבלות ברוחב פס באופן אסינכרוני, מחוץ לזרימת האפליקציה הראשית שלך.

לדוגמה, דמיינו שאתם רוצים ליצור גיבויים של קבוצה גדולה של קבצי תמונה שמתארחים כעת ב-API עם הגבלת קצב. כדי להיות צרכן אחראי של ה-API הזה, עליך לכבד את מגבלות התעריפים שלו. בנוסף, סוג זה של עבודה ארוכת טווח עלול להיות חשוף לכישלון עקב פסק זמן ומגבלות זיכרון.

כדי להפחית את המורכבות הזו, אתה יכול לכתוב פונקציית תור משימות שמגדירה אפשרויות משימה בסיסיות כמו scheduleTime ו- dispatchDeadline , ולאחר מכן להעביר את הפונקציה לתור ב-Cloud Tasks. סביבת Cloud Tasks תוכננה במיוחד כדי להבטיח בקרת גודש יעילה ומדיניות ניסיון חוזר עבור פעולות מסוג זה.

Firebase SDK for Cloud Functions for Firebase v3.20.1 ואילך פועל בשילוב עם Firebase Admin SDK v10.2.0 ואילך כדי לתמוך בפונקציות של תור משימות.

שימוש בפונקציות בתור משימות עם Firebase עלול לגרום לחיובים עבור עיבוד משימות ענן. ראה תמחור Cloud Tasks למידע נוסף.

צור פונקציות בתור משימות

כדי להשתמש בפונקציות של תור משימות, בצע את זרימת העבודה הבאה:

  1. כתוב פונקציה של תור משימות באמצעות Firebase SDK for Cloud Functions.
  2. בדוק את הפונקציה שלך על ידי הפעלתה באמצעות בקשת HTTP.
  3. פרוס את הפונקציה שלך עם Firebase CLI. בעת פריסת פונקציית תור המשימות שלך בפעם הראשונה, ה-CLI יצור תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וניסיון חוזר) המצוינות בקוד המקור שלך.
  4. הוסף משימות לתור המשימות החדש שנוצר, העברת פרמטרים כדי להגדיר לוח זמנים לביצוע במידת הצורך. אתה יכול להשיג זאת על ידי כתיבת הקוד באמצעות ה-Admin SDK ופריסה שלו ב-Cloud Functions for Firebase.

כתוב פונקציות בתור משימות

דוגמאות קוד בסעיף זה מבוססות על אפליקציה שמגדירה שירות שמגבה את כל התמונות מתמונת היום באסטרונומיה של נאס"א. כדי להתחיל, ייבא את המודולים הדרושים:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

פִּיתוֹן

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

השתמש onTaskDispatched או on_task_dispatched עבור פונקציות בתור משימות. בעת כתיבת פונקציית תור משימות תוכל להגדיר ניסיון חוזר לכל תור ותצורה מגבילת קצב.

הגדר את פונקציות תור המשימות

פונקציות תור המשימות מגיעות עם קבוצה רבת עוצמה של הגדרות תצורה כדי לשלוט במדויק על מגבלות קצב והתנהגות חוזרת של תור משימות:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

פִּיתוֹן

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : כל משימה בתור המשימות מתבצעת מחדש אוטומטית עד 5 פעמים. זה עוזר להפחית שגיאות חולפות כמו שגיאות רשת או הפרעה זמנית בשירות של שירות חיצוני תלוי.

  • retryConfig.minBackoffSeconds=60 : כל משימה מתבצעת מחדש בהפרש של 60 שניות לפחות מכל ניסיון. זה מספק חיץ גדול בין כל ניסיון כך שלא נמהר למצות את 5 ניסיונות הניסיון החוזר מהר מדי.

  • rateLimits.maxConcurrentDispatch=6 : לכל היותר 6 משימות נשלחות בזמן נתון. זה עוזר להבטיח זרם קבוע של בקשות לפונקציה הבסיסית ועוזר להפחית את מספר המופעים הפעילים והתחלות קרות.

בדוק את פונקציות תור המשימות

פונקציות תור המשימות ב-Firebase Local Emulator Suite נחשפות כפונקציות HTTP פשוטות. אתה יכול לבדוק פונקציית משימה מדומה על ידי שליחת בקשת HTTP POST עם מטען נתונים של JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

פרוס פונקציות בתור משימות

פרוס את פונקציית תור המשימות באמצעות Firebase CLI:

$ firebase deploy --only functions:backupapod

בעת פריסת פונקציית תור משימות בפעם הראשונה, ה-CLI יוצר תור משימות ב-Cloud Tasks עם אפשרויות (הגבלת קצב וניסיון חוזר) המצוינות בקוד המקור שלך.

אם אתה נתקל בשגיאות הרשאות בעת פריסת פונקציות, ודא שתפקידי IAM המתאימים מוקצים למשתמש המריץ את פקודות הפריסה.

תור פונקציות של תור משימות

ניתן להעמיד פונקציות בתור משימות בתור ב-Cloud Tasks מסביבת שרת מהימנה כמו Cloud Functions for Firebase באמצעות Firebase Admin SDK עבור Node.js או ספריות Google Cloud עבור Python. אם אתה חדש ב-Admin SDK, ראה הוסף Firebase לשרת כדי להתחיל.

זרימה טיפוסית יוצרת משימה חדשה, מעמידה אותה בתור ב-Cloud Tasks ומגדירה את התצורה של המשימה:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

פִּיתוֹן

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • הקוד לדוגמה מנסה לפזר את הביצוע של משימות על ידי שיוך עיכוב של דקות Nth עבור המשימה Nth. זה מתורגם להפעלת ~ משימה אחת/דקה. שים לב שאתה יכול גם להשתמש scheduleTime (Node.js) או schedule_time (Python) אם אתה רוצה שמשימות ענן יפעילו משימה בזמן מסוים.

  • הקוד לדוגמה מגדיר את משך הזמן המקסימלי ש-Cloud Tasks ימתינו להשלמת משימה. Cloud Tasks ינסה שוב את המשימה לאחר הגדרת הנסיון החוזר של התור או עד למיצוי המועד האחרון. בדוגמה, התור מוגדר לנסות שוב את המשימה עד 5 פעמים, אך המשימה מבוטלת באופן אוטומטי אם כל התהליך (כולל ניסיונות ניסיון חוזר) נמשך יותר מ-5 דקות.

אחזר וכלול את URI היעד

בשל האופן שבו Cloud Tasks יוצר אסימוני אימות כדי לאמת בקשות לפונקציות תור המשימות הבסיסיות, עליך לציין את כתובת ה-URL של Cloud Run של הפונקציה בעת עמידה בתור משימות. אנו ממליצים לך לאחזר באופן פרוגרמטי את כתובת האתר עבור הפונקציה שלך כפי שמוצג להלן:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

פִּיתוֹן

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

פתרון תקלות

הפעל את רישום המשימות בענן

יומנים ממשימות ענן מכילים מידע אבחוני שימושי כמו מצב הבקשה המשויכת למשימה. כברירת מחדל, יומנים ממשימות ענן כבויים עקב הנפח הגדול של יומנים שהוא יכול ליצור בפרויקט שלך. אנו ממליצים להפעיל את יומני ניפוי הבאגים בזמן שאתה מפתח וניפוי באגים באופן פעיל בפונקציות תור המשימות שלך. ראה הפעלת רישום .

הרשאות IAM

ייתכן שתראה שגיאות PERMISSION DENIED בעת הצגת משימות בתור או כאשר Cloud Tasks מנסה להפעיל את פונקציות תור המשימות שלך. ודא שלפרויקט שלך יש את כריכות IAM הבאות:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

עיין בתיעוד של Google Cloud IAM לקבלת הוראות כיצד להוסיף את חשבון שירות ברירת המחדל של App Engine כמשתמש בחשבון שירות ברירת המחדל של App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker