Module bytewax.outputs

Dataflow output sinks.

Bytewax provides pre-packaged output configuration options for common sinks you might want to write dataflow output to.

Use

Create an OutputConfig subclass for the sink you want to write to. Then pass that config object to the Dataflow.capture() operator.

"""Dataflow output sinks.

Bytewax provides pre-packaged output configuration options for common
sinks you might want to write dataflow output to.

Use
---

Create an `OutputConfig` subclass for the sink you want to write to.
Then pass that config object to the `bytewax.dataflow.Dataflow.capture`
operator.
"""

from .bytewax import (  # noqa: F401
    ManualEpochOutputConfig,
    ManualOutputConfig,
    OutputConfig,
    StdOutputConfig,
)


class TestingEpochOutputConfig(ManualEpochOutputConfig):
    """
    Append each output `(epoch, item)` to a list.
    You only want to use this for unit testing.

    Because the list is in-memory, you will need to carefully
    coordinate use or assertions on this list when using multiple
    workers.

    Args:

        ls: Append each `(epoch, item)` to this list.

    Returns:

        Config object.
        Pass this to the `bytewax.dataflow.Dataflow.capture` operator.

    """
    # This is needed to avoid pytest trying to load this class as
    # a test since its name starts with "Test"
    __test__ = False

    def __new__(cls, ls):
        """
        In classes defined by PyO3 we can only use __new__, not __init__
        """
        return super().__new__(cls, lambda wi, wn: ls.append)


class TestingOutputConfig(ManualOutputConfig):
    """
    Append each output item to a list.
    You only want to use this for unit testing.

    Because the list is in-memory, you will need to carefully
    coordinate use or assertions on this list when using multiple
    workers.

    Args:

        ls: Append each `item` to this list.

    Returns:

        Config object.
        Pass this to the `bytewax.dataflow.Dataflow.capture` operator.

    """
    # This is needed to avoid pytest trying to load this class as
    # a test since its name starts with "Test"
    __test__ = False

    def __new__(cls, ls):
        """
        In classes defined by PyO3 we can only use __new__, not __init__
        """
        return super().__new__(cls, lambda wi, wn: ls.append)

Classes

class ManualEpochOutputConfig (output_builder)

Call a Python callback function with each output epoch and item.

You probably want to use ManualOutputConfig unless you know you need specific epoch assignments for deep integration work.

Args

output_builder
output_builder(worker_index: int, worker_count: int) => output_handler(epoch_item: Tuple[int, Any])
Builder function that returns a handler function for each worker thread. The handler is called with (epoch, item) whenever an item passes by this capture operator on this worker.

Returns

Config object. Pass this to the Dataflow.capture() operator.
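
The builder and handler shapes described above can be sketched in plain Python, so the example runs without Bytewax installed. The names build_epoch_handler and received, and the loop standing in for the capture operator, are illustrative assumptions and not part of the library.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical store: epoch -> items seen in that epoch.
received: Dict[int, List[Any]] = {}


def build_epoch_handler(
    worker_index: int, worker_count: int
) -> Callable[[Tuple[int, Any]], None]:
    """Builder with the shape output_builder(worker_index, worker_count)."""

    def handle(epoch_item: Tuple[int, Any]) -> None:
        # The handler receives one (epoch, item) tuple per call.
        epoch, item = epoch_item
        received.setdefault(epoch, []).append(item)

    return handle


# Stand-in for the capture operator running on a single worker.
handler = build_epoch_handler(0, 1)
for epoch_item in [(0, "a"), (0, "b"), (1, "c")]:
    handler(epoch_item)
```

With Bytewax available, a builder like this would be passed as ManualEpochOutputConfig(build_epoch_handler).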

Ancestors

OutputConfig

Subclasses

TestingEpochOutputConfig

class ManualOutputConfig (output_builder)

Call a Python callback function with each output item.

Args

output_builder
output_builder(worker_index: int, worker_count: int) => output_handler(item: Any)
Builder function that returns a handler function for each worker thread. The handler is called with item whenever an item passes by this capture operator on this worker.

Returns

Config object. Pass this to the Dataflow.capture() operator.
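
Because the builder runs once per worker, it is a natural place to set up a per-worker resource. The following is a minimal sketch under that assumption, in plain Python; build_handler, buffers, and the two hand-driven workers are hypothetical stand-ins rather than Bytewax code.

```python
import io
from typing import Any, Callable, Dict

# Hypothetical per-worker sinks; a real builder might open one file per worker.
buffers: Dict[int, io.StringIO] = {}


def build_handler(worker_index: int, worker_count: int) -> Callable[[Any], None]:
    # Called once per worker: set up that worker's own sink here.
    buf = buffers.setdefault(worker_index, io.StringIO())

    def handle(item: Any) -> None:
        # Called once for each item that reaches the capture step on this worker.
        buf.write(f"{item}\n")

    return handle


# Stand-in for two workers, each receiving part of the stream.
worker_count = 2
handlers = [build_handler(i, worker_count) for i in range(worker_count)]
handlers[0]("x")
handlers[1]("y")
handlers[0]("z")
```

With Bytewax available, the builder would be passed as ManualOutputConfig(build_handler).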

Ancestors

OutputConfig

Subclasses

TestingOutputConfig

class OutputConfig

Base class for an output config.

These define how a certain stream of data should be output.

Use a specific subclass of this that matches the output destination you'd like to write to.

Subclasses

ManualEpochOutputConfig
ManualOutputConfig
StdOutputConfig

class StdOutputConfig

Write the output items to standard out.

Items must have a valid __str__. If not, map the items into a string before capture.

Returns

Config object. Pass this to the Dataflow.capture() operator.
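
As an illustration of mapping items into strings before capture, here is a small sketch in plain Python. The Reading class and format_reading function are hypothetical; the class deliberately defines no __str__, so its default string form would not be useful on standard out.

```python
from typing import List


class Reading:
    """Hypothetical item type with no custom __str__."""

    def __init__(self, sensor: str, value: float) -> None:
        self.sensor = sensor
        self.value = value


def format_reading(reading: Reading) -> str:
    # Turn the item into the exact line that should be printed.
    return f"{reading.sensor}={reading.value:.1f}"


items: List[Reading] = [Reading("temp", 21.456), Reading("rh", 40)]
lines = [format_reading(item) for item in items]
for line in lines:
    print(line)
```

In a dataflow, a function like format_reading would be applied in a map step placed just before the capture step that uses StdOutputConfig.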

Ancestors

OutputConfig

class TestingEpochOutputConfig (ls)

Append each output (epoch, item) to a list. You only want to use this for unit testing.

Because the list is in-memory, you will need to carefully coordinate use or assertions on this list when using multiple workers.

Args

ls
Append each (epoch, item) to this list.

Returns

Config object. Pass this to the Dataflow.capture() operator.
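
To show why assertions need care with multiple workers, the sketch below mimics the builder this class uses: every worker gets the append method of the same list. It is plain Python and does not import Bytewax; the two hand-driven workers and the chosen arrival order are assumptions made for illustration.

```python
from typing import Any, Callable, List, Tuple

out: List[Tuple[int, Any]] = []


def builder(worker_index: int, worker_count: int) -> Callable[[Tuple[int, Any]], None]:
    # Same shape as the class's builder: ignore the worker arguments and
    # hand back the bound append method of the shared list.
    return out.append


# Stand-in for two workers whose output arrives interleaved.
handlers = [builder(i, 2) for i in range(2)]
handlers[1]((0, "b"))
handlers[0]((0, "a"))
handlers[0]((1, "c"))

# Arrival order depends on which worker ran first, so compare sorted output.
assert sorted(out) == [(0, "a"), (0, "b"), (1, "c")]
```

Sorting works here because the (epoch, item) tuples are comparable; for items that are not, compare per-epoch groups or counts instead.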

Ancestors

ManualEpochOutputConfig
OutputConfig

class TestingOutputConfig (ls)

Append each output item to a list. You only want to use this for unit testing.

Because the list is in-memory, you will need to carefully coordinate use or assertions on this list when using multiple workers.

Args

ls
Append each item to this list.

Returns

Config object. Pass this to the Dataflow.capture() operator.
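
One way to coordinate assertions is to wait until every worker has finished and then compare contents without regard to order. The sketch below simulates this with standard-library threads, assuming the workers are threads in a single process; run_worker and partitions are hypothetical names, and nothing here calls Bytewax.

```python
import threading
from collections import Counter
from typing import Any, Callable, List

out: List[Any] = []


def builder(worker_index: int, worker_count: int) -> Callable[[Any], None]:
    # Every worker appends to the same in-memory list.
    return out.append


def run_worker(worker_index: int, worker_count: int, items: List[Any]) -> None:
    handler = builder(worker_index, worker_count)
    for item in items:
        handler(item)


# Each simulated worker handles its own share of the items.
partitions = [[1, 3, 5], [2, 4, 6]]
threads = [
    threading.Thread(target=run_worker, args=(i, len(partitions), part))
    for i, part in enumerate(partitions)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Assert only after all workers are done, and ignore arrival order.
assert Counter(out) == Counter([1, 2, 3, 4, 5, 6])
```

If workers run as separate processes, each process would have its own copy of the list, so this pattern would not apply as written.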

Ancestors

ManualOutputConfig
OutputConfig