Odczytywanie danych w czasie rzeczywistym za pomocą strumieni zmian

Strumienie zmian w Firestore w trybie zgodności z MongoDB umożliwiają aplikacjom dostęp w czasie rzeczywistym do zmian (wstawień, aktualizacji i usunięć) wprowadzonych w kolekcji lub w całej bazie danych. Strumień zmian porządkuje aktualizacje według czasu modyfikacji.

Strumienie zmian są dostępne za pomocą interfejsów API zgodnych z MongoDB i tradycyjnych sterowników MongoDB. Implementacja strumieni zmian w Firestore w trybie zgodności z MongoDB może obsługiwać dowolną przepustowość zapisów i odczytów dzięki unikalnej implementacji automatycznego partycjonowania zapisów i równoległości odczytów. Umożliwia to tworzenie zadań o wysokiej przepustowości. Możesz też ulepszyć infrastrukturę migracji i synchronizacji danych między Cloud Firestore a innymi rozwiązaniami do przechowywania danych.

Oprócz zgodności ze sterownikami MongoDB możesz używać Cloud Firestore do równoległego odczytywania strumieni zmian. Umożliwia to tworzenie równoległych zadań odczytu o wysokiej przepustowości. Każdy strumień reprezentuje dobrze rozproszoną partycję wyników.

Strumienie zmian obsługują te funkcje:

  • Konfigurowalne strumienie zmian o zakresie bazy danych lub kolekcji.
  • Okres przechowywania strumienia zmian określony podczas jego tworzenia. Domyślny okres przechowywania to 7 dni, a minimalny to 1 dzień. Okres przechowywania musi być wielokrotnością 1 dnia i nie może przekraczać 7 dni. Po utworzeniu nie można zmienić czasu przechowywania. Aby zmienić okres przechowywania, musisz usunąć strumień zmian i utworzyć go ponownie.
  • delete, insert, updatedrop to zdarzenia zmiany, które można obserwować za pomocą db.collection.watch()db.watch().
  • updateDescription.updatedFields zawiera różnice w aktualizacjach.
  • Wszystkie opcje fullDocumentfullDocumentBeforeChange.
    • Sprawdzanie, czy pełna wersja dokumentu została zaktualizowana.
    • Obraz dokumentu przed jego zastąpieniem, zaktualizowaniem lub usunięciem.
    • Obraz dokumentu po jego zastąpieniu lub zaktualizowaniu.
    • W przypadku obrazów sprzed i po starszych niż godzina wymagane jest włączenie odzyskiwania do określonego momentu (PITR).
  • Wszystkie opcje wznawiania, w tym resumeAfterstartAfter.
  • Gdy używasz funkcji watch() do obserwowania zmian, możesz łączyć etapy agregacji, takie jak:$addFields, $match, $project, $replaceRoot, $replaceWith, $set$unset.

Konfigurowanie strumieni zmian

Aby utworzyć, usunąć lub wyświetlić istniejące strumienie zmian w bazie danych, użyj konsoli Google Cloud.

Role i uprawnienia

Aby tworzyć, usuwać i wyświetlać listy strumieni zmian, podmiot zabezpieczeń musi mieć odpowiednio uprawnienia Identity and Access Management (IAM) datastore.schemas.create, datastore.schemas.deletedatastore.schemas.list.

Na przykład rola Administrator indeksu Datastore (roles/datastore.indexAdmin) przyznaje te uprawnienia.

Utwórz strumień zmian

Zanim otworzysz odpowiedni kursor strumienia zmian, musisz utworzyć strumień zmian. Automatyczne włączanie strumienia zmian podczas tworzenia kolekcji lub bazy danych nie jest obsługiwane.

Aby utworzyć strumień zmian, użyj konsoli Google Cloud.

  1. W konsoli Google Cloud otwórz stronę Bazy danych.

    Otwórz Bazy danych

  2. Z listy wybierz bazę danych Firestore w trybie zgodności z MongoDB. Otworzy się panel Firestore Studio.
  3. W panelu Eksplorator znajdź węzeł Strumienie zmian, kliknij  Więcej działań, a następnie wybierz Utwórz strumień zmian.
  4. Wpisz niepowtarzalną nazwę strumienia zmian, zakres i okres przechowywania, a następnie kliknij Zapisz.

Wyświetlanie strumieni zmian

Szczegóły dotyczące strumieni zmian możesz wyświetlić w konsoli Google Cloud.

  1. W konsoli Google Cloud otwórz stronę Bazy danych.

    Otwórz Bazy danych

  2. Z listy wybierz bazę danych Firestore w trybie zgodności z MongoDB. Otworzy się panel Firestore Studio.
  3. W panelu Eksplorator znajdź węzeł Strumienie zmian.
  4. Aby otworzyć lub zamknąć węzeł, kliknij  Przełącz węzeł.

Usuwanie strumienia zmian

Aby usunąć strumień zmian, użyj konsoli Google Cloud.

  1. W konsoli Google Cloud otwórz stronę Bazy danych.

    Otwórz Bazy danych

  2. Z listy wybierz bazę danych Firestore w trybie zgodności z MongoDB. Otworzy się panel Firestore Studio.
  3. W panelu Eksplorator znajdź węzeł Strumienie zmian.
  4. Aby otworzyć lub zamknąć węzeł, kliknij  Przełącz węzeł.
  5. Eksploratorze znajdź strumień zmian, który chcesz usunąć.
  6. Kliknij  Więcej działań, a następnie wybierz Usuń strumień zmian.
  7. W oknie wpisz nazwę strumienia zmian, aby potwierdzić usunięcie, a następnie kliknij Usuń.

Otwieranie lub wznawianie kursora strumienia zmian

Poniższe przykłady pokazują, jak tworzyć, wznawiać i konfigurować kursor strumienia zmian.

Zanim utworzysz kursor strumienia zmian, musisz jawnie utworzyć strumień zmian dla bazy danych lub kolekcji.

Tworzenie kursora strumienia zmian

Aby utworzyć nowy kursor strumienia zmian, użyj metody watch w sterownikach MongoDB. Aby nasłuchiwać wszystkich zmian w bazie danych, utwórz strumień zmian o zakresie bazy danych i wywołaj metodę watch na obiekcie db.

let cursor = db.watch()

Aby utworzyć kursor ograniczony do kolekcji, musisz najpierw utworzyć strumień zmian dla tej kolekcji. Następnie wywołaj metodę watch w odpowiedniej kolekcji.

let cursor = db.my_collection.watch()

Po utworzeniu kursora strumienia zmian możesz rozpocząć przesyłanie strumieniowe. Jeśli na przykład wstawisz dokument i wywołasz tryNext na kursorze, zobaczysz zmianę w strumieniu zmian.

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

Jeśli zaktualizujesz i usuniesz dokument, zmiany te pojawią się w strumieniu zmian:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

Wznawianie strumienia zmian

Aby wznowić strumień zmian, użyj opcji resumeAfter lub startAfter. Aby określić, od którego miejsca w historii zmian wznowić działanie resumeAfterstartAfter, użyj tokena wznowienia.

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

Aby użyć startAfter:

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

Dodawanie zdjęć przed i po w aktualizacjach oraz usuwanie ich

W razie potrzeby możesz dołączyć obrazy dokumentów przed i po zmianach w zdarzeniach aktualizacji i usuwania. Dostępność obrazów zależy od okna odzyskiwania do określonego momentu (PITR). Aby odczytać obrazy dokumentów starsze niż godzina, musisz włączyć PITR.

Strumienie zmian korzystają z okresu PITR, aby zapewnić widok dokumentu przed i po danym zdarzeniu zmiany. Domyślnie zdarzenia aktualizacji zawierają pole updateDescription, które jest różnicą między polami zmodyfikowanymi przez operację aktualizacji.

Aby uwzględnić obrazy przed i po w zdarzeniu zmiany, musisz określić opcje fullDocumentBeforeChangefullDocument w zapytaniu dotyczącym strumienia zmian.

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

Jeśli zapytanie próbuje odczytać dokument spoza okresu przechowywania PITR lub jeśli PITR nie jest włączony, wartość required zwraca komunikat o błędzie po stronie serwera.

Zamiast zgłaszać błąd, możesz użyć wartości whenAvailable, aby zwrócić wartość null, jeśli obrazy nie są już dostępne.

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

Uwzględnij bieżący obraz w aktualizacjach

Domyślnie zdarzenia aktualizacji zawierają pole updateDescription, które jest różnicą między polami zmodyfikowanymi przez operację aktualizacji. Aby zamiast tego wyszukać najnowszą wersję całego dokumentu, użyj wartości updateLookup w opcji fullDocument.

Ta funkcja nie wymaga odzyskiwania do określonego momentu i wyszukuje dokument.

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

Odczyty równoległe

Aby zwiększyć przepustowość, możesz użyć opcji firestoreWorkerConfig, aby podzielić zapytanie dotyczące strumienia zmian na wielu pracowników. Każdy proces roboczy jest odpowiedzialny za wprowadzanie zmian w odrębnym zestawie dokumentów. Musisz utworzyć kursor równoległy za pomocą zapytania runCommand lub aggregate.

Możesz na przykład rozdzielić strumień zmian na 3 procesy robocze w ten sposób:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

Strumienie zmian i kopie zapasowe

Ani konfiguracja strumienia zmian, ani dane strumienia zmian nie są dostępne w operacjach przywracania kopii zapasowej. Jeśli przywracasz bazę danych ze strumieniami zmian, musisz ponownie utworzyć te strumienie zmian w docelowej bazie danych, aby otworzyć w niej kursory.

Płatności

Różnice w działaniu

W sekcji poniżej opisujemy różnice w strumieniach zmian między Firestore w trybie zgodności z MongoDB a MongoDB.

updateDescription

updateDescription to dokument w update zdarzeniu, który opisuje pola zaktualizowane lub usunięte w wyniku operacji aktualizacji. W przypadku Cloud Firestore najważniejsze różnice są następujące:

  • W języku updateDescription pola truncatedArraysdisambiguatedPaths nie są wypełniane.
  • updateDescription.updatedFields reprezentują kanoniczną różnicę między obrazami dokumentu przed i po zastosowaniu zmiany.

Rozważmy następujący stan początkowy dokumentu:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

Scenariusz 1. Zmiana tylko pierwszego elementu tablicy.

W tym przypadku działanie Cloud Firestore jest zgodne z MongoDB.

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Scenariusz 2. Zastępowanie całej tablicy

W tym przypadku operacja aktualizuje tylko pierwsze pole tablicy, ale zastępuje całą tablicę.

Różnica aktualizacji Cloud Firestore nie rozróżnia tych 2 scenariuszy i w obu przypadkach zwraca ten sam kod updateDescription.updatedFields:

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Co dalej?