ฟังก์ชันคิวงานใช้ประโยชน์จาก Google Cloud Tasks เพื่อช่วยให้แอปทำงานที่ใช้เวลามาก ต้องใช้ทรัพยากรมาก หรือจำกัดแบนด์วิดท์ได้แบบไม่พร้อมกันนอกขั้นตอนการดำเนินการหลักของแอปพลิเคชัน
เช่น สมมติว่าคุณต้องการสร้างข้อมูลสำรองของไฟล์ภาพจำนวนมากที่โฮสต์อยู่ใน API ที่มีขีดจำกัดอัตราคำขอในปัจจุบัน หากต้องการเป็นผู้บริโภคที่มีความรับผิดชอบของ API ดังกล่าว คุณต้องเคารพขีดจำกัดอัตราคำขอ นอกจากนี้ งานที่ใช้เวลานานนี้อาจเสี่ยงต่อการล้มเหลวเนื่องจากหมดเวลาและขีดจำกัดของหน่วยความจำ
หากต้องการลดความซับซ้อนนี้ คุณสามารถเขียนฟังก์ชันคิวงานที่ตั้งค่าตัวเลือกงานพื้นฐาน เช่น scheduleTime
และ dispatchDeadline
จากนั้นส่งฟังก์ชันไปยังคิวใน Cloud Tasks สภาพแวดล้อมของ Cloud Tasks ออกแบบมาโดยเฉพาะเพื่อให้มั่นใจว่านโยบายในการควบคุมความคับคั่งนั้นมีประสิทธิภาพ และลองใช้นโยบายซ้ำสำหรับการดำเนินการประเภทนี้
Firebase SDK สำหรับฟังก์ชันระบบคลาวด์สำหรับ Firebase v3.20.1 ขึ้นไปทำงานร่วมกับ Firebase Admin SDK เวอร์ชัน 10.2.0 ขึ้นไปเพื่อรองรับฟังก์ชันคิวงาน
การใช้ฟังก์ชันคิวงานกับ Firebase อาจส่งผลให้มีการเรียกเก็บเงินสำหรับการประมวลผล Cloud Tasks โปรดดูข้อมูลเพิ่มเติมที่ราคาของ Cloud Tasks
สร้างฟังก์ชันคิวงาน
หากต้องการใช้ฟังก์ชันคิวงาน ให้ทำตามขั้นตอนต่อไปนี้
- เขียนฟังก์ชันคิวงานโดยใช้ Firebase SDK สำหรับ Cloud Functions
- ทดสอบฟังก์ชันโดยทริกเกอร์ด้วยคำขอ HTTP
- ทำให้ฟังก์ชันใช้งานได้ด้วย Firebase CLI เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราและการลองใหม่) ที่ระบุไว้ในซอร์สโค้ด
- เพิ่มงานลงในคิวงานที่สร้างขึ้นใหม่ แล้วส่งต่อพารามิเตอร์เพื่อตั้งค่ากำหนดการดำเนินการ หากจําเป็น โดยเขียนโค้ดด้วย Admin SDK และทำให้ใช้งานได้กับ Cloud Functions for Firebase
เขียนฟังก์ชันของคิวงาน
ตัวอย่างโค้ดในส่วนนี้อิงจากแอปที่ตั้งค่าบริการที่สำรองข้อมูลทั้งหมดจากภาพดาราศาสตร์ประจำวันของนาซา ในการเริ่มต้น ให้นำเข้าโมดูลที่จำเป็น:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
ใช้ onTaskDispatched
หรือ on_task_dispatched
สำหรับฟังก์ชันคิวงาน เมื่อเขียนฟังก์ชันคิวงาน
คุณสามารถตั้งค่าการลองอีกครั้งต่อคิวและการกำหนดค่าการจำกัดอัตรา
กำหนดค่าฟังก์ชันคิวงาน
ฟังก์ชันคิวงานมาพร้อมกับชุดการกำหนดค่าที่มีประสิทธิภาพเพื่อควบคุมขีดจำกัดอัตราคำขอและการทำงานซ้ำของคิวงานได้อย่างแม่นยำ ดังนี้
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: ระบบจะลองทำแต่ละงานในคิวงานซ้ำโดยอัตโนมัติสูงสุด 5 ครั้ง ซึ่งจะช่วยลดข้อผิดพลาดชั่วคราว เช่น ข้อผิดพลาดเกี่ยวกับเครือข่ายหรือการหยุดชะงักของบริการชั่วคราวของบริการภายนอกที่ต้องพึ่งพาretryConfig.minBackoffSeconds=60
: ระบบจะพยายามทำงานแต่ละงานซ้ำอย่างน้อย 60 วินาทีหลังจากพยายามแต่ละครั้ง ซึ่งจะให้พื้นที่ขนาดใหญ่ระหว่างการพยายามแต่ละครั้ง เพื่อให้เราไม่ต้องพยายามลองใหม่ทั้ง 5 ครั้งเร็วเกินไปrateLimits.maxConcurrentDispatch=6
: โดยจะส่งงานพร้อมกันมากที่สุด 6 งาน วิธีนี้ช่วยให้มั่นใจว่าจะมีคำขอสตรีมอย่างต่อเนื่องไปยังฟังก์ชันที่สำคัญ และช่วยลดจำนวนอินสแตนซ์ที่ใช้งานอยู่และ Cold Start
ทดสอบฟังก์ชันของคิวงาน
ฟังก์ชันคิวงานใน Firebase Local Emulator Suite จะแสดงเป็นฟังก์ชัน HTTP แบบง่าย คุณทดสอบฟังก์ชันงานที่จำลองได้โดยส่งคำขอ HTTP POST ที่มีเพย์โหลดข้อมูล JSON ดังนี้
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
ทำให้ฟังก์ชันคิวงานใช้งานได้
ทำให้ฟังก์ชันคิวงานใช้งานได้โดยใช้ Firebase CLI ดังนี้
$ firebase deploy --only functions:backupapod
เมื่อทำให้ฟังก์ชันคิวงานใช้งานได้เป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks พร้อมตัวเลือก (การจำกัดอัตราและการลองใหม่) ที่ระบุไว้ในซอร์สโค้ด
หากคุณพบข้อผิดพลาดด้านสิทธิ์เมื่อทำให้ฟังก์ชันใช้งานได้ โปรดตรวจสอบว่าได้มอบหมายบทบาท IAM ที่เหมาะสมให้กับผู้ใช้ที่เรียกใช้คำสั่งการติดตั้งใช้งาน
จัดคิวฟังก์ชันของคิวงาน
ฟังก์ชันของคิวงานสามารถจัดคิวได้ใน Cloud Tasks จากสภาพแวดล้อมเซิร์ฟเวอร์ที่เชื่อถือได้ เช่น Cloud Functions for Firebase โดยใช้ Firebase Admin SDK สำหรับไลบรารี Node.js หรือ Google Cloud สำหรับ Python หากยังไม่คุ้นเคยกับ Admin SDK โปรดดูเพิ่ม Firebase ในเซิร์ฟเวอร์เพื่อเริ่มต้นใช้งาน
ขั้นตอนทั่วไปจะสร้างงานใหม่ จัดคิวงานใน Cloud Tasks และตั้งค่าการกำหนดค่าสำหรับงานนั้น
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
โค้ดตัวอย่างจะพยายามกระจายการดำเนินการของงานโดยเชื่อมโยงการหน่วงเวลานาทีที่ N กับงาน N ซึ่งจะแปลเป็นทริกเกอร์ประมาณ 1 งาน/นาที โปรดทราบว่าคุณยังใช้
scheduleTime
(Node.js) หรือschedule_time
(Python) ได้ด้วย หากต้องการให้ Cloud Tasks ทริกเกอร์งานในเวลาที่เฉพาะเจาะจงโค้ดตัวอย่างจะกำหนดระยะเวลาสูงสุดที่ Cloud Tasks จะรอให้งานเสร็จสิ้น Cloud Tasks จะลองทำงานอีกครั้งหลังจากกำหนดค่าการลองอีกครั้งของคิวหรือจนกว่าจะถึงกำหนดเวลานี้ ในตัวอย่าง คิวได้รับการกำหนดค่าให้ลองทำงานใหม่ได้สูงสุด 5 ครั้ง แต่งานจะถูกยกเลิกโดยอัตโนมัติหากกระบวนการทั้งหมด (รวมถึงการพยายามลองอีกครั้ง) ใช้เวลานานกว่า 5 นาที
ดึงข้อมูลและรวม URI เป้าหมาย
คุณต้องระบุ URL ของ Cloud Run ของฟังก์ชันเมื่อจัดคิวงาน เนื่องด้วยวิธีที่ Cloud Tasks สร้างโทเค็นการตรวจสอบสิทธิ์เพื่อตรวจสอบสิทธิ์คำขอไปยังฟังก์ชันคิวงานที่สำคัญ เราขอแนะนำให้คุณเรียกข้อมูล URL สำหรับฟังก์ชันโดยใช้โปรแกรมดังที่แสดงด้านล่าง
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
การแก้ปัญหา
เปิดการบันทึก Cloud Tasks
บันทึกจาก Cloud Tasks มีข้อมูลการวินิจฉัยที่เป็นประโยชน์ เช่น สถานะของคำขอที่เชื่อมโยงกับงาน โดยค่าเริ่มต้น ระบบจะปิดบันทึกจาก Cloud Tasks เนื่องจากมีบันทึกจำนวนมากที่อาจสร้างในโปรเจ็กต์ของคุณได้ เราขอแนะนำให้คุณเปิดบันทึกการแก้ไขข้อบกพร่องขณะพัฒนาและแก้ไขข้อบกพร่องของฟังก์ชันคิวงาน ดูการเปิดการบันทึก
สิทธิ์ IAM
คุณอาจเห็นข้อผิดพลาด PERMISSION DENIED
รายการเมื่อจัดคิวงานหรือเมื่อ Cloud Tasks พยายามเรียกใช้ฟังก์ชันคิวงาน ตรวจสอบว่าโปรเจ็กต์มีการเชื่อมโยง IAM ต่อไปนี้
ข้อมูลประจำตัวที่ใช้จัดคิวงานไปยัง Cloud Tasks ต้องมีสิทธิ์ IAM ของ
cloudtasks.tasks.create
ในตัวอย่างนี้คือบัญชีบริการ App Engine เริ่มต้น
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
ข้อมูลประจำตัวที่ใช้จัดคิวงานไปยัง Cloud Tasks ต้องมีสิทธิ์ใช้บัญชีบริการที่เชื่อมโยงกับงานใน Cloud Tasks
ในตัวอย่างนี้คือบัญชีบริการ App Engine เริ่มต้น
โปรดดูเอกสารประกอบ Google Cloud IAM เกี่ยวกับวิธีการเพิ่มบัญชีบริการเริ่มต้นของ App Engine เป็นผู้ใช้ของบัญชีบริการเริ่มต้นของ App Engine
ข้อมูลประจำตัวที่ใช้ในการทริกเกอร์ฟังก์ชันคิวงานต้องมีสิทธิ์
cloudfunctions.functions.invoke
ในตัวอย่างนี้คือบัญชีบริการ App Engine เริ่มต้น
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker