จัดคิวฟังก์ชันด้วย Cloud Tasks


ฟังก์ชันคิวงานใช้ประโยชน์จาก Google Cloud Tasks เพื่อช่วยให้แอปของคุณทำงานที่กินเวลามาก ต้องใช้ทรัพยากรมาก หรือจำกัดแบนด์วิดท์ แบบไม่พร้อมกันนอกขั้นตอนหลักของแอปพลิเคชัน

ตัวอย่างเช่น สมมติว่าคุณต้องการสร้างข้อมูลสำรองของชุดไฟล์ภาพขนาดใหญ่ที่กำลังโฮสต์อยู่บน API ที่มีขีดจำกัดอัตรา หากต้องการเป็นผู้บริโภคที่มีความรับผิดชอบของ API ดังกล่าว คุณต้องปฏิบัติตามขีดจำกัดอัตราคำขอของผู้ใช้เหล่านั้น นอกจากนี้ งานที่ใช้เวลานานประเภทนี้อาจเสี่ยงต่อการล้มเหลวเนื่องจากหมดเวลาและขีดจำกัดของหน่วยความจำ

เพื่อลดความซับซ้อนนี้ คุณเขียนฟังก์ชันคิวงานที่ตั้งค่าตัวเลือกงานพื้นฐาน เช่น scheduleTime และ dispatchDeadline และส่งฟังก์ชันไปยังคิวใน Cloud Tasks ได้ สภาพแวดล้อม Cloud Tasks ออกแบบมาโดยเฉพาะเพื่อให้มั่นใจได้ว่านโยบายการควบคุมความคับคั่งและการลองใหม่มีประสิทธิภาพสำหรับการดำเนินการเหล่านี้โดยเฉพาะ

Firebase SDK สำหรับ Cloud Functions for Firebase v3.20.1 ขึ้นไปสามารถทำงานร่วมกับ Firebase Admin SDK เวอร์ชัน 10.2.0 ขึ้นไปเพื่อรองรับฟังก์ชันคิวงาน

การใช้ฟังก์ชันคิวงานกับ Firebase อาจทำให้มีค่าใช้จ่ายในการประมวลผล Cloud Tasks โปรดดูข้อมูลเพิ่มเติมที่ราคาของ Cloud Tasks

สร้างฟังก์ชันคิวงาน

หากต้องการใช้ฟังก์ชันคิวงาน ให้ทำตามขั้นตอนต่อไปนี้

  1. เขียนฟังก์ชันคิวงานโดยใช้ Firebase SDK สำหรับ Cloud Functions
  2. ทดสอบฟังก์ชันโดยทริกเกอร์ด้วยคำขอ HTTP
  3. ทำให้ฟังก์ชันใช้งานได้ด้วย Firebase CLI เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราแล้วลองอีกครั้ง) ที่ระบุไว้ในซอร์สโค้ดของคุณ
  4. เพิ่มงานลงในคิวงานที่สร้างขึ้นใหม่ แล้วส่งต่อพารามิเตอร์เพื่อตั้งค่ากำหนดการดำเนินการ หากจำเป็น โดยการเขียนโค้ดโดยใช้ Admin SDK และทำให้ใช้งานได้ใน Cloud Functions for Firebase

เขียนฟังก์ชันของคิวงาน

ตัวอย่างโค้ดในส่วนนี้อิงตามแอปที่ตั้งบริการซึ่งสำรองข้อมูลรูปภาพทั้งหมดจากภาพดาราศาสตร์ประจำวันของนาซา หากต้องการเริ่มต้นใช้งาน ให้นำเข้าโมดูลที่จำเป็น:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

ใช้ onTaskDispatched หรือ on_task_dispatched สำหรับฟังก์ชันคิวงาน เมื่อเขียนฟังก์ชันคิวงาน คุณสามารถตั้งค่าการลองอีกครั้งต่อคิวและการกำหนดค่าการจำกัดอัตรา

กำหนดค่าฟังก์ชันของคิวงาน

ฟังก์ชันคิวงานมาพร้อมกับชุดการกำหนดค่าที่มีประสิทธิภาพเพื่อควบคุมขีดจำกัดอัตราคำขอและลองดำเนินการของคิวงานอีกครั้งได้อย่างแม่นยํา

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: งานแต่ละรายการในคิวงานจะพยายามซ้ำสูงสุด 5 ครั้งโดยอัตโนมัติ วิธีนี้จะช่วยลดข้อผิดพลาดชั่วคราว เช่น ข้อผิดพลาดของเครือข่ายหรือการหยุดชะงักของบริการชั่วคราวของบริการภายนอกที่เกี่ยวข้อง

  • retryConfig.minBackoffSeconds=60: งานแต่ละอย่างจะลองซ้ำอย่างน้อย 60 วินาทีจากความพยายามแต่ละครั้ง วิธีนี้จะช่วยกันบัฟเฟอร์ขนาดใหญ่ระหว่างความพยายามแต่ละครั้ง ดังนั้นเราจึงไม่ต้องเร่งการพยายามลองใหม่ 5 ครั้งให้เร็วเกินไป

  • rateLimits.maxConcurrentDispatch=6: ระบบจะส่งงานไม่เกิน 6 งานในเวลาที่กำหนด วิธีนี้ช่วยให้มั่นใจว่าจะมีคำขอสตรีมไปยังฟังก์ชันที่สำคัญอย่างต่อเนื่อง และช่วยลดจำนวนอินสแตนซ์ที่ใช้งานอยู่และ Cold Start

ทดสอบฟังก์ชันของคิวงาน

ฟังก์ชันคิวงานใน Firebase Local Emulator Suite จะแสดงเป็นฟังก์ชัน HTTP แบบง่าย คุณทดสอบฟังก์ชันงานที่จำลองได้โดยการส่งคำขอ HTTP POST พร้อมเพย์โหลดข้อมูล JSON ดังนี้

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

ทำให้ฟังก์ชันคิวงานใช้งานได้

ทำให้ฟังก์ชันคิวงานใช้งานได้โดยใช้ Firebase CLI:

$ firebase deploy --only functions:backupapod

เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราแล้วลองอีกครั้ง) ที่ระบุในซอร์สโค้ดของคุณ

หากพบข้อผิดพลาดเกี่ยวกับสิทธิ์เมื่อทำให้ฟังก์ชันใช้งานได้ โปรดตรวจสอบว่าได้มอบหมายบทบาท IAM ที่เหมาะสมให้กับผู้ใช้ที่ใช้คำสั่งการทำให้ใช้งานได้

จัดคิวงานของฟังก์ชัน

คุณจัดคิวฟังก์ชันคิวงานได้ใน Cloud Tasks จากสภาพแวดล้อมเซิร์ฟเวอร์ที่เชื่อถือได้ เช่น Cloud Functions for Firebase โดยใช้ Firebase Admin SDK สำหรับ Node.js หรือไลบรารี Google Cloud สำหรับ Python หากคุณเพิ่งเริ่มใช้ SDK ผู้ดูแลระบบ โปรดดูที่หัวข้อเพิ่ม Firebase ไปยังเซิร์ฟเวอร์เพื่อเริ่มต้นใช้งาน

โฟลว์ทั่วไปจะสร้างงานใหม่ จัดคิวงานใน Cloud Tasks และตั้งค่าการกำหนดค่าสำหรับงาน ดังนี้

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • โค้ดตัวอย่างจะพยายามกระจายการดำเนินการของงานด้วยการเชื่อมโยงความล่าช้าของ Nth นาทีสำหรับงาน N ซึ่งจะแปลงเป็นการเรียกใช้ประมาณ 1 งาน/นาที โปรดทราบว่าคุณยังใช้ scheduleTime (Node.js) หรือ schedule_time (Python) ได้หากต้องการให้งานระบบคลาวด์ทริกเกอร์งานในเวลาที่เฉพาะเจาะจง

  • โค้ดตัวอย่างจะกำหนดระยะเวลาสูงสุดที่ Cloud Tasks จะรอให้งานเสร็จสมบูรณ์ Cloud Tasks จะลองทำงานอีกครั้งหลังจากกำหนดค่าคิวใหม่ หรือจนกว่าจะถึงกำหนดเวลานี้ ในตัวอย่าง คิวได้รับการกำหนดค่าให้ลองทำงานอีกครั้งได้สูงสุด 5 ครั้ง แต่งานจะถูกยกเลิกโดยอัตโนมัติหากกระบวนการทั้งหมด (รวมถึงการพยายามลองใหม่) ใช้เวลานานกว่า 5 นาที

ดึงข้อมูลและรวม URI เป้าหมาย

เนื่องจากวิธีที่ Cloud Tasks สร้างโทเค็นการตรวจสอบสิทธิ์เพื่อตรวจสอบสิทธิ์คำขอไปยังฟังก์ชันคิวงานที่สำคัญ คุณต้องระบุ URL ของ Cloud Run ของฟังก์ชันดังกล่าวเมื่อจัดคิวงาน เราขอแนะนำให้คุณดึงข้อมูล URL สำหรับฟังก์ชันโดยใช้โปรแกรมดังที่แสดงด้านล่าง

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

การแก้ปัญหา

เปิดการบันทึก Cloud Tasks

บันทึกจาก Cloud Tasks มีข้อมูลการวินิจฉัยที่เป็นประโยชน์ เช่น สถานะของคำขอที่เชื่อมโยงกับงาน โดยค่าเริ่มต้น ระบบจะปิดบันทึกจาก Cloud Tasks เนื่องจากมีบันทึกจำนวนมากที่อาจสร้างขึ้นในโปรเจ็กต์ของคุณได้ เราขอแนะนำให้คุณเปิดบันทึกการแก้ไขข้อบกพร่องขณะที่กำลังพัฒนาและแก้ไขข้อบกพร่องของฟังก์ชันคิวงาน โปรดดูการเปิดการบันทึก

สิทธิ์ IAM

คุณอาจเห็นข้อผิดพลาด PERMISSION DENIED รายการเมื่อจัดคิวงานหรือเมื่อ Cloud Tasks พยายามเรียกใช้ฟังก์ชันคิวงาน ตรวจสอบว่าโปรเจ็กต์มีการเชื่อมโยง IAM ต่อไปนี้

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • ข้อมูลประจำตัวที่ใช้กำหนดคิวงานไปยัง Cloud Tasks ต้องมีสิทธิ์ใช้บัญชีบริการที่เชื่อมโยงกับงานใน Cloud Tasks

    ในตัวอย่างนี้คือบัญชีบริการเริ่มต้นของ App Engine

ดูเอกสาร IAM ของ Google Cloud เพื่อดูวิธีเพิ่มบัญชีบริการเริ่มต้นของ App Engine เป็นผู้ใช้บัญชีบริการเริ่มต้นของ App Engine

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker