Package couchdbkit :: Module utils
[hide private]
[frames | no frames]

Source Code for Module couchdbkit.utils

  1  # -*- coding: utf-8 - 
  2  # 
  3  # This file is part of couchdbkit released under the MIT license.  
  4  # See the NOTICE for more information. 
  5   
  6   
  7  """ 
  8  Mostly utility functions couchdbkit uses internally that don't 
  9  really belong anywhere else in the modules. 
 10  """ 
 11  from __future__ import with_statement 
 12   
 13  import codecs 
 14  import string 
 15  from calendar import timegm 
 16  import datetime 
 17  import decimal 
 18  from hashlib import md5 
 19  import os 
 20  import re 
 21  import sys 
 22  import time 
 23  import urllib 
 24   
 25   
 26  try: 
 27      import json 
 28  except ImportError: 
 29      try: 
 30          import simplejson as json 
 31      except ImportError: 
 32          raise ImportError("""simplejson isn't installed 
 33   
 34  Install it with the command: 
 35   
 36      pip install simplejson 
 37  """) 
 38    
 39   
 40  # backport relpath from python2.6 
 41  if not hasattr(os.path, 'relpath'): 
 42      if os.name == "nt": 
43 - def splitunc(p):
44 if p[1:2] == ':': 45 return '', p # Drive letter present 46 firstTwo = p[0:2] 47 if firstTwo == '//' or firstTwo == '\\\\': 48 # is a UNC path: 49 # vvvvvvvvvvvvvvvvvvvv equivalent to drive letter 50 # \\machine\mountpoint\directories... 51 # directory ^^^^^^^^^^^^^^^ 52 normp = os.path.normcase(p) 53 index = normp.find('\\', 2) 54 if index == -1: 55 ##raise RuntimeError, 'illegal UNC path: "' + p + '"' 56 return ("", p) 57 index = normp.find('\\', index + 1) 58 if index == -1: 59 index = len(p) 60 return p[:index], p[index:] 61 return '', p
62
63 - def relpath(path, start=os.path.curdir):
64 """Return a relative version of a path""" 65 66 if not path: 67 raise ValueError("no path specified") 68 start_list = os.path.abspath(start).split(os.path.sep) 69 path_list = os.path.abspath(path).split(os.path.sep) 70 if start_list[0].lower() != path_list[0].lower(): 71 unc_path, rest = splitunc(path) 72 unc_start, rest = splitunc(start) 73 if bool(unc_path) ^ bool(unc_start): 74 raise ValueError("Cannot mix UNC and non-UNC paths (%s and %s)" 75 % (path, start)) 76 else: 77 raise ValueError("path is on drive %s, start on drive %s" 78 % (path_list[0], start_list[0])) 79 # Work out how much of the filepath is shared by start and path. 80 for i in range(min(len(start_list), len(path_list))): 81 if start_list[i].lower() != path_list[i].lower(): 82 break 83 else: 84 i += 1 85 86 rel_list = [os.path.pardir] * (len(start_list)-i) + path_list[i:] 87 if not rel_list: 88 return curdir 89 return os.path.join(*rel_list)
90 else:
91 - def relpath(path, start=os.path.curdir):
92 """Return a relative version of a path""" 93 94 if not path: 95 raise ValueError("no path specified") 96 97 start_list = os.path.abspath(start).split(os.path.sep) 98 path_list = os.path.abspath(path).split(os.path.sep) 99 100 # Work out how much of the filepath is shared by start and path. 101 i = len(os.path.commonprefix([start_list, path_list])) 102 103 rel_list = [os.path.pardir] * (len(start_list)-i) + path_list[i:] 104 if not rel_list: 105 return curdir 106 return os.path.join(*rel_list)
107 else: 108 relpath = os.path.relpath 109
def split_path(path):
    """ split a path into the list of its components

    :attr path: string, path to split

    :return: list of strings. For an absolute path the first item is
    the root (for example '/'), a relative path only yields its
    components.
    """
    parts = []
    while True:
        head, tail = os.path.split(path)
        if head == path:
            # os.path.split() made no progress: we reached the root
            # ('/', 'C:\\', ...) or an empty path. Keep it as the first
            # component and stop, otherwise the loop never terminates on
            # absolute paths.
            parts.insert(0, head)
            break
        parts.insert(0, tail)
        if not head:
            break
        path = head
    return parts
try:
    from urllib import unquote as _unquote  # Python 2
except ImportError:
    from urllib.parse import unquote as _unquote  # Python 3

# A database name starts with a lowercase letter, followed by lowercase
# letters, digits or one of _ $ ( ) + - /
# The '-' is kept last in the character class so it is a literal dash:
# written as '+-/' it is a range that also lets ',' and '.' through.
# '\Z' (instead of '$') refuses a name ending with a newline.
VALID_DB_NAME = re.compile(r'^[a-z][a-z0-9_$()+/-]*\Z')


def validate_dbname(name):
    """ validate dbname

    :attr name: string, database name, may be url-quoted (it is
    unquoted before being checked)

    Raise ValueError if the name is not a valid database name.
    """
    if not VALID_DB_NAME.match(_unquote(name)):
        raise ValueError("Invalid db name: '%s'" % name)
124
try:
    _TEXT_TYPE = unicode  # Python 2
except NameError:
    _TEXT_TYPE = str  # Python 3: str is the unicode text type


def to_bytestring(s):
    """ convert an unicode string to an utf-8 encoded bytestring

    :attr s: any object

    :return: the utf-8 encoded bytestring if s is an unicode string,
    s itself otherwise (bytestrings and non-string objects are
    returned unchanged)
    """
    if isinstance(s, _TEXT_TYPE):
        return s.encode('utf-8')
    return s
133
def read_file(fname, utf8=True, force_read=False):
    """ read file content

    :attr fname: string, path of the file
    :attr utf8: boolean, default is True. If True the content is
    decoded from utf-8, if False the raw bytes are returned
    :attr force_read: boolean, default is False. If True and the
    content is not valid utf-8, return the raw bytes instead of
    raising

    :return: string, the file content

    Raise UnicodeError if utf8 is True, the content can't be decoded
    and force_read is False.
    """
    # The file is read once as bytes, so the force_read fallback does not
    # need to open it a second time.
    with open(fname, 'rb') as f:
        raw = f.read()
    if not utf8:
        return raw
    try:
        return raw.decode('utf-8')
    except UnicodeError:
        if force_read:
            return raw
        # Don't swallow the error: falling through here used to end in
        # an unrelated UnboundLocalError.
        raise
147
def sign_file(file_path):
    """ compute the md5 signature of a file

    :attr file_path: string, path of file

    :return: string, md5 hexdigest of the file content, or an empty
    string if file_path isn't an existing regular file
    """
    if not os.path.isfile(file_path):
        return ''
    # force_read: content that isn't valid utf-8 is hashed as raw bytes
    payload = to_bytestring(read_file(file_path, force_read=True))
    digest = md5(payload)
    return digest.hexdigest()
159
def write_content(fname, content):
    """ write content in a file

    :attr fname: string, filename
    :attr content: string, unicode content is encoded in utf-8 before
    being written
    """
    # `with` closes the file even when the write fails
    with open(fname, 'wb') as f:
        f.write(to_bytestring(content))
169
def write_json(filename, content):
    """ dump content as json in a file

    :attr filename: string, path of the file to write
    :attr content: object passed to json.dumps

    """
    serialized = json.dumps(content)
    write_content(filename, serialized)
178
def read_json(filename, use_environment=False):
    """ read a json file and deserialize

    :attr filename: string
    :attr use_environment: boolean, default is False. If
    True, replace environment variable by their value in file
    content

    :return: dict or list, an empty dict if the file doesn't exist

    Raise IOError if the file can't be read for another reason than
    being missing, and ValueError if the content isn't valid json.
    """
    import errno

    try:
        data = read_file(filename, force_read=True)
    except IOError:
        # sys.exc_info() instead of "except ..., e" / "except ... as e":
        # it is valid on every supported python version
        if sys.exc_info()[1].errno == errno.ENOENT:
            # a missing file is read as an empty document
            return {}
        raise

    if use_environment:
        data = string.Template(data).substitute(os.environ)

    try:
        data = json.loads(data)
    except ValueError:
        sys.stderr.write("Json is invalid, can't load %s\n" % filename)
        raise
    return data
205