Lee datos en tiempo real con flujos de cambios

Los flujos de cambios para Firestore con compatibilidad con MongoDB permiten que las aplicaciones accedan a los cambios en tiempo real (inserciones, actualizaciones y eliminaciones) que se realizan en una colección o en una base de datos completa. Un flujo de cambios ordena las actualizaciones por hora de modificación.

Se puede acceder a Change Streams a través de las APIs compatibles con MongoDB y los controladores tradicionales de MongoDB. La implementación de flujos de cambios de Firestore con compatibilidad con MongoDB puede controlar cualquier capacidad de procesamiento de escrituras y lecturas a través de una implementación única de partición automática en escrituras y paralelismo de lectura. Esto te permite crear cargas de trabajo de alta capacidad de procesamiento. También puedes mejorar la infraestructura de migración y sincronización de datos entre Cloud Firestore y otras soluciones de almacenamiento.

Además de la compatibilidad con los controladores de MongoDB, puedes usar Cloud Firestore para leer flujos de cambios en paralelo. Esto te permite crear cargas de trabajo de lectura paralelas y de alta capacidad de procesamiento. Cada transmisión representa una partición de resultados bien distribuida.

Los Change Streams admiten las siguientes funciones:

  • Flujos de cambios configurables con alcance de base de datos o colección
  • Es la duración de retención de un flujo de cambios que se especifica en el momento de la creación. La retención predeterminada es de 7 días y la retención mínima es de 1 día. La retención debe ser un múltiplo de 1 día, hasta un máximo de 7 días. La duración de la retención no se puede cambiar después de la creación. Para cambiar el período de retención, debes descartar y volver a crear el flujo de cambios.
  • Eventos de cambio de delete, insert, update y drop que se pueden observar con db.collection.watch() y db.watch().
  • updateDescription.updatedFields contiene las diferencias de actualización.
  • Todas las opciones de fullDocument y fullDocumentBeforeChange
    • Se está buscando el documento completo para ver si hay actualizaciones.
    • Es la imagen previa del documento antes de que se reemplazara, actualizara o borrara.
    • Es la imagen posterior del documento después de que se reemplazó o actualizó.
    • Las imágenes anteriores y posteriores a la hora requieren que se habilite la recuperación de un momento determinado (PITR).
  • Todas las opciones de reanudación, incluidas resumeAfter y startAfter.
  • Cuando usas watch() para observar los cambios, puedes encadenar etapas de agregación como $addFields, $match, $project, $replaceRoot, $replaceWith, $set y $unset.

Configura flujos de cambios

Para crear, descartar o ver Change Streams existentes para una base de datos, usa la consola de Google Cloud.

Funciones y permisos

Para crear, borrar y enumerar Change Streams, una principal requiere los permisos de Identity and Access Management (IAM) datastore.schemas.create, datastore.schemas.delete y datastore.schemas.list, respectivamente.

Por ejemplo, el rol de Administrador del índice de Datastore (roles/datastore.indexAdmin) otorga estos permisos.

Crear transmisión de cambios

Antes de abrir un cursor de transmisión de cambios correspondiente, debes crear una transmisión de cambios. No se admite la habilitación automática de flujos de cambios en la creación de colecciones o bases de datos.

Para crear un flujo de cambios, usa la consola de Google Cloud.

  1. En la consola de Google Cloud, ve a la página Bases de datos.

    Ir a Bases de datos

  2. En la lista, selecciona una base de datos de Firestore con compatibilidad con MongoDB. Se abrirá el panel de Firestore Studio.
  3. En el panel Explorador, busca el nodo Flujos de cambios, haz clic en Más acciones y, luego, selecciona Crear flujo de cambios.
  4. Ingresa un nombre, un alcance y un período de retención únicos para el flujo de cambios y, luego, haz clic en Guardar.

Cómo ver los flujos de cambios

Puedes ver detalles sobre Change Streams en la consola de Google Cloud.

  1. En la consola de Google Cloud, ve a la página Bases de datos.

    Ir a Bases de datos

  2. En la lista, selecciona una base de datos de Firestore con compatibilidad con MongoDB. Se abrirá el panel de Firestore Studio.
  3. En el panel Explorador, busca el nodo flujos de cambios.
  4. Para abrir o cerrar el nodo, haz clic en Alternar nodo.

Borra un flujo de cambios

Para borrar un flujo de cambios, usa la consola de Google Cloud.

  1. En la consola de Google Cloud, ve a la página Bases de datos.

    Ir a Bases de datos

  2. En la lista, selecciona una base de datos de Firestore con compatibilidad con MongoDB. Se abrirá el panel de Firestore Studio.
  3. En el panel Explorador, busca el nodo flujos de cambios.
  4. Para abrir o cerrar el nodo, haz clic en Alternar nodo.
  5. En el Explorador, busca el flujo de cambios que deseas borrar.
  6. Haz clic en Más acciones y, luego, selecciona Borrar flujo de cambios.
  7. En el diálogo, ingresa el nombre del flujo de cambios para confirmar la eliminación y, luego, haz clic en Borrar.

Cómo abrir o reanudar un cursor de transmisión de cambios

En los siguientes ejemplos, se muestra cómo crear, reanudar y configurar un cursor de transmisión de cambios.

Antes de crear un cursor de flujo de cambios, debes crear un flujo de cambios de forma explícita para la base de datos o la colección.

Crea un cursor de flujo de cambios

Para crear un cursor de flujo de cambios nuevo, usa el método watch en los controladores de MongoDB. Para detectar todos los cambios en una base de datos, crea un flujo de cambios con alcance en la base de datos y llama al método watch en el objeto db.

let cursor = db.watch()

Para crear un cursor con alcance en una colección, primero debes crear un flujo de cambios para esa colección. Luego, llama al método watch en la colección correspondiente.

let cursor = db.my_collection.watch()

Ahora que creaste un cursor de flujo de cambios, puedes comenzar a transmitir. Por ejemplo, si insertas un documento y llamas a tryNext en el cursor, verás el cambio en el flujo de cambios.

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

Si actualizas y borras el documento, verás esos cambios en el flujo de cambios:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

Reanudar un flujo de cambios

Para reanudar un flujo de cambios, usa las opciones resumeAfter o startAfter. Para determinar desde qué punto del registro de cambios reanudar la operación resumeAfter y startAfter, usa un token de reanudación.

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

Para usar startAfter, ingresa el siguiente comando:

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

Incluye imágenes previas y posteriores en las actualizaciones y eliminaciones

Si es necesario, puedes incluir imágenes previas y posteriores de los documentos en los eventos de cambio de actualización y eliminación. La disponibilidad de las imágenes está sujeta al período de recuperación de un momento determinado (PITR), y para leer imágenes de documentos de más de una hora, debes habilitar la PITR.

Las transmisiones de cambios aprovechan la ventana de PITR para proporcionar una vista del documento antes y después del evento de cambio determinado. De forma predeterminada, los eventos de actualización contienen un campo updateDescription, que es el delta de los campos modificados por la operación de actualización.

Para incluir las imágenes previas y posteriores en un evento de cambio, debes especificar las opciones fullDocumentBeforeChange y fullDocument en la consulta de la transmisión de cambios.

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

Si la consulta intenta leer un documento fuera del período de retención de la PITR o si la PITR no está habilitada, el valor required arroja un mensaje de error del servidor.

Como alternativa a arrojar un error, puedes usar el valor whenAvailable para devolver un valor null si las imágenes ya no están disponibles.

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

Incluye la imagen actual en las actualizaciones

De forma predeterminada, los eventos de actualización contienen un campo updateDescription, que es el delta de los campos modificados por la operación de actualización. Para buscar la versión más actual de todo el documento, usa el valor updateLookup en la opción fullDocument.

Esta función no requiere PITR y realiza una búsqueda del documento.

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

Lecturas paralelas

Para aumentar la capacidad de procesamiento, puedes usar la opción firestoreWorkerConfig para dividir una consulta de transmisión de cambios en varios trabajadores. Cada trabajador es responsable de publicar los cambios para un conjunto distinto de documentos. Debes crear un cursor paralelo a través de una consulta runCommand o aggregate.

Por ejemplo, puedes distribuir un flujo de cambios en 3 trabajadores de la siguiente manera:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

Cambia las transmisiones y las copias de seguridad

Ni la configuración ni los datos del flujo de cambios están disponibles en las operaciones de restablecimiento de copias de seguridad. Si restableces una base de datos con Change Streams, debes volver a crear esos flujos de cambios en la base de datos de destino para abrir cursores en esa base de datos.

Facturación

Diferencias de comportamiento

En la siguiente sección, se describen las diferencias en los Change Streams entre Firestore con compatibilidad con MongoDB y MongoDB.

updateDescription

updateDescription es un documento en un evento update que describe los campos que se actualizaron o quitaron con la operación de actualización. En Cloud Firestore, las diferencias notables son las siguientes:

  • En updateDescription, no se propagan los campos truncatedArrays y disambiguatedPaths.
  • updateDescription.updatedFields representa una diferencia canónica entre las imágenes previas y posteriores de un documento antes y después de que se aplica una mutación.

Considera el siguiente estado inicial de un documento:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

Situación 1: Se muta solo el primer elemento del array.

En esta situación, el comportamiento de Cloud Firestore coincide con el de MongoDB.

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Situación 2: Sobreescribir con un array completo

En esta situación, la operación solo actualiza el primer campo del array, pero sobrescribe todo el array.

La diferencia de actualización de Cloud Firestore no diferencia entre estos dos casos y devuelve el mismo updateDescription.updatedFields para ambos:

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

¿Qué sigue?