Lee datos en tiempo real con flujos de cambios

Las transmisiones de cambios para Firestore con compatibilidad con MongoDB permiten que las aplicaciones accedan a los cambios en tiempo real (inserciones, actualizaciones y eliminaciones) que se realizan en una colección o en una base de datos completa. Una transmisión de cambios ordena las actualizaciones por hora de modificación.

Se puede acceder a las transmisiones de cambios a través de las APIs compatibles con MongoDB y los controladores tradicionales de MongoDB. La implementación de Firestore con compatibilidad con MongoDB de los flujos de cambios puede controlar cualquier capacidad de procesamiento de escrituras y lecturas a través de una implementación única de partición automática en escrituras y paralelismo de lectura. Esto te permite compilar cargas de trabajo de alto rendimiento. También puedes mejorar la infraestructura de migración y sincronización de datos entre Cloud Firestore y otras soluciones de almacenamiento.

Además de la compatibilidad con los controladores de MongoDB, puedes usar Cloud Firestore para leer transmisiones de cambios en paralelo. Esto te permite compilar cargas de trabajo de lectura paralelas y de alto rendimiento. Cada transmisión representa una partición de resultados bien distribuida.

Las transmisiones de cambios admiten las siguientes funciones:

  • Flujos de cambios configurables con alcance de base de datos o colección
  • Una duración de retención para una transmisión de cambios que se especifica en la creación (la retención predeterminada es de 7 días y la retención mínima es de 1 día) (la retención debe ser un múltiplo de 1 día, hasta un máximo de 7 días) (la duración de retención no se puede cambiar después de la creación) (para cambiar el período de retención, debes quitar y volver a crear la transmisión de cambios)
  • Eventos de cambio delete, insert, update y drop que se pueden observar con db.collection.watch() y db.watch()
  • updateDescription.updatedFields contiene diferencias de actualización
  • Todas las opciones fullDocument y fullDocumentBeforeChange
    • Búsqueda del documento completo para las actualizaciones
    • Imagen previa del documento antes de que se reemplazara, actualizara o borrara
    • Imagen posterior del documento después de que se reemplazó o actualizó
    • Las imágenes previas y posteriores de más de una hora requieren la habilitación de la recuperación de un momento determinado (PITR)
  • Todas las opciones de reanudación, incluidas resumeAfter y startAfter
  • Cuando usas watch() para observar los cambios, puedes encadenar etapas de agregación como $addFields, $match, $project, $replaceRoot, $replaceWith, $set y $unset.

Configura las transmisiones de cambios

Para crear, quitar o ver las transmisiones de cambios existentes para una base de datos, usa la consola de Google Cloud.

Funciones y permisos

Para crear, borrar y enumerar transmisiones de cambios, una entidad principal requiere los permisos de Identity and Access Management (IAM) datastore.schemas.create, datastore.schemas.delete y datastore.schemas.list, respectivamente.

Por ejemplo, la función Administrador del índice de Datastore (roles/datastore.indexAdmin) otorga estos permisos.

Crear transmisión de cambios

Antes de abrir un cursor de transmisión de cambios correspondiente, debes crear una transmisión de cambios. No se admite la habilitación automática de la transmisión de cambios en la creación de la colección o la base de datos.

Para crear una transmisión de cambios, usa la consola de Google Cloud.

  1. En la consola de Google Cloud, ve a la página Bases de datos.

    Ir a Bases de datos

  2. En la lista, selecciona una base de datos de Firestore con compatibilidad con MongoDB. Se abrirá el panel Firestore Studio.
  3. En el panel Explorador , busca el nodo Flujos de cambios , haz clic en Más acciones y, luego, selecciona Crear flujo de cambios.
  4. Ingresa un nombre, un alcance y un período de retención únicos para la transmisión de cambios y, luego, haz clic en Guardar.

Ver transmisiones de cambios

Puedes ver los detalles sobre las transmisiones de cambios en la consola de Google Cloud.

  1. En la consola de Google Cloud, ve a la página Bases de datos.

    Ir a Bases de datos

  2. En la lista, selecciona una base de datos de Firestore con compatibilidad con MongoDB. Se abrirá el panel Firestore Studio.
  3. En el panel Explorador, busca el nodo Flujos de cambios.
  4. Para abrir o cerrar el nodo, haz clic en Alternar nodo.

Borrar flujo de cambios

Para borrar una transmisión de cambios, usa la consola de Google Cloud.

  1. En la consola de Google Cloud, ve a la página Bases de datos.

    Ir a Bases de datos

  2. En la lista, selecciona una base de datos de Firestore con compatibilidad con MongoDB. Se abrirá el panel Firestore Studio.
  3. En el panel Explorador, busca el nodo Flujos de cambios.
  4. Para abrir o cerrar el nodo, haz clic en Alternar nodo.
  5. En el Explorador, busca la transmisión de cambios que deseas borrar.
  6. Haz clic en Más acciones y luego selecciona Borrar transmisión de cambios.
  7. En el diálogo, ingresa el nombre de la transmisión de cambios para confirmar la eliminación y, luego, haz clic en Borrar.

Abre o reanuda un cursor de transmisión de cambios

En los siguientes ejemplos, se muestra cómo crear, reanudar y configurar un cursor de transmisión de cambios.

Antes de crear un cursor de transmisión de cambios, debes crear explícitamente una transmisión de cambios para la base de datos o la colección.

Crea un cursor de transmisión de cambios

Para crear un cursor de transmisión de cambios nuevo, usa el método watch en los controladores de MongoDB. Para escuchar todos los cambios en una base de datos, crea una transmisión de cambios con alcance de base de datos y llama al método watch en el objeto db.

let cursor = db.watch()

Para crear un cursor con alcance en una colección, primero debes crear una transmisión de cambios para esa colección. Luego, llama al método watch en la colección correspondiente.

let cursor = db.my_collection.watch()

Ahora que creaste un cursor de transmisión de cambios, puedes comenzar a transmitir. Por ejemplo, si insertas un documento y llamas a tryNext en el cursor, verás que el cambio aparece en la transmisión de cambios.

let doc = db.my_collection.insertOne({value: "hello world"})
console.log(cursor.tryNext())

Si actualizas y borras el documento, verás esos cambios en la transmisión de cambios:

db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}})
db.my_collection.deleteOne({"_id": doc.insertedId}})

// Prints the update event
console.log(cursor.tryNext())

// Prints the delete event
console.log(cursor.tryNext())

Reanuda una transmisión de cambios

Para reanudar una transmisión de cambios, usa las opciones resumeAfter o startAfter. Para determinar dónde reanudar en el registro de cambios desde resumeAfter y startAfter, usa un token de reanudación.

// Create a cursor and add one event to the change stream.
let cursor = db.my_collection.watch();
db.my_collection.insertOne({value: "hello world"});
let event = cursor.tryNext();

// Get the resume token from the event.
let resumeToken = event._id;

// Add a new event to the change stream.
db.my_collection.insertOne({value: "foobar"});

// Create a new cursor by using the resume token as a starting point.
let newCursor = db.my_collection.watch({resumeAfter: resumeToken})

// Log the change event containing the "foobar" value.
console.log(newCursor.tryNext())

Para usar startAfter, haz lo siguiente:

// Start after the resume token.
let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})

Incluye imágenes previas y posteriores en las actualizaciones y las eliminaciones

Si es necesario, puedes incluir imágenes previas y posteriores de documentos en los eventos de cambio de actualización y eliminación. La disponibilidad de imágenes está sujeta a la ventana de recuperación de un momento determinado (PITR), y para leer imágenes de documentos de más de una hora, debes habilitar la PITR.

Las transmisiones de cambios aprovechan la ventana de PITR para proporcionar una vista del documento antes y después del evento de cambio determinado. De forma predeterminada, los eventos de actualización contienen un campo updateDescription, que es el delta de los campos modificados por la operación de actualización.

Para incluir las imágenes previas y posteriores en un evento de cambio, debes especificar las opciones fullDocumentBeforeChange y fullDocument en la consulta de transmisión de cambios.

let cursor = db.my_collection.watch({
  "fullDocument": "required",
  "fullDocumentBeforeChange": "required"
})

Si la consulta intenta leer un documento fuera de la ventana de retención de PITR o si PITR no está habilitado, el valor required muestra un mensaje de error del servidor.

Como alternativa a mostrar un error, puedes usar el valor whenAvailable para mostrar un valor null si las imágenes ya no están disponibles.

let cursor = db.my_collection.watch({
  "fullDocument": "whenAvailable",
  "fullDocumentBeforeChange": "whenAvailable"
})

Incluye la imagen actual en las actualizaciones

De forma predeterminada, los eventos de actualización contienen un campo updateDescription, que es el delta de los campos modificados por la operación de actualización. Para buscar la versión más actual de todo el documento, usa el valor updateLookup en la opción fullDocument.

Esta función no requiere PITR y realiza una búsqueda del documento.

let cursor = db.my_collection.watch({
  "fullDocument": "updateLookup",
})

Lecturas paralelas

Para aumentar la capacidad de procesamiento, puedes usar la opción firestoreWorkerConfig para dividir una consulta de flujos de cambios en varios trabajadores. Cada trabajador es responsable de entregar los cambios para un conjunto distinto de documentos. Debes crear un cursor paralelo a través de una consulta runCommand o aggregate.

Por ejemplo, puedes distribuir una transmisión de cambios en 3 trabajadores de la siguiente manera:

let cursor1 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }}
  }]);

let cursor2 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }}
  }]);

let cursor3 = db.my_collection.aggregate([{
    "$changeStream": {
        "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }}
  }]);

Transmisiones de cambios y copias de seguridad

Ni la configuración de la transmisión de cambios ni los datos de la transmisión de cambios están disponibles en las operaciones de restablecimiento de la copia de seguridad. Si restableces una base de datos con transmisiones de cambios, debes volver a crear esas transmisiones de cambios en la base de datos de destino para abrir cursores en esa base de datos.

Facturación

Diferencias de comportamiento

En la siguiente sección, se describen las diferencias en las transmisiones de cambios entre Firestore con compatibilidad con MongoDB y MongoDB.

updateDescription

updateDescription es un documento en un evento update que describe los campos que se actualizaron o quitaron con la operación de actualización. En Cloud Firestore, las diferencias notables son las siguientes:

  • En updateDescription, los campos truncatedArrays y disambiguatedPaths no se propagan.
  • updateDescription.updatedFields representa una diferencia canónica entre las imágenes previas y posteriores de un documento antes y después de que se aplica una mutación.

Considera el siguiente estado inicial de un documento:

db.my_collection.insertOne({
  _id: 1,
  root: {
    array: [{a: 1}, {b: 2}, {c: 3}]
  }
})

Situación 1: Mutar solo el primer elemento del array

En esta situación, el comportamiento de Cloud Firestore coincide con MongoDB.

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array.0.a": 100}}
)

{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

Situación 2: Reemplazar con un array completo

En esta situación, la operación actualiza solo el primer campo del array, pero reemplaza todo el array.

La diferencia de actualización de Cloud Firestore no distingue entre estas dos situaciones y muestra el mismo updateDescription.updatedFields para ambas:

db.my_collection.updateOne(
  {_id: 1},
  {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}}
)

// In other implementations, updatedFields reflects the mutation itself
{
  updatedFields: {
    "root.array": [{a: 100}, {b: 2}, {c: 3}]
  },
  removedFields: []
}

// Firestore updatedFields is the diff between the before and after versions of the document
{
  updatedFields: {"root.array.0.a": 100},
  removedFields: []
}

¿Qué sigue?