קריאת נתונים בזמן אמת באמצעות סנכרון שינויים בזרמי נתונים

‫Change Streams for Firestore עם תאימות ל-MongoDB מאפשרים לאפליקציות לגשת לשינויים בזמן אמת (הוספות, עדכונים ומחיקות) שבוצעו באוסף או במסד נתונים שלם. מקור נתונים לשינויים מסדר את העדכונים לפי זמן השינוי.

אפשר לגשת ל-Change Streams דרך ממשקי API שתואמים ל-MongoDB ודרך דרייברים מסורתיים של MongoDB. ההטמעה של Change Streams ב-Firestore עם תאימות ל-MongoDB יכולה לטפל בכל נפח נתונים של פעולות כתיבה וקריאה באמצעות הטמעה ייחודית של חלוקה אוטומטית למחיצות בפעולות כתיבה וקריאה מקבילות. כך תוכלו ליצור עומסי עבודה עם תפוקה גבוהה. אפשר גם לשפר את התשתית להעברת נתונים ולסנכרון נתונים בין Cloud Firestore לבין פתרונות אחסון אחרים.

בנוסף לתאימות לדרייברים של MongoDB, אפשר להשתמש ב-Cloud Firestore כדי לקרוא Change Streams במקביל. כך תוכלו ליצור עומסי עבודה מקבילים של קריאה עם תפוקה גבוהה. כל זרם מייצג חלוקה של התוצאות.

התכונה 'שינויים בנתונים' תומכת בתכונות הבאות:

  • סנכרון שינויים בזרמי נתונים שניתנים להגדרה עם היקף מסד נתונים או אוסף.
  • משך השמירה של שינוי בזרם, שמוגדר בזמן היצירה. ברירת המחדל של תקופת השמירה היא 7 ימים, ותקופת השמירה המינימלית היא יום אחד. השמירה צריכה להיות כפולה של יום אחד, עד למקסימום של 7 ימים. אי אפשר לשנות את משך השמירה אחרי שיוצרים את המדיניות. כדי לשנות את תקופת השמירה, צריך להפסיק את השימוש בפיד השינויים וליצור אותו מחדש.
  • אירועי השינוי delete, insert, update ו-drop ניתנים לצפייה באמצעות db.collection.watch() ו-db.watch().
  • updateDescription.updatedFields מכיל הבדלים בין עדכונים.
  • כל האפשרויות של fullDocument ושל fullDocumentBeforeChange.
    • חיפוש עדכונים במסמך המלא.
    • תמונה של המסמך לפני שהוא הוחלף, עודכן או נמחק.
    • תמונה של המסמך אחרי שהוחלפה או עודכנה.
    • כדי לשחזר תמונות מלפני שעה או יותר, צריך להפעיל את התכונה 'שחזור לנקודת זמן מסוימת' (PITR).
  • כל אפשרויות ההפעלה מחדש, כולל resumeAfter ו-startAfter.
  • כשמשתמשים ב-watch() כדי לעקוב אחרי שינויים, אפשר לשרשר שלבי צבירה כמו $addFields,‏ $match,‏ $project,‏ $replaceRoot,‏ $replaceWith,‏ $set ו-$unset.

הגדרת סנכרון שינויים בזרמי נתונים

כדי ליצור, להסיר או להציג נתוני Change Streams קיימים במסד נתונים, משתמשים במסוף Google Cloud.

תפקידים והרשאות

כדי ליצור, למחוק ולהציג ברשימה Change Streams, לחשבון המשתמש צריכות להיות הרשאות ניהול זהויות והרשאות גישה (IAM) מסוג datastore.schemas.create, datastore.schemas.delete ו-datastore.schemas.list, בהתאמה.

לדוגמה, התפקיד Datastore Index Admin (roles/datastore.indexAdmin) מעניק את ההרשאות האלה.

יצירת שינוי בשידור

כדי לפתוח סמן תואם של שינוי נתונים, צריך ליצור שינוי נתונים. אין תמיכה בהפעלה אוטומטית של זרם שינויים בזמן יצירת אוסף או מסד נתונים.

כדי ליצור זרם שינויים, משתמשים במסוף Google Cloud.

  1. נכנסים לדף Databases במסוף Google Cloud.

    מעבר אל Databases

  2. בוחרים מהרשימה מסד נתונים של Firestore עם תאימות ל-MongoDB. החלונית Firestore Studio תיפתח.
  3. בחלונית Explorer (סייר), מאתרים את הצומת Change streams (סנכרון שינויים בזרמי נתונים), לוחצים על More actions (פעולות נוספות) ואז בוחרים באפשרות Create change stream (יצירת סנכרון שינויים בזרמי נתונים).
  4. מזינים שם ייחודי לשינוי הנתונים, היקף ותקופת שמירה, ואז לוחצים על שמירה.

צפייה בשינויים בזרמי נתונים

אפשר לראות פרטים על Change Streams במסוף Google Cloud.

  1. נכנסים לדף Databases במסוף Google Cloud.

    מעבר אל Databases

  2. בוחרים מהרשימה מסד נתונים של Firestore עם תאימות ל-MongoDB. החלונית Firestore Studio תיפתח.
  3. בחלונית Explorer, מאתרים את הצומת סנכרון שינויים בזרמי נתונים.
  4. כדי לפתוח או לסגור את הצומת, לוחצים על החלפת צומת.

מחיקת שינוי השידור החי

כדי למחוק זרם שינויים, משתמשים במסוף Google Cloud.

  1. נכנסים לדף Databases במסוף Google Cloud.

    מעבר אל Databases

  2. בוחרים מהרשימה מסד נתונים של Firestore עם תאימות ל-MongoDB. החלונית Firestore Studio תיפתח.
  3. בחלונית Explorer, מאתרים את הצומת סנכרון שינויים בזרמי נתונים.
  4. כדי לפתוח או לסגור את הצומת, לוחצים על החלפת צומת.
  5. בסייר, מאתרים את שינוי הנתונים שרוצים למחוק.
  6. לוחצים על פעולות נוספות ואז בוחרים באפשרות מחיקת נתוני השינויים.
  7. בתיבת הדו-שיח, מזינים את שם הזרם לשינויים כדי לאשר את המחיקה, ואז לוחצים על מחיקה.

פתיחה או הפעלה מחדש של סמן של שינוי בסטרימינג

בדוגמאות הבאות מוסבר איך ליצור סמן של זרם שינויים, להמשיך אותו ולהגדיר אותו.

לפני שיוצרים סמן של פיד שינויים, צריך ליצור פיד שינויים באופן מפורש עבור מסד הנתונים או האוסף.

יצירת סמן של שינוי בנתונים

כדי ליצור סמן חדש של שינוי הנתונים, משתמשים בשיטה watch במנהלי ההתקנים של MongoDB. כדי להאזין לכל השינויים במסד נתונים, יוצרים שינוי בהיקף מסד הנתונים ומפעילים את השיטה watch באובייקט db.

let cursor = db.watch()

כדי ליצור סמן בהיקף של אוסף, צריך קודם ליצור זרם שינויים לאותו אוסף. לאחר מכן, מבצעים קריאה ל-watch באוסף המתאים.

let cursor = db.my_collection.watch()

אחרי שיוצרים סמן של שינוי בנתונים, אפשר להתחיל בשידור. לדוגמה, אם מוסיפים מסמך ומפעילים את הפונקציה tryNext במיקום הסמן, השינוי יופיע בזרם השינויים.

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

אם מעדכנים ומוחקים את המסמך, השינויים האלה יופיעו בפיד השינויים:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

המשך של שינוי בשידור

כדי להפעיל מחדש את הזרם לשינויים, משתמשים באפשרויות resumeAfter או startAfter. כדי לקבוע מאיפה להמשיך ביומן השינויים resumeAfter וstartAfter, משתמשים בטוקן המשך.

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

כדי להשתמש ב-startAfter:

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

הכללת תמונות לפני ואחרי בעדכונים ומחיקה

אם נדרש, אפשר לכלול תמונות של מסמכים לפני ואחרי עדכון ומחיקה של אירועי שינוי. זמינות התמונות תלויה בחלון השחזור לנקודת זמן מסוימת (PITR). כדי לקרוא תמונות של מסמכים מלפני יותר משעה, צריך להפעיל PITR.

התכונה 'הזנת שינויים' מנצלת את חלון ה-PITR כדי לספק תצוגה של המסמך לפני ואחרי אירוע השינוי הנתון. כברירת מחדל, אירועי עדכון מכילים שדה updateDescription שהוא הדלתא של השדות ששונו על ידי פעולת העדכון.

כדי לכלול את התמונות לפני ואחרי באירוע שינוי, צריך לציין את האפשרויות fullDocumentBeforeChange ו-fullDocument בשאילתת שינוי הנתונים.

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

אם השאילתה מנסה לקרוא מסמך מחוץ לחלון השמירה של PITR או אם PITR לא מופעל, הערך required יחזיר הודעת שגיאה בצד השרת.

במקום להציג שגיאה, אפשר להשתמש בערך whenAvailable כדי להחזיר ערך null אם התמונות כבר לא זמינות.

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

הכללת התמונה הנוכחית בעדכונים

כברירת מחדל, אירועי עדכון מכילים שדה updateDescription שהוא הדלתא של השדות ששונו על ידי פעולת העדכון. כדי לחפש את הגרסה העדכנית ביותר של המסמך כולו, משתמשים בערך updateLookup באפשרות fullDocument.

התכונה הזו לא דורשת PITR ומבצעת חיפוש של המסמך.

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

קריאה מקבילית

כדי להגדיל את קצב העברת הנתונים, אפשר להשתמש באפשרות firestoreWorkerConfig כדי לפצל שאילתה של שינוי נתונים בין כמה עובדים. כל עובד אחראי להצגת השינויים עבור קבוצה נפרדת של מסמכים. צריך ליצור סמן מקביל באמצעות שאילתת runCommand או aggregate.

לדוגמה, אפשר לפצל את נתוני הסטרימינג של השינויים בין 3 עובדים באופן הבא:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

שינוי של זרמי נתונים וגיבויים

אי אפשר לגשת לנתונים של פיד השינויים או להגדרות שלו בפעולות שחזור מגיבוי. אם משחזרים מסד נתונים עם Change Streams, צריך ליצור מחדש את ה-Change Streams במסד הנתונים של היעד כדי לפתוח סמנים (cursors) למסד הנתונים הזה.

חיוב

הבדלים בהתנהגות

בקטע הבא מתוארים ההבדלים בזרמי השינויים בין Firestore עם תאימות ל-MongoDB לבין MongoDB.

updateDescription

updateDescription הוא מסמך באירוע update שמתאר את השדות שעודכנו או הוסרו על ידי פעולת העדכון. ב-Cloud Firestore, ההבדלים הבולטים הם:

  • בשדה updateDescription, השדות truncatedArrays ו-disambiguatedPaths לא מאוכלסים.
  • updateDescription.updatedFields מייצג את ההבדל הקנוני בין התמונות של מסמך לפני ואחרי החלת שינוי.

נניח שזה המצב ההתחלתי של מסמך:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

תרחיש 1: שינוי רק של הרכיב הראשון במערך.

בתרחיש הזה, ההתנהגות של Cloud Firestore זהה לזו של MongoDB.

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

תרחיש 2: החלפה של מערך שלם

בתרחיש הזה, הפעולה מעדכנת רק את שדה המערך הראשון, אבל דורסת את המערך כולו.

ההפרש בעדכון Cloud Firestore לא מבחין בין שני התרחישים האלה ומחזיר את אותו updateDescription.updatedFields בשניהם:

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

המאמרים הבאים