変更ストリームを使用してリアルタイム データを読み取る

MongoDB 互換の Firestore の変更ストリームを使用すると、アプリケーションはコレクションまたはデータベース全体に対して行われたリアルタイムの変更(挿入、更新、削除)にアクセスできます。変更ストリームは、更新を更新時刻で順序付けします。

変更ストリームには、MongoDB 互換の API と従来の MongoDB ドライバを介してアクセスできます。MongoDB 互換の Firestore の変更ストリームの実装では、書き込みと読み取りの並列処理で自動パーティショニングを独自に実装することで、書き込みと読み取りのスループットを処理できます。これにより、高スループットのワークロードを構築できます。また、Cloud Firestore と他のストレージ ソリューション間の移行とデータ同期のインフラストラクチャを改善することもできます。

MongoDB ドライバとの互換性に加えて、Cloud Firestore を使用して変更ストリームを並行して読み取ることができます。これにより、並列処理の高スループット読み取りワークロードを構築できます。各ストリームは、結果の分散されたパーティションを表します。

変更ストリームは、次の機能をサポートしています。

  • データベースまたはコレクションのスコープで構成可能な変更ストリーム。
  • 作成時に指定された変更ストリームの保持期間。デフォルトの保持期間は 7 日間、最小保持期間は 1 日間です。保持期間は 1 日の倍数で、最大 7 日間です。保持期間は作成後に変更できません。保持期間を変更するには、変更ストリームを削除して再作成する必要があります。
  • deleteinsertupdatedrop の変更イベント。db.collection.watch()db.watch() を使用してオブザーバブルにできます。
  • updateDescription.updatedFields にはアップデートの差分が含まれています。
  • すべての fullDocument オプションと fullDocumentBeforeChange オプション。
    • アップデートの完全なドキュメントを検索しています。
    • ドキュメントが置き換え、更新、削除される前のイメージ。
    • ドキュメントの置換または更新後のイメージ。
    • 1 時間より古い事前イメージと事後イメージには、ポイントインタイム リカバリ(PITR)の有効化が必要です。
  • resumeAfterstartAfter などのすべての再開オプション。
  • watch() を使用して変更をモニタリングする場合は、$addFields$match$project$replaceRoot$replaceWith$set$unset などの集計ステージをチェーンできます。

変更ストリームを構成する

データベースの既存の変更ストリームを作成、削除、表示するには、Google Cloud コンソールを使用します。

ロールと権限

変更ストリームを作成、削除、一覧表示するには、プリンシパルにそれぞれ datastore.schemas.createdatastore.schemas.deletedatastore.schemas.list の Identity and Access Management(IAM)権限が必要です。

たとえば、Datastore インデックス管理者roles/datastore.indexAdmin)ロールにはこれらの権限が付与されています。

変更ストリームを作成

対応する変更ストリーム カーソルを開く前に、変更ストリームを作成する必要があります。コレクションまたはデータベースの作成時に変更ストリームを自動的に有効にすることは対象外です。

変更ストリームを作成するには、Google Cloud コンソールを使用します。

  1. Google Cloud コンソールで [データベース] ページに移動します。

    [データベース] に移動

  2. リストから、MongoDB 互換の Firestore データベースを選択します。Firestore Studio パネルが開きます。
  3. [エクスプローラ] パネルで、[変更ストリーム] ノードを見つけて その他の操作)をクリックし、[変更ストリームを作成] を選択します。
  4. 一意の変更ストリーム名、スコープ、保持期間を入力し、[保存] をクリックします。

変更ストリームを表示する

変更ストリームの詳細については、Google Cloud コンソールで確認できます。

  1. Google Cloud コンソールで [データベース] ページに移動します。

    [データベース] に移動

  2. リストから、MongoDB 互換の Firestore データベースを選択します。Firestore Studio パネルが開きます。
  3. [エクスプローラ] パネルで、[変更ストリーム] ノードを見つけます。
  4. ノードを開閉するには、 ノードの切り替えをクリックします。

変更ストリームを削除する

変更ストリームを削除するには、Google Cloud コンソールを使用します。

  1. Google Cloud コンソールで [データベース] ページに移動します。

    [データベース] に移動

  2. リストから、MongoDB 互換の Firestore データベースを選択します。Firestore Studio パネルが開きます。
  3. [エクスプローラ] パネルで、[変更ストリーム] ノードを見つけます。
  4. ノードを開閉するには、 ノードの切り替えをクリックします。
  5. [エクスプローラ] で、削除する変更ストリームを見つけます。
  6. [その他の操作] をクリックし、[変更ストリームを削除] を選択します。
  7. ダイアログで、変更ストリーム名を入力して削除を確定し、[削除] をクリックします。

変更ストリーム カーソルを開くか再開する

次の例は、変更ストリーム カーソルを作成、再開、構成する方法を示しています。

変更ストリーム カーソルを作成する前に、データベースまたはコレクションの変更ストリームを明示的に作成する必要があります。

変更ストリーム カーソルを作成する

新しい変更ストリーム カーソルを作成するには、MongoDB ドライバの watch メソッドを使用します。データベースのすべての変更をリッスンするには、データベース スコープの変更ストリームを作成し、db オブジェクトで watch メソッドを呼び出します。

let cursor = db.watch()

コレクションにスコープ設定されたカーソルを作成するには、まずそのコレクションの変更ストリームを作成する必要があります。次に、対応するコレクションで watch メソッドを呼び出します。

let cursor = db.my_collection.watch()

変更ストリーム カーソルを作成したので、ストリーミングを開始できます。たとえば、ドキュメントを挿入してカーソルで tryNext を呼び出すと、変更ストリームに変更が表示されます。

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

ドキュメントを更新して削除すると、変更ストリームに次のように表示されます。

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

変更ストリームを再開する

変更ストリームを再開するには、resumeAfter オプションまたは startAfter オプションを使用します。resumeAfterstartAfter から再開する変更ログ内の位置を特定するには、再開トークンを使用します。

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

startAfter」を使用するには、

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

更新と削除に前後の画像を含める

必要に応じて、更新イベントと削除イベントの変更にドキュメントの更新前後の画像を含めることができます。イメージの可用性は、ポイントインタイム リカバリ(PITR)ウィンドウの影響を受けます。1 時間以上前のドキュメント イメージを読み取るには、PITR を有効にする必要があります。

変更ストリームは、PITR ウィンドウを利用して、特定の変更イベントの前後のドキュメントのビューを提供します。デフォルトでは、更新イベントには updateDescription フィールドが含まれます。これは、更新オペレーションによって変更されたフィールドの差分です。

変更イベントに前後の画像を含めるには、変更ストリーム クエリで fullDocumentBeforeChange オプションと fullDocument オプションを指定する必要があります。

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

クエリが PITR 保持期間外のドキュメントを読み取ろうとした場合、または PITR が有効になっていない場合、required 値はサーバーサイドのエラー メッセージをスローします。

エラーをスローする代わりに、whenAvailable 値を使用して、画像が使用できなくなった場合に null 値を返すことができます。

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

現在の画像をアップデートに含める

デフォルトでは、更新イベントには updateDescription フィールドが含まれます。これは、更新オペレーションによって変更されたフィールドの差分です。代わりにドキュメント全体の最新バージョンを検索するには、fullDocument オプションで updateLookup 値を使用します。

この機能では PITR は必要なく、ドキュメントのルックアップが実行されます。

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

並列読み取り

スループットを向上させるには、firestoreWorkerConfig オプションを使用して、複数のワーカー間で変更ストリーム クエリを分割します。各ワーカーは、一連のドキュメントの変更を処理します。runCommand または aggregate クエリを使用して、並列カーソルを作成する必要があります。

たとえば、次のように変更ストリームを 3 つのワーカーに分散できます。

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

変更ストリームとバックアップ

変更ストリームの構成も変更ストリームのデータも、バックアップ復元オペレーションでは使用できません。変更ストリームを含むデータベースを復元する場合は、そのデータベースへのカーソルを開くために、宛先データベースで変更ストリームを再作成する必要があります。

課金

動作の違い

次のセクションでは、MongoDB 互換の Firestore と MongoDB の間の変更ストリームの違いについて説明します。

updateDescription

updateDescription は、更新オペレーションによって更新または削除されたフィールドを記述する update イベント内のドキュメントです。Cloud Firestore では、次のような違いがあります。

  • updateDescription では、truncatedArrays フィールドと disambiguatedPaths フィールドに値が入力されません。
  • updateDescription.updatedFields は、変更が適用される前後のドキュメントの画像間の標準的な差分を表します。

ドキュメントの次の初期状態を考えてみましょう。

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

シナリオ 1: 配列の最初の要素のみを変更します。

このシナリオでは、Cloud Firestore の動作は MongoDB と一致します。

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

シナリオ 2: 配列全体で上書きする

このシナリオでは、オペレーションは最初の配列フィールドのみを更新しますが、配列全体を上書きします。

Cloud Firestore 更新差分では、この 2 つのシナリオが区別されず、両方に対して同じ updateDescription.updatedFields が返されます。

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

次のステップ