透過與 MongoDB 相容的 Firestore 變更串流,應用程式可以存取集合或整個資料庫的即時變更 (插入、更新和刪除)。變更串流會依修改時間排序更新。
您可以使用與 MongoDB 相容的 API 和傳統 MongoDB 驅動程式存取 Change Streams。Firestore 與 MongoDB 相容的變更串流實作方式,可透過獨特的寫入自動分割實作方式和讀取平行處理,處理任何寫入和讀取輸送量。這項功能可讓您建構高處理量的工作負載。您也可以改善 Cloud Firestore 和其他儲存空間解決方案之間的遷移和資料同步基礎架構。
除了與 MongoDB 驅動程式相容之外,您也可以使用 Cloud Firestore 平行讀取變更串流。這可讓您建構平行的高處理量讀取工作負載。每個串流都代表結果的平均分配分區。
變更串流支援下列功能:
- 可設定的變更串流,範圍可為資料庫或集合。
- 建立變更串流時指定的保留期限。預設保留期限為 7 天,最短保留期限為 1 天。保留期限必須是 1 天的倍數,最多 7 天。建立後即無法變更保留時間。如要變更保留期限,必須捨棄並重新建立變更串流。
delete、insert、update和drop變更事件,可使用db.collection.watch()和db.watch()觀察。updateDescription.updatedFields包含更新差異。- 所有
fullDocument和fullDocumentBeforeChange選項。- 正在查詢完整文件是否有更新。
- 文件在遭到取代、更新或刪除前的圖片。
- 文件替換或更新後的圖片。
- 如要取得一小時前的影像,必須啟用時間點復原 (PITR) 功能。
- 所有繼續播放選項,包括
resumeAfter和startAfter。 - 使用
watch()觀察變更時,您可以串連匯總階段,例如$addFields、$match、$project、$replaceRoot、$replaceWith、$set和$unset。
設定變更串流
如要建立、捨棄或查看資料庫的現有變更串流,請使用 Google Cloud 控制台。
角色和權限
如要建立、刪除及列出變更串流,主體分別需要 datastore.schemas.create、datastore.schemas.delete 和 datastore.schemas.list Identity and Access Management (IAM) 權限。
例如,Datastore 索引管理員 (roles/datastore.indexAdmin) 角色會授予這些權限。
建立變更串流
您必須先建立變更串流,才能開啟對應的變更串流游標。系統不支援在建立集合或資料庫時,自動啟用變更串流。
如要建立變更串流,請使用 Google Cloud 控制台。
-
前往 Google Cloud 控制台的「資料庫」頁面。
- 從清單中選取與 MongoDB 相容的 Firestore 資料庫。「Firestore Studio」面板隨即開啟。
- 在「Explorer」面板中,找出「變更串流」節點,按一下 「更多動作」,然後選取「建立變更串流」。
- 輸入不重複的變更串流名稱、範圍和保留期限,然後按一下「儲存」。
查看變更串流
您可以在 Google Cloud 控制台中查看變更串流的詳細資料。
-
前往 Google Cloud 控制台的「資料庫」頁面。
- 從清單中選取與 MongoDB 相容的 Firestore 資料庫。「Firestore Studio」面板隨即開啟。
- 在「Explorer」面板中,找出「變更串流」節點。
- 如要開啟或關閉節點,請按一下「切換節點」。
刪除變更串流
如要刪除變更串流,請使用 Google Cloud 控制台。
-
前往 Google Cloud 控制台的「資料庫」頁面。
- 從清單中選取與 MongoDB 相容的 Firestore 資料庫。「Firestore Studio」面板隨即開啟。
- 在「Explorer」面板中,找出「變更串流」節點。
- 如要開啟或關閉節點,請按一下「切換節點」。
- 在「Explorer」中,找出要刪除的變更串流。
- 按一下「更多動作」圖示 ,然後選取「刪除變更串流」。
- 在對話方塊中輸入變更串流名稱來確認刪除,然後按一下「Delete」(刪除)。
開啟或繼續變更串流游標
下列範例說明如何建立、繼續及設定變更串流游標。
建立變更串流指標前,您必須先為資料庫或集合建立變更串流。
建立變更串流指標
如要建立新的變更串流游標,請在 MongoDB 驅動程式中使用 watch 方法。如要監聽資料庫的所有變更,請建立資料庫範圍的變更串流,並呼叫 db 物件的 watch 方法。
let cursor = db.watch()
如要建立以集合為範圍的游標,請先為該集合建立變更串流。然後,在對應的集合上呼叫 watch 方法。
let cursor = db.my_collection.watch()
您已建立變更串流游標,現在可以開始串流。舉例來說,如果您插入文件並在游標上呼叫 tryNext,變更串流就會顯示變更。
let doc = db.my_collection.insertOne({value: "hello world"}) console.log(cursor.tryNext())
如果您更新及刪除文件,變更串流會顯示這些變更:
db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}}) db.my_collection.deleteOne({"_id": doc.insertedId}}) // Prints the update event console.log(cursor.tryNext()) // Prints the delete event console.log(cursor.tryNext())
繼續變更串流
如要繼續變更串流,請使用 resumeAfter 或 startAfter 選項。如要判斷從變更記錄中的哪個位置繼續 resumeAfter 和 startAfter,請使用繼續權杖。
// Create a cursor and add one event to the change stream. let cursor = db.my_collection.watch(); db.my_collection.insertOne({value: "hello world"}); let event = cursor.tryNext(); // Get the resume token from the event. let resumeToken = event._id; // Add a new event to the change stream. db.my_collection.insertOne({value: "foobar"}); // Create a new cursor by using the resume token as a starting point. let newCursor = db.my_collection.watch({resumeAfter: resumeToken}) // Log the change event containing the "foobar" value. console.log(newCursor.tryNext())
如何使用 startAfter:
// Start after the resume token. let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})
在更新和刪除作業中加入前後圖片
如有需要,您可以在更新和刪除變更事件中加入文件的前後圖片。圖片的可用性取決於時間點復原 (PITR) 期間,如要讀取超過一小時前的文件圖片,必須啟用 PITR。
變更串流會利用 PITR 視窗,提供指定變更事件前後的文件檢視畫面。根據預設,更新事件會包含 updateDescription 欄位,這是更新作業修改的欄位差異。
如要在變更事件中加入前後圖片,您必須在變更串流查詢中指定 fullDocumentBeforeChange 和 fullDocument 選項。
let cursor = db.my_collection.watch({ "fullDocument": "required", "fullDocumentBeforeChange": "required" })
如果查詢嘗試讀取 PITR 保留期限外的文件,或未啟用 PITR,則 required 值會擲回伺服器端錯誤訊息。
如果圖片不再可用,您可以改用 whenAvailable 值傳回 null 值,而非擲回錯誤。
let cursor = db.my_collection.watch({ "fullDocument": "whenAvailable", "fullDocumentBeforeChange": "whenAvailable" })
在更新中加入目前圖片
根據預設,更新事件會包含 updateDescription 欄位,這是更新作業修改的欄位差異。如要改為查詢整份文件的最新版本,請使用 fullDocument 選項中的 updateLookup 值。
這項功能不需要 PITR,會執行文件查詢。
let cursor = db.my_collection.watch({ "fullDocument": "updateLookup", })
平行讀取
如要提高總處理量,可以使用 firestoreWorkerConfig 選項,將變更串流查詢分散到多個工作站。每個工作站負責為一組不同的文件提供變更。您必須透過 runCommand 或 aggregate 查詢建立平行游標。
舉例來說,您可以將變更串流分配給 3 個工作站,如下所示:
let cursor1 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }} }]); let cursor2 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }} }]); let cursor3 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }} }]);
變更串流和備份
備份還原作業無法使用變更串流設定和變更串流資料。如果還原含有變更串流的資料庫,您必須在目標資料庫中重新建立這些變更串流,才能開啟該資料庫的游標。
帳單
- 變更串流會產生讀取單位和儲存空間費用。請參閱變更串流價格。
- 如要在讀取要求時間納入 1 小時前的影像,您必須啟用 PITR,這會產生 PITR 費用。
行為差異
以下章節說明與 MongoDB 相容的 Firestore 和 MongoDB 的 Change Streams 差異。
updateDescription
updateDescription 是 update 事件中的文件,用於說明更新作業更新或移除的欄位。在 Cloud Firestore 中,顯著差異如下:
- 在
updateDescription中,系統不會填入truncatedArrays和disambiguatedPaths欄位。 updateDescription.updatedFields代表套用變異前後的文件,在前後圖片之間的標準差異。
假設文件的初始狀態如下:
db.my_collection.insertOne({ _id: 1, root: { array: [{a: 1}, {b: 2}, {c: 3}] } })
情境 1:只變動陣列的第一個元素。
在此情況下,Cloud Firestore 的行為與 MongoDB 相符。
db.my_collection.updateOne( {_id: 1}, {'$set': {"root.array.0.a": 100}} ) { updatedFields: {"root.array.0.a": 100}, removedFields: [] }
情境 2:使用整個陣列覆寫
在此情境中,這項作業只會更新第一個陣列欄位,但會覆寫整個陣列。
Cloud Firestore 更新差異不會區分這兩種情況,而是針對兩者傳回相同的 updateDescription.updatedFields:
db.my_collection.updateOne( {_id: 1}, {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}} ) // In other implementations, updatedFields reflects the mutation itself { updatedFields: { "root.array": [{a: 100}, {b: 2}, {c: 3}] }, removedFields: [] } // Firestore updatedFields is the diff between the before and after versions of the document { updatedFields: {"root.array.0.a": 100}, removedFields: [] }