חותמות זמן מפוצלות

אם אוסף מכיל מסמכים עם ערכים עוקבים במדד, הערך של Cloud Firestore מגביל את קצב הכתיבה ל-500 פעולות כתיבה לשנייה. בדף הזה נסביר איך לפצל שדה של מסמך כדי לעקוף את המגבלה הזו. קודם כל, נגדיר מה המשמעות של 'שדות ברצף שנוספו לאינדקס' ונבהיר מתי ההגבלה הזו חלה.

שדות ברצף שנוספו לאינדקס

'שדות רצפים שנוספו לאינדקס' הם כל אוסף של מסמכים שמכיל שדה נוסף לאינדקס שמתגבר או פוחת באופן מונוטוני. במקרים רבים, הכוונה היא לשדה timestamp, אבל כל ערך שדה שמתגבר או פוחת באופן מונוטוני יכול להפעיל את מגבלת הכתיבה של 500 פעולות כתיבה לשנייה.

לדוגמה, המגבלה חלה על אוסף של מסמכי user עם השדה userid שמתווסף לאינדקס, אם האפליקציה מקצה ערכים של userid באופן הבא:

  • 1281, 1282, 1283, 1284, 1285, ...

לעומת זאת, לא כל השדות מסוג timestamp מפעילים את המגבלה הזו. אם השדה timestamp עוקב אחרי ערכים שמופצים באופן אקראי, מגבלת הכתיבה לא חלה. גם הערך בפועל של השדה לא חשוב, אלא רק שהשדה עולה או יורד באופן מונוטוני. לדוגמה, שתי הקבוצות הבאות של ערכים בשדות שמתחזקים באופן מונוטונית מפעילות את מגבלת הכתיבה:

  • 100000, 100001, 100002, 100003, ...
  • 0, 1, 2, 3, ...

חלוקה של שדה חותמת זמן למקטעים

נניח שבאפליקציה שלכם נעשה שימוש בשדה timestamp בעל עלייה מונוטונית. אם האפליקציה שלכם לא משתמשת בשדה timestamp בשאילתות, תוכלו להסיר את המגבלה של 500 שינויים לשנייה על ידי אי הוספה של שדה חותמת הזמן לאינדקס. אם אתם צריכים להשתמש בשדה timestamp בשאילתות, תוכלו לעקוף את המגבלה באמצעות חותמות זמן מחולקות:

  1. מוסיפים שדה shard לצד השדה timestamp. משתמשים ב-1..n ערכים נפרדים בשדה shard. הפעולה הזו מגדילה את מגבלת הכתיבה של האוסף ל-500*n, אבל צריך לצבור n שאילתות.
  2. מעדכנים את לוגיקת הכתיבה כך שתקצה באופן אקראי ערך shard לכל מסמך.
  3. מעדכנים את השאילתות כדי לצבור את קבוצות התוצאות המפוצלות.
  4. משביתים את המדדים של שדה יחיד גם בשדה shard וגם בשדה timestamp. מוחקים אינדקסים מורכבים קיימים שמכילים את השדה timestamp.
  5. יוצרים אינדקסים מורכבים חדשים שתומכים בשאילתות המעודכנות. חשוב לשים לב לסדר השדות במדד, ושדה shard חייב להופיע לפני השדה timestamp. כל אינדקס שכולל את השדה timestamp חייב לכלול גם את השדה shard.

מומלץ להטמיע חותמות זמן מחולקות רק בתרחישי שימוש עם שיעורי כתיבת מתמשכים של יותר מ-500 כתיבות לשנייה. אחרת, מדובר באופטימיזציה מוקדמת מדי. חלוקה של שדה timestamp למקטעים מסירה את ההגבלה של 500 פעולות כתיבה לשנייה, אבל מחייבת אתכם לבצע צבירת שאילתות בצד הלקוח.

בדוגמאות הבאות מוסבר איך לפצל שדה timestamp ואיך לשלוח שאילתה למערך תוצאות מפוצל.

מודל נתונים ושאילתות לדוגמה

לדוגמה, נניח שאפליקציה מסוימת מבצעת ניתוח של כלי פיננסיים כמו מטבעות, מניות רגילות וקרנות סל (ETF) כמעט בזמן אמת. האפליקציה הזו כותבת מסמכים לאוסף instruments באופן הבא:

Node.js
async function insertData() {
  const instruments = [
    {
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

האפליקציה הזו מפעילה את השאילתות הבאות וממירה אותן לפי השדה timestamp:

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  return fs.collection('instruments')
      .where(fieldName, fieldOperator, fieldValue)
      .orderBy('timestamp', 'desc')
      .limit(limit)
      .get();
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

אחרי מחקר קצר, אתם קובעים שהאפליקציה תקבל בין 1,000 ל-1,500 עדכוני מכשירי מדידה לשנייה. המספר הזה חורג מ-500 פעולות כתיבה לשנייה שמותר לבצע באוספים שמכילים מסמכים עם שדות חותמת זמן שנוספו לאינדקס. כדי להגדיל את קצב הכתיבה, צריך 3 ערכים של פלחים, MAX_INSTRUMENT_UPDATES/500 = 3. בדוגמה הזו נעשה שימוש בערכים של שרידי ה-Shard: x,‏ y ו-z. אפשר גם להשתמש במספרים או בתווים אחרים לערכים של הפיצול.

הוספת שדה חלוקה

מוסיפים שדה shard למסמכים. מגדירים את השדה shard לערך x, y או z, שמגדיל את מגבלת הכתיבה באוסף ל-1,500 פעולות כתיבה לשנייה.

Node.js
// Define our 'K' shard values
const shards = ['x', 'y', 'z'];
// Define a function to help 'chunk' our shards for use in queries.
// When using the 'in' query filter there is a max number of values that can be
// included in the value. If our number of shards is higher than that limit
// break down the shards into the fewest possible number of chunks.
function shardChunks() {
  const chunks = [];
  let start = 0;
  while (start < shards.length) {
    const elements = Math.min(MAX_IN_VALUES, shards.length - start);
    const end = start + elements;
    chunks.push(shards.slice(start, end));
    start = end;
  }
  return chunks;
}

// Add a convenience function to select a random shard
function randomShard() {
  return shards[Math.floor(Math.random() * Math.floor(shards.length))];
}
async function insertData() {
  const instruments = [
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

שליחת שאילתות לחותמת הזמן המפוצלת

כדי להוסיף את השדה shard, צריך לעדכן את השאילתות כדי לצבור תוצאות מקטעים:

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  // For each shard value, map it to a new query which adds an additional
  // where clause specifying the shard value.
  return Promise.all(shardChunks().map(shardChunk => {
        return fs.collection('instruments')
            .where('shard', 'in', shardChunk)  // new shard condition
            .where(fieldName, fieldOperator, fieldValue)
            .orderBy('timestamp', 'desc')
            .limit(limit)
            .get();
      }))
      // Now that we have a promise of multiple possible query results, we need
      // to merge the results from all of the queries into a single result set.
      .then((snapshots) => {
        // Create a new container for 'all' results
        const docs = [];
        snapshots.forEach((querySnapshot) => {
          querySnapshot.forEach((doc) => {
            // append each document to the new all container
            docs.push(doc);
          });
        });
        if (snapshots.length === 1) {
          // if only a single query was returned skip manual sorting as it is
          // taken care of by the backend.
          return docs;
        } else {
          // When multiple query results are returned we need to sort the
          // results after they have been concatenated.
          // 
          // since we're wanting the `limit` newest values, sort the array
          // descending and take the first `limit` values. By returning negated
          // values we can easily get a descending value.
          docs.sort((a, b) => {
            const aT = a.data().timestamp;
            const bT = b.data().timestamp;
            const secondsDiff = aT.seconds - bT.seconds;
            if (secondsDiff === 0) {
              return -(aT.nanoseconds - bT.nanoseconds);
            } else {
              return -secondsDiff;
            }
          });
          return docs.slice(0, limit);
        }
      });
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

עדכון הגדרות האינדקס

כדי להסיר את האילוץ של 500 שגיאות כתיבה לשנייה, מוחקים את המדדים הקיימים של שדה יחיד ומדדים מורכבים שמשתמשים בשדה timestamp.

מחיקת הגדרות של אינדקסים מורכבים

מסוף Firebase

  1. פותחים את הדף Cloud Firestore Composite Indexes במסוף Firebase.

    מעבר לדף Composite Indexes

  2. לכל אינדקס שמכיל את השדה timestamp, לוחצים על הלחצן ואז על Delete.

קונסולת GCP

  1. נכנסים לדף Databases במסוף Google Cloud.

    כניסה לדף Databases

  2. בוחרים את מסד הנתונים הנדרש מרשימת מסדי הנתונים.

  3. בתפריט הניווט, לוחצים על Indexes (אינדקסים) ואז על הכרטיסייה Composite (מורכב).

  4. משתמשים בשדה Filter כדי לחפש הגדרות של אינדקסים שמכילות את השדה timestamp.

  5. לכל אחד מהאינדקסים האלה, לוחצים על הלחצן ואז על Delete.

Firebase CLI

  1. אם עדיין לא הגדרתם את ה-CLI של Firebase, עליכם לפעול לפי ההוראות האלה כדי להתקין את ה-CLI ולהריץ את הפקודה firebase init. במהלך הפקודה init, חשוב לבחור באפשרות Firestore: Deploy rules and create indexes for Firestore.
  2. במהלך ההגדרה, Firebase CLI מוריד את הגדרות האינדקס הקיימות לקובץ שנקרא כברירת מחדל firestore.indexes.json.
  3. מסירים הגדרות של אינדקסים שמכילות את השדה timestamp, לדוגמה:

    {
    "indexes": [
      // Delete composite index definition that contain the timestamp field
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "exchange",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "instrumentType",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "price.currency",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
     ]
    }
    
  4. פורסים את הגדרות האינדקס המעודכנות:

    firebase deploy --only firestore:indexes
    

עדכון של הגדרות אינדקס של שדה יחיד

מסוף Firebase

  1. פותחים את הדף Cloud Firestore Single Field Indexes במסוף Firebase.

    כניסה לדף 'אינדקסים של שדה יחיד'

  2. לוחצים על Add Exemption.

  3. בשדה Collection ID, מזינים instruments. בשדה Field path, מזינים timestamp.

  4. בקטע היקף השאילתה, בוחרים גם באפשרות אוסף וגם באפשרות קבוצת אוספים.

  5. לוחצים על הבא.

  6. משנים את כל ההגדרות של האינדקס למושבתות. לוחצים על שמירה.

  7. חוזרים על אותם שלבים בשדה shard.

קונסולת GCP

  1. נכנסים לדף Databases במסוף Google Cloud.

    כניסה לדף Databases

  2. בוחרים את מסד הנתונים הנדרש מרשימת מסדי הנתונים.

  3. בתפריט הניווט, לוחצים על אינדקסים ואז על הכרטיסייה שדה יחיד.

  4. לוחצים על הכרטיסייה שדה יחיד.

  5. לוחצים על Add Exemption.

  6. בשדה Collection ID, מזינים instruments. בשדה Field path, מזינים timestamp.

  7. בקטע היקף השאילתה, בוחרים גם באפשרות אוסף וגם באפשרות קבוצת אוספים.

  8. לוחצים על הבא.

  9. משנים את כל ההגדרות של האינדקס למושבתות. לוחצים על שמירה.

  10. חוזרים על אותם שלבים בשדה shard.

Firebase CLI

  1. מוסיפים את הטקסט הבא לקטע fieldOverrides בקובץ של הגדרות האינדקס:

    {
     "fieldOverrides": [
       // Disable single-field indexing for the timestamp field
       {
         "collectionGroup": "instruments",
         "fieldPath": "timestamp",
         "indexes": []
       },
     ]
    }
    
  2. פורסים את הגדרות האינדקס המעודכנות:

    firebase deploy --only firestore:indexes
    

יצירת אינדקסים מורכבים חדשים

אחרי הסרת כל האינדקסים הקודמים שמכילים את timestamp, מגדירים את האינדקסים החדשים הנדרשים לאפליקציה. כל אינדקס שמכיל את השדה timestamp חייב להכיל גם את השדה shard. לדוגמה, כדי לתמוך בשאילתות שלמעלה, צריך להוסיף את המדדים הבאים:

איסוף שדות שנוספו לאינדקס היקף השאילתה
כלי נגינה shard, ‏ price.currency, ‏ timestamp איסוף
כלי נגינה חלוקה, תכתובת אחת (exchange), חותמת זמן איסוף
כלי נגינה shard, ‏ instrumentType, ‏ timestamp איסוף

הודעות שגיאה

כדי ליצור את המדדים האלה, מריצים את השאילתות המעודכנות.

כל שאילתה מחזירה הודעת שגיאה עם קישור ליצירת האינדקס הנדרש במסוף Firebase.

Firebase CLI

  1. מוסיפים את האינדקסים הבאים לקובץ הגדרת האינדקס:

     {
       "indexes": [
       // New indexes for sharded timestamps
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "exchange",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "instrumentType",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "price.currency",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
       ]
     }
    
  2. פורסים את הגדרות האינדקס המעודכנות:

    firebase deploy --only firestore:indexes
    

הסבר על מגבלת הכתיבה לשדות רצופיים שנוספו לאינדקס

המגבלה על קצב הכתיבה בשדות ברצף שנוספו לאינדקס נובעת מהאופן שבו Cloud Firestore מאחסן את ערכי האינדקס ומתאים את קצב הכתיבה של האינדקס. לכל כתיבת של אינדקס, Cloud Firestore מגדיר רשומה של מפתח-ערך שמארגנת ברצף את שם המסמך ואת הערך של כל שדה שנוסף לאינדקס. Cloud Firestore מארגן את הרשומות האלה באינדקס לקבוצות של נתונים שנקראות טבלטים. כל שרת Cloud Firestore מכיל לוח אחד או יותר. כשעומס הכתיבה בטאבלט מסוים גבוה מדי, Cloud Firestore מתרחב אופקית על ידי פיצול הטאבלט לטאבלטים קטנים יותר והפצת הטאבלטים החדשים בין שרתים שונים של Cloud Firestore.

Cloud Firestore ממקם באותו טאבלט רשומות אינדקס שנמצאות קרוב זו לזו מבחינה לקסיקוגרפית. אם ערכי האינדקס בטבלת משנה קרובים מדי זה לזה, למשל בשדות של חותמות זמן, Cloud Firestore לא יכול לפצל את הטבלה הזו בצורה יעילה לטבלאות משנה קטנות יותר. כך נוצרת נקודה חמה שבה טאבלט אחד מקבל יותר מדי תנועה, ופעולות הקריאה והכתיבה בנקודה החמה הזו נעשות איטיות יותר.

חלוקה של שדה חותמת זמן מאפשרת ל-Cloud Firestore לפצל ביעילות את עומסי העבודה בין כמה טאבלטים. יכול להיות שהערכים של שדה חותמת הזמן יישארו קרובים זה לזה, אבל הערך המקושר של החלקיק והאינדקס נותן ל-Cloud Firestore מספיק מקום בין הרשומות של האינדקס כדי לפצל את הרשומות בין כמה טבלאות.

המאמרים הבאים