อ่านข้อมูลแบบเรียลไทม์ด้วยสตรีมการเปลี่ยนแปลง

Change Streams สำหรับ Firestore ที่เข้ากันได้กับ MongoDB ช่วยให้แอปพลิเคชันเข้าถึงการเปลี่ยนแปลงแบบเรียลไทม์ (การแทรก การอัปเดต และการลบ) ที่เกิดขึ้นกับคอลเล็กชันหรือฐานข้อมูลทั้งหมดได้ Change Stream จะจัดลำดับการอัปเดตตามเวลาที่แก้ไข

Change Stream สามารถเข้าถึงได้ผ่าน API ที่เข้ากันได้กับ MongoDB และไดรเวอร์ MongoDB แบบเดิม การใช้งาน Change Stream ของ Firestore ที่เข้ากันได้กับ MongoDB สามารถจัดการอัตราการส่งข้อมูลของการเขียนและการอ่านได้ทุกปริมาณงานผ่านการใช้งานการแบ่งพาร์ติชันอัตโนมัติที่ไม่ซ้ำกันในการเขียนและการอ่านแบบขนาน ซึ่งช่วยให้คุณสร้างปริมาณงานที่มีปริมาณงานสูงได้ นอกจากนี้ คุณยังปรับปรุงโครงสร้างพื้นฐานของการย้ายข้อมูลและการซิงค์ข้อมูลระหว่าง Cloud Firestore กับโซลูชันพื้นที่เก็บข้อมูลอื่นๆ ได้ด้วย

นอกเหนือจากความเข้ากันได้กับไดรเวอร์ MongoDB แล้ว คุณยังใช้ Cloud Firestore เพื่ออ่าน Change Stream แบบขนานได้ด้วย ซึ่งช่วยให้คุณสร้างปริมาณงานของการอ่านแบบขนานที่มีปริมาณงานสูงได้ แต่ละสตรีมแสดงถึงพาร์ติชันของผลลัพธ์ที่กระจายอย่างเหมาะสม

Change Stream รองรับฟีเจอร์ต่อไปนี้

  • Change Stream ที่กำหนดค่าได้ซึ่งมีขอบเขตเป็นฐานข้อมูลหรือคอลเล็กชัน
  • ระยะเวลาเก็บรักษาสำหรับ Change Stream ที่ระบุไว้เมื่อสร้าง การเก็บรักษาเริ่มต้นคือ 7 วันและการเก็บรักษาน้อยที่สุดคือ 1 วัน การเก็บรักษาต้องเป็นจำนวนเต็มเท่าของ 1 วัน โดยสูงสุดไม่เกิน 7 วัน คุณจะเปลี่ยนระยะเวลาเก็บรักษาหลังจากสร้างแล้วไม่ได้ หากต้องการเปลี่ยนระยะเวลาเก็บรักษา คุณต้องลบและสร้าง Change Stream ใหม่
  • เหตุการณ์การเปลี่ยนแปลง delete, insert, update และ drop ที่สังเกตได้โดยใช้ db.collection.watch() และ db.watch()
  • updateDescription.updatedFields มีความแตกต่างของการอัปเดต
  • ตัวเลือก fullDocument และ fullDocumentBeforeChange ทั้งหมด
    • การค้นหาเอกสารฉบับเต็มสำหรับการอัปเดต
    • รูปภาพก่อนหน้าของเอกสารก่อนที่จะมีการแทนที่ อัปเดต หรือลบ
    • รูปภาพหลังจากเอกสารมีการแทนที่หรืออัปเดต
    • รูปภาพก่อนหน้าและหลังจากที่มีอายุมากกว่า 1 ชั่วโมงต้องเปิดใช้การกู้คืนช่วงเวลา (PITR)
  • ตัวเลือกการดำเนินการต่อทั้งหมด รวมถึง resumeAfter และ startAfter
  • เมื่อใช้ watch() เพื่อสังเกตการเปลี่ยนแปลง คุณสามารถเชื่อมโยงขั้นตอนการรวม เช่น $addFields, $match, $project, $replaceRoot, $replaceWith, $set และ $unset

กำหนดค่า Change Stream

หากต้องการสร้าง ลบ หรือดู Change Stream ที่มีอยู่สำหรับฐานข้อมูล ให้ใช้คอนโซล Google Cloud

บทบาทและสิทธิ์

หากต้องการสร้าง ลบ และแสดงรายการ Change Stream หลักต้องมีสิทธิ์ datastore.schemas.create, datastore.schemas.delete และ datastore.schemas.list ของ Identity and Access Management (IAM) ตามลำดับ

ตัวอย่างเช่น บทบาทผู้ดูแลระบบดัชนี Datastore (roles/datastore.indexAdmin) จะให้สิทธิ์เหล่านี้

สร้าง Change Stream

คุณต้องสร้าง Change Stream ก่อนจึงจะเปิดเคอร์เซอร์ Change Stream ที่เกี่ยวข้องได้ ระบบไม่รองรับการเปิดใช้ Change Stream โดยอัตโนมัติเมื่อสร้างคอลเล็กชันหรือฐานข้อมูล

หากต้องการสร้าง Change Stream ให้ใช้คอนโซล Google Cloud

  1. ในคอนโซล Google Cloud ให้ไปที่หน้าฐานข้อมูล

    ไปที่ฐานข้อมูล

  2. เลือกฐานข้อมูล Firestore ที่เข้ากันได้กับ MongoDB จากรายการ แผง Firestore Studio จะเปิดขึ้น
  3. ในแผงExplorer ให้ค้นหาโหนด Change streams คลิก การดำเนินการเพิ่มเติม แล้วเลือก สร้าง Change Stream
  4. ป้อนชื่อ Change Stream ขอบเขต และระยะเวลาเก็บรักษาที่ไม่ซ้ำกัน แล้วคลิกบันทึก

ดู Change Stream

คุณสามารถดูรายละเอียดเกี่ยวกับ Change Streams ในคอนโซล Google Cloud ได้

  1. ในคอนโซล Google Cloud ให้ไปที่หน้าฐานข้อมูล

    ไปที่ฐานข้อมูล

  2. เลือกฐานข้อมูล Firestore ที่เข้ากันได้กับ MongoDB จากรายการ แผง Firestore Studio จะเปิดขึ้น
  3. ในแผงExplorer ให้ค้นหาโหนด Change Stream
  4. หากต้องการเปิดหรือปิดโหนด ให้คลิก สลับโหนด

ลบ Change Stream

หากต้องการลบ Change Stream ให้ใช้คอนโซล Google Cloud

  1. ในคอนโซล Google Cloud ให้ไปที่หน้าฐานข้อมูล

    ไปที่ฐานข้อมูล

  2. เลือกฐานข้อมูล Firestore ที่เข้ากันได้กับ MongoDB จากรายการ แผง Firestore Studio จะเปิดขึ้น
  3. ในแผงExplorer ให้ค้นหาโหนด Change Stream
  4. หากต้องการเปิดหรือปิดโหนด ให้คลิก สลับโหนด
  5. ในExplorer ให้ค้นหา Change Stream ที่ต้องการลบ
  6. คลิก การดำเนินการเพิ่มเติม แล้ว เลือก ลบ Change Stream
  7. ในกล่องโต้ตอบ ให้ป้อนชื่อ Change Stream เพื่อยืนยันการลบ แล้วคลิกลบ

เปิดหรือดำเนินการต่อด้วยเคอร์เซอร์ Change Stream

ตัวอย่างต่อไปนี้แสดงวิธีสร้าง ดำเนินการต่อ และกำหนดค่าเคอร์เซอร์ Change Stream

คุณต้องสร้าง Change Stream สำหรับฐานข้อมูลหรือคอลเล็กชันอย่างชัดเจนก่อนจึงจะสร้างเคอร์เซอร์ Change Stream ได้

สร้างเคอร์เซอร์ Change Stream

หากต้องการสร้างเคอร์เซอร์ Change Stream ใหม่ ให้ใช้เมธอด watch ในไดรเวอร์ MongoDB หากต้องการรับฟังการเปลี่ยนแปลงทั้งหมดในฐานข้อมูล ให้สร้าง Change Stream ที่มีขอบเขตเป็นฐานข้อมูล แล้วเรียกใช้เมธอด watch ในออบเจ็กต์ db

let cursor = db.watch()

หากต้องการสร้างเคอร์เซอร์ที่มีขอบเขตเป็นคอลเล็กชัน คุณต้องสร้าง Change Stream สำหรับคอลเล็กชันนั้นก่อน จากนั้นเรียกใช้เมธอด watch ในคอลเล็กชันที่เกี่ยวข้อง

let cursor = db.my_collection.watch()

เมื่อสร้างเคอร์เซอร์ Change Stream แล้ว คุณก็เริ่มสตรีมได้ ตัวอย่างเช่น หากคุณแทรกเอกสารและเรียกใช้ tryNext ในเคอร์เซอร์ คุณจะเห็นการเปลี่ยนแปลงปรากฏใน Change Stream

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

หากคุณอัปเดตและลบเอกสาร คุณจะเห็นการเปลี่ยนแปลงเหล่านั้นปรากฏใน Change Stream ดังนี้

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

ดำเนินการต่อด้วย Change Stream

หากต้องการดำเนินการต่อด้วย Change Stream ให้ใช้ตัวเลือก resumeAfter หรือ startAfter หากต้องการกำหนดตำแหน่งในบันทึกการเปลี่ยนแปลงที่จะดำเนินการต่อจาก resumeAfter และ startAfter ให้ใช้โทเค็นการดำเนินการต่อ

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

วิธีใช้ startAfter

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

ใส่รูปภาพก่อนหน้าและหลังจากในการอัปเดตและการลบ

หากจำเป็น คุณสามารถใส่รูปภาพก่อนหน้าและหลังจากของเอกสารในเหตุการณ์การเปลี่ยนแปลงของการอัปเดตและการลบได้ ความพร้อมใช้งานของรูปภาพขึ้นอยู่กับ กรอบเวลาการกู้คืนช่วงเวลา (PITR) และ หากต้องการอ่านรูปภาพเอกสารที่มีอายุมากกว่า 1 ชั่วโมง คุณต้องเปิดใช้ PITR

Change Stream ใช้ประโยชน์จากกรอบเวลา PITR เพื่อแสดงมุมมองของเอกสารก่อนและหลังเหตุการณ์การเปลี่ยนแปลงที่กำหนด โดยค่าเริ่มต้น เหตุการณ์การอัปเดตจะมีช่อง updateDescription ซึ่งเป็นความแตกต่างของช่องที่แก้ไขโดยการดำเนินการอัปเดต

หากต้องการใส่รูปภาพก่อนหน้าและหลังจากในเหตุการณ์การเปลี่ยนแปลง คุณต้อง ระบุตัวเลือก fullDocumentBeforeChange และ fullDocument ในการค้นหา Change Stream

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

หากการค้นหาพยายามอ่านเอกสารนอกกรอบเวลาเก็บรักษา PITR หรือหากไม่ได้เปิดใช้ PITR ค่า required จะแสดงข้อความแสดงข้อผิดพลาดฝั่งเซิร์ฟเวอร์

คุณสามารถใช้ค่า whenAvailable เพื่อแสดงค่า null หากรูปภาพไม่พร้อมใช้งานอีกต่อไป แทนที่จะแสดงข้อผิดพลาด

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

ใส่รูปภาพปัจจุบันในการอัปเดต

โดยค่าเริ่มต้น เหตุการณ์การอัปเดตจะมีช่อง updateDescription ซึ่งเป็นความแตกต่างของช่องที่แก้ไขโดยการดำเนินการอัปเดต หากต้องการค้นหาเอกสารฉบับเต็มเวอร์ชันล่าสุด ให้ใช้ค่า updateLookup ในตัวเลือก fullDocument แทน

ฟีเจอร์นี้ไม่จำเป็นต้องใช้ PITR และจะทำการค้นหาเอกสาร

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

การอ่านแบบขนาน

หากต้องการเพิ่มอัตราการส่งข้อมูล คุณสามารถใช้ตัวเลือก firestoreWorkerConfig เพื่อแยกการค้นหา Change Stream ไปยัง Worker หลายรายการ Worker แต่ละรายการมีหน้าที่แสดงการเปลี่ยนแปลงสำหรับเอกสารชุดต่างๆ คุณต้องสร้างเคอร์เซอร์แบบขนานผ่านการค้นหา runCommand หรือ aggregate

ตัวอย่างเช่น คุณสามารถกระจาย Change Stream ไปยัง Worker 3 รายการได้ดังนี้

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

Change Stream และการสำรองข้อมูล

ทั้งการกำหนดค่า Change Stream และข้อมูล Change Stream จะไม่พร้อมใช้งานในการดำเนินการกู้คืนข้อมูลสำรอง หากคุณกู้คืนฐานข้อมูลที่มี Change Stream คุณต้องสร้าง Change Stream เหล่านั้นใหม่ในฐานข้อมูลปลายทางเพื่อเปิดเคอร์เซอร์ไปยังฐานข้อมูลนั้น

การเรียกเก็บเงิน

ความแตกต่างของลักษณะการทำงาน

ส่วนต่อไปนี้จะอธิบายความแตกต่างของ Change Stream ระหว่าง Firestore ที่เข้ากันได้กับ MongoDB และ MongoDB

updateDescription

updateDescription เป็นเอกสารในเหตุการณ์ update ที่อธิบายช่อง ที่การดำเนินการอัปเดตได้อัปเดตหรือนำออก ใน Cloud Firestore ความแตกต่างที่สำคัญมีดังนี้

  • ใน updateDescription ระบบจะไม่ป้อนข้อมูลในช่อง truncatedArrays และ disambiguatedPaths
  • updateDescription.updatedFields แสดงถึงความแตกต่างที่ชัดเจนระหว่างรูปภาพก่อนหน้าและหลังจากของเอกสารก่อนและหลังการใช้การเปลี่ยนแปลง

พิจารณาสถานะเริ่มต้นต่อไปนี้ของเอกสาร

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

สถานการณ์ที่ 1: เปลี่ยนเฉพาะองค์ประกอบแรกของอาร์เรย์

ในสถานการณ์นี้ ลักษณะการทำงานของ Cloud Firestore จะตรงกับ MongoDB

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

สถานการณ์ที่ 2: เขียนทับด้วยอาร์เรย์ทั้งหมด

ในสถานการณ์นี้ การดำเนินการจะอัปเดตเฉพาะช่องอาร์เรย์แรก แต่จะเขียนทับอาร์เรย์ทั้งหมด

ความแตกต่างของการอัปเดต Cloud Firestore จะไม่แยกความแตกต่างระหว่าง 2 สถานการณ์นี้ และจะแสดง updateDescription.updatedFields เดียวกันสำหรับทั้ง 2 สถานการณ์

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

ขั้นตอนถัดไป