1
2
3
4
5
6 import traceback
7
8 import eventlet
9 from eventlet.greenthread import GreenThread
10 from eventlet import event
11
12 from .base import check_callable
13 from .sync import SyncConsumer
14 from ..utils import json
15
16
def __init__(self, db, callback, **params):
    """Remember the database, the change callback and the request parameters.

    :param db: database object; its ``res`` attribute is used to request
        ``_changes`` when the consumer runs.
    :param callback: callable stored as ``self.process_change`` and invoked
        with every change that is consumed.
    :param params: extra keyword arguments forwarded unchanged to the
        ``_changes`` request.
    """
    self.db = db
    self.params = params
    self.process_change = callback
    # Triggered when consumption ends; waiters block on this event.
    self.stop_event = event.Event()
23
# Schedule `self._run` with `eventlet.spawn_n`, then block the caller on
# `self.stop_event` until that event is triggered. The value returned by
# `spawn_n` is bound to `_` and never used.
# NOTE(review): the `def` line of this method is not visible in this view,
# so its name and signature cannot be confirmed from here.
25 _ = eventlet.spawn_n(self._run)
26 self.stop_event.wait()
27
# Schedule `self._run` with `eventlet.spawn_n` and return immediately,
# without waiting on `self.stop_event`. The value returned by `spawn_n` is
# bound to `_` and never used.
# NOTE(review): the `def` line of this method is not visible in this view,
# so its name and signature cannot be confirmed from here.
29 _ = eventlet.spawn_n(self._run)
30
# NOTE(review): the `def` line is reconstructed from the call
# `eventlet.spawn_n(self._run)`; the enclosing class statement is not
# visible in this view.
def _run(self):
    """Request the ``_changes`` resource once and hand it to ``consume``.

    On success the return value of ``self.consume(resp)`` is returned and
    triggering ``self.stop_event`` is left to ``consume``.

    If the request or ``consume`` raises, the error is handled here: a
    traceback is printed (except for ``SystemExit``/``KeyboardInterrupt``),
    the green thread sleeps five seconds, and ``self.stop_event`` is
    triggered so that anything blocked on it is released.  ``None`` is
    returned in that case.
    """
    # Every path either returns or falls through to the final send, so a
    # single attempt is made; no loop is needed.
    try:
        resp = self.db.res.get("_changes", **self.params)
        return self.consume(resp)
    except (SystemExit, KeyboardInterrupt):
        eventlet.sleep(5)
    except BaseException:
        # Deliberately as broad as a bare ``except:``: anything narrower
        # could let an exception escape without releasing the waiters on
        # ``stop_event``.
        traceback.print_exc()
        eventlet.sleep(5)
    # ``consume`` may already have triggered the event before failing;
    # an eventlet Event must not be sent twice.
    if not self.stop_event.ready():
        self.stop_event.send(True)
44
# NOTE(review): presumably the body of the default `consume` hook that
# `_run` invokes as `self.consume(resp)` -- its `def` line is not visible in
# this view, confirm. As written it unconditionally raises.
46 raise NotImplementedError
47
49
# NOTE(review): the `def` line is reconstructed from the call
# `self.consume(resp)` in `_run`; the enclosing class statement is not
# visible in this view.
def consume(self, resp):
    """Pass every non-empty line of the response body to the callback.

    The body stream is read with ``readline()`` until it returns an empty
    value.  Each line has its terminator removed, blank lines are skipped,
    and the remaining text is passed as-is to ``self.process_change``.
    ``self.stop_event`` is triggered once the stream is exhausted.

    :param resp: response object whose ``body_stream()`` is a context
        manager yielding an object with ``readline()``.
    """
    with resp.body_stream() as body:
        while True:
            line = body.readline()
            if not line:
                # Empty read: the stream is finished.
                break
            # Strip only a terminator that is actually present, so a final
            # line that ends at EOF without a newline keeps its last
            # character.
            if line.endswith("\r\n"):
                line = line[:-2]
            elif line.endswith("\n"):
                line = line[:-1]
            if not line:
                # Blank line: nothing to report.
                continue
            self.process_change(line)
        self.stop_event.send(True)
64
65
67
# NOTE(review): the `def` line is reconstructed from the call
# `self.consume(resp)` in `_run`; the enclosing class statement is not
# visible in this view.
def consume(self, resp):
    """Read the whole response body and pass it to the callback once.

    The body is read until ``read()`` returns an empty value and the pieces
    are concatenated.  The text is decoded with ``json.loads``; if that
    raises ``ValueError`` the undecoded text is used instead.  The result is
    passed to ``self.process_change`` and ``self.stop_event`` is triggered.

    :param resp: response object whose ``body_stream()`` is a context
        manager yielding an object with ``read()``.
    """
    with resp.body_stream() as body:
        chunks = []
        piece = body.read()
        while piece:
            chunks.append(piece)
            piece = body.read()
        raw = "".join(chunks)
        try:
            change = json.loads(raw)
        except ValueError:
            # Not valid JSON: hand the raw text to the callback.
            change = raw
        self.process_change(change)
        self.stop_event.send(True)
83
84
89
90 - def _fetch(self, cb, **params):
93
94 - def fetch(self, cb=None, **params):
98
108
109 - def wait(self, cb, **params):
114
121
128