จัดคิวฟังก์ชันด้วย Cloud Tasks

ฟังก์ชันคิวงานใช้ประโยชน์จาก Google Cloud Tasks เพื่อช่วยให้แอปเรียกใช้งานที่ใช้เวลานาน ใช้ทรัพยากรมาก หรือมีแบนด์วิดท์จำกัด แบบไม่พร้อมกันนอกโฟลว์แอปพลิเคชันหลัก

ตัวอย่างเช่น สมมติว่าคุณต้องการสร้างข้อมูลสำรองของไฟล์รูปภาพจำนวนมากที่โฮสต์อยู่ใน API ที่มีการจำกัดอัตราการเรียก คุณต้องปฏิบัติตามการจำกัดอัตราการเรียกของ API นั้นๆ เพื่อให้เป็นผู้ใช้ API อย่างมีความรับผิดชอบ นอกจากนี้ งานที่ใช้เวลานานประเภทนี้ยังอาจเกิดข้อผิดพลาดได้เนื่องจากหมดเวลาและหน่วยความจำเต็ม

คุณสามารถเขียนฟังก์ชันคิวงานที่ตั้งค่าตัวเลือกงานพื้นฐาน งาน เช่น scheduleTime และ dispatchDeadline แล้วส่ง ฟังก์ชันไปยังคิวใน Cloud Tasks เพื่อลดความซับซ้อนนี้ Cloud Tasks สภาพแวดล้อมได้รับการออกแบบมาโดยเฉพาะเพื่อให้มั่นใจว่ามีการควบคุมการรับส่งข้อมูลที่มากเกินไปและนโยบายการลองใหม่ที่มีประสิทธิภาพสำหรับการดำเนินการประเภทนี้

Firebase SDK สำหรับ Cloud Functions for Firebase v3.20.1 ขึ้นไปทำงานร่วมกับ Firebase Admin SDK v10.2.0 ขึ้นไปเพื่อรองรับฟังก์ชันคิวงาน

การใช้ฟังก์ชันคิวงานกับ Firebase อาจทำให้เกิดค่าใช้จ่ายในการประมวลผล Cloud Tasks ดูข้อมูลเพิ่มเติมได้ที่ Cloud Tasksราคา

สร้างฟังก์ชันคิวงาน

หากต้องการใช้ฟังก์ชันคิวงาน ให้ทำตามเวิร์กโฟลว์นี้

  1. เขียนฟังก์ชันคิวงานโดยใช้ Firebase SDK สำหรับ Cloud Functions
  2. ทดสอบฟังก์ชันโดยทริกเกอร์ด้วยคำขอ HTTP
  3. ทำให้ฟังก์ชันใช้งานได้ด้วย Firebase CLI เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราคำขอและการลองใหม่) ที่ระบุไว้ในซอร์สโค้ด
  4. เพิ่มงานลงในคิวงานที่สร้างขึ้นใหม่ โดยส่งพารามิเตอร์เพื่อตั้งค่ากำหนดการดำเนินการหากจำเป็น คุณสามารถทำได้โดยเขียนโค้ด โดยใช้ Admin SDK และทำให้โค้ดใช้งานได้ใน Cloud Functions for Firebase

เขียนฟังก์ชันคิวงาน

ตัวอย่างโค้ดในส่วนนี้อิงตามแอปที่ตั้ง ค่าบริการที่สำรองรูปภาพทั้งหมดจากรูปภาพดาราศาสตร์ประจำวันของ NASA's Astronomy Picture of the Day หากต้องการเริ่มต้นใช้งาน ให้นำเข้าโมดูลที่จำเป็นดังนี้

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");

// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

ใช้ onTaskDispatched หรือ on_task_dispatched สำหรับฟังก์ชันคิวงาน เมื่อเขียนฟังก์ชันคิวงาน คุณสามารถตั้งค่าการลองใหม่และการจำกัดอัตราการเรียกต่อคิวได้

กำหนดค่าฟังก์ชันคิวงาน

ฟังก์ชันคิวงานมาพร้อมกับการตั้งค่าการกำหนดค่าที่มีประสิทธิภาพเพื่อควบคุมการจำกัดอัตราการเรียกและลักษณะการทำงานของการลองใหม่ของคิวงานได้อย่างแม่นยำ

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(
    retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
    rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: ระบบจะลองใหม่แต่ละงานในคิวงานโดยอัตโนมัติสูงสุด 5 ครั้ง ซึ่งจะช่วยลดข้อผิดพลาดชั่วคราว เช่น ข้อผิดพลาดของเครือข่ายหรือการหยุดชะงักของบริการชั่วคราวของบริการภายนอกที่ขึ้นอยู่กับบริการนั้น

  • retryConfig.minBackoffSeconds=60: ระบบจะลองใหม่แต่ละงานโดยเว้นระยะห่างอย่างน้อย 60 วินาทีระหว่างการลองแต่ละครั้ง ซึ่งจะช่วยให้มีบัฟเฟอร์ขนาดใหญ่ระหว่างการลองแต่ละครั้ง เพื่อไม่ให้เราเร่งการลองใหม่ 5 ครั้งเร็วเกินไป

  • rateLimits.maxConcurrentDispatch=6: ระบบจะส่งงานพร้อมกันสูงสุด 6 งานในเวลาที่กำหนด ซึ่งจะช่วยให้มั่นใจได้ว่ามีการส่งคำขอไปยังฟังก์ชันพื้นฐานอย่างต่อเนื่อง และช่วยลดจำนวนอินสแตนซ์ที่ใช้งานอยู่และการเริ่มต้นแบบ Cold Start

ทดสอบฟังก์ชันคิวงาน

ในกรณีส่วนใหญ่ โปรแกรมจำลอง Cloud Functions เป็นวิธีที่ดีที่สุดในการทดสอบฟังก์ชันคิวงาน ดูวิธีติดตั้งเครื่องมือในแอปสำหรับการจำลองฟังก์ชันคิวงานได้ในเอกสารประกอบชุดโปรแกรมจำลอง

นอกจากนี้ ฟังก์ชันคิวงานยังแสดงเป็นฟังก์ชัน HTTP อย่างง่ายใน Firebase Local Emulator Suite คุณสามารถทดสอบฟังก์ชันงานที่จำลองได้โดยส่งคำขอ HTTP POST พร้อมเพย์โหลดข้อมูล JSON ดังนี้

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

ทำให้ฟังก์ชันคิวงานใช้งานได้

ทำให้ฟังก์ชันคิวงานใช้งานได้โดยใช้ Firebase CLI ดังนี้

$ firebase deploy --only functions:backupapod

เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราคำขอและการลองใหม่) ที่ระบุไว้ในซอร์สโค้ด

หากพบข้อผิดพลาดเกี่ยวกับสิทธิ์เมื่อทำให้ฟังก์ชันใช้งานได้ ให้ตรวจสอบว่าได้กำหนด บทบาท IAM ที่เหมาะสมให้กับผู้ใช้ที่เรียกใช้คำสั่งการทำให้ใช้งานได้แล้ว

จัดคิวฟังก์ชันคิวงาน

คุณสามารถจัดคิวฟังก์ชันคิวงานใน Cloud Tasks จากสภาพแวดล้อมเซิร์ฟเวอร์ที่เชื่อถือได้ เช่น Cloud Functions for Firebase โดยใช้ Firebase Admin SDK สำหรับ Node.js หรือไลบรารี Google Cloud สำหรับ Python หากคุณยังไม่เคยใช้ Admin SDK โปรดดู หัวข้อเพิ่ม Firebase ลงในเซิร์ฟเวอร์เพื่อเริ่มต้นใช้งาน

โฟลว์ทั่วไปจะสร้างงานใหม่ จัดคิวงานใน Cloud Tasks และตั้งค่าการกำหนดค่าสำหรับงาน

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(
            schedule_time=schedule_time,
            dispatch_deadline_seconds=dispatch_deadline_seconds,
            uri=target_uri,
        )
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • โค้ดตัวอย่างพยายามกระจายการดำเนินการของงานโดยเชื่อมโยงการหน่วงเวลา N นาทีสำหรับงานที่ N ซึ่งหมายความว่าเป็นการทริกเกอร์งานประมาณ 1 งาน/นาที โปรดทราบว่าคุณยังใช้ scheduleTime (Node.js) หรือ schedule_time (Python) ได้ด้วยหากต้องการให้ Cloud Tasks ทริกเกอร์งานในเวลาที่เฉพาะเจาะจง

  • โค้ดตัวอย่างจะกำหนดระยะเวลาสูงสุดที่ Cloud Tasksจะรอ ให้งานเสร็จสมบูรณ์ Cloud Tasks จะลองใหม่ งานตามการกำหนดค่าการลองใหม่ ของคิวหรือจนกว่าจะถึงกำหนดเวลา ในตัวอย่างนี้ คิวได้รับการกำหนดค่าให้ลองใหม่งานสูงสุด 5 ครั้ง แต่ระบบจะยกเลิกงานโดยอัตโนมัติหากกระบวนการทั้งหมด (รวมถึงการลองใหม่) ใช้เวลานานกว่า 5 นาที

การแก้ปัญหา

เปิดการบันทึก Cloud Tasks

บันทึกจาก Cloud Tasks มีข้อมูลการวินิจฉัยที่เป็นประโยชน์ เช่น สถานะของคำขอที่เชื่อมโยงกับงาน โดยค่าเริ่มต้น ระบบจะปิดบันทึกจาก Cloud Tasks เนื่องจากบันทึกอาจมีปริมาณมากและอาจสร้างขึ้นในโปรเจ็กต์ของคุณ เราขอแนะนำให้คุณเปิดบันทึกการแก้ไขข้อบกพร่องขณะที่คุณกำลังพัฒนาและแก้ไขข้อบกพร่องของฟังก์ชันคิวงาน ดูหัวข้อ การเปิดการบันทึก

สิทธิ์ IAM

คุณอาจเห็น PERMISSION DENIED ข้อผิดพลาดเมื่อจัดคิวงานหรือเมื่อ Cloud Tasks พยายามเรียกใช้ฟังก์ชันคิวงาน ตรวจสอบว่าโปรเจ็กต์ของคุณมีการผูกมัด IAM ต่อไปนี้

  • ข้อมูลประจำตัวที่ใช้จัดคิวงานไปยัง Cloud Tasks ต้องมี cloudtasks.tasks.create สิทธิ์ IAM

    ในตัวอย่างนี้ ข้อมูลประจำตัวดังกล่าวคือApp Engineบัญชีบริการเริ่มต้น

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • ข้อมูลประจำตัวที่ใช้จัดคิวงานไปยัง Cloud Tasks ต้องมีสิทธิ์ ใช้บัญชีบริการที่เชื่อมโยงกับงานใน Cloud Tasks

    ในตัวอย่างนี้ ข้อมูลประจำตัวดังกล่าวคือApp Engineบัญชีบริการเริ่มต้น

ดูวิธีการเพิ่มบัญชีบริการเริ่มต้นเป็นผู้ใช้บัญชีบริการเริ่มต้นได้ในเอกสารประกอบ Cloud IAM ของ GoogleApp EngineApp Engine

  • ข้อมูลประจำตัวที่ใช้ทริกเกอร์ฟังก์ชันคิวงานต้องมีสิทธิ์ cloudfunctions.functions.invoke

    ในตัวอย่างนี้ ข้อมูลประจำตัวดังกล่าวคือApp Engineบัญชีบริการเริ่มต้น

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker