Package bytewax
Bytewax is an open source Python framework for building highly scalable dataflows in a streaming or batch context.
See our readme for more documentation.
"""Bytewax is an open source Python framework for building highly
scalable dataflows in a streaming or batch context.
[See our readme for more
documentation.](https://github.com/bytewax/bytewax)
"""
from .bytewax import cluster_main, Dataflow
from .execution import run, run_cluster, spawn_cluster
__all__ = [
    "Dataflow",
    "run",
    "run_cluster",
    "spawn_cluster",
    "cluster_main",
]
__pdoc__ = {
    # This is the PyO3 module that has to be named "bytewax". Hide it
    # since we import all its members here.
    "bytewax": False,
    # Hide execution because we import all its members here.
    "execution": False,
}
Functions
def cluster_main(flow, input_builder, output_builder, addresses, proc_id, worker_count_per_proc)
-
Execute a dataflow in the current process as part of a cluster.
You have to coordinate starting up all the processes in the cluster and ensuring they each are assigned a unique ID and know the addresses of other processes. You'd commonly use this for starting processes as part of a Kubernetes cluster.
Blocks until execution is complete.
See run_cluster() for a convenience method to pass data through a dataflow for notebook development.
See spawn_cluster() for starting a simple cluster locally on one machine.
>>> flow = Dataflow()
>>> def input_builder(worker_index, worker_count):
...     return enumerate(range(3))
>>> def output_builder(worker_index, worker_count):
...     return print
>>> cluster_main(flow, input_builder, output_builder, ["localhost:2101"], 0, 1)
Args
flow
- Dataflow to run.
input_builder
- Returns input that each worker thread should process.
output_builder
- Returns a callback function for each worker thread, called with (epoch, item) whenever an item passes by a capture operator on this process.
addresses
- List of host/port addresses for all processes in this cluster (including this one).
proc_id
- Index of this process in cluster; starts from 0.
worker_count_per_proc
- Number of worker threads to start on each process.
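For illustration, here is a minimal sketch of how one process in such a cluster might call cluster_main(), deriving its identity and peer list from environment variables. The variable names (BYTEWAX_POD_INDEX, BYTEWAX_ADDRESSES) and the addresses are hypothetical; only cluster_main() itself comes from bytewax.

import os

from bytewax import Dataflow, cluster_main

flow = Dataflow()
flow.capture()

def input_builder(worker_index, worker_count):
    # Each worker produces (epoch, item) tuples; here a toy finite input.
    return enumerate(range(3))

def output_builder(worker_index, worker_count):
    # Called once per worker; returns the callback that receives (epoch, item).
    return print

# Hypothetical environment variables set by your orchestrator.
proc_id = int(os.environ["BYTEWAX_POD_INDEX"])
# e.g. "worker-0:2101;worker-1:2101"
addresses = os.environ["BYTEWAX_ADDRESSES"].split(";")

cluster_main(flow, input_builder, output_builder, addresses, proc_id, 1)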
def run(flow: Dataflow, inp: Iterable[Tuple[int, Any]]) -> List[Tuple[int, Any]]
-
Pass data through a dataflow running in the current thread.
Blocks until execution is complete.
Handles distributing input and collecting output. You'd commonly use this for tests or prototyping in notebooks.
Input must be finite, otherwise collected output will grow unbounded.
>>> flow = Dataflow()
>>> flow.map(str.upper)
>>> flow.capture()
>>> out = run(flow, [(0, "a"), (1, "b"), (2, "c")])
>>> sorted(out)
[(0, 'A'), (1, 'B'), (2, 'C')]
Args
flow
- Dataflow to run.
inp
- Input data.
Returns
List of (epoch, item) tuples seen by capture operators.
def run(flow: Dataflow, inp: Iterable[Tuple[int, Any]]) -> List[Tuple[int, Any]]:
    """Pass data through a dataflow running in the current thread.

    Blocks until execution is complete.

    Handles distributing input and collecting output. You'd commonly
    use this for tests or prototyping in notebooks.

    Input must be finite, otherwise collected output will grow
    unbounded.

    >>> flow = Dataflow()
    >>> flow.map(str.upper)
    >>> flow.capture()
    >>> out = run(flow, [(0, "a"), (1, "b"), (2, "c")])
    >>> sorted(out)
    [(0, 'A'), (1, 'B'), (2, 'C')]

    Args:
        flow: Dataflow to run.
        inp: Input data.

    Returns:
        List of `(epoch, item)` tuples seen by capture operators.
    """
    def input_builder(worker_index, worker_count):
        assert worker_index == 0
        return inp

    out = []

    def output_builder(worker_index, worker_count):
        assert worker_index == 0
        return out.append

    _run(flow, input_builder, output_builder)

    return out
def run_cluster(flow: Dataflow, inp: Iterable[Tuple[int, Any]], proc_count: int = 1, worker_count_per_proc: int = 1) -> List[Tuple[int, Any]]
-
Pass data through a dataflow running as a cluster of processes on this machine.
Blocks until execution is complete.
Starts up cluster processes for you, handles connecting them together, distributing input, and collecting output. You'd commonly use this for notebook analysis that needs parallelism and higher throughput, or simple stand-alone demo programs.
Input must be finite because it is reified into a list before distribution to the cluster; otherwise collected output will grow unbounded.
See spawn_cluster() for starting a cluster on this machine with full control over inputs and outputs.
See cluster_main() for starting one process in a cluster in a distributed situation.
>>> __skip_doctest_on_win_gha()
>>> flow = Dataflow()
>>> flow.map(str.upper)
>>> flow.capture()
>>> out = run_cluster(flow, [(0, "a"), (1, "b"), (2, "c")], proc_count=2)
>>> sorted(out)
[(0, 'A'), (1, 'B'), (2, 'C')]
Args
flow
- Dataflow to run.
inp
- Input data. Will be reified to a list before sending to processes. Will be partitioned between workers for you.
proc_count
- Number of processes to start.
worker_count_per_proc
- Number of worker threads to start on each process.
Returns
List of (epoch, item) tuples seen by capture operators.
def run_cluster(
    flow: Dataflow,
    inp: Iterable[Tuple[int, Any]],
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
) -> List[Tuple[int, Any]]:
    """Pass data through a dataflow running as a cluster of processes
    on this machine.

    Blocks until execution is complete.

    Starts up cluster processes for you, handles connecting them
    together, distributing input, and collecting output. You'd
    commonly use this for notebook analysis that needs parallelism and
    higher throughput, or simple stand-alone demo programs.

    Input must be finite because it is reified into a list before
    distribution to the cluster; otherwise collected output will grow
    unbounded.

    See `bytewax.spawn_cluster()` for starting a cluster on this
    machine with full control over inputs and outputs.

    See `bytewax.cluster_main()` for starting one process in a cluster
    in a distributed situation.

    >>> __skip_doctest_on_win_gha()
    >>> flow = Dataflow()
    >>> flow.map(str.upper)
    >>> flow.capture()
    >>> out = run_cluster(flow, [(0, "a"), (1, "b"), (2, "c")], proc_count=2)
    >>> sorted(out)
    [(0, 'A'), (1, 'B'), (2, 'C')]

    Args:
        flow: Dataflow to run.
        inp: Input data. Will be reified to a list before sending to
            processes. Will be partitioned between workers for you.
        proc_count: Number of processes to start.
        worker_count_per_proc: Number of worker threads to start on
            each process.

    Returns:
        List of `(epoch, item)` tuples seen by capture operators.
    """
    man = Manager()
    inp = man.list(list(inp))

    def input_builder(worker_index, worker_count):
        for i, epoch_item in enumerate(inp):
            if i % worker_count == worker_index:
                yield epoch_item

    out = man.list()

    def output_builder(worker_index, worker_count):
        return out.append

    spawn_cluster(
        flow, input_builder, output_builder, proc_count, worker_count_per_proc
    )

    return out
def spawn_cluster(flow: Dataflow, input_builder: Callable[[int, int], Iterable[Tuple[int, Any]]], output_builder: Callable[[int, int], Callable[[Tuple[int, Any]], None]], proc_count: int = 1, worker_count_per_proc: int = 1) -> List[Tuple[int, Any]]
-
Execute a dataflow as a cluster of processes on this machine.
Blocks until execution is complete.
Starts up cluster processes for you and handles connecting them together. You'd commonly use this for notebook analysis that needs parallelism and higher throughput, or simple stand-alone demo programs.
See run_cluster() for a convenience method to pass data through a dataflow for notebook development.
See cluster_main() for starting one process in a cluster in a distributed situation.
>>> __skip_doctest_on_win_gha()
>>> __fix_pickling_in_doctest()
>>> flow = Dataflow()
>>> flow.capture()
>>> def input_builder(worker_index, worker_count):
...     return enumerate(range(3))
>>> def output_builder(worker_index, worker_count):
...     return print
>>> spawn_cluster(flow, input_builder, output_builder, proc_count=2)
Args
flow
- Dataflow to run.
input_builder
- Returns input that each worker thread should process.
output_builder
- Returns a callback function for each worker thread, called with (epoch, item) whenever an item passes by a capture operator on this process.
proc_count
- Number of processes to start.
worker_count_per_proc
- Number of worker threads to start on each process.
def spawn_cluster(
    flow: Dataflow,
    input_builder: Callable[[int, int], Iterable[Tuple[int, Any]]],
    output_builder: Callable[[int, int], Callable[[Tuple[int, Any]], None]],
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
) -> List[Tuple[int, Any]]:
    """Execute a dataflow as a cluster of processes on this machine.

    Blocks until execution is complete.

    Starts up cluster processes for you and handles connecting them
    together. You'd commonly use this for notebook analysis that needs
    parallelism and higher throughput, or simple stand-alone demo
    programs.

    See `bytewax.run_cluster()` for a convenience method to pass data
    through a dataflow for notebook development.

    See `bytewax.cluster_main()` for starting one process in a cluster
    in a distributed situation.

    >>> __skip_doctest_on_win_gha()
    >>> __fix_pickling_in_doctest()
    >>> flow = Dataflow()
    >>> flow.capture()
    >>> def input_builder(worker_index, worker_count):
    ...     return enumerate(range(3))
    >>> def output_builder(worker_index, worker_count):
    ...     return print
    >>> spawn_cluster(flow, input_builder, output_builder, proc_count=2)

    Args:
        flow: Dataflow to run.
        input_builder: Returns input that each worker thread should
            process.
        output_builder: Returns a callback function for each worker
            thread, called with `(epoch, item)` whenever an item
            passes by a capture operator on this process.
        proc_count: Number of processes to start.
        worker_count_per_proc: Number of worker threads to start on
            each process.
    """
    addresses = _gen_addresses(proc_count)
    with Pool(processes=proc_count) as pool:
        futures = [
            pool.apply_async(
                cluster_main,
                (
                    flow,
                    input_builder,
                    output_builder,
                    addresses,
                    proc_id,
                    worker_count_per_proc,
                ),
            )
            for proc_id in range(proc_count)
        ]
        pool.close()

        for future in futures:
            # Will re-raise exceptions from subprocesses.
            future.get()

        pool.join()
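As a sketch of the builder pattern with spawn_cluster(): each worker pulls its own slice of a finite input, and each worker appends captured output to its own file. The file naming is hypothetical; only Dataflow and spawn_cluster come from bytewax.

from bytewax import Dataflow, spawn_cluster

flow = Dataflow()
flow.map(str.upper)
flow.capture()

def input_builder(worker_index, worker_count):
    # Hand each worker its own slice of a finite input.
    for epoch, line in enumerate(["a", "b", "c", "d"]):
        if epoch % worker_count == worker_index:
            yield (epoch, line)

def output_builder(worker_index, worker_count):
    # Hypothetical per-worker output files; any callable that accepts
    # an (epoch, item) tuple works here. Line-buffered so results land
    # on disk as they are captured.
    out_file = open(f"out_worker_{worker_index}.txt", "w", buffering=1)
    def write(epoch_item):
        out_file.write(f"{epoch_item}\n")
    return write

if __name__ == "__main__":
    spawn_cluster(flow, input_builder, output_builder, proc_count=2)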
Classes
class Dataflow
-
A definition of a Bytewax dataflow graph.
Use the methods defined on this class to add steps with operators of the same name.
See the execution functions in the bytewax module to run a dataflow.
TODO: Right now only linear dataflows are supported.
Methods
def capture(self)
-
Capture causes all (epoch, item) tuples that pass by this point in the Dataflow to be passed to the Dataflow's output handler.
Every dataflow must contain at least one capture.
If you use this operator multiple times, the results will be combined.
There are no guarantees on the order that output is passed to the handler. Read the attached epoch to discern order.
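A minimal sketch (using run() from this package) of reading captured output and sorting it by the attached epoch:

from bytewax import Dataflow, run

flow = Dataflow()
flow.map(lambda x: x * 10)
flow.capture()  # Everything reaching this point goes to the output handler.

out = run(flow, [(0, 1), (1, 2), (2, 3)])
# Output order is not guaranteed, so sort by the attached epoch.
print(sorted(out))  # [(0, 10), (1, 20), (2, 30)]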
def filter(self, predicate)
-
Filter selectively keeps only some items.
It calls a function predicate(item: Any) => should_emit: bool on each item.
It emits the item downstream unmodified if the predicate returns True. (A sketch combining map and filter appears after the map operator below.)
def flat_map(self, mapper)
-
Flat Map is a one-to-many transformation of items.
It calls a function mapper(item: Any) => emit: Iterable[Any] on each item.
It emits each element of the returned iterable individually downstream.
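For example, a sketch that splits sentences into words:

from bytewax import Dataflow, run

flow = Dataflow()
flow.flat_map(str.split)  # One sentence in, many words out.
flow.capture()

out = run(flow, [(0, "hello world"), (1, "to the moon")])
print(sorted(out))  # [(0, 'hello'), (0, 'world'), (1, 'moon'), (1, 'the'), (1, 'to')]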
def inspect(self, inspector)
-
Inspect allows you to observe, but not modify, items.
It calls a function inspector(item: Any) => None on each item.
The return value is ignored; it emits items downstream unmodified.
def inspect_epoch(self, inspector)
-
Inspect Epoch allows you to observe, but not modify, items and their epochs.
It calls a function inspector(epoch: int, item: Any) => None on each item with its epoch.
The return value is ignored; it emits items downstream unmodified.
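A sketch of using inspect_epoch for debugging; the printed lines are a side effect and the stream itself is unchanged:

from bytewax import Dataflow, run

flow = Dataflow()
flow.inspect_epoch(lambda epoch, item: print(f"saw {item!r} @ epoch {epoch}"))
flow.capture()

out = run(flow, [(0, "a"), (1, "b")])
# Items pass through unmodified: sorted(out) == [(0, 'a'), (1, 'b')]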
def map(self, mapper)
-
Map is a one-to-one transformation of items.
It calls a function mapper(item: Any) => updated_item: Any on each item.
It emits each updated item downstream.
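A sketch combining map and filter (referenced from the filter operator above):

from bytewax import Dataflow, run

flow = Dataflow()
flow.map(lambda x: x * 2)      # 1, 2, 3 -> 2, 4, 6
flow.filter(lambda x: x > 2)   # Keep only items greater than 2.
flow.capture()

out = run(flow, [(0, 1), (1, 2), (2, 3)])
print(sorted(out))  # [(1, 4), (2, 6)]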
def reduce(self, reducer, is_complete)
-
Reduce lets you combine items for a key into an aggregator in epoch order.
Since this is a stateful operator, it requires that the input stream has items that are (key, value) tuples so we can ensure that all relevant values are routed to the relevant aggregator.
It calls two functions:
- A reducer(aggregator: Any, value: Any) => updated_aggregator: Any which combines two values. The aggregator is initially the first value seen for a key. Values will be passed in epoch order, but no order is defined within an epoch.
- An is_complete(updated_aggregator: Any) => should_emit: bool which returns true if the most recent (key, aggregator) should be emitted downstream and the aggregator for that key forgotten. If there was only a single value for a key, it is passed in as the aggregator here.
It emits (key, aggregator) tuples downstream when you tell it to.
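A sketch that accumulates events per user until a "logout" value is seen; the event names are illustrative, and the exact epoch attached to the emitted tuple depends on when the aggregator completes:

from bytewax import Dataflow, run

def add_event(events, event):
    # The aggregator starts as the first raw value for a key, so
    # normalize it to a list before appending.
    if not isinstance(events, list):
        events = [events]
    events.append(event)
    return events

def session_done(events):
    # Emit (and forget) the aggregator once a "logout" has been seen.
    if not isinstance(events, list):
        events = [events]
    return "logout" in events

flow = Dataflow()
flow.reduce(add_event, session_done)
flow.capture()

inp = [
    (0, ("user_1", "login")),
    (1, ("user_1", "click")),
    (2, ("user_1", "logout")),
]
out = run(flow, inp)
# out holds one (epoch, (key, aggregator)) tuple, emitted once
# session_done returned True for "user_1".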
def reduce_epoch(self, reducer)
-
Reduce Epoch lets you combine all items for a key within an epoch into an aggregator.
This is like reduce but marks the aggregator as complete automatically at the end of each epoch.
Since this is a stateful operator, it requires that the input stream has items that are (key, value) tuples so we can ensure that all relevant values are routed to the relevant aggregator.
It calls a function reducer(aggregator: Any, value: Any) => updated_aggregator: Any which combines two values. The aggregator is initially the first value seen for a key. Values will be passed in arbitrary order.
It emits (key, aggregator) tuples downstream at the end of each epoch.
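A sketch of a per-epoch word count, assuming the input has already been keyed as (word, 1) pairs:

from bytewax import Dataflow, run

def add(count, addend):
    return count + addend

flow = Dataflow()
flow.reduce_epoch(add)
flow.capture()

inp = [
    (0, ("apple", 1)),
    (0, ("apple", 1)),
    (0, ("pear", 1)),
    (1, ("apple", 1)),
]
out = run(flow, inp)
print(sorted(out))  # [(0, ('apple', 2)), (0, ('pear', 1)), (1, ('apple', 1))]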
def reduce_epoch_local(self, reducer)
-
Reduce Epoch Local lets you combine all items for a key within an epoch on a single worker.
It is exactly like reduce_epoch but does no internal exchange between workers. You should probably use reduce_epoch instead, unless you are using this as a network-overhead optimization.
def stateful_map(self, builder, mapper)
-
Stateful Map is a one-to-one transformation of values in (key, value) pairs, but allows you to reference a persistent state for each key when doing the transformation.
Since this is a stateful operator, it requires that the input stream has items that are (key, value) tuples so we can ensure that all relevant values are routed to the relevant state.
It calls two functions:
- A builder(key: Any) => new_state: Any which returns a new state and will be called whenever a new key is encountered, with the key as a parameter.
- A mapper(state: Any, value: Any) => (updated_state: Any, updated_value: Any) which transforms values. Values will be passed in epoch order, but no order is defined within an epoch. If the updated state is None, the state will be forgotten.
It emits a (key, updated_value) tuple downstream for each input item.
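A sketch keeping a running total per key; returning None as the updated state would instead drop the state for that key:

from bytewax import Dataflow, run

def build_total(key):
    # Called once per new key; the initial state is 0.
    return 0

def running_total(total, value):
    total += value
    # Return (updated_state, value_to_emit).
    return total, total

flow = Dataflow()
flow.stateful_map(build_total, running_total)
flow.capture()

inp = [(0, ("a", 1)), (1, ("a", 2)), (2, ("b", 5))]
out = run(flow, inp)
print(sorted(out))  # [(0, ('a', 1)), (1, ('a', 3)), (2, ('b', 5))]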