ฟังก์ชันคิวงานใช้ประโยชน์จาก Google Cloud Tasks เพื่อช่วยให้แอปเรียกใช้งานที่ใช้เวลานาน ใช้ทรัพยากรมาก หรือมีแบนด์วิดท์จำกัด แบบไม่พร้อมกันนอกโฟลว์แอปพลิเคชันหลัก
ตัวอย่างเช่น สมมติว่าคุณต้องการสร้างข้อมูลสำรองของไฟล์รูปภาพจำนวนมากที่โฮสต์อยู่ใน API ที่มีการจำกัดอัตราการเรียก คุณต้องปฏิบัติตามการจำกัดอัตราการเรียกของ API นั้นๆ เพื่อให้เป็นผู้ใช้ API อย่างมีความรับผิดชอบ นอกจากนี้ งานที่ใช้เวลานานประเภทนี้ยังอาจเกิดข้อผิดพลาดได้เนื่องจากหมดเวลาและหน่วยความจำเต็ม
คุณสามารถเขียนฟังก์ชันคิวงานที่ตั้งค่าตัวเลือกงานพื้นฐาน
งาน เช่น scheduleTime และ dispatchDeadline แล้วส่ง
ฟังก์ชันไปยังคิวใน Cloud Tasks เพื่อลดความซับซ้อนนี้ Cloud Tasks สภาพแวดล้อมได้รับการออกแบบมาโดยเฉพาะเพื่อให้มั่นใจว่ามีการควบคุมการรับส่งข้อมูลที่มากเกินไปและนโยบายการลองใหม่ที่มีประสิทธิภาพสำหรับการดำเนินการประเภทนี้
Firebase SDK สำหรับ Cloud Functions for Firebase v3.20.1 ขึ้นไปทำงานร่วมกับ Firebase Admin SDK v10.2.0 ขึ้นไปเพื่อรองรับฟังก์ชันคิวงาน
การใช้ฟังก์ชันคิวงานกับ Firebase อาจทำให้เกิดค่าใช้จ่ายในการประมวลผล Cloud Tasks ดูข้อมูลเพิ่มเติมได้ที่ Cloud Tasksราคา
สร้างฟังก์ชันคิวงาน
หากต้องการใช้ฟังก์ชันคิวงาน ให้ทำตามเวิร์กโฟลว์นี้
- เขียนฟังก์ชันคิวงานโดยใช้ Firebase SDK สำหรับ Cloud Functions
- ทดสอบฟังก์ชันโดยทริกเกอร์ด้วยคำขอ HTTP
- ทำให้ฟังก์ชันใช้งานได้ด้วย Firebase CLI เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราคำขอและการลองใหม่) ที่ระบุไว้ในซอร์สโค้ด
- เพิ่มงานลงในคิวงานที่สร้างขึ้นใหม่ โดยส่งพารามิเตอร์เพื่อตั้งค่ากำหนดการดำเนินการหากจำเป็น คุณสามารถทำได้โดยเขียนโค้ด โดยใช้ Admin SDK และทำให้โค้ดใช้งานได้ใน Cloud Functions for Firebase
เขียนฟังก์ชันคิวงาน
ตัวอย่างโค้ดในส่วนนี้อิงตามแอปที่ตั้ง ค่าบริการที่สำรองรูปภาพทั้งหมดจากรูปภาพดาราศาสตร์ประจำวันของ NASA's Astronomy Picture of the Day หากต้องการเริ่มต้นใช้งาน ให้นำเข้าโมดูลที่จำเป็นดังนี้
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");
// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
ใช้ onTaskDispatched
หรือ on_task_dispatched
สำหรับฟังก์ชันคิวงาน เมื่อเขียนฟังก์ชันคิวงาน คุณสามารถตั้งค่าการลองใหม่และการจำกัดอัตราการเรียกต่อคิวได้
กำหนดค่าฟังก์ชันคิวงาน
ฟังก์ชันคิวงานมาพร้อมกับการตั้งค่าการกำหนดค่าที่มีประสิทธิภาพเพื่อควบคุมการจำกัดอัตราการเรียกและลักษณะการทำงานของการลองใหม่ของคิวงานได้อย่างแม่นยำ
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(
retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5: ระบบจะลองใหม่แต่ละงานในคิวงานโดยอัตโนมัติสูงสุด 5 ครั้ง ซึ่งจะช่วยลดข้อผิดพลาดชั่วคราว เช่น ข้อผิดพลาดของเครือข่ายหรือการหยุดชะงักของบริการชั่วคราวของบริการภายนอกที่ขึ้นอยู่กับบริการนั้นretryConfig.minBackoffSeconds=60: ระบบจะลองใหม่แต่ละงานโดยเว้นระยะห่างอย่างน้อย 60 วินาทีระหว่างการลองแต่ละครั้ง ซึ่งจะช่วยให้มีบัฟเฟอร์ขนาดใหญ่ระหว่างการลองแต่ละครั้ง เพื่อไม่ให้เราเร่งการลองใหม่ 5 ครั้งเร็วเกินไปrateLimits.maxConcurrentDispatch=6: ระบบจะส่งงานพร้อมกันสูงสุด 6 งานในเวลาที่กำหนด ซึ่งจะช่วยให้มั่นใจได้ว่ามีการส่งคำขอไปยังฟังก์ชันพื้นฐานอย่างต่อเนื่อง และช่วยลดจำนวนอินสแตนซ์ที่ใช้งานอยู่และการเริ่มต้นแบบ Cold Start
ทดสอบฟังก์ชันคิวงาน
ในกรณีส่วนใหญ่ โปรแกรมจำลอง Cloud Functions เป็นวิธีที่ดีที่สุดในการทดสอบฟังก์ชันคิวงาน ดูวิธีติดตั้งเครื่องมือในแอปสำหรับการจำลองฟังก์ชันคิวงานได้ในเอกสารประกอบชุดโปรแกรมจำลอง
นอกจากนี้ ฟังก์ชันคิวงานยังแสดงเป็นฟังก์ชัน HTTP อย่างง่ายใน Firebase Local Emulator Suite คุณสามารถทดสอบฟังก์ชันงานที่จำลองได้โดยส่งคำขอ HTTP POST พร้อมเพย์โหลดข้อมูล JSON ดังนี้
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
ทำให้ฟังก์ชันคิวงานใช้งานได้
ทำให้ฟังก์ชันคิวงานใช้งานได้โดยใช้ Firebase CLI ดังนี้
$ firebase deploy --only functions:backupapod
เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราคำขอและการลองใหม่) ที่ระบุไว้ในซอร์สโค้ด
หากพบข้อผิดพลาดเกี่ยวกับสิทธิ์เมื่อทำให้ฟังก์ชันใช้งานได้ ให้ตรวจสอบว่าได้กำหนด บทบาท IAM ที่เหมาะสมให้กับผู้ใช้ที่เรียกใช้คำสั่งการทำให้ใช้งานได้แล้ว
จัดคิวฟังก์ชันคิวงาน
คุณสามารถจัดคิวฟังก์ชันคิวงานใน Cloud Tasks จากสภาพแวดล้อมเซิร์ฟเวอร์ที่เชื่อถือได้ เช่น Cloud Functions for Firebase โดยใช้ Firebase Admin SDK สำหรับ Node.js หรือไลบรารี Google Cloud สำหรับ Python หากคุณยังไม่เคยใช้ Admin SDK โปรดดู หัวข้อเพิ่ม Firebase ลงในเซิร์ฟเวอร์เพื่อเริ่มต้นใช้งาน
โฟลว์ทั่วไปจะสร้างงานใหม่ จัดคิวงานใน Cloud Tasks และตั้งค่าการกำหนดค่าสำหรับงาน
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(
schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri,
)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
โค้ดตัวอย่างพยายามกระจายการดำเนินการของงานโดยเชื่อมโยงการหน่วงเวลา N นาทีสำหรับงานที่ N ซึ่งหมายความว่าเป็นการทริกเกอร์งานประมาณ 1 งาน/นาที โปรดทราบว่าคุณยังใช้
scheduleTime(Node.js) หรือschedule_time(Python) ได้ด้วยหากต้องการให้ Cloud Tasks ทริกเกอร์งานในเวลาที่เฉพาะเจาะจงโค้ดตัวอย่างจะกำหนดระยะเวลาสูงสุดที่ Cloud Tasksจะรอ ให้งานเสร็จสมบูรณ์ Cloud Tasks จะลองใหม่ งานตามการกำหนดค่าการลองใหม่ ของคิวหรือจนกว่าจะถึงกำหนดเวลา ในตัวอย่างนี้ คิวได้รับการกำหนดค่าให้ลองใหม่งานสูงสุด 5 ครั้ง แต่ระบบจะยกเลิกงานโดยอัตโนมัติหากกระบวนการทั้งหมด (รวมถึงการลองใหม่) ใช้เวลานานกว่า 5 นาที
การแก้ปัญหา
เปิดการบันทึก Cloud Tasks
บันทึกจาก Cloud Tasks มีข้อมูลการวินิจฉัยที่เป็นประโยชน์ เช่น สถานะของคำขอที่เชื่อมโยงกับงาน โดยค่าเริ่มต้น ระบบจะปิดบันทึกจาก Cloud Tasks เนื่องจากบันทึกอาจมีปริมาณมากและอาจสร้างขึ้นในโปรเจ็กต์ของคุณ เราขอแนะนำให้คุณเปิดบันทึกการแก้ไขข้อบกพร่องขณะที่คุณกำลังพัฒนาและแก้ไขข้อบกพร่องของฟังก์ชันคิวงาน ดูหัวข้อ การเปิดการบันทึก
สิทธิ์ IAM
คุณอาจเห็น PERMISSION DENIED ข้อผิดพลาดเมื่อจัดคิวงานหรือเมื่อ
Cloud Tasks พยายามเรียกใช้ฟังก์ชันคิวงาน ตรวจสอบว่าโปรเจ็กต์ของคุณมีการผูกมัด IAM ต่อไปนี้
ข้อมูลประจำตัวที่ใช้จัดคิวงานไปยัง Cloud Tasks ต้องมี
cloudtasks.tasks.createสิทธิ์ IAMในตัวอย่างนี้ ข้อมูลประจำตัวดังกล่าวคือApp Engineบัญชีบริการเริ่มต้น
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
ข้อมูลประจำตัวที่ใช้จัดคิวงานไปยัง Cloud Tasks ต้องมีสิทธิ์ ใช้บัญชีบริการที่เชื่อมโยงกับงานใน Cloud Tasks
ในตัวอย่างนี้ ข้อมูลประจำตัวดังกล่าวคือApp Engineบัญชีบริการเริ่มต้น
ดูวิธีการเพิ่มบัญชีบริการเริ่มต้นเป็นผู้ใช้บัญชีบริการเริ่มต้นได้ในเอกสารประกอบ Cloud IAM ของ GoogleApp EngineApp Engine
ข้อมูลประจำตัวที่ใช้ทริกเกอร์ฟังก์ชันคิวงานต้องมีสิทธิ์
cloudfunctions.functions.invokeในตัวอย่างนี้ ข้อมูลประจำตัวดังกล่าวคือApp Engineบัญชีบริการเริ่มต้น
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker