Module bytewax.inputs

Dataflow input sources.

Bytewax provides pre-packaged input configuration options for common sources you might want to read dataflow input from.

Use

Create an InputConfig subclass for the source you'd like to read from. Then pass that config object to the Dataflow constructor.
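
For example, a minimal sketch (hedged: it reuses the spawn_cluster wiring and trivial print output builder shown in the doctests later on this page):

from bytewax import Dataflow, spawn_cluster
from bytewax.inputs import TestingInputConfig

flow = Dataflow()
flow.capture()

# TestingInputConfig is only meant for unit tests; swap in KafkaInputConfig
# or ManualInputConfig for real sources.
spawn_cluster(flow, TestingInputConfig(range(5)), lambda i, n: print)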

Expand source code
"""Dataflow input sources.

Bytewax provides pre-packaged input configuration options for common
sources you might want to read dataflow input from.

Use
---

Create an `InputConfig` subclass for the source you'd like to read
from. Then pass that config object to the `bytewax.dataflow.Dataflow`
constructor.

"""

from typing import Any, Iterable

from .bytewax import InputConfig, KafkaInputConfig, ManualInputConfig  # noqa: F401


def TestingInputConfig(it):
    """Produce input from this Python iterable. You only want to use this
    for unit testing.

    The iterable must be identical on all workers; this will
    automatically distribute the items across workers and handle
    recovery.

    Args:

        it: Iterable for input.

    Returns:

        Config object. Pass this to the `bytewax.dataflow.Dataflow`
        constructor.

    """

    def gen(worker_index, worker_count, resume_state):
        resume_i = resume_state or 0
        for i, x in enumerate(distribute(it, worker_index, worker_count)):
            # FFWD to the resume item.
            if i < resume_i:
                continue
            # Store the index in this worker's partition as the resume
            # state.
            yield (i + 1, x)

    return ManualInputConfig(gen)


def distribute(elements: Iterable[Any], index: int, count: int) -> Iterable[Any]:
    """Distribute elements equally between a number of buckets and return
    the items for the given bucket index.

    No two buckets will get the same element.

    >>> list(distribute(["blue", "green", "red"], 0, 2))
    ['blue', 'red']
    >>> list(distribute(["blue", "green", "red"], 1, 2))
    ['green']

    Note that if you have more buckets than elements, some buckets
    will get nothing.

    >>> list(distribute(["blue", "green", "red"], 3, 5))
    []

    This is very useful when writing input builders and you want each
    of your workers to handle reading a disjoint partition of your
    input.

    >>> from bytewax import Dataflow, spawn_cluster
    >>> from bytewax.inputs import ManualInputConfig
    >>> def read_topics(topics):
    ...     for topic in topics:
    ...         for i in range(3):
    ...             yield f"topic:{topic} item:{i}"
    >>> def input_builder(worker_index, worker_count, resume_state):
    ...     all_topics = ["red", "green", "blue"]
    ...     this_workers_topics = distribute(all_topics, worker_index, worker_count)
    ...     for item in read_topics(this_workers_topics):
    ...         yield (None, f"worker_index:{worker_index} " + item)
    >>> flow = Dataflow()
    >>> flow.capture()
    >>> spawn_cluster(flow, ManualInputConfig(input_builder), lambda i, n: print)
    (0, 'worker_index:0 topic:red item:0')
    (0, 'worker_index:0 topic:red item:1')
    (0, 'worker_index:0 topic:red item:2')
    (0, 'worker_index:0 topic:blue item:0')
    (0, 'worker_index:0 topic:blue item:1')
    (0, 'worker_index:0 topic:blue item:2')
    (0, 'worker_index:1 topic:green item:0')
    (0, 'worker_index:1 topic:green item:1')
    (0, 'worker_index:1 topic:green item:2')

    Args:

        elements: To distribute.

        index: Index of this bucket / worker starting at 0.

        count: Total number of buckets / workers.

    Returns:

        An iterator of the elements only in this bucket.

    """
    assert index < count, f"Highest index should only be {count - 1}; got {index}"
    for i, x in enumerate(elements):
        if i % count == index:
            yield x

Functions

def TestingInputConfig(it)

Produce input from this Python iterable. You only want to use this for unit testing.

The iterable must be identical on all workers; this will automatically distribute the items across workers and handle recovery.

Args

it
Iterable for input.

Returns

Config object. Pass this to the Dataflow constructor.
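
TestingInputConfig partitions the iterable with the distribute helper (see the module source above), so you can preview which items each worker will receive. A small sketch:

from bytewax.inputs import distribute

items = ["a", "b", "c", "d", "e"]

# With two workers, worker 0 receives the even-indexed items and worker 1
# the odd-indexed ones; TestingInputConfig splits the iterable the same way.
print(list(distribute(items, 0, 2)))  # ['a', 'c', 'e']
print(list(distribute(items, 1, 2)))  # ['b', 'd']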

def distribute(elements: Iterable[Any], index: int, count: int) -> Iterable[Any]

Distribute elements equally between a number of buckets and return the items for the given bucket index.

No two buckets will get the same element.

>>> list(distribute(["blue", "green", "red"], 0, 2))
['blue', 'red']
>>> list(distribute(["blue", "green", "red"], 1, 2))
['green']

Note that if you have more buckets than elements, some buckets will get nothing.

>>> list(distribute(["blue", "green", "red"], 3, 5))
[]

This is very useful when writing input builders and you want each of your workers to handle reading a disjoint partition of your input.

>>> from bytewax import Dataflow, spawn_cluster
>>> from bytewax.inputs import ManualInputConfig
>>> def read_topics(topics):
...     for topic in topics:
...         for i in range(3):
...             yield f"topic:{topic} item:{i}"
>>> def input_builder(worker_index, worker_count, resume_state):
...     all_topics = ["red", "green", "blue"]
...     this_workers_topics = distribute(all_topics, worker_index, worker_count)
...     for item in read_topics(this_workers_topics):
...         yield (None, f"worker_index:{worker_index} " + item)
>>> flow = Dataflow()
>>> flow.capture()
>>> spawn_cluster(flow, ManualInputConfig(input_builder), lambda i, n: print)
(0, 'worker_index:0 topic:red item:0')
(0, 'worker_index:0 topic:red item:1')
(0, 'worker_index:0 topic:red item:2')
(0, 'worker_index:0 topic:blue item:0')
(0, 'worker_index:0 topic:blue item:1')
(0, 'worker_index:0 topic:blue item:2')
(0, 'worker_index:1 topic:green item:0')
(0, 'worker_index:1 topic:green item:1')
(0, 'worker_index:1 topic:green item:2')

Args

elements
To distribute.
index
Index of this bucket / worker starting at 0.
count
Total number of buckets / workers.

Returns

An iterator of the elements only in this bucket.


Classes

class InputConfig

Base class for an input config.

These define how you will input data to your dataflow.

Use a specific subclass of InputConfig for the kind of input source you plan to use. See the subclasses in this module.

Subclasses

KafkaInputConfig
ManualInputConfig

class KafkaInputConfig (brokers, topic, tail, starting_offset)

Use Kafka as the input source.

Kafka messages will be passed through the dataflow as two-tuples of (key_bytes, payload_bytes).

Args

brokers : List[str]
List of host:port strings of Kafka brokers.
topic : str
Topic to which consumer will subscribe.
tail : bool
Wait for new data on this topic when the end is initially reached.
starting_offset : str
Can be "beginning" or "end". Delegates where to resume if auto_commit is not enabled. Defaults to "beginning".

Returns

Config object. Pass this as the input_config argument to Dataflow.input().
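
A construction sketch using the documented arguments positionally; the broker address and topic name are placeholders:

from bytewax.inputs import KafkaInputConfig

# Placeholder broker and topic names; adjust for your cluster.
input_config = KafkaInputConfig(
    ["localhost:9092"],  # brokers
    "example_topic",     # topic
    True,                # tail: keep waiting for new messages at the end
    "beginning",         # starting_offset: "beginning" or "end"
)

Each message then enters the dataflow as a (key_bytes, payload_bytes) two-tuple, as described above.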

Ancestors

InputConfig

Instance variables

var brokers

List of host:port strings of Kafka brokers.

var starting_offset

Where to start reading if auto_commit is not enabled; "beginning" or "end".

var tail

Whether to wait for new data on this topic when the end is initially reached.

var topic

Topic to which the consumer will subscribe.

class ManualInputConfig (input_builder)

Use a user-defined function that returns an iterable as the input source.

Because Bytewax's execution is cooperative, the resulting iterators must not block waiting for new data, otherwise pending execution of other steps in the dataflow will be delayed and throughput will be reduced. If you are using a generator and no data is ready yet, have it yield None or just yield to signal this.

Args

input_builder
input_builder(worker_index: int, worker_count: int, resume_state: Option[Any]) => Iterator[Tuple[Any, Any]]
Builder function which returns an iterator of 2-tuples of (state, item). item is the input that the worker should introduce into the dataflow. state is a snapshot of any internal state it will take to resume this input from its current position after the current item. Note that e.g. returning the same list from each worker will result in duplicate data in the dataflow.

Returns

Config object. Pass this as the input_config argument to Dataflow.input().
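
A hedged builder sketch, reading lines from a hypothetical per-worker file (input_part_N.txt is made up for illustration) and snapshotting the next line number as the resume state:

from bytewax.inputs import ManualInputConfig

def input_builder(worker_index, worker_count, resume_state):
    # Hypothetical per-worker file; each worker must read a disjoint
    # partition of the input, or items will be duplicated in the dataflow.
    path = f"input_part_{worker_index}.txt"
    resume_line = resume_state or 0
    with open(path) as f:
        for line_no, line in enumerate(f):
            if line_no < resume_line:
                continue  # fast-forward past already-processed lines
            # Snapshot the next line number so recovery resumes after this item.
            yield (line_no + 1, line.rstrip("\n"))

input_config = ManualInputConfig(input_builder)

If the source can be temporarily empty (for example, when polling a queue), have the generator yield None or just yield instead of blocking, as noted above.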

Ancestors

InputConfig

Instance variables

var input_builder

Builder function which returns an iterator of 2-tuples of (state, item).