Background
Pipeline operations are a new query interface for Cloud Firestore.
This interface provides advanced query functionality that includes complex
expressions. It also adds support for many
new functions like min, max, substring, regex_match and array_contains_all.
With Pipeline operations, index creation is also completely optional, streamlining
the process of developing new queries. Pipeline operations also remove many
limitations on query shape, allowing you to specify large IN or OR queries.
Getting Started
To install and initialize the client SDKs, refer to the instructions in the setup guide for each SDK.
Syntax
The following sections give an overview of the syntax for Pipeline operations.
Concepts
One notable difference with Pipeline operations is the introduction of explicit "stage" ordering. This makes it possible to express more complex queries. However, it is a deviation from the existing query interface (Core operations), where the ordering of stages was implied. Consider the following Pipeline operation example:
Web
const pipeline = db.pipeline()
  // Step 1: Start a query with collection scope
  .collection("cities")
  // Step 2: Filter the collection
  .where(field("population").greaterThan(100000))
  // Step 3: Sort the remaining documents
  .sort(field("name").ascending())
  // Step 4: Return the top 10. Note applying the limit earlier in the
  // pipeline would have unintentional results.
  .limit(10);
Swift
let pipeline = db.pipeline()
  // Step 1: Start a query with collection scope
  .collection("cities")
  // Step 2: Filter the collection
  .where(Field("population").greaterThan(100000))
  // Step 3: Sort the remaining documents
  .sort([Field("name").ascending()])
  // Step 4: Return the top 10. Note applying the limit earlier in the pipeline would have
  // unintentional results.
  .limit(10)
Kotlin
val pipeline = db.pipeline()
  // Step 1: Start a query with collection scope
  .collection("cities")
  // Step 2: Filter the collection
  .where(field("population").greaterThan(100000))
  // Step 3: Sort the remaining documents
  .sort(field("name").ascending())
  // Step 4: Return the top 10. Note applying the limit earlier in the pipeline would have
  // unintentional results.
  .limit(10)
Java
Pipeline pipeline = db.pipeline()
  // Step 1: Start a query with collection scope
  .collection("cities")
  // Step 2: Filter the collection
  .where(field("population").greaterThan(100000))
  // Step 3: Sort the remaining documents
  .sort(field("name").ascending())
  // Step 4: Return the top 10. Note applying the limit earlier in the pipeline would have
  // unintentional results.
  .limit(10);
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

pipeline = (
    client.pipeline()
    .collection("cities")
    .where(Field.of("population").greater_than(100_000))
    .sort(Field.of("name").ascending())
    .limit(10)
)
Initialization
Pipeline operations have a familiar syntax if you're coming from existing Cloud Firestore queries. To get started, initialize a query as follows:
Web
const { getFirestore } = require("firebase/firestore");
const { execute } = require("firebase/firestore/pipelines");

const database = getFirestore(app, "enterprise");
const pipeline = database.pipeline();
Swift
let firestore = Firestore.firestore(database: "enterprise")
let pipeline = firestore.pipeline()
Kotlin
val firestore = Firebase.firestore("enterprise")
val pipeline = firestore.pipeline()
Java
FirebaseFirestore firestore = FirebaseFirestore.getInstance("enterprise");
PipelineSource pipeline = firestore.pipeline();
Python
firestore_client = firestore.client(default_app, "your-new-enterprise-database")
pipeline = firestore_client.pipeline()
Structure
There are a few terms that are important to understand when creating Pipeline operations: stages, expressions, and functions.

- Stages: A pipeline may consist of one or more stages. Logically, these represent the series of steps (or stages) taken to execute the query. Note: In practice, stages may be executed out of order to improve performance. However, this does not modify the intent or correctness of the query.
- Expressions: Stages will often accept an expression, allowing you to express more complex queries. An expression may be simple and consist of a single function, like eq("a", 1). You can also build more complex expressions by nesting them, like and(eq("a", 1), eq("b", 2)); see the sketch below.
- Functions: The building blocks used within expressions; see the Functions section later in this guide.
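To make the expressions item concrete, here is a minimal Python sketch, modeled on the Python samples later in this guide, that passes a nested expression to a where(...) stage. The collection and field names are placeholders for illustration only.
Python
from google.cloud.firestore_v1.pipeline_expressions import And, Field

# Nested expression and(eq("a", 1), eq("b", 2)) used as a single where(...) filter.
# "examples", "a", and "b" are placeholder names.
pipeline = (
    client.pipeline()
    .collection("examples")
    .where(And(Field.of("a").equal(1), Field.of("b").equal(2)))
)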
Field vs. Constant References
Pipeline operations support complex expressions. As such, it may be necessary to differentiate whether a value represents a field or a constant. Consider the following example:
Web
const pipeline = db.pipeline()
  .collection("cities")
  .where(field("name").equal(constant("Toronto")));
Swift
let pipeline = db.pipeline()
  .collection("cities")
  .where(Field("name").equal(Constant("Toronto")))
Kotlin
val pipeline = db.pipeline()
  .collection("cities")
  .where(field("name").equal(constant("Toronto")))
Java
Pipeline pipeline = db.pipeline()
  .collection("cities")
  .where(field("name").equal(constant("Toronto")));
Python
from google.cloud.firestore_v1.pipeline_expressions import Field, Constant

pipeline = (
    client.pipeline()
    .collection("cities")
    .where(Field.of("name").equal(Constant.of("Toronto")))
)
Stages
Input Stages
The input stage is the first stage of a query. It defines the initial set of documents you are querying over. For Pipeline operations, this is largely similar to existing queries, where most queries start with either a collection(...) or collection_group(...) stage. Two new input stages are database() and documents(...): database() returns all documents in the database, while documents(...) behaves like a batch read.
Web
let results;

// Return all restaurants in San Francisco
results = await execute(db.pipeline().collection("cities/sf/restaurants"));

// Return all restaurants
results = await execute(db.pipeline().collectionGroup("restaurants"));

// Return all documents across all collections in the database (the entire database)
results = await execute(db.pipeline().database());

// Batch read of 3 documents
results = await execute(db.pipeline().documents([
  doc(db, "cities", "SF"),
  doc(db, "cities", "DC"),
  doc(db, "cities", "NY")
]));
Swift
var results: Pipeline.Snapshot

// Return all restaurants in San Francisco
results = try await db.pipeline().collection("cities/sf/restaurants").execute()

// Return all restaurants
results = try await db.pipeline().collectionGroup("restaurants").execute()

// Return all documents across all collections in the database (the entire database)
results = try await db.pipeline().database().execute()

// Batch read of 3 documents
results = try await db.pipeline().documents([
  db.collection("cities").document("SF"),
  db.collection("cities").document("DC"),
  db.collection("cities").document("NY")
]).execute()
Kotlin
var results: Task<Pipeline.Snapshot>

// Return all restaurants in San Francisco
results = db.pipeline().collection("cities/sf/restaurants").execute()

// Return all restaurants
results = db.pipeline().collectionGroup("restaurants").execute()

// Return all documents across all collections in the database (the entire database)
results = db.pipeline().database().execute()

// Batch read of 3 documents
results = db.pipeline().documents(
  db.collection("cities").document("SF"),
  db.collection("cities").document("DC"),
  db.collection("cities").document("NY")
).execute()
Java
Task<Pipeline.Snapshot> results;

// Return all restaurants in San Francisco
results = db.pipeline().collection("cities/sf/restaurants").execute();

// Return all restaurants
results = db.pipeline().collectionGroup("restaurants").execute();

// Return all documents across all collections in the database (the entire database)
results = db.pipeline().database().execute();

// Batch read of 3 documents
results = db.pipeline().documents(
  db.collection("cities").document("SF"),
  db.collection("cities").document("DC"),
  db.collection("cities").document("NY")
).execute();
Python
# Return all restaurants in San Francisco
results = client.pipeline().collection("cities/sf/restaurants").execute()

# Return all restaurants
results = client.pipeline().collection_group("restaurants").execute()

# Return all documents across all collections in the database (the entire database)
results = client.pipeline().database().execute()

# Batch read of 3 documents
results = (
    client.pipeline()
    .documents(
        client.collection("cities").document("SF"),
        client.collection("cities").document("DC"),
        client.collection("cities").document("NY"),
    )
    .execute()
)
As with all other stages, the order of results from these input stages is not
stable. A sort(...) operator should always be added if a specific ordering is
desired.
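For example, here is a minimal Python sketch, following the patterns above, that adds an explicit ordering to a collection group input stage:
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

# Without sort(...), the order of results from an input stage is not stable.
results = (
    client.pipeline()
    .collection_group("restaurants")
    .sort(Field.of("name").ascending())
    .execute()
)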
Where
The where(...) stage acts as a traditional filter operation over the documents
generated by the previous stage and largely mirrors the "where" syntax of
existing queries. Any document for which the given expression evaluates to a
non-true value is filtered out of the returned documents.
Multiple where(...) stages can be chained together and act as an and(...) expression. For example, the following two queries are logically equivalent and can be used interchangeably.
Web
let results;

results = await execute(db.pipeline().collection("books")
  .where(field("rating").equal(5))
  .where(field("published").lessThan(1900))
);

results = await execute(db.pipeline().collection("books")
  .where(and(field("rating").equal(5), field("published").lessThan(1900)))
);
Swift
var results: Pipeline.Snapshot

results = try await db.pipeline().collection("books")
  .where(Field("rating").equal(5))
  .where(Field("published").lessThan(1900))
  .execute()

results = try await db.pipeline().collection("books")
  .where(Field("rating").equal(5) && Field("published").lessThan(1900))
  .execute()
Kotlin
var results: Task<Pipeline.Snapshot>

results = db.pipeline().collection("books")
  .where(field("rating").equal(5))
  .where(field("published").lessThan(1900))
  .execute()

results = db.pipeline().collection("books")
  .where(Expression.and(field("rating").equal(5), field("published").lessThan(1900)))
  .execute()
Java
Task<Pipeline.Snapshot> results;

results = db.pipeline().collection("books")
  .where(field("rating").equal(5))
  .where(field("published").lessThan(1900))
  .execute();

results = db.pipeline().collection("books")
  .where(Expression.and(
    field("rating").equal(5),
    field("published").lessThan(1900)
  ))
  .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import And, Field

results = (
    client.pipeline()
    .collection("books")
    .where(Field.of("rating").equal(5))
    .where(Field.of("published").less_than(1900))
    .execute()
)

results = (
    client.pipeline()
    .collection("books")
    .where(And(Field.of("rating").equal(5), Field.of("published").less_than(1900)))
    .execute()
)
Select / Add & Remove Fields
The select(...), add_fields(...), and remove_fields(...) stages all allow you to
modify the fields returned from the previous stage. These three are
generally referred to as projection-style stages.

The select(...) and add_fields(...) stages let you assign the result of an
expression to a user-provided field name. An expression that results in an error
produces a null value. The select(...) stage returns only the specified fields,
while add_fields(...) extends the schema of the previous stage (potentially
overwriting values with identical field names).

The remove_fields(...) stage lets you specify a set of fields to remove from the
previous stage's output. Specifying field names that do not exist is a no-op.
See the Restrict the Fields to Return section below; in general, using such a stage to restrict the result to only the fields the client needs helps reduce cost and latency for most queries.
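As an illustration, here is a minimal Python sketch modeled on the Python samples elsewhere on this page; the add_fields(...) and remove_fields(...) method names are assumed from the stage names above, and the field names are placeholders.
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

# Keep only two fields, one of which is computed from an expression.
projected = (
    client.pipeline()
    .collection("books")
    .add_fields(Field.of("title").to_upper().as_("title_upper"))
    .select("title_upper", "author")
)

# Alternatively, keep every field except one (the field name is a placeholder).
trimmed = (
    client.pipeline()
    .collection("books")
    .remove_fields("internal_notes")
)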
Aggregate / Distinct
The aggregate(...) stage lets you perform a series of aggregations over the input documents. By default, all documents are aggregated together, but an optional grouping argument can be provided, allowing the input documents to be aggregated into different buckets.
Web
const results = await execute(db.pipeline()
  .collection("books")
  .aggregate(
    field("rating").average().as("avg_rating")
  )
  .distinct(field("genre"))
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .aggregate([
    Field("rating").average().as("avg_rating")
  ], groups: [
    Field("genre")
  ])
  .execute()
Kotlin
val results = db.pipeline()
  .collection("books")
  .aggregate(
    AggregateStage
      .withAccumulators(AggregateFunction.average("rating").alias("avg_rating"))
      .withGroups(field("genre"))
  )
  .execute()
Java
Task<Pipeline.Snapshot> results = db.pipeline()
  .collection("books")
  .aggregate(AggregateStage
    .withAccumulators(
      AggregateFunction.average("rating").alias("avg_rating"))
    .withGroups(field("genre")))
  .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .aggregate(
        Field.of("rating").average().as_("avg_rating"),
        groups=[Field.of("genre")]
    )
    .execute()
)
When a grouping is not specified, this stage produces only a single document; otherwise, a document is generated for each unique combination of grouping values.
The distinct(...) stage is a simplified aggregation operator that generates just the unique groupings, without any accumulators. In all other regards it behaves identically to aggregate(...). An example is given below:
Web
const results = await execute(db.pipeline()
  .collection("books")
  .distinct(
    field("author").toUpper().as("author"),
    field("genre")
  )
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .distinct([
    Field("author").toUpper().as("author"),
    Field("genre")
  ])
  .execute()
Kotlin
val results = db.pipeline()
  .collection("books")
  .distinct(
    field("author").toUpper().alias("author"),
    field("genre")
  )
  .execute()
Java
Task<Pipeline.Snapshot> results = db.pipeline()
  .collection("books")
  .distinct(
    field("author").toUpper().alias("author"),
    field("genre")
  )
  .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .distinct(Field.of("author").to_upper().as_("author"), "genre")
    .execute()
)
Functions
Functions are a building block for creating expressions and complex queries. For a complete list of functions with examples, refer to the Functions reference. As a quick reminder, consider the structure of a typical query:

Many stages accept expressions, which contain one or more functions. The most common function usage is found in the where(...) and select(...) stages. There are two main types of functions that you should be familiar with:
Web
let results;

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = await execute(db.pipeline().collection("books")
  .select(field("current").logicalMinimum(field("updated")).as("price_min"))
);

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = await execute(db.pipeline().collection("books")
  .aggregate(field("price").minimum().as("min_price"))
);
Swift
var results: Pipeline.Snapshot

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = try await db.pipeline().collection("books")
  .select([
    Field("current").logicalMinimum(["updated"]).as("price_min")
  ])
  .execute()

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = try await db.pipeline().collection("books")
  .aggregate([Field("price").minimum().as("min_price")])
  .execute()
Kotlin
var results: Task<Pipeline.Snapshot>

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = db.pipeline().collection("books")
  .select(
    field("current").logicalMinimum("updated").alias("price_min")
  )
  .execute()

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = db.pipeline().collection("books")
  .aggregate(AggregateFunction.minimum("price").alias("min_price"))
  .execute()
Java
Task<Pipeline.Snapshot> results;

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = db.pipeline().collection("books")
  .select(
    field("current").logicalMinimum("updated").alias("price_min")
  )
  .execute();

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = db.pipeline().collection("books")
  .aggregate(AggregateFunction.minimum("price").alias("min_price"))
  .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

# Type 1: Scalar (for use in non-aggregation stages)
# Example: Return the min store price for each book.
results = (
    client.pipeline()
    .collection("books")
    .select(
        Field.of("current").logical_minimum(Field.of("updated")).as_("price_min")
    )
    .execute()
)

# Type 2: Aggregation (for use in aggregate stages)
# Example: Return the min price of all books.
results = (
    client.pipeline()
    .collection("books")
    .aggregate(Field.of("price").minimum().as_("min_price"))
    .execute()
)
Limits
For the most part, Enterprise Edition doesn't impose limits on the shape of the
query. In other words, you're not limited to a small number of values in an IN
or OR query. Instead, there are two primary limits you should be aware of:
- Deadline: 60 seconds (identical to Standard Edition).
- Memory Usage: 128 MiB limit on the amount of materialized data during query execution.
Errors
You may encounter failed queries for a number of reasons. The following table lists common errors and the associated action you can take:
| Error Code | Action |
| --- | --- |
| DEADLINE_EXCEEDED | The query you are executing exceeds the 60 second deadline and requires additional optimization. See the Performance section for tips. If you are unable to root-cause the problem, reach out to the team. |
| RESOURCE_EXHAUSTED | The query you are executing exceeds the memory limits and requires additional optimization. See the Performance section for tips. If you are unable to root-cause the problem, reach out to the team. |
| INTERNAL | Contact the team for support. |
Performance
Unlike existing queries, Pipeline operations do not require that an index is always present. This means that a query can exhibit higher latency compared to existing queries, which would have failed immediately with a FAILED_PRECONDITION missing index error. To improve the performance of Pipeline operations, there are a couple of steps you can take.
Create Indexes
Index Used
Query Explain lets you identify whether your query is being served by an index or is falling back to a less efficient operation like a table scan. If your query is not being fully served from an index, you can create one by following the instructions below.
Creating Indexes
You can follow the existing index management documentation to create indexes. Before creating an index, familiarize yourself with general best practices for indexes in Cloud Firestore. To ensure your query can leverage indexes, create indexes with fields in the following order:
- All fields that will be used in equality filters (in any order)
- All fields that will be sorted on (in the same order)
- Fields that will be used in range or inequality filters in decreasing order of query constraint selectivity
For example, for the following query,
Web
const results = await execute(db.pipeline()
  .collection("books")
  .where(field("published").lessThan(1900))
  .where(field("genre").equal("Science Fiction"))
  .where(field("rating").greaterThan(4.3))
  .sort(field("published").descending())
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .where(Field("published").lessThan(1900))
  .where(Field("genre").equal("Science Fiction"))
  .where(Field("rating").greaterThan(4.3))
  .sort([Field("published").descending()])
  .execute()
Kotlin
val results = db.pipeline()
  .collection("books")
  .where(field("published").lessThan(1900))
  .where(field("genre").equal("Science Fiction"))
  .where(field("rating").greaterThan(4.3))
  .sort(field("published").descending())
  .execute()
Java
Task<Pipeline.Snapshot> results = db.pipeline()
  .collection("books")
  .where(field("published").lessThan(1900))
  .where(field("genre").equal("Science Fiction"))
  .where(field("rating").greaterThan(4.3))
  .sort(field("published").descending())
  .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .where(Field.of("published").less_than(1900))
    .where(Field.of("genre").equal("Science Fiction"))
    .where(Field.of("rating").greater_than(4.3))
    .sort(Field.of("published").descending())
    .execute()
)
The recommended index for this query is a collection-scope index on books for (genre [...], published DESC, rating DESC).
Index density
Cloud Firestore supports sparse and non-sparse indexes. For more information, see Index density.
Covered Queries + Secondary Indexes
Cloud Firestore can skip fetching the full document and return results directly from the index if all of the fields being returned are present in a secondary index. This normally leads to a significant latency (and cost) improvement. Consider the sample query below:
Web
const results = await execute(db.pipeline()
  .collection("books")
  .where(field("category").like("%fantasy%"))
  .where(field("title").exists())
  .where(field("author").exists())
  .select(field("title"), field("author"))
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .where(Field("category").like("%fantasy%"))
  .where(Field("title").exists())
  .where(Field("author").exists())
  .select([Field("title"), Field("author")])
  .execute()
Kotlin
val results = db.pipeline()
  .collection("books")
  .where(field("category").like("%fantasy%"))
  .where(field("title").exists())
  .where(field("author").exists())
  .select(field("title"), field("author"))
  .execute()
Java
Task<Pipeline.Snapshot> results = db.pipeline()
  .collection("books")
  .where(field("category").like("%fantasy%"))
  .where(field("title").exists())
  .where(field("author").exists())
  .select(field("title"), field("author"))
  .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .where(Field.of("category").like("%fantasy%"))
    .where(Field.of("title").exists())
    .where(Field.of("author").exists())
    .select("title", "author")
    .execute()
)
If the database already has a collection-scope index on books for (category [...], title [...], author [...]), then it can avoid fetching anything from the documents themselves. In this case the order of the fields in the index does not matter; [...] is used to signify that.
Restrict the Fields to Return
By default, a Cloud Firestore query returns all fields in a document, analogous to a SELECT * in traditional systems. If, however, your application only needs a subset of the fields, the select(...) or restrict(...) stages can be used to push this filtering server-side. This decreases the response size (reducing network egress cost) and improves latency.
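For example, here is a minimal Python sketch, based on the select(...) samples above, that returns only the fields the client needs:
Python
# Return only "title" and "author" instead of every field in each document.
results = (
    client.pipeline()
    .collection("books")
    .select("title", "author")
    .execute()
)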
Troubleshooting Tools
Query Explain
Query Explain gives you visibility into execution metrics and details about the indexes used.
Metrics
Pipeline operations are fully integrated with existing Cloud Firestore metrics.
Known Issues / Limitations
Specialized Indexes
Pipeline operations do not yet support the existing array-contains and vector index types. Instead of rejecting such queries, Cloud Firestore will attempt to use other existing ascending and descending indexes. As a result, during the private preview, Pipeline operations with array_contains or find_nearest expressions are expected to be slower than their existing equivalents.
Pagination
Easily paginating over a result set is not supported during the private preview. This can be worked around by chaining equivalent where(...) and sort(...) stages, as shown below.
Web
// Existing pagination via `startAt()`
const q = query(collection(db, "cities"), orderBy("population"), startAt(1000000));

// Private preview workaround using pipelines
const pageSize = 2;
const pipeline = db.pipeline()
  .collection("cities")
  .select("name", "population", "__name__")
  .sort(field("population").descending(), field("__name__").ascending());

// Page 1 results
let snapshot = await execute(pipeline.limit(pageSize));

// End of page marker
const lastDoc = snapshot.results[snapshot.results.length - 1];

// Page 2 results
snapshot = await execute(
  pipeline
    .where(
      or(
        and(
          field("population").equal(lastDoc.get("population")),
          field("__name__").greaterThan(lastDoc.ref)
        ),
        field("population").lessThan(lastDoc.get("population"))
      )
    )
    .limit(pageSize)
);
Swift
// Existing pagination via `start(at:)`
let query = db.collection("cities").order(by: "population").start(at: [1000000])

// Private preview workaround using pipelines
let pipeline = db.pipeline()
  .collection("cities")
  .where(Field("population").greaterThanOrEqual(1000000))
  .sort([Field("population").descending()])
Kotlin
// Existing pagination via `startAt()`
val query = db.collection("cities").orderBy("population").startAt(1000000)

// Private preview workaround using pipelines
val pipeline = db.pipeline()
  .collection("cities")
  .where(field("population").greaterThanOrEqual(1000000))
  .sort(field("population").descending())
Java
// Existing pagination via `startAt()`
Query query = db.collection("cities").orderBy("population").startAt(1000000);

// Private preview workaround using pipelines
Pipeline pipeline = db.pipeline()
  .collection("cities")
  .where(field("population").greaterThanOrEqual(1000000))
  .sort(field("population").descending());
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

# Existing pagination via `start_at()`
query = (
    client.collection("cities")
    .order_by("population")
    .start_at({"population": 1_000_000})
)

# Private preview workaround using pipelines
pipeline = (
    client.pipeline()
    .collection("cities")
    .where(Field.of("population").greater_than_or_equal(1_000_000))
    .sort(Field.of("population").descending())
)
Emulator Support
The emulator does not yet support Pipeline operations.
Realtime and Offline Support
Pipeline operations don't have realtime and offline capabilities yet.
What's next
- Begin exploring the Functions and Stages reference documentation.