068d404185d45f006b1a79821d573d9fcfade944
[didex.git] / didex / db.py
1 import time, threading, struct
2 from . import lib
3 from bsddb3 import db as bd
4
5 __all__ = ["environment", "database"]
6
7 deadlock = bd.DBLockDeadlockError
8
9 class environment(lib.closable):
10     def __init__(self, path, *, create=True, recover=False, mode=0o666):
11         self.env = bd.DBEnv()
12         self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
13         fl = bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK | bd.DB_INIT_LOG | bd.DB_INIT_TXN
14         if recover:
15             fl |= bd.DB_RECOVER
16         if create:
17             fl |= bd.DB_CREATE
18         self.env.open(path, fl, mode)
19         self.lastckp = self.lastarch = time.time()
20         self.lock = threading.Lock()
21         self.dbs = {}
22
23     def close(self):
24         env = self.env
25         if env is None:
26             return
27         env.close()
28         self.env = None
29
30     def maint(self):
31         now = time.time()
32         try:
33             if now - self.lastckp > 60:
34                 self.env.txn_checkpoint(1024)
35                 self.lastckp = now
36             if now - self.lastarch > 3600:
37                 self.env.log_archive(bd.DB_ARCH_REMOVE)
38                 self.lastarch = now
39         except deadlock:
40             pass
41
42     def db(self, name, create=True, mode=0o666):
43         with self.lock:
44             if name not in self.dbs:
45                 self.dbs[name] = database(self, name, create, mode)
46             return self.dbs[name]
47
48     def __del__(self):
49         self.close()
50     def __enter__(self):
51         return self
52     def __exit__(self, *excinfo):
53         self.close()
54         return False
55
56 def opendb(env, fnm, dnm, typ, fl, mode):
57     ret = bd.DB(env)
58     while True:
59         try:
60             self.main.open(fnm, dnm, typ, fl, mode)
61         except deadlock:
62             continue
63         return ret
64
65 class txn(object):
66     def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
67         self.tx = env.env.txn_begin(None, flags)
68         self.env = env
69         self.done = False
70         self.pcommit = set()
71
72     def commit(self):
73         self.done = True
74         self.tx.commit(0)
75         def run1(list):
76             if len(list) > 0:
77                 try:
78                     list[0]()
79                 finally:
80                     run1(list[1:])
81         run1(list(self.pcommit))
82         self.env.maint()
83
84     def abort(self):
85         self.done = True
86         self.tx.abort()
87
88     def __enter__(self):
89         return self
90
91     def __exit__(self, etype, exc, tb):
92         if not self.done:
93             self.abort()
94         return False
95
96     def postcommit(self, fun):
97         self.pcommit.add(fun)
98
99 def dloopfun(fun):
100     def wrapper(self, *args, **kwargs):
101         while True:
102             try:
103                 return fun(self, *args, **kwargs)
104             except deadlock:
105                 continue
106     return wrapper
107
108 def txnfun(envfun):
109     def fxf(fun):
110         def wrapper(self, *args, tx=None, **kwargs):
111             if tx is None:
112                 while True:
113                     try:
114                         with txn(envfun(self)) as ltx:
115                             ret = fun(self, *args, tx=ltx, **kwargs)
116                             ltx.commit()
117                             return ret
118                     except deadlock:
119                         continue
120             else:
121                 return fun(self, *args, tx=tx, **kwargs)
122         return wrapper
123     return fxf
124
125 class database(object):
126     def __init__(self, env, name, create, mode):
127         self.env = env
128         self.mode = mode
129         self.fnm = name
130         fl = bd.DB_THREAD
131         if create:
132             fl |= bd.DB_CREATE
133         self.cf = self._opendb("cf", bd.DB_HASH, fl)
134         self.ob = self._opendb("ob", bd.DB_HASH, fl)
135
136     @txnfun(lambda self: self.env)
137     def _opendb(self, dnm, typ, fl, init=None, *, tx):
138         ret = bd.DB(self.env.env)
139         if init: init(ret)
140         ret.open(self.fnm, dnm, typ, fl, self.mode, txn=tx.tx)
141         return ret
142
143     @txnfun(lambda self: self.env)
144     def _nextseq(self, *, tx):
145         if self.cf.has_key(b"seq", txn=tx.tx):
146             seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
147         else:
148             seq = 1
149         self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
150         return seq
151
152     @txnfun(lambda self: self.env)
153     def add(self, ob, *, tx):
154         seq = self._nextseq(tx=tx)
155         self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx, flags=bd.DB_NOOVERWRITE)
156         return seq
157
158     @txnfun(lambda self: self.env)
159     def replace(self, id, ob, *, tx):
160         key = struct.pack(">Q", id)
161         if not self.ob.has_key(key, txn=tx.tx):
162             raise KeyError(id)
163         self.ob.put(key, ob, txn=tx.tx)
164
165     @txnfun(lambda self: self.env)
166     def get(self, id, *, tx):
167         ret = self.ob.get(struct.pack(">Q", id), None)
168         if ret is None:
169             raise KeyError(id)
170         return ret
171
172     @txnfun(lambda self: self.env)
173     def remove(self, id, *, tx):
174         key = struct.pack(">Q", id)
175         if not self.ob.has_key(key, txn=tx.tx):
176             raise KeyError(id)
177         self.ob.delete(key, txn=tx.tx)