Các hàm hàng đợi tác vụ tận dụng Cloud Tasks của Google để giúp ứng dụng của bạn chạy các tác vụ tốn thời gian, tốn nhiều tài nguyên hoặc bị giới hạn băng thông một cách không đồng bộ, bên ngoài luồng ứng dụng chính.
Ví dụ: giả sử bạn muốn tạo bản sao lưu cho một nhóm lớn các tệp hình ảnh hiện đang được lưu trữ trên một API có giới hạn về tốc độ. Để trở thành một người dùng có trách nhiệm của API đó, bạn cần phải tuân thủ giới hạn tỷ lệ của họ. Ngoài ra, loại công việc chạy trong thời gian dài này có thể dễ gặp lỗi do hết thời gian chờ và giới hạn bộ nhớ.
Để giảm bớt sự phức tạp này, bạn có thể viết một hàm hàng đợi tác vụ đặt các lựa chọn cơ bản cho tác vụ như scheduleTime
và dispatchDeadline
, sau đó chuyển hàm này sang một hàng đợi trong Cloud Tasks. Môi trường Cloud Tasks được thiết kế riêng để đảm bảo các chính sách kiểm soát tình trạng tắc nghẽn và thử lại hiệu quả cho các loại hoạt động này.
Firebase SDK cho Cloud Functions for Firebase phiên bản 3.20.1 trở lên tương tác với Firebase Admin SDK phiên bản 10.2.0 trở lên để hỗ trợ các chức năng hàng đợi tác vụ.
Việc sử dụng các hàm hàng đợi tác vụ với Firebase có thể dẫn đến các khoản phí xử lý Cloud Tasks. Hãy xem phần Giá của Cloud Tasks để biết thêm thông tin.
Tạo các hàm hàng đợi tác vụ
Để sử dụng các hàm hàng đợi tác vụ, hãy làm theo quy trình sau:
- Viết một hàm hàng đợi tác vụ bằng cách sử dụng SDK Firebase cho Cloud Functions.
- Kiểm thử hàm bằng cách kích hoạt hàm đó bằng một yêu cầu HTTP.
- Triển khai hàm bằng CLI Firebase. Khi triển khai hàm hàng đợi tác vụ lần đầu tiên, CLI sẽ tạo một hàng đợi tác vụ trong Cloud Tasks với các lựa chọn (giới hạn tốc độ và thử lại) được chỉ định trong mã nguồn của bạn.
- Thêm các tác vụ vào hàng đợi tác vụ mới tạo, truyền các tham số để thiết lập lịch thực thi nếu cần. Bạn có thể thực hiện việc này bằng cách viết mã bằng Admin SDK và triển khai mã đó đến Cloud Functions for Firebase.
Viết các hàm hàng đợi tác vụ
Các mẫu mã trong phần này dựa trên một ứng dụng thiết lập dịch vụ sao lưu tất cả hình ảnh từ Astronomy Picture of the Day (Ảnh thiên văn học trong ngày) của NASA. Để bắt đầu, hãy nhập các mô-đun bắt buộc:
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Sử dụng onTaskDispatched
hoặc on_task_dispatched
cho các hàm hàng đợi tác vụ. Khi viết một hàm hàng đợi tác vụ, bạn có thể đặt cấu hình giới hạn tốc độ và thử lại cho mỗi hàng đợi.
Định cấu hình các hàm hàng đợi tác vụ
Các hàm hàng đợi tác vụ đi kèm với một bộ chế độ cài đặt cấu hình mạnh mẽ để kiểm soát chính xác giới hạn tốc độ và hành vi thử lại của hàng đợi tác vụ:
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: Mỗi tác vụ trong hàng đợi tác vụ sẽ tự động được thử lại tối đa 5 lần. Điều này giúp giảm thiểu các lỗi tạm thời như lỗi mạng hoặc sự cố gián đoạn dịch vụ tạm thời của một dịch vụ bên ngoài, phụ thuộc.retryConfig.minBackoffSeconds=60
: Mỗi tác vụ được thử lại ít nhất 60 giây sau mỗi lần thử. Điều này giúp tạo ra một khoảng đệm lớn giữa mỗi lần thử lại để chúng ta không vội vàng sử dụng hết 5 lần thử lại quá nhanh.rateLimits.maxConcurrentDispatch=6
: Tại một thời điểm nhất định, có tối đa 6 tác vụ được gửi đi. Điều này giúp đảm bảo luồng yêu cầu ổn định cho hàm cơ bản, đồng thời giúp giảm số lượng phiên bản đang hoạt động và lượt khởi động nguội.
Kiểm thử các hàm hàng đợi tác vụ
Trong hầu hết các trường hợp, trình mô phỏng Cloud Functions là cách tốt nhất để kiểm thử các hàm hàng đợi tác vụ. Hãy xem tài liệu về Emulator Suite để tìm hiểu cách đo lường ứng dụng của bạn cho hoạt động mô phỏng các chức năng hàng đợi tác vụ.
Ngoài ra, các hàm task queue functions_sdk được hiển thị dưới dạng các hàm HTTP đơn giản trong Firebase Local Emulator Suite. Bạn có thể kiểm thử một hàm tác vụ được mô phỏng bằng cách gửi yêu cầu HTTP POST kèm theo tải trọng dữ liệu JSON:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Triển khai các hàm hàng đợi tác vụ
Triển khai hàm hàng đợi tác vụ bằng CLI Firebase:
$ firebase deploy --only functions:backupapod
Khi triển khai hàm hàng đợi tác vụ lần đầu tiên, CLI sẽ tạo một hàng đợi tác vụ trong Cloud Tasks với các lựa chọn (giới hạn tốc độ và thử lại) được chỉ định trong mã nguồn của bạn.
Nếu bạn gặp lỗi về quyền khi triển khai các hàm, hãy đảm bảo rằng các vai trò IAM thích hợp được chỉ định cho người dùng chạy các lệnh triển khai.
Thêm các hàm hàng đợi tác vụ vào hàng đợi
Các hàm hàng đợi tác vụ có thể được xếp hàng đợi trong Cloud Tasks từ một môi trường máy chủ đáng tin cậy như Cloud Functions for Firebase bằng cách sử dụng Firebase Admin SDK cho Node.js hoặc thư viện Google Cloud cho Python. Nếu bạn mới sử dụng Admin SDK, hãy xem phần Thêm Firebase vào một máy chủ để bắt đầu.
Một quy trình thông thường sẽ tạo một tác vụ mới, đưa tác vụ đó vào hàng đợi trong Cloud Tasks và đặt cấu hình cho tác vụ:
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
Mã mẫu cố gắng phân tán việc thực thi các tác vụ bằng cách liên kết độ trễ N phút cho tác vụ thứ N. Điều này có nghĩa là bạn sẽ kích hoạt khoảng 1 tác vụ/phút. Xin lưu ý rằng bạn cũng có thể sử dụng
scheduleTime
(Node.js) hoặcschedule_time
(Python) nếu muốn Cloud Tasks kích hoạt một tác vụ tại một thời điểm cụ thể.Mã mẫu đặt khoảng thời gian tối đa mà Cloud Tasks sẽ đợi một tác vụ hoàn tất. Cloud Tasks sẽ thử lại tác vụ theo cấu hình thử lại của hàng đợi hoặc cho đến khi đạt đến thời hạn này. Trong mẫu, hàng đợi được định cấu hình để thử lại tác vụ tối đa 5 lần, nhưng tác vụ sẽ tự động bị huỷ nếu toàn bộ quy trình (bao gồm cả các lần thử lại) mất hơn 5 phút.
Truy xuất và thêm URI mục tiêu
Do cách Cloud Tasks tạo mã thông báo xác thực để xác thực các yêu cầu đối với các hàm hàng đợi tác vụ cơ bản, bạn phải chỉ định URL Cloud Run của hàm khi đưa các tác vụ vào hàng đợi. Bạn nên truy xuất URL cho hàm của mình bằng cách lập trình như minh hoạ dưới đây:
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
Khắc phục sự cố
Bật tính năng ghi nhật ký Cloud Tasks
Nhật ký từ Cloud Tasks chứa thông tin chẩn đoán hữu ích, chẳng hạn như trạng thái của yêu cầu liên kết với một tác vụ. Theo mặc định, nhật ký từ Cloud Tasks sẽ bị tắt do số lượng lớn nhật ký mà nó có thể tạo trên dự án của bạn. Bạn nên bật nhật ký gỡ lỗi trong khi đang tích cực phát triển và gỡ lỗi các hàm hàng đợi tác vụ. Xem phần Bật tính năng ghi nhật ký.
Quyền IAM
Bạn có thể thấy lỗi PERMISSION DENIED
khi xếp hàng các tác vụ hoặc khi Cloud Tasks cố gắng gọi các hàm hàng đợi tác vụ của bạn. Đảm bảo rằng dự án của bạn có các liên kết IAM sau:
Danh tính dùng để xếp hàng các tác vụ vào Cloud Tasks cần có quyền IAM
cloudtasks.tasks.create
.Trong mẫu này, đây là tài khoản dịch vụ mặc định App Engine
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
Danh tính dùng để xếp hàng các tác vụ vào Cloud Tasks cần có quyền sử dụng tài khoản dịch vụ được liên kết với một tác vụ trong Cloud Tasks.
Trong mẫu này, đây là tài khoản dịch vụ mặc định App Engine.
Hãy xem tài liệu về IAM của Google Cloud để biết hướng dẫn về cách thêm tài khoản dịch vụ mặc định App Engine làm người dùng của tài khoản dịch vụ mặc định App Engine.
Danh tính dùng để kích hoạt hàm hàng đợi tác vụ cần có quyền
cloudfunctions.functions.invoke
.Trong mẫu này, đây là tài khoản dịch vụ mặc định App Engine
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker