จัดคิวฟังก์ชันด้วย Cloud Tasks


ฟังก์ชันของคิวงานใช้ประโยชน์จาก Google งานระบบคลาวด์ เพื่อช่วยให้แอปของคุณใช้เวลานาน ต้องใช้ทรัพยากรมาก หรือจำกัดแบนด์วิดท์ งานต่างๆ แบบไม่พร้อมกันภายนอกโฟลว์หลักของแอปพลิเคชัน

เช่น สมมติว่าคุณต้องการสร้างข้อมูลสำรองชุดรูปภาพขนาดใหญ่ ซึ่งปัจจุบันโฮสต์อยู่ใน API โดยมีขีดจำกัดอัตราคำขอ หากต้องการเป็น ผู้บริโภคที่รับผิดชอบของ API ดังกล่าว คุณจะต้องเคารพขีดจำกัดอัตราคำขอ นอกจากนี้ งานที่ใช้เวลานานนี้อาจเสี่ยงต่อการล้มเหลวเนื่องจากหมดเวลา ขีดจำกัดของหน่วยความจำ

เพื่อลดความซับซ้อนนี้ คุณสามารถเขียนฟังก์ชันคิวงานที่ตั้งค่าพื้นฐาน ตัวเลือกงาน เช่น scheduleTime และ dispatchDeadline แล้วมอบหมาย ปิดไปยังคิวใน Cloud Tasks งานระบบคลาวด์ มีการออกแบบสภาพแวดล้อม โดยเฉพาะ เพื่อให้มั่นใจว่าการควบคุมความคับคั่งนั้นมีประสิทธิผล ลองกำหนดนโยบายอีกครั้งสำหรับการดำเนินการประเภทนี้

Firebase SDK for Cloud Functions for Firebase v3.20.1 ขึ้นไปทำงานร่วมกัน มี Firebase Admin SDK เวอร์ชัน 10.2.0 ขึ้นไปเพื่อรองรับฟังก์ชันคิวงาน

การใช้ฟังก์ชันคิวงานกับ Firebase อาจส่งผลให้มีการเรียกเก็บเงินสำหรับ กำลังประมวลผลงานระบบคลาวด์ โปรดดู ราคาของ Cloud Tasks เพื่อดูข้อมูลเพิ่มเติม

สร้างฟังก์ชันคิวงาน

หากต้องการใช้ฟังก์ชันคิวงาน ให้ทำตามขั้นตอนต่อไปนี้

  1. เขียนฟังก์ชันคิวงานโดยใช้ Firebase SDK สำหรับ Cloud Functions
  2. ทดสอบฟังก์ชันโดยทริกเกอร์ด้วยคำขอ HTTP
  3. ทำให้ฟังก์ชันใช้งานได้ด้วย Firebase CLI เมื่อทำให้งานของคุณใช้งานได้ เป็นครั้งแรก CLI จะสร้าง คิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราและลองใหม่) ที่ระบุไว้ในซอร์สโค้ด
  4. เพิ่มงานลงในคิวงานที่สร้างขึ้นใหม่ ส่งผ่านพารามิเตอร์เพื่อตั้งค่า กำหนดการดำเนินการหากจำเป็น คุณสามารถทำได้โดยเขียนโค้ด โดยใช้ Admin SDK และทำให้ใช้งานกับ Cloud Functions for Firebase ได้

เขียนฟังก์ชันของคิวงาน

ตัวอย่างโค้ดในส่วนนี้อิงตามแอปที่ตั้งค่า บริการที่สำรองข้อมูลรูปภาพทั้งหมดจากองค์การนาซา ภาพประจำวันของดาราศาสตร์ ในการเริ่มต้น ให้นำเข้าโมดูลที่จำเป็น:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

ใช้ onTaskDispatched หรือ on_task_dispatched สำหรับฟังก์ชันของคิวงาน เมื่อเขียนฟังก์ชันคิวงาน คุณสามารถตั้งค่าการลองอีกครั้งต่อคิวและการกำหนดค่าการจำกัดอัตรา

กำหนดค่าฟังก์ชันคิวงาน

ฟังก์ชันคิวงานมาพร้อมกับชุดการตั้งค่าการกำหนดค่าที่มีประสิทธิภาพ เพื่อควบคุมขีดจำกัดอัตราคำขอและการทำงานซ้ำของคิวงานอย่างละเอียด

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: ระบบจะดำเนินการแต่ละงานในคิวงานโดยอัตโนมัติ ลองใหม่ถึง 5 ครั้ง การดำเนินการนี้จะช่วยลดข้อผิดพลาดชั่วคราว เช่น เครือข่าย หรือบริการขัดข้องชั่วคราวของบริการภายนอกที่จำเป็น

  • retryConfig.minBackoffSeconds=60: แต่ละงานลองใหม่อย่างน้อย 60 วินาที นอกเหนือจากความพยายามแต่ละครั้ง ทำให้มีบัฟเฟอร์ขนาดใหญ่ระหว่างการพยายามแต่ละครั้ง เราจึงไม่ต้องรีบร้อนในการพยายามลองใหม่ทั้ง 5 ครั้งเร็วเกินไป

  • rateLimits.maxConcurrentDispatch=6: โดยส่งงานมากที่สุด 6 งานในเวลา ตามเวลาที่กำหนด วิธีนี้ช่วยให้มั่นใจว่าจะมีคำขอที่ส่งอย่างต่อเนื่องไปยังผู้ที่เกี่ยวข้อง และช่วยลดจำนวนอินสแตนซ์ที่ใช้งานอยู่และ Cold Start

ทดสอบฟังก์ชันของคิวงาน

ระบบจะแสดงฟังก์ชันคิวงานในชุดโปรแกรมจำลองภายในของ Firebase ฟังก์ชัน HTTP คุณทดสอบฟังก์ชันงานที่จำลองได้โดยส่ง HTTP POST คำขอที่มีเพย์โหลดข้อมูล JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

ทำให้ฟังก์ชันคิวงานใช้งานได้

ทำให้ฟังก์ชันคิวงานใช้งานได้โดยใช้ Firebase CLI ดังนี้

$ firebase deploy --only functions:backupapod

เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้าง คิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราแล้วลองอีกครั้ง) ที่ระบุไว้ในซอร์สโค้ดของคุณ

หากคุณพบข้อผิดพลาดด้านสิทธิ์เมื่อทำให้ฟังก์ชันใช้งานได้ โปรดตรวจสอบว่า บทบาท IAM ที่เหมาะสม ได้รับการกำหนดให้กับผู้ใช้ที่เรียกใช้คำสั่งทำให้ใช้งานได้

จัดคิวฟังก์ชันของคิวงาน

ฟังก์ชันของคิวงานจะอยู่ในคิวของ Cloud Tasks ได้จาก สภาพแวดล้อมของเซิร์ฟเวอร์ เช่น Cloud Functions for Firebase ที่ใช้ Firebase Admin SDK สำหรับ ไลบรารี Node.js หรือ Google Cloud สำหรับ Python ถ้าคุณเพิ่งเริ่มใช้ Admin SDK โปรดดูที่ เพิ่ม Firebase ไปยังเซิร์ฟเวอร์เพื่อเริ่มต้นใช้งาน

ขั้นตอนทั่วไปเป็นการสร้างงานใหม่ และเพิ่มคิวงานลงในคิว งานระบบคลาวด์ และกำหนดการกำหนดค่าสำหรับงานดังนี้

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • โค้ดตัวอย่างจะพยายามกระจายการดำเนินการของ งานด้วยการเชื่อมโยงความล่าช้าของนาทีที่ N กับงานที่ N ช่วงเวลานี้ แปลเป็นทริกเกอร์ ~ 1 งาน/นาที โปรดทราบว่าคุณยังสามารถ scheduleTime (Node.js) หรือ schedule_time (Python) หากต้องการ Cloud Tasks เพื่อทริกเกอร์งานในเวลาที่เฉพาะเจาะจง

  • โค้ดตัวอย่างจะกำหนดระยะเวลาสูงสุด Cloud Tasks จะรอ เพื่อให้งานเสร็จสมบูรณ์ งานระบบคลาวด์จะลองอีกครั้ง งานหลังจากการลองใหม่ การกำหนดค่าของคิว หรือจนกว่าจะถึงกำหนดเวลานี้ ในตัวอย่างนี้ คิวจะได้รับการกำหนดค่าให้ลองทำงานอีกครั้งได้สูงสุด 5 ครั้ง แต่งาน ยกเลิกโดยอัตโนมัติหากทั้งกระบวนการ (รวมถึงการพยายามลองอีกครั้ง) ใช้เวลานานกว่า 5 นาที

ดึงข้อมูลและรวม URI เป้าหมาย

เนื่องด้วยวิธีที่ Cloud Tasks สร้างโทเค็นการตรวจสอบสิทธิ์เพื่อตรวจสอบสิทธิ์ ไปยังฟังก์ชันของคิวงานที่จำเป็น คุณต้องระบุ URL ของ Cloud Run ของฟังก์ชันเมื่อจัดคิวงาน พ ขอแนะนำให้คุณเรียกข้อมูล URL สำหรับฟังก์ชันโดยใช้โปรแกรมดังนี้ ดังที่แสดงด้านล่าง

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

การแก้ปัญหา

เปิดการบันทึก Cloud Tasks

บันทึกจาก Cloud Tasks มีข้อมูลการวินิจฉัยที่มีประโยชน์ เช่น สถานะคำขอที่เชื่อมโยงกับงาน โดยค่าเริ่มต้น บันทึกจาก ระบบปิด Cloud Tasks เนื่องจากบันทึกจำนวนมากที่ทำได้ ที่อาจสร้างขึ้นในโปรเจ็กต์ของคุณได้ เราขอแนะนำให้คุณเปิดบันทึกการแก้ไขข้อบกพร่อง ขณะที่คุณกำลังพัฒนาและแก้ไขข้อบกพร่องของฟังก์ชันคิวงาน โปรดดู กำลังเปิด Logging

สิทธิ์ IAM

คุณอาจเห็นข้อผิดพลาด PERMISSION DENIED รายการเมื่อจัดคิวงานหรือเมื่อ Cloud Tasks จะพยายามเรียกใช้ฟังก์ชันคิวงาน ตรวจสอบว่า โปรเจ็กต์มีการเชื่อมโยง IAM ต่อไปนี้

  • ข้อมูลประจำตัวที่ใช้จัดคิวงานให้ตรงกับความต้องการของ Cloud Tasks สิทธิ์ IAM cloudtasks.tasks.create รายการ

    ในตัวอย่างนี้คือบัญชีบริการ App Engine เริ่มต้น

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • ข้อมูลประจำตัวที่ใช้จัดคิวงานไปยัง Cloud Tasks จะต้องมีสิทธิ์ เพื่อใช้บัญชีบริการที่เชื่อมโยงกับงานใน Cloud Tasks

    ในตัวอย่างนี้คือบัญชีบริการ App Engine เริ่มต้น

ดูเอกสารประกอบ Google Cloud IAM เพื่อดูวิธีการเพิ่มบัญชีบริการเริ่มต้นของ App Engine ในฐานะผู้ใช้บัญชีบริการเริ่มต้นของ App Engine

  • ข้อมูลประจำตัวที่ใช้ในการทริกเกอร์ความต้องการฟังก์ชันคิวงาน สิทธิ์cloudfunctions.functions.invoke

    ในตัวอย่างนี้คือบัญชีบริการ App Engine เริ่มต้น

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker