Module bytewax.inputs
Dataflow input sources.
Bytewax provides pre-packaged input configuration options for common sources you might want to read dataflow input from.
Use
Create an InputConfig
subclass for the source you'd like to read
from. Then pass that config object to the Dataflow
constructor.
"""Dataflow input sources.
Bytewax provides pre-packaged input configuration options for common
sources you might want to read dataflow input from.
Use
---
Create an `InputConfig` subclass for the source you'd like to read
from. Then pass that config object to the `bytewax.dataflow.Dataflow`
constructor.
"""
from typing import Any, Iterable
from .bytewax import InputConfig, KafkaInputConfig, ManualInputConfig # noqa: F401
def TestingInputConfig(it):
    """Produce input from this Python iterable. You only want to use this
    for unit testing.

    The iterable must be identical on all workers; this will
    automatically distribute the items across workers and handle
    recovery.

    Args:

        it: Iterable for input.

    Returns:

        Config object. Pass this to the `bytewax.dataflow.Dataflow`
        constructor.

    """

    def gen(worker_index, worker_count, resume_state):
        resume_i = resume_state or 0
        for i, x in enumerate(distribute(it, worker_index, worker_count)):
            # FFWD to the resume item.
            if i < resume_i:
                continue
            # Store the index in this worker's partition as the resume
            # state.
            yield (i + 1, x)

    return ManualInputConfig(gen)
def distribute(elements: Iterable[Any], index: int, count: int) -> Iterable[Any]:
    """Distribute elements equally between a number of buckets and return
    the items for the given bucket index.

    No two buckets will get the same element.

    >>> list(distribute(["blue", "green", "red"], 0, 2))
    ['blue', 'red']
    >>> list(distribute(["blue", "green", "red"], 1, 2))
    ['green']

    Note that if you have more buckets than elements, some buckets
    will get nothing.

    >>> list(distribute(["blue", "green", "red"], 3, 5))
    []

    This is very useful when writing input builders and you want each
    of your workers to handle reading a disjoint partition of your
    input.

    >>> from bytewax import Dataflow, spawn_cluster
    >>> from bytewax.inputs import ManualInputConfig
    >>> def read_topics(topics):
    ...     for topic in topics:
    ...         for i in range(3):
    ...             yield f"topic:{topic} item:{i}"
    >>> def input_builder(worker_index, worker_count, resume_state):
    ...     all_topics = ["red", "green", "blue"]
    ...     this_workers_topics = distribute(all_topics, worker_index, worker_count)
    ...     for item in read_topics(this_workers_topics):
    ...         yield (None, f"worker_index:{worker_index} " + item)
    >>> flow = Dataflow()
    >>> flow.capture()
    >>> spawn_cluster(flow, ManualInputConfig(input_builder), lambda i, n: print)
    (0, 'worker_index:1 topic:red item:0')
    (0, 'worker_index:1 topic:red item:1')
    (0, 'worker_index:1 topic:red item:2')
    (0, 'worker_index:1 topic:blue item:0')
    (0, 'worker_index:1 topic:blue item:1')
    (0, 'worker_index:1 topic:blue item:2')
    (0, 'worker_index:2 topic:green item:0')
    (0, 'worker_index:2 topic:green item:1')
    (0, 'worker_index:2 topic:green item:2')

    Args:

        elements: To distribute.

        index: Index of this bucket / worker starting at 0.

        count: Total number of buckets / workers.

    Returns:

        An iterator of the elements only in this bucket.

    """
    assert index < count, f"Highest index should only be {count - 1}; got {index}"
    for i, x in enumerate(elements):
        if i % count == index:
            yield x
Functions
def TestingInputConfig(it)
-
Produce input from this Python iterable. You only want to use this for unit testing.
The iterable must be identical on all workers; this will automatically distribute the items across workers and handle recovery.
Args
it
- Iterable for input.
Returns
Config object. Pass this to the
Dataflow
constructor.
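The resume behavior can be observed without running a dataflow: the builder that `TestingInputConfig` wraps yields `(next_index, item)` pairs, and skips forward when handed a previous index as `resume_state`. A minimal stand-alone sketch (reimplementing the generator and `distribute` in plain Python; this is not the Bytewax runtime itself):

```python
def distribute(elements, index, count):
    # Round-robin assignment of elements to bucket `index` of `count`.
    for i, x in enumerate(elements):
        if i % count == index:
            yield x

def gen(it, worker_index, worker_count, resume_state=None):
    # Mirrors the generator TestingInputConfig builds: skip items already
    # processed, and pair each item with the index to resume from next.
    resume_i = resume_state or 0
    for i, x in enumerate(distribute(it, worker_index, worker_count)):
        if i < resume_i:
            continue
        yield (i + 1, x)

items = ["a", "b", "c", "d"]
# Single worker, fresh start: every item, tagged with its resume index.
first_run = list(gen(items, 0, 1))
# Simulate a crash after the second item and resume from its saved state.
resumed = list(gen(items, 0, 1, resume_state=first_run[1][0]))
```

Resuming with the state stored alongside item `"b"` picks up at `"c"`, which is why the real config can recover mid-iterable.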
def distribute(elements: Iterable[Any], index: int, count: int) ‑> Iterable[Any]
-
Distribute elements equally between a number of buckets and return the items for the given bucket index.
No two buckets will get the same element.
>>> list(distribute(["blue", "green", "red"], 0, 2))
['blue', 'red']
>>> list(distribute(["blue", "green", "red"], 1, 2))
['green']
Note that if you have more buckets than elements, some buckets will get nothing.
>>> list(distribute(["blue", "green", "red"], 3, 5))
[]
This is very useful when writing input builders and you want each of your workers to handle reading a disjoint partition of your input.
>>> from bytewax import Dataflow, spawn_cluster
>>> from bytewax.inputs import ManualInputConfig
>>> def read_topics(topics):
...     for topic in topics:
...         for i in range(3):
...             yield f"topic:{topic} item:{i}"
>>> def input_builder(worker_index, worker_count, resume_state):
...     all_topics = ["red", "green", "blue"]
...     this_workers_topics = distribute(all_topics, worker_index, worker_count)
...     for item in read_topics(this_workers_topics):
...         yield (None, f"worker_index:{worker_index} " + item)
>>> flow = Dataflow()
>>> flow.capture()
>>> spawn_cluster(flow, ManualInputConfig(input_builder), lambda i, n: print)
(0, 'worker_index:1 topic:red item:0')
(0, 'worker_index:1 topic:red item:1')
(0, 'worker_index:1 topic:red item:2')
(0, 'worker_index:1 topic:blue item:0')
(0, 'worker_index:1 topic:blue item:1')
(0, 'worker_index:1 topic:blue item:2')
(0, 'worker_index:2 topic:green item:0')
(0, 'worker_index:2 topic:green item:1')
(0, 'worker_index:2 topic:green item:2')
Args
elements
- To distribute.
index
- Index of this bucket / worker starting at 0.
count
- Total number of buckets / workers.
Returns
An iterator of the elements only in this bucket.
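In practice, `distribute` is how an input builder carves a global list of partitions among workers: each worker gets a disjoint slice, and together the slices cover everything. A plain-Python illustration (the partition names are hypothetical; `distribute` is reimplemented here so the snippet stands alone):

```python
def distribute(elements, index, count):
    # Same round-robin rule as bytewax.inputs.distribute.
    for i, x in enumerate(elements):
        if i % count == index:
            yield x

partitions = [f"part-{i}" for i in range(5)]
workers = 2
# Each worker calls distribute with its own index; collectively the
# assignments are disjoint and cover every partition exactly once.
assignment = {w: list(distribute(partitions, w, workers)) for w in range(workers)}
```

Because every worker runs the same deterministic rule over the same list, no coordination is needed to avoid duplicate reads.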
Classes
class InputConfig
-
Base class for an input config.
These define how you will input data to your dataflow.
Use a specific subclass of InputConfig for the kind of input source you plan to use. See the subclasses in this module.
Subclasses
KafkaInputConfig
ManualInputConfig
class KafkaInputConfig (brokers, topic, tail, starting_offset)
-
Use Kafka as the input source.
Kafka messages will be passed through the dataflow as two-tuples of
(key_bytes, payload_bytes)
.
Args
brokers
:List[str]
- List of
host:port
strings of Kafka brokers.
topic
:str
- Topic to which consumer will subscribe.
tail
:bool
- Wait for new data on this topic when the end is initially reached.
starting_offset
:str
- Can be "beginning" or "end". Delegates where to resume if auto_commit is not enabled. Defaults to "beginning".
Returns
Config object. Pass this as the
input_config
argument to the
Dataflow.input()
.
Ancestors
InputConfig
Instance variables
var brokers
-
Return an attribute of instance, which is of type owner.
var starting_offset
-
Return an attribute of instance, which is of type owner.
var tail
-
Return an attribute of instance, which is of type owner.
var topic
-
Return an attribute of instance, which is of type owner.
class ManualInputConfig (input_builder)
-
Use a user-defined function that returns an iterable as the input source.
Because Bytewax's execution is cooperative, the resulting iterators must not block waiting for new data; otherwise, pending execution of other steps in the dataflow will be delayed and throughput will be reduced. If you are using a generator and no data is ready yet, have it
yield None
or just
yield
to signal this.
Args
input_builder
input_builder(worker_index: int, worker_count: int, resume_state: Option[Any]) => Iterator[Tuple[Any, Any]]
Builder function which returns an iterator of 2-tuples of
(state, item)
.
item
is the input that worker should introduce into the dataflow.
state
is a snapshot of any internal state it will take to resume this input from its current position after the current item. Note that e.g. returning the same list from each worker will result in duplicate data in the dataflow.
Returns
Config object. Pass this as the
input_config
argument to the
Dataflow.input()
.
Ancestors
InputConfig
Instance variables
var input_builder
-
Return an attribute of instance, which is of type owner.
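A builder that follows these rules yields `(state, item)` pairs and yields `None` when nothing is ready, instead of blocking. A minimal generator sketch, using an in-memory deque as an illustrative stand-in for a real non-blocking source (a real builder takes only the three documented arguments; the `source` parameter here exists just to make the demo self-contained):

```python
from collections import deque

def input_builder(worker_index, worker_count, resume_state, source=None):
    i = resume_state or 0
    while True:
        if not source:
            # Nothing ready: yield control cooperatively instead of blocking.
            yield None
            break  # A real builder would keep polling; we stop for the demo.
        item = source.popleft()
        i += 1
        # State is the count of items emitted so far, so recovery can
        # resume after this position.
        yield (i, item)

source = deque(["a", "b"])
out = list(input_builder(0, 1, None, source=source))
```

The `None` yield is what keeps other dataflow steps making progress while this worker's source is idle.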