Typo fix.
[didex.git] / didex / db.py
CommitLineData
a95055e8
FT
1import time, threading, struct
2from . import lib
3from bsddb3 import db as bd
4
cbf73d3a
FT
# Names exported by a star-import of this module.
__all__ = [
    "environment",
    "database",
]

# Short module-level alias for the Berkeley DB deadlock exception class.
deadlock = bd.DBLockDeadlockError
8
class environment(lib.closable):
    """A transactional Berkeley DB environment rooted at a directory.

    The environment is opened with threading, memory pool, locking, logging
    and transaction subsystems enabled, and with random-victim deadlock
    detection.  Instances can be used as context managers; leaving the
    ``with`` block closes the environment.
    """

    def __init__(self, path, *, create=True, recover=False, mode=0o666):
        """Open the environment stored at *path*.

        path    -- directory passed to ``DBEnv.open``.
        create  -- when true, add ``DB_CREATE`` to the open flags.
        recover -- when true, add ``DB_RECOVER`` to the open flags.
        mode    -- mode argument passed to ``DBEnv.open``.
        """
        self.env = bd.DBEnv()
        self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
        fl = (bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK
              | bd.DB_INIT_LOG | bd.DB_INIT_TXN)
        if recover:
            fl |= bd.DB_RECOVER
        if create:
            fl |= bd.DB_CREATE
        self.env.open(path, fl, mode)
        # Timestamps of the last checkpoint / log clean-up done by maint().
        self.lastckp = self.lastarch = time.time()
        # Guards the cache of opened databases in self.dbs.
        self.lock = threading.Lock()
        self.dbs = {}

    def close(self):
        """Close the underlying environment; calling it again is a no-op."""
        # getattr() keeps this safe when called from __del__ on an instance
        # whose __init__ failed before self.env was assigned.
        env = getattr(self, "env", None)
        if env is None:
            return
        # Drop the reference before closing: the handle must not be used
        # again after close(), whether or not close() succeeds.
        self.env = None
        env.close()

    def maint(self):
        """Run periodic maintenance: checkpointing and log clean-up.

        A checkpoint is requested when more than 60 seconds have passed
        since the last one, and ``log_archive(DB_ARCH_REMOVE)`` is called
        when more than an hour has passed since the last clean-up.  A
        deadlock raised by either call is ignored; the timestamp of a step
        that did not complete is left untouched, so it is attempted again
        on a later call.
        """
        now = time.time()
        try:
            if now - self.lastckp > 60:
                self.env.txn_checkpoint(1024)
                self.lastckp = now
            if now - self.lastarch > 3600:
                self.env.log_archive(bd.DB_ARCH_REMOVE)
                self.lastarch = now
        except deadlock:
            # Best effort only; the next maint() call retries.
            pass

    def db(self, name, create=True, mode=0o666):
        """Return the ``database`` called *name*, opening it on first use.

        Opened databases are cached per name, so *create* and *mode* only
        take effect on the call that actually opens the database.
        """
        with self.lock:
            if name not in self.dbs:
                self.dbs[name] = database(self, name, create, mode)
            return self.dbs[name]

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *excinfo):
        self.close()
        # Never suppress an exception raised inside the with-block.
        return False
55
def opendb(env, fnm, dnm, typ, fl, mode):
    """Open a database inside *env*, retrying for as long as it deadlocks.

    env  -- Berkeley DB environment handle the database belongs to.
    fnm  -- file name passed to ``DB.open``.
    dnm  -- database name within the file.
    typ  -- access method (for example ``bd.DB_HASH``).
    fl   -- open flags.
    mode -- mode argument passed to ``DB.open``.

    Returns the opened ``bd.DB`` handle.  Exceptions other than the
    deadlock exception propagate to the caller.
    """
    while True:
        # A fresh handle per attempt: a handle whose open() failed has to
        # be discarded rather than reopened.
        ret = bd.DB(env)
        try:
            ret.open(fnm, dnm, typ, fl, mode)
        except deadlock:
            ret.close()
            continue
        return ret
64
class txn(object):
    """A Berkeley DB transaction with support for post-commit callbacks.

    Used as a context manager, the transaction is aborted on exit unless
    commit() or abort() has already been called.
    """

    def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
        """Begin a new top-level transaction in *env* using *flags*."""
        self.tx = env.txn_begin(None, flags)
        self.done = False
        # Callables registered through postcommit().
        self.pcommit = set()

    def commit(self):
        """Commit the transaction, then invoke every post-commit callback.

        Each callback runs even if an earlier one raised; an exception
        raised by a callback still propagates once the rest have run.
        """
        # Marked as finished before committing, so that __exit__ does not
        # try to abort if the commit itself raises.
        self.done = True
        self.tx.commit(0)

        def run_remaining(callbacks):
            if not callbacks:
                return
            first, *rest = callbacks
            try:
                first()
            finally:
                run_remaining(rest)

        run_remaining(list(self.pcommit))

    def abort(self):
        """Abort the transaction."""
        self.done = True
        self.tx.abort()

    def __enter__(self):
        return self

    def __exit__(self, etype, exc, tb):
        if self.done:
            return False
        # Neither committed nor aborted inside the with-block: roll back.
        self.abort()
        return False

    def postcommit(self, fun):
        """Register *fun* to be called with no arguments after commit()."""
        self.pcommit.add(fun)
96
6efe4e23
FT
def dloopfun(fun):
    """Decorate method *fun* so that it is retried while it deadlocks.

    The returned wrapper calls ``fun(self, *args, **kwargs)`` repeatedly
    until it completes without raising the deadlock exception, and returns
    its result.  Any other exception propagates unchanged.  The wrapper
    keeps the name, docstring and other metadata of *fun*.
    """
    from functools import wraps

    @wraps(fun)
    def wrapper(self, *args, **kwargs):
        while True:
            try:
                return fun(self, *args, **kwargs)
            except deadlock:
                continue
    return wrapper
105
8950191c
FT
def txnfun(envfun):
    """Build a decorator that supplies a transaction to a method.

    envfun -- callable taking the method's ``self`` and returning the
              environment handle in which to begin a transaction.

    The decorated method must accept a keyword-only ``tx`` argument.  When
    the caller passes ``tx``, the method simply runs inside that
    transaction.  Otherwise a new ``txn`` is started, the method is called
    with it, and the transaction is committed; if a deadlock is raised the
    whole sequence is retried with a fresh transaction.  The wrapper keeps
    the name, docstring and other metadata of the decorated method.
    """
    from functools import wraps

    def fxf(fun):
        @wraps(fun)
        def wrapper(self, *args, tx=None, **kwargs):
            if tx is not None:
                # Caller owns the transaction: no commit and no retry here.
                return fun(self, *args, tx=tx, **kwargs)
            while True:
                try:
                    # Leaving the with-block without a commit aborts ltx.
                    with txn(envfun(self)) as ltx:
                        ret = fun(self, *args, tx=ltx, **kwargs)
                        ltx.commit()
                        return ret
                except deadlock:
                    continue
        return wrapper
    return fxf
122
a95055e8
FT
class database(object):
    """An object store kept in a single file of an ``environment``.

    Two hash databases are opened inside the file: ``cf``, which holds the
    ``b"seq"`` counter used to allocate ids, and ``ob``, which maps ids
    (packed as 8-byte big-endian unsigned integers) to stored values.

    Every public method takes an optional keyword-only ``tx``.  When it is
    omitted, the operation runs in its own transaction, which is committed
    on success and retried on deadlock.
    """

    def __init__(self, env, name, create, mode):
        """Open the store *name* in *env*.

        env    -- the owning ``environment``; its ``env`` attribute is used
                  as the Berkeley DB environment handle.
        name   -- file name holding both underlying databases.
        create -- when true, add ``DB_CREATE`` to the open flags.
        mode   -- mode argument passed to ``DB.open``.
        """
        self.env = env
        self.mode = mode
        self.fnm = name
        fl = bd.DB_THREAD
        if create:
            fl |= bd.DB_CREATE
        self.cf = self._opendb("cf", bd.DB_HASH, fl)
        self.ob = self._opendb("ob", bd.DB_HASH, fl)

    @txnfun(lambda self: self.env.env)
    def _opendb(self, dnm, typ, fl, init=None, *, tx):
        """Open and return the database *dnm* from this store's file.

        *init*, when given, is called with the new handle before it is
        opened, so it can configure the handle first.
        """
        ret = bd.DB(self.env.env)
        if init:
            init(ret)
        ret.open(self.fnm, dnm, typ, fl, self.mode, txn=tx.tx)
        return ret

    @txnfun(lambda self: self.env.env)
    def _nextseq(self, *, tx):
        """Allocate and return the next id, starting from 1."""
        if self.cf.has_key(b"seq", txn=tx.tx):
            seq = struct.unpack(">Q", self.cf.get(b"seq", txn=tx.tx))[0]
        else:
            seq = 1
        # Persist the value the following allocation will hand out.
        self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
        return seq

    @txnfun(lambda self: self.env.env)
    def add(self, ob, *, tx):
        """Store *ob* under a newly allocated id and return that id."""
        seq = self._nextseq(tx=tx)
        # DB_NOOVERWRITE: never replace an entry already stored under seq.
        self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx,
                    flags=bd.DB_NOOVERWRITE)
        return seq

    @txnfun(lambda self: self.env.env)
    def replace(self, id, ob, *, tx):
        """Overwrite the value stored under *id*.

        Raises KeyError if nothing is stored under *id*.
        """
        key = struct.pack(">Q", id)
        if not self.ob.has_key(key, txn=tx.tx):
            raise KeyError(id)
        self.ob.put(key, ob, txn=tx.tx)

    @txnfun(lambda self: self.env.env)
    def get(self, id, *, tx):
        """Return the value stored under *id*.

        Raises KeyError if nothing is stored under *id*.
        """
        # Read inside the transaction, like every other operation here.
        ret = self.ob.get(struct.pack(">Q", id), None, txn=tx.tx)
        if ret is None:
            raise KeyError(id)
        return ret

    @txnfun(lambda self: self.env.env)
    def remove(self, id, *, tx):
        """Delete the value stored under *id*.

        Raises KeyError if nothing is stored under *id*.
        """
        key = struct.pack(">Q", id)
        if not self.ob.has_key(key, txn=tx.tx):
            raise KeyError(id)
        self.ob.delete(key, txn=tx.tx)
175 self.ob.delete(key, txn=tx.tx)