シャーディングされたタイムスタンプ

コレクションにシーケンシャル インデックス値を持つドキュメントが含まれている場合、Cloud Firestore は書き込みレートを 500 回/秒の書き込みに制限します。このページでは、この制限に対処するためにドキュメント フィールドをシャーディングする方法について説明します。まず、「シーケンシャル インデックス フィールド」の意味を定義し、この制限がどのような場合に適用されるのかを明確にしましょう。

シーケンシャル インデックス フィールド

「シーケンシャル インデックス フィールド」とは、値が単調に増加または減少するインデックス フィールドを含むドキュメントのコレクションを意味します。多くの場合、これは timestamp フィールドを意味しますが、単調に増加または減少するフィールド値により、500 回/秒の書き込み制限がトリガーされることがあります。

この制限がドキュメントのコレクションに適用されるのは、たとえばコレクション内の user ドキュメントにインデックス フィールド userid が含まれていて、アプリが次のように userid 値を割り当てる場合です。

  • 1281, 1282, 1283, 1284, 1285, ...

一方、すべての timestamp フィールドにより、この制限がトリガーされるわけではありません。timestamp フィールドでランダムに分散された値が追跡される場合、書き込み制限は適用されません。また、フィールドの実際の値も重要ではなく、フィールド値は単調に増加または減少します。たとえば、単調に増加する以下の一連のフィールド値は、どちらも書き込み制限をトリガーします。

  • 100000, 100001, 100002, 100003, ...
  • 0, 1, 2, 3, ...

タイムスタンプ フィールドのシャーディング

たとえば、値が単調に増加する timestamp フィールドをアプリで使用しているとします。アプリがこの timestamp フィールドをクエリで使用しない場合、タイムスタンプ フィールドのインデックスを作成しなければ、500 回/秒の書き込み制限を取り除くことができます。クエリに timestamp フィールドが必要な場合は、次のようにシャーディングされたタイムスタンプを使用することで制限を回避できます。

  1. timestamp フィールドとともに shard フィールドを追加します。shard フィールドには、1..n の一意の値を使用します。これにより、このコレクションに対する書き込み制限が 500*n に引き上げられますが、n クエリの集約が必要になります。
  2. 書き込みロジックを更新して、各ドキュメントにランダムに shard 値が割り当てられるようにします
  3. シャーディングされた結果セットを集約するようにクエリを更新します。
  4. shard フィールドと timestamp フィールドの両方の単一フィールド インデックスを無効にします。timestamp フィールドを含む既存の複合インデックスを削除します。
  5. 更新後のクエリをサポートする新しい複合インデックスを作成します。インデックス内のフィールドの順序は重要です。timestamp フィールドの前に shard フィールドを配置する必要があります。timestamp フィールドを含むインデックスには、shard フィールドも含める必要があります。

シャーディングされたタイムスタンプは、書き込みレートが常に 1 秒あたり 500 回を超えるユースケースに限って実装するようにしてください。それ以外の場合では、早計な最適化になってしまいます。timestamp フィールドをシャーディングすると、500 回/秒の書き込み制限が取り除かれますが、クライアントサイドでクエリの集約が必要になるというトレードオフがあります。

以下の例で、timestamp フィールドをシャーディングする方法と、シャーディングされた結果セットをクエリする方法を説明します。

データモデルとクエリの例

一例として、通貨、普通株、ETF などの金融商品をほぼリアルタイムで分析するアプリがあるとします。このアプリは、次のようにドキュメントを instruments コレクションに書き込みます。

Node.js
async function insertData() {
  const instruments = [
    {
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

このアプリは次のクエリを実行し、timestamp フィールドを基準に並べ替えます。

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  return fs.collection('instruments')
      .where(fieldName, fieldOperator, fieldValue)
      .orderBy('timestamp', 'desc')
      .limit(limit)
      .get();
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

調査の結果、このアプリが 1 秒あたりに受信する金融商品の更新は 1,000~1,500 件であると判断しました。このレートは、インデックス付きタイムスタンプ フィールドを持つドキュメントのコレクションに適用される 500 回/秒の書き込み制限を超えています。書き込みスループットを向上させるには、3 つの shard 値が必要です(MAX_INSTRUMENT_UPDATES/500 = 3)。この例では、shard 値 xyz を使用します。shard 値には数字または他の文字を使用することもできます。

shard フィールドを追加する

shard フィールドをドキュメントに追加します。shard フィールドに値 xy、または z を設定すると、コレクションに対する書き込み制限が 1,500 回/秒の書き込みに引き上げられます。

Node.js
// Define our 'K' shard values
const shards = ['x', 'y', 'z'];
// Define a function to help 'chunk' our shards for use in queries.
// When using the 'in' query filter there is a max number of values that can be
// included in the value. If our number of shards is higher than that limit
// break down the shards into the fewest possible number of chunks.
function shardChunks() {
  const chunks = [];
  let start = 0;
  while (start < shards.length) {
    const elements = Math.min(MAX_IN_VALUES, shards.length - start);
    const end = start + elements;
    chunks.push(shards.slice(start, end));
    start = end;
  }
  return chunks;
}

// Add a convenience function to select a random shard
function randomShard() {
  return shards[Math.floor(Math.random() * Math.floor(shards.length))];
}
async function insertData() {
  const instruments = [
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'AAA',
      price: {
        currency: 'USD',
        micros: 34790000
      },
      exchange: 'EXCHG1',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.010Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'BBB',
      price: {
        currency: 'JPY',
        micros: 64272000000
      },
      exchange: 'EXCHG2',
      instrumentType: 'commonstock',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.101Z'))
    },
    {
      shard: randomShard(),  // add the new shard field to the document
      symbol: 'Index1 ETF',
      price: {
        currency: 'USD',
        micros: 473000000
      },
      exchange: 'EXCHG1',
      instrumentType: 'etf',
      timestamp: Timestamp.fromMillis(
          Date.parse('2019-01-01T13:45:23.001Z'))
    }
  ];

  const batch = fs.batch();
  for (const inst of instruments) {
    const ref = fs.collection('instruments').doc();
    batch.set(ref, inst);
  }

  await batch.commit();
}

シャーディングされたタイムスタンプをクエリする

shard フィールドを追加した後は、シャーディングされた結果を集約するようにクエリを更新する必要があります。

Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) {
  // For each shard value, map it to a new query which adds an additional
  // where clause specifying the shard value.
  return Promise.all(shardChunks().map(shardChunk => {
        return fs.collection('instruments')
            .where('shard', 'in', shardChunk)  // new shard condition
            .where(fieldName, fieldOperator, fieldValue)
            .orderBy('timestamp', 'desc')
            .limit(limit)
            .get();
      }))
      // Now that we have a promise of multiple possible query results, we need
      // to merge the results from all of the queries into a single result set.
      .then((snapshots) => {
        // Create a new container for 'all' results
        const docs = [];
        snapshots.forEach((querySnapshot) => {
          querySnapshot.forEach((doc) => {
            // append each document to the new all container
            docs.push(doc);
          });
        });
        if (snapshots.length === 1) {
          // if only a single query was returned skip manual sorting as it is
          // taken care of by the backend.
          return docs;
        } else {
          // When multiple query results are returned we need to sort the
          // results after they have been concatenated.
          // 
          // since we're wanting the `limit` newest values, sort the array
          // descending and take the first `limit` values. By returning negated
          // values we can easily get a descending value.
          docs.sort((a, b) => {
            const aT = a.data().timestamp;
            const bT = b.data().timestamp;
            const secondsDiff = aT.seconds - bT.seconds;
            if (secondsDiff === 0) {
              return -(aT.nanoseconds - bT.nanoseconds);
            } else {
              return -secondsDiff;
            }
          });
          return docs.slice(0, limit);
        }
      });
}

function queryCommonStock() {
  return createQuery('instrumentType', '==', 'commonstock');
}

function queryExchange1Instruments() {
  return createQuery('exchange', '==', 'EXCHG1');
}

function queryUSDInstruments() {
  return createQuery('price.currency', '==', 'USD');
}
insertData()
    .then(() => {
      const commonStock = queryCommonStock()
          .then(
              (docs) => {
                console.log('--- queryCommonStock: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const exchange1Instruments = queryExchange1Instruments()
          .then(
              (docs) => {
                console.log('--- queryExchange1Instruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      const usdInstruments = queryUSDInstruments()
          .then(
              (docs) => {
                console.log('--- queryUSDInstruments: ');
                docs.forEach((doc) => {
                  console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`);
                });
              }
          );
      return Promise.all([commonStock, exchange1Instruments, usdInstruments]);
    });

インデックス定義を更新する

500 回/秒の書き込みという制約を取り除くには、既存の単一フィールド インデックスと、timestamp フィールドを使用する複合インデックスを削除します。

複合インデックス定義を削除する

Firebase コンソール

  1. Firebase コンソールでCloud Firestore の [複合インデックス] ページを開きます。

    [複合インデックス] に移動

  2. timestamp フィールドを含む各インデックスについて、 ボタンをクリックし、[削除] をクリックします。

GCP Console

  1. Google Cloud コンソールで [Database] ページに移動します。

    [データベース] に移動

  2. データベースのリストから、必要なデータベースを選択します。

  3. ナビゲーション メニューで、[インデックス] をクリックし、[複合] タブをクリックします。

  4. [フィルタ] フィールドを使用して、timestamp フィールドを含むインデックス定義を検索します。

  5. 該当するインデックスのそれぞれについて、 ボタンをクリックし、[削除] をクリックします。

Firebase CLI

  1. Firebase CLI を設定していない場合は、この指示に従って CLI をインストールし、firebase init コマンドを実行しますinit コマンドの実行中には、必ず Firestore: Deploy rules and create indexes for Firestore を選択してください。
  2. セットアップ中に、Firebase CLI は既存のインデックス定義を firestore.indexes.json という名前のファイル(デフォルト)にダウンロードします。
  3. timestamp フィールドを含むインデックス定義を削除します。次に例を示します。

    {
    "indexes": [
      // Delete composite index definition that contain the timestamp field
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "exchange",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "instrumentType",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
      {
        "collectionGroup": "instruments",
        "queryScope": "COLLECTION",
        "fields": [
          {
            "fieldPath": "price.currency",
            "order": "ASCENDING"
          },
          {
            "fieldPath": "timestamp",
            "order": "DESCENDING"
          }
        ]
      },
     ]
    }
    
  4. 更新済みのインデックス定義をデプロイします。

    firebase deploy --only firestore:indexes
    

単一フィールド インデックス定義を更新する

Firebase コンソール

  1. Firebase コンソールで、Cloud Firestoreの [単一フィールド インデックス] ページを開きます。

    [単一フィールド インデックス] に移動

  2. [除外を追加] をクリックします。

  3. [コレクション ID] に、「instruments」と入力します。[フィールドのパス] に、「timestamp」と入力します。

  4. [クエリの範囲] で、[コレクション] と [コレクション グループ] の両方をオンにします。

  5. [次へ] をクリックします。

  6. すべてのインデックス設定を [無効] に切り替えます。[保存] をクリックします。

  7. shard フィールドについて、同じ手順を繰り返します。

GCP Console

  1. Google Cloud コンソールで [Database] ページに移動します。

    [データベース] に移動

  2. データベースのリストから、必要なデータベースを選択します。

  3. ナビゲーション メニューで [インデックス] をクリックし、[単一フィールド] タブをクリックします。

  4. [単一フィールド] タブをクリックします。

  5. [除外を追加] をクリックします。

  6. [コレクション ID] に、「instruments」と入力します。[フィールドのパス] に、「timestamp」と入力します。

  7. [クエリの範囲] で、[コレクション] と [コレクション グループ] の両方をオンにします。

  8. [次へ] をクリックします。

  9. すべてのインデックス設定を [無効] に切り替えます。[保存] をクリックします。

  10. shard フィールドについて、同じ手順を繰り返します。

Firebase CLI

  1. インデックス定義ファイルの fieldOverrides セクションに、次の行を追加します。

    {
     "fieldOverrides": [
       // Disable single-field indexing for the timestamp field
       {
         "collectionGroup": "instruments",
         "fieldPath": "timestamp",
         "indexes": []
       },
     ]
    }
    
  2. 更新済みのインデックス定義をデプロイします。

    firebase deploy --only firestore:indexes
    

新しい複合インデックスを作成する

以前の timestamp が含まれるインデックスをすべて削除した後、アプリに必要な新しいインデックスを定義します。timestamp フィールドを含むインデックスには、shard フィールドも含める必要があります。たとえば、上記のクエリをサポートするには、次のインデックスを追加します。

コレクション インデックス登録されるフィールド クエリの範囲
instruments shard、 price.currency、 timestamp コレクション
instruments shard、 exchange、 timestamp コレクション
instruments shard、 instrumentType、 timestamp コレクション

エラー メッセージ

更新後のクエリを実行すると、上記のインデックスを作成できます。

クエリごとに、Firebase コンソールで必要なインデックスを作成するためのリンクが示されたエラー メッセージが返されます。

Firebase CLI

  1. インデックス定義ファイルに、次のインデックスを追加します。

     {
       "indexes": [
       // New indexes for sharded timestamps
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "exchange",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "instrumentType",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
         {
           "collectionGroup": "instruments",
           "queryScope": "COLLECTION",
           "fields": [
             {
               "fieldPath": "shard",
               "order": "DESCENDING"
             },
             {
               "fieldPath": "price.currency",
               "order": "ASCENDING"
             },
             {
               "fieldPath": "timestamp",
               "order": "DESCENDING"
             }
           ]
         },
       ]
     }
    
  2. 更新済みのインデックス定義をデプロイします。

    firebase deploy --only firestore:indexes
    

シーケンシャル インデックス フィールドの書き込み制限について

シーケンシャル インデックス フィールドの書き込みレートに対する上限は、Cloud Firestore がインデックス値を保存してインデックス書き込みをスケーリングする方法に起因します。Cloud Firestore は、インデックスの書き込みごとに、ドキュメント名と各インデックス フィールドの値を連結した Key-Value エントリを定義します。Cloud Firestore は、これらのインデックス エントリを「タブレット」と呼ばれるデータのグループに整理します。各 Cloud Firestore サーバーは 1 つ以上のタブレットを保持します。特定のタブレットへの書き込み負荷が高くなりすぎると、Cloud Firestore は水平スケーリングを行うために、タブレットを小さな複数のタブレットに分割し、それらの新しいタブレットを複数の Cloud Firestore サーバーに分散します。

Cloud Firestore は、辞書順が近い一連のインデックス エントリを同じタブレットに配置します。タイムスタンプ フィールドの場合のように、タブレット内のインデックス値が近すぎると、Cloud Firestore はタブレットを小さな複数のタブレットに効率的に分割できません。このことから、1 つのタブレットで大量のトラフィックを受信する、ホットスポットという状況が生じます。ホットスポットへの読み取り / 書き込みオペレーションには時間がかかります。

タイムスタンプ フィールドをシャーディングすると、Cloud Firestore がワークロードを複数のタブレットに効率的に分割できるようになります。それでもタイムスタンプ フィールドの値は近いままかもしれませんが、連結されたシャードとインデックスの値により、エントリ間には Cloud Firestore がインデックス エントリを複数のタブレットに分割するのに十分な間隔が与えられます。

次のステップ