Using Change Streams
MongoSwift 0.2.0 added support for change streams, which allow applications to access real-time data changes. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.
Note: Change streams only work with MongoDB replica sets and sharded clusters.
Examples
Open a Change Stream on a MongoCollection<Document>
(MongoDB 3.6+)
let client = try MongoClient()
let inventory = client.db("example").collection("inventory")
let stream = try inventory.watch() // returns a `ChangeStream<ChangeStreamEvent<Document>>`
// perform some operations using `inventory`...
for change in stream {
// process `ChangeStreamEvent<Document>` here
}
// check if any errors occurred while iterating
if let error = stream.error {
// handle error
}
Open a Change Stream on a MongoCollection<MyCodableType>
(MongoDB 3.6+)
let client = try MongoClient()
let inventory = client.db("example").collection("inventory", withType: MyCodableType.self)
let stream = try inventory.watch() // returns a `ChangeStream<ChangeStreamEvent<MyCodableType>>`
// perform some operations using `inventory`...
for change in stream {
// process `ChangeStreamEvent<MyCodableType>` here
}
// check if any errors occurred while iterating
if let error = stream.error {
// handle error
}
Use a Custom Codable
Type for the fullDocument
Property of Returned ChangeStreamEvent
s
let client = try MongoClient()
let inventory = client.db("example").collection("inventory")
let stream = try inventory.watch(withFullDocumentType: MyCodableType.self) // returns a `ChangeStream<ChangeStreamEvent<MyCodableType>>`
// perform some operations using `inventory`...
for change in stream {
// process `ChangeStreamEvent<MyCodableType>` here
}
// check if any errors occurred while iterating
if let error = stream.error {
// handle error
}
Use a Custom Codable
Type for the Return type of ChangeStream.next()
let client = try MongoClient()
let inventory = client.db("example").collection("inventory")
let stream = try inventory.watch(withEventType: MyCodableType.self) // returns a `ChangeStream<MyCodableType>`
// perform some operations using `inventory`...
for change in stream {
// process `MyCodableType` here
}
// check if any errors occurred while iterating
if let error = stream.error {
// handle error
}
Open a Change Stream on a MongoDatabase
(MongoDB 4.0+)
let client = try MongoClient()
let db = client.db("example")
let stream = try db.watch()
// perform some operations using `db`...
for change in stream {
// process `ChangeStreamEvent<Document>` here
}
// check if any errors occurred while iterating
if let error = stream.error {
// handle error
}
Note: the types of the fullDocument
property, as well as the return type of ChangeStream.next()
, may be customized in the same fashion as the examples using MongoCollection
above.
Open a Change Stream on a MongoClient
(MongoDB 4.0+)
let client = try MongoClient()
let stream = try client.watch()
// perform some operations using `client`...
for change in stream {
// process `ChangeStreamEvent<Document>` here
}
// check if any errors occurred while iterating
if let error = stream.error {
// handle error
}
Note: the types of the fullDocument
property, as well as the return type of ChangeStream.next()
, may be customized in the same fashion as the examples using MongoCollection
above.
Resume a Change Stream
let client = try MongoClient()
let inventory = client.db("example").collection("inventory")
let stream = try inventory.watch()
// perform some operations using `inventory`...
// read the first change event
let next = try stream.nextOrError()
// create a new change stream that starts after the first change event
let resumeToken = stream.resumeToken
let resumedStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
for change in resumedStream {
// process `ChangeStreamEvent<Document>` here
}
// check if any errors occurred while iterating
if let error = resumedStream.error {
// handle error
}
Modify Change Stream Output
let client = try MongoClient()
let inventory = client.db("example").collection("inventory")
// Only include events where the changed document's username = "alice"
let pipeline: [Document] = [
["$match": ["fullDocument.username": "alice"] as Document]
]
let stream = try inventory.watch(pipeline)
for change in stream {
// process `ChangeStreamEvent<Document>` here
}
// check if any errors occurred while iterating
if let error = stream.error {
// handle error
}