1 import time, threading, struct
3 from bsddb3 import db as bd
# Module-level alias for the Berkeley DB deadlock exception class, so callers
# can refer to it as `deadlock` without reaching into `bd` themselves.
5 deadlock = bd.DBLockDeadlockError
# Wrapper around a Berkeley DB environment handle held in `self.env`.
# Derives from `lib.closable`; neither `lib` nor the assignment of `self.env`
# is visible in this excerpt.
# NOTE(review): this listing is gapped (the embedded line numbers skip), so
# statements between the lines shown below are not visible here.
7 class environment(lib.closable):
# Constructor. `create`, `recover` and `mode` are keyword-only (they follow
# the bare `*`). Only `path` and `mode` are used on the visible lines;
# presumably `create`/`recover` adjust `fl` on the lines not shown -- TODO confirm.
8 def __init__(self, path, *, create=True, recover=False, mode=0o666):
# Select the DB_LOCK_RANDOM policy for the environment's deadlock detector.
10 self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
# Base open flags: thread-safe handle plus the memory pool, locking, logging
# and transaction subsystems.
11 fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
# Open the environment rooted at `path` with the assembled flags and file mode.
16 self.env.open(path, fl, mode)
# Timestamps of the last checkpoint and last log-archive pass; both start at
# construction time so the intervals tested further down count from here.
17 self.lastckp = self.lastarch = time.time()
# Mutex stored on the instance; what it guards is not visible in this excerpt.
18 self.lock = threading.Lock()
# --- periodic maintenance (enclosing `def` and the assignment of `now` are
# not visible in this excerpt) ---
# Run a checkpoint when more than 60 seconds have passed since `lastckp`.
31 if now - self.lastckp > 60:
# NOTE(review): 1024 is presumably the kbyte threshold of txn_checkpoint --
# confirm against the bsddb3 documentation.
32 self.env.txn_checkpoint(1024)
# Run a log-archive pass when more than 3600 seconds have passed since `lastarch`.
34 if now - self.lastarch > 3600:
# DB_ARCH_REMOVE asks Berkeley DB to remove log files it no longer needs.
35 self.env.log_archive(bd.DB_ARCH_REMOVE)
# Look up the `database` wrapper for `name`, constructing it on first request.
# Instances are cached in `self.dbs`, keyed by `name`; `create` and `mode` are
# only used when the wrapper is first constructed. The return statement is
# not visible; presumably it returns `self.dbs[name]` -- TODO confirm.
40 def db(self, name, create=True, mode=0o666):
42 if name not in self.dbs:
43 self.dbs[name] = database(self, name, create, mode)
# Context-manager exit hook; the exception triple is collected into `excinfo`.
# Body not visible in this excerpt.
50 def __exit__(self, *excinfo):
# Helper taking an environment, a file name (`fnm`), a sub-database name
# (`dnm`), an access-method type (`typ`), open flags (`fl`) and a file mode.
# Its own body is not visible in this excerpt.
54 def opendb(env, fnm, dnm, typ, fl, mode):
# NOTE(review): `self` is not a parameter of `opendb`, so this line presumably
# belongs to a different definition whose header is not shown -- TODO confirm.
# It opens the handle held in `self.main` with the same five arguments.
58 self.main.open(fnm, dnm, typ, fl, mode)
# Constructor of the transaction wrapper (class header not visible; presumably
# the `txn` class used as `with txn(self.env.env) as tx:` elsewhere in this
# file -- TODO confirm). `env` must expose `txn_begin`; `flags` defaults to
# DB_TXN_WRITE_NOSYNC.
64 def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
# Begin the transaction and keep the handle in `self.tx`. `None` is passed in
# the first position (presumably "no parent transaction" -- confirm against
# the bsddb3 documentation).
65 self.tx = env.txn_begin(None, flags)
# Context-manager exit hook receiving the exception type, value and traceback.
# Body not visible in this excerpt, so commit/abort behaviour cannot be stated
# from here.
79 def __exit__(self, etype, exc, tb):
# Object store built on two hash sub-databases: `cf` (holds the b"seq"
# counter) and `ob` (holds objects keyed by a packed 64-bit id).
# NOTE(review): this listing is gapped (the embedded line numbers skip), so
# statements between the lines shown below are not visible here.
84 class database(object):
# Constructor. The visible lines never assign `env`, `name`, `create` or
# `mode`; presumably the missing lines store them (`self.env`, `self.fnm`,
# `self.mode` are read elsewhere) -- TODO confirm.
85 def __init__(self, env, name, create, mode):
# Base open flags: thread-safe handle with auto-commit.
89 fl = bd.DB_THREAD | bd.DB_AUTO_COMMIT
# Open both sub-databases, "cf" and "ob", as hash tables with the same flags.
92 self.cf = self._opendb("cf", bd.DB_HASH, fl)
93 self.ob = self._opendb("ob", bd.DB_HASH, fl)
# Open the sub-database `dnm` of file `self.fnm` with access method `typ` and
# flags `fl`. How `init` is used, and the return statement, are not visible.
95 def _opendb(self, dnm, typ, fl, init=None):
# New DB handle bound to the underlying environment handle (`self.env.env`).
96 ret = bd.DB(self.env.env)
100 ret.open(self.fnm, dnm, typ, fl, self.mode)
# Read and advance the sequence counter stored under b"seq" in `cf`, inside
# the transaction wrapper `tx` (its raw handle is `tx.tx`).
105 def _nextseq(self, tx):
# NOTE(review): has_key followed by get is two lookups of the same key.
106 if self.cf.has_key(b"seq", txn=tx.tx):
# The counter is stored as an 8-byte big-endian unsigned integer (">Q").
107 seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
# The branch taken when b"seq" is absent is not visible in this excerpt.
# Persist the incremented counter in the same transaction.
110 self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
# --- insert routine (enclosing `def` not visible in this excerpt) ---
# Obtain a sequence number and store `ob` under it, all in one transaction.
116 with txn(self.env.env) as tx:
117 seq = self._nextseq(tx)
# DB_NOOVERWRITE makes the put refuse to replace an existing key.
118 self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
# Overwrite the object stored under `id` inside a transaction.
# NOTE(review): the parameter name `id` shadows the builtin of the same name.
124 def replace(self, id, ob):
127 with txn(self.env.env) as tx:
# Keys are the id packed as an 8-byte big-endian unsigned integer.
128 key = struct.pack(">Q", id)
# The statement guarded by this check is not visible; presumably it raises
# for a missing id -- TODO confirm.
129 if not self.ob.has_key(key, txn=tx.tx):
131 self.ob.put(key, ob, txn=tx.tx)
# --- lookup routine (enclosing `def` and try/except header not visible) ---
# Fetch the object stored under the packed `id`.
140 return self.ob[struct.pack(">Q", id)]
# Raise KeyError with the integer id; `from None` suppresses the chained context.
142 raise KeyError(id) from None
# Delete the object stored under `id` inside a transaction.
146 def remove(self, id):
149 with txn(self.env.env) as tx:
150 self.ob.delete(struct.pack(">Q", id), txn=tx.tx)