本頁提供如何使用 Dataflow 執行的範例 Apache Beam 管道中的大量 Cloud Firestore 作業。 Apache Beam 支援 Cloud Firestore 的連接器。您可以使用 可讓您在 Dataflow 中執行批次和串流作業的連接器。
建議您使用 Dataflow 和 Apache Beam 處理大規模資料 來處理工作負載
Apache Beam 適用的 Cloud Firestore 連接器可透過 Java 使用。如要 如要瞭解 Cloud Firestore 連接器的相關資訊,請參閱 Java 適用的 Apache Beam SDK。
事前準備
閱讀本頁前,您應該先熟悉 Apache Beam 的程式設計模型。
如要執行範例,您必須啟用 Dataflow API。Cloud Firestore 管道範例
以下示例示範了可寫入資料的管道 讀取及篩選資料您可以將這些範例做為起點, 工作管道
執行範例管道
您可以在 googleapis/java-firestore GitHub 存放區。如要執行這些範例,請下載原始碼 並查看 README 檔案。
Write
管道範例
以下範例會在 cities-beam-sample
集合中建立文件:
public class ExampleFirestoreBeamWrite { private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance(); public static void main(String[] args) { runWrite(args, "cities-beam-sample"); } public static void runWrite(String[] args, String collectionId) { // create pipeline options from the passed in arguments PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); // create some writes Write write1 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC .setName(createDocumentName(collectionId, "NYC")) .putFields("name", Value.newBuilder().setStringValue("New York City").build()) .putFields("state", Value.newBuilder().setStringValue("New York").build()) .putFields("country", Value.newBuilder().setStringValue("USA").build())) .build(); Write write2 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK .setName(createDocumentName(collectionId, "TOK")) .putFields("name", Value.newBuilder().setStringValue("Tokyo").build()) .putFields("country", Value.newBuilder().setStringValue("Japan").build()) .putFields("capital", Value.newBuilder().setBooleanValue(true).build())) .build(); // batch write the data pipeline .apply(Create.of(write1, write2)) .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build()); // run the pipeline pipeline.run().waitUntilFinish(); } private static String createDocumentName(String collectionId, String cityDocId) { String documentPath = String.format( "projects/%s/databases/%s/documents", FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId()); return documentPath + "/" + collectionId + "/" + cityDocId; } }
這個範例會使用下列引數設定及執行管道:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
範例 Read
管道
下列範例管道範例會讀取 cities-beam-sample
中的文件
集合, 對欄位 country
設為欄位的文件套用篩選器
USA
,然後傳回相符文件的名稱。
public class ExampleFirestoreBeamRead { public static void main(String[] args) { runRead(args, "cities-beam-sample"); } public static void runRead(String[] args, String collectionId) { FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance(); PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); pipeline .apply(Create.of(collectionId)) .apply( new FilterDocumentsQuery( firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId())) .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build()) .apply( ParDo.of( // transform each document to its name new DoFn<RunQueryResponse, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(Objects.requireNonNull(c.element()).getDocument().getName()); } })) .apply( ParDo.of( // print the document name new DoFn<String, Void>() { @ProcessElement public void processElement(ProcessContext c) { System.out.println(c.element()); } })); pipeline.run().waitUntilFinish(); } private static final class FilterDocumentsQuery extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> { private final String projectId; private final String databaseId; public FilterDocumentsQuery(String projectId, String databaseId) { this.projectId = projectId; this.databaseId = databaseId; } @Override public PCollection<RunQueryRequest> expand(PCollection<String> input) { return input.apply( ParDo.of( new DoFn<String, RunQueryRequest>() { @ProcessElement public void processElement(ProcessContext c) { // select from collection "cities-collection-<uuid>" StructuredQuery.CollectionSelector collection = StructuredQuery.CollectionSelector.newBuilder() .setCollectionId(Objects.requireNonNull(c.element())) .build(); // filter where country is equal to USA StructuredQuery.Filter countryFilter = StructuredQuery.Filter.newBuilder() .setFieldFilter( StructuredQuery.FieldFilter.newBuilder() .setField( StructuredQuery.FieldReference.newBuilder() .setFieldPath("country") .build()) .setValue(Value.newBuilder().setStringValue("USA").build()) .setOp(StructuredQuery.FieldFilter.Operator.EQUAL)) .buildPartial(); RunQueryRequest runQueryRequest = RunQueryRequest.newBuilder() .setParent(DocumentRootName.format(projectId, databaseId)) .setStructuredQuery( StructuredQuery.newBuilder() .addFrom(collection) .setWhere(countryFilter) .build()) .build(); c.output(runQueryRequest); } })); } } }
這個範例會使用下列引數設定及執行管道:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
定價
在 Dataflow 中執行 Cloud Firestore 工作負載會產生費用 。Dataflow 系統會針對工作使用的資源收取用量費用。詳情請參閱 Dataflow 定價頁面 。如需 Cloud Firestore 定價,請參閱 定價頁面。
後續步驟
- 如需其他管道範例,請參閱使用 Firestore 和 Apache Beam 處理資料。
- 如要進一步瞭解 Dataflow 和 Apache Beam,請參閱 Dataflow 說明文件。