Module bytewax.outputs
Dataflow output sinks.
Bytewax provides pre-packaged output configuration options for common sinks you might want to write dataflow output to.
Use
Create an OutputConfig subclass for the sink you want to write to. Then pass that config object to the Dataflow.capture() operator.
"""Dataflow output sinks.
Bytewax provides pre-packaged output configuration options for common
sinks you might want to write dataflow output to.
Use
---
Create an `OutputConfig` subclass for the sink you want to write to.
Then pass that config object to the `bytewax.dataflow.Dataflow.capture`
operator.
"""
from .bytewax import (  # noqa: F401
    ManualEpochOutputConfig,
    ManualOutputConfig,
    OutputConfig,
    StdOutputConfig,
)

class TestingEpochOutputConfig(ManualEpochOutputConfig):
    """
    Append each output `(epoch, item)` to a list.
    You only want to use this for unit testing.
    Because the list is in-memory, you will need to carefully
    coordinate use of, and assertions on, this list when using
    multiple workers.
    Args:
        ls: Append each `(epoch, item)` to this list.
    Returns:
        Config object.
        Pass this to the `bytewax.dataflow.Dataflow.capture` operator.
    """

    # This keeps pytest from collecting this class as a test,
    # since its name starts with "Test".
    __test__ = False

    def __new__(cls, ls):
        """
        In classes defined by PyO3, we can only use __new__, not __init__.
        """
        return super().__new__(cls, lambda wi, wn: ls.append)

class TestingOutputConfig(ManualOutputConfig):
    """
    Append each output item to a list.
    You only want to use this for unit testing.
    Because the list is in-memory, you will need to carefully
    coordinate use of, and assertions on, this list when using
    multiple workers.
    Args:
        ls: Append each `item` to this list.
    Returns:
        Config object.
        Pass this to the `bytewax.dataflow.Dataflow.capture` operator.
    """

    # This keeps pytest from collecting this class as a test,
    # since its name starts with "Test".
    __test__ = False

    def __new__(cls, ls):
        """
        In classes defined by PyO3, we can only use __new__, not __init__.
        """
        return super().__new__(cls, lambda wi, wn: ls.append)
Classes
class ManualEpochOutputConfig (output_builder)
Call a Python callback function with each output epoch and item.
You probably want to use ManualOutputConfig unless you know you need specific epoch assignments for deep integration work.
Args
output_builder
output_builder(worker_index: int, worker_count: int) => output_handler(epoch_item: Tuple[int, Any])
Builder function which returns a handler function for each worker thread. The handler is called with a single (epoch, item) tuple whenever an item passes by this capture operator on this worker.
Returns
Config object. Pass this to the Dataflow.capture() operator.
Ancestors
OutputConfig
Subclasses
TestingEpochOutputConfig
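The builder contract above can be sketched in plain Python. The names `results` and `epoch_collecting_builder` are hypothetical, and the driver at the end calls the builder and handler by hand to stand in for the capture operator on a single worker; nothing here imports bytewax. The handler receives one `(epoch, item)` tuple per call, matching the `output_handler(epoch_item: Tuple[int, Any])` signature.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical store: worker_index -> epoch -> items seen on that worker.
results: Dict[int, Dict[int, List[Any]]] = defaultdict(lambda: defaultdict(list))


def epoch_collecting_builder(
    worker_index: int, worker_count: int
) -> Callable[[Tuple[int, Any]], None]:
    """Build one output handler per worker (hypothetical example)."""
    per_epoch = results[worker_index]

    def output_handler(epoch_item: Tuple[int, Any]) -> None:
        # The handler receives a single (epoch, item) tuple per call.
        epoch, item = epoch_item
        per_epoch[epoch].append(item)

    return output_handler


# Simulate worker 0 of 1 receiving three outputs across two epochs.
handler = epoch_collecting_builder(0, 1)
for epoch_item in [(0, "a"), (0, "b"), (1, "c")]:
    handler(epoch_item)

print(dict(results[0]))  # {0: ['a', 'b'], 1: ['c']}
```

Under these assumptions, you would hand the builder itself (not a handler) to the config, as in ManualEpochOutputConfig(epoch_collecting_builder). A module-level dict like `results` is only shared between workers that run in the same process.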
class ManualOutputConfig (output_builder)
Call a Python callback function with each output item.
Args
output_builder
output_builder(worker_index: int, worker_count: int) => output_handler(item: Any)
Builder function which returns a handler function for each worker thread. The handler is called with each item that passes by this capture operator on this worker.
Returns
Config object. Pass this to the Dataflow.capture() operator.
Ancestors
OutputConfig
Subclasses
TestingOutputConfig
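As a minimal sketch of a ManualOutputConfig builder, assuming you want one output file per worker: `make_file_builder` and the `part-{worker_index}-of-{worker_count}.txt` naming scheme are hypothetical. Giving each worker its own file avoids sharing a file handle between workers. The demo simulates two workers by calling the builder and handlers directly, without bytewax.

```python
import os
import tempfile
from typing import Any, Callable


def make_file_builder(directory: str) -> Callable[[int, int], Callable[[Any], None]]:
    """Return a hypothetical output_builder that writes one file per worker."""

    def output_builder(worker_index: int, worker_count: int) -> Callable[[Any], None]:
        # Each worker writes to its own file, so workers never share a handle.
        path = os.path.join(directory, f"part-{worker_index}-of-{worker_count}.txt")

        def output_handler(item: Any) -> None:
            # Reopen in append mode per item: simple, but not tuned for throughput.
            with open(path, "a", encoding="utf-8") as f:
                f.write(f"{item}\n")

        return output_handler

    return output_builder


# Simulate two workers, each receiving some items.
out_dir = tempfile.mkdtemp()
builder = make_file_builder(out_dir)
for worker_index, items in [(0, [1, 2]), (1, [3])]:
    handler = builder(worker_index, 2)
    for item in items:
        handler(item)

print(sorted(os.listdir(out_dir)))  # ['part-0-of-2.txt', 'part-1-of-2.txt']
```

With bytewax, the config would be built as ManualOutputConfig(make_file_builder(some_directory)), where some_directory is a placeholder. Reopening the file for every item keeps the sketch free of unclosed handles, since this config only exposes a per-item handler.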
class OutputConfig
Base class for an output config.
These define how a certain stream of data should be output.
Use a specific subclass of this that matches the output destination you'd like to write to.
Subclasses
ManualEpochOutputConfig
ManualOutputConfig
StdOutputConfig
class StdOutputConfig
Write the output items to standard out.
Items must have a valid __str__. If not, map the items to strings before capture.
Returns
Config object. Pass this to the Dataflow.capture() operator.
Ancestors
OutputConfig
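The two ways to satisfy the __str__ requirement above can be sketched as follows; `Reading` and `to_json_line` are hypothetical names. Either give the item type a readable __str__, or convert each item to a string yourself before it reaches the capture operator.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Reading:
    """Hypothetical item type."""

    sensor: str
    value: float

    def __str__(self) -> str:
        # Option 1: give the item type its own human-readable __str__.
        return f"{self.sensor}={self.value}"


def to_json_line(item: Dict[str, Any]) -> str:
    # Option 2: map each item to a string before capture.
    return json.dumps(item, sort_keys=True)


print(str(Reading("temp", 21.5)))  # temp=21.5
print(to_json_line({"b": 2, "a": 1}))  # {"a": 1, "b": 2}
```

In a dataflow, the second option would be a map step ahead of capture, for example flow.map(to_json_line) followed by flow.capture(StdOutputConfig()), assuming flow is your Dataflow.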
class TestingEpochOutputConfig (ls)
Append each output (epoch, item) to a list. You only want to use this for unit testing.
Because the list is in-memory, you will need to carefully coordinate use of, and assertions on, this list when using multiple workers.
Args
ls
Append each (epoch, item) to this list.
Returns
Config object. Pass this to the Dataflow.capture() operator.
Ancestors
ManualEpochOutputConfig
OutputConfig
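The source above builds this config with `lambda wi, wn: ls.append`, so every worker's handler is the same bound `append` method of one list. The sketch below reproduces that builder in plain Python (without bytewax) to show what ends up in the list: one `(epoch, item)` tuple per output. The names `out` and `builder` are illustrative.

```python
from typing import Any, Callable, List, Tuple

out: List[Tuple[int, Any]] = []


def builder(worker_index: int, worker_count: int) -> Callable[[Tuple[int, Any]], None]:
    # Equivalent to `lambda wi, wn: ls.append` in the source:
    # every worker gets the same list's append method.
    return out.append


handler = builder(0, 1)
handler((0, "hello"))
handler((1, "world"))

print(out)  # [(0, 'hello'), (1, 'world')]
```

In a unit test you would instead write out = [] and pass TestingEpochOutputConfig(out) to Dataflow.capture().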
class TestingOutputConfig (ls)
Append each output item to a list. You only want to use this for unit testing.
Because the list is in-memory, you will need to carefully coordinate use of, and assertions on, this list when using multiple workers.
Args
ls
Append each item to this list.
Returns
Config object. Pass this to the Dataflow.capture() operator.
Ancestors
ManualOutputConfig
OutputConfig
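The coordination caveat above can be sketched with threads standing in for workers in one process. `builder` mirrors the `lambda wi, wn: ls.append` builder from the source, while `run_worker` and the way items are split between workers are hypothetical. Because arrival order depends on thread scheduling, the assertion compares sorted output rather than the raw list.

```python
import threading
from typing import Any, Callable, List

out: List[Any] = []


def builder(worker_index: int, worker_count: int) -> Callable[[Any], None]:
    # Same shape as TestingOutputConfig's builder: every worker shares out.append.
    return out.append


def run_worker(worker_index: int, worker_count: int) -> None:
    # Hypothetical stand-in for one worker's share of the output.
    handler = builder(worker_index, worker_count)
    for i in range(worker_index, 12, worker_count):
        handler(i)


workers = [threading.Thread(target=run_worker, args=(w, 3)) for w in range(3)]
for t in workers:
    t.start()
for t in workers:
    t.join()

# Arrival order depends on scheduling, so compare order-insensitively.
assert sorted(out) == list(range(12))
```

This only works when workers share a process. If workers run in separate processes, each process appends to its own copy of the list, so the test process will not see their output.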