Processe dados em massa com o Dataflow

Esta página fornece exemplos de como usar o Dataflow para realizar operações em massa do Cloud Firestore em um pipeline do Apache Beam. O Apache Beam oferece suporte a um conector para Cloud Firestore. Você pode usar esse conector para executar operações em lote e de streaming no Dataflow.

Recomendamos o uso do Dataflow e do Apache Beam para cargas de trabalho de processamento de dados em grande escala.

O conector Cloud Firestore para Apache Beam está disponível em Java. Para obter mais informações sobre o conector do Cloud Firestore, consulte SDK do Apache Beam para Java .

Antes de você começar

Antes de ler esta página, você deve estar familiarizado com o modelo de programação do Apache Beam .

Para executar as amostras, você precisa ativar a API Dataflow .

Exemplo de pipeline do Cloud Firestore

Os exemplos abaixo demonstram um pipeline que grava dados e outro que lê e filtra dados. Você pode usar esses exemplos como ponto de partida para seus próprios pipelines.

Executando os pipelines de amostra

O código-fonte dos exemplos está disponível no repositório GitHub googleapis/java-firestore . Para executar esses exemplos, baixe o código-fonte e consulte o README .

Exemplo de pipeline Write

O exemplo a seguir cria documentos na coleção cities-beam-sample :



public class ExampleFirestoreBeamWrite {
 
private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance();

 
public static void main(String[] args) {
    runWrite
(args, "cities-beam-sample");
 
}

 
public static void runWrite(String[] args, String collectionId) {
   
// create pipeline options from the passed in arguments
   
PipelineOptions options =
       
PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
   
Pipeline pipeline = Pipeline.create(options);

   
RpcQosOptions rpcQosOptions =
       
RpcQosOptions.newBuilder()
           
.withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
           
.build();

   
// create some writes
   
Write write1 =
       
Write.newBuilder()
           
.setUpdate(
               
Document.newBuilder()
                   
// resolves to
                   
// projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC
                   
.setName(createDocumentName(collectionId, "NYC"))
                   
.putFields("name", Value.newBuilder().setStringValue("New York City").build())
                   
.putFields("state", Value.newBuilder().setStringValue("New York").build())
                   
.putFields("country", Value.newBuilder().setStringValue("USA").build()))
           
.build();

   
Write write2 =
       
Write.newBuilder()
           
.setUpdate(
               
Document.newBuilder()
                   
// resolves to
                   
// projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK
                   
.setName(createDocumentName(collectionId, "TOK"))
                   
.putFields("name", Value.newBuilder().setStringValue("Tokyo").build())
                   
.putFields("country", Value.newBuilder().setStringValue("Japan").build())
                   
.putFields("capital", Value.newBuilder().setBooleanValue(true).build()))
           
.build();

   
// batch write the data
    pipeline
       
.apply(Create.of(write1, write2))
       
.apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build());

   
// run the pipeline
    pipeline
.run().waitUntilFinish();
 
}

 
private static String createDocumentName(String collectionId, String cityDocId) {
   
String documentPath =
       
String.format(
           
"projects/%s/databases/%s/documents",
            FIRESTORE_OPTIONS
.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId());

   
return documentPath + "/" + collectionId + "/" + cityDocId;
 
}
}

O exemplo usa os seguintes argumentos para configurar e executar um pipeline:

GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers

Exemplo de pipeline Read

O pipeline de exemplo a seguir lê documentos da coleção cities-beam-sample , aplica um filtro para documentos em que o campo country está definido como USA e retorna os nomes dos documentos correspondentes.



public class ExampleFirestoreBeamRead {

 
public static void main(String[] args) {
    runRead
(args, "cities-beam-sample");
 
}

 
public static void runRead(String[] args, String collectionId) {
   
FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance();

   
PipelineOptions options =
       
PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
   
Pipeline pipeline = Pipeline.create(options);

   
RpcQosOptions rpcQosOptions =
       
RpcQosOptions.newBuilder()
           
.withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
           
.build();

    pipeline
       
.apply(Create.of(collectionId))
       
.apply(
           
new FilterDocumentsQuery(
                firestoreOptions
.getProjectId(), firestoreOptions.getDatabaseId()))
       
.apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build())
       
.apply(
           
ParDo.of(
               
// transform each document to its name
               
new DoFn<RunQueryResponse, String>() {
                 
@ProcessElement
                 
public void processElement(ProcessContext c) {
                    c
.output(Objects.requireNonNull(c.element()).getDocument().getName());
                 
}
               
}))
       
.apply(
           
ParDo.of(
               
// print the document name
               
new DoFn<String, Void>() {
                 
@ProcessElement
                 
public void processElement(ProcessContext c) {
                   
System.out.println(c.element());
                 
}
               
}));

    pipeline
.run().waitUntilFinish();
 
}

 
private static final class FilterDocumentsQuery
     
extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> {

   
private final String projectId;
   
private final String databaseId;

   
public FilterDocumentsQuery(String projectId, String databaseId) {
     
this.projectId = projectId;
     
this.databaseId = databaseId;
   
}

   
@Override
   
public PCollection<RunQueryRequest> expand(PCollection<String> input) {
     
return input.apply(
         
ParDo.of(
             
new DoFn<String, RunQueryRequest>() {
               
@ProcessElement
               
public void processElement(ProcessContext c) {
                 
// select from collection "cities-collection-<uuid>"
                 
StructuredQuery.CollectionSelector collection =
                     
StructuredQuery.CollectionSelector.newBuilder()
                         
.setCollectionId(Objects.requireNonNull(c.element()))
                         
.build();
                 
// filter where country is equal to USA
                 
StructuredQuery.Filter countryFilter =
                     
StructuredQuery.Filter.newBuilder()
                         
.setFieldFilter(
                             
StructuredQuery.FieldFilter.newBuilder()
                                 
.setField(
                                     
StructuredQuery.FieldReference.newBuilder()
                                         
.setFieldPath("country")
                                         
.build())
                                 
.setValue(Value.newBuilder().setStringValue("USA").build())
                                 
.setOp(StructuredQuery.FieldFilter.Operator.EQUAL))
                         
.buildPartial();

                 
RunQueryRequest runQueryRequest =
                     
RunQueryRequest.newBuilder()
                         
.setParent(DocumentRootName.format(projectId, databaseId))
                         
.setStructuredQuery(
                             
StructuredQuery.newBuilder()
                                 
.addFrom(collection)
                                 
.setWhere(countryFilter)
                                 
.build())
                         
.build();
                  c
.output(runQueryRequest);
               
}
             
}));
   
}
 
}
}

O exemplo usa os seguintes argumentos para configurar e executar um pipeline:

GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers

Preços

A execução de uma carga de trabalho do Cloud Firestore no Dataflow gera custos de uso do Cloud Firestore e do Dataflow. O uso do Dataflow é cobrado pelos recursos usados ​​pelos seus jobs. Consulte a página de preços do Dataflow para obter detalhes. Para ver os preços do Cloud Firestore, consulte a página de preços .

Qual é o próximo