Module bytewax.recovery
Bytewax's state recovery machinery.
Bytewax allows you to recover a stateful dataflow; it will let you resume processing and output after a failure without re-processing all initial data to re-calculate all internal state.
It does this by storing state and progress information for a single dataflow instance in a recovery store backed by a durable state storage system of your choosing, e.g. SQLite or Kafka. See the subclasses of `RecoveryConfig` in this module for the supported datastores, the specifics of how each is utilized, and the tradeoffs.
Preparation
1. Create a recovery config describing how to connect to the recovery store of your choosing.
2. Pass that recovery config as the `recovery_config` argument to the entry point running your dataflow (e.g. `bytewax.cluster_main()`).
3. Run your dataflow! It will start backing up recovery data automatically. (See the sketch below for a minimal end-to-end example.)
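To make these steps concrete, here is a minimal sketch that wires a `SqliteRecoveryConfig` into `run_main()`, patterned after the doctests in the class docs below. The import paths, the `./recovery` directory, and the trivial input and output builders are illustrative assumptions rather than requirements of this module; adjust them to your Bytewax version and deployment.

    # Illustrative sketch only: import paths and builders may differ by Bytewax version.
    from bytewax import AdvanceTo, Dataflow, Emit, run_main
    from bytewax.inputs import ManualConfig
    from bytewax.recovery import SqliteRecoveryConfig

    flow = Dataflow()
    flow.capture()

    def input_builder(worker_index, worker_count, resume_epoch):
        # Trivial input mirroring the doctests below; Bytewax supplies resume_epoch.
        for epoch, item in enumerate(range(resume_epoch, 3)):
            yield AdvanceTo(epoch)
            yield Emit(item)

    def output_builder(worker_index, worker_count):
        return print

    # Step 1: describe how to connect to the recovery store ("./recovery" must already exist).
    recovery_config = SqliteRecoveryConfig("./recovery")

    # Steps 2 and 3: pass the config to the entry point and run; backups happen automatically.
    run_main(
        flow,
        ManualConfig(input_builder),
        output_builder,
        recovery_config=recovery_config,
    )

The same `recovery_config` argument works with `bytewax.cluster_main()` when running a multi-process cluster.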
Caveats
Recovery data for multiple dataflows must not be mixed together. See the docs for each `RecoveryConfig` subclass for what this means depending on the recovery store. E.g. when using a `KafkaRecoveryConfig`, each dataflow must have a distinct topic prefix.
The epoch is the unit of recovery: dataflows will only resume on epoch boundaries.
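Because resumption happens only on epoch boundaries, the `resume_epoch` argument given to your input builder tells you the first epoch that will actually be processed after a restart. The sketch below is a hypothetical illustration (the item list and the skipping logic are illustrative, not part of the API) of an input builder that advances epochs explicitly and avoids re-emitting input from epochs that already completed:

    from bytewax import AdvanceTo, Emit  # Import path is an assumption; adjust per version.

    def input_builder(worker_index, worker_count, resume_epoch):
        items = ["a", "b", "c", "d"]  # Illustrative stand-in for a replayable source.
        for epoch, item in enumerate(items):
            if epoch < resume_epoch:
                # These epochs completed before the failure; do not re-emit them.
                continue
            yield AdvanceTo(epoch)  # Epoch boundary: recovery snapshots align here.
            yield Emit(item)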
See comments on input configuration types in `bytewax.inputs` for any limitations each might have regarding recovery.
It is possible that your output systems will see duplicate data during the resume epoch; design your systems to support at-least-once processing.
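One hypothetical way to cope with those replays is to make the sink idempotent. In the sketch below, the table name, the key scheme, and the assumption that each captured item is a `(key, value)` pair are all illustrative rather than part of Bytewax; the point is that a keyed upsert makes a re-delivered item overwrite the row it already wrote instead of duplicating it.

    import sqlite3

    def output_builder(worker_index, worker_count):
        # Hypothetical idempotent sink; assumes each captured item is a (key, value) pair.
        conn = sqlite3.connect(f"output_worker{worker_index}.sqlite3")
        conn.execute("CREATE TABLE IF NOT EXISTS results (key TEXT PRIMARY KEY, value TEXT)")

        def write(item):
            key, value = item
            # INSERT OR REPLACE: replays from the resume epoch overwrite, not duplicate.
            conn.execute(
                "INSERT OR REPLACE INTO results (key, value) VALUES (?, ?)",
                (str(key), str(value)),
            )
            conn.commit()

        return write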
Currently it is not possible to recover a dataflow with a different number of workers than when it failed.
On Failure
If the dataflow fails, first you must fix whatever underlying fault caused the issue. That might mean deploying new code which fixes a bug or resolving an issue with a connected system.
Once that is done, re-run the dataflow using the same recovery config and thus re-connect to the same recovery store. Bytewax will automatically read the progress of the previous dataflow execution and determine the most recent epoch that processing can resume at, called the resume epoch. Output should resume from the beginning of the resume epoch.
If you want to fully restart a dataflow and ignore previous state, delete the data in the recovery store using whatever operational tools you have for that storage type.
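As a concrete (hypothetical) example of such a reset when using the SQLite store: each worker's DB lives in the configured directory with a name like `worker0.sqlite3`, so wiping recovery state amounts to deleting those files. The `./recovery` directory below is a placeholder.

    from pathlib import Path

    # Hypothetical reset for SqliteRecoveryConfig("./recovery"): delete every per-worker
    # recovery DB (plus any -wal/-shm sidecar files SQLite may leave) so the next run
    # starts from scratch instead of resuming.
    recovery_dir = Path("./recovery")
    for db_file in recovery_dir.glob("worker*.sqlite3*"):
        db_file.unlink()

For the Kafka store, the equivalent is deleting the progress and state topics (named with your topic prefix) using your usual Kafka tooling.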
Classes
class KafkaRecoveryConfig (hosts, topic_prefix, create)
Use Kafka to store recovery data.
Uses a "progress" topic and a "state" topic with a number of partitions equal to the number of workers. Will take advantage of log compaction so that topic size is proportional to state size, not epoch count.
Use a distinct topic prefix per dataflow so recovery data is not mixed.
>>> flow = Dataflow()
>>> flow.capture()
>>> def input_builder(worker_index, worker_count, resume_epoch):
...     for epoch, item in enumerate(range(resume_epoch, 3)):
...         yield AdvanceTo(epoch)
...         yield Emit(item)
>>> def output_builder(worker_index, worker_count):
...     return print
>>> recovery_config = KafkaRecoveryConfig(
...     ["localhost:9092"],
...     "sample-dataflow",
... )
>>> run_main(
...     flow,
...     ManualConfig(input_builder),
...     output_builder,
...     recovery_config=recovery_config,
... )  # doctest: +ELLIPSIS
(...)
If there's no previous recovery data, topics will automatically be created with the correct number of partitions and log compaction enabled.
Args

hosts
    List of `host:port` strings of Kafka brokers.
topic_prefix
    Prefix used for naming topics. Must be distinct per-dataflow. Two topics will be created using this prefix: `"topic_prefix-progress"` and `"topic_prefix-state"`.

Returns

Config object. Pass this as the `recovery_config` argument to your execution entry point.

Ancestors

RecoveryConfig
Instance variables

var hosts
    List of `host:port` strings of Kafka brokers.
var topic_prefix
    Prefix used for naming the recovery topics.
class RecoveryConfig

Base class for a recovery config.

This describes how each worker in a dataflow cluster should store its recovery data.

Use a specific subclass of this that matches the kind of storage system you are going to use. See the subclasses in this module.

Subclasses

KafkaRecoveryConfig
SqliteRecoveryConfig
class SqliteRecoveryConfig (db_dir)
Use SQLite to store recovery data.
Creates a SQLite DB per-worker in a given directory. Multiple DBs are used to allow workers to write without contention.
Use a distinct directory per dataflow so recovery data is not mixed.
>>> flow = Dataflow()
>>> flow.capture()
>>> def input_builder(worker_index, worker_count, resume_epoch):
...     for epoch, item in enumerate(range(resume_epoch, 3)):
...         yield AdvanceTo(epoch)
...         yield Emit(item)
>>> def output_builder(worker_index, worker_count):
...     return print
>>> tmp_dir = TemporaryDirectory()  # We'll store this somewhere temporary for this test.
>>> recovery_config = SqliteRecoveryConfig(tmp_dir)
>>> run_main(
...     flow,
...     ManualConfig(input_builder),
...     output_builder,
...     recovery_config=recovery_config,
... )  # doctest: +ELLIPSIS
(...)
DB files and tables will automatically be created if there's no previous recovery data.
Args

db_dir
    Existing directory to store per-worker DBs in. Must be distinct per-dataflow. DB files will have names like `"worker0.sqlite3"`. You can use `"."` for the current directory.

Returns

Config object. Pass this as the `recovery_config` argument to your execution entry point.

Ancestors

RecoveryConfig
Instance variables

var db_dir
    Directory where the per-worker recovery DBs are stored.