1#!/usr/bin/env python
2
3"""PickleShare - a small 'shelve' like datastore with concurrency support
4
5Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
6shelve, many processes can access the database simultaneously. Changing a
7value in database is immediately visible to other processes accessing the
8same database.
9
10Concurrency is possible because the values are stored in separate files. Hence
11the "database" is a directory where *all* files are governed by PickleShare.
12
13Example usage::
14
15 from pickleshare import *
16 db = PickleShareDB('~/testpickleshare')
17 db.clear()
18 print "Should be empty:",db.items()
19 db['hello'] = 15
20 db['aku ankka'] = [1,2,313]
21 db['paths/are/ok/key'] = [1,(5,46)]
22 print db.keys()
23 del db['aku ankka']
24
25This module is certainly not ZODB, but can be used for low-load
26(non-mission-critical) situations where tiny code size trumps the
27advanced features of a "real" object database.
28
29Installation guide: pip install pickleshare
30
31Author: Ville Vainio <vivainio@gmail.com>
32License: MIT open source license.
33
34"""
35
36from __future__ import print_function
37
38
39__version__ = "0.7.5"
40
41try:
42 from pathlib import Path
43except ImportError:
44 # Python 2 backport
45 from pathlib2 import Path
46
47import os, stat, time
48
49try:
50 import collections.abc as collections_abc
51except ImportError:
52 import collections as collections_abc
53try:
54 import cPickle as pickle
55except ImportError:
56 import pickle
57import errno
58import sys
59
60if sys.version_info[0] >= 3:
61 string_types = (str,)
62else:
63 string_types = (str, unicode)
64
65
66def gethashfile(key):
67 return ("%02x" % abs(hash(key) % 256))[-2:]
68
69
70_sentinel = object()
71
72
73class PickleShareDB(collections_abc.MutableMapping):
74 """The main 'connection' object for PickleShare database"""
75
76 def __init__(self, root):
77 """Return a db object that will manage the specied directory"""
78 if not isinstance(root, string_types):
79 root = str(root)
80 root = os.path.abspath(os.path.expanduser(root))
81 self.root = Path(root)
82 if not self.root.is_dir():
83 # catching the exception is necessary if multiple processes are concurrently trying to create a folder
84 # exists_ok keyword argument of mkdir does the same but only from Python 3.5
85 try:
86 self.root.mkdir(parents=True)
87 except OSError as e:
88 if e.errno != errno.EEXIST:
89 raise
90 # cache has { 'key' : (obj, orig_mod_time) }
91 self.cache = {}
92
93 def __getitem__(self, key):
94 """db['key'] reading"""
95 fil = self.root / key
96 try:
97 mtime = fil.stat()[stat.ST_MTIME]
98 except OSError:
99 raise KeyError(key)
100
101 if fil in self.cache and mtime == self.cache[fil][1]:
102 return self.cache[fil][0]
103 try:
104 # The cached item has expired, need to read
105 with fil.open("rb") as f:
106 obj = pickle.loads(f.read())
107 except:
108 raise KeyError(key)
109
110 self.cache[fil] = (obj, mtime)
111 return obj
112
113 def __setitem__(self, key, value):
114 """db['key'] = 5"""
115 fil = self.root / key
116 parent = fil.parent
117 if parent and not parent.is_dir():
118 parent.mkdir(parents=True)
119 # We specify protocol 2, so that we can mostly go between Python 2
120 # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
121 with fil.open("wb") as f:
122 pickle.dump(value, f, protocol=2)
123 try:
124 self.cache[fil] = (value, fil.stat().st_mtime)
125 except OSError as e:
126 if e.errno != errno.ENOENT:
127 raise
128
129 def hset(self, hashroot, key, value):
130 """hashed set"""
131 hroot = self.root / hashroot
132 if not hroot.is_dir():
133 hroot.mkdir()
134 hfile = hroot / gethashfile(key)
135 d = self.get(hfile, {})
136 d.update({key: value})
137 self[hfile] = d
138
139 def hget(self, hashroot, key, default=_sentinel, fast_only=True):
140 """hashed get"""
141 hroot = self.root / hashroot
142 hfile = hroot / gethashfile(key)
143
144 d = self.get(hfile, _sentinel)
145 # print "got dict",d,"from",hfile
146 if d is _sentinel:
147 if fast_only:
148 if default is _sentinel:
149 raise KeyError(key)
150
151 return default
152
153 # slow mode ok, works even after hcompress()
154 d = self.hdict(hashroot)
155
156 return d.get(key, default)
157
158 def hdict(self, hashroot):
159 """Get all data contained in hashed category 'hashroot' as dict"""
160 hfiles = self.keys(hashroot + "/*")
161 hfiles.sort()
162 last = len(hfiles) and hfiles[-1] or ""
163 if last.endswith("xx"):
164 # print "using xx"
165 hfiles = [last] + hfiles[:-1]
166
167 all = {}
168
169 for f in hfiles:
170 # print "using",f
171 try:
172 all.update(self[f])
173 except KeyError:
174 print("Corrupt", f, "deleted - hset is not threadsafe!")
175 del self[f]
176
177 self.uncache(f)
178
179 return all
180
181 def hcompress(self, hashroot):
182 """Compress category 'hashroot', so hset is fast again
183
184 hget will fail if fast_only is True for compressed items (that were
185 hset before hcompress).
186
187 """
188 hfiles = self.keys(hashroot + "/*")
189 all = {}
190 for f in hfiles:
191 # print "using",f
192 all.update(self[f])
193 self.uncache(f)
194
195 self[hashroot + "/xx"] = all
196 for f in hfiles:
197 p = self.root / f
198 if p.name == "xx":
199 continue
200 p.unlink()
201
202 def __delitem__(self, key):
203 """del db["key"]"""
204 fil = self.root / key
205 self.cache.pop(fil, None)
206 try:
207 fil.unlink()
208 except OSError:
209 # notfound and permission denied are ok - we
210 # lost, the other process wins the conflict
211 pass
212
213 def _normalized(self, p):
214 """Make a key suitable for user's eyes"""
215 return str(p.relative_to(self.root)).replace("\\", "/")
216
217 def keys(self, globpat=None):
218 """All keys in DB, or all keys matching a glob"""
219
220 if globpat is None:
221 files = self.root.rglob("*")
222 else:
223 files = self.root.glob(globpat)
224 return [self._normalized(p) for p in files if p.is_file()]
225
226 def __iter__(self):
227 return iter(self.keys())
228
229 def __len__(self):
230 return len(self.keys())
231
232 def uncache(self, *items):
233 """Removes all, or specified items from cache
234
235 Use this after reading a large amount of large objects
236 to free up memory, when you won't be needing the objects
237 for a while.
238
239 """
240 if not items:
241 self.cache = {}
242 for it in items:
243 self.cache.pop(it, None)
244
245 def waitget(self, key, maxwaittime=60):
246 """Wait (poll) for a key to get a value
247
248 Will wait for `maxwaittime` seconds before raising a KeyError.
249 The call exits normally if the `key` field in db gets a value
250 within the timeout period.
251
252 Use this for synchronizing different processes or for ensuring
253 that an unfortunately timed "db['key'] = newvalue" operation
254 in another process (which causes all 'get' operation to cause a
255 KeyError for the duration of pickling) won't screw up your program
256 logic.
257 """
258
259 wtimes = [0.2] * 3 + [0.5] * 2 + [1]
260 tries = 0
261 waited = 0
262 while 1:
263 try:
264 val = self[key]
265 return val
266 except KeyError:
267 pass
268
269 if waited > maxwaittime:
270 raise KeyError(key)
271
272 time.sleep(wtimes[tries])
273 waited += wtimes[tries]
274 if tries < len(wtimes) - 1:
275 tries += 1
276
277 def getlink(self, folder):
278 """Get a convenient link for accessing items"""
279 return PickleShareLink(self, folder)
280
281 def __repr__(self):
282 return "PickleShareDB('%s')" % self.root
283
284
285class PickleShareLink:
286 """A shortdand for accessing nested PickleShare data conveniently.
287
288 Created through PickleShareDB.getlink(), example::
289
290 lnk = db.getlink('myobjects/test')
291 lnk.foo = 2
292 lnk.bar = lnk.foo + 5
293
294 """
295
296 def __init__(self, db, keydir):
297 self.__dict__.update(locals())
298
299 def __getattr__(self, key):
300 return self.__dict__["db"][self.__dict__["keydir"] + "/" + key]
301
302 def __setattr__(self, key, val):
303 self.db[self.keydir + "/" + key] = val
304
305 def __repr__(self):
306 db = self.__dict__["db"]
307 keys = db.keys(self.__dict__["keydir"] + "/*")
308 return "<PickleShareLink '%s': %s>" % (
309 self.__dict__["keydir"],
310 ";".join([Path(k).basename() for k in keys]),
311 )
312
313
314def main():
315 import textwrap
316
317 usage = textwrap.dedent(
318 """\
319 pickleshare - manage PickleShare databases
320
321 Usage:
322
323 pickleshare dump /path/to/db > dump.txt
324 pickleshare load /path/to/db < dump.txt
325 pickleshare test /path/to/db
326 """
327 )
328 DB = PickleShareDB
329 import sys
330
331 if len(sys.argv) < 2:
332 print(usage)
333 return
334
335 cmd = sys.argv[1]
336 args = sys.argv[2:]
337 if cmd == "dump":
338 if not args:
339 args = ["."]
340 db = DB(args[0])
341 import pprint
342
343 pprint.pprint(db.items())
344 elif cmd == "load":
345 cont = sys.stdin.read()
346 db = DB(args[0])
347 data = eval(cont)
348 db.clear()
349 for k, v in db.items():
350 db[k] = v
351 elif cmd == "testwait":
352 db = DB(args[0])
353 db.clear()
354 print(db.waitget("250"))
355 elif cmd == "test":
356 test()
357 stress()
358
359
360if __name__ == "__main__":
361 main()