MongoDB 互換の Firestore の変更ストリームを使用すると、アプリケーションはコレクションまたはデータベース全体に加えられたリアルタイムの変更(挿入、更新、削除)にアクセスできます。変更ストリームは、更新を更新時刻順に並べます。
変更ストリームには、MongoDB 互換の API と従来の MongoDB ドライバからアクセスできます。MongoDB 互換の Firestore の変更ストリームの実装は、書き込みと読み取りの並列処理に対する自動パーティショニングの一意の実装により、書き込みと読み取りのスループットを処理できます。これにより、高スループットのワークロードを構築できます。 また、 Cloud Firestoreと他のストレージ ソリューション間の移行とデータ同期のインフラストラクチャを改善することもできます。
MongoDB ドライバとの互換性に加えて、 Cloud Firestore を使用して変更ストリームを並行して読み取ることもできます。これにより、並列処理による高スループットの読み取りワークロードを構築できます。各ストリームは、結果の分散されたパーティションを表します。
変更ストリームは、次の機能をサポートしています。
- データベースまたはコレクションのスコープで構成可能な変更ストリーム。
- 作成時に指定された変更ストリームの保持期間。 デフォルトの保持期間は 7 日間で、最小保持期間は 1 日です。保持期間は 1 日の倍数で、最大 7 日間です。作成後に保持期間を変更することはできません。保持期間を変更するには、変更ストリームを削除して再作成する必要があります。
db.collection.watch()とdb.watch()を使用して監視できるdelete、insert、update、dropの変更イベント。updateDescription.updatedFieldsには更新差分が含まれます。- すべての
fullDocumentオプションとfullDocumentBeforeChangeオプション。- 更新の完全なドキュメントの検索。
- ドキュメントが置き換え、更新、削除される前のイメージ。
- ドキュメントが置き換えまたは更新された後のイメージ。
- 1 時間より前のイメージを取得するには、ポイントインタイム リカバリ(PITR)を有効にする必要があります。
resumeAfterやstartAfterなどのすべての再開オプション。watch()を使用して変更をモニタリングする場合は、$addFields、$match、$project、$replaceRoot、$replaceWith、$set、$unsetなどの集計ステージを連結できます。
変更ストリームを構成する
データベースの既存の変更ストリームを作成、削除、表示するには、Google Cloud コンソールを使用します。
ロールと権限
変更ストリームを作成、削除、一覧表示するには、プリンシパルに datastore.schemas.create、datastore.schemas.delete、datastore.schemas.list の Identity and Access Management(IAM)権限が必要です。
たとえば、Datastore Index Admin (roles/datastore.indexAdmin)ロールには、これらの権限が付与されます。
変更ストリームを作成
対応する変更ストリーム カーソルを開くには、変更ストリームを作成する必要があります。コレクションまたはデータベースの作成時に変更ストリームを自動的に有効にすることは対象外です。
変更ストリームを作成するには、Google Cloud コンソールを使用します。
-
Google Cloud コンソールで [データベース] ページに移動します。
- リストから、MongoDB 互換の Firestore データベースを選択します。[Firestore Studio] パネルが開きます。
- In the [エクスプローラ] パネルで [変更ストリーム] ノードを見つけ、 [その他の操作] をクリックして、[変更ストリームを作成] を選択します。
- 一意の変更ストリーム名、スコープ、保持期間を入力して、[保存] をクリックします。
変更ストリームを表示する
変更ストリームの詳細については、Google Cloud コンソールをご覧ください。
-
Google Cloud コンソールで [データベース] ページに移動します。
- リストから、MongoDB 互換の Firestore データベースを選択します。[Firestore Studio] パネルが開きます。
- [エクスプローラ] パネルで [変更ストリーム] ノードを見つけます。
- ノードを開閉するには、 [**ノードの切り替え**] をクリックします。
変更ストリームを削除する
変更ストリームを削除するには、Google Cloud コンソールを使用します。
-
Google Cloud コンソールで [データベース] ページに移動します。
- リストから、MongoDB 互換の Firestore データベースを選択します。[Firestore Studio] パネルが開きます。
- [エクスプローラ] パネルで [変更ストリーム] ノードを見つけます。
- ノードを開閉するには、 [**ノードの切り替え**] をクリックします。
- [**エクスプローラ**] で、削除する変更ストリームを見つけます。
- [**その他の操作**] をクリックして、[**変更ストリームを削除**] を選択します。
- ダイアログで変更ストリーム名を入力して削除を確認し、[削除] をクリックします。
変更ストリーム カーソルを開くか再開する
次の例では、変更ストリーム カーソルを作成、再開、構成する方法を示します。
変更ストリーム カーソルを作成する前に、データベースまたはコレクションの変更ストリームを明示的に 作成する必要があります。
変更ストリーム カーソルを作成する
新しい変更ストリーム カーソルを作成するには、MongoDB ドライバの watch
メソッドを使用します。
データベースのすべての変更をリッスンするには、データベース スコープの変更ストリームを作成し、db
オブジェクトで watch メソッドを呼び出します。
let cursor = db.watch()
コレクションにスコープ設定されたカーソルを作成するには、まずそのコレクションの変更ストリームを作成する必要があります。次に、対応するコレクションで
watch メソッドを呼び出します。
let cursor = db.my_collection.watch()
変更ストリーム カーソルを作成したので、ストリーミングを開始できます。
たとえば、ドキュメントを挿入してカーソルで tryNext
を呼び出すと、変更ストリームに変更が表示されます。
let doc = db.my_collection.insertOne({value: "hello world"}) console.log(cursor.tryNext())
ドキュメントを更新して削除すると、変更ストリームに次のような変更が表示されます。
db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}}) db.my_collection.deleteOne({"_id": doc.insertedId}}) // Prints the update event console.log(cursor.tryNext()) // Prints the delete event console.log(cursor.tryNext())
変更ストリームを再開する
変更ストリームを再開するには、resumeAfter オプションまたは startAfter
オプションを使用します。
resumeAfter と startAfter から再開する変更ログ内の位置を特定するには、再開トークンを使用します。
// Create a cursor and add one event to the change stream. let cursor = db.my_collection.watch(); db.my_collection.insertOne({value: "hello world"}); let event = cursor.tryNext(); // Get the resume token from the event. let resumeToken = event._id; // Add a new event to the change stream. db.my_collection.insertOne({value: "foobar"}); // Create a new cursor by using the resume token as a starting point. let newCursor = db.my_collection.watch({resumeAfter: resumeToken}) // Log the change event containing the "foobar" value. console.log(newCursor.tryNext())
startAfter を使用するには:
// Start after the resume token. let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})
更新と削除に事前イメージと事後イメージを含める
必要に応じて、ドキュメントの事前イメージと事後イメージを更新イベントと削除イベントに含めることができます。 イメージの可用性は ポイントインタイム リカバリ(PITR)ウィンドウによって異なります。 1 時間より前のドキュメント イメージを読み取るには、 PITR を有効にする必要があります。
変更ストリームは、PITR ウィンドウを利用して、特定の変更イベントの前後のドキュメントのビューを提供します。デフォルトでは、更新イベントには
updateDescription フィールドが含まれます。これは、更新オペレーションによって変更されたフィールドの差分です。
変更イベントに事前イメージと事後イメージを含めるには、
変更ストリーム クエリで fullDocumentBeforeChange オプションと
fullDocument オプションを指定する必要があります。
let cursor = db.my_collection.watch({ "fullDocument": "required", "fullDocumentBeforeChange": "required" })
クエリが PITR 保持期間外のドキュメントを読み取ろうとした場合、または PITR が有効になっていない場合、required 値はサーバーサイド エラー メッセージをスローします。
エラーをスローする代わりに、whenAvailable 値を使用して、イメージが使用できなくなった場合に null 値を返すことができます。
let cursor = db.my_collection.watch({ "fullDocument": "whenAvailable", "fullDocumentBeforeChange": "whenAvailable" })
更新に現在のイメージを含める
デフォルトでは、更新イベントには updateDescription フィールドが含まれます。これは、更新オペレーションによって変更されたフィールドの差分です。代わりにドキュメント全体の最新バージョンを検索するには、fullDocument
オプションで updateLookup 値を使用します。
この機能では PITR は不要で、ドキュメントの検索を実行します。
let cursor = db.my_collection.watch({ "fullDocument": "updateLookup", })
並列読み取り
スループットを向上させるには、firestoreWorkerConfig オプションを使用して、変更ストリーム
クエリを複数のワーカーに分割します。各ワーカーは、個別のドキュメント セットの変更を処理します。runCommand
クエリまたは aggregate クエリを使用して、並列カーソルを作成する必要があります。
たとえば、次のように変更ストリームを 3 つのワーカーに分散できます。
let cursor1 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }} }]); let cursor2 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }} }]); let cursor3 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }} }]);
変更ストリームとバックアップ
変更ストリームの構成も変更ストリームのデータも、バックアップ復元オペレーションでは使用できません。変更ストリームを使用してデータベースを復元する場合は、宛先データベースで変更ストリームを再作成して、そのデータベースへのカーソルを開く必要があります。
課金
- 変更ストリームには、読み取りユニットとストレージの費用が発生します。変更ストリームの料金をご覧ください。
- 読み取りリクエスト時に 1 時間より前の事前イメージと事後イメージを含めるには、 PITR を有効にする必要があります。これにより、 PITR の費用が発生します。
動作の違い
次のセクションでは、MongoDB 互換の Firestore と MongoDB の変更ストリームの違いについて説明します。
updateDescription
updateDescription は、更新オペレーションによって更新または削除されたフィールド
を記述する update イベントのドキュメントです。
Cloud Firestore では、次のような違いがあります。
updateDescriptionでは、truncatedArraysフィールドとdisambiguatedPathsフィールドに値が入力されません。updateDescription.updatedFieldsは、ミューテーションが適用される前後のドキュメントの事前イメージと事後イメージの正規の差分を表します。
ドキュメントの初期状態を次に示します。
db.my_collection.insertOne({ _id: 1, root: { array: [{a: 1}, {b: 2}, {c: 3}] } })
シナリオ 1: 配列の最初の要素のみを変更する。
このシナリオでは、Cloud Firestore の動作は MongoDB と一致します。
db.my_collection.updateOne( {_id: 1}, {'$set': {"root.array.0.a": 100}} ) { updatedFields: {"root.array.0.a": 100}, removedFields: [] }
シナリオ 2: 配列全体を上書きする
このシナリオでは、オペレーションは最初の配列フィールドのみを更新しますが、配列全体を上書きします。
Cloud Firestore の更新差分では、これら
2 つのシナリオが区別されず、両方に対して同じ updateDescription.updatedFields が返されます。
db.my_collection.updateOne( {_id: 1}, {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}} ) // In other implementations, updatedFields reflects the mutation itself { updatedFields: { "root.array": [{a: 100}, {b: 2}, {c: 3}] }, removedFields: [] } // Firestore updatedFields is the diff between the before and after versions of the document { updatedFields: {"root.array.0.a": 100}, removedFields: [] }