Package couchdbkit :: Package consumer
[hide private]
[frames] | no frames]

Source Code for Package couchdbkit.consumer

  1  # -*- coding: utf-8 -*-
  2  # 
  3  # This file is part of couchdbkit released under the MIT license.  
  4  # See the NOTICE for more information. 
  5   
  6   
  7  from .base import ConsumerBase 
  8   
  9  import pkg_resources 
 10   
def load_consumer_class(uri):
    """Resolve a consumer backend URI to the object it names.

    Three URI forms are recognised:

    * ``egg:<dist>#<name>`` -- load the entry point ``<name>`` of the
      ``couchdbkit.consumers`` group from the distribution ``<dist>``.
      When ``#<name>`` is omitted, ``sync`` is used.
    * ``<name>`` or ``#<name>`` (no dot in the URI) -- load the entry point
      ``<name>`` of the ``couchdbkit.consumers`` group from the
      ``couchdbkit`` distribution.
    * ``package.module.Name`` -- import ``package.module`` and return its
      ``Name`` attribute.

    @param uri: backend URI in one of the forms above
    @return: the object the entry point or dotted path resolves to
    @raise RuntimeError: a bare entry point name could not be loaded
        (``pkg_resources.load_entry_point`` raised ``ImportError``)

    Errors raised while resolving an ``egg:`` URI or a dotted path are not
    intercepted here and propagate to the caller unchanged.
    """
    egg_prefix = "egg:"
    if uri.startswith(egg_prefix):
        # Strip only the leading scheme.  Splitting on "egg:" and taking
        # element 1 would silently truncate a URI that happens to contain
        # "egg:" a second time.
        entry_str = uri[len(egg_prefix):]
        try:
            dist, name = entry_str.rsplit("#", 1)
        except ValueError:
            # No "#<name>" part: fall back to the default backend name.
            dist, name = entry_str, "sync"
        return pkg_resources.load_entry_point(dist,
                "couchdbkit.consumers", name)

    components = uri.split('.')
    if len(components) == 1:
        # A bare name (optionally written "#name") is an entry point
        # shipped by couchdbkit itself.
        if uri.startswith("#"):
            uri = uri[1:]
        try:
            return pkg_resources.load_entry_point("couchdbkit",
                    "couchdbkit.consumers", uri)
        except ImportError:
            raise RuntimeError("consumer backend invalid or not found")

    # Dotted path: the last component is the attribute, the rest the module.
    klass = components.pop(-1)
    # __import__("a.b.c") returns the top-level package "a", so walk down
    # through the remaining components to reach the actual module.
    mod = __import__('.'.join(components))
    for comp in components[1:]:
        mod = getattr(mod, comp)
    return getattr(mod, klass)
38
class Consumer(object):
    """ Database change consumer

    Thin front-end that forwards every call to a backend consumer object
    chosen at construction time.

    Example Usage:

        >>> from couchdbkit import Server, Consumer
        >>> s = Server()
        >>> db = s['testdb']
        >>> c = Consumer(db)
        >>> def print_line(line):
        ...     print "got %s" % line
        ...
        >>> c.wait(print_line,since=0) # Go into receive loop

    """

    def __init__(self, db, backend='sync', **kwargs):
        """ Constructor for the consumer

        Args:
        @param db: Database instance
        @param backend: backend entry point uri, resolved with
        ``load_consumer_class``.

        The default class (``sync``) serializes each call to the registered
        callbacks. Line processing should be fast in this case so that the
        socket read is not kept waiting.

        A string referring to one of the following bundled classes:

        * ``sync``
        * ``eventlet`` - Requires eventlet >= 0.9.7
        * ``gevent`` - Requires gevent >= 0.12.2 (?)

        You can optionally register your own worker in the
        ``couchdbkit.consumers`` entry point.

        @param kwargs: extra keyword arguments handed to the backend class
        together with ``db``
        """
        self.db = db
        backend_class = load_consumer_class(backend)
        self.consumer_class = backend_class
        self._consumer = backend_class(db, **kwargs)

    def fetch(self, cb=None, **params):
        """ Fetch all changes and return. If since is specified, fetch all
        changes since this doc sequence.

        Args:
        @param cb: optional callback, forwarded to the backend as ``cb``
        @param params: kwargs
        See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)

        @return: dict, change result

        """
        impl = self._consumer
        return impl.fetch(cb=cb, **params)

    def wait_once(self, cb=None, **params):
        """ Wait for one change and return (longpoll feed)

        Args:
        @param cb: optional callback, forwarded to the backend as ``cb``
        @param params: kwargs
        See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)

        @return: dict, change result
        """
        impl = self._consumer
        return impl.wait_once(cb=cb, **params)

    def wait(self, cb, **params):
        """ Wait for changes until the connection closes (continuous feed)

        Args:
        @param cb: callback, forwarded to the backend as first argument
        @param params: kwargs
        See Changes API (http://wiki.apache.org/couchdb/HTTP_database_API#Changes)

        @return: dict, line of change
        """
        impl = self._consumer
        return impl.wait(cb, **params)

    def wait_once_async(self, cb, **params):
        """ like wait_once but doesn't return anything. """
        # The backend's result is still passed through as-is.
        impl = self._consumer
        return impl.wait_once_async(cb=cb, **params)

    def wait_async(self, cb, **params):
        """ like wait but doesn't return anything. """
        # The backend's result is still passed through as-is.
        impl = self._consumer
        return impl.wait_async(cb, **params)
121