1
2
3
4
5
6
7 from .base import ConsumerBase
8
9 import pkg_resources
10
12 if uri.startswith("egg:"):
13
14 entry_str = uri.split("egg:")[1]
15 try:
16 dist, name = entry_str.rsplit("#",1)
17 except ValueError:
18 dist = entry_str
19 name = "sync"
20
21 return pkg_resources.load_entry_point(dist,
22 "couchdbkit.consumers", name)
23 else:
24 components = uri.split('.')
25 if len(components) == 1:
26 try:
27 if uri.startswith("#"):
28 uri = uri[1:]
29 return pkg_resources.load_entry_point("couchdbkit",
30 "couchdbkit.consumers", uri)
31 except ImportError:
32 raise RuntimeError("consumer backend invalid or not found")
33 klass = components.pop(-1)
34 mod = __import__('.'.join(components))
35 for comp in components[1:]:
36 mod = getattr(mod, comp)
37 return getattr(mod, klass)
38
40 """ Database change consumer
41
42 Example Usage:
43
44 >>> from couchdbkit import Server, Consumer
45 >>> s = Server()
46 >>> db = s['testdb']
47 >>> c = Consumer(db)
48 >>> def print_line(line):
49 ... print "got %s" % line
50 ...
51 >>> c.wait(print_line,since=0) # Go into receive loop
52
53 """
54
55 - def __init__(self, db, backend='sync', **kwargs):
56 """ Constructor for the consumer
57
58 Args:
59 @param db: Database instance
60 @param backend: backend entry point uri
61 The default class (sync) erialize each call to registered
62 callbacks. Line processing should be fast in this case to not
63 wait on socket read.
64
65 A string referring to one of the following bundled classes:
66
67 * ``sync``
68 * ``eventlet`` - Requires eventlet >= 0.9.7
69 * ``gevent`` - Requires gevent >= 0.12.2 (?)
70
71 You can optionnaly register in ``couchdbkit.consumers``entry point
72 your own worker.
73 """
74 self.db = db
75 self.consumer_class = load_consumer_class(backend)
76 self._consumer = self.consumer_class(db, **kwargs)
77
78 - def fetch(self, cb=None, **params):
79 """ Fetch all changes and return. If since is specified, fetch all changes
80 since this doc sequence
81
82 Args:
83 @param params: kwargs
84 See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)
85
86 @return: dict, change result
87
88 """
89 return self._consumer.fetch(cb=cb, **params)
90
92 """Wait for one change and return (longpoll feed)
93
94 Args:
95 @param params: kwargs
96 See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)
97
98 @return: dict, change result
99 """
100
101 return self._consumer.wait_once(cb=cb, **params)
102
103 - def wait(self, cb, **params):
104 """ Wait for changes until the connection close (continuous feed)
105
106 Args:
107 @param params: kwargs
108 See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)
109
110 @return: dict, line of change
111 """
112 return self._consumer.wait(cb, **params)
113
115 """ like wait_once but doesn't return anything. """
116 return self._consumer.wait_once_async(cb=cb, **params)
117
119 """ like wait but doesn't return anything. """
120 return self._consumer.wait_async(cb, **params)
121