1 import time, threading, struct
3 from bsddb3 import db as bd
5 __all__ = ["environment", "database"]
7 deadlock = bd.DBLockDeadlockError
9 class environment(lib.closable):
10 def __init__(self, path, *, create=True, recover=False, mode=0o666):
12 self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
13 fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
18 self.env.open(path, fl, mode)
19 self.lastckp = self.lastarch = time.time()
20 self.lock = threading.Lock()
33 if now - self.lastckp > 60:
34 self.env.txn_checkpoint(1024)
36 if now - self.lastarch > 3600:
37 self.env.log_archive(bd.DB_ARCH_REMOVE)
42 def db(self, name, create=True, mode=0o666):
44 if name not in self.dbs:
45 self.dbs[name] = database(self, name, create, mode)
52 def __exit__(self, *excinfo):
56 def opendb(env, fnm, dnm, typ, fl, mode):
60 self.main.open(fnm, dnm, typ, fl, mode)
66 def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
67 self.tx = env.env.txn_begin(None, flags)
81 run1(list(self.pcommit))
91 def __exit__(self, etype, exc, tb):
96 def postcommit(self, fun):
100 def wrapper(self, *args, **kwargs):
103 return fun(self, *args, **kwargs)
110 def wrapper(self, *args, tx=None, **kwargs):
114 with txn(envfun(self)) as ltx:
115 ret = fun(self, *args, tx=ltx, **kwargs)
121 return fun(self, *args, tx=tx, **kwargs)
125 class database(object):
126 def __init__(self, env, name, create, mode):
133 self.cf = self._opendb("cf", bd.DB_HASH, fl)
134 self.ob = self._opendb("ob", bd.DB_HASH, fl)
136 @txnfun(lambda self: self.env)
137 def _opendb(self, dnm, typ, fl, init=None, *, tx):
138 ret = bd.DB(self.env.env)
140 ret.open(self.fnm, dnm, typ, fl, self.mode, txn=tx.tx)
143 @txnfun(lambda self: self.env)
144 def _nextseq(self, *, tx):
145 if self.cf.has_key(b"seq", txn=tx.tx):
146 seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
149 self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
152 @txnfun(lambda self: self.env)
153 def add(self, ob, *, tx):
154 seq = self._nextseq(tx=tx)
155 self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
158 @txnfun(lambda self: self.env)
159 def replace(self, id, ob, *, tx):
160 key = struct.pack(">Q", id)
161 if not self.ob.has_key(key, txn=tx.tx):
163 self.ob.put(key, ob, txn=tx.tx)
@txnfun(lambda self: self.env)
def get(self, id, *, tx):
    """Look up the stored value for object ``id`` within transaction ``tx``.

    ``id`` is packed as a big-endian unsigned 64-bit integer, the same key
    encoding the other accessors of this class use for ``self.ob``.
    ``tx`` is the transaction wrapper supplied by ``txnfun``; its ``.tx``
    attribute is the underlying Berkeley DB transaction handle.
    The lookup yields ``None`` when no record exists under the key.
    """
    # Pass the transaction handle explicitly, like every other accessor here.
    # A read issued without it runs outside the transaction and can block on
    # locks that this very transaction already holds (e.g. after add/replace).
    ret = self.ob.get(struct.pack(">Q", id), None, txn=tx.tx)
172 @txnfun(lambda self: self.env)
173 def remove(self, id, *, tx):
174 key = struct.pack(">Q", id)
175 if not self.ob.has_key(key, txn=tx.tx):
177 self.ob.delete(key, txn=tx.tx)