
Distributed counters

Many realtime apps have documents that act as counters. For example, you might count "likes" on a post or "favorites" of a specific item.

In Cloud Firestore, you can update a single document only about once per second, which might be too low for some high-traffic applications.

Solution: Distributed counters

To support more frequent counter updates, create a distributed counter. Each counter is a document with a subcollection of "shards", and the value of the counter is the sum of the values of the shards.

Write throughput increases linearly with the number of shards, so a distributed counter with 10 shards can handle 10x as many writes as a traditional single-document counter.
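The linear scaling described above suggests a simple way to estimate a shard count. The following is a rough sketch, not an official sizing rule: the helper name `shards_needed` and its parameters are hypothetical, and the default of one write per second per shard is an assumption taken from the approximate per-document limit mentioned earlier.

```python
import math


def shards_needed(peak_writes_per_second, writes_per_shard_per_second=1.0):
    """Estimate how many shards a counter needs for a target write rate.

    Assumes each shard document sustains roughly
    `writes_per_shard_per_second` writes (about 1 by default) and that
    throughput grows linearly with the number of shards.
    """
    if peak_writes_per_second <= 0:
        # A counter always needs at least one shard.
        return 1
    return max(1, math.ceil(peak_writes_per_second / writes_per_shard_per_second))


print(shards_needed(10))   # 10
print(shards_needed(2.5))  # 3
```

Because increments pick a shard at random, the load is only evenly spread on average, so it may be reasonable to round the estimate up further.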

Web

// counters/${ID}
{
  "num_shards": NUM_SHARDS,
  "shards": [subcollection]
}

// counters/${ID}/shards/${NUM}
{
  "count": 123
}

Swift

// counters/${ID}
struct Counter {
    let numShards: Int

    init(numShards: Int) {
        self.numShards = numShards
    }
}

// counters/${ID}/shards/${NUM}
struct Shard {
    let count: Int

    init(count: Int) {
        self.count = count
    }
}

Objective-C

// counters/${ID}
@interface FIRCounter : NSObject
@property (nonatomic, readonly) NSInteger shardCount;
@end

@implementation FIRCounter
- (instancetype)initWithShardCount:(NSInteger)shardCount {
  self = [super init];
  if (self != nil) {
    _shardCount = shardCount;
  }
  return self;
}
@end

// counters/${ID}/shards/${NUM}
@interface FIRShard : NSObject
@property (nonatomic, readonly) NSInteger count;
@end

@implementation FIRShard
- (instancetype)initWithCount:(NSInteger)count {
  self = [super init];
  if (self != nil) {
    _count = count;
  }
  return self;
}
@end

Java

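// counters/${ID}
public class Counter {
    public int numShards;

    // Firestore needs a public no-argument constructor to deserialize this class.
    public Counter() {}

    public Counter(int numShards) {
        this.numShards = numShards;
    }
}

// counters/${ID}/shards/${NUM}
public class Shard {
    public int count;

    // Firestore needs a public no-argument constructor to deserialize this class.
    public Shard() {}

    public Shard(int count) {
        this.count = count;
    }
}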

Kotlin + KTX

// counters/${ID}
// Default values give each class the no-argument constructor that toObject() requires.
data class Counter(var numShards: Int = 0)

// counters/${ID}/shards/${NUM}
data class Shard(var count: Int = 0)

Python

import random

from google.cloud import firestore


class Shard(object):
    """
    A shard is a distributed counter. Each shard can support being incremented
    once per second. Multiple shards are needed within a Counter to allow
    more frequent incrementing.
    """

    def __init__(self):
        self._count = 0

    def to_dict(self):
        return {"count": self._count}


class Counter(object):
    """
    A counter stores a collection of shards which are
    summed to return a total count. This allows for more
    frequent incrementing than a single document.
    """

    def __init__(self, num_shards):
        self._num_shards = num_shards

Node.js

Not applicable, see the counter increment snippet below.

Go

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"

	"cloud.google.com/go/firestore"
	"google.golang.org/api/iterator"
)

// Counter is a collection of documents (shards)
// to realize counter with high frequency.
type Counter struct {
	numShards int
}

// Shard is a single counter, which is used in a group
// of other shards within Counter.
type Shard struct {
	Count int
}

PHP

Not applicable, see the counter initialization snippet below.

C#

/// <summary>
/// Shard is a document that contains the count.
/// </summary>
[FirestoreData]
public class Shard
{
    [FirestoreProperty(name: "count")]
    public int Count { get; set; }
}

The following code initializes a distributed counter:

Web

function createCounter(ref, num_shards) {
    var batch = db.batch();

    // Initialize the counter document
    batch.set(ref, { num_shards: num_shards });

    // Initialize each shard with count=0
    for (let i = 0; i < num_shards; i++) {
        const shardRef = ref.collection('shards').doc(i.toString());
        batch.set(shardRef, { count: 0 });
    }

    // Commit the write batch
    return batch.commit();
}

Swift

func createCounter(ref: DocumentReference, numShards: Int) {
    ref.setData(["numShards": numShards]){ (err) in
        // Half-open range: creates exactly numShards shards, named 0 through numShards - 1.
        for i in 0..<numShards {
            ref.collection("shards").document(String(i)).setData(["count": 0])
        }
    }
}

Objective-C

- (void)createCounterAtReference:(FIRDocumentReference *)reference
                      shardCount:(NSInteger)shardCount {
  [reference setData:@{ @"numShards": @(shardCount) } completion:^(NSError * _Nullable error) {
    for (NSInteger i = 0; i < shardCount; i++) {
      // Name each shard after its index so that shards 0..shardCount-1 all exist.
      NSString *shardName = [NSString stringWithFormat:@"%ld", (long)i];
      [[[reference collectionWithPath:@"shards"] documentWithPath:shardName]
          setData:@{ @"count": @(0) }];
    }
  }];
}

Java

public Task<Void> createCounter(final DocumentReference ref, final int numShards) {
    // Initialize the counter document, then initialize each shard.
    return ref.set(new Counter(numShards))
            .continueWithTask(new Continuation<Void, Task<Void>>() {
                @Override
                public Task<Void> then(@NonNull Task<Void> task) throws Exception {
                    if (!task.isSuccessful()) {
                        throw task.getException();
                    }

                    List<Task<Void>> tasks = new ArrayList<>();

                    // Initialize each shard with count=0
                    for (int i = 0; i < numShards; i++) {
                        Task<Void> makeShard = ref.collection("shards")
                                .document(String.valueOf(i))
                                .set(new Shard(0));

                        tasks.add(makeShard);
                    }

                    return Tasks.whenAll(tasks);
                }
            });
}

Kotlin + KTX

fun createCounter(ref: DocumentReference, numShards: Int): Task<Void> {
    // Initialize the counter document, then initialize each shard.
    return ref.set(Counter(numShards))
            .continueWithTask { task ->
                if (!task.isSuccessful) {
                    throw task.exception!!
                }

                val tasks = arrayListOf<Task<Void>>()

                // Initialize each shard with count=0
                for (i in 0 until numShards) {
                    val makeShard = ref.collection("shards")
                            .document(i.toString())
                            .set(Shard(0))

                    tasks.add(makeShard)
                }

                Tasks.whenAll(tasks)
            }
}

Python

def init_counter(self, doc_ref):
    """
    Create a given number of shards as
    subcollection of specified document.
    """
    col_ref = doc_ref.collection("shards")

    # Initialize each shard with count=0
    for num in range(self._num_shards):
        shard = Shard()
        col_ref.document(str(num)).set(shard.to_dict())

Node.js

Not applicable, see the counter increment snippet below.

Go


// initCounter creates a given number of shards as
// subcollection of specified document.
func (c *Counter) initCounter(ctx context.Context, docRef *firestore.DocumentRef) error {
	colRef := docRef.Collection("shards")

	// Initialize each shard with count=0
	for num := 0; num < c.numShards; num++ {
		shard := Shard{0}

		if _, err := colRef.Doc(strconv.Itoa(num)).Set(ctx, shard); err != nil {
			return fmt.Errorf("Set: %v", err)
		}
	}
	return nil
}

PHP

$numShards = 10;
$colRef = $ref->collection('SHARDS');
for ($i = 0; $i < $numShards; $i++) {
    $doc = $colRef->document($i);
    $doc->set(['Cnt' => 0]);
}

C#

/// <summary>
/// Create a given number of shards as a
/// subcollection of specified document.
/// </summary>
/// <param name="docRef">The document reference <see cref="DocumentReference"/></param>
private static async Task CreateCounterAsync(DocumentReference docRef, int numOfShards)
{
    CollectionReference colRef = docRef.Collection("shards");
    var tasks = new List<Task>();
    // Initialize each shard with Count=0
    for (var i = 0; i < numOfShards; i++)
    {
        tasks.Add(colRef.Document(i.ToString()).SetAsync(new Shard() { Count = 0 }));
    }
    await Task.WhenAll(tasks);
}

Ruby

# project_id = "Your Google Cloud Project ID"
# num_shards = "Number of shards for distributed counter"
# collection_path = "shards"

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new project_id: project_id

shards_ref = firestore.col collection_path

# Initialize each shard with count=0
num_shards.times do |i|
  shards_ref.doc(i).set({ count: 0 })
end

puts "Distributed counter shards collection created."

To increment the counter, choose a random shard and increment its count:

Web

function incrementCounter(db, ref, num_shards) {
    // Select a shard of the counter at random
    const shard_id = Math.floor(Math.random() * num_shards).toString();
    const shard_ref = ref.collection('shards').doc(shard_id);

    // Update count
    return shard_ref.update("count", firebase.firestore.FieldValue.increment(1));
}

Swift

func incrementCounter(ref: DocumentReference, numShards: Int) {
    // Select a shard of the counter at random
    let shardId = Int(arc4random_uniform(UInt32(numShards)))
    let shardRef = ref.collection("shards").document(String(shardId))

    shardRef.updateData([
        "count": FieldValue.increment(Int64(1))
    ])
}

Objective-C

- (void)incrementCounterAtReference:(FIRDocumentReference *)reference
                         shardCount:(NSInteger)shardCount {
  // Select a shard of the counter at random
  NSInteger shardID = (NSInteger)arc4random_uniform((uint32_t)shardCount);
  NSString *shardName = [NSString stringWithFormat:@"%ld", (long)shardID];
  FIRDocumentReference *shardReference =
      [[reference collectionWithPath:@"shards"] documentWithPath:shardName];

  [shardReference updateData:@{
    @"count": [FIRFieldValue fieldValueForIntegerIncrement:1]
  }];
}

Java

public Task<Void> incrementCounter(final DocumentReference ref, final int numShards) {
    int shardId = (int) Math.floor(Math.random() * numShards);
    DocumentReference shardRef = ref.collection("shards").document(String.valueOf(shardId));

    return shardRef.update("count", FieldValue.increment(1));
}

Kotlin + KTX

fun incrementCounter(ref: DocumentReference, numShards: Int): Task<Void> {
    val shardId = Math.floor(Math.random() * numShards).toInt()
    val shardRef = ref.collection("shards").document(shardId.toString())

    return shardRef.update("count", FieldValue.increment(1))
}

Python

def increment_counter(self, doc_ref):
    """Increment a randomly picked shard."""
    doc_id = random.randint(0, self._num_shards - 1)

    shard_ref = doc_ref.collection("shards").document(str(doc_id))
    return shard_ref.update({"count": firestore.Increment(1)})

Node.js

function incrementCounter(docRef, numShards) {
  const shardId = Math.floor(Math.random() * numShards);
  const shardRef = docRef.collection('shards').doc(shardId.toString());
  return shardRef.set({count: FieldValue.increment(1)}, {merge: true});
}

Go


// incrementCounter increments a randomly picked shard.
func (c *Counter) incrementCounter(ctx context.Context, docRef *firestore.DocumentRef) (*firestore.WriteResult, error) {
	docID := strconv.Itoa(rand.Intn(c.numShards))

	shardRef := docRef.Collection("shards").Doc(docID)
	return shardRef.Update(ctx, []firestore.Update{
		{Path: "Count", Value: firestore.Increment(1)},
	})
}

PHP

$colRef = $ref->collection('SHARDS');
$numShards = 0;
$docCollection = $colRef->documents();
foreach ($docCollection as $doc) {
    $numShards++;
}
$shardIdx = random_int(0, $numShards-1);
$doc = $colRef->document($shardIdx);
$doc->update([
    ['path' => 'Cnt', 'value' => FieldValue::increment(1)]
]);

C#

/// <summary>
/// Increment a randomly picked shard by 1.
/// </summary>
/// <param name="docRef">The document reference <see cref="DocumentReference"/></param>
/// <returns>The <see cref="Task"/></returns>
private static async Task IncrementCounterAsync(DocumentReference docRef, int numOfShards)
{
    int documentId;
    lock (s_randLock)
    {
        documentId = s_rand.Next(numOfShards);
    }
    var shardRef = docRef.Collection("shards").Document(documentId.ToString());
    await shardRef.UpdateAsync("count", FieldValue.Increment(1));
}

Ruby

# project_id = "Your Google Cloud Project ID"
# num_shards = "Number of shards for distributed counter"
# collection_path = "shards"

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new project_id: project_id

# Select a shard of the counter at random
shard_id = rand 0...num_shards
shard_ref = firestore.doc "#{collection_path}/#{shard_id}"

# increment counter
shard_ref.update({ count: firestore.field_increment(1) })

puts "Counter incremented."

To get the total count, query for all shards and sum their count fields:

Web

function getCount(ref) {
    // Sum the count of each shard in the subcollection
    return ref.collection('shards').get().then((snapshot) => {
        let total_count = 0;
        snapshot.forEach((doc) => {
            total_count += doc.data().count;
        });

        return total_count;
    });
}

Swift

func getCount(ref: DocumentReference) {
    ref.collection("shards").getDocuments() { (querySnapshot, err) in
        var totalCount = 0
        if err != nil {
            // Error getting shards
            // ...
        } else {
            for document in querySnapshot!.documents {
                let count = document.data()["count"] as! Int
                totalCount += count
            }
        }

        print("Total count is \(totalCount)")
    }
}

Objective-C

- (void)getCountWithReference:(FIRDocumentReference *)reference {
  [[reference collectionWithPath:@"shards"]
      getDocumentsWithCompletion:^(FIRQuerySnapshot *snapshot,
                                   NSError *error) {
        NSInteger totalCount = 0;
        if (error != nil) {
          // Error getting shards
          // ...
        } else {
          for (FIRDocumentSnapshot *document in snapshot.documents) {
            NSInteger count = [document[@"count"] integerValue];
            totalCount += count;
          }

          NSLog(@"Total count is %ld", (long)totalCount);
        }
  }];
}

Java

public Task<Integer> getCount(final DocumentReference ref) {
    // Sum the count of each shard in the subcollection
    return ref.collection("shards").get()
            .continueWith(new Continuation<QuerySnapshot, Integer>() {
                @Override
                public Integer then(@NonNull Task<QuerySnapshot> task) throws Exception {
                    int count = 0;
                    for (DocumentSnapshot snap : task.getResult()) {
                        Shard shard = snap.toObject(Shard.class);
                        count += shard.count;
                    }
                    return count;
                }
            });
}

Kotlin + KTX

fun getCount(ref: DocumentReference): Task<Int> {
    // Sum the count of each shard in the subcollection
    return ref.collection("shards").get()
            .continueWith { task ->
                var count = 0
                for (snap in task.result!!) {
                    val shard = snap.toObject<Shard>()
                    count += shard.count
                }
                count
            }
}

Python

def get_count(self, doc_ref):
    """Return a total count across all shards."""
    total = 0
    shards = doc_ref.collection("shards").list_documents()
    for shard in shards:
        total += shard.get().to_dict().get("count", 0)
    return total

Node.js

async function getCount(docRef) {
  const querySnapshot = await docRef.collection('shards').get();
  const documents = querySnapshot.docs;

  let count = 0;
  for (const doc of documents) {
    count += doc.get('count');
  }
  return count;
}

Go


// getCount returns a total count across all shards.
func (c *Counter) getCount(ctx context.Context, docRef *firestore.DocumentRef) (int64, error) {
	var total int64
	shards := docRef.Collection("shards").Documents(ctx)
	for {
		doc, err := shards.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return 0, fmt.Errorf("Next: %v", err)
		}

		vTotal := doc.Data()["Count"]
		shardCount, ok := vTotal.(int64)
		if !ok {
			return 0, fmt.Errorf("firestore: invalid dataType %T, want int64", vTotal)
		}
		total += shardCount
	}
	return total, nil
}

PHP

$result = 0;
$docCollection = $ref->collection('SHARDS')->documents();
foreach ($docCollection as $doc) {
    $result += $doc->data()['Cnt'];
}

C#

/// <summary>
/// Get total count across all shards.
/// </summary>
/// <param name="docRef">The document reference <see cref="DocumentReference"/></param>
/// <returns>The <see cref="int"/></returns>
private static async Task<int> GetCountAsync(DocumentReference docRef)
{
    var snapshotList = await docRef.Collection("shards").GetSnapshotAsync();
    return snapshotList.Sum(shard => shard.GetValue<int>("count"));
}

Ruby

# project_id = "Your Google Cloud Project ID"
# collection_path = "shards"

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new project_id: project_id

shards_ref = firestore.col_group collection_path

count = 0
shards_ref.get do |doc_ref|
  count += doc_ref[:count]
end

puts "Count value is #{count}."

Limitations

The solution shown above is a scalable way to create shared counters in Cloud Firestore, but you should be aware of the following limitations:

  • Shard count - The number of shards controls the performance of the distributed counter. With too few shards, some transactions may have to retry before succeeding, which slows writes. With too many shards, reads become slower and more expensive. You can offset the read cost by keeping the counter total in a separate rollup document that is updated at a slower cadence (for example, once per second) and having clients read from that document to get the total. The tradeoff is that clients must wait for the rollup document to be updated, instead of computing the total by reading all of the shards immediately after any update.
  • Cost - The cost of reading a counter value increases linearly with the number of shards, because the entire shards subcollection must be loaded.
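
The rollup approach mentioned under "Shard count" could look roughly like the sketch below. It is an illustration under stated assumptions, not part of the official samples: the function names `refresh_rollup` and `read_rollup`, the `rollup_ref` document, and the `total` field are all hypothetical. The code only relies on the references behaving like google-cloud-firestore `DocumentReference` objects (`collection()`, `stream()`, `set()`, `get()`), and how `refresh_rollup` gets scheduled (for example, about once per second) is left open.

```python
def refresh_rollup(counter_ref, rollup_ref, rollup_field="total"):
    """Sum every shard and cache the result in a separate rollup document.

    `counter_ref` and `rollup_ref` are expected to behave like
    google-cloud-firestore DocumentReference objects. `rollup_field`
    is a hypothetical field name chosen for this sketch.
    """
    total = 0
    # This costs one read per shard, so run it at a slow, fixed cadence.
    for snapshot in counter_ref.collection("shards").stream():
        total += (snapshot.to_dict() or {}).get("count", 0)

    # merge=True leaves any other fields on the rollup document untouched.
    rollup_ref.set({rollup_field: total}, merge=True)
    return total


def read_rollup(rollup_ref, rollup_field="total"):
    """Return the cached total with a single document read."""
    snapshot = rollup_ref.get()
    # to_dict() returns None when the rollup document does not exist yet.
    return (snapshot.to_dict() or {}).get(rollup_field, 0)
```

With this split, a client pays for one document read per lookup instead of one read per shard, at the price of seeing a total that can lag behind the shards until the next refresh.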