Module bytewax.inputs
Helpers to let you quickly define epoch / batching semantics.
Use these to wrap an existing iterator which yields items.
Expand source code
"""Helpers to let you quickly define epoch / batching semantics.
Use these to wrap an existing iterator which yields items.
"""
import datetime
import heapq
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Tuple
from .bytewax import AdvanceTo, Emit, KafkaInputConfig, ManualInputConfig, InputConfig
def distribute(elements: Iterable[Any], index: int, count: int) -> Iterable[Any]:
"""Distribute elements equally between a number of buckets and return
the items for the given bucket index.
No two buckets will get the same element.
>>> list(distribute(["blue", "green", "red"], 0, 2))
['blue', 'red']
>>> list(distribute(["blue", "green", "red"], 1, 2))
['green']
Note that if you have more buckets than elements, some buckets
will get nothing.
>>> list(distribute(["blue", "green", "red"], 3, 5))
[]
This is very useful when writing input builders and you want each
of your workers to handle reading a disjoint partition of your
input.
>>> from bytewax import Dataflow, spawn_cluster
>>> from bytewax.inputs import ManualInputConfig
>>> def read_topics(topics):
...     for topic in topics:
...         for i in range(3):
...             yield f"topic:{topic} item:{i}"
>>> def input_builder(worker_index, worker_count, resume_epoch):
...     all_topics = ["red", "green", "blue"]
...     this_workers_topics = distribute(all_topics, worker_index, worker_count)
...     for item in read_topics(this_workers_topics):
...         yield Emit(f"worker_index:{worker_index} " + item)
>>> flow = Dataflow()
>>> flow.capture()
>>> spawn_cluster(flow, ManualInputConfig(input_builder), lambda i, n: print)
(0, 'worker_index:0 topic:red item:0')
(0, 'worker_index:0 topic:red item:1')
(0, 'worker_index:0 topic:red item:2')
(0, 'worker_index:0 topic:blue item:0')
(0, 'worker_index:0 topic:blue item:1')
(0, 'worker_index:0 topic:blue item:2')
(0, 'worker_index:1 topic:green item:0')
(0, 'worker_index:1 topic:green item:1')
(0, 'worker_index:1 topic:green item:2')
Args:
elements: To distribute.
index: Index of this bucket / worker starting at 0.
count: Total number of buckets / workers.
Returns:
An iterator of the elements only in this bucket.
"""
assert index < count, f"Highest index should only be {count - 1}; got {index}"
for i, x in enumerate(elements):
if i % count == index:
yield x
def yield_epochs(fn: Callable):
"""A decorator function to unwrap an iterator of [epoch, item]
into successive `AdvanceTo` and `Emit` classes with the
contents of the iterator.
Use this when you have an input_builder function that returns a
generator of (epoch, item) to be used with
`bytewax.cluster_main()` or `bytewax.spawn_cluster()`:
>>> from bytewax import Dataflow, cluster_main
>>> from bytewax.inputs import yield_epochs, fully_ordered, ManualInputConfig
>>> flow = Dataflow()
>>> flow.capture()
>>> @yield_epochs
... def input_builder(i, n, re):
...     return fully_ordered(["a", "b", "c"])
>>> cluster_main(flow, ManualInputConfig(input_builder), lambda i, n: print, [], 0, 1)
(0, 'a')
(1, 'b')
(2, 'c')
"""
def inner_fn(worker_index, worker_count, resume_epoch):
gen = fn(worker_index, worker_count, resume_epoch)
for (epoch, item) in gen:
yield AdvanceTo(epoch)
yield Emit(item)
return inner_fn
def single_batch(wrap_iter: Iterable) -> Iterable[Tuple[int, Any]]:
"""All input items are part of the same epoch.
Use this for non-streaming-style batch processing.
>>> from bytewax import Dataflow, run
>>> flow = Dataflow()
>>> flow.capture()
>>> out = run(flow, single_batch(["a", "b", "c"]))
>>> sorted(out)
[(0, 'a'), (0, 'b'), (0, 'c')]
Args:
wrap_iter: Existing input iterable of just items.
Yields:
Tuples of `(epoch, item)`.
"""
for item in wrap_iter:
yield (0, item)
def tumbling_epoch(
wrap_iter: Iterable,
epoch_length: Any,
time_getter: Callable[[Any], Any] = lambda _: datetime.datetime.now(),
epoch_start_time: Any = None,
epoch_start: int = 0,
) -> Iterable[Tuple[int, Any]]:
"""All inputs within a tumbling window are part of the same epoch.
The time of the first item will be used as the start of epoch 0.
Out-of-order items will cause issues as Bytewax requires
inputs to dataflows to be in epoch order. See
`bytewax.inputs.fully_ordered()`.
>>> from bytewax import Dataflow, run
>>> items = [
... {
... "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 3),
... "value": "a",
... },
... {
... "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 4),
... "value": "b",
... },
... {
... "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 8),
... "value": "c",
... },
... ]
>>> flow = Dataflow()
>>> flow.map(lambda item: item["value"])
>>> flow.capture()
>>> out = run(flow, tumbling_epoch(
... items,
... datetime.timedelta(seconds=2),
... lambda item: item["timestamp"],
... ))
>>> sorted(out)
[(0, 'a'), (0, 'b'), (2, 'c')]
By default, uses "ingestion time" and you don't need to specify a
way to access the timestamp in each item.
>>> import pytest; pytest.skip("Figure out sleep in test.")
>>> items = [
... "a", # sleep(4)
... "b", # sleep(1)
... "c",
... ]
>>> list(tumbling_epoch(items, datetime.timedelta(seconds=2)))
[(0, 'a'), (2, 'b'), (2, 'c')]
Args:
wrap_iter: Existing input iterable of just items.
epoch_length: Length of each epoch window.
time_getter: Function that returns a timestamp given an
item. Defaults to current wall time.
epoch_start_time: The timestamp that should correspond to
the start of the 0th epoch. Otherwise defaults to the time
found on the first item.
epoch_start: The integer value to start counting epochs from.
This can be used for continuity during processing.
Yields:
Tuples of `(epoch, item)`.
"""
for item in wrap_iter:
time = time_getter(item)
if epoch_start_time is None:
epoch_start_time = time
epoch = epoch_start
else:
epoch = int((time - epoch_start_time) / epoch_length) + epoch_start
yield (epoch, item)
def fully_ordered(wrap_iter: Iterable) -> Iterable[Tuple[int, Any]]:
"""Each input item increments the epoch.
Be careful using this in high-volume streams with many workers, as
the worker overhead goes up as epochs become more fine-grained.
>>> from bytewax import Dataflow, run
>>> flow = Dataflow()
>>> flow.capture()
>>> out = run(flow, fully_ordered(["a", "b", "c"]))
>>> sorted(out)
[(0, 'a'), (1, 'b'), (2, 'c')]
Args:
wrap_iter: Existing input iterable of just items.
Yields:
Tuples of `(epoch, item)`.
"""
epoch = 0
for item in wrap_iter:
yield (epoch, item)
epoch += 1
@dataclass
class _HeapItem:
"""Wrapper class which holds pairs of time and item for implementing
`sorted_window()`.
We need some class that has an ordering only based on the time.
"""
time: Any
item: Any
def __lt__(self, other):
"""Compare just by timestamp. Ignore the item."""
return self.time < other.time
def sorted_window(
wrap_iter: Iterable,
window_length: Any,
time_getter: Callable[[Any], Any],
on_drop: Callable[[Any], None] = None,
) -> Iterable[Tuple[int, Any]]:
"""Sort a iterator to be increasing by some timestamp.
To support a possibly infinite iterator, store a limited sorted
buffer of items and only emit things downstream once a certain
window of time has passed, as indicated by the timestamp on new
items.
New input items which are older than those already emitted will be
dropped to maintain sorted output.
The window length needs to be tuned for how "out of order" your
input data is and how much data you're willing to drop: Already
perfectly ordered input data can have a window of "0" and nothing
will be dropped. Completely reversed input data needs a window
that is the difference between the oldest and youngest timestamp
to ensure nothing will be dropped.
>>> from bytewax import Dataflow, run
>>> items = [
... {
... "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 4),
... "value": "c",
... },
... {
... "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 3),
... "value": "b",
... },
... {
... "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 0),
... "value": "a",
... },
... ]
>>> sorted_items = list(
... sorted_window(
... items,
... datetime.timedelta(seconds=2),
... lambda item: item["timestamp"],
... )
... )
>>> sorted_items
[{'timestamp': datetime.datetime(2022, 2, 22, 1, 2, 3), 'value': 'b'},
{'timestamp': datetime.datetime(2022, 2, 22, 1, 2, 4), 'value': 'c'}]
You could imagine using it with `tumbling_epoch()` to ensure you
get in-order, bucketed data into your dataflow.
>>> flow = Dataflow()
>>> flow.map(lambda item: item["value"])
>>> flow.capture()
>>> out = run(flow, tumbling_epoch(
... sorted_items,
... datetime.timedelta(seconds=0.5),
... lambda item: item["timestamp"],
... ))
>>> sorted(out)
[(0, 'b'), (2, 'c')]
Args:
wrap_iter: Existing input iterable.
window_length: Buffering duration. Values will be emitted once
this amount of time has passed.
time_getter: Function to call to produce a timestamp for each
value.
on_drop: Function to call with each dropped item. E.g. log or
increment metrics on drop events to refine your window
length.
Yields:
Values in increasing timestamp order.
"""
sorted_buffer = []
newest_time = None
drop_older_than = None
def is_too_late(time):
return drop_older_than is not None and time <= drop_older_than
def is_newest_item(time):
return newest_time is None or time > newest_time
def emit_all(emit_older_than):
while len(sorted_buffer) > 0 and sorted_buffer[0].time <= emit_older_than:
sort_item = heapq.heappop(sorted_buffer)
yield sort_item.item
for item in wrap_iter:
time = time_getter(item)
if is_too_late(time):
if on_drop:
on_drop(item)
else:
heapq.heappush(sorted_buffer, _HeapItem(time, item))
if is_newest_item(time):
newest_time = time
drop_older_than = time - window_length
yield from emit_all(drop_older_than)
yield from emit_all(newest_time)
Functions
def distribute(elements: Iterable[Any], index: int, count: int) ‑> Iterable[Any]
-
Distribute elements equally between a number of buckets and return the items for the given bucket index.
No two buckets will get the same element.
>>> list(distribute(["blue", "green", "red"], 0, 2))
['blue', 'red']
>>> list(distribute(["blue", "green", "red"], 1, 2))
['green']
Note that if you have more buckets than elements, some buckets will get nothing.
>>> list(distribute(["blue", "green", "red"], 3, 5))
[]
This is very useful when writing input builders and you want each of your workers to handle reading a disjoint partition of your input.
>>> from bytewax import Dataflow, spawn_cluster
>>> from bytewax.inputs import ManualInputConfig
>>> def read_topics(topics):
...     for topic in topics:
...         for i in range(3):
...             yield f"topic:{topic} item:{i}"
>>> def input_builder(worker_index, worker_count, resume_epoch):
...     all_topics = ["red", "green", "blue"]
...     this_workers_topics = distribute(all_topics, worker_index, worker_count)
...     for item in read_topics(this_workers_topics):
...         yield Emit(f"worker_index:{worker_index} " + item)
>>> flow = Dataflow()
>>> flow.capture()
>>> spawn_cluster(flow, ManualInputConfig(input_builder), lambda i, n: print)
(0, 'worker_index:0 topic:red item:0')
(0, 'worker_index:0 topic:red item:1')
(0, 'worker_index:0 topic:red item:2')
(0, 'worker_index:0 topic:blue item:0')
(0, 'worker_index:0 topic:blue item:1')
(0, 'worker_index:0 topic:blue item:2')
(0, 'worker_index:1 topic:green item:0')
(0, 'worker_index:1 topic:green item:1')
(0, 'worker_index:1 topic:green item:2')
Args
elements
- To distribute.
index
- Index of this bucket / worker starting at 0.
count
- Total number of buckets / workers.
Returns
An iterator of the elements only in this bucket.
def fully_ordered(wrap_iter: Iterable) ‑> Iterable[Tuple[int, Any]]
-
Each input item increments the epoch.
Be careful using this in high-volume streams with many workers, as the worker overhead goes up as epochs become more fine-grained.
>>> from bytewax import Dataflow, run
>>> flow = Dataflow()
>>> flow.capture()
>>> out = run(flow, fully_ordered(["a", "b", "c"]))
>>> sorted(out)
[(0, 'a'), (1, 'b'), (2, 'c')]
Args
wrap_iter
- Existing input iterable of just items.
Yields
Tuples of `(epoch, item)`.
def single_batch(wrap_iter: Iterable) ‑> Iterable[Tuple[int, Any]]
-
All input items are part of the same epoch.
Use this for non-streaming-style batch processing.
>>> from bytewax import Dataflow, run
>>> flow = Dataflow()
>>> flow.capture()
>>> out = run(flow, single_batch(["a", "b", "c"]))
>>> sorted(out)
[(0, 'a'), (0, 'b'), (0, 'c')]
Args
wrap_iter
- Existing input iterable of just items.
Yields
Tuples of `(epoch, item)`.
def sorted_window(wrap_iter: Iterable, window_length: Any, time_getter: Callable[[Any], Any], on_drop: Callable[[Any], None] = None) ‑> Iterable[Tuple[int, Any]]
-
Sort an iterator to be increasing by some timestamp.
To support a possibly infinite iterator, store a limited sorted buffer of items and only emit things downstream once a certain window of time has passed, as indicated by the timestamp on new items.
New input items which are older than those already emitted will be dropped to maintain sorted output.
The window length needs to be tuned for how "out of order" your input data is and how much data you're willing to drop: Already perfectly ordered input data can have a window of "0" and nothing will be dropped. Completely reversed input data needs a window that is the difference between the oldest and youngest timestamp to ensure nothing will be dropped.
>>> from bytewax import Dataflow, run
>>> items = [
...     {
...         "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 4),
...         "value": "c",
...     },
...     {
...         "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 3),
...         "value": "b",
...     },
...     {
...         "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 0),
...         "value": "a",
...     },
... ]
>>> sorted_items = list(
...     sorted_window(
...         items,
...         datetime.timedelta(seconds=2),
...         lambda item: item["timestamp"],
...     )
... )
>>> sorted_items
[{'timestamp': datetime.datetime(2022, 2, 22, 1, 2, 3), 'value': 'b'},
{'timestamp': datetime.datetime(2022, 2, 22, 1, 2, 4), 'value': 'c'}]
You could imagine using it with `tumbling_epoch()` to ensure you get in-order, bucketed data into your dataflow.
>>> flow = Dataflow()
>>> flow.map(lambda item: item["value"])
>>> flow.capture()
>>> out = run(flow, tumbling_epoch(
...     sorted_items,
...     datetime.timedelta(seconds=0.5),
...     lambda item: item["timestamp"],
... ))
>>> sorted(out)
[(0, 'b'), (2, 'c')]
Args
wrap_iter
- Existing input iterable.
window_length
- Buffering duration. Values will be emitted once this amount of time has passed.
time_getter
- Function to call to produce a timestamp for each value.
on_drop
- Function to call with each dropped item. E.g. log or increment metrics on drop events to refine your window length.
Yields
Values in increasing timestamp order.
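The doctest above does not exercise `on_drop`. Below is a minimal sketch of using it to observe late data; the `log_drop` helper and the `dropped` list are illustrative, not part of the module. Counting how often the callback fires is a cheap way to check whether your window length matches how out of order the input really is.

import datetime
import logging

from bytewax.inputs import sorted_window

dropped = []  # Illustrative sink for late items.

def log_drop(item):
    # Any callable that accepts the dropped item works here.
    logging.warning("dropping late item: %r", item)
    dropped.append(item)

events = [
    {"timestamp": datetime.datetime(2022, 2, 22, 1, 2, 4), "value": "c"},
    {"timestamp": datetime.datetime(2022, 2, 22, 1, 2, 3), "value": "b"},
    # Older than the 2 second window behind "c", so it will be dropped.
    {"timestamp": datetime.datetime(2022, 2, 22, 1, 2, 0), "value": "a"},
]

in_order = list(
    sorted_window(
        events,
        datetime.timedelta(seconds=2),
        lambda e: e["timestamp"],
        on_drop=log_drop,
    )
)
# in_order contains the "b" and "c" events; dropped contains the "a" event.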
def tumbling_epoch(wrap_iter: Iterable, epoch_length: Any, time_getter: Callable[[Any], Any] = <function <lambda>>, epoch_start_time: Any = None, epoch_start: int = 0) ‑> Iterable[Tuple[int, Any]]
-
All inputs within a tumbling window are part of the same epoch.
The time of the first item will be used as the start of epoch 0. Out-of-order items will cause issues as Bytewax requires inputs to dataflows to be in epoch order. See `fully_ordered()`.
>>> from bytewax import Dataflow, run
>>> items = [
...     {
...         "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 3),
...         "value": "a",
...     },
...     {
...         "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 4),
...         "value": "b",
...     },
...     {
...         "timestamp": datetime.datetime(2022, 2, 22, 1, 2, 8),
...         "value": "c",
...     },
... ]
>>> flow = Dataflow()
>>> flow.map(lambda item: item["value"])
>>> flow.capture()
>>> out = run(flow, tumbling_epoch(
...     items,
...     datetime.timedelta(seconds=2),
...     lambda item: item["timestamp"],
... ))
>>> sorted(out)
[(0, 'a'), (0, 'b'), (2, 'c')]
By default, uses "ingestion time" and you don't need to specify a way to access the timestamp in each item.
>>> import pytest; pytest.skip("Figure out sleep in test.")
>>> items = [
...     "a",  # sleep(4)
...     "b",  # sleep(1)
...     "c",
... ]
>>> list(tumbling_epoch(items, datetime.timedelta(seconds=2)))
[(0, 'a'), (2, 'b'), (2, 'c')]
Args
wrap_iter
- Existing input iterable of just items.
epoch_length
- Length of each epoch window.
time_getter
- Function that returns a timestamp given an item. Defaults to current wall time.
epoch_start_time
- The timestamp that should correspond to the start of the 0th epoch. Otherwise defaults to the time found on the first item.
epoch_start
- The integer value to start counting epochs from. This can be used for continuity during processing.
Yields
Tuples of `(epoch, item)`.
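The doctests above always start counting at epoch 0. The `epoch_start_time` and `epoch_start` arguments are what you would reach for after a restart; below is a sketch of that continuity under assumed timestamps (the item values are illustrative).

import datetime

from bytewax.inputs import tumbling_epoch

epoch_length = datetime.timedelta(seconds=2)
t0 = datetime.datetime(2022, 2, 22, 1, 2, 3)

first_run = [
    {"timestamp": t0, "value": "a"},
    {"timestamp": t0 + datetime.timedelta(seconds=5), "value": "b"},
]
second_run = [
    {"timestamp": t0 + datetime.timedelta(seconds=9), "value": "c"},
]

# First run: epochs are numbered from 0 relative to the first timestamp seen.
out1 = list(tumbling_epoch(first_run, epoch_length, lambda e: e["timestamp"]))
# out1 == [(0, ...'a'...), (2, ...'b'...)]

# After a restart, anchor the epoch clock to the same start time so "c" lands
# in the epoch it would have had if processing never stopped.
out2 = list(
    tumbling_epoch(
        second_run,
        epoch_length,
        lambda e: e["timestamp"],
        epoch_start_time=t0,
        epoch_start=0,
    )
)
# out2 == [(4, ...'c'...)]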
def yield_epochs(fn: Callable)
-
A decorator function to unwrap an iterator of [epoch, item] into successive `AdvanceTo` and `Emit` classes with the contents of the iterator.
Use this when you have an input_builder function that returns a generator of (epoch, item) to be used with `cluster_main()` or `spawn_cluster()`:
>>> from bytewax import Dataflow, cluster_main
>>> from bytewax.inputs import yield_epochs, fully_ordered, ManualInputConfig
>>> flow = Dataflow()
>>> flow.capture()
>>> @yield_epochs
... def input_builder(i, n, re):
...     return fully_ordered(["a", "b", "c"])
>>> cluster_main(flow, ManualInputConfig(input_builder), lambda i, n: print, [], 0, 1)
(0, 'a')
(1, 'b')
(2, 'c')
Classes
class AdvanceTo (epoch)
-
Advance to the supplied epoch.
When providing input to a Dataflow, work cannot complete until there is no more data for a given epoch.
AdvanceTo is the signal to a Dataflow that the frontier has moved beyond the current epoch, and that items with an epoch less than the epoch in AdvanceTo can be worked to completion.
Using AdvanceTo and Emit is only necessary when using `spawn_cluster()` and `cluster_main()`, as `run()` and `run_cluster()` will yield AdvanceTo and Emit for you. Likewise, they are only required when using a manual input configuration.
See also: `inputs.yield_epochs()`
>>> def input_builder(worker_index, worker_count, resume_epoch):
...     for i in range(10):
...         yield AdvanceTo(i)  # Advances the epoch to i
...         yield Emit(i)  # Adds the input i at epoch i
Instance variables
var epoch
-
Return an attribute of instance, which is of type owner.
class Emit (item)
-
Emit the supplied item into the dataflow at the current epoch.
Emit is how we introduce input into a dataflow using a manual input configuration:
>>> def input_builder(worker_index, worker_count, resume_epoch):
...     for i in range(10):
...         yield AdvanceTo(i)  # Advances the epoch to i
...         yield Emit(i)  # Adds the input i at epoch i
Instance variables
var item
-
Return an attribute of instance, which is of type owner.
class InputConfig
-
Base class for an input config.
InputConfig defines how you will input data to your dataflow.
Use a specific subclass of InputConfig for the kind of input source you plan to use. See the subclasses in this module.
Subclasses
KafkaInputConfig
ManualInputConfig
class KafkaInputConfig (brokers, group_id, topics, offset_reset, auto_commit, messages_per_epoch)
-
Use Kafka as the input source. Kafka messages will be passed through the dataflow as byte two-tuples of Kafka key and payload.
Currently, Kafka input does not support recovery.
Args
brokers
- Comma-separated list of broker addresses. E.g. "localhost:9092,localhost:9093"
group_id
- Group id as a string.
topics
- Topics to which the consumer will subscribe.
offset_reset
- Can be "earliest" or "latest". Delegates where to resume if auto_commit is not enabled. Defaults to "earliest".
auto_commit
- If true, commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. Defaults to false.
messages_per_epoch
- (integer) Defines maximum number of
messages per epoch.
Defaults to
1
. If the consumer times out waiting, the system will increment to the next epoch, and fewer (or no) messages may be assigned to the preceding epoch.
Returns
Config object. Pass this as the `input_config` argument to your execution entry point.
Ancestors
InputConfig
Instance variables
var auto_commit
-
Return an attribute of instance, which is of type owner.
var brokers
-
Return an attribute of instance, which is of type owner.
var group_id
-
Return an attribute of instance, which is of type owner.
var messages_per_epoch
-
Return an attribute of instance, which is of type owner.
var offset_reset
-
Return an attribute of instance, which is of type owner.
var topics
-
Return an attribute of instance, which is of type owner.
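A hedged construction sketch for KafkaInputConfig, assuming keyword arguments matching the names above are accepted; the broker address, group id, and topic are placeholders, and whether `topics` takes a single name or a list is an assumption here.

from bytewax import Dataflow, spawn_cluster
from bytewax.inputs import KafkaInputConfig

flow = Dataflow()
flow.capture()

# All values below are illustrative placeholders.
input_config = KafkaInputConfig(
    brokers="localhost:9092",      # comma-separated broker addresses
    group_id="my-consumer-group",
    topics=["events"],             # assumed: a list of topic names
    offset_reset="earliest",
    auto_commit=False,
    messages_per_epoch=1,
)

# Messages arrive in the dataflow as (key_bytes, payload_bytes) two-tuples.
spawn_cluster(flow, input_config, lambda i, n: print)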
class ManualInputConfig (input_builder)
-
Use a user-defined function that returns an iterable as the input source.
It is your responsibility to design your input handlers in such a way that they jump to the point in the input that corresponds to the `resume_epoch` argument; if they can't (because the input is ephemeral) you can still recover the dataflow, but the lost input cannot be replayed, so the output will be different.
Args
input_builder
- An input_builder function that yields `AdvanceTo` or `Emit` with this worker's input. Must resume from the epoch specified.
Returns
Config object. Pass this as the `input_config` argument to your execution entry point.
Ancestors
InputConfig
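To make the `resume_epoch` contract concrete, here is a sketch of a builder over a replayable in-memory source that skips epochs completed before a restart. The data is illustrative; the entry-point call mirrors the `yield_epochs` example above.

from bytewax import Dataflow, cluster_main
from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig

lines = ["a", "b", "c", "d"]  # Illustrative, replayable input.

def input_builder(worker_index, worker_count, resume_epoch):
    for epoch, line in enumerate(lines):
        # Skip input for epochs that completed before the restart.
        if epoch < resume_epoch:
            continue
        yield AdvanceTo(epoch)
        yield Emit(line)

flow = Dataflow()
flow.capture()

# Same entry-point shape as the examples above; a fresh run resumes from epoch 0.
cluster_main(flow, ManualInputConfig(input_builder), lambda i, n: print, [], 0, 1)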