Replication primitives (beta)
With the replication primitives plugin, you can build a realtime replication based on any transport layer like REST, WebRTC or websockets.
Trade offs
This plugin is made to do a many-to-one replication like you would do when you replicate many clients with one backend server. It is not possible to replicate things in a star schema like it can be done with the couchdb replication.
This plugin is made for fast and reliable replication, it has less overhead then the couchdb replication for example.
It is assumes that the remote instance is the single source of truth that also resolves conflicts.
The replication of attachments or local documents is not supported at the moment.
Data Layout
To use the replication primitives you first have to ensure that your data is sortable by update time.
For example if your documents look like this:
{
"id": "foobar",
"name": "Alice",
"lastName": "Wilson",
"updatedAt": 1564483474
}
Then your data is always sortable by updatedAt
. This ensures that when RxDB fetches 'new' changes, it can send the latest updatedAt
to the remote endpoint and then recieve all newer documents.
The replication cycle
The replication works in cycles. A cycle is triggered when:
- Automatically on local writes
- When
liveInterval
is reached from the last cycle run. - The
run()
method is called manually.
A cycle performs these steps in the same order:
- Get a batch of local unreplicated changes and call the
push handler
with it to send it to the remote. - Repeat step
1
until there are no more local unreplicated changes. - Get the
latestPullDocument
from the local database. - Call the
pull handler
withlatestPullDocument
to fetch a batch from remote unreplicated changes. - Update
latestPullDocument
with the newest latest document from the remote. - Repeat step
3+4+5
until the pull handler returnshasMoreDocuments: false
.
replicateRxCollection()
You can start the replication of a single RxCollection
by calling replicateRxCollection()
like in the following:
const replicationState = await replicateRxCollection({
collection: myRxCollection,
replicationIdentifier: 'my-custom-rest-replication',
/**
* By default it will do a one-time replication.
* By settings live: true the replication will continuously
* replicate all changes.
* (optional), default is false.
*/
live: true,
/**
* Interval in milliseconds on when to run the next replication cycle.
* Set this to 0 when you have a back-channel from your remote
* that that tells the client when to fetch remote changes.
* (optional), only needed when live=true, default is 10 seconds.
*/
liveInterval: 10000,
/**
* Time in milliseconds after when a failed replication cycle
* has to be retried.
* (optional), default is 5 seconds.
*/
retryTime: number,
/**
* Optional,
* only needed when you want to replicate remote changes to the local state.
*/
pull: {
/**
* Pull handler
*/
async handler(latestPullDocument) {
const limitPerPull = 10;
const minTimestamp = latestPullDocument ? latestPullDocument.updatedAt : 0;
/**
* In this example we replicate with a remote REST server
*/
const documentsFromRemote = fetch(
`https://example.com/api/sync/?minUpdatedAt=${minTimestamp}&limit=${limitPerPull}`
).json();
return {
/**
* Contains the pulled documents from the remote.
*/
documents: docsData,
/**
* Must be true if there might be more newer changes on the remote.
*/
hasMoreDocuments: documentsFromRemote.length !== limitPerPull
};
}
},
/**
* Optional,
* only needed when you want to replicate local changes to the remote instance.
*/
push: {
/**
* Push handler
*/
async handler(docs) {
/**
* Push the local documents to a remote REST server.
*/
await postData('https://example.com/api/sync/push', { docs });
},
/**
* Batch size, optional
* Defines how many documents will be given to the push handler at once.
*/
batchSize: 5
}
});
Back channel
The replication has to somehow know when a change happens in the remote instance and when to fetch new documents from the remote.
For the pull-replication, RxDB will run the pull-function every time liveInterval
is reached.
This means that when a change happens on the server, RxDB will, in the worst case, take liveInterval
milliseconds until the changes is replicated to the client.
To improve this, it is recommended to setup a back channel where the remote instance can tell the local database when something has changed and a replication cycle must be run.
For REST for example you might want to use a WebSocket.
const exampleSocket = new WebSocket('wss://example.com/socketserver', ['protocolOne', 'protocolTwo']);
exampleSocket.onmessage = () => {
/**
* Trigger a replication cycle
* when the websocket recieves a message.
*/
replicationState.run();
}
RxReplicationState
The function replicateRxCollection()
returns a RxReplicationState
that can be used to manage and observe the replication.
Observable
To observe the replication, the RxReplicationState
has some Observable
properties:
// emits each document that was recieved from the remote
myRxReplicationState.received$.subscribe(doc => console.dir(doc));
// emits each document that was send to the remote
myRxReplicationState.send$.subscribe(doc => console.dir(doc));
// emits all errors that happen when running the push- & pull-handlers.
myRxReplicationState.error$.subscribe(error => console.dir(error));
// emits true when the replication was canceled, false when not.
myRxReplicationState.canceled$.subscribe(bool => console.dir(bool));
// emits true when a replication cycle is running, false when not.
myRxReplicationState.active$.subscribe(bool => console.dir(bool));
awaitInitialReplication()
With awaitInitialReplication()
you can await the initial replication that is done when a full replication cycle was finished for the first time.
await myRxReplicationState.awaitInitialReplication();
cancel()
Cancels the replication.
myRxReplicationState.cancel()
run()
Runs a new replication cycle. The replication plugin will always make sure that at any point in time, only one cycle is running.
await myRxReplicationState.run();
If you are new to RxDB, you should continue here