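Process data in bulk with Dataflow

This page provides examples of how to use Dataflow to perform bulk Cloud Firestore operations in an Apache Beam pipeline. Apache Beam supports a connector for Cloud Firestore. You can use this connector to run batch and streaming operations in Dataflow.

We recommend using Dataflow and Apache Beam for large-scale data processing workloads.

The Cloud Firestore connector for Apache Beam is available in Java. For more information about the Cloud Firestore connector, see the Apache Beam SDK for Java.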

Before you begin

Before you read this page, you should be familiar with the Apache Beam programming model.

To run the samples, you must enable the Dataflow API.

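Example Cloud Firestore pipelines

The following examples demonstrate one pipeline that writes data and one that reads and filters data. You can use these samples as a starting point for your own pipelines.

Running the sample pipelines

The source code for the samples is available in the googleapis/java-firestore GitHub repository. To run these samples, download the source code and see the README.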

Example Write pipeline

The following example creates documents in the cities-beam-sample collection:



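import com.google.cloud.firestore.FirestoreOptions;
import com.google.firestore.v1.Document;
import com.google.firestore.v1.Value;
import com.google.firestore.v1.Write;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class ExampleFirestoreBeamWrite {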
  private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance();

  public static void main(String[] args) {
    runWrite(args, "cities-beam-sample");
  }

  public static void runWrite(String[] args, String collectionId) {
    // create pipeline options from the passed in arguments
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    RpcQosOptions rpcQosOptions =
        RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
            .build();

    // create some writes
    Write write1 =
        Write.newBuilder()
            .setUpdate(
                Document.newBuilder()
                    // resolves to
                    // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC
                    .setName(createDocumentName(collectionId, "NYC"))
                    .putFields("name", Value.newBuilder().setStringValue("New York City").build())
                    .putFields("state", Value.newBuilder().setStringValue("New York").build())
                    .putFields("country", Value.newBuilder().setStringValue("USA").build()))
            .build();

    Write write2 =
        Write.newBuilder()
            .setUpdate(
                Document.newBuilder()
                    // resolves to
                    // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK
                    .setName(createDocumentName(collectionId, "TOK"))
                    .putFields("name", Value.newBuilder().setStringValue("Tokyo").build())
                    .putFields("country", Value.newBuilder().setStringValue("Japan").build())
                    .putFields("capital", Value.newBuilder().setBooleanValue(true).build()))
            .build();

    // batch write the data
    pipeline
        .apply(Create.of(write1, write2))
        .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build());

    // run the pipeline
    pipeline.run().waitUntilFinish();
  }

  private static String createDocumentName(String collectionId, String cityDocId) {
    String documentPath =
        String.format(
            "projects/%s/databases/%s/documents",
            FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId());

    return documentPath + "/" + collectionId + "/" + cityDocId;
  }
}
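
To make the name that `createDocumentName` produces concrete, here is a minimal standalone sketch of the same string formatting without the Firestore client. The class name `DocumentNameSketch` is hypothetical, and the project ID `my-project` and database ID `(default)` are placeholder values assumed for illustration; the sample itself reads both from `FirestoreOptions`.

```java
class DocumentNameSketch {

  // Mirrors the helper in the Write example, but takes the project and database IDs
  // as parameters instead of reading them from FirestoreOptions.
  static String createDocumentName(
      String projectId, String databaseId, String collectionId, String documentId) {
    String documentRoot =
        String.format("projects/%s/databases/%s/documents", projectId, databaseId);
    return documentRoot + "/" + collectionId + "/" + documentId;
  }

  public static void main(String[] args) {
    // "my-project" and "(default)" are placeholders, not values from the sample.
    // prints projects/my-project/databases/(default)/documents/cities-beam-sample/NYC
    System.out.println(
        createDocumentName("my-project", "(default)", "cities-beam-sample", "NYC"));
  }
}
```

Each `Write` in the pipeline carries a fully qualified name of this shape, which is how the connector knows which document to create or update.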

The example Write pipeline is configured and run with the following arguments:

GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers
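
One way these values might reach the pipeline is as command-line flags that `PipelineOptionsFactory.fromArgs` parses. The sketch below assumes the values are exported as environment variables and maps them to the Dataflow options `--project`, `--region`, `--tempLocation`, `--numWorkers`, and `--maxNumWorkers`, plus `--runner=DataflowRunner`. The class `PipelineArgsSketch` and its `fromEnvironment` method are hypothetical and not part of the sample; the README in the repository remains the authoritative source for the exact command.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PipelineArgsSketch {

  // Assumed mapping from the variable names above to Dataflow pipeline option names.
  private static final Map<String, String> OPTION_BY_VARIABLE = new LinkedHashMap<>();

  static {
    OPTION_BY_VARIABLE.put("GOOGLE_CLOUD_PROJECT", "project");
    OPTION_BY_VARIABLE.put("REGION", "region");
    OPTION_BY_VARIABLE.put("TEMP_LOCATION", "tempLocation");
    OPTION_BY_VARIABLE.put("NUM_WORKERS", "numWorkers");
    OPTION_BY_VARIABLE.put("MAX_NUM_WORKERS", "maxNumWorkers");
  }

  // Builds the argument array; variables that are unset or empty are skipped.
  static String[] fromEnvironment(Map<String, String> env) {
    List<String> args = new ArrayList<>();
    args.add("--runner=DataflowRunner");
    for (Map.Entry<String, String> entry : OPTION_BY_VARIABLE.entrySet()) {
      String value = env.get(entry.getKey());
      if (value != null && !value.isEmpty()) {
        args.add("--" + entry.getValue() + "=" + value);
      }
    }
    return args.toArray(new String[0]);
  }

  public static void main(String[] args) {
    // Prints one flag per line for whatever variables are set in this process.
    for (String arg : fromEnvironment(System.getenv())) {
      System.out.println(arg);
    }
  }
}
```

The resulting array could be passed as the `args` parameter of `runWrite` or `runRead`. The value of `--maxNumWorkers` is the one the samples forward to `RpcQosOptions.withHintMaxNumWorkers`.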

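Example Read pipeline

The following example pipeline reads documents from the cities-beam-sample collection, applies a filter that keeps documents whose country field is set to USA, and returns the names of the matching documents.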



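import com.google.cloud.firestore.FirestoreOptions;
import com.google.firestore.v1.DocumentRootName;
import com.google.firestore.v1.RunQueryRequest;
import com.google.firestore.v1.RunQueryResponse;
import com.google.firestore.v1.StructuredQuery;
import com.google.firestore.v1.Value;
import java.util.Objects;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class ExampleFirestoreBeamRead {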

  public static void main(String[] args) {
    runRead(args, "cities-beam-sample");
  }

  public static void runRead(String[] args, String collectionId) {
    FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance();

    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    RpcQosOptions rpcQosOptions =
        RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
            .build();

    pipeline
        .apply(Create.of(collectionId))
        .apply(
            new FilterDocumentsQuery(
                firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId()))
        .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build())
        .apply(
            ParDo.of(
                // transform each document to its name
                new DoFn<RunQueryResponse, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(Objects.requireNonNull(c.element()).getDocument().getName());
                  }
                }))
        .apply(
            ParDo.of(
                // print the document name
                new DoFn<String, Void>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    System.out.println(c.element());
                  }
                }));

    pipeline.run().waitUntilFinish();
  }

  private static final class FilterDocumentsQuery
      extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> {

    private final String projectId;
    private final String databaseId;

    public FilterDocumentsQuery(String projectId, String databaseId) {
      this.projectId = projectId;
      this.databaseId = databaseId;
    }

    @Override
    public PCollection<RunQueryRequest> expand(PCollection<String> input) {
      return input.apply(
          ParDo.of(
              new DoFn<String, RunQueryRequest>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  // select from the collection whose ID is the input element
                  StructuredQuery.CollectionSelector collection =
                      StructuredQuery.CollectionSelector.newBuilder()
                          .setCollectionId(Objects.requireNonNull(c.element()))
                          .build();
                  // filter where country is equal to USA
                  StructuredQuery.Filter countryFilter =
                      StructuredQuery.Filter.newBuilder()
                          .setFieldFilter(
                              StructuredQuery.FieldFilter.newBuilder()
                                  .setField(
                                      StructuredQuery.FieldReference.newBuilder()
                                          .setFieldPath("country")
                                          .build())
                                  .setValue(Value.newBuilder().setStringValue("USA").build())
                                  .setOp(StructuredQuery.FieldFilter.Operator.EQUAL))
                          .build();

                  RunQueryRequest runQueryRequest =
                      RunQueryRequest.newBuilder()
                          .setParent(DocumentRootName.format(projectId, databaseId))
                          .setStructuredQuery(
                              StructuredQuery.newBuilder()
                                  .addFrom(collection)
                                  .setWhere(countryFilter)
                                  .build())
                          .build();
                  c.output(runQueryRequest);
                }
              }));
    }
  }
}
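
To make the query's effect concrete, the following plain-Java sketch applies the same `country` equals `USA` condition in memory to the two documents that the Write example creates. It does not call Firestore. The class `CountryFilterSketch`, the map-based document representation, and the short keys `NYC` and `TOK` (standing in for the full document names) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CountryFilterSketch {

  // Returns the names of documents whose "country" field equals the given value,
  // mimicking the EQUAL field filter in the Read example.
  static List<String> namesWhereCountryEquals(
      Map<String, Map<String, Object>> documents, String country) {
    List<String> names = new ArrayList<>();
    for (Map.Entry<String, Map<String, Object>> doc : documents.entrySet()) {
      if (country.equals(doc.getValue().get("country"))) {
        names.add(doc.getKey());
      }
    }
    return names;
  }

  // In-memory stand-ins for the two documents written by the Write example.
  static Map<String, Map<String, Object>> sampleDocuments() {
    Map<String, Object> nyc = new LinkedHashMap<>();
    nyc.put("name", "New York City");
    nyc.put("state", "New York");
    nyc.put("country", "USA");

    Map<String, Object> tok = new LinkedHashMap<>();
    tok.put("name", "Tokyo");
    tok.put("country", "Japan");
    tok.put("capital", true);

    Map<String, Map<String, Object>> documents = new LinkedHashMap<>();
    documents.put("NYC", nyc);
    documents.put("TOK", tok);
    return documents;
  }

  public static void main(String[] args) {
    // prints [NYC]
    System.out.println(namesWhereCountryEquals(sampleDocuments(), "USA"));
  }
}
```

If the collection holds only the two documents from the Write example, the Read pipeline should likewise print a single name, the fully qualified one ending in `/cities-beam-sample/NYC`.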

The example Read pipeline is configured and run with the following arguments:

GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers

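Pricing

Running a Cloud Firestore workload in Dataflow incurs costs for both Cloud Firestore usage and Dataflow usage. Dataflow usage is billed for the resources that your jobs use. For details, see the Dataflow pricing page. For Cloud Firestore pricing, see the Pricing page.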

What's next