ฟังก์ชันคิวงานใช้ประโยชน์จาก Google Cloud Tasks เพื่อช่วยให้แอปของคุณรันงานที่ใช้เวลานาน ใช้ทรัพยากรมาก หรืองานที่มีแบนด์วิดท์จำกัดแบบอะซิงโครนัส นอกขั้นตอนหลักของแอปพลิเคชันของคุณ
ตัวอย่างเช่น ลองจินตนาการว่าคุณต้องการสร้างการสำรองข้อมูลของไฟล์ภาพชุดใหญ่ที่ปัจจุบันโฮสต์อยู่บน API โดยมีขีดจำกัดอัตรา ในการเป็นผู้บริโภคที่มีความรับผิดชอบของ API นั้น คุณจะต้องเคารพขีดจำกัดอัตราของพวกเขา นอกจากนี้ งานที่ต้องใช้เวลานานประเภทนี้อาจเสี่ยงต่อความล้มเหลวเนื่องจากการหมดเวลาและขีดจำกัดของหน่วยความจำ
หากต้องการลดความซับซ้อนนี้ คุณสามารถเขียนฟังก์ชันคิวงานที่ตั้งค่าตัวเลือกงานพื้นฐาน เช่น scheduleTime
และ dispatchDeadline
จากนั้นมอบหมายฟังก์ชันให้กับคิวใน Cloud Tasks สภาพแวดล้อม Cloud Tasks ได้รับการออกแบบมาโดยเฉพาะเพื่อให้แน่ใจว่ามีการควบคุมความแออัดที่มีประสิทธิภาพและลองใช้นโยบายซ้ำสำหรับการดำเนินการประเภทนี้
Firebase SDK สำหรับฟังก์ชันคลาวด์สำหรับ Firebase v3.20.1 และสูงกว่าทำงานร่วมกับ Firebase Admin SDK v10.2.0 และสูงกว่าเพื่อรองรับฟังก์ชันคิวงาน
การใช้ฟังก์ชันคิวงานกับ Firebase อาจส่งผลให้มีการเรียกเก็บเงินสำหรับการประมวลผล Cloud Tasks ดู ราคา Cloud Tasks สำหรับข้อมูลเพิ่มเติม
สร้างฟังก์ชันคิวงาน
หากต้องการใช้ฟังก์ชันคิวงาน ให้ปฏิบัติตามขั้นตอนการทำงานนี้:
- เขียนฟังก์ชันคิวงานโดยใช้ Firebase SDK สำหรับฟังก์ชันคลาวด์
- ทดสอบฟังก์ชันของคุณโดยเรียกใช้ด้วยคำขอ HTTP
- ปรับใช้ฟังก์ชันของคุณด้วย Firebase CLI เมื่อปรับใช้ฟังก์ชันคิวงานของคุณเป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks โดยมีตัวเลือก (จำกัดอัตราและลองใหม่) ที่ระบุไว้ในซอร์สโค้ดของคุณ
- เพิ่มงานลงในคิวงานที่สร้างขึ้นใหม่ โดยส่งผ่านพารามิเตอร์เพื่อตั้งค่ากำหนดการดำเนินการหากจำเป็น คุณสามารถทำได้โดยการเขียนโค้ดโดยใช้ Admin SDK และปรับใช้กับ Cloud Functions for Firebase
เขียนฟังก์ชันคิวงาน
ตัวอย่างโค้ดในส่วนนี้อิงตามแอปที่ตั้งค่าบริการที่สำรองรูปภาพทั้งหมดจาก รูปภาพดาราศาสตร์ประจำวัน ของ NASA ในการเริ่มต้น ให้นำเข้าโมดูลที่จำเป็น:
โหนด js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
หลาม
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
ใช้ onTaskDispatched
หรือ on_task_dispatched
สำหรับฟังก์ชันคิวงาน เมื่อเขียนฟังก์ชันคิวงาน คุณสามารถตั้งค่าการลองใหม่ต่อคิวและการกำหนดค่าการจำกัดอัตราได้
กำหนดค่าฟังก์ชันคิวงาน
ฟังก์ชันคิวงานมาพร้อมกับชุดการตั้งค่าที่มีประสิทธิภาพเพื่อควบคุมขีดจำกัดอัตราอย่างแม่นยำและลองพฤติกรรมของคิวงานอีกครั้ง:
โหนด js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
หลาม
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: แต่ละงานในคิวงานจะถูกลองใหม่โดยอัตโนมัติสูงสุด 5 ครั้ง ซึ่งช่วยลดข้อผิดพลาดชั่วคราว เช่น ข้อผิดพลาดของเครือข่ายหรือการหยุดชะงักของบริการชั่วคราวของบริการภายนอกที่ต้องพึ่งพาretryConfig.minBackoffSeconds=60
: แต่ละงานจะถูกลองใหม่อย่างน้อย 60 วินาทีแยกจากความพยายามแต่ละครั้ง นี่เป็นบัฟเฟอร์ขนาดใหญ่ระหว่างความพยายามแต่ละครั้ง ดังนั้นเราจึงไม่รีบเร่งที่จะหมดความพยายามในการลองใหม่ทั้ง 5 ครั้งเร็วเกินไปrateLimits.maxConcurrentDispatch=6
: มีการจัดส่งงานได้สูงสุด 6 งานในเวลาที่กำหนด สิ่งนี้ช่วยให้มั่นใจได้ถึงการสตรีมคำขอไปยังฟังก์ชันพื้นฐานและช่วยลดจำนวนอินสแตนซ์ที่ใช้งานอยู่และ Cold Start
ทดสอบฟังก์ชันคิวงาน
ฟังก์ชันคิวงานใน Firebase Local Emulator Suite จะแสดงเป็นฟังก์ชัน HTTP แบบธรรมดา คุณสามารถทดสอบฟังก์ชันงานที่จำลองได้โดยการส่งคำขอ HTTP POST พร้อมด้วยเพย์โหลดข้อมูล JSON:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
ปรับใช้ฟังก์ชันคิวงาน
ปรับใช้ฟังก์ชันคิวงานโดยใช้ Firebase CLI:
$ firebase deploy --only functions:backupapod
เมื่อปรับใช้ฟังก์ชันคิวงานเป็นครั้งแรก CLI จะสร้างคิวงานใน Cloud Tasks โดยมีตัวเลือก (การจำกัดอัตราและลองอีกครั้ง) ที่ระบุไว้ในซอร์สโค้ดของคุณ
หากคุณพบข้อผิดพลาดในการอนุญาตเมื่อปรับใช้ฟังก์ชัน ตรวจสอบให้แน่ใจว่าได้กำหนด บทบาท IAM ที่เหมาะสมให้กับผู้ใช้ที่รันคำสั่งการปรับใช้
จัดคิวฟังก์ชันคิวงาน
ฟังก์ชันคิวงานสามารถจัดคิวไว้ใน Cloud Tasks จากสภาพแวดล้อมเซิร์ฟเวอร์ที่เชื่อถือได้ เช่น Cloud Functions สำหรับ Firebase โดยใช้ Firebase Admin SDK สำหรับ Node.js หรือไลบรารี Google Cloud สำหรับ Python หากคุณเพิ่งเริ่มใช้ Admin SDK โปรดดู เพิ่ม Firebase ไปยังเซิร์ฟเวอร์ เพื่อเริ่มต้น
ขั้นตอนทั่วไปจะสร้างงานใหม่ จัดคิวงานไว้ใน Cloud Tasks และตั้งค่าการกำหนดค่าสำหรับงาน ดังนี้
โหนด js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
หลาม
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
tasks_client = tasks_v2.CloudTasksClient()
task_queue = tasks_client.queue_path(params.PROJECT_ID.value, SupportedRegion.US_CENTRAL1,
"backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task = tasks_v2.Task(http_request={
"http_method": tasks_v2.HttpMethod.POST,
"url": target_uri,
"headers": {
"Content-type": "application/json"
},
"body": json.dumps(body).encode()
},
schedule_time=schedule_time)
tasks_client.create_task(parent=task_queue, task=task)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
โค้ดตัวอย่างพยายามกระจายการดำเนินงานโดยการเชื่อมโยงความล่าช้าของนาทีที่ N สำหรับงานที่ N ซึ่งแปลเป็นการทริกเกอร์ ~ 1 งาน/นาที โปรดทราบว่าคุณยังสามารถใช้
scheduleTime
(Node.js) หรือschedule_time
(Python) ได้ หากต้องการให้ Cloud Tasks ทริกเกอร์งานในเวลาที่กำหนดโค้ดตัวอย่างจะกำหนดระยะเวลาสูงสุดที่ Cloud Tasks จะรอให้งานเสร็จสิ้น Cloud Tasks จะพยายามงานอีกครั้งหลังจากลองกำหนดค่าคิวอีกครั้งหรือจนกว่าจะถึงกำหนดเวลานี้ ในตัวอย่าง คิวได้รับการกำหนดค่าให้ลองงานใหม่สูงสุด 5 ครั้ง แต่งานจะถูกยกเลิกโดยอัตโนมัติหากกระบวนการทั้งหมด (รวมถึงความพยายามลองใหม่) ใช้เวลามากกว่า 5 นาที
ดึงข้อมูลและรวม URI เป้าหมาย
เนื่องจากวิธีที่ Cloud Tasks สร้างโทเค็นการตรวจสอบสิทธิ์เพื่อตรวจสอบสิทธิ์คำขอไปยังฟังก์ชันคิวงานที่สำคัญ คุณต้องระบุ Cloud Run URL ของฟังก์ชันเมื่อจัดคิวงาน เราขอแนะนำให้คุณเรียกค้น URL สำหรับฟังก์ชันของคุณโดยทางโปรแกรมดังที่แสดงด้านล่าง:
โหนด js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
หลาม
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
การแก้ไขปัญหา
เปิดการบันทึก Cloud Tasks
บันทึกจาก Cloud Tasks มีข้อมูลการวินิจฉัยที่เป็นประโยชน์ เช่น สถานะของคำขอที่เกี่ยวข้องกับงาน ตามค่าเริ่มต้น บันทึกจาก Cloud Tasks จะถูกปิดเนื่องจากมีบันทึกจำนวนมากที่อาจสร้างได้ในโปรเจ็กต์ของคุณ เราขอแนะนำให้คุณเปิดบันทึกการแก้ไขข้อบกพร่องในขณะที่คุณกำลังพัฒนาและแก้ไขฟังก์ชันคิวงานของคุณ ดู การเปิดการบันทึก
สิทธิ์ IAM
คุณอาจเห็นข้อผิดพลาด PERMISSION DENIED
เมื่อจัดคิวงานหรือเมื่อ Cloud Tasks พยายามเรียกใช้ฟังก์ชันคิวงานของคุณ ตรวจสอบให้แน่ใจว่าโปรเจ็กต์ของคุณมีการเชื่อมโยง IAM ต่อไปนี้:
ข้อมูลระบุตัวตนที่ใช้ในการจัดคิวงานให้กับ Cloud Tasks จำเป็นต้องได้รับสิทธิ์
cloudtasks.tasks.create
IAMในตัวอย่างนี้ นี่คือ บัญชีบริการเริ่มต้นของ App Engine
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
ข้อมูลระบุตัวตนที่ใช้ในการจัดคิวงานให้กับ Cloud Tasks ต้องได้รับสิทธิ์เพื่อใช้บัญชีบริการที่เชื่อมโยงกับงานใน Cloud Tasks
ในตัวอย่างนี้ นี่คือ บัญชีบริการเริ่มต้นของ App Engine
ดู เอกสารประกอบของ Google Cloud IAM สำหรับคำแนะนำเกี่ยวกับวิธีเพิ่มบัญชีบริการเริ่มต้นของ App Engine เป็นผู้ใช้บัญชีบริการเริ่มต้นของ App Engine
ข้อมูลระบุตัวตนที่ใช้ในการทริกเกอร์ฟังก์ชันคิวงานต้องได้รับอนุญาตจาก
cloudfunctions.functions.invoke
ในตัวอย่างนี้ นี่คือ บัญชีบริการเริ่มต้นของ App Engine
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker