Change Streams สำหรับ Firestore ที่เข้ากันได้กับ MongoDB ช่วยให้แอปพลิเคชันเข้าถึงการเปลี่ยนแปลงแบบเรียลไทม์ (การแทรก การอัปเดต และการลบ) ที่เกิดขึ้นกับคอลเล็กชันหรือทั้งฐานข้อมูลได้ สตรีมการเปลี่ยนแปลงจะจัดลำดับการอัปเดตตามเวลาการแก้ไข
Change Streams สามารถเข้าถึงได้ผ่าน API ที่เข้ากันได้กับ MongoDB และไดรเวอร์ MongoDB แบบเดิม การใช้งาน Change Streams ใน Firestore ที่เข้ากันได้กับ MongoDB สามารถจัดการอัตราการส่งข้อมูลของการเขียนและการอ่านได้ ผ่านการใช้งานการแบ่งพาร์ติชันอัตโนมัติที่ไม่ซ้ำกันในการเขียนและความขนานของการอ่าน ซึ่งช่วยให้คุณสร้างเวิร์กโหลดที่มีปริมาณงานสูงได้ นอกจากนี้ คุณยังปรับปรุงโครงสร้างพื้นฐานสำหรับการย้ายข้อมูลและการซิงค์ข้อมูลระหว่าง Cloud Firestore กับโซลูชันพื้นที่เก็บข้อมูลอื่นๆ ได้ด้วย
นอกเหนือจากความเข้ากันได้กับไดรเวอร์ MongoDB แล้ว คุณยังใช้ Cloud Firestore เพื่ออ่าน Change Stream แบบขนานได้ด้วย ซึ่งช่วยให้คุณสร้างเวิร์กโหลดการอ่านที่มีปริมาณงานสูงแบบคู่ขนานได้ แต่ละสตรีมแสดงถึง พาร์ติชันผลลัพธ์ที่กระจายอย่างดี
Change Streams รองรับฟีเจอร์ต่อไปนี้
- สตรีมการเปลี่ยนแปลงที่กำหนดค่าได้ที่มีขอบเขตฐานข้อมูลหรือคอลเล็กชัน
- ระยะเวลาการเก็บรักษาสำหรับสตรีมการเปลี่ยนแปลงที่ระบุไว้ตอนสร้าง การเก็บรักษาเริ่มต้นคือ 7 วัน และการเก็บรักษาขั้นต่ำคือ 1 วัน ระยะเวลาเก็บรักษา ต้องเป็นจำนวนเท่าของ 1 วัน โดยสูงสุดไม่เกิน 7 วัน คุณจะเปลี่ยนระยะเวลาการเก็บรักษา หลังจากสร้างแล้วไม่ได้ หากต้องการเปลี่ยนระยะเวลาเก็บรักษา คุณต้อง ดรอปและสร้างสตรีมการเปลี่ยนแปลงใหม่
delete,insert,updateและdropเปลี่ยนเหตุการณ์ที่สังเกตได้ โดยใช้db.collection.watch()และdb.watch()updateDescription.updatedFieldsมีส่วนต่างของการอัปเดต- ตัวเลือก
fullDocumentและfullDocumentBeforeChangeทั้งหมด- กำลังค้นหาเอกสารฉบับเต็มเพื่อดูข้อมูลอัปเดต
- รูปภาพก่อนหน้าของเอกสารก่อนที่จะมีการแทนที่ อัปเดต หรือลบ
- รูปภาพของเอกสารหลังจากมีการแทนที่หรืออัปเดต
- รูปภาพก่อนและหลังที่มีอายุมากกว่า 1 ชั่วโมงต้องเปิดใช้ การกู้คืนช่วงเวลา (PITR)
- ตัวเลือกการกลับมาทำงานทั้งหมด รวมถึง
resumeAfterและstartAfter - เมื่อใช้
watch()เพื่อสังเกตการเปลี่ยนแปลง คุณสามารถเชื่อมโยงขั้นตอนการรวบรวมข้อมูลได้ เช่น$addFields,$match,$project,$replaceRoot$replaceWith,$setและ$unset
กำหนดค่า Change Streams
หากต้องการสร้าง ทิ้ง หรือดู Change Stream ที่มีอยู่สำหรับฐานข้อมูล ให้ใช้ คอนโซล Google Cloud
บทบาทและสิทธิ์
หากต้องการสร้าง ลบ และแสดงรายการสตรีมการเปลี่ยนแปลง หลักการต้องมีสิทธิ์ Identity and Access Management (IAM) ของ
datastore.schemas.create datastore.schemas.delete และ
datastore.schemas.list ตามลำดับ
เช่น บทบาทผู้ดูแลระบบดัชนี Datastore (roles/datastore.indexAdmin) จะให้สิทธิ์ต่อไปนี้
สร้าง Change Stream
คุณต้องสร้าง Change Stream ก่อนจึงจะเปิดเคอร์เซอร์ Change Stream ที่เกี่ยวข้องได้ ระบบไม่รองรับการเปิดใช้สตรีมการเปลี่ยนแปลงอัตโนมัติที่คอลเล็กชันหรือการสร้างฐานข้อมูล
หากต้องการสร้างสตรีมการเปลี่ยนแปลง ให้ใช้คอนโซล Google Cloud
-
ในคอนโซล Google Cloud ให้ไปที่หน้าฐานข้อมูล
- เลือกฐานข้อมูล Firestore ที่เข้ากันได้กับ MongoDB จากรายการ แผง Firestore Studio จะเปิดขึ้น
- ในแผงExplorer ให้ค้นหาโหนดChange Streams คลิก การดำเนินการเพิ่มเติม แล้วเลือกสร้าง Change Stream
- ป้อนชื่อสตรีมการเปลี่ยนแปลง ขอบเขต และระยะเวลาเก็บรักษาที่ไม่ซ้ำกัน แล้วคลิกบันทึก
ดูสตรีมการเปลี่ยนแปลง
คุณดูรายละเอียดเกี่ยวกับ Change Streams ได้ในคอนโซล Google Cloud
-
ในคอนโซล Google Cloud ให้ไปที่หน้าฐานข้อมูล
- เลือกฐานข้อมูล Firestore ที่เข้ากันได้กับ MongoDB จากรายการ แผง Firestore Studio จะเปิดขึ้น
- ในแผงสำรวจ ให้ค้นหาโหนดสตรีมการเปลี่ยนแปลง
- หากต้องการเปิดหรือปิดโหนด ให้คลิก สลับโหนด
ลบสตรีมการเปลี่ยนแปลง
หากต้องการลบสตรีมการเปลี่ยนแปลง ให้ใช้คอนโซล Google Cloud
-
ในคอนโซล Google Cloud ให้ไปที่หน้าฐานข้อมูล
- เลือกฐานข้อมูล Firestore ที่เข้ากันได้กับ MongoDB จากรายการ แผง Firestore Studio จะเปิดขึ้น
- ในแผงสำรวจ ให้ค้นหาโหนดสตรีมการเปลี่ยนแปลง
- หากต้องการเปิดหรือปิดโหนด ให้คลิก สลับโหนด
- ในExplorer ให้ค้นหาสตรีมการเปลี่ยนแปลงที่ต้องการลบ
- คลิก การดำเนินการเพิ่มเติม แล้วเลือกลบสตรีมการเปลี่ยนแปลง
- ในกล่องโต้ตอบ ให้ป้อนชื่อสตรีมการเปลี่ยนแปลงเพื่อยืนยันการลบ แล้วคลิกลบ
เปิดหรือดำเนินการต่อเคอร์เซอร์สตรีมการเปลี่ยนแปลง
ตัวอย่างต่อไปนี้แสดงวิธีสร้าง ดำเนินการต่อ และกำหนดค่า เคอร์เซอร์สตรีมการเปลี่ยนแปลง
ก่อนที่จะสร้างเคอร์เซอร์สตรีมการเปลี่ยนแปลง คุณต้องสร้างสตรีมการเปลี่ยนแปลงสำหรับฐานข้อมูลหรือคอลเล็กชันอย่างชัดเจน
สร้างเคอร์เซอร์สตรีมการเปลี่ยนแปลง
หากต้องการสร้างเคอร์เซอร์สตรีมการเปลี่ยนแปลงใหม่ ให้ใช้วิธี watch ใน
ไดรเวอร์ MongoDB
หากต้องการฟังการเปลี่ยนแปลงทั้งหมดในฐานข้อมูล ให้สร้างสตรีมการเปลี่ยนแปลงที่กำหนดขอบเขตฐานข้อมูล
และเรียกใช้เมธอด watch ในออบเจ็กต์ db
let cursor = db.watch()
หากต้องการสร้างเคอร์เซอร์ที่กำหนดขอบเขตไว้ที่คอลเล็กชัน คุณต้อง
สร้างสตรีมการเปลี่ยนแปลงสำหรับคอลเล็กชันนั้นก่อน จากนั้นเรียกใช้เมธอด watch ในคอลเล็กชันที่
เกี่ยวข้อง
let cursor = db.my_collection.watch()
ตอนนี้คุณสร้างเคอร์เซอร์สตรีมการเปลี่ยนแปลงแล้ว ก็เริ่มสตรีมได้เลย
เช่น หากแทรกเอกสารและเรียกใช้ tryNext ที่เคอร์เซอร์
คุณจะเห็นการเปลี่ยนแปลงปรากฏในสตรีมการเปลี่ยนแปลง
let doc = db.my_collection.insertOne({value: "hello world"}) console.log(cursor.tryNext())
หากคุณอัปเดตและลบเอกสาร คุณจะเห็นการเปลี่ยนแปลงเหล่านั้นปรากฏในสตรีมการเปลี่ยนแปลง ดังนี้
db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}}) db.my_collection.deleteOne({"_id": doc.insertedId}}) // Prints the update event console.log(cursor.tryNext()) // Prints the delete event console.log(cursor.tryNext())
ดำเนินการสตรีมการเปลี่ยนแปลงต่อ
หากต้องการเริ่มสตรีมการเปลี่ยนแปลงต่อ ให้ใช้ตัวเลือก resumeAfter หรือ startAfter
หากต้องการระบุตำแหน่งในบันทึกการเปลี่ยนแปลงที่จะดำเนินการต่อจาก resumeAfter และ
startAfter ให้ใช้โทเค็นการดำเนินการต่อ
// Create a cursor and add one event to the change stream. let cursor = db.my_collection.watch(); db.my_collection.insertOne({value: "hello world"}); let event = cursor.tryNext(); // Get the resume token from the event. let resumeToken = event._id; // Add a new event to the change stream. db.my_collection.insertOne({value: "foobar"}); // Create a new cursor by using the resume token as a starting point. let newCursor = db.my_collection.watch({resumeAfter: resumeToken}) // Log the change event containing the "foobar" value. console.log(newCursor.tryNext())
วิธีใช้ startAfter
// Start after the resume token. let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})
ใส่รูปภาพก่อนและหลังในการอัปเดตและลบ
หากจำเป็น คุณสามารถใส่รูปภาพก่อนและหลังของเอกสารในการอัปเดต และลบเหตุการณ์การเปลี่ยนแปลงได้ ความพร้อมใช้งานของรูปภาพขึ้นอยู่กับช่วงการกู้คืนช่วงเวลา (PITR) และหากต้องการอ่านรูปภาพเอกสารที่มีอายุมากกว่า 1 ชั่วโมง คุณต้องเปิดใช้ PITR
Change Streams ใช้ประโยชน์จากกรอบเวลา PITR เพื่อแสดงมุมมองของ
เอกสารก่อนและหลังเหตุการณ์การเปลี่ยนแปลงที่ระบุ โดยค่าเริ่มต้น เหตุการณ์อัปเดตจะมีฟิลด์ updateDescription ซึ่งเป็นส่วนต่างของฟิลด์ที่
การดำเนินการอัปเดตแก้ไข
หากต้องการรวมรูปภาพก่อนและหลังในเหตุการณ์การเปลี่ยนแปลง
คุณต้อง
ระบุตัวเลือก fullDocumentBeforeChange และ fullDocument ในการค้นหาสตรีมการเปลี่ยนแปลง
let cursor = db.my_collection.watch({ "fullDocument": "required", "fullDocumentBeforeChange": "required" })
หากคำค้นหาพยายามอ่านเอกสารนอกหน้าต่างการเก็บรักษา PITR หรือหากไม่ได้เปิดใช้ PITR required ค่าจะส่งข้อความแสดงข้อผิดพลาดทางฝั่งเซิร์ฟเวอร์
คุณสามารถใช้ค่า whenAvailable เพื่อ
แสดงค่า null หากรูปภาพไม่พร้อมใช้งานอีกต่อไปได้แทนการแสดงข้อผิดพลาด
let cursor = db.my_collection.watch({ "fullDocument": "whenAvailable", "fullDocumentBeforeChange": "whenAvailable" })
รวมรูปภาพปัจจุบันในการอัปเดต
โดยค่าเริ่มต้น เหตุการณ์การอัปเดตจะมีupdateDescription ซึ่งเป็น
ส่วนต่างของฟิลด์ที่แก้ไขโดยการดำเนินการอัปเดต หากต้องการค้นหาเอกสารทั้งฉบับเวอร์ชันล่าสุดแทน ให้ใช้updateLookupค่าในตัวเลือกfullDocument
ฟีเจอร์นี้ไม่จำเป็นต้องใช้ PITR และจะค้นหาเอกสาร
let cursor = db.my_collection.watch({ "fullDocument": "updateLookup", })
การอ่านแบบขนาน
หากต้องการเพิ่มอัตราการส่งข้อมูล คุณสามารถใช้ตัวเลือก firestoreWorkerConfig เพื่อแยกคำค้นหา Change Stream ออกเป็นหลายๆ Worker โดยแต่ละ Worker จะ
มีหน้าที่ให้บริการการเปลี่ยนแปลงสำหรับชุดเอกสารที่แตกต่างกัน คุณต้อง
สร้างเคอร์เซอร์แบบขนานผ่านการค้นหา runCommand หรือ aggregate
เช่น คุณสามารถกระจายสตรีมการเปลี่ยนแปลงในผู้ปฏิบัติงาน 3 คนได้ดังนี้
let cursor1 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }} }]); let cursor2 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }} }]); let cursor3 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }} }]);
Change Streams และการสำรองข้อมูล
ทั้งการกำหนดค่าสตรีมการเปลี่ยนแปลงและข้อมูลสตรีมการเปลี่ยนแปลงจะไม่พร้อมใช้งาน ในการดำเนินการกู้คืนข้อมูลสำรอง หากกู้คืนฐานข้อมูลที่มี Change Streams คุณต้องสร้าง Change Streams เหล่านั้นใหม่ในฐานข้อมูลปลายทางเพื่อเปิด เคอร์เซอร์ไปยังฐานข้อมูลนั้น
การเรียกเก็บเงิน
- Change Streams จะทำให้เกิดค่าใช้จ่ายของหน่วยการอ่านและพื้นที่เก็บข้อมูล ดูราคาของ Change Stream
- หากต้องการรวมรูปภาพก่อนและหลังที่เก่ากว่า 1 ชั่วโมงในเวลาคำขออ่าน คุณต้องเปิดใช้ PITR ซึ่งจะทำให้เกิดค่าใช้จ่าย PITR
ความแตกต่างของลักษณะการทำงาน
ส่วนต่อไปนี้จะอธิบายความแตกต่างของ Change Streams ระหว่าง Firestore ที่เข้ากันได้กับ MongoDB และ MongoDB
updateDescription
updateDescription คือเอกสารในเหตุการณ์ update ที่อธิบายฟิลด์
ซึ่งมีการอัปเดตหรือนำออกโดยการดำเนินการอัปเดต ใน
Cloud Firestore ความแตกต่างที่สำคัญมีดังนี้
- ใน
updateDescriptionระบบจะไม่แสดงข้อมูลในช่องtruncatedArraysและdisambiguatedPaths updateDescription.updatedFieldsแสดงถึงความแตกต่างที่ชัดเจนระหว่าง รูปภาพก่อนและหลังของเอกสารก่อนและหลังใช้การเปลี่ยนแปลง
พิจารณาสถานะเริ่มต้นของเอกสารต่อไปนี้
db.my_collection.insertOne({ _id: 1, root: { array: [{a: 1}, {b: 2}, {c: 3}] } })
สถานการณ์ที่ 1: เปลี่ยนแปลงเฉพาะองค์ประกอบแรกของอาร์เรย์
ในสถานการณ์นี้ ลักษณะการทำงานของ Cloud Firestore จะตรงกับ MongoDB
db.my_collection.updateOne( {_id: 1}, {'$set': {"root.array.0.a": 100}} ) { updatedFields: {"root.array.0.a": 100}, removedFields: [] }
สถานการณ์ที่ 2: เขียนทับด้วยอาร์เรย์ทั้งหมด
ในสถานการณ์นี้ การดำเนินการจะอัปเดตเฉพาะฟิลด์อาร์เรย์แรก แต่จะเขียนทับอาร์เรย์ทั้งหมด
Cloud Firestore การอัปเดต Diff ไม่ได้แยกความแตกต่างระหว่าง 2 สถานการณ์นี้
และจะแสดง updateDescription.updatedFields เดียวกันสำหรับทั้ง 2 สถานการณ์
db.my_collection.updateOne( {_id: 1}, {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}} ) // In other implementations, updatedFields reflects the mutation itself { updatedFields: { "root.array": [{a: 100}, {b: 2}, {c: 3}] }, removedFields: [] } // Firestore updatedFields is the diff between the before and after versions of the document { updatedFields: {"root.array.0.a": 100}, removedFields: [] }