Module bytewax.execution
How to execute your dataflows.
Run an instantiated Dataflow using one of the entry point functions in this module.
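For instance, `run_main`, the simplest entry point, runs a whole dataflow in the current process. A minimal sketch (the input and output configs mirror the `spawn_cluster` doctest below):

from bytewax.dataflow import Dataflow
from bytewax.execution import run_main
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(3)))
flow.capture(StdOutputConfig())

# Blocks until the flow is complete; items are printed by StdOutputConfig.
run_main(flow)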
Epoch Configs
Epochs define the granularity of recovery in a bytewax dataflow. By default, we snapshot state for recovery every 10 seconds. You should only need to set this if you are testing the recovery system or are doing deep exactly-once integration work. Changing this does not change the semantics of any of the operators.
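For example, here is a sketch of overriding the default snapshot period, assuming `PeriodicEpochConfig`'s only parameter is an `epoch_length` timedelta:

from datetime import timedelta
from bytewax.execution import PeriodicEpochConfig, run_main

# Snapshot every 2 seconds instead of the 10-second default.
# Assumes PeriodicEpochConfig takes an `epoch_length` timedelta.
epoch_config = PeriodicEpochConfig(epoch_length=timedelta(seconds=2))
# `flow` built as in the run_main sketch above.
run_main(flow, epoch_config=epoch_config)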
Source code
"""How to execute your dataflows.
Run an instantiated `bytewax.dataflow.Dataflow` using one of the entry
point functions in this module.
Epoch Configs
-------------
Epochs define the granularity of recovery in a bytewax dataflow. By default, we
snapshot state for recovery every 10 seconds. You should only need to set this if you are
testing the recovery system or are doing deep exactly-once integration work. Changing
this does not change the semantics of any of the operators.
"""
from typing import Any, Iterable, List, Optional, Tuple
from multiprocess import get_context
from bytewax.dataflow import Dataflow
from bytewax.recovery import RecoveryConfig
from .bytewax import ( # noqa: F401
cluster_main,
EpochConfig,
PeriodicEpochConfig,
run_main,
TestingEpochConfig,
)
def _gen_addresses(proc_count: int) -> Iterable[str]:
    # One local address per process, with ports starting at 2101.
    return [f"localhost:{proc_id + 2101}" for proc_id in range(proc_count)]
def spawn_cluster(
flow: Dataflow,
*,
epoch_config: Optional[EpochConfig] = None,
recovery_config: Optional[RecoveryConfig] = None,
proc_count: int = 1,
worker_count_per_proc: int = 1,
mp_ctx=get_context("spawn"),
) -> List[Tuple[int, Any]]:
"""Execute a dataflow as a cluster of processes on this machine.
Blocks until execution is complete.
Starts up cluster processes for you and handles connecting them
together. You'd commonly use this for notebook analysis that needs
parallelism and higher throughput, or simple stand-alone demo
programs.
>>> from bytewax.testing import doctest_ctx
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> spawn_cluster(
... flow,
... proc_count=2,
... mp_ctx=doctest_ctx, # Outside a doctest, you'd skip this.
... ) # doctest: +ELLIPSIS
(...)
See `bytewax.run_main()` for a way to test input and output
builders without the complexity of starting a cluster.
See `bytewax.cluster_main()` for starting one process in a cluster
in a distributed situation.
Args:
flow: Dataflow to run.
epoch_config: A custom epoch config. You probably don't need
this. See `EpochConfig` for more info.
recovery_config: State recovery config. See
`bytewax.recovery`. If `None`, state will not be
persisted.
proc_count: Number of processes to start.
worker_count_per_proc: Number of worker threads to start on
each process.
mp_ctx: `multiprocessing` context to use. Use this to
configure starting up subprocesses via spawn or
fork. Defaults to spawn.
"""
addresses = _gen_addresses(proc_count)
with mp_ctx.Pool(processes=proc_count) as pool:
futures = [
pool.apply_async(
cluster_main,
(flow,),
{
"epoch_config": epoch_config,
"recovery_config": recovery_config,
"addresses": addresses,
"proc_id": proc_id,
"worker_count_per_proc": worker_count_per_proc,
},
)
for proc_id in range(proc_count)
]
pool.close()
for future in futures:
# Will re-raise exceptions from subprocesses.
future.get()
pool.join()
Functions
def spawn_cluster(flow: Dataflow, *, epoch_config: Optional[EpochConfig] = None, recovery_config: Optional[RecoveryConfig] = None, proc_count: int = 1, worker_count_per_proc: int = 1, mp_ctx=get_context("spawn")) -> List[Tuple[int, Any]]
Execute a dataflow as a cluster of processes on this machine.
Blocks until execution is complete.
Starts up cluster processes for you and handles connecting them together. You'd commonly use this for notebook analysis that needs parallelism and higher throughput, or simple stand-alone demo programs.
>>> from bytewax.testing import doctest_ctx
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> spawn_cluster(
...     flow,
...     proc_count=2,
...     mp_ctx=doctest_ctx,  # Outside a doctest, you'd skip this.
... )  # doctest: +ELLIPSIS
(...)
See `bytewax.run_main()` for a way to test input and output builders without the complexity of starting a cluster.
See `bytewax.cluster_main()` for starting one process in a cluster in a distributed situation.
Args
flow
- Dataflow to run.
epoch_config
- A custom epoch config. You probably don't need this. See `EpochConfig` for more info.
recovery_config
- State recovery config. See `bytewax.recovery`. If `None`, state will not be persisted.
proc_count
- Number of processes to start.
worker_count_per_proc
- Number of worker threads to start on each process.
mp_ctx
- `multiprocessing` context to use. Use this to configure starting up subprocesses via spawn or fork. Defaults to spawn.
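As an illustrative sketch of wiring state recovery into `spawn_cluster`, assuming `bytewax.recovery.SqliteRecoveryConfig` takes a state directory (the path here is hypothetical):

from bytewax.recovery import SqliteRecoveryConfig

# Hypothetical state directory; assumes SqliteRecoveryConfig(db_dir).
recovery_config = SqliteRecoveryConfig("./bytewax_state")
# `flow` built as in the doctest above.
spawn_cluster(
    flow,
    recovery_config=recovery_config,
    proc_count=2,
)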
Classes
class EpochConfig
Base class for an epoch config.
These define how epochs are assigned on source input data. You should only need to set this if you are testing the recovery system or are doing deep exactly-once integration work. Changing this does not change the semantics of any of the operators.
Use a specific subclass of this for the epoch definition you need.
Subclasses
- TestingEpochConfig
- PeriodicEpochConfig
class TestingEpochConfig
Use for deterministic epochs in tests. Increment epoch by 1 after each item.
This requires all workers to have exactly the same number of input items! Otherwise the dataflow will hang!
You almost assuredly do not want to use this unless you are writing tests of the recovery system.
Returns
Config object. Pass this as the `epoch_config` parameter of your execution entry point.
Ancestors
- EpochConfig
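A sketch of how this is typically wired into a recovery test, assuming `TestingEpochConfig` takes no constructor arguments:

from bytewax.dataflow import Dataflow
from bytewax.execution import run_main, TestingEpochConfig
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(3)))
flow.capture(StdOutputConfig())

# One epoch per item, so epoch boundaries are deterministic in the test.
run_main(flow, epoch_config=TestingEpochConfig())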