Source code for eventkit.ops.timing

from collections import deque

from ..event import Event
from ..util import NO_VALUE, loop
from .op import Op


[docs]class Delay(Op): __slots__ = ('_delay',) def __init__(self, delay, source=None): Op.__init__(self, source) self._delay = delay
[docs] def on_source(self, *args): loop.call_later(self._delay, self.emit, *args)
[docs] def on_source_error(self, error): loop.call_later(self._delay, self.error_event.emit, error)
[docs] def on_source_done(self, source): loop.call_later(self._delay, self.set_done)
[docs]class Timeout(Op): __slots__ = ('_timeout', '_handle', '_last_time') def __init__(self, timeout, source=None): Op.__init__(self, source) if source.done(): return self._timeout = timeout self._last_time = loop.time() self._handle = None self._schedule()
[docs] def on_source(self, *args): self._last_time = loop.time()
[docs] def on_source_done(self, source): self._disconnect_from(self._source) self._handle.cancel() del self._handle self.set_done()
def _schedule(self): self._handle = loop.call_at( self._last_time + self._timeout, self._on_timer) def _on_timer(self): if loop.time() - self._last_time > self._timeout: self.emit() self.set_done() else: self._schedule()
[docs]class Debounce(Op): __slots__ = ('_interval', '_on_first', '_handle', '_last_time') def __init__(self, interval, on_first=False, source=None): Op.__init__(self, source) self._interval = interval self._on_first = on_first self._last_time = -float('inf') self._handle = None
[docs] def on_source(self, *args): time = loop.time() delta = time - self._last_time self._last_time = time if self._on_first: if delta >= self._interval: self.emit(*args) else: if self._handle: self._handle.cancel() self._handle = loop.call_at( time + self._interval, self._delayed_emit, *args)
def _delayed_emit(self, *args): self._handle = None self.emit(*args) if self._source is None: self.set_done()
[docs] def on_source_done(self, source): self._disconnect_from(source) self._source = None if not self._handle: self.set_done()
[docs]class Throttle(Op): __slots__ = ( 'status_event', '_maximum', '_interval', '_cost_func', '_q', '_time_q', '_cost_q', '_is_throttling') def __init__(self, maximum, interval, cost_func=None, source=None): Op.__init__(self, source) self.status_event = Event('throttle_status') """ Sub event that emits ``True`` when throttling starts and ``False`` when throttling ends. """ self._maximum = maximum self._interval = interval self._cost_func = cost_func self._q = deque() # deque of (args, cost) tuples self._time_q = deque() # deque of previous emit times self._cost_q = deque() # deque of costs of previous emits self._is_throttling = False
[docs] def set_limit(self, maximum, interval): """ Dynamically update the ``maximum`` per ``interval`` limit. """ self._maximum = maximum self._interval = interval
[docs] def on_source(self, *args): cost = self._cost_func if cost is not None: cost = cost(*args) self._q.append((args, cost)) self._try_emit()
[docs] def on_source_done(self, source): self._disconnect_from(source) self._source = None if not self._q: self.set_done() self.status_event.set_done()
def _try_emit(self): t = loop.time() q = self._q times = self._time_q costs = self._cost_q # forget old emit times while times and t - times[0] > self._interval: times.popleft() costs.popleft() # emit values while not exceeding the limit while q: args, cost = q[0] if self._cost_func: cost = self._cost_func(*args) total_cost = cost + sum(costs) else: cost = None total_cost = 1 + len(costs) if self._maximum and total_cost >= self._maximum: break args, cost = q.popleft() times.append(t) costs.append(cost) self.emit(*args) # update status and schedule new emits if q: if not self._is_throttling: self.status_event.emit(True) loop.call_at(times[0] + self._interval, self._try_emit) elif self._is_throttling: self.status_event.emit(False) self._is_throttling = bool(q) if not q and self._source is None: self.set_done() self.status_event.set_done()
[docs]class Sample(Op): __slots__ = ('_timer',) def __init__(self, timer, source=None): Op.__init__(self, source) self._timer = timer timer.connect( self._on_timer, self.on_source_error, self.on_source_done)
[docs] def on_source(self, *args): self._value = args
def _on_timer(self, *args): if self._value is not NO_VALUE: self.emit(*self._value)
[docs] def on_source_done(self, source): Op.on_source_done(self, self) self._timer.disconnect( self._on_timer, self.on_source_error, self.on_source_done) self._timer = None