Znaczniki czasu podzielone na fragmenty

Jeśli kolekcja zawiera dokumenty z sekwencyjnie indeksowanymi wartościami, Cloud Firestore ogranicza szybkość zapisu do 500 zapisów na sekundę. Na tej stronie opisano, jak podzielić pole dokumentu na fragmenty, aby pokonać ten limit. Najpierw zdefiniujmy, co rozumiemy przez „sekwencyjne pola indeksowane” i wyjaśnijmy, kiedy ma zastosowanie to ograniczenie.

Pola indeksowane sekwencyjnie

„Sekwencyjne pola indeksowane” oznaczają dowolny zbiór dokumentów zawierający monotonicznie rosnące lub malejące pole indeksowane. W wielu przypadkach oznacza to pole timestamp , ale każda monotonicznie rosnąca lub malejąca wartość pola może uruchomić limit zapisu wynoszący 500 zapisów na sekundę.

Na przykład limit dotyczy kolekcji dokumentów user z indeksowanym polem userid , jeśli aplikacja przypisuje wartości userid w następujący sposób:

  • 1281, 1282, 1283, 1284, 1285, ...

Z drugiej strony nie wszystkie pola timestamp uruchamiają ten limit. Jeśli pole timestamp śledzi losowo rozłożone wartości, limit zapisu nie ma zastosowania. Rzeczywista wartość pola również nie ma znaczenia, liczy się tylko to, że pole rośnie lub maleje monotonicznie. Na przykład oba poniższe zestawy monotonicznie rosnących wartości pól wyzwalają limit zapisu:

  • 100000, 100001, 100002, 100003, ...
  • 0, 1, 2, 3, ...

Dzielenie pola znacznika czasu

Załóżmy, że Twoja aplikacja używa monotonicznie rosnącego pola timestamp . Jeśli Twoja aplikacja nie używa pola timestamp w żadnych zapytaniach, możesz usunąć limit 500 zapisów na sekundę, nie indeksując pola sygnatury czasowej. Jeśli potrzebujesz pola timestamp dla swoich zapytań, możesz obejść ten limit, używając fragmentowanych znaczników czasu :

  1. Dodaj pole shard obok pola timestamp . Użyj 1..n różnych wartości dla pola shard . Zwiększa to limit zapisu kolekcji do 500*n , ale należy zagregować n zapytań.
  2. Zaktualizuj logikę zapisu, aby losowo przypisać wartość shard do każdego dokumentu.
  3. Zaktualizuj zapytania, aby zagregować podzielone na fragmenty zestawy wyników.
  4. Wyłącz indeksy jednopolowe zarówno dla pola shard , jak i pola timestamp . Usuń istniejące indeksy złożone zawierające pole timestamp .
  5. Utwórz nowe indeksy złożone do obsługi zaktualizowanych zapytań. Kolejność pól w indeksie ma znaczenie, a pole shard musi znajdować się przed polem timestamp . Wszelkie indeksy zawierające pole timestamp muszą również zawierać pole shard .

Sygnatury czasowe podzielone na fragmenty należy implementować tylko w przypadkach użycia z stałą szybkością zapisu powyżej 500 zapisów na sekundę. W przeciwnym razie jest to przedwczesna optymalizacja. Dzielenie pola timestamp usuwa ograniczenie 500 zapisów na sekundę, ale wiąże się to z koniecznością agregacji zapytań po stronie klienta.

Poniższe przykłady pokazują, jak podzielić pole timestamp i jak wykonać zapytanie dotyczące podzielonego zestawu wyników.

Przykładowy model danych i zapytania

Jako przykład wyobraźmy sobie aplikację do analizy instrumentów finansowych w czasie zbliżonym do rzeczywistego, takich jak waluty, akcje zwykłe i fundusze ETF. Ta aplikacja zapisuje dokumenty w kolekcji instruments w następujący sposób:

Node.js
async function insertData() {
  const instruments = [
    {
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

Ta aplikacja uruchamia następujące zapytania i zamówienia według pola timestamp :

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  return fs.collection('instruments')
      .where(fieldName, fieldOperator, fieldValue)
      .orderBy('timestamp', 'desc')
      .limit(limit)
      .get();
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

Po przeprowadzeniu badań stwierdzasz, że aplikacja będzie otrzymywać od 1000 do 1500 aktualizacji instrumentów na sekundę. Przekracza to prędkość 500 zapisów na sekundę dozwoloną dla kolekcji zawierających dokumenty z indeksowanymi polami sygnatur czasowych. Aby zwiększyć przepustowość zapisu, potrzebujesz 3 wartości fragmentu, MAX_INSTRUMENT_UPDATES/500 = 3 . W tym przykładzie użyto wartości fragmentu x , y i z . Możesz także użyć liczb lub innych znaków dla wartości fragmentu.

Dodanie pola fragmentu

Dodaj pole shard do swoich dokumentów. Ustaw pole shard na wartości x , y lub z , co zwiększa limit zapisu w kolekcji do 1500 zapisów na sekundę.

Node.js
// Define our 'K' shard values
const shards = ['x', 'y', 'z'];
// Define a function to help 'chunk' our shards for use in queries.
// When using the 'in' query filter there is a max number of values that can be
// included in the value. If our number of shards is higher than that limit
// break down the shards into the fewest possible number of chunks.
function shardChunks() {
  const chunks = [];
  let start = 0;
  while (start < shards.length) {
    const elements = Math.min(MAX_IN_VALUES, shards.length - start);
    const end = start + elements;
    chunks.push(shards.slice(start, end));
    start = end;
  }
  return chunks;
}

// Add a convenience function to select a random shard
function randomShard() {
  return shards[Math.floor(Math.random() * Math.floor(shards.length))];
}
async function insertData() {
  const instruments = [
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

Zapytanie o podzielony znacznik czasu

Dodanie pola shard wymaga zaktualizowania zapytań w celu agregowania podzielonych wyników:

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  // For each shard value, map it to a new query which adds an additional
  // where clause specifying the shard value.
  return Promise.all(shardChunks().map(shardChunk => {
        return fs.collection('instruments')
            .where('shard', 'in', shardChunk)  // new shard condition
            .where(fieldName, fieldOperator, fieldValue)
            .orderBy('timestamp', 'desc')
            .limit(limit)
            .get();
      }))
      // Now that we have a promise of multiple possible query results, we need
      // to merge the results from all of the queries into a single result set.
      .then((snapshots) => {
        // Create a new container for 'all' results
        const docs = [];
        snapshots.forEach((querySnapshot) => {
          querySnapshot.forEach((doc) => {
            // append each document to the new all container
            docs.push(doc);
          });
        });
        if (snapshots.length === 1) {
          // if only a single query was returned skip manual sorting as it is
          // taken care of by the backend.
          return docs;
        } else {
          // When multiple query results are returned we need to sort the
          // results after they have been concatenated.
          // 
          // since we're wanting the `limit` newest values, sort the array
          // descending and take the first `limit` values. By returning negated
          // values we can easily get a descending value.
          docs.sort((a, b) => {
            const aT = a.data().timestamp;
            const bT = b.data().timestamp;
            const secondsDiff = aT.seconds - bT.seconds;
            if (secondsDiff === 0) {
              return -(aT.nanoseconds - bT.nanoseconds);
            } else {
              return -secondsDiff;
            }
          });
          return docs.slice(0, limit);
        }
      });
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

Zaktualizuj definicje indeksów

Aby usunąć ograniczenie 500 zapisów na sekundę, usuń istniejące indeksy jednopolowe i złożone, które korzystają z pola timestamp .

Usuń definicje indeksów złożonych

Konsola Firebase

  1. Otwórz stronę Indeksy złożone Cloud Firestore w konsoli Firebase.

    Przejdź do Indeksy złożone

  2. Dla każdego indeksu zawierającego pole timestamp kliknij przycisk i kliknij opcję Usuń .

Konsola GCP

  1. W konsoli Google Cloud Platform przejdź do strony Bazy danych .

    Przejdź do Baz danych

  2. Wybierz żądaną bazę danych z listy baz danych.

  3. W menu nawigacyjnym kliknij opcję Indeksy , a następnie kliknij kartę Złożone .

  4. Użyj pola Filtr , aby wyszukać definicje indeksów zawierające pole timestamp .

  5. Dla każdego z tych indeksów kliknij przycisk i kliknij Usuń .

Interfejs wiersza polecenia Firebase

  1. Jeśli nie skonfigurowałeś interfejsu CLI Firebase, postępuj zgodnie z poniższymi wskazówkami, aby zainstalować interfejs CLI i uruchom polecenie firebase init . Podczas wykonywania polecenia init wybierz opcję Firestore: Deploy rules and create indexes for Firestore .
  2. Podczas instalacji interfejs Firebase CLI pobiera istniejące definicje indeksów do pliku o domyślnej nazwie firestore.indexes.json .
  3. Usuń wszystkie definicje indeksów zawierające pole timestamp , na przykład:

    {
    "indexes": [
      // Delete composite index definition that contain the timestamp field
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "exchange",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "instrumentType",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "price.currency",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
     ]
    }
    
  4. Wdróż zaktualizowane definicje indeksów:

    firebase deploy --only firestore:indexes
    

Zaktualizuj definicje indeksów jednopolowych

Konsola Firebase

  1. Otwórz stronę Indeksy pojedynczych pól Cloud Firestore w konsoli Firebase.

    Przejdź do Indeksy jednopolowe

  2. Kliknij opcję Dodaj wyjątek .

  3. W polu Identyfikator kolekcji wprowadź instruments . W polu Ścieżka pola wprowadź timestamp .

  4. W obszarze Zakres zapytania wybierz Kolekcję i Grupę kolekcji .

  5. Kliknij Następny

  6. Przełącz wszystkie ustawienia indeksu na Wyłączone . Kliknij Zapisz .

  7. Powtórz te same kroki dla pola shard .

Konsola GCP

  1. W konsoli Google Cloud Platform przejdź do strony Bazy danych .

    Przejdź do Baz danych

  2. Wybierz żądaną bazę danych z listy baz danych.

  3. W menu nawigacyjnym kliknij opcję Indeksy , a następnie kliknij kartę Pojedyncze pole .

  4. Kliknij kartę Pojedyncze pole .

  5. Kliknij opcję Dodaj wyjątek .

  6. W polu Identyfikator kolekcji wprowadź instruments . W polu Ścieżka pola wprowadź timestamp .

  7. W obszarze Zakres zapytania wybierz Kolekcję i Grupę kolekcji .

  8. Kliknij Następny

  9. Przełącz wszystkie ustawienia indeksu na Wyłączone . Kliknij Zapisz .

  10. Powtórz te same kroki dla pola shard .

Interfejs wiersza polecenia Firebase

  1. Dodaj następujący wpis do sekcji fieldOverrides pliku definicji indeksów:

    {
     "fieldOverrides": [
       // Disable single-field indexing for the timestamp field
       {
         "collectionGroup": "instruments",
         "fieldPath": "timestamp",
         "indexes": []
       },
     ]
    }
    
  2. Wdróż zaktualizowane definicje indeksów:

    firebase deploy --only firestore:indexes
    

Utwórz nowe indeksy złożone

Po usunięciu wszystkich poprzednich indeksów zawierających timestamp zdefiniuj nowe indeksy wymagane przez aplikację. Każdy indeks zawierający pole timestamp musi również zawierać pole shard . Na przykład, aby obsłużyć powyższe zapytania, dodaj następujące indeksy:

Kolekcja Pola indeksowane Zakres zapytania
instrumenty fragment , cena , sygnatura czasowa Kolekcja
instrumenty fragment , wymiana , znacznik czasu Kolekcja
instrumenty fragment , typ instrumentu , sygnatura czasowa Kolekcja

Komunikaty o błędach

Można zbudować te indeksy, uruchamiając zaktualizowane zapytania.

Każde zapytanie zwraca komunikat o błędzie z linkiem do utworzenia wymaganego indeksu w konsoli Firebase.

Interfejs wiersza polecenia Firebase

  1. Dodaj następujące indeksy do pliku definicji indeksu:

     {
       "indexes": [
       // New indexes for sharded timestamps
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "exchange",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "instrumentType",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "price.currency",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
       ]
     }
    
  2. Wdróż zaktualizowane definicje indeksów:

    firebase deploy --only firestore:indexes
    

Zrozumienie zapisu dla pól indeksowanych sekwencyjnie z limitem

Limit szybkości zapisu dla sekwencyjnych pól indeksowanych wynika ze sposobu, w jaki Cloud Firestore przechowuje wartości indeksów i skaluje zapisy indeksów. Dla każdego zapisu indeksu Cloud Firestore definiuje wpis klucz-wartość, który łączy nazwę dokumentu i wartość każdego indeksowanego pola. Cloud Firestore organizuje te wpisy indeksu w grupy danych zwane tabletami . Każdy serwer Cloud Firestore obsługuje jeden lub więcej tabletów. Kiedy obciążenie zapisem na konkretnym tablecie staje się zbyt duże, Cloud Firestore skaluje się w poziomie, dzieląc tablet na mniejsze tablety i rozprowadzając nowe tablety na różnych serwerach Cloud Firestore.

Cloud Firestore umieszcza leksykograficznie zbliżone wpisy indeksu na tym samym tablecie. Jeśli wartości indeksów na tablecie są zbyt blisko siebie, na przykład w przypadku pól sygnatury czasowej, Cloud Firestore nie będzie w stanie efektywnie podzielić tabletu na mniejsze tablety. Powoduje to utworzenie gorącego punktu, w którym pojedynczy tablet otrzymuje zbyt duży ruch, a operacje odczytu i zapisu w tym gorącym punkcie stają się wolniejsze.

Dzieląc pole sygnatury czasowej, umożliwiasz Cloud Firestore efektywne dzielenie zadań na wiele tabletów. Chociaż wartości pola sygnatury czasowej mogą pozostać blisko siebie, połączony fragment i wartość indeksu zapewniają Cloud Firestore wystarczającą ilość miejsca między wpisami indeksu, aby podzielić wpisy na wiele tabletów.

Co dalej