Package couchdbkit :: Package consumer :: Module ceventlet
[hide private]
[frames | no frames]

Source Code for Module couchdbkit.consumer.ceventlet

  1  # -*- coding: utf-8 -*-
  2  # 
  3  # This file is part of couchdbkit released under the MIT license.  
  4  # See the NOTICE for more information. 
  5   
  6  import traceback 
  7   
  8  import eventlet 
  9  from eventlet.greenthread import GreenThread 
 10  from eventlet import event 
 11   
 12  from .base import check_callable 
 13  from .sync import SyncConsumer 
 14  from ..utils import json 
 15   
 16   
class ChangeConsumer(object):
    """Base class that requests ``_changes`` and consumes the response.

    The request and the consumption happen in a green thread started by
    :meth:`wait` or :meth:`wait_async`. Subclasses provide :meth:`consume`.
    """

    def __init__(self, db, callback, **params):
        """Remember the database, the change callback and the query params.

        :param db: object whose ``res.get`` is used to request ``_changes``.
        :param callback: stored as ``process_change`` for subclasses to call.
        :param params: forwarded as keyword arguments of the request.
        """
        self.db = db
        self.params = params
        self.process_change = callback
        # wait() blocks on this event; it is sent by _run() on failure.
        self.stop_event = event.Event()

    def wait(self):
        """Spawn the consumer and block until ``stop_event`` is sent."""
        eventlet.spawn_n(self._run)
        self.stop_event.wait()

    def wait_async(self):
        """Spawn the consumer and return immediately."""
        eventlet.spawn_n(self._run)

    def _run(self):
        """Request ``_changes`` once and pass the response to ``consume``.

        On success the result of ``consume`` is returned directly and
        ``stop_event`` is not sent from here. On any exception this method
        pauses for five seconds and then sends ``stop_event``.
        """
        try:
            response = self.db.res.get("_changes", **self.params)
            return self.consume(response)
        except (SystemExit, KeyboardInterrupt):
            # Exit requests are not reported, only followed by the pause.
            eventlet.sleep(5)
        except:  # noqa: E722 - deliberately catches everything else
            traceback.print_exc()
            eventlet.sleep(5)
        # Only reached after a failure: the success path returned above.
        self.stop_event.send(True)

    def consume(self, resp):
        """Handle the ``_changes`` response; must be overridden."""
        raise NotImplementedError
47
class ContinuousChangeConsumer(ChangeConsumer):
    """Consumer that dispatches a ``_changes`` response line by line."""

    def consume(self, resp):
        """Read the response body one line at a time and dispatch each change.

        Every non-empty line is handed to ``process_change`` with its line
        terminator removed; the line is passed as read, it is not
        JSON-decoded here. Empty lines are skipped. Once ``readline``
        returns an empty value, ``stop_event`` is sent.

        :param resp: response object providing ``body_stream()``.
        """
        with resp.body_stream() as body:
            while True:
                line = body.readline()
                if not line:
                    # An empty read marks the end of the stream.
                    break
                # Strip only a terminator that is actually present, so a
                # final line without a trailing newline keeps its last
                # character.
                if line.endswith("\r\n"):
                    line = line[:-2]
                elif line.endswith("\n"):
                    line = line[:-1]
                if not line:
                    # Nothing left once the terminator is removed.
                    continue
                self.process_change(line)
            self.stop_event.send(True)
64 65
class LongPollChangeConsumer(ChangeConsumer):
    """Consumer that reads a whole ``_changes`` response as one change."""

    def consume(self, resp):
        """Read the entire body, decode it and dispatch it once.

        The body is read until ``read()`` returns an empty value. The
        joined text is parsed as JSON; if parsing raises ``ValueError`` the
        undecoded text is passed to ``process_change`` instead. Afterwards
        ``stop_event`` is sent.

        :param resp: response object providing ``body_stream()``.
        """
        with resp.body_stream() as body:
            chunks = []
            data = body.read()
            while data:
                chunks.append(data)
                data = body.read()
            payload = "".join(chunks)
            try:
                payload = json.loads(payload)
            except ValueError:
                # Not valid JSON: fall through with the raw text.
                pass
            self.process_change(payload)
            self.stop_event.send(True)
83 84
class EventletConsumer(SyncConsumer):
    """Changes consumer whose callback variants run in eventlet green threads.

    Calls made without a callback fall back to the ``SyncConsumer``
    implementation.
    """

    def __init__(self, db):
        """Monkey-patch ``socket`` for eventlet and initialise the base class.

        NOTE(review): ``eventlet.monkey_patch`` is a global patch, so creating
        a consumer affects more than this instance - confirm this is intended.
        """
        eventlet.monkey_patch(socket=True)
        super(EventletConsumer, self).__init__(db)

    def _fetch(self, cb, **params):
        """Request ``_changes`` and pass the response's ``json_body`` to ``cb``."""
        resp = self.db.res.get("_changes", **params)
        cb(resp.json_body)

    def fetch(self, cb=None, **params):
        """Fetch changes, synchronously or through a callback.

        Without ``cb`` the parent's result is returned. With ``cb`` the
        request is spawned in a green thread and nothing is returned.

        :raises: whatever ``check_callable`` raises for an invalid ``cb``.
        """
        if cb is None:
            # NOTE(review): this delegates to the parent's wait_once(), not
            # to a parent fetch(); kept as-is because the parent class is
            # not visible here - confirm this is the intended behaviour.
            return super(EventletConsumer, self).wait_once(**params)
        # Validate here so a bad callback fails in the caller rather than
        # inside the spawned green thread.
        check_callable(cb)
        eventlet.spawn_n(self._fetch, cb, **params)

    def wait_once(self, cb=None, **params):
        """Wait for one ``longpoll`` response, blocking until it is handled.

        Without ``cb`` the parent's ``wait_once`` result is returned.
        """
        if cb is None:
            return super(EventletConsumer, self).wait_once(**params)

        check_callable(cb)
        params.update({"feed": "longpoll"})
        consumer = LongPollChangeConsumer(self.db, callback=cb, **params)
        consumer.wait()

    def wait(self, cb, **params):
        """Consume the ``continuous`` feed, blocking until the consumer stops.

        :raises: whatever ``check_callable`` raises for an invalid ``cb``.
        """
        # Same validation as the other callback-taking entry points.
        check_callable(cb)
        params.update({"feed": "continuous"})
        consumer = ContinuousChangeConsumer(self.db, callback=cb, **params)
        consumer.wait()

    def wait_once_async(self, cb, **params):
        """Start a ``longpoll`` consumer in the background and return at once."""
        check_callable(cb)
        params.update({"feed": "longpoll"})
        consumer = LongPollChangeConsumer(self.db, callback=cb, **params)
        return consumer.wait_async()

    def wait_async(self, cb, **params):
        """Start a ``continuous`` consumer in the background and return at once."""
        check_callable(cb)
        params.update({"feed": "continuous"})
        consumer = ContinuousChangeConsumer(self.db, callback=cb, **params)
        return consumer.wait_async()
128