import operator
import itertools
from collections import deque
from ..util import NO_VALUE
from .op import Op
from .transform import Iterate
[docs]class Count(Iterate):
__slots__ = ()
def __init__(self, start=0, step=1, source=None):
it = itertools.count(start, step)
Iterate.__init__(self, it, source)
[docs]class Reduce(Op):
__slots__ = ('_func', '_initializer', '_prev')
def __init__(self, func, initializer=NO_VALUE, source=None):
Op.__init__(self, source)
self._func = func
self._initializer = initializer
self._prev = NO_VALUE
[docs] def on_source(self, arg):
if self._prev is NO_VALUE:
if self._initializer is NO_VALUE:
self._prev = arg
else:
self._prev = self._func(self._initializer, arg)
self.emit(self._prev)
else:
self._prev = self._func(self._prev, arg)
self.emit(self._prev)
[docs]class Min(Reduce):
__slots__ = ()
def __init__(self, source=None):
Reduce.__init__(self, min, float('inf'), source)
[docs]class Max(Reduce):
__slots__ = ()
def __init__(self, source=None):
Reduce.__init__(self, max, -float('inf'), source)
[docs]class Sum(Reduce):
__slots__ = ()
def __init__(self, start=0, source=None):
Reduce.__init__(self, operator.add, start, source)
[docs]class Product(Reduce):
__slots__ = ()
def __init__(self, start=1, source=None):
Reduce.__init__(self, operator.mul, start, source)
[docs]class Mean(Op):
__slots__ = ('_count', '_sum')
def __init__(self, source=None):
Op.__init__(self, source)
self._count = 0
self._sum = 0
[docs] def on_source(self, arg):
self._count += 1
self._sum += arg
self.emit(self._sum / self._count)
[docs]class Any(Reduce):
__slots__ = ()
def __init__(self, source=None):
Reduce.__init__(self, lambda prev, v: prev or bool(v), False, source)
[docs]class All(Reduce):
__slots__ = ()
def __init__(self, source=None):
Reduce.__init__(self, lambda prev, v: prev and bool(v), True, source)
[docs]class Ema(Op):
__slots__ = ('_f1', '_f2', '_prev')
def __init__(self, n=None, weight=None, source=None):
Op.__init__(self, source)
self._f1 = weight or 2.0 / (n + 1)
self._f2 = 1 - self._f1
self._prev = NO_VALUE
[docs] def on_source(self, *args):
if self._prev is NO_VALUE:
value = args
else:
value = [
self._f2 * p + self._f1 * a for p, a in zip(self._prev, args)]
self._prev = value
self.emit(*value)
[docs]class Pairwise(Op):
__slots__ = ('_prev', '_has_prev')
def __init__(self, source=None):
Op.__init__(self, source)
self._has_prev = False
[docs] def on_source(self, *args):
value = args[0] if len(args) == 1 else args if args else NO_VALUE
if self._has_prev:
self.emit(self._prev, value)
else:
self._has_prev = True
self._prev = value
[docs]class List(Op):
__slots__ = ('_values')
def __init__(self, source=None):
Op.__init__(self, source)
self._values = []
[docs] def on_source(self, *args):
self._values.append(
args[0] if len(args) == 1 else args if args else NO_VALUE)
[docs] def on_source_done(self, source):
self.emit(self._values)
self._disconnect_from(self._source)
self._source = None
self.set_done()
[docs]class Deque(Op):
__slots__ = ('_count', '_q')
def __init__(self, count, source=None):
Op.__init__(self, source)
self._count = count
self._q = deque()
[docs] def on_source(self, *args):
self._q.append(
args[0] if len(args) == 1 else args if args else NO_VALUE)
if self._count and len(self._q) > self._count:
self._q.popleft()
self.emit(self._q)