Thêm các hàm vào hàng đợi bằng Cloud Tasks


Chức năng hàng đợi tác vụ tận dụng Google Cloud Tasks để giúp ứng dụng của bạn chạy tốn thời gian, tốn nhiều tài nguyên hoặc có giới hạn băng thông các nhiệm vụ một cách không đồng bộ, bên ngoài luồng ứng dụng chính của bạn.

Ví dụ: giả sử bạn muốn tạo bản sao lưu cho một tập hợp lớn hình ảnh các tệp hiện được lưu trữ trên API có giới hạn tốc độ. Để trở thành người tiêu dùng có trách nhiệm của API đó, bạn cần tôn trọng giới hạn số lượng yêu cầu của họ. Hơn nữa, loại công việc chạy trong thời gian dài này có thể dễ bị lỗi do hết thời gian chờ và giới hạn bộ nhớ.

Để giảm thiểu sự phức tạp này, bạn có thể viết một hàm hàng đợi tác vụ đặt cơ bản các tuỳ chọn tác vụ như scheduleTimedispatchDeadline, sau đó chuyển đến vào hàng đợi trong Cloud Tasks. Cloud Tasks được thiết kế riêng để đảm bảo kiểm soát tình trạng tắc nghẽn hiệu quả và thử lại chính sách cho các loại thao tác này.

Firebase SDK for Cloud Functions cho Firebase phiên bản 3.20.1 trở lên có khả năng tương tác cao hơn với SDK quản trị Firebase phiên bản 10.2.0 trở lên để hỗ trợ các chức năng hàng đợi tác vụ.

Việc sử dụng các hàm hàng đợi tác vụ với Firebase có thể dẫn đến việc tính phí Đang xử lý Cloud Tasks. Xem Giá của Cloud Tasks để biết thêm thông tin.

Tạo hàm cho hàng đợi tác vụ

Để sử dụng các hàm của hàng đợi tác vụ, hãy làm theo quy trình sau:

  1. Viết một hàm hàng đợi công việc bằng cách sử dụng Firebase SDK cho Cloud Functions.
  2. Kiểm tra hàm của bạn bằng cách kích hoạt hàm đó bằng một yêu cầu HTTP.
  3. Triển khai hàm bằng Giao diện dòng lệnh (CLI) của Firebase. Khi triển khai tác vụ lần đầu tiên, CLI sẽ tạo một hàng đợi công việc trong Cloud Tasks với các tùy chọn (giới hạn tốc độ và thử lại) được chỉ định trong mã nguồn của bạn.
  4. Thêm công việc vào hàng đợi công việc mới tạo, chuyển các tham số để thiết lập lịch thực thi nếu cần. Bạn có thể thực hiện điều này bằng cách viết mã bằng cách sử dụng SDK dành cho quản trị viên và triển khai nó trên Cloud Functions cho Firebase.

Ghi hàm hàng đợi tác vụ

Mã mẫu trong phần này dựa trên một ứng dụng đặt sao lưu tất cả hình ảnh từ Ảnh thiên văn học trong ngày. Để bắt đầu, hãy nhập các mô-đun bắt buộc:

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

Sử dụng onTaskDispatched hoặc on_task_dispatched cho các hàm hàng đợi tác vụ. Khi viết hàm hàng đợi tác vụ bạn có thể đặt cấu hình thử lại cho mỗi hàng đợi và định cấu hình giới hạn số lượng yêu cầu.

Định cấu hình các hàm của hàng đợi tác vụ

Các chức năng của hàng đợi tác vụ đi kèm với một tập hợp các cài đặt cấu hình mạnh mẽ để kiểm soát chính xác giới hạn số lượng yêu cầu và thử lại hành vi của hàng đợi tác vụ:

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: Mỗi việc cần làm trong hàng đợi việc cần làm sẽ tự động đã thử lại tối đa 5 lần. Việc này giúp giảm thiểu các lỗi tạm thời, chẳng hạn như lỗi mạng lỗi hoặc gián đoạn dịch vụ tạm thời của một dịch vụ phụ thuộc, bên ngoài.

  • retryConfig.minBackoffSeconds=60: Mỗi việc cần làm sẽ được thử lại sau ít nhất 60 giây ngoài mỗi lần thử. Thao tác này cung cấp một vùng đệm lớn giữa mỗi lần thử vì vậy, chúng ta không phải vội vã thử hết 5 lần thử lại quá nhanh.

  • rateLimits.maxConcurrentDispatch=6: Tối đa 6 công việc được gửi đi tại một thời gian nhất định. Điều này giúp đảm bảo luồng yêu cầu ổn định đến đồng thời giúp giảm số lượng thực thể đang hoạt động cũng như lượt khởi động nguội.

Kiểm thử hàm của hàng đợi tác vụ

Các hàm hàng đợi tác vụ trong Bộ công cụ mô phỏng cục bộ của Firebase được thể hiện dưới dạng đơn giản Các hàm HTTP. Bạn có thể kiểm thử một chức năng tác vụ được mô phỏng bằng cách gửi một yêu cầu POST qua HTTP yêu cầu có tải trọng dữ liệu JSON:

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

Triển khai các hàm của hàng đợi tác vụ

Triển khai chức năng hàng đợi tác vụ bằng cách sử dụng Giao diện dòng lệnh (CLI) của Firebase:

$ firebase deploy --only functions:backupapod

Trong lần đầu triển khai chức năng hàng đợi tác vụ, CLI sẽ tạo một hàng đợi tác vụ trong Cloud Tasks với các lựa chọn (giới hạn tốc độ và thử lại) được chỉ định trong mã nguồn của mình.

Nếu bạn gặp phải lỗi về quyền khi triển khai các chức năng, hãy đảm bảo rằng các vai trò quản lý danh tính và quyền truy cập (IAM) thích hợp được chỉ định cho người dùng chạy lệnh triển khai.

Thêm các hàm hàng đợi tác vụ vào hàng đợi

Các chức năng của hàng đợi tác vụ có thể được đưa vào hàng đợi từ môi trường máy chủ như Cloud Functions cho Firebase bằng cách dùng SDK Quản trị của Firebase cho Thư viện Node.js hoặc Google Cloud dành cho Python. Nếu bạn mới sử dụng SDK dành cho quản trị viên, hãy xem Thêm Firebase vào máy chủ để bắt đầu.

Một quy trình thông thường sẽ tạo ra một tác vụ mới, thêm tác vụ đó vào hàng đợi Cloud Tasks và thiết lập cấu hình của nhiệm vụ:

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • Mã mẫu cố gắng mở rộng việc thực thi của nhiệm vụ bằng cách liên kết độ trễ ở phút thứ N cho nhiệm vụ thứ N. Chiến dịch này đồng nghĩa với việc kích hoạt khoảng 1 nhiệm vụ/phút. Xin lưu ý rằng bạn cũng có thể sử dụng scheduleTime (Node.js) hoặc schedule_time (Python) nếu bạn muốn Cloud Tasks để kích hoạt một nhiệm vụ vào một thời điểm cụ thể.

  • Mã mẫu đặt khoảng thời gian tối đa Cloud Tasks sẽ đợi để hoàn thành một nhiệm vụ. Cloud Tasks sẽ thử lại tác vụ sau lần thử lại cấu hình của hàng đợi hoặc cho đến khi đạt đến thời hạn này. Trong mẫu, hàng đợi được định cấu hình để thử lại tác vụ tối đa 5 lần, nhưng tác vụ sẽ tự động bị huỷ nếu toàn bộ quá trình (bao gồm cả các lần thử lại) mất hơn 5 phút.

Truy xuất và thêm URI mục tiêu

Do cách Cloud Tasks tạo mã thông báo xác thực để xác thực yêu cầu đến các hàm hàng đợi tác vụ cơ bản, bạn phải chỉ định URL chạy trên đám mây của hàm khi thêm các tác vụ vào hàng đợi. T4 bạn nên truy xuất URL cho hàm của mình theo phương thức lập trình như được minh hoạ dưới đây:

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

Khắc phục sự cố

Bật tính năng ghi nhật ký trong Cloud Tasks

Nhật ký từ Cloud Tasks chứa thông tin chẩn đoán hữu ích như trạng thái của yêu cầu được liên kết với một công việc. Theo mặc định, nhật ký từ Tính năng Cloud Tasks đã bị tắt do lưu lượng truy cập lớn trong nhật ký có thể tạo ra trong dự án của bạn. Bạn nên bật nhật ký gỡ lỗi trong khi bạn vẫn đang tích cực phát triển và gỡ lỗi các chức năng hàng đợi tác vụ. Xem Bật ghi nhật ký.

Quyền quản lý danh tính và quyền truy cập (IAM)

Bạn có thể gặp lỗi PERMISSION DENIED khi thêm việc cần làm vào hàng đợi hoặc khi Cloud Tasks sẽ cố gắng gọi các chức năng trong hàng đợi nhiệm vụ của bạn. Đảm bảo rằng các thuộc tính dự án có các mối liên kết IAM sau đây:

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • Danh tính dùng để đưa công việc vào hàng đợi vào Cloud Tasks cần được cấp quyền để sử dụng tài khoản dịch vụ được liên kết với một tác vụ trong Cloud Tasks.

    Trong mẫu, đây là tài khoản dịch vụ mặc định của App Engine.

Xem tài liệu về Google Cloud IAM để xem hướng dẫn về cách thêm tài khoản dịch vụ mặc định của App Engine với tư cách là người dùng tài khoản dịch vụ mặc định của App Engine.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker