Функция Change Streams для Firestore с поддержкой MongoDB позволяет приложениям получать доступ к изменениям в реальном времени (вставкам, обновлениям и удалениям), вносимым в коллекцию или во всю базу данных. В потоке изменений обновления упорядочиваются по времени модификации.
Доступ к Change Streams осуществляется через API, совместимые с MongoDB, и традиционные драйверы MongoDB. Реализация Change Streams в Firestore с поддержкой MongoDB может обрабатывать любую пропускную способность операций записи и чтения благодаря уникальной реализации автоматического разделения данных на параллельные операции записи и чтения. Это позволяет создавать высокопроизводительные рабочие нагрузки. Также можно улучшить инфраструктуру миграции и синхронизации данных между Cloud Firestore и другими решениями для хранения данных.
Помимо совместимости с драйверами MongoDB, вы можете использовать Cloud Firestore для параллельного чтения потоков изменений. Это позволяет создавать параллельные высокопроизводительные задачи чтения. Каждый поток представляет собой хорошо распределенную часть результатов.
Change Streams поддерживают следующие функции:
- Настраиваемые потоки изменений с областью действия в базе данных или коллекции.
- Срок хранения потока изменений, указанный при создании. Срок хранения по умолчанию составляет 7 дней, минимальный срок — 1 день. Срок хранения должен быть кратным 1 дню, максимальный — 7 дней. Срок хранения нельзя изменить после создания. Для изменения срока хранения необходимо удалить и создать поток изменений заново.
- События
delete,insert,updateиdrop, которые можно отслеживать с помощьюdb.collection.watch()иdb.watch(). -
updateDescription.updatedFieldsсодержит информацию о различиях в обновлениях. - Все параметры
fullDocumentиfullDocumentBeforeChange.- Просматриваю полный документ на предмет обновлений.
- Исходное изображение документа до его замены, обновления или удаления.
- Фотография документа после его замены или обновления.
- Для изображений, полученных до и после процедуры и устаревших более чем на час, необходимо включить функцию восстановления на определенный момент времени (PITR).
- Все параметры резюме, включая
resumeAfterиstartAfter. - При использовании
watch()для отслеживания изменений можно объединять этапы агрегации, такие как$addFields,$match,$project,$replaceRoot,$replaceWith,$setи$unset.
Настройка потоков изменений
Для создания, удаления или просмотра существующих потоков изменений для базы данных используйте консоль Google Cloud.
Роли и права доступа
Для создания, удаления и отображения потоков изменений субъекту требуются разрешения управления идентификацией и доступом (IAM) datastore.schemas.create , datastore.schemas.delete и datastore.schemas.list соответственно.
Например, роль администратора индекса хранилища данных ( roles/datastore.indexAdmin ) предоставляет эти разрешения.
Создать поток изменений
Прежде чем открыть соответствующий курсор потока изменений, необходимо создать поток изменений. Автоматическое включение потока изменений при создании коллекции или базы данных не поддерживается.
Для создания потока изменений используйте консоль Google Cloud.
В консоли Google Cloud перейдите на страницу «Базы данных» .
- Из списка выберите базу данных Firestore, совместимую с MongoDB. Откроется панель Firestore Studio .
- В панели «Проводник» найдите узел «Изменение потоков» , щелкните «Дополнительные действия» , а затем выберите «Создать поток изменений» .
- Введите уникальное имя потока изменений, область действия и период хранения, а затем нажмите «Сохранить».
Просмотреть потоки изменений
Подробную информацию о потоках изменений можно посмотреть в консоли Google Cloud.
В консоли Google Cloud перейдите на страницу «Базы данных» .
- Из списка выберите базу данных Firestore, совместимую с MongoDB. Откроется панель Firestore Studio .
- В панели Проводника найдите узел «Изменить потоки» .
- Чтобы открыть или закрыть узел, нажмите на Переключить узел» .
Удалить поток изменений
Для удаления потока изменений используйте консоль Google Cloud.
В консоли Google Cloud перейдите на страницу «Базы данных» .
- Из списка выберите базу данных Firestore, совместимую с MongoDB. Откроется панель Firestore Studio .
- В панели Проводника найдите узел «Изменить потоки» .
- Чтобы открыть или закрыть узел, нажмите на Переключить узел» .
- В обозревателе найдите поток изменений, который хотите удалить.
- Нажмите « Дополнительные действия» , а затем выберите «Удалить поток изменений» .
- В диалоговом окне введите имя потока изменений для подтверждения удаления, а затем нажмите кнопку «Удалить» .
Открыть или возобновить курсор потока изменений
Следующие примеры демонстрируют, как создать, возобновить и настроить курсор потока изменений.
Перед созданием курсора потока изменений необходимо явно создать поток изменений для базы данных или коллекции.
Создайте курсор потока изменений.
Для создания нового курсора потока изменений используйте метод watch в драйверах MongoDB. Чтобы отслеживать все изменения в базе данных, создайте поток изменений с областью видимости базы данных и вызовите метод watch для объекта db .
let cursor = db.watch()
Чтобы создать курсор, ограниченный областью действия коллекции, необходимо сначала создать поток изменений для этой коллекции. Затем вызовите метод watch для соответствующей коллекции.
let cursor = db.my_collection.watch()
Теперь, когда вы создали курсор потока изменений, вы можете начать потоковую передачу. Например, если вы вставите документ и вызовете tryNext для курсора, вы увидите, как изменение отобразится в потоке изменений.
let doc = db.my_collection.insertOne({value: "hello world"}) console.log(cursor.tryNext())
Если вы обновите и удалите документ, то увидите, что эти изменения отобразятся в ленте изменений:
db.my_collection.updateOne({"_id": doc.insertedId}, {$set: {value: "hello world!"}}) db.my_collection.deleteOne({"_id": doc.insertedId}}) // Prints the update event console.log(cursor.tryNext()) // Prints the delete event console.log(cursor.tryNext())
Возобновить поток изменений
Для возобновления потока изменений используйте параметры resumeAfter или startAfter . Чтобы определить, с какого места в журнале изменений следует возобновить выполнение с помощью параметров resumeAfter и startAfter , используйте токен возобновления.
// Create a cursor and add one event to the change stream. let cursor = db.my_collection.watch(); db.my_collection.insertOne({value: "hello world"}); let event = cursor.tryNext(); // Get the resume token from the event. let resumeToken = event._id; // Add a new event to the change stream. db.my_collection.insertOne({value: "foobar"}); // Create a new cursor by using the resume token as a starting point. let newCursor = db.my_collection.watch({resumeAfter: resumeToken}) // Log the change event containing the "foobar" value. console.log(newCursor.tryNext())
Для использования startAfter :
// Start after the resume token. let startAfterCursor = db.my_collection.watch({startAfter: resumeToken})
Включайте изображения до и после в обновления и удаляйте.
При необходимости вы можете включать изображения документов до и после обновления и удаления в события изменения. Доступность изображений зависит от временного окна восстановления (PITR) , и для чтения изображений документов старше одного часа необходимо включить PITR.
Потоки изменений используют окно PITR для отображения документа до и после заданного события изменения. По умолчанию события обновления содержат поле updateDescription , которое представляет собой разницу между полями, измененными в результате операции обновления.
Чтобы включить изображения до и после изменения в событие изменения, необходимо указать параметры fullDocumentBeforeChange и fullDocument в запросе потока изменений.
let cursor = db.my_collection.watch({ "fullDocument": "required", "fullDocumentBeforeChange": "required" })
Если запрос пытается прочитать документ вне окна хранения PITR или если PITR не включен, то при попытке доступа к required значению на стороне сервера возникает ошибка.
В качестве альтернативы генерации ошибки вы можете использовать значение whenAvailable для возврата значения null , если изображения больше недоступны.
let cursor = db.my_collection.watch({ "fullDocument": "whenAvailable", "fullDocumentBeforeChange": "whenAvailable" })
Включайте текущее изображение в обновления.
По умолчанию события обновления содержат поле updateDescription , которое представляет собой разницу между полями, измененными в результате операции обновления. Чтобы вместо этого найти самую актуальную версию всего документа, используйте значение updateLookup в параметре fullDocument .
Эта функция не требует PITR и выполняет поиск документа.
let cursor = db.my_collection.watch({ "fullDocument": "updateLookup", })
Параллельное чтение
Для повышения пропускной способности можно использовать параметр firestoreWorkerConfig , чтобы разделить запрос потока изменений между несколькими рабочими процессами. Каждый рабочий процесс отвечает за обработку изменений для отдельного набора документов. Необходимо создать параллельный курсор с помощью runCommand или aggregate запроса.
Например, вы можете распределить поток изменений между тремя рабочими процессами следующим образом:
let cursor1 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 0 }} }]); let cursor2 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 1 }} }]); let cursor3 = db.my_collection.aggregate([{ "$changeStream": { "firestoreWorkerConfig": {numWorkers: 3, workerId: 2 }} }]);
Изменение потоков и резервных копий
Ни конфигурация потока изменений, ни данные потока изменений недоступны при восстановлении резервных копий. При восстановлении базы данных с потоками изменений необходимо заново создать эти потоки изменений в целевой базе данных, чтобы открыть курсоры для этой базы данных.
Выставление счетов
- Смена потоков влечет за собой затраты на чтение и хранение данных. См. цены на смену потоков .
- Чтобы включить в анализ изображения, созданные до и после запроса на чтение, если им более 1 часа на момент запроса , необходимо включить функцию PITR, что влечет за собой соответствующие расходы .
Различия в поведении
В следующем разделе описываются различия в потоках изменений между Firestore с совместимостью с MongoDB и MongoDB.
updateDescription
updateDescription — это документ в событии update , описывающий поля, которые были обновлены или удалены в результате операции обновления. В Cloud Firestore существенные отличия заключаются в следующем:
- В
updateDescriptionполяtruncatedArraysиdisambiguatedPathsне заполнены. -
updateDescription.updatedFieldsпредставляют собой каноническую разницу между исходным и исходным изображениями документа до и после применения изменения.
Рассмотрим следующее исходное состояние документа:
db.my_collection.insertOne({ _id: 1, root: { array: [{a: 1}, {b: 2}, {c: 3}] } })
Сценарий 1: изменить только первый элемент массива.
В этом сценарии поведение Cloud Firestore соответствует поведению MongoDB.
db.my_collection.updateOne( {_id: 1}, {'$set': {"root.array.0.a": 100}} ) { updatedFields: {"root.array.0.a": 100}, removedFields: [] }
Сценарий 2: перезапись целым массивом.
В этом сценарии операция обновляет только первое поле массива, но перезаписывает весь массив.
В сравнении обновлений Cloud Firestore эти два сценария не различаются, и для обоих возвращается одно и то же значение updateDescription.updatedFields :
db.my_collection.updateOne( {_id: 1}, {'$set': {"root.array": [{a: 100}, {b: 2}, {c: 3}]}} ) // In other implementations, updatedFields reflects the mutation itself { updatedFields: { "root.array": [{a: 100}, {b: 2}, {c: 3}] }, removedFields: [] } // Firestore updatedFields is the diff between the before and after versions of the document { updatedFields: {"root.array.0.a": 100}, removedFields: [] }