Yêu cầu các chức năng với Cloud Tasks


Chức năng xếp hàng tác vụ tận dụng Google Cloud Tasks để giúp ứng dụng của bạn chạy không đồng bộ các tác vụ tốn thời gian, tốn nhiều tài nguyên hoặc bị giới hạn băng thông, bên ngoài luồng ứng dụng chính của bạn.

Ví dụ: hãy tưởng tượng rằng bạn muốn tạo bản sao lưu của một tập hợp lớn các tệp hình ảnh hiện được lưu trữ trên API có giới hạn tốc độ. Để trở thành người tiêu dùng có trách nhiệm với API đó, bạn cần tôn trọng giới hạn tỷ lệ của họ. Ngoài ra, loại công việc kéo dài này có thể dễ bị lỗi do hết thời gian chờ và giới hạn bộ nhớ.

Để giảm thiểu sự phức tạp này, bạn có thể viết một hàm xếp hàng tác vụ đặt các tùy chọn tác vụ cơ bản như scheduleTimedispatchDeadline , sau đó chuyển hàm này sang hàng đợi trong Nhiệm vụ trên đám mây. Môi trường Nhiệm vụ trên đám mây được thiết kế đặc biệt để đảm bảo các chính sách thử lại và kiểm soát tắc nghẽn hiệu quả cho các loại hoạt động này.

SDK Firebase dành cho Chức năng đám mây dành cho Firebase v3.20.1 trở lên tương tác với SDK quản trị Firebase v10.2.0 trở lên để hỗ trợ các chức năng xếp hàng nhiệm vụ.

Việc sử dụng chức năng xếp hàng tác vụ với Firebase có thể dẫn đến tính phí xử lý Nhiệm vụ trên đám mây. Xem giá Nhiệm vụ trên đám mây để biết thêm thông tin.

Tạo chức năng xếp hàng nhiệm vụ

Để sử dụng các chức năng xếp hàng tác vụ, hãy làm theo quy trình làm việc sau:

  1. Viết chức năng xếp hàng tác vụ bằng SDK Firebase cho Chức năng đám mây.
  2. Kiểm tra chức năng của bạn bằng cách kích hoạt nó bằng yêu cầu HTTP.
  3. Triển khai chức năng của bạn với Firebase CLI. Khi triển khai chức năng hàng đợi nhiệm vụ của bạn lần đầu tiên, CLI sẽ tạo hàng đợi nhiệm vụ trong Nhiệm vụ trên đám mây với các tùy chọn (giới hạn tốc độ và thử lại) được chỉ định trong mã nguồn của bạn.
  4. Thêm nhiệm vụ vào hàng nhiệm vụ mới được tạo, chuyển các tham số để thiết lập lịch thực hiện nếu cần. Bạn có thể đạt được điều này bằng cách viết mã bằng SDK quản trị và triển khai mã đó lên Cloud Functions cho Firebase.

Viết chức năng xếp hàng nhiệm vụ

Các mẫu mã trong phần này dựa trên một ứng dụng thiết lập dịch vụ sao lưu tất cả hình ảnh từ Bức ảnh thiên văn trong ngày của NASA. Để bắt đầu, hãy nhập các mô-đun cần thiết:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Sử dụng onTaskDispatched hoặc on_task_dispatched cho các chức năng xếp hàng nhiệm vụ. Khi viết chức năng xếp hàng tác vụ, bạn có thể đặt cấu hình thử lại trên mỗi hàng đợi và giới hạn tốc độ.

Cấu hình chức năng hàng đợi nhiệm vụ

Các chức năng hàng đợi tác vụ đi kèm với một bộ cài đặt cấu hình mạnh mẽ để kiểm soát chính xác giới hạn tốc độ và hành vi thử lại của hàng đợi tác vụ:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5 : Mỗi tác vụ trong hàng tác vụ được tự động thử lại tối đa 5 lần. Điều này giúp giảm thiểu các lỗi nhất thời như lỗi mạng hoặc gián đoạn dịch vụ tạm thời của một dịch vụ phụ thuộc bên ngoài.

  • retryConfig.minBackoffSeconds=60 : Mỗi tác vụ được thử lại cách nhau ít nhất 60 giây sau mỗi lần thử. Điều này cung cấp một khoảng đệm lớn giữa mỗi lần thử để chúng tôi không vội sử dụng hết 5 lần thử lại quá nhanh.

  • rateLimits.maxConcurrentDispatch=6 : Tối đa 6 nhiệm vụ được gửi đi tại một thời điểm nhất định. Điều này giúp đảm bảo luồng yêu cầu ổn định đến chức năng cơ bản và giúp giảm số lượng phiên bản đang hoạt động và khởi động nguội.

Kiểm tra chức năng hàng đợi nhiệm vụ

Các hàm hàng đợi tác vụ trong Bộ mô phỏng cục bộ Firebase được hiển thị dưới dạng các hàm HTTP đơn giản. Bạn có thể kiểm tra chức năng tác vụ được mô phỏng bằng cách gửi yêu cầu HTTP POST với tải trọng dữ liệu JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Triển khai các chức năng xếp hàng nhiệm vụ

Triển khai chức năng xếp hàng tác vụ bằng Firebase CLI:

$ firebase deploy --only functions:backupapod

Khi triển khai chức năng xếp hàng nhiệm vụ lần đầu tiên, CLI sẽ tạo hàng đợi nhiệm vụ trong Nhiệm vụ trên đám mây với các tùy chọn (giới hạn tốc độ và thử lại) được chỉ định trong mã nguồn của bạn.

Nếu bạn gặp phải lỗi về quyền khi triển khai các chức năng, hãy đảm bảo rằng vai trò IAM thích hợp được gán cho người dùng đang chạy lệnh triển khai.

Enqueue chức năng xếp hàng nhiệm vụ

Các chức năng hàng đợi nhiệm vụ có thể được đưa vào hàng đợi trong Nhiệm vụ đám mây từ môi trường máy chủ đáng tin cậy như Cloud Functions cho Firebase bằng cách sử dụng SDK quản trị Firebase cho Node.js hoặc thư viện Google Cloud cho Python. Nếu bạn chưa quen với SDK quản trị, hãy xem Thêm Firebase vào máy chủ để bắt đầu.

Một quy trình điển hình sẽ tạo một tác vụ mới, đưa tác vụ đó vào hàng đợi trong Cloud Tasks và đặt cấu hình cho tác vụ:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Mã mẫu cố gắng dàn trải việc thực thi các nhiệm vụ bằng cách liên kết độ trễ N phút cho nhiệm vụ thứ N. Điều này có nghĩa là kích hoạt ~ 1 tác vụ/phút. Lưu ý rằng bạn cũng có thể sử dụng scheduleTime (Node.js) hoặc schedule_time (Python) nếu bạn muốn Cloud Tasks kích hoạt một tác vụ vào một thời điểm cụ thể.

  • Mã mẫu đặt lượng thời gian tối đa mà Nhiệm vụ trên đám mây sẽ đợi một tác vụ hoàn thành. Cloud Tasks sẽ thử lại tác vụ theo cấu hình thử lại của hàng đợi hoặc cho đến khi đạt đến thời hạn này. Trong mẫu, hàng đợi được cấu hình để thử lại tác vụ tối đa 5 lần, nhưng tác vụ sẽ tự động bị hủy nếu toàn bộ quá trình (bao gồm cả các lần thử lại) mất hơn 5 phút.

Truy xuất và bao gồm URI đích

Do cách Cloud Tasks tạo mã thông báo xác thực để xác thực các yêu cầu đối với các hàm hàng đợi tác vụ cơ bản, bạn phải chỉ định URL Cloud Run của hàm khi sắp xếp các tác vụ. Chúng tôi khuyên bạn nên truy xuất URL cho hàm của mình theo cách lập trình như minh họa bên dưới:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Xử lý sự cố

Bật tính năng ghi nhật ký Nhiệm vụ trên đám mây

Nhật ký từ Nhiệm vụ trên đám mây chứa thông tin chẩn đoán hữu ích như trạng thái của yêu cầu liên quan đến nhiệm vụ. Theo mặc định, nhật ký từ Cloud Tasks bị tắt do khối lượng nhật ký lớn mà nó có thể tạo ra trong dự án của bạn. Chúng tôi khuyên bạn nên bật nhật ký gỡ lỗi trong khi đang tích cực phát triển và gỡ lỗi các chức năng hàng đợi nhiệm vụ của mình. Xem Bật ghi nhật ký .

Quyền IAM

Bạn có thể thấy lỗi PERMISSION DENIED khi sắp xếp các tác vụ vào hàng đợi hoặc khi Cloud Task cố gắng gọi các chức năng xếp hàng tác vụ của bạn. Đảm bảo rằng dự án của bạn có các ràng buộc IAM sau:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer

Xem tài liệu Google Cloud IAM để biết hướng dẫn về cách thêm tài khoản dịch vụ mặc định của App Engine với tư cách là người dùng tài khoản dịch vụ mặc định của App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker