Membaca data real-time dengan aliran data perubahan

Change Streams untuk Firestore dengan kompatibilitas MongoDB memungkinkan aplikasi mengakses perubahan real-time (penyisipan, pembaruan, dan penghapusan) yang dilakukan pada koleksi atau seluruh database. Aliran perubahan mengurutkan pembaruan berdasarkan waktu modifikasi.

Change Streams dapat diakses melalui API yang kompatibel dengan MongoDB dan Driver MongoDB tradisional. Implementasi Aliran Perubahan Firestore dengan kompatibilitas MongoDB dapat menangani throughput penulisan dan pembacaan apa pun melalui implementasi unik partisi otomatis pada penulisan dan paralelisme pembacaan. Dengan demikian, Anda dapat membangun workload dengan throughput tinggi. Anda juga dapat meningkatkan infrastruktur migrasi dan sinkronisasi data antara Cloud Firestore dan solusi penyimpanan lainnya.

Selain kompatibilitas dengan driver MongoDB, Anda dapat menggunakan Cloud Firestore untuk membaca Aliran Data Perubahan secara paralel. Dengan demikian, Anda dapat membuat beban kerja pembacaan paralel dengan throughput tinggi. Setiap aliran mewakili partisi hasil yang terdistribusi dengan baik.

Change Streams mendukung fitur berikut:

  • Aliran perubahan yang dapat dikonfigurasi dengan cakupan database atau koleksi.
  • Durasi retensi untuk aliran perubahan yang ditentukan saat pembuatan. Retensi default adalah 7 hari dan retensi minimum adalah 1 hari. Retensi harus kelipatan 1 hari, hingga maksimum 7 hari. Durasi retensi tidak dapat diubah setelah dibuat. Untuk mengubah periode retensi, Anda harus menghapus dan membuat ulang aliran perubahan.
  • Peristiwa perubahan delete, insert, update, dan drop yang dapat diamati menggunakan db.collection.watch() dan db.watch().
  • updateDescription.updatedFields berisi perbedaan update.
  • Semua opsi fullDocument dan fullDocumentBeforeChange.
    • Mencari update dokumen lengkap.
    • Gambar awal dokumen sebelum diganti, diperbarui, atau dihapus.
    • Gambar dokumen setelah diganti atau diperbarui.
    • Gambar sebelum dan sesudah yang lebih lama dari satu jam memerlukan pengaktifan pemulihan point-in-time (PITR).
  • Semua opsi melanjutkan, termasuk resumeAfter dan startAfter.
  • Saat menggunakan watch() untuk mengamati perubahan, Anda dapat menggabungkan tahap agregasi seperti $addFields, $match, $project, $replaceRoot, $replaceWith, $set, dan $unset.

Mengonfigurasi Aliran Data Perubahan

Untuk membuat, menghapus, atau melihat Aliran Perubahan yang ada untuk database, gunakan Konsol Google Cloud.

Peran dan izin

Untuk membuat, menghapus, dan mencantumkan Aliran Perubahan, akun utama memerlukan izin Identity and Access Management (IAM) datastore.schemas.create, datastore.schemas.delete, dan datastore.schemas.list.

Peran Datastore Index Admin (roles/datastore.indexAdmin), misalnya, memberikan izin ini.

Membuat aliran perubahan

Sebelum dapat membuka kursor aliran perubahan yang sesuai, Anda harus membuat aliran perubahan. Pengaktifan otomatis aliran perubahan saat pembuatan koleksi atau database tidak didukung.

Untuk membuat aliran perubahan, gunakan Konsol Google Cloud.

  1. Di konsol Google Cloud, buka halaman Databases.

    Buka Databases

  2. Dari daftar, pilih database Firestore dengan kompatibilitas MongoDB. Panel Firestore Studio akan terbuka.
  3. Di panel Explorer, temukan node Change streams, klik More actions, lalu pilih Create change stream.
  4. Masukkan nama, cakupan, dan periode retensi aliran perubahan yang unik, lalu klik Simpan

Melihat Aliran Perubahan

Anda dapat melihat detail tentang Aliran Perubahan di konsol Google Cloud.

  1. Di konsol Google Cloud, buka halaman Databases.

    Buka Databases

  2. Dari daftar, pilih database Firestore dengan kompatibilitas MongoDB. Panel Firestore Studio akan terbuka.
  3. Di panel Explorer, temukan node Change streams.
  4. Untuk membuka atau menutup node, klik Alihkan node.

Menghapus aliran perubahan

Untuk menghapus aliran perubahan, gunakan konsol Google Cloud.

  1. Di konsol Google Cloud, buka halaman Databases.

    Buka Databases

  2. Dari daftar, pilih database Firestore dengan kompatibilitas MongoDB. Panel Firestore Studio akan terbuka.
  3. Di panel Explorer, temukan node Change streams.
  4. Untuk membuka atau menutup node, klik Alihkan node.
  5. Di Explorer, temukan aliran perubahan yang ingin Anda hapus.
  6. Klik More actions, lalu pilih Delete change stream.
  7. Dalam dialog, masukkan nama aliran perubahan untuk mengonfirmasi penghapusan, lalu klik Hapus.

Membuka atau melanjutkan kursor aliran perubahan

Contoh berikut menunjukkan cara membuat, melanjutkan, dan mengonfigurasi kursor aliran perubahan.

Sebelum membuat kursor aliran perubahan, Anda harus secara eksplisit membuat aliran perubahan untuk database atau koleksi.

Membuat kursor aliran perubahan

Untuk membuat kursor aliran perubahan baru, gunakan metode watch di driver MongoDB. Untuk memproses semua perubahan pada database, buat aliran perubahan cakupan database dan panggil metode watch pada objek db.

let cursor = db.watch()

Untuk membuat kursor yang tercakup dalam koleksi, Anda harus membuat aliran data perubahan untuk koleksi tersebut terlebih dahulu. Kemudian, panggil metode watch pada koleksi yang sesuai.

let cursor = db.my_collection.watch()

Setelah membuat kursor aliran perubahan, Anda dapat mulai melakukan streaming. Misalnya, jika Anda menyisipkan dokumen dan memanggil tryNext pada kursor, Anda akan melihat perubahan muncul di aliran perubahan.

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

Jika Anda memperbarui dan menghapus dokumen, Anda akan melihat perubahan tersebut muncul di aliran perubahan:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

Melanjutkan aliran perubahan

Untuk melanjutkan aliran perubahan, gunakan opsi resumeAfter atau startAfter. Untuk menentukan titik di log perubahan yang akan dilanjutkan dari resumeAfter dan startAfter, gunakan token lanjutan.

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

Untuk menggunakan startAfter:

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

Menyertakan gambar sebelum dan sesudah dalam pembaruan dan penghapusan

Jika diperlukan, Anda dapat menyertakan gambar sebelum dan sesudah dokumen dalam peristiwa perubahan update dan hapus. Ketersediaan gambar tunduk pada periode pemulihan point-in-time (PITR), dan untuk membaca gambar dokumen yang lebih lama dari satu jam, Anda harus mengaktifkan PITR.

Change Streams memanfaatkan periode PITR untuk memberikan tampilan dokumen sebelum dan setelah peristiwa perubahan tertentu. Secara default, peristiwa update berisi kolom updateDescription yang merupakan delta kolom yang diubah oleh operasi update.

Untuk menyertakan gambar sebelum dan sesudah dalam peristiwa perubahan, Anda harus menentukan opsi fullDocumentBeforeChange dan fullDocument dalam kueri aliran perubahan.

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

Jika kueri mencoba membaca dokumen di luar periode retensi PITR atau jika PITR tidak diaktifkan, nilai required akan memunculkan pesan error di sisi server.

Sebagai alternatif untuk menampilkan error, Anda dapat menggunakan nilai whenAvailable untuk menampilkan nilai null jika gambar tidak lagi tersedia.

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

Menyertakan gambar saat ini dalam update

Secara default, peristiwa pembaruan berisi kolom updateDescription yang merupakan delta kolom yang diubah oleh operasi pembaruan. Untuk mencari versi terbaru dari seluruh dokumen, gunakan nilai updateLookup dalam opsi fullDocument.

Fitur ini tidak memerlukan PITR dan melakukan pencarian untuk dokumen.

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

Operasi Baca Paralel

Untuk meningkatkan throughput, Anda dapat menggunakan opsi firestoreWorkerConfig untuk membagi kueri aliran perubahan di beberapa pekerja. Setiap pekerja bertanggung jawab untuk menyajikan perubahan untuk sekumpulan dokumen yang berbeda. Anda harus membuat kursor paralel melalui kueri runCommand atau aggregate.

Misalnya, Anda dapat mendistribusikan aliran perubahan di 3 pekerja seperti ini:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

Aliran data perubahan dan pencadangan

Konfigurasi aliran perubahan maupun data aliran perubahan tidak tersedia dalam operasi pemulihan cadangan. Jika Anda memulihkan database dengan Change Streams, Anda harus membuat ulang aliran perubahan tersebut di database tujuan untuk membuka kursor ke database tersebut.

Penagihan

Perbedaan perilaku

Bagian berikut menjelaskan perbedaan dalam Change Streams antara Firestore dengan kompatibilitas MongoDB dan MongoDB.

updateDescription

updateDescription adalah dokumen dalam peristiwa update yang menjelaskan kolom yang diperbarui atau dihapus oleh operasi pembaruan. Di Cloud Firestore, perbedaan pentingnya adalah:

  • Di updateDescription, kolom truncatedArrays dan disambiguatedPaths tidak diisi.
  • updateDescription.updatedFields merepresentasikan perbedaan kanonis antara gambar sebelum dan sesudah dokumen sebelum dan setelah mutasi diterapkan.

Pertimbangkan status awal dokumen berikut:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

Skenario 1: mengubah hanya elemen pertama array.

Dalam skenario ini, perilaku Cloud Firestore cocok dengan MongoDB.

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Skenario 2: menimpa dengan seluruh array

Dalam skenario ini, operasi hanya memperbarui kolom array pertama, tetapi menimpa seluruh array.

Perbedaan pembaruan Cloud Firestore tidak membedakan kedua skenario ini dan menampilkan updateDescription.updatedFields yang sama untuk keduanya:

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Langkah berikutnya