Thêm các hàm vào hàng đợi bằng Cloud Tasks

Các hàm hàng đợi tác vụ tận dụng Google Cloud Tasks để giúp ứng dụng của bạn chạy các tác vụ tốn thời gian, tốn nhiều tài nguyên hoặc bị giới hạn băng thông một cách không đồng bộ, bên ngoài quy trình ứng dụng chính.

Ví dụ: giả sử bạn muốn tạo bản sao lưu của một tập hợp lớn các tệp hình ảnh hiện được lưu trữ trên một API có giới hạn về tốc độ. Để trở thành người tiêu dùng có trách nhiệm đối với API đó, bạn cần tuân thủ các giới hạn về tốc độ của họ. Ngoài ra, loại công việc chạy trong thời gian dài này có thể dễ bị lỗi do thời gian chờ và giới hạn bộ nhớ.

Để giảm bớt sự phức tạp này, bạn có thể viết một hàm hàng đợi tác vụ đặt các tuỳ chọn tác vụ cơ bản như scheduleTime, và dispatchDeadline, sau đó chuyển hàm này sang một hàng đợi trong Cloud Tasks. Môi trường được thiết kế riêng để đảm bảo kiểm soát hiệu quả tình trạng tắc nghẽn và các chính sách thử lại cho những loại hoạt động này.Cloud Tasks

Firebase SDK cho Cloud Functions for Firebase phiên bản 3.20.1 trở lên tương tác với Firebase Admin SDK phiên bản 10.2.0 trở lên để hỗ trợ các hàm hàng đợi tác vụ.

Việc sử dụng các hàm hàng đợi tác vụ với Firebase có thể dẫn đến các khoản phí xử lý Cloud Tasks. Hãy xem Cloud Tasks giá để biết thêm thông tin.

Tạo các hàm hàng đợi tác vụ

Để sử dụng các hàm hàng đợi tác vụ, hãy làm theo quy trình công việc sau:

  1. Viết một hàm hàng đợi tác vụ bằng Firebase SDK cho Cloud Functions.
  2. Kiểm thử hàm bằng cách kích hoạt hàm đó bằng một yêu cầu HTTP.
  3. Triển khai hàm bằng Firebase CLI. Khi triển khai hàm hàng đợi tác vụ lần đầu tiên, CLI sẽ tạo một hàng đợi tác vụ trong Cloud Tasks với các tuỳ chọn (giới hạn về tốc độ và thử lại) được chỉ định trong mã nguồn của bạn.
  4. Thêm các tác vụ vào hàng đợi tác vụ mới tạo, truyền các tham số để thiết lập lịch thực thi nếu cần. Bạn có thể thực hiện việc này bằng cách viết mã bằng Admin SDK và triển khai mã đó vào Cloud Functions for Firebase.

Viết các hàm hàng đợi tác vụ

Các mẫu mã trong phần này dựa trên một ứng dụng thiết lập một dịch vụ sao lưu tất cả hình ảnh từ trang Thiên văn học của NASA về Hình ảnh trong ngày. Để bắt đầu, hãy nhập các mô-đun bắt buộc:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/tasks");
const {onRequest, HttpsError} = require("firebase-functions/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions");

// Dependencies for image backup.
const path = require("path");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Sử dụng onTaskDispatched hoặc on_task_dispatched cho các hàm hàng đợi tác vụ. Khi viết một hàm hàng đợi tác vụ, bạn có thể đặt cấu hình thử lại và giới hạn về tốc độ cho mỗi hàng đợi.

Định cấu hình các hàm hàng đợi tác vụ

Các hàm hàng đợi tác vụ đi kèm với một tập hợp mạnh mẽ các chế độ cài đặt cấu hình để kiểm soát chính xác giới hạn về tốc độ và hành vi thử lại của một hàng đợi tác vụ:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(
    retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
    rate_limits=RateLimits(max_concurrent_dispatches=10),
)
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Mỗi tác vụ trong hàng đợi tác vụ sẽ tự động được thử lại tối đa 5 lần. Điều này giúp giảm thiểu các lỗi tạm thời như lỗi mạng hoặc sự gián đoạn dịch vụ tạm thời của một dịch vụ bên ngoài, phụ thuộc.

  • retryConfig.minBackoffSeconds=60: Mỗi tác vụ được thử lại cách nhau ít nhất 60 giây giữa mỗi lần thử. Điều này cung cấp một vùng đệm lớn giữa mỗi lần thử để chúng ta không vội sử dụng hết 5 lần thử lại quá nhanh.

  • rateLimits.maxConcurrentDispatch=6: Tối đa 6 tác vụ được gửi tại một thời điểm nhất định. Điều này giúp đảm bảo một luồng yêu cầu ổn định cho hàm cơ bản và giúp giảm số lượng thực thể đang hoạt động và số lần khởi động nguội.

Kiểm thử các hàm hàng đợi tác vụ

Trong hầu hết các trường hợp, trình mô phỏng Cloud Functions là cách tốt nhất để kiểm thử các hàm hàng đợi tác vụ. Hãy xem tài liệu về Bộ công cụ mô phỏng để tìm hiểu cách đo lường ứng dụng của bạn để mô phỏng các hàm hàng đợi tác vụ.

Ngoài ra, các hàm hàng đợi tác vụ được hiển thị dưới dạng các hàm HTTP đơn giản trong Firebase Local Emulator Suite. Bạn có thể kiểm thử một hàm tác vụ được mô phỏng bằng cách gửi yêu cầu HTTP POST với tải trọng dữ liệu JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Triển khai các hàm hàng đợi tác vụ

Triển khai hàm hàng đợi tác vụ bằng Firebase CLI:

$ firebase deploy --only functions:backupapod

Khi triển khai hàm hàng đợi tác vụ lần đầu tiên, CLI sẽ tạo một hàng đợi tác vụ trong Cloud Tasks với các tuỳ chọn (giới hạn về tốc độ và thử lại) được chỉ định trong mã nguồn của bạn.

Nếu bạn gặp lỗi về quyền khi triển khai các hàm, hãy đảm bảo rằng các vai trò IAM thích hợp được chỉ định cho người dùng chạy các lệnh triển khai.

Đưa các hàm hàng đợi tác vụ vào hàng đợi

Các hàm hàng đợi tác vụ có thể được đưa vào hàng đợi trong Cloud Tasks từ một môi trường máy chủ đáng tin cậy như Cloud Functions for Firebase bằng Firebase Admin SDK cho Node.js hoặc các thư viện Google Cloud cho Python. Nếu bạn mới sử dụng Admin SDKs, hãy xem Thêm Firebase vào máy chủ để bắt đầu.

Một quy trình thông thường sẽ tạo một tác vụ mới, đưa tác vụ đó vào hàng đợi trong Cloud Tasks, và đặt cấu hình cho tác vụ đó:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(
            schedule_time=schedule_time,
            dispatch_deadline_seconds=dispatch_deadline_seconds,
            uri=target_uri,
        )
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Mã mẫu cố gắng phân tán việc thực thi các tác vụ bằng cách liên kết độ trễ N phút cho tác vụ thứ N. Điều này chuyển thành việc kích hoạt ~ 1 tác vụ/phút. Xin lưu ý rằng bạn cũng có thể sử dụng scheduleTime (Node.js) hoặc schedule_time (Python) nếu bạn muốn Cloud Tasks kích hoạt một tác vụ vào một thời điểm cụ thể.

  • Mã mẫu đặt khoảng thời gian tối đa mà Cloud Tasks sẽ chờ một tác vụ hoàn tất. Cloud Tasks sẽ thử lại tác vụ theo cấu hình thử lại của hàng đợi hoặc cho đến khi đạt đến thời hạn này. Trong mẫu, hàng đợi được định cấu hình để thử lại tác vụ tối đa 5 lần, nhưng tác vụ sẽ tự động bị huỷ nếu toàn bộ quy trình (bao gồm cả các lần thử lại) mất hơn 5 phút.

Khắc phục sự cố

Bật tính năng ghi nhật ký Cloud Tasks

Nhật ký từ Cloud Tasks chứa thông tin chẩn đoán hữu ích như trạng thái của yêu cầu được liên kết với một tác vụ. Theo mặc định, nhật ký từ Cloud Tasks sẽ tắt do khối lượng nhật ký lớn mà nhật ký này có thể tạo trên dự án của bạn. Bạn nên bật nhật ký gỡ lỗi khi đang tích cực phát triển và gỡ lỗi các hàm hàng đợi tác vụ. Xem phần Bật tính năng ghi nhật ký.

Quyền IAM

Bạn có thể thấy lỗi PERMISSION DENIED khi đưa các tác vụ vào hàng đợi hoặc khi Cloud Tasks cố gắng gọi các hàm hàng đợi tác vụ. Đảm bảo rằng dự án của bạn có các liên kết IAM sau:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Tài khoản nhận dạng được dùng để đưa các tác vụ vào hàng đợi trong Cloud Tasks cần có quyền sử dụng tài khoản dịch vụ được liên kết với một tác vụ trong Cloud Tasks.

    Trong mẫu, đây là App Engine tài khoản dịch vụ mặc định.

Hãy xem tài liệu về Cloud IAM của Google để biết hướng dẫn về cách thêm tài khoản dịch vụ mặc định làm người dùng của tài khoản dịch vụ mặc định.App EngineApp Engine

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker