Fungsi task queue memanfaatkan Google Cloud Tasks untuk membantu aplikasi Anda menjalankan tugas yang memakan waktu, membutuhkan banyak resource, atau dibatasi bandwidth secara asinkron, di luar alur aplikasi utama Anda.
Misalnya, bayangkan Anda ingin membuat cadangan dari banyak kumpulan file gambar yang saat ini dihosting di API dengan batas kapasitas. Untuk menjadi konsumen API yang bertanggung jawab, Anda perlu mematuhi batas kapasitasnya. Selain itu, pekerjaan jangka panjang semacam ini rentan mengalami kegagalan karena waktu tunggu habis dan batas memori.
Untuk mengurangi kompleksitas ini, Anda dapat menulis fungsi task queue yang menetapkan opsi tugas dasar seperti scheduleTime
, dan dispatchDeadline
, lalu melimpahkan fungsi tersebut ke antrean di Cloud Tasks. Lingkungan Cloud Tasks dirancang khusus untuk memastikan kontrol kemacetan dan kebijakan percobaan ulang yang efektif untuk jenis operasi ini.
Firebase SDK untuk Cloud Functions for Firebase v3.20.1 dan yang lebih tinggi memiliki interoperabilitas dengan Firebase Admin SDK v10.2.0 dan yang lebih tinggi untuk mendukung fungsi task queue.
Penggunaan fungsi task queue dengan Firebase dapat menyebabkan tagihan untuk pemrosesan Cloud Tasks. Lihat harga Cloud Tasks untuk informasi selengkapnya.
Membuat fungsi task queue
Untuk menggunakan fungsi task queue, ikuti alur kerja ini:
- Tulis fungsi task queue menggunakan Firebase SDK untuk Cloud Functions.
- Uji fungsi Anda dengan memicunya menggunakan permintaan HTTP.
- Deploy fungsi Anda dengan Firebase CLI. Saat men-deploy fungsi task queue untuk pertama kalinya, CLI akan membuat task queue di Cloud Tasks dengan opsi (pembatasan kapasitas dan percobaan ulang) yang ditentukan dalam kode sumber Anda.
- Tambahkan tugas ke task queue yang baru dibuat, dengan meneruskan parameter untuk menyiapkan jadwal eksekusi jika diperlukan. Anda dapat mencapainya dengan menulis kode menggunakan Admin SDK dan men-deploy-nya ke Cloud Functions for Firebase.
Menulis fungsi task queue
Contoh kode di bagian ini didasarkan pada aplikasi yang menyiapkan layanan yang mencadangkan semua gambar dari Astronomy Picture of the Day milik NASA: Untuk memulai, impor modul yang diperlukan:
Node.js
// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");
// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");
Python
# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion
# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession
Gunakan onTaskDispatched
atau on_task_dispatched
untuk fungsi task queue. Saat menulis fungsi task queue, Anda dapat menetapkan percobaan ulang per antrean dan konfigurasi pembatasan kapasitas.
Mengonfigurasi fungsi task queue
Fungsi task queue dilengkapi dengan kumpulan setelan konfigurasi yang canggih untuk mengontrol secara akurat batas kapasitas dan perilaku percobaan ulang task queue:
Node.js
exports.backupapod = onTaskDispatched(
{
retryConfig: {
maxAttempts: 5,
minBackoffSeconds: 60,
},
rateLimits: {
maxConcurrentDispatches: 6,
},
}, async (req) => {
Python
@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
"""Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
retryConfig.maxAttempts=5
: Setiap tugas dalam task queue secara otomatis dicoba lagi hingga 5 kali. Hal ini membantu mengurangi error sementara seperti error jaringan atau gangguan layanan sementara dari layanan eksternal yang dependen.retryConfig.minBackoffSeconds=60
: Setiap tugas dicoba ulang dengan interval antar percobaan minimal 60 detik. Hal ini memberikan buffer besar di antara setiap percobaan sehingga batas 5 kali percobaan tidak akan terlalu cepat habis.rateLimits.maxConcurrentDispatch=6
: Maksimal 6 tugas dikirim secara bersamaan. Hal ini membantu memastikan aliran permintaan yang stabil ke fungsi yang mendasarinya serta membantu mengurangi jumlah instance aktif dan cold start.
Menguji fungsi task queue
Untuk sebagian besar kasus, emulator Cloud Functions adalah cara terbaik untuk menguji fungsi task queue. Lihat dokumentasi Emulator Suite untuk mempelajari cara Melengkapi aplikasi untuk emulasi fungsi task queue.
Selain itu, task queue functions_sdk diekspos sebagai fungsi HTTP sederhana di Firebase Local Emulator Suite. Anda dapat menguji fungsi tugas yang diemulasi dengan mengirimkan permintaan HTTP POST dengan payload data JSON:
# start the Local Emulator Suite
firebase emulators:start
# trigger the emulated task queue function
curl \
-X POST # An HTTP POST request...
-H "content-type: application/json" \ # ... with a JSON body
http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
-d '{"data": { ... some data .... }}' # ... with JSON encoded data
Men-deploy fungsi task queue
Deploy fungsi task queue menggunakan Firebase CLI:
$ firebase deploy --only functions:backupapod
Saat men-deploy fungsi task queue untuk pertama kalinya, CLI akan membuat task queue di Cloud Tasks dengan opsi (pembatasan kapasitas dan percobaan ulang) yang ditentukan dalam kode sumber Anda.
Jika Anda mengalami error izin saat men-deploy fungsi, pastikan peran IAM yang sesuai ditetapkan kepada pengguna yang menjalankan perintah deployment.
Mengantrekan fungsi task queue
Fungsi task queue dapat diantrekan di Cloud Tasks dari lingkungan server tepercaya seperti Cloud Functions for Firebase menggunakan Firebase Admin SDK untuk library Node.js atau Google Cloud untuk Python. Jika Anda baru pertama kali menggunakan Admin SDK, lihat artikel Menambahkan Firebase ke server untuk memulai.
Alur standar akan membuat tugas baru, menambahkan tugas ke dalam antrean di Cloud Tasks, dan menetapkan konfigurasi untuk tugas tersebut:
Node.js
exports.enqueuebackuptasks = onRequest(
async (_request, response) => {
const queue = getFunctions().taskQueue("backupapod");
const targetUri = await getFunctionUrl("backupapod");
const enqueues = [];
for (let i = 0; i <= BACKUP_COUNT; i += 1) {
const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
// Delay each batch by N * hour
const scheduleDelaySeconds = iteration * (60 * 60);
const backupDate = new Date(BACKUP_START_DATE);
backupDate.setDate(BACKUP_START_DATE.getDate() + i);
// Extract just the date portion (YYYY-MM-DD) as string.
const date = backupDate.toISOString().substring(0, 10);
enqueues.push(
queue.enqueue({date}, {
scheduleDelaySeconds,
dispatchDeadlineSeconds: 60 * 5, // 5 minutes
uri: targetUri,
}),
);
}
await Promise.all(enqueues);
response.sendStatus(200);
});
Python
@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
"""Adds backup tasks to a Cloud Tasks queue."""
task_queue = functions.task_queue("backupapod")
target_uri = get_function_url("backupapod")
for i in range(BACKUP_COUNT):
batch = i // HOURLY_BATCH_SIZE
# Delay each batch by N hours
schedule_delay = timedelta(hours=batch)
schedule_time = datetime.now() + schedule_delay
dispatch_deadline_seconds = 60 * 5 # 5 minutes
backup_date = BACKUP_START_DATE + timedelta(days=i)
body = {"data": {"date": backup_date.isoformat()[:10]}}
task_options = functions.TaskOptions(schedule_time=schedule_time,
dispatch_deadline_seconds=dispatch_deadline_seconds,
uri=target_uri)
task_queue.enqueue(body, task_options)
return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
Kode contoh akan mencoba menyebarkan eksekusi tugas dengan mengaitkan penundaan menit ke-N untuk tugas ke-N. Hal ini akan memicu ~ 1 tugas/menit. Perlu diperhatikan bahwa Anda juga dapat menggunakan
scheduleTime
(Node.js) atauschedule_time
(Python) jika ingin Cloud Tasks memicu tugas pada waktu tertentu.Kode contoh akan menetapkan jumlah waktu maksimum Cloud Tasks akan menunggu hingga tugas selesai. Cloud Tasks akan mencoba kembali tugas ini setelah konfigurasi percobaan ulang antrean atau sampai batas waktu ini tercapai. Dalam contoh, antrean dikonfigurasi untuk mencoba ulang tugas hingga 5 kali, tetapi tugas tersebut akan otomatis dibatalkan jika seluruh proses (termasuk upaya percobaan ulang) memerlukan waktu lebih dari 5 menit.
Mengambil dan menyertakan URI target
Terkait cara Cloud Tasks membuat token autentikasi untuk melakukan autentikasi permintaan pada fungsi task queue dasar, Anda harus menentukan URL Cloud Run dari fungsi tersebut saat menambahkan tugas ke dalam antrean. Sebaiknya Anda mengambil URL secara terprogram untuk fungsi Anda seperti yang ditunjukkan di bawah ini:
Node.js
/**
* Get the URL of a given v2 cloud function.
*
* @param {string} name the function's name
* @param {string} location the function's location
* @return {Promise<string>} The URL of the function
*/
async function getFunctionUrl(name, location="us-central1") {
if (!auth) {
auth = new GoogleAuth({
scopes: "https://www.googleapis.com/auth/cloud-platform",
});
}
const projectId = await auth.getProjectId();
const url = "https://cloudfunctions.googleapis.com/v2beta/" +
`projects/${projectId}/locations/${location}/functions/${name}`;
const client = await auth.getClient();
const res = await client.request({url});
const uri = res.data?.serviceConfig?.uri;
if (!uri) {
throw new Error(`Unable to retreive uri for function at ${url}`);
}
return uri;
}
Python
def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
"""Get the URL of a given v2 cloud function.
Params:
name: the function's name
location: the function's location
Returns: The URL of the function
"""
credentials, project_id = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(credentials)
url = ("https://cloudfunctions.googleapis.com/v2beta/" +
f"projects/{project_id}/locations/{location}/functions/{name}")
response = authed_session.get(url)
data = response.json()
function_url = data["serviceConfig"]["uri"]
return function_url
Pemecahan masalah
Cloud Tasks">Mengaktifkan logging Cloud Tasks
Log dari Cloud Tasks berisi informasi diagnostik yang berguna, seperti status permintaan yang terkait dengan tugas. Secara default, log dari Cloud Tasks dinonaktifkan karena banyaknya log yang dapat dihasilkan di project Anda. Sebaiknya aktifkan log debug saat Anda aktif mengembangkan dan men-debug fungsi task queue Anda. Baca artikel Mengaktifkan logging.
Izin IAM
Anda mungkin melihat error PERMISSION DENIED
saat mengantrekan tugas atau saat Cloud Tasks mencoba memanggil fungsi task queue Anda. Pastikan project Anda memiliki binding IAM berikut:
Identitas yang digunakan untuk mengantrekan tugas ke Cloud Tasks memerlukan izin IAM
cloudtasks.tasks.create
.Dalam contoh, hal ini adalah akun layanan default App Engine
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudtasks.enqueuer
Identitas yang digunakan untuk mengantrekan tugas ke Cloud Tasks memerlukan izin untuk menggunakan akun layanan yang terkait dengan tugas di Cloud Tasks.
Dalam contoh, hal ini adalah akun layanan default App Engine.
Lihat dokumentasi Google Cloud IAM untuk mengetahui petunjuk cara menambahkan akun layanan default App Engine sebagai pengguna akun layanan default App Engine.
Identitas yang digunakan untuk memicu fungsi task queue memerlukan izin
cloudfunctions.functions.invoke
.Dalam contoh, hal ini adalah akun layanan default App Engine
gcloud functions add-iam-policy-binding $FUNCTION_NAME \
--region=us-central1 \
--member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
--role=roles/cloudfunctions.invoker