Cloud Tasks로 큐에 함수 추가


태스크 큐 함수는 Google Cloud Tasks를 활용하여 앱이 기본 애플리케이션 흐름 외부에서 시간이 많이 걸리거나, 리소스를 많이 소모하거나, 대역폭이 제한적인 태스크를 비동기식으로 실행할 수 있게 해 줍니다.

예를 들어 현재 비율 제한이 있는 API에서 호스팅되는 대규모 이미지 파일 세트의 백업을 만든다고 가정해 보겠습니다. 책임감 있는 API 소비자가 되려면 비율 제한을 준수해야 합니다. 또한 이러한 종류의 장기 실행 작업은 시간 제한 및 메모리 한도로 인해 실패에 취약할 수 있습니다.

이러한 복잡성을 완화하려면 scheduleTimedispatchDeadline과 같은 기본 태스크 옵션을 설정하는 태스크 큐 함수를 작성하면 됩니다. 그러면 함수가 Cloud Tasks의 큐로 전달됩니다. Cloud Tasks 환경은 특히 이러한 종류의 작업에 효과적인 정체 제어와 재시도 정책을 제공하도록 설계되었습니다.

Cloud Functions for Firebase용 Firebase SDK v3.20.1 이상은 Firebase Admin SDK v10.2.0 이상과 연동되어 태스크 큐 함수를 지원합니다.

Firebase에서 태스크 큐 함수를 사용하면 Cloud Tasks 처리 요금이 부과될 수 있습니다. 자세한 내용은 Cloud Tasks 가격 책정을 참조하세요.

태스크 큐 함수 만들기

태스크 큐 함수를 사용하려면 다음 워크플로를 따르세요.

  1. Cloud FunctionsFirebase SDK를 사용하여 태스크 큐 함수를 작성합니다.
  2. HTTP 요청으로 트리거하여 함수를 테스트합니다.
  3. Firebase CLI로 함수를 배포합니다. 태스크 큐 함수를 처음으로 배포할 때 CLI가 소스 코드에서 지정된 옵션(비율 제한 및 재시도)으로 Cloud Tasks에서 태스크 큐를 만듭니다.
  4. 새로 생성된 태스크 큐에 태스크를 추가하고 필요한 경우 매개변수를 전달하여 실행 일정을 설정합니다. Admin SDK를 사용하여 코드를 작성하고 Cloud Functions for Firebase에 배포하면 됩니다.

태스크 큐 함수 작성

이 섹션에 나와 있는 코드 샘플은 NASA Astronomy Picture of the Day 사이트의 모든 이미지를 백업하는 서비스 설정 앱을 기반으로 합니다. 시작하려면 필요한 모듈을 가져옵니다.

Node.js

// Dependencies for task queue functions.
const {onTaskDispatched} = require("firebase-functions/v2/tasks");
const {onRequest, HttpsError} = require("firebase-functions/v2/https");
const {getFunctions} = require("firebase-admin/functions");
const {logger} = require("firebase-functions/v2");

// Dependencies for image backup.
const path = require("path");
const fetch = require("node-fetch");
const {initializeApp} = require("firebase-admin/app");
const {getStorage} = require("firebase-admin/storage");
const {GoogleAuth} = require("google-auth-library");

Python

# Dependencies for task queue functions.
from google.cloud import tasks_v2
import requests
from firebase_functions.options import RetryConfig, RateLimits, SupportedRegion

# Dependencies for image backup.
from datetime import datetime, timedelta
import json
import pathlib
from urllib.parse import urlparse
from firebase_admin import initialize_app, storage, functions
from firebase_functions import https_fn, tasks_fn, params
import google.auth
from google.auth.transport.requests import AuthorizedSession

태스크 큐 함수에는 onTaskDispatched 또는 on_task_dispatched를 사용합니다. 태스크 큐 함수를 작성할 때 큐별 재시도와 비율 제한 구성을 설정할 수 있습니다.

태스크 큐 함수 구성

태스크 큐 함수에는 태스크 큐의 비율 제한과 재시도 동작을 정밀하게 제어하는 강력한 구성 설정 모음이 함께 제공됩니다.

Node.js

exports.backupapod = onTaskDispatched(
    {
      retryConfig: {
        maxAttempts: 5,
        minBackoffSeconds: 60,
      },
      rateLimits: {
        maxConcurrentDispatches: 6,
      },
    }, async (req) => {

Python

@tasks_fn.on_task_dispatched(retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=60),
                             rate_limits=RateLimits(max_concurrent_dispatches=10))
def backupapod(req: tasks_fn.CallableRequest) -> str:
    """Grabs Astronomy Photo of the Day (APOD) using NASA's API."""
  • retryConfig.maxAttempts=5: 태스크 큐의 각 태스크가 최대 5회까지 자동으로 재시도됩니다. 이렇게 하면 네트워크 오류 또는 종속된 외부 서비스의 일시적인 중단과 같은 일시적인 오류를 완화할 수 있습니다.

  • retryConfig.minBackoffSeconds=60: 각 시도마다 최소 60초 간격으로 태스크가 재시도됩니다. 이렇게 하면 각 시도 간에 긴 버퍼가 제공되므로 재시도 횟수 5회가 그다지 빨리 소진되지 않습니다.

  • rateLimits.maxConcurrentDispatch=6: 지정된 시간에 최대 6개의 태스크가 디스패치됩니다. 이렇게 하면 기본 함수에 대한 요청이 꾸준히 이루어지고 활성 인스턴스 및 콜드 스타트 횟수를 줄일 수 있습니다.

태스크 큐 함수 테스트

대부분의 경우 Cloud Functions 에뮬레이터는 태스크 큐 함수를 테스트하는 가장 좋은 방법입니다. 태스크 큐 함수 에뮬레이션을 위해 앱을 계측하는 방법은 에뮬레이터 도구 모음 문서를 참고하세요.

또한 태스크 큐 functions_sdk는 Firebase Local Emulator Suite에서 간단한 HTTP 함수로 노출됩니다. JSON 데이터 페이로드와 함께 HTTP POST 요청을 전송하여 에뮬레이션된 태스크 함수를 테스트할 수 있습니다.

 # start the Local Emulator Suite
 firebase emulators:start

 # trigger the emulated task queue function
 curl \
  -X POST                                            # An HTTP POST request...
  -H "content-type: application/json" \              # ... with a JSON body
  http://localhost:$PORT/$PROJECT_ID/$REGION/$NAME \ # ... to function url
  -d '{"data": { ... some data .... }}'              # ... with JSON encoded data

태스크 큐 함수 배포

Firebase CLI를 사용하여 태스크 큐 함수를 배포하세요.

$ firebase deploy --only functions:backupapod

태스크 큐 함수를 처음으로 배포할 때 CLI가 소스 코드에 지정된 옵션(비율 제한 및 재시도)으로 Cloud Tasks에서 태스크 큐를 만듭니다.

함수를 배포할 때 권한 오류가 발생하면 배포 명령어를 실행하는 사용자에게 적절한 IAM 역할이 할당되었는지 확인합니다.

태스크 큐 함수 큐에 추가

태스크 큐 함수는 Node.js용 Firebase Admin SDK 또는 Python용 Google Cloud 라이브러리를 사용하여 Cloud Functions for Firebase와 같은 신뢰할 수 있는 서버 환경에서 Cloud Tasks의 큐에 추가할 수 있습니다. Admin SDK를 처음 사용하는 경우 서버에 Firebase 추가를 참조하여 시작하세요.

일반적인 흐름은 새 태스크를 만들어 Cloud Tasks의 큐에 추가하고 태스크용 구성을 설정합니다.

Node.js

exports.enqueuebackuptasks = onRequest(
    async (_request, response) => {
      const queue = getFunctions().taskQueue("backupapod");
      const targetUri = await getFunctionUrl("backupapod");

      const enqueues = [];
      for (let i = 0; i <= BACKUP_COUNT; i += 1) {
        const iteration = Math.floor(i / HOURLY_BATCH_SIZE);
        // Delay each batch by N * hour
        const scheduleDelaySeconds = iteration * (60 * 60);

        const backupDate = new Date(BACKUP_START_DATE);
        backupDate.setDate(BACKUP_START_DATE.getDate() + i);
        // Extract just the date portion (YYYY-MM-DD) as string.
        const date = backupDate.toISOString().substring(0, 10);
        enqueues.push(
            queue.enqueue({date}, {
              scheduleDelaySeconds,
              dispatchDeadlineSeconds: 60 * 5, // 5 minutes
              uri: targetUri,
            }),
        );
      }
      await Promise.all(enqueues);
      response.sendStatus(200);
    });

Python

@https_fn.on_request()
def enqueuebackuptasks(_: https_fn.Request) -> https_fn.Response:
    """Adds backup tasks to a Cloud Tasks queue."""
    task_queue = functions.task_queue("backupapod")
    target_uri = get_function_url("backupapod")

    for i in range(BACKUP_COUNT):
        batch = i // HOURLY_BATCH_SIZE

        # Delay each batch by N hours
        schedule_delay = timedelta(hours=batch)
        schedule_time = datetime.now() + schedule_delay

        dispatch_deadline_seconds = 60 * 5  # 5 minutes

        backup_date = BACKUP_START_DATE + timedelta(days=i)
        body = {"data": {"date": backup_date.isoformat()[:10]}}
        task_options = functions.TaskOptions(schedule_time=schedule_time,
                                             dispatch_deadline_seconds=dispatch_deadline_seconds,
                                             uri=target_uri)
        task_queue.enqueue(body, task_options)
    return https_fn.Response(status=200, response=f"Enqueued {BACKUP_COUNT} tasks")
  • 샘플 코드는 N 번째 태스크의 N분 지연을 연결하여 태스크 실행을 분산하려고 시도합니다. 즉, 분당 약 1개의 태스크가 트리거됩니다. Cloud Tasks가 특정 시간에 태스크를 트리거하게 하려면 scheduleTime(Node.js) 또는 schedule_time(Python)을 사용할 수도 있습니다.

  • 샘플 코드는 Cloud Tasks에서 태스크가 완료될 때까지 기다리는 최대 시간을 설정합니다. Cloud Tasks는 큐의 재시도 구성에 따라 또는 이 기한에 도달할 때까지 태스크를 재시도합니다. 샘플에는 태스크를 최대 5회까지 재시도하도록 큐가 구성되어 있지만, 전체 프로세스(재시도 포함)가 5분 넘게 걸리는 경우에는 태스크가 자동으로 취소됩니다.

대상 URI 검색 및 포함

Cloud Tasks가 기본 태스크 큐 함수에 대한 요청을 인증하기 위한 인증 토큰을 만드는 방식으로 인해 태스크를 큐에 추가할 때 함수의 Cloud Run URL을 지정해야 합니다. 아래에 나온 것처럼 함수의 URL을 프로그래매틱 방식으로 검색하는 것이 좋습니다.

Node.js

/**
 * Get the URL of a given v2 cloud function.
 *
 * @param {string} name the function's name
 * @param {string} location the function's location
 * @return {Promise<string>} The URL of the function
 */
async function getFunctionUrl(name, location="us-central1") {
  if (!auth) {
    auth = new GoogleAuth({
      scopes: "https://www.googleapis.com/auth/cloud-platform",
    });
  }
  const projectId = await auth.getProjectId();
  const url = "https://cloudfunctions.googleapis.com/v2beta/" +
    `projects/${projectId}/locations/${location}/functions/${name}`;

  const client = await auth.getClient();
  const res = await client.request({url});
  const uri = res.data?.serviceConfig?.uri;
  if (!uri) {
    throw new Error(`Unable to retreive uri for function at ${url}`);
  }
  return uri;
}

Python

def get_function_url(name: str, location: str = SupportedRegion.US_CENTRAL1) -> str:
    """Get the URL of a given v2 cloud function.

    Params:
        name: the function's name
        location: the function's location

    Returns: The URL of the function
    """
    credentials, project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(credentials)
    url = ("https://cloudfunctions.googleapis.com/v2beta/" +
           f"projects/{project_id}/locations/{location}/functions/{name}")
    response = authed_session.get(url)
    data = response.json()
    function_url = data["serviceConfig"]["uri"]
    return function_url

문제 해결

Cloud Tasks>Cloud Tasks 로깅 사용 설정

Cloud Tasks의 로그에는 태스크와 연결된 요청의 상태와 같은 유용한 진단 정보가 포함됩니다. 프로젝트에서 대량의 로그가 생성될 수도 있으므로 기본적으로 Cloud Tasks의 로그는 사용 중지되어 있습니다. 태스크 큐 함수를 적극적으로 개발하고 디버깅하는 동안에는 디버그 로그를 사용 설정하는 것이 좋습니다. 로깅 사용 설정을 참조하세요.

IAM 권한

태스크를 큐에 추가할 때 또는 Cloud Tasks가 태스크 큐 함수를 호출하려고 시도할 때 PERMISSION DENIED 오류가 표시될 수 있습니다. 프로젝트에 다음 IAM 바인딩이 있는지 확인하세요.

  • 태스크를 Cloud Tasks의 큐에 추가하는 데 사용되는 ID에는 cloudtasks.tasks.create IAM 권한이 필요합니다.

    샘플에서는 App Engine 기본 서비스 계정입니다.

gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudtasks.enqueuer
  • 태스크를 Cloud Tasks의 큐에 추가하는 데 사용되는 ID에는 Cloud Tasks의 태스크와 연결된 서비스 계정을 사용할 권한이 필요합니다.

    샘플에서는 App Engine 기본 서비스 계정입니다.

App Engine 기본 서비스 계정을 App Engine 기본 서비스 계정의 사용자로 추가하는 방법에 대한 안내는 Google Cloud IAM 문서를 참조하세요.

  • 태스크 큐 함수를 트리거하는 데 사용되는 ID에는 cloudfunctions.functions.invoke 권한이 필요합니다.

    샘플에서는 App Engine 기본 서비스 계정입니다.

gcloud functions add-iam-policy-binding $FUNCTION_NAME \
  --region=us-central1 \
  --member=serviceAccount:${PROJECT_ID}@appspot.gserviceaccount.com \
  --role=roles/cloudfunctions.invoker