開始使用 Firestore Pipelines 作業

背景

管道查詢是 Firestore 的全新查詢介面。這項服務提供進階查詢功能,包括複雜的運算式。此外,這個版本也支援許多新函式,例如 min, max, substring, regex_matcharray_contains_all。使用管道查詢時,您也可以完全不建立索引,簡化新查詢的開發程序。此外,Pipeline Queries 也移除了許多查詢形狀限制,可讓您指定大型 inor 查詢。

開始使用

如要安裝及初始化用戶端 SDK,請參閱入門指南中的操作說明。

語法

以下各節將概述管道查詢的語法。

概念

與管道查詢不同的是,管道查詢會明確導入「階段」排序。因此可以表示更複雜的查詢。不過,這與現有查詢介面有顯著差異,因為現有介面會隱含階段順序。請參考下列管道查詢範例:

Web

const pipeline = db.pipeline()
  // Step 1: Start a query with collection scope
  .collection("cities")
  // Step 2: Filter the collection
  .where(field("population").greaterThan(100000))
  // Step 3: Sort the remaining documents
  .sort(field("name").ascending())
  // Step 4: Return the top 10. Note applying the limit earlier in the
  // pipeline would have unintentional results.
  .limit(10);
Swift
let pipeline = db.pipeline()
  // Step 1: Start a query with collection scope
  .collection("cities")
  // Step 2: Filter the collection
  .where(Field("population").greaterThan(100000))
  // Step 3: Sort the remaining documents
  .sort([Field("name").ascending()])
  // Step 4: Return the top 10. Note applying the limit earlier in the pipeline would have
  // unintentional results.
  .limit(10)

Kotlin

val pipeline = db.pipeline()
    // Step 1: Start a query with collection scope
    .collection("cities")
    // Step 2: Filter the collection
    .where(field("population").greaterThan(100000))
    // Step 3: Sort the remaining documents
    .sort(field("name").ascending())
    // Step 4: Return the top 10. Note applying the limit earlier in the pipeline would have
    // unintentional results.
    .limit(10)

Java

Pipeline pipeline = db.pipeline()
    // Step 1: Start a query with collection scope
    .collection("cities")
    // Step 2: Filter the collection
    .where(field("population").greaterThan(100000))
    // Step 3: Sort the remaining documents
    .sort(field("name").ascending())
    // Step 4: Return the top 10. Note applying the limit earlier in the pipeline would have
    // unintentional results.
    .limit(10);
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

pipeline = (
    client.pipeline()
    .collection("cities")
    .where(Field.of("population").greater_than(100_000))
    .sort(Field.of("name").ascending())
    .limit(10)
)

初始化

Pipeline Queries 的語法與現有的 Cloud Firestore 查詢非常相似,如要開始,請編寫下列內容來初始化查詢:

Web

const { getFirestore } = require("firebase/firestore");
const { execute } = require("firebase/firestore/pipelines");
const database = getFirestore(app, "enterprise");
const pipeline = database.pipeline();
Swift
let firestore = Firestore.firestore(database: "enterprise")
let pipeline = firestore.pipeline()

Kotlin

val firestore = Firebase.firestore("enterprise")
val pipeline = firestore.pipeline()

Java

FirebaseFirestore firestore = FirebaseFirestore.getInstance("enterprise");
PipelineSource pipeline = firestore.pipeline();
Python
firestore_client = firestore.client(default_app, "your-new-enterprise-database")
pipeline = firestore_client.pipeline()

結構

建立管道查詢時,請務必瞭解幾個重要術語:階段、運算式和函式。

範例:查詢中的階段和運算式

階段:管道可由一或多個階段組成。從邏輯上來看,這些代表執行查詢時採取的步驟 (或階段) 序列。注意:在實際情況中,階段可能會依不同順序執行,以提升效能。不過,這不會修改查詢的意圖或正確性。

運算式:階段通常會接受運算式,讓您表達更複雜的查詢。運算式可能很簡單,只包含單一函式 (例如 eq("a", 1))。您也可以透過巢狀運算式 (例如 and(eq("a", 1), eq("b", 2)).) 表示更複雜的運算式

欄位與常數參照

Pipeline 查詢支援複雜的運算式。因此,您可能需要區分值代表的是「欄位」還是「常數」。請見如下範例:

Web

const pipeline = db.pipeline()
  .collection("cities")
  .where(field("name").equal(constant("Toronto")));
Swift
let pipeline = db.pipeline()
  .collection("cities")
  .where(Field("name").equal(Constant("Toronto")))

Kotlin

val pipeline = db.pipeline()
    .collection("cities")
    .where(field("name").equal(constant("Toronto")))

Java

Pipeline pipeline = db.pipeline()
    .collection("cities")
    .where(field("name").equal(constant("Toronto")));
Python
from google.cloud.firestore_v1.pipeline_expressions import Field, Constant

pipeline = (
    client.pipeline()
    .collection("cities")
    .where(Field.of("name").equal(Constant.of("Toronto")))
)

階段

輸入階段

輸入階段代表查詢的第一個階段。這會定義您要查詢的初始文件集。對於管道查詢,這與現有查詢大致相似,大多數查詢都是以 collection(...)collection_group(...) 階段開始。兩個新的輸入階段為 database()documents(...),其中 database() 可傳回資料庫中的「所有」文件,而 documents(...) 的作用與批次讀取相同。

Web

let results;

// Return all restaurants in San Francisco
results = await execute(db.pipeline().collection("cities/sf/restaurants"));

// Return all restaurants
results = await execute(db.pipeline().collectionGroup("restaurants"));

// Return all documents across all collections in the database (the entire database)
results = await execute(db.pipeline().database());

// Batch read of 3 documents
results = await execute(db.pipeline().documents([
  doc(db, "cities", "SF"),
  doc(db, "cities", "DC"),
  doc(db, "cities", "NY")
]));
Swift
var results: Pipeline.Snapshot

// Return all restaurants in San Francisco
results = try await db.pipeline().collection("cities/sf/restaurants").execute()

// Return all restaurants
results = try await db.pipeline().collectionGroup("restaurants").execute()

// Return all documents across all collections in the database (the entire database)
results = try await db.pipeline().database().execute()

// Batch read of 3 documents
results = try await db.pipeline().documents([
  db.collection("cities").document("SF"),
  db.collection("cities").document("DC"),
  db.collection("cities").document("NY")
]).execute()

Kotlin

var results: Task<Pipeline.Snapshot>

// Return all restaurants in San Francisco
results = db.pipeline().collection("cities/sf/restaurants").execute()

// Return all restaurants
results = db.pipeline().collectionGroup("restaurants").execute()

// Return all documents across all collections in the database (the entire database)
results = db.pipeline().database().execute()

// Batch read of 3 documents
results = db.pipeline().documents(
    db.collection("cities").document("SF"),
    db.collection("cities").document("DC"),
    db.collection("cities").document("NY")
).execute()

Java

Task<Pipeline.Snapshot> results;

// Return all restaurants in San Francisco
results = db.pipeline().collection("cities/sf/restaurants").execute();

// Return all restaurants
results = db.pipeline().collectionGroup("restaurants").execute();

// Return all documents across all collections in the database (the entire database)
results = db.pipeline().database().execute();

// Batch read of 3 documents
results = db.pipeline().documents(
    db.collection("cities").document("SF"),
    db.collection("cities").document("DC"),
    db.collection("cities").document("NY")
).execute();
Python
# Return all restaurants in San Francisco
results = client.pipeline().collection("cities/sf/restaurants").execute()

# Return all restaurants
results = client.pipeline().collection_group("restaurants").execute()

# Return all documents across all collections in the database (the entire database)
results = client.pipeline().database().execute()

# Batch read of 3 documents
results = (
    client.pipeline()
    .documents(
        client.collection("cities").document("SF"),
        client.collection("cities").document("DC"),
        client.collection("cities").document("NY"),
    )
    .execute()
)

與所有其他階段一樣,這些輸入階段的結果順序並不穩定。如要指定排序方式,請務必加入 sort(...) 運算子。

其中

where(...) 階段會對先前階段產生的文件執行傳統的篩選作業,且大多會反映現有查詢的現有「where」語法。如果指定運算式評估結果為非 true 值,系統會從傳回的文件中篩除該文件。

多個 where(...) 陳述式可以鏈結在一起,並做為 and(...) 運算式。舉例來說,以下兩個查詢在邏輯上相等,可以交替使用。

Web

let results;

results = await execute(db.pipeline().collection("books")
  .where(field("rating").equal(5))
  .where(field("published").lessThan(1900))
);

results = await execute(db.pipeline().collection("books")
  .where(and(field("rating").equal(5), field("published").lessThan(1900)))
);
Swift
var results: Pipeline.Snapshot

results = try await db.pipeline().collection("books")
  .where(Field("rating").equal(5))
  .where(Field("published").lessThan(1900))
  .execute()

results = try await db.pipeline().collection("books")
  .where(Field("rating").equal(5) && Field("published").lessThan(1900))
  .execute()

Kotlin

var results: Task<Pipeline.Snapshot>

results = db.pipeline().collection("books")
    .where(field("rating").equal(5))
    .where(field("published").lessThan(1900))
    .execute()

results = db.pipeline().collection("books")
    .where(Expression.and(field("rating").equal(5),
      field("published").lessThan(1900)))
    .execute()

Java

Task<Pipeline.Snapshot> results;

results = db.pipeline().collection("books")
    .where(field("rating").equal(5))
    .where(field("published").lessThan(1900))
    .execute();

results = db.pipeline().collection("books")
    .where(Expression.and(
        field("rating").equal(5),
        field("published").lessThan(1900)
    ))
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import And, Field

results = (
    client.pipeline()
    .collection("books")
    .where(Field.of("rating").equal(5))
    .where(Field.of("published").less_than(1900))
    .execute()
)

results = (
    client.pipeline()
    .collection("books")
    .where(And(Field.of("rating").equal(5), Field.of("published").less_than(1900)))
    .execute()
)

選取 / 新增及移除欄位

select(...)add_fields(...)remove_fields(...) 都可讓您修改先前階段傳回的欄位。這三種通常稱為投影式舞台。

select(...)add_fields(...) 可讓您將運算式結果指定給使用者提供的欄位名稱。如果運算式導致錯誤,就會產生 null 值。select(...) 只會傳回具有指定欄位名稱的文件,而 add_fields(...) 則會擴充前一階段的結構定義 (可能會覆寫具有相同欄位名稱的值)。

remove_fields(...) 可指定要從前一階段移除的一組欄位。指定不存在的欄位名稱不會有任何作用。

請參閱下方的「限制要傳回的欄位」一節,但一般來說,使用這類階段將結果限制為用戶端中需要的欄位,有助於減少大多數查詢的成本和延遲時間。

匯總 / 刪除重複記錄

aggregate(...) 階段可讓您對輸入文件執行一系列匯總作業。根據預設,所有文件都會匯總在一起,但您可以提供選用的 grouping 引數,將輸入文件匯總到不同 bucket。

Web

const results = await execute(db.pipeline()
  .collection("books")
  .aggregate(
    field("rating").average().as("avg_rating")
  )
  .distinct(field("genre"))
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .aggregate([
    Field("rating").average().as("avg_rating")
  ], groups: [
    Field("genre")
  ])
  .execute()

Kotlin

val results = db.pipeline()
    .collection("books")
    .aggregate(
        AggregateStage
            .withAccumulators(AggregateFunction.average("rating").alias("avg_rating"))
            .withGroups(field("genre"))
    )
    .execute()

Java

Task<Pipeline.Snapshot> results = db.pipeline()
    .collection("books")
    .aggregate(AggregateStage
        .withAccumulators(
            AggregateFunction.average("rating").alias("avg_rating"))
        .withGroups(field("genre")))
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .aggregate(
        Field.of("rating").average().as_("avg_rating"), groups=[Field.of("genre")]
    )
    .execute()
)

如未指定 groupings,這個階段只會產生單一文件;否則,系統會為每個不重複的 groupings 值組合產生文件。

distinct(...) 階段是簡化的匯總運算子,可產生不含任何累加器的不重複 groupings。在所有其他方面,其行為與 aggregate(...) 完全相同。範例如下:

Web

const results = await execute(db.pipeline()
  .collection("books")
  .distinct(
    field("author").toUpper().as("author"),
    field("genre")
  )
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .distinct([
    Field("author").toUpper().as("author"),
    Field("genre")
  ])
  .execute()

Kotlin

val results = db.pipeline()
    .collection("books")
    .distinct(
        field("author").toUpper().alias("author"),
        field("genre")
    )
    .execute()

Java

Task<Pipeline.Snapshot> results = db.pipeline()
    .collection("books")
    .distinct(
        field("author").toUpper().alias("author"),
        field("genre")
    )
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .distinct(Field.of("author").to_upper().as_("author"), "genre")
    .execute()
)

函式

函式是建立運算式和複雜查詢的建構區塊。如需函式完整清單和範例,請參閱函式參考資料。快速複習一下,一般查詢的結構如下:

範例:查詢中的階段和函式

許多階段都接受包含一或多個函式的運算式。最常見的函式用法是在 where(...)select(...) 階段。您應熟悉兩種主要函式:

Web

let results;

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = await execute(db.pipeline().collection("books")
  .select(field("current").logicalMinimum(field("updated")).as("price_min"))
);

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = await execute(db.pipeline().collection("books")
  .aggregate(field("price").minimum().as("min_price"))
);
Swift
var results: Pipeline.Snapshot

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = try await db.pipeline().collection("books")
  .select([
    Field("current").logicalMinimum(["updated"]).as("price_min")
  ])
  .execute()

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = try await db.pipeline().collection("books")
  .aggregate([Field("price").minimum().as("min_price")])
  .execute()

Kotlin

var results: Task<Pipeline.Snapshot>

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = db.pipeline().collection("books")
    .select(
        field("current").logicalMinimum("updated").alias("price_min")
    )
    .execute()

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = db.pipeline().collection("books")
    .aggregate(AggregateFunction.minimum("price").alias("min_price"))
    .execute()

Java

Task<Pipeline.Snapshot> results;

// Type 1: Scalar (for use in non-aggregation stages)
// Example: Return the min store price for each book.
results = db.pipeline().collection("books")
    .select(
        field("current").logicalMinimum("updated").alias("price_min")
    )
    .execute();

// Type 2: Aggregation (for use in aggregate stages)
// Example: Return the min price of all books.
results = db.pipeline().collection("books")
    .aggregate(AggregateFunction.minimum("price").alias("min_price"))
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

# Type 1: Scalar (for use in non-aggregation stages)
# Example: Return the min store price for each book.
results = (
    client.pipeline()
    .collection("books")
    .select(
        Field.of("current").logical_minimum(Field.of("updated")).as_("price_min")
    )
    .execute()
)

# Type 2: Aggregation (for use in aggregate stages)
# Example: Return the min price of all books.
results = (
    client.pipeline()
    .collection("books")
    .aggregate(Field.of("price").minimum().as_("min_price"))
    .execute()
)

限制

在大多數情況下,Enterprise 版不會限制查詢的形狀。也就是說,您在 INOR 查詢中,可使用的值數量不受限制。不過,您應注意以下兩項主要限制:

  • 期限:60 秒 (與標準版相同)。
  • 記憶體用量:查詢執行期間,具體化資料量上限為 128 MiB。

錯誤

查詢失敗的原因有很多。以下是常見錯誤的連結,以及您可以採取的相關動作:

錯誤代碼 動作
DEADLINE_EXCEEDED 您執行的查詢超過 60 秒期限,需要額外最佳化。請參閱「成效」一節,瞭解相關提示。如果無法找出問題的根本原因,請與團隊聯絡。
RESOURCE_EXHAUSTED 您執行的查詢超出記憶體限制,需要進行額外最佳化。請參閱「成效」一節,瞭解相關提示。如果無法找出問題的根本原因,請與團隊聯絡。
INTERNAL 如需支援,請與團隊聯絡

Performance

與現有查詢不同,管道查詢不一定需要索引。也就是說,與現有查詢相比,查詢可能會出現較高的延遲時間,而現有查詢會因缺少索引錯誤而立即失敗。FAILED_PRECONDITION如要提升管道查詢的效能,可以採取下列幾個步驟。

建立索引

使用的索引

查詢說明可協助您判斷查詢是否由索引提供服務,或是否會改用效率較低的作業 (例如資料表掃描)。如果查詢並非完全從索引提供服務,請按照操作說明建立索引。

建立索引

您可以按照現有的索引管理說明文件建立索引。建立索引前,請先熟悉 Firestore 索引的一般最佳做法。為確保查詢能運用索引,請按照最佳做法,依下列順序建立索引和欄位:

  1. 等式篩選器使用的所有欄位 (順序不限)
  2. 所有要排序的欄位 (依相同順序)
  3. 將用於範圍或不等式篩選器的欄位,依查詢限制選擇性遞減排序

舉例來說,如果查詢如下:

Web

const results = await execute(db.pipeline()
  .collection("books")
  .where(field("published").lessThan(1900))
  .where(field("genre").equal("Science Fiction"))
  .where(field("rating").greaterThan(4.3))
  .sort(field("published").descending())
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .where(Field("published").lessThan(1900))
  .where(Field("genre").equal("Science Fiction"))
  .where(Field("rating").greaterThan(4.3))
  .sort([Field("published").descending()])
  .execute()

Kotlin

val results = db.pipeline()
    .collection("books")
    .where(field("published").lessThan(1900))
    .where(field("genre").equal("Science Fiction"))
    .where(field("rating").greaterThan(4.3))
    .sort(field("published").descending())
    .execute()

Java

Task<Pipeline.Snapshot> results = db.pipeline()
    .collection("books")
    .where(field("published").lessThan(1900))
    .where(field("genre").equal("Science Fiction"))
    .where(field("rating").greaterThan(4.3))
    .sort(field("published").descending())
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .where(Field.of("published").less_than(1900))
    .where(Field.of("genre").equal("Science Fiction"))
    .where(Field.of("rating").greater_than(4.3))
    .sort(Field.of("published").descending())
    .execute()
)

建議的索引是 books(genre [...], published DESC, avg_rating DESC). 的集合範圍索引,適用於 (genre [...], published DESC, avg_rating DESC).

索引密度

Cloud Firestore 支援稀疏和非稀疏索引。詳情請參閱索引密度

涵蓋的查詢 + 次要索引

如果傳回的所有欄位都存在於次要索引中,Firestore 就能略過擷取完整文件,只從索引傳回結果。這通常會大幅改善延遲時間 (和成本)。使用下列查詢範例:

Web

const results = await execute(db.pipeline()
  .collection("books")
  .where(field("category").like("%fantasy%"))
  .where(field("title").exists())
  .where(field("author").exists())
  .select(field("title"), field("author"))
);
Swift
let results = try await db.pipeline()
  .collection("books")
  .where(Field("category").like("%fantasy%"))
  .where(Field("title").exists())
  .where(Field("author").exists())
  .select([Field("title"), Field("author")])
  .execute()

Kotlin

val results = db.pipeline()
    .collection("books")
    .where(field("category").like("%fantasy%"))
    .where(field("title").exists())
    .where(field("author").exists())
    .select(field("title"), field("author"))
    .execute()

Java

Task<Pipeline.Snapshot> results = db.pipeline()
    .collection("books")
    .where(field("category").like("%fantasy%"))
    .where(field("title").exists())
    .where(field("author").exists())
    .select(field("title"), field("author"))
    .execute();
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

results = (
    client.pipeline()
    .collection("books")
    .where(Field.of("category").like("%fantasy%"))
    .where(Field.of("title").exists())
    .where(Field.of("author").exists())
    .select("title", "author")
    .execute()
)

如果資料庫已在 (category [...], title [...], author [...]) 上為 books 建立集合範圍索引,即可避免從主要文件中擷取任何內容。在這種情況下,索引中的順序並不重要,[...] 用於表示這一點。

限制要傳回的欄位

根據預設,Firestore 查詢會傳回文件中的所有欄位,類似於傳統系統中的 SELECT *。不過,如果應用程式只需要部分欄位,則可以使用 select(...)restrict(...) 階段,將這項篩選作業推送至伺服器端。這項功能可縮減回應大小 (降低網路輸出成本),並縮短延遲時間。

疑難排解工具

查詢說明

查詢說明功能可讓您查看執行指標,以及所用索引的詳細資料。

指標

如果與現有的 Firestore 指標完全整合,則為管道查詢。

已知問題 / 限制

專用索引

管道查詢目前不支援現有的 array-containsvector 索引類型。Firestore 不會直接拒絕這類查詢,而是會嘗試使用其他現有的 ascending & descending 索引。預計在不公開預先發布期間,由於這個原因,含有這類 array_containsfind_nearest 運算式的 Pipeline 查詢會比現有對應項目慢。

分頁

不公開預先發布版不支援輕鬆分頁瀏覽結果集。如要解決這個問題,可以鏈結等效的 where(...)sort(...) 階段,如下所示。

Web

// Existing pagination via `startAt()`
const q =
  query(collection(db, "cities"), orderBy("population"), startAt(1000000));

// Private preview workaround using pipelines
const pageSize = 2;
const pipeline = db.pipeline()
  .collection("cities")
  .select("name", "population", "__name__")
  .sort(field("population").descending(), field("__name__").ascending());

// Page 1 results
let snapshot = await execute(pipeline.limit(pageSize));

// End of page marker
const lastDoc = snapshot.results[snapshot.results.length - 1];

// Page 2 results
snapshot = await execute(
  pipeline
    .where(
      or(
        and(
          field("population").equal(lastDoc.get("population")),
          field("__name__").greaterThan(lastDoc.ref)
        ),
        field("population").lessThan(lastDoc.get("population"))
      )
    )
    .limit(pageSize)
);
Swift
// Existing pagination via `start(at:)`
let query = db.collection("cities").order(by: "population").start(at: [1000000])

// Private preview workaround using pipelines
let pipeline = db.pipeline()
  .collection("cities")
  .where(Field("population").greaterThanOrEqual(1000000))
  .sort([Field("population").descending()])

Kotlin

// Existing pagination via `startAt()`
val query = db.collection("cities").orderBy("population").startAt(1000000)

// Private preview workaround using pipelines
val pipeline = db.pipeline()
    .collection("cities")
    .where(field("population").greaterThanOrEqual(1000000))
    .sort(field("population").descending())

Java

// Existing pagination via `startAt()`
Query query = db.collection("cities").orderBy("population").startAt(1000000);

// Private preview workaround using pipelines
Pipeline pipeline = db.pipeline()
    .collection("cities")
    .where(field("population").greaterThanOrEqual(1000000))
    .sort(field("population").descending());
Python
from google.cloud.firestore_v1.pipeline_expressions import Field

# Existing pagination via `start_at()`
query = (
    client.collection("cities")
    .order_by("population")
    .start_at({"population": 1_000_000})
)

# Private preview workaround using pipelines
pipeline = (
    client.pipeline()
    .collection("cities")
    .where(Field.of("population").greater_than_or_equal(1_000_000))
    .sort(Field.of("population").descending())
)

模擬器支援

模擬器目前不支援管道查詢。

即時和離線支援

管道查詢目前不支援即時和離線功能。

後續步驟