Esta página fornece exemplos de como usar o Dataflow para realizar operações em massa do Cloud Firestore em um pipeline do Apache Beam. O Apache Beam oferece suporte a um conector para Cloud Firestore. Você pode usar esse conector para executar operações em lote e de streaming no Dataflow.
Recomendamos o uso do Dataflow e do Apache Beam para cargas de trabalho de processamento de dados em grande escala.
O conector Cloud Firestore para Apache Beam está disponível em Java. Para obter mais informações sobre o conector do Cloud Firestore, consulte SDK do Apache Beam para Java .
Antes de você começar
Antes de ler esta página, você deve estar familiarizado com o modelo de programação do Apache Beam .
Para executar as amostras, você precisa ativar a API Dataflow .Exemplo de pipeline do Cloud Firestore
Os exemplos abaixo demonstram um pipeline que grava dados e outro que lê e filtra dados. Você pode usar esses exemplos como ponto de partida para seus próprios pipelines.
Executando os pipelines de amostra
O código-fonte dos exemplos está disponível no repositório GitHub googleapis/java-firestore . Para executar esses exemplos, baixe o código-fonte e consulte o README .
Exemplo de pipeline Write
O exemplo a seguir cria documentos na coleção cities-beam-sample
:
public class ExampleFirestoreBeamWrite {
private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance();
public static void main(String[] args) {
runWrite(args, "cities-beam-sample");
}
public static void runWrite(String[] args, String collectionId) {
// create pipeline options from the passed in arguments
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
RpcQosOptions rpcQosOptions =
RpcQosOptions.newBuilder()
.withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
.build();
// create some writes
Write write1 =
Write.newBuilder()
.setUpdate(
Document.newBuilder()
// resolves to
// projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC
.setName(createDocumentName(collectionId, "NYC"))
.putFields("name", Value.newBuilder().setStringValue("New York City").build())
.putFields("state", Value.newBuilder().setStringValue("New York").build())
.putFields("country", Value.newBuilder().setStringValue("USA").build()))
.build();
Write write2 =
Write.newBuilder()
.setUpdate(
Document.newBuilder()
// resolves to
// projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK
.setName(createDocumentName(collectionId, "TOK"))
.putFields("name", Value.newBuilder().setStringValue("Tokyo").build())
.putFields("country", Value.newBuilder().setStringValue("Japan").build())
.putFields("capital", Value.newBuilder().setBooleanValue(true).build()))
.build();
// batch write the data
pipeline
.apply(Create.of(write1, write2))
.apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build());
// run the pipeline
pipeline.run().waitUntilFinish();
}
private static String createDocumentName(String collectionId, String cityDocId) {
String documentPath =
String.format(
"projects/%s/databases/%s/documents",
FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId());
return documentPath + "/" + collectionId + "/" + cityDocId;
}
}
O exemplo usa os seguintes argumentos para configurar e executar um pipeline:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket /temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Exemplo de pipeline Read
O pipeline de exemplo a seguir lê documentos da coleção cities-beam-sample
, aplica um filtro para documentos em que o campo country
está definido como USA
e retorna os nomes dos documentos correspondentes.
public class ExampleFirestoreBeamRead {
public static void main(String[] args) {
runRead(args, "cities-beam-sample");
}
public static void runRead(String[] args, String collectionId) {
FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance();
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
RpcQosOptions rpcQosOptions =
RpcQosOptions.newBuilder()
.withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
.build();
pipeline
.apply(Create.of(collectionId))
.apply(
new FilterDocumentsQuery(
firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId()))
.apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build())
.apply(
ParDo.of(
// transform each document to its name
new DoFn<RunQueryResponse, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(Objects.requireNonNull(c.element()).getDocument().getName());
}
}))
.apply(
ParDo.of(
// print the document name
new DoFn<String, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println(c.element());
}
}));
pipeline.run().waitUntilFinish();
}
private static final class FilterDocumentsQuery
extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> {
private final String projectId;
private final String databaseId;
public FilterDocumentsQuery(String projectId, String databaseId) {
this.projectId = projectId;
this.databaseId = databaseId;
}
@Override
public PCollection<RunQueryRequest> expand(PCollection<String> input) {
return input.apply(
ParDo.of(
new DoFn<String, RunQueryRequest>() {
@ProcessElement
public void processElement(ProcessContext c) {
// select from collection "cities-collection-<uuid>"
StructuredQuery.CollectionSelector collection =
StructuredQuery.CollectionSelector.newBuilder()
.setCollectionId(Objects.requireNonNull(c.element()))
.build();
// filter where country is equal to USA
StructuredQuery.Filter countryFilter =
StructuredQuery.Filter.newBuilder()
.setFieldFilter(
StructuredQuery.FieldFilter.newBuilder()
.setField(
StructuredQuery.FieldReference.newBuilder()
.setFieldPath("country")
.build())
.setValue(Value.newBuilder().setStringValue("USA").build())
.setOp(StructuredQuery.FieldFilter.Operator.EQUAL))
.buildPartial();
RunQueryRequest runQueryRequest =
RunQueryRequest.newBuilder()
.setParent(DocumentRootName.format(projectId, databaseId))
.setStructuredQuery(
StructuredQuery.newBuilder()
.addFrom(collection)
.setWhere(countryFilter)
.build())
.build();
c.output(runQueryRequest);
}
}));
}
}
}
O exemplo usa os seguintes argumentos para configurar e executar um pipeline:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket /temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Preços
A execução de uma carga de trabalho do Cloud Firestore no Dataflow gera custos de uso do Cloud Firestore e do Dataflow. O uso do Dataflow é cobrado pelos recursos usados pelos seus jobs. Consulte a página de preços do Dataflow para obter detalhes. Para ver os preços do Cloud Firestore, consulte a página de preços .
Qual é o próximo
- Consulte Usando o Firestore e o Apache Beam para processamento de dados para ver outro exemplo de pipeline.
- Para saber mais sobre o Dataflow e o Apache Beam, consulte a documentação do Dataflow .