変更ストリームを使用してリアルタイム データを読み取る

MongoDB 互換の Firestore の変更ストリームを使用すると、アプリケーションはコレクションまたはデータベース全体に加えられたリアルタイムの変更(挿入、更新、削除)にアクセスできます。変更ストリームは、更新を更新時刻順に並べます。

変更ストリームには、MongoDB 互換の API と従来の MongoDB ドライバからアクセスできます。MongoDB 互換の Firestore の変更ストリームの実装は、書き込みと読み取りの並列処理に対する自動パーティショニングの一意の実装により、書き込みと読み取りのスループットを処理できます。これにより、高スループットのワークロードを構築できます。 また、 Cloud Firestoreと他のストレージ ソリューション間の移行とデータ同期のインフラストラクチャを改善することもできます。

MongoDB ドライバとの互換性に加えて、 Cloud Firestore を使用して変更ストリームを並行して読み取ることもできます。これにより、並列処理による高スループットの読み取りワークロードを構築できます。各ストリームは、結果の分散されたパーティションを表します。

変更ストリームは、次の機能をサポートしています。

  • データベースまたはコレクションのスコープで構成可能な変更ストリーム。
  • 作成時に指定された変更ストリームの保持期間。 デフォルトの保持期間は 7 日間で、最小保持期間は 1 日です。保持期間は 1 日の倍数で、最大 7 日間です。作成後に保持期間を変更することはできません。保持期間を変更するには、変更ストリームを削除して再作成する必要があります。
  • db.collection.watch()db.watch() を使用して監視できる deleteinsertupdatedrop の変更イベント。
  • updateDescription.updatedFields には更新差分が含まれます。
  • すべての fullDocument オプションと fullDocumentBeforeChange オプション。
    • 更新の完全なドキュメントの検索。
    • ドキュメントが置き換え、更新、削除される前のイメージ。
    • ドキュメントが置き換えまたは更新された後のイメージ。
    • 1 時間より前のイメージを取得するには、ポイントインタイム リカバリ(PITR)を有効にする必要があります。
  • resumeAfterstartAfter などのすべての再開オプション。
  • watch() を使用して変更をモニタリングする場合は、$addFields$match$project$replaceRoot$replaceWith$set$unset などの集計ステージを連結できます。

変更ストリームを構成する

データベースの既存の変更ストリームを作成、削除、表示するには、Google Cloud コンソールを使用します。

ロールと権限

変更ストリームを作成、削除、一覧表示するには、プリンシパルに datastore.schemas.createdatastore.schemas.deletedatastore.schemas.list の Identity and Access Management(IAM)権限が必要です。

たとえば、Datastore Index Adminroles/datastore.indexAdmin)ロールには、これらの権限が付与されます。

変更ストリームを作成

対応する変更ストリーム カーソルを開くには、変更ストリームを作成する必要があります。コレクションまたはデータベースの作成時に変更ストリームを自動的に有効にすることは対象外です。

変更ストリームを作成するには、Google Cloud コンソールを使用します。

  1. Google Cloud コンソールで [データベース] ページに移動します。

    [データベース] に移動

  2. リストから、MongoDB 互換の Firestore データベースを選択します。[Firestore Studio] パネルが開きます。
  3. In the [エクスプローラ] パネルで [変更ストリーム] ノードを見つけ、 [その他の操作] をクリックして、[変更ストリームを作成] を選択します。
  4. 一意の変更ストリーム名、スコープ、保持期間を入力して、[保存] をクリックします。

変更ストリームを表示する

変更ストリームの詳細については、Google Cloud コンソールをご覧ください。

  1. Google Cloud コンソールで [データベース] ページに移動します。

    [データベース] に移動

  2. リストから、MongoDB 互換の Firestore データベースを選択します。[Firestore Studio] パネルが開きます。
  3. [エクスプローラ] パネルで [変更ストリーム] ノードを見つけます。
  4. ノードを開閉するには、 [**ノードの切り替え**] をクリックします。

変更ストリームを削除する

変更ストリームを削除するには、Google Cloud コンソールを使用します。

  1. Google Cloud コンソールで [データベース] ページに移動します。

    [データベース] に移動

  2. リストから、MongoDB 互換の Firestore データベースを選択します。[Firestore Studio] パネルが開きます。
  3. [エクスプローラ] パネルで [変更ストリーム] ノードを見つけます。
  4. ノードを開閉するには、 [**ノードの切り替え**] をクリックします。
  5. [**エクスプローラ**] で、削除する変更ストリームを見つけます。
  6. [**その他の操作**] をクリックして、[**変更ストリームを削除**] を選択します。
  7. ダイアログで変更ストリーム名を入力して削除を確認し、[削除] をクリックします。

変更ストリーム カーソルを開くか再開する

次の例では、変更ストリーム カーソルを作成、再開、構成する方法を示します。

変更ストリーム カーソルを作成する前に、データベースまたはコレクションの変更ストリームを明示的に 作成する必要があります。

変更ストリーム カーソルを作成する

新しい変更ストリーム カーソルを作成するには、MongoDB ドライバの watch メソッドを使用します。 データベースのすべての変更をリッスンするには、データベース スコープの変更ストリームを作成し、db オブジェクトで watch メソッドを呼び出します。

let cursor = db.watch()

コレクションにスコープ設定されたカーソルを作成するには、まずそのコレクションの変更ストリームを作成する必要があります。次に、対応するコレクションで watch メソッドを呼び出します。

let cursor = db.my_collection.watch()

変更ストリーム カーソルを作成したので、ストリーミングを開始できます。 たとえば、ドキュメントを挿入してカーソルで tryNext を呼び出すと、変更ストリームに変更が表示されます。

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

ドキュメントを更新して削除すると、変更ストリームに次のような変更が表示されます。

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

変更ストリームを再開する

変更ストリームを再開するには、resumeAfter オプションまたは startAfter オプションを使用します。 resumeAfterstartAfter から再開する変更ログ内の位置を特定するには、再開トークンを使用します。

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

startAfter を使用するには:

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

更新と削除に事前イメージと事後イメージを含める

必要に応じて、ドキュメントの事前イメージと事後イメージを更新イベントと削除イベントに含めることができます。 イメージの可用性は ポイントインタイム リカバリ(PITR)ウィンドウによって異なります。 1 時間より前のドキュメント イメージを読み取るには、 PITR を有効にする必要があります。

変更ストリームは、PITR ウィンドウを利用して、特定の変更イベントの前後のドキュメントのビューを提供します。デフォルトでは、更新イベントには updateDescription フィールドが含まれます。これは、更新オペレーションによって変更されたフィールドの差分です。

変更イベントに事前イメージと事後イメージを含めるには、 変更ストリーム クエリで fullDocumentBeforeChange オプションと fullDocument オプションを指定する必要があります。

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

クエリが PITR 保持期間外のドキュメントを読み取ろうとした場合、または PITR が有効になっていない場合、required 値はサーバーサイド エラー メッセージをスローします。

エラーをスローする代わりに、whenAvailable 値を使用して、イメージが使用できなくなった場合に null 値を返すことができます。

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

更新に現在のイメージを含める

デフォルトでは、更新イベントには updateDescription フィールドが含まれます。これは、更新オペレーションによって変更されたフィールドの差分です。代わりにドキュメント全体の最新バージョンを検索するには、fullDocument オプションで updateLookup 値を使用します。

この機能では PITR は不要で、ドキュメントの検索を実行します。

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

並列読み取り

スループットを向上させるには、firestoreWorkerConfig オプションを使用して、変更ストリーム クエリを複数のワーカーに分割します。各ワーカーは、個別のドキュメント セットの変更を処理します。runCommand クエリまたは aggregate クエリを使用して、並列カーソルを作成する必要があります。

たとえば、次のように変更ストリームを 3 つのワーカーに分散できます。

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

変更ストリームとバックアップ

変更ストリームの構成も変更ストリームのデータも、バックアップ復元オペレーションでは使用できません。変更ストリームを使用してデータベースを復元する場合は、宛先データベースで変更ストリームを再作成して、そのデータベースへのカーソルを開く必要があります。

課金

  • 変更ストリームには、読み取りユニットとストレージの費用が発生します。変更ストリームの料金をご覧ください。
  • 読み取りリクエスト時に 1 時間より前の事前イメージと事後イメージを含めるには、 PITR を有効にする必要があります。これにより、 PITR の費用が発生します

動作の違い

次のセクションでは、MongoDB 互換の Firestore と MongoDB の変更ストリームの違いについて説明します。

updateDescription

updateDescription は、更新オペレーションによって更新または削除されたフィールド を記述する update イベントのドキュメントです。 Cloud Firestore では、次のような違いがあります。

  • updateDescription では、truncatedArrays フィールドと disambiguatedPaths フィールドに値が入力されません。
  • updateDescription.updatedFields は、ミューテーションが適用される前後のドキュメントの事前イメージと事後イメージの正規の差分を表します。

ドキュメントの初期状態を次に示します。

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

シナリオ 1: 配列の最初の要素のみを変更する。

このシナリオでは、Cloud Firestore の動作は MongoDB と一致します。

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

シナリオ 2: 配列全体を上書きする

このシナリオでは、オペレーションは最初の配列フィールドのみを更新しますが、配列全体を上書きします。

Cloud Firestore の更新差分では、これら 2 つのシナリオが区別されず、両方に対して同じ updateDescription.updatedFields が返されます。

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

次のステップ