Odczytywanie danych w czasie rzeczywistym za pomocą strumieni zmian

Strumienie zmian w Firestore w trybie zgodności z MongoDB umożliwiają aplikacjom dostęp do zmian w czasie rzeczywistym (wstawiania, aktualizowania i usuwania) wprowadzanych w kolekcji lub w całej bazie danych. Strumień zmian porządkuje aktualizacje według czasu modyfikacji.

Strumienie zmian są dostępne za pomocą interfejsów API zgodnych z MongoDB i tradycyjnych sterowników MongoDB. Implementacja strumieni zmian w Firestore w trybie zgodności z MongoDB może obsługiwać dowolną przepustowość zapisu i odczytu dzięki unikalnej implementacji automatycznego partycjonowania podczas zapisu i równoległości odczytu. Umożliwia to tworzenie zadań o dużej przepustowości. Możesz też ulepszyć infrastrukturę migracji i synchronizacji danych między Cloud Firestore a innymi rozwiązaniami do przechowywania danych.

Oprócz zgodności ze sterownikami MongoDB, możesz używać Cloud Firestore do równoległego odczytywania strumieni zmian. Umożliwia to tworzenie równoległych zadań odczytu o dużej przepustowości. Każdy strumień reprezentuje dobrze rozproszoną partycję wyników.

Strumienie zmian obsługują te funkcje:

  • Konfigurowalne strumienie zmian o zakresie bazy danych lub kolekcji.
  • Okres przechowywania strumienia zmian określony podczas jego tworzenia. Domyślny okres przechowywania to 7 dni, a minimalny to 1 dzień. Okres przechowywania musi być wielokrotnością 1 dnia i nie może przekraczać 7 dni. Po utworzeniu strumienia zmian nie można zmienić okresu przechowywania. Aby zmienić okres przechowywania, musisz usunąć i ponownie utworzyć strumień zmian.
  • Zdarzenia zmian delete, insert, update i drop, które można obserwować za pomocą db.collection.watch() i db.watch().
  • updateDescription.updatedFields zawiera różnice między aktualizacjami.
  • Wszystkie fullDocument i fullDocumentBeforeChange opcje.
    • Wyszukiwanie pełnego dokumentu pod kątem aktualizacji.
    • Obraz dokumentu przed jego zastąpieniem, zaktualizowaniem lub usunięciem.
    • Obraz dokumentu po jego zastąpieniu lub zaktualizowaniu.
    • Obrazy sprzed godziny i po niej wymagają włączenia odzyskiwania do określonego momentu (PITR).
  • Wszystkie opcje wznawiania, w tym resumeAfter i startAfter.
  • Gdy używasz watch() do obserwowania zmian, możesz łączyć etapy agregacji, takie jak $addFields, $match, $project, $replaceRoot, $replaceWith, $set i $unset.

Konfigurowanie strumieni zmian

Aby utworzyć, usunąć lub wyświetlić istniejące strumienie zmian w bazie danych, użyj konsoli Google Cloud.

Role i uprawnienia

Aby tworzyć, usuwać i wyświetlać strumienie zmian, podmiot zabezpieczeń musi mieć odpowiednio uprawnienia Identity and Access Management (IAM) datastore.schemas.create, datastore.schemas.delete i datastore.schemas.list.

Te uprawnienia przyznaje na przykład rola Administrator indeksu Datastore (roles/datastore.indexAdmin).

Tworzenie strumienia zmian

Zanim otworzysz odpowiedni kursor strumienia zmian, musisz utworzyć strumień zmian. Automatyczne włączanie strumienia zmian podczas tworzenia kolekcji lub bazy danych nie jest obsługiwane.

Aby utworzyć strumień zmian, użyj konsoli Google Cloud.

  1. W konsoli Google Cloud otwórz stronę Bazy danych.

    Otwórz stronę Bazy danych

  2. Na liście wybierz bazę danych Firestore w trybie zgodności z MongoDB. Otworzy się panel Firestore Studio.
  3. W panelu Eksplorator znajdź węzeł Strumienie zmian , kliknij Więcej działań , a następnie wybierz Utwórz strumień zmian.
  4. Wpisz unikalną nazwę strumienia zmian, zakres i okres przechowywania, a następnie kliknij Zapisz.

Wyświetlanie strumieni zmian

Szczegóły strumieni zmian możesz wyświetlić w konsoli Google Cloud.

  1. W konsoli Google Cloud otwórz stronę Bazy danych.

    Otwórz stronę Bazy danych

  2. Na liście wybierz bazę danych Firestore w trybie zgodności z MongoDB. Otworzy się panel Firestore Studio.
  3. W panelu Eksplorator znajdź węzeł Strumienie zmian.
  4. Aby otworzyć lub zamknąć węzeł, kliknij Przełącz węzeł.

Usuwanie strumienia zmian

Aby usunąć strumień zmian, użyj konsoli Google Cloud.

  1. W konsoli Google Cloud otwórz stronę Bazy danych.

    Otwórz stronę Bazy danych

  2. Na liście wybierz bazę danych Firestore w trybie zgodności z MongoDB. Otworzy się panel Firestore Studio.
  3. W panelu Eksplorator znajdź węzeł Strumienie zmian.
  4. Aby otworzyć lub zamknąć węzeł, kliknij Przełącz węzeł.
  5. W Eksploratorze znajdź strumień zmian, który chcesz usunąć.
  6. Kliknij Więcej działań i następnie wybierz Usuń strumień zmian.
  7. W oknie dialogowym wpisz nazwę strumienia zmian, aby potwierdzić usunięcie, a następnie kliknij Usuń.

Otwieranie lub wznawianie kursora strumienia zmian

Z przykładów poniżej dowiesz się, jak utworzyć, wznowić i skonfigurować kursor strumienia zmian.

Zanim utworzysz kursor strumienia zmian, musisz wyraźnie utworzyć strumień zmian dla bazy danych lub kolekcji.

Tworzenie kursora strumienia zmian

Aby utworzyć nowy kursor strumienia zmian, użyj metody watch w sterownikach MongoDB. Aby nasłuchiwać wszystkich zmian w bazie danych, utwórz strumień zmian o zakresie bazy danych i wywołaj metodę watch na obiekcie db.

let cursor = db.watch()

Aby utworzyć kursor o zakresie kolekcji, musisz najpierw utworzyć strumień zmian dla tej kolekcji. Następnie wywołaj metodę watch w odpowiedniej kolekcji.

let cursor = db.my_collection.watch()

Teraz, gdy masz już kursor strumienia zmian, możesz rozpocząć przesyłanie strumieniowe. Jeśli na przykład wstawisz dokument i wywołasz tryNext na kursorze, zobaczysz zmianę w strumieniu zmian.

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

Jeśli zaktualizujesz i usuniesz dokument, zobaczysz te zmiany w strumieniu zmian:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

Wznawianie strumienia zmian

Aby wznowić strumień zmian, użyj opcji resumeAfter lub startAfter. Aby określić, od którego miejsca w historii zmian należy wznowić działanie resumeAfter i startAfter, użyj tokena wznawiania.

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

Aby użyć startAfter:

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

Dołączanie obrazów przed i po aktualizacji oraz usunięciu

W razie potrzeby możesz dołączyć obrazy dokumentów przed i po aktualizacji oraz usunięciu do zdarzeń zmian. Dostępność obrazów zależy od okna odzyskiwania do określonego momentu (PITR), a aby odczytać obrazy dokumentów sprzed godziny, musisz włączyć PITR.

Strumienie zmian wykorzystują okno PITR, aby wyświetlić dokument przed i po danym zdarzeniu zmiany. Domyślnie zdarzenia aktualizacji zawierają pole updateDescription, które jest różnicą między polami zmodyfikowanymi przez operację aktualizacji.

Aby dołączyć obrazy przed i po do zdarzenia zmiany, musisz określić opcje fullDocumentBeforeChange i fullDocument w zapytaniu strumienia zmian.

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

Jeśli zapytanie próbuje odczytać dokument poza oknem przechowywania PITR lub jeśli PITR nie jest włączone, wartość required spowoduje wyświetlenie komunikatu o błędzie po stronie serwera.

Zamiast wyświetlać błąd, możesz użyć wartości whenAvailable, aby zwrócić wartość null, jeśli obrazy nie są już dostępne.

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

Dołączanie bieżącego obrazu do aktualizacji

Domyślnie zdarzenia aktualizacji zawierają pole updateDescription, które jest różnicą między polami zmodyfikowanymi przez operację aktualizacji. Aby zamiast tego wyszukać najnowszą wersję całego dokumentu, użyj wartości updateLookup w opcji fullDocument.

Ta funkcja nie wymaga PITR i wyszukuje dokument.

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

Odczyty równoległe

Aby zwiększyć przepustowość, możesz użyć opcji firestoreWorkerConfig, aby podzielić zapytanie strumienia zmian na kilka instancji roboczych. Każda instancja robocza odpowiada za obsługę zmian w odrębnym zestawie dokumentów. Musisz utworzyć kursor równoległy za pomocą zapytania runCommand lub aggregate.

Możesz na przykład rozdzielić strumień zmian na 3 instancje robocze w ten sposób:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

Strumienie zmian i kopie zapasowe

Podczas przywracania kopii zapasowej nie są dostępne ani konfiguracja strumienia zmian, ani dane strumienia zmian. Jeśli przywrócisz bazę danych ze strumieniami zmian, musisz ponownie utworzyć te strumienie zmian w docelowej bazie danych, aby otworzyć kursory do tej bazy danych.

Płatności

Różnice w działaniu

W tej sekcji opisujemy różnice w strumieniach zmian między Firestore w trybie zgodności z MongoDB a MongoDB.

updateDescription

updateDescription to dokument w zdarzeniu update, który opisuje pola zaktualizowane lub usunięte przez operację aktualizacji. W Cloud Firestore najważniejsze różnice są następujące:

  • W updateDescription pola truncatedArrays i disambiguatedPaths nie są wypełniane.
  • updateDescription.updatedFields reprezentuje kanoniczną różnicę między obrazami dokumentu przed i po zastosowaniu mutacji.

Przyjrzyjmy się początkowemu stanowi dokumentu:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

Przykład 1. Zmiana tylko pierwszego elementu tablicy.

W tym przypadku działanie Cloud Firestore jest zgodne z MongoDB.

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Przykład 2. Zastąpienie całej tablicy.

W tym przypadku operacja aktualizuje tylko pierwsze pole tablicy, ale zastępuje całą tablicę.

Różnica aktualizacji Cloud Firestore nie rozróżnia tych 2 przypadków i zwraca ten sam element updateDescription.updatedFields w obu przypadkach:

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Co dalej?