1
2
3
4
5
6 from __future__ import with_statement
7
8 import asyncore
9 import asynchat
10 import socket
11 import sys
12
13 from .base import ConsumerBase, check_callable
14 from ..utils import json
15
16 __all__ = ['SyncConsumer']
17
19
21 if cb is not None:
22 check_callable(cb)
23
24 params.update({"feed": "longpoll"})
25 resp = self.db.res.get("_changes", **params)
26 buf = ""
27 with resp.body_stream() as body:
28 while True:
29 data = body.read()
30 if not data:
31 break
32 buf += data
33
34 ret = json.loads(buf)
35 if cb is not None:
36 cb(ret)
37 return
38
39 return ret
40
41 - def wait(self, cb, **params):
42 check_callable(cb)
43 params.update({"feed": "continuous"})
44 resp = self.db.res.get("_changes", **params)
45
46 with resp.body_stream() as body:
47 while True:
48 try:
49 line = body.readline()
50 if not line:
51 break
52 if line.endswith("\r\n"):
53 line = line[:-2]
54 else:
55 line = line[:-1]
56 if not line:
57 continue
58
59 cb(json.loads(line))
60 except (KeyboardInterrupt, SystemExit,):
61 break
62