Leggi i dati in tempo reale con i flussi di modifiche in tempo reale

I flussi di modifiche per Firestore con compatibilità MongoDB consentono alle applicazioni di accedere alle modifiche in tempo reale (inserimenti, aggiornamenti ed eliminazioni) apportate a una raccolta o a un intero database. Un flusso di modifiche ordina gli aggiornamenti in base all'ora di modifica.

I flussi di modifiche sono accessibili tramite le API compatibili con MongoDB e i driver MongoDB tradizionali. L'implementazione dei flussi di modifiche di Firestore con compatibilità MongoDB può gestire qualsiasi velocità effettiva di scritture e letture tramite un'implementazione unica del partizionamento automatico su scritture e parallelismo di lettura. In questo modo puoi creare carichi di lavoro ad alta velocità effettiva. Puoi anche migliorare l'infrastruttura di migrazione e sincronizzazione dei dati tra Cloud Firestore e altre soluzioni di archiviazione.

Oltre alla compatibilità con i driver MongoDB, puoi utilizzare Cloud Firestore per leggere i flussi di modifiche in parallelo. In questo modo puoi creare carichi di lavoro di lettura paralleli ad alta velocità effettiva. Ogni flusso rappresenta una partizione di risultati ben distribuita.

I flussi di modifiche supportano le seguenti funzionalità:

  • Flussi di modifiche in tempo reale configurabili con ambito database o raccolta.
  • Una durata di conservazione per un flusso di modifiche specificata al momento della creazione. La conservazione predefinita è di 7 giorni e la conservazione minima è di 1 giorno. La conservazione deve essere un multiplo di 1 giorno, fino a un massimo di 7 giorni. La durata di conservazione non può essere modificata dopo la creazione. Per modificare il periodo di conservazione, devi eliminare e ricreare il flusso di modifiche.
  • Eventi di modifica delete, insert, update e drop osservabili utilizzando db.collection.watch() e db.watch().
  • updateDescription.updatedFields contiene le differenze di aggiornamento.
  • Tutte le opzioni fullDocument e fullDocumentBeforeChange.
    • Ricerca del documento completo per gli aggiornamenti.
    • Immagine precedente del documento prima che venisse sostituito, aggiornato o eliminato.
    • Immagine successiva del documento dopo che è stato sostituito o aggiornato.
    • Le immagini precedenti e successive più vecchie di un'ora richiedono l'attivazione del recupero point-in-time (PITR).
  • Tutte le opzioni di ripresa, inclusi resumeAfter e startAfter.
  • Quando utilizzi watch() per osservare le modifiche, puoi concatenare le fasi di aggregazione come $addFields, $match, $project, $replaceRoot, $replaceWith, $set e $unset.

Configurare i flussi di modifiche

Per creare, eliminare o visualizzare i flussi di modifiche esistenti per un database, utilizza la console Google Cloud.

Ruoli e autorizzazioni

Per creare, eliminare ed elencare i flussi di modifiche, un'entità richiede rispettivamente le autorizzazioni di Identity and Access Management (IAM) datastore.schemas.create, datastore.schemas.delete e datastore.schemas.list.

Il ruolo Amministratore indice Datastore (roles/datastore.indexAdmin), ad esempio, concede queste autorizzazioni.

Creare un flusso di modifiche

Prima di poter aprire un cursore del flusso di modifiche corrispondente, devi creare un flusso di modifiche. L'attivazione automatica del flusso di modifiche al momento della creazione della raccolta o del database non è supportata.

Per creare un flusso di modifiche, utilizza la console Google Cloud.

  1. Nella console Google Cloud, vai alla pagina Database.

    Vai a Database

  2. Seleziona un database Firestore con compatibilità MongoDB dall'elenco. Si apre il riquadro Firestore Studio.
  3. Nel riquadro Explorer , individua il nodo Modifiche in tempo reale , fai clic su Altre azioni e seleziona Crea flusso di modifiche.
  4. Inserisci un nome, un ambito e un periodo di conservazione univoci per il flusso di modifiche, quindi fai clic su Salva.

Visualizzare i flussi di modifiche

Puoi visualizzare i dettagli dei flussi di modifiche nella console Google Cloud.

  1. Nella console Google Cloud, vai alla pagina Database.

    Vai a Database

  2. Seleziona un database Firestore con compatibilità MongoDB dall'elenco. Si apre il riquadro Firestore Studio.
  3. Nel riquadro Explorer, individua il nodo modifiche in tempo reale.
  4. Per aprire o chiudere il nodo, fai clic su Attiva/disattiva nodo.

Eliminare un flusso di modifiche

Per eliminare un flusso di modifiche, utilizza la console Google Cloud.

  1. Nella console Google Cloud, vai alla pagina Database.

    Vai a Database

  2. Seleziona un database Firestore con compatibilità MongoDB dall'elenco. Si apre il riquadro Firestore Studio.
  3. Nel riquadro Explorer, individua il nodo modifiche in tempo reale.
  4. Per aprire o chiudere il nodo, fai clic su Attiva/disattiva nodo.
  5. In the Explorer, individua il flusso di modifiche che vuoi eliminare.
  6. Fai clic su Altre azioni e seleziona Elimina flusso di modifiche.
  7. Nella finestra di dialogo, inserisci il nome del flusso di modifiche per confermare l'eliminazione, quindi fai clic su Elimina.

Aprire o riprendere un cursore del flusso di modifiche

Gli esempi seguenti mostrano come creare, riprendere e configurare un cursore del flusso di modifiche.

Prima di creare un cursore del flusso di modifiche, devi creare esplicitamente un flusso di modifiche per il database o la raccolta.

Creare un cursore del flusso di modifiche

Per creare un nuovo cursore del flusso di modifiche, utilizza il metodo watch nei driver MongoDB. Per ascoltare tutte le modifiche apportate a un database, crea un flusso di modifiche con ambito database e chiama il metodo watch sull'oggetto db.

let cursor = db.watch()

Per creare un cursore con ambito limitato a una raccolta, devi prima creare un flusso di modifiche per quella raccolta. Quindi, chiama il metodo watch sulla raccolta corrispondente.

let cursor = db.my_collection.watch()

Ora che hai creato un cursore del flusso di modifiche, puoi iniziare lo streaming. Ad esempio, se inserisci un documento e chiami tryNext sul cursore, vedrai la modifica visualizzata nel flusso di modifiche.

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

Se aggiorni ed elimini il documento, vedrai queste modifiche nel flusso di modifiche:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

Riprendere un flusso di modifiche

Per riprendere un flusso di modifiche, utilizza le opzioni resumeAfter o startAfter. Per determinare da dove riprendere nel log delle modifiche da resumeAfter e startAfter, utilizza un token di ripresa.

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

Per utilizzare startAfter:

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

Includere le immagini precedenti e successive negli aggiornamenti e nelle eliminazioni

Se necessario, puoi includere le immagini precedenti e successive dei documenti negli eventi di modifica di aggiornamento ed eliminazione. La disponibilità delle immagini è soggetta alla finestra di recupero point-in-time (PITR) e, per leggere le immagini dei documenti più vecchie di un'ora, devi abilitare PITR.

I flussi di modifiche sfruttano la finestra PITR per fornire una visualizzazione del documento prima e dopo l'evento di modifica specificato. Per impostazione predefinita, gli eventi di aggiornamento contengono un campo updateDescription, ovvero il delta dei campi modificati dall'operazione di aggiornamento.

Per includere le immagini precedenti e successive in un evento di modifica, devi specificare fullDocumentBeforeChange e fullDocument opzioni nella query del flusso di modifiche.

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

Se la query tenta di leggere un documento al di fuori della finestra di conservazione PITR o se PITR non è abilitato, il valore required genera un messaggio di errore lato server.

In alternativa alla generazione di un errore, puoi utilizzare il valore whenAvailable per restituire un valore null se le immagini non sono più disponibili.

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

Includere l'immagine corrente negli aggiornamenti

Per impostazione predefinita, gli eventi di aggiornamento contengono un campo updateDescription, ovvero il delta dei campi modificati dall'operazione di aggiornamento. Per cercare invece la versione più recente dell'intero documento, utilizza il valore updateLookup nell'opzione fullDocument.

Questa funzionalità non richiede PITR ed esegue una ricerca del documento.

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

Letture parallele

Per aumentare la velocità effettiva, puoi utilizzare l'opzione firestoreWorkerConfig per suddividere una query del flusso di modifiche tra più worker. Ogni worker è responsabile della gestione delle modifiche per un insieme distinto di documenti. Devi creare un cursore parallelo tramite una query runCommand o aggregate.

Ad esempio, puoi distribuire un flusso di modifiche su 3 worker nel seguente modo:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

Flussi di modifiche e backup

Né la configurazione del flusso di modifiche né i dati del flusso di modifiche sono disponibili nelle operazioni di ripristino del backup. Se ripristini un database con flussi di modifiche, devi ricreare questi flussi di modifiche nel database di destinazione per aprire i cursori a quel database.

Fatturazione

Differenze di comportamento

La sezione seguente descrive le differenze nei flussi di modifiche tra Firestore con compatibilità MongoDB e MongoDB.

updateDescription

updateDescription è un documento in un evento update che descrive i campi aggiornati o rimossi dall'operazione di aggiornamento. In Cloud Firestore, le differenze principali sono:

  • In updateDescription, i campi truncatedArrays e disambiguatedPaths non vengono compilati.
  • updateDescription.updatedFields rappresenta una differenza canonica tra le immagini precedenti e successive di un documento prima e dopo l'applicazione di una mutazione.

Considera il seguente stato iniziale di un documento:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

Scenario 1: mutare solo il primo elemento dell'array.

In questo scenario, il comportamento di Cloud Firestore corrisponde a quello di MongoDB.

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Scenario 2: sovrascrivere con un intero array

In questo scenario, l'operazione aggiorna solo il primo campo dell'array, ma sovrascrive l'intero array.

La differenza di aggiornamento Cloud Firestore non distingue tra questi due scenari e restituisce lo stesso updateDescription.updatedFields per entrambi:

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Passaggi successivi