อ่านข้อมูลแบบเรียลไทม์ด้วยสตรีมการเปลี่ยนแปลง

Change Streams สำหรับ Firestore ที่เข้ากันได้กับ MongoDB ช่วยให้แอปพลิเคชันเข้าถึงการเปลี่ยนแปลงแบบเรียลไทม์ (การแทรก การอัปเดต และการลบ) ที่เกิดขึ้นกับคอลเล็กชันหรือทั้งฐานข้อมูลได้ สตรีมการเปลี่ยนแปลงจะจัดลำดับการอัปเดตตามเวลาการแก้ไข

Change Streams สามารถเข้าถึงได้ผ่าน API ที่เข้ากันได้กับ MongoDB และไดรเวอร์ MongoDB แบบเดิม การใช้งาน Change Streams ใน Firestore ที่เข้ากันได้กับ MongoDB สามารถจัดการอัตราการส่งข้อมูลของการเขียนและการอ่านได้ ผ่านการใช้งานการแบ่งพาร์ติชันอัตโนมัติที่ไม่ซ้ำกันในการเขียนและความขนานของการอ่าน ซึ่งช่วยให้คุณสร้างเวิร์กโหลดที่มีปริมาณงานสูงได้ นอกจากนี้ คุณยังปรับปรุงโครงสร้างพื้นฐานสำหรับการย้ายข้อมูลและการซิงค์ข้อมูลระหว่าง Cloud Firestore กับโซลูชันพื้นที่เก็บข้อมูลอื่นๆ ได้ด้วย

นอกเหนือจากความเข้ากันได้กับไดรเวอร์ MongoDB แล้ว คุณยังใช้ Cloud Firestore เพื่ออ่าน Change Stream แบบขนานได้ด้วย ซึ่งช่วยให้คุณสร้างเวิร์กโหลดการอ่านที่มีปริมาณงานสูงแบบคู่ขนานได้ แต่ละสตรีมแสดงถึง พาร์ติชันผลลัพธ์ที่กระจายอย่างดี

Change Streams รองรับฟีเจอร์ต่อไปนี้

  • สตรีมการเปลี่ยนแปลงที่กำหนดค่าได้ที่มีขอบเขตฐานข้อมูลหรือคอลเล็กชัน
  • ระยะเวลาการเก็บรักษาสำหรับสตรีมการเปลี่ยนแปลงที่ระบุไว้ตอนสร้าง การเก็บรักษาเริ่มต้นคือ 7 วัน และการเก็บรักษาขั้นต่ำคือ 1 วัน ระยะเวลาเก็บรักษา ต้องเป็นจำนวนเท่าของ 1 วัน โดยสูงสุดไม่เกิน 7 วัน คุณจะเปลี่ยนระยะเวลาการเก็บรักษา หลังจากสร้างแล้วไม่ได้ หากต้องการเปลี่ยนระยะเวลาเก็บรักษา คุณต้อง ดรอปและสร้างสตรีมการเปลี่ยนแปลงใหม่
  • delete, insert, update และ drop เปลี่ยนเหตุการณ์ที่สังเกตได้ โดยใช้ db.collection.watch() และ db.watch()
  • updateDescription.updatedFields มีส่วนต่างของการอัปเดต
  • ตัวเลือก fullDocument และ fullDocumentBeforeChange ทั้งหมด
    • กำลังค้นหาเอกสารฉบับเต็มเพื่อดูข้อมูลอัปเดต
    • รูปภาพก่อนหน้าของเอกสารก่อนที่จะมีการแทนที่ อัปเดต หรือลบ
    • รูปภาพของเอกสารหลังจากมีการแทนที่หรืออัปเดต
    • รูปภาพก่อนและหลังที่มีอายุมากกว่า 1 ชั่วโมงต้องเปิดใช้ การกู้คืนช่วงเวลา (PITR)
  • ตัวเลือกการกลับมาทำงานทั้งหมด รวมถึง resumeAfter และ startAfter
  • เมื่อใช้ watch() เพื่อสังเกตการเปลี่ยนแปลง คุณสามารถเชื่อมโยงขั้นตอนการรวบรวมข้อมูลได้ เช่น $addFields, $match, $project, $replaceRoot $replaceWith, $set และ $unset

กำหนดค่า Change Streams

หากต้องการสร้าง ทิ้ง หรือดู Change Stream ที่มีอยู่สำหรับฐานข้อมูล ให้ใช้ คอนโซล Google Cloud

บทบาทและสิทธิ์

หากต้องการสร้าง ลบ และแสดงรายการสตรีมการเปลี่ยนแปลง หลักการต้องมีสิทธิ์ Identity and Access Management (IAM) ของ datastore.schemas.create datastore.schemas.delete และ datastore.schemas.list ตามลำดับ

เช่น บทบาทผู้ดูแลระบบดัชนี Datastore (roles/datastore.indexAdmin) จะให้สิทธิ์ต่อไปนี้

สร้าง Change Stream

คุณต้องสร้าง Change Stream ก่อนจึงจะเปิดเคอร์เซอร์ Change Stream ที่เกี่ยวข้องได้ ระบบไม่รองรับการเปิดใช้สตรีมการเปลี่ยนแปลงอัตโนมัติที่คอลเล็กชันหรือการสร้างฐานข้อมูล

หากต้องการสร้างสตรีมการเปลี่ยนแปลง ให้ใช้คอนโซล Google Cloud

  1. ในคอนโซล Google Cloud ให้ไปที่หน้าฐานข้อมูล

    ไปที่ฐานข้อมูล

  2. เลือกฐานข้อมูล Firestore ที่เข้ากันได้กับ MongoDB จากรายการ แผง Firestore Studio จะเปิดขึ้น
  3. ในแผงExplorer ให้ค้นหาโหนดChange Streams คลิก การดำเนินการเพิ่มเติม แล้วเลือกสร้าง Change Stream
  4. ป้อนชื่อสตรีมการเปลี่ยนแปลง ขอบเขต และระยะเวลาเก็บรักษาที่ไม่ซ้ำกัน แล้วคลิกบันทึก

ดูสตรีมการเปลี่ยนแปลง

คุณดูรายละเอียดเกี่ยวกับ Change Streams ได้ในคอนโซล Google Cloud

  1. ในคอนโซล Google Cloud ให้ไปที่หน้าฐานข้อมูล

    ไปที่ฐานข้อมูล

  2. เลือกฐานข้อมูล Firestore ที่เข้ากันได้กับ MongoDB จากรายการ แผง Firestore Studio จะเปิดขึ้น
  3. ในแผงสำรวจ ให้ค้นหาโหนดสตรีมการเปลี่ยนแปลง
  4. หากต้องการเปิดหรือปิดโหนด ให้คลิก สลับโหนด

ลบสตรีมการเปลี่ยนแปลง

หากต้องการลบสตรีมการเปลี่ยนแปลง ให้ใช้คอนโซล Google Cloud

  1. ในคอนโซล Google Cloud ให้ไปที่หน้าฐานข้อมูล

    ไปที่ฐานข้อมูล

  2. เลือกฐานข้อมูล Firestore ที่เข้ากันได้กับ MongoDB จากรายการ แผง Firestore Studio จะเปิดขึ้น
  3. ในแผงสำรวจ ให้ค้นหาโหนดสตรีมการเปลี่ยนแปลง
  4. หากต้องการเปิดหรือปิดโหนด ให้คลิก สลับโหนด
  5. ในExplorer ให้ค้นหาสตรีมการเปลี่ยนแปลงที่ต้องการลบ
  6. คลิก การดำเนินการเพิ่มเติม แล้วเลือกลบสตรีมการเปลี่ยนแปลง
  7. ในกล่องโต้ตอบ ให้ป้อนชื่อสตรีมการเปลี่ยนแปลงเพื่อยืนยันการลบ แล้วคลิกลบ

เปิดหรือดำเนินการต่อเคอร์เซอร์สตรีมการเปลี่ยนแปลง

ตัวอย่างต่อไปนี้แสดงวิธีสร้าง ดำเนินการต่อ และกำหนดค่า เคอร์เซอร์สตรีมการเปลี่ยนแปลง

ก่อนที่จะสร้างเคอร์เซอร์สตรีมการเปลี่ยนแปลง คุณต้องสร้างสตรีมการเปลี่ยนแปลงสำหรับฐานข้อมูลหรือคอลเล็กชันอย่างชัดเจน

สร้างเคอร์เซอร์สตรีมการเปลี่ยนแปลง

หากต้องการสร้างเคอร์เซอร์สตรีมการเปลี่ยนแปลงใหม่ ให้ใช้วิธี watch ใน ไดรเวอร์ MongoDB หากต้องการฟังการเปลี่ยนแปลงทั้งหมดในฐานข้อมูล ให้สร้างสตรีมการเปลี่ยนแปลงที่กำหนดขอบเขตฐานข้อมูล และเรียกใช้เมธอด watch ในออบเจ็กต์ db

let cursor = db.watch()

หากต้องการสร้างเคอร์เซอร์ที่กำหนดขอบเขตไว้ที่คอลเล็กชัน คุณต้อง สร้างสตรีมการเปลี่ยนแปลงสำหรับคอลเล็กชันนั้นก่อน จากนั้นเรียกใช้เมธอด watch ในคอลเล็กชันที่ เกี่ยวข้อง

let cursor = db.my_collection.watch()

ตอนนี้คุณสร้างเคอร์เซอร์สตรีมการเปลี่ยนแปลงแล้ว ก็เริ่มสตรีมได้เลย เช่น หากแทรกเอกสารและเรียกใช้ tryNext ที่เคอร์เซอร์ คุณจะเห็นการเปลี่ยนแปลงปรากฏในสตรีมการเปลี่ยนแปลง

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

หากคุณอัปเดตและลบเอกสาร คุณจะเห็นการเปลี่ยนแปลงเหล่านั้นปรากฏในสตรีมการเปลี่ยนแปลง ดังนี้

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

ดำเนินการสตรีมการเปลี่ยนแปลงต่อ

หากต้องการเริ่มสตรีมการเปลี่ยนแปลงต่อ ให้ใช้ตัวเลือก resumeAfter หรือ startAfter หากต้องการระบุตำแหน่งในบันทึกการเปลี่ยนแปลงที่จะดำเนินการต่อจาก resumeAfter และ startAfter ให้ใช้โทเค็นการดำเนินการต่อ

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

วิธีใช้ startAfter

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

ใส่รูปภาพก่อนและหลังในการอัปเดตและลบ

หากจำเป็น คุณสามารถใส่รูปภาพก่อนและหลังของเอกสารในการอัปเดต และลบเหตุการณ์การเปลี่ยนแปลงได้ ความพร้อมใช้งานของรูปภาพขึ้นอยู่กับช่วงการกู้คืนช่วงเวลา (PITR) และหากต้องการอ่านรูปภาพเอกสารที่มีอายุมากกว่า 1 ชั่วโมง คุณต้องเปิดใช้ PITR

Change Streams ใช้ประโยชน์จากกรอบเวลา PITR เพื่อแสดงมุมมองของ เอกสารก่อนและหลังเหตุการณ์การเปลี่ยนแปลงที่ระบุ โดยค่าเริ่มต้น เหตุการณ์อัปเดตจะมีฟิลด์ updateDescription ซึ่งเป็นส่วนต่างของฟิลด์ที่ การดำเนินการอัปเดตแก้ไข

หากต้องการรวมรูปภาพก่อนและหลังในเหตุการณ์การเปลี่ยนแปลง คุณต้อง ระบุตัวเลือก fullDocumentBeforeChange และ fullDocument ในการค้นหาสตรีมการเปลี่ยนแปลง

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

หากคำค้นหาพยายามอ่านเอกสารนอกหน้าต่างการเก็บรักษา PITR หรือหากไม่ได้เปิดใช้ PITR required ค่าจะส่งข้อความแสดงข้อผิดพลาดทางฝั่งเซิร์ฟเวอร์

คุณสามารถใช้ค่า whenAvailable เพื่อ แสดงค่า null หากรูปภาพไม่พร้อมใช้งานอีกต่อไปได้แทนการแสดงข้อผิดพลาด

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

รวมรูปภาพปัจจุบันในการอัปเดต

โดยค่าเริ่มต้น เหตุการณ์การอัปเดตจะมีupdateDescription ซึ่งเป็น ส่วนต่างของฟิลด์ที่แก้ไขโดยการดำเนินการอัปเดต หากต้องการค้นหาเอกสารทั้งฉบับเวอร์ชันล่าสุดแทน ให้ใช้updateLookupค่าในตัวเลือกfullDocument

ฟีเจอร์นี้ไม่จำเป็นต้องใช้ PITR และจะค้นหาเอกสาร

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

การอ่านแบบขนาน

หากต้องการเพิ่มอัตราการส่งข้อมูล คุณสามารถใช้ตัวเลือก firestoreWorkerConfig เพื่อแยกคำค้นหา Change Stream ออกเป็นหลายๆ Worker โดยแต่ละ Worker จะ มีหน้าที่ให้บริการการเปลี่ยนแปลงสำหรับชุดเอกสารที่แตกต่างกัน คุณต้อง สร้างเคอร์เซอร์แบบขนานผ่านการค้นหา runCommand หรือ aggregate

เช่น คุณสามารถกระจายสตรีมการเปลี่ยนแปลงในผู้ปฏิบัติงาน 3 คนได้ดังนี้

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

Change Streams และการสำรองข้อมูล

ทั้งการกำหนดค่าสตรีมการเปลี่ยนแปลงและข้อมูลสตรีมการเปลี่ยนแปลงจะไม่พร้อมใช้งาน ในการดำเนินการกู้คืนข้อมูลสำรอง หากกู้คืนฐานข้อมูลที่มี Change Streams คุณต้องสร้าง Change Streams เหล่านั้นใหม่ในฐานข้อมูลปลายทางเพื่อเปิด เคอร์เซอร์ไปยังฐานข้อมูลนั้น

การเรียกเก็บเงิน

ความแตกต่างของลักษณะการทำงาน

ส่วนต่อไปนี้จะอธิบายความแตกต่างของ Change Streams ระหว่าง Firestore ที่เข้ากันได้กับ MongoDB และ MongoDB

updateDescription

updateDescription คือเอกสารในเหตุการณ์ update ที่อธิบายฟิลด์ ซึ่งมีการอัปเดตหรือนำออกโดยการดำเนินการอัปเดต ใน Cloud Firestore ความแตกต่างที่สำคัญมีดังนี้

  • ใน updateDescription ระบบจะไม่แสดงข้อมูลในช่อง truncatedArrays และ disambiguatedPaths
  • updateDescription.updatedFields แสดงถึงความแตกต่างที่ชัดเจนระหว่าง รูปภาพก่อนและหลังของเอกสารก่อนและหลังใช้การเปลี่ยนแปลง

พิจารณาสถานะเริ่มต้นของเอกสารต่อไปนี้

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

สถานการณ์ที่ 1: เปลี่ยนแปลงเฉพาะองค์ประกอบแรกของอาร์เรย์

ในสถานการณ์นี้ ลักษณะการทำงานของ Cloud Firestore จะตรงกับ MongoDB

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

สถานการณ์ที่ 2: เขียนทับด้วยอาร์เรย์ทั้งหมด

ในสถานการณ์นี้ การดำเนินการจะอัปเดตเฉพาะฟิลด์อาร์เรย์แรก แต่จะเขียนทับอาร์เรย์ทั้งหมด

Cloud Firestore การอัปเดต Diff ไม่ได้แยกความแตกต่างระหว่าง 2 สถานการณ์นี้ และจะแสดง updateDescription.updatedFields เดียวกันสำหรับทั้ง 2 สถานการณ์

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

ขั้นตอนถัดไป