使用變更串流讀取即時資料

透過與 MongoDB 相容的 Firestore 變更串流,應用程式可以存取集合或整個資料庫的即時變更 (插入、更新和刪除)。變更串流會依修改時間排序更新。

您可以使用與 MongoDB 相容的 API 和傳統 MongoDB 驅動程式存取 Change Streams。Firestore 與 MongoDB 相容的變更串流實作方式,可透過獨特的寫入自動分割實作方式和讀取平行處理,處理任何寫入和讀取輸送量。這項功能可讓您建構高處理量的工作負載。您也可以改善 Cloud Firestore 和其他儲存空間解決方案之間的遷移和資料同步基礎架構。

除了與 MongoDB 驅動程式相容之外,您也可以使用 Cloud Firestore 平行讀取變更串流。這可讓您建構平行的高處理量讀取工作負載。每個串流都代表結果的平均分配分區。

變更串流支援下列功能:

  • 可設定的變更串流,範圍可為資料庫或集合。
  • 建立變更串流時指定的保留期限。預設保留期限為 7 天,最短保留期限為 1 天。保留期限必須是 1 天的倍數,最多 7 天。建立後即無法變更保留時間。如要變更保留期限,必須捨棄並重新建立變更串流。
  • deleteinsertupdatedrop 變更事件,可使用 db.collection.watch()db.watch() 觀察。
  • updateDescription.updatedFields 包含更新差異。
  • 所有 fullDocumentfullDocumentBeforeChange 選項。
    • 正在查詢完整文件是否有更新。
    • 文件在遭到取代、更新或刪除前的圖片。
    • 文件替換或更新後的圖片。
    • 如要取得一小時前的影像,必須啟用時間點復原 (PITR) 功能。
  • 所有繼續播放選項,包括 resumeAfterstartAfter
  • 使用 watch() 觀察變更時,您可以串連匯總階段,例如 $addFields$match$project$replaceRoot$replaceWith$set$unset

設定變更串流

如要建立、捨棄或查看資料庫的現有變更串流,請使用 Google Cloud 控制台。

角色和權限

如要建立、刪除及列出變更串流,主體分別需要 datastore.schemas.createdatastore.schemas.deletedatastore.schemas.list Identity and Access Management (IAM) 權限。

例如,Datastore 索引管理員 (roles/datastore.indexAdmin) 角色會授予這些權限。

建立變更串流

您必須先建立變更串流,才能開啟對應的變更串流游標。系統不支援在建立集合或資料庫時,自動啟用變更串流。

如要建立變更串流,請使用 Google Cloud 控制台。

  1. 前往 Google Cloud 控制台的「資料庫」頁面。

    前往「資料庫」

  2. 從清單中選取與 MongoDB 相容的 Firestore 資料庫。「Firestore Studio」面板隨即開啟。
  3. 在「Explorer」面板中,找出「變更串流」節點,按一下 「更多動作」,然後選取「建立變更串流」
  4. 輸入不重複的變更串流名稱、範圍和保留期限,然後按一下「儲存」

查看變更串流

您可以在 Google Cloud 控制台中查看變更串流的詳細資料。

  1. 前往 Google Cloud 控制台的「資料庫」頁面。

    前往「資料庫」

  2. 從清單中選取與 MongoDB 相容的 Firestore 資料庫。「Firestore Studio」面板隨即開啟。
  3. 在「Explorer」面板中,找出「變更串流」節點。
  4. 如要開啟或關閉節點,請按一下「切換節點」

刪除變更串流

如要刪除變更串流,請使用 Google Cloud 控制台。

  1. 前往 Google Cloud 控制台的「資料庫」頁面。

    前往「資料庫」

  2. 從清單中選取與 MongoDB 相容的 Firestore 資料庫。「Firestore Studio」面板隨即開啟。
  3. 在「Explorer」面板中,找出「變更串流」節點。
  4. 如要開啟或關閉節點,請按一下「切換節點」
  5. 在「Explorer」中,找出要刪除的變更串流。
  6. 按一下「更多動作」圖示 ,然後選取「刪除變更串流」
  7. 在對話方塊中輸入變更串流名稱來確認刪除,然後按一下「Delete」(刪除)

開啟或繼續變更串流游標

下列範例說明如何建立、繼續及設定變更串流游標。

建立變更串流指標前,您必須先為資料庫或集合建立變更串流

建立變更串流指標

如要建立新的變更串流游標,請在 MongoDB 驅動程式中使用 watch 方法。如要監聽資料庫的所有變更,請建立資料庫範圍的變更串流,並呼叫 db 物件的 watch 方法。

let cursor = db.watch()

如要建立以集合為範圍的游標,請先為該集合建立變更串流。然後,在對應的集合上呼叫 watch 方法。

let cursor = db.my_collection.watch()

您已建立變更串流游標,現在可以開始串流。舉例來說,如果您插入文件並在游標上呼叫 tryNext,變更串流就會顯示變更。

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

如果您更新及刪除文件,變更串流會顯示這些變更:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

繼續變更串流

如要繼續變更串流,請使用 resumeAfterstartAfter 選項。如要判斷從變更記錄中的哪個位置繼續 resumeAfterstartAfter,請使用繼續權杖。

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

如何使用 startAfter

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

在更新和刪除作業中加入前後圖片

如有需要,您可以在更新和刪除變更事件中加入文件的前後圖片。圖片的可用性取決於時間點復原 (PITR) 期間,如要讀取超過一小時前的文件圖片,必須啟用 PITR。

變更串流會利用 PITR 視窗,提供指定變更事件前後的文件檢視畫面。根據預設,更新事件會包含 updateDescription 欄位,這是更新作業修改的欄位差異。

如要在變更事件中加入前後圖片,您必須在變更串流查詢中指定 fullDocumentBeforeChangefullDocument 選項。

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

如果查詢嘗試讀取 PITR 保留期限外的文件,或未啟用 PITR,則 required 值會擲回伺服器端錯誤訊息。

如果圖片不再可用,您可以改用 whenAvailable 值傳回 null 值,而非擲回錯誤。

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

在更新中加入目前圖片

根據預設,更新事件會包含 updateDescription 欄位,這是更新作業修改的欄位差異。如要改為查詢整份文件的最新版本,請使用 fullDocument 選項中的 updateLookup 值。

這項功能不需要 PITR,會執行文件查詢。

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

平行讀取

如要提高總處理量,可以使用 firestoreWorkerConfig 選項,將變更串流查詢分散到多個工作站。每個工作站負責為一組不同的文件提供變更。您必須透過 runCommandaggregate 查詢建立平行游標。

舉例來說,您可以將變更串流分配給 3 個工作站,如下所示:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

變更串流和備份

備份還原作業無法使用變更串流設定和變更串流資料。如果還原含有變更串流的資料庫,您必須在目標資料庫中重新建立這些變更串流,才能開啟該資料庫的游標。

帳單

行為差異

以下章節說明與 MongoDB 相容的 Firestore 和 MongoDB 的 Change Streams 差異。

updateDescription

updateDescriptionupdate 事件中的文件,用於說明更新作業更新或移除的欄位。在 Cloud Firestore 中,顯著差異如下:

  • updateDescription 中,系統不會填入 truncatedArraysdisambiguatedPaths 欄位。
  • updateDescription.updatedFields 代表套用變異前後的文件,在前後圖片之間的標準差異。

假設文件的初始狀態如下:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

情境 1:只變動陣列的第一個元素。

在此情況下,Cloud Firestore 的行為與 MongoDB 相符。

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

情境 2:使用整個陣列覆寫

在此情境中,這項作業只會更新第一個陣列欄位,但會覆寫整個陣列。

Cloud Firestore 更新差異不會區分這兩種情況,而是針對兩者傳回相同的 updateDescription.updatedFields

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

後續步驟