Commit | Line | Data |
---|---|---|
a95055e8 FT |
1 | import time, threading, struct |
2 | from . import lib | |
3 | from bsddb3 import db as bd | |
4 | ||
cbf73d3a FT |
# Public names of this module.
__all__ = ["environment", "database"]

# Short alias for the Berkeley DB deadlock exception; the retry loops and
# the maintenance routine in this module catch it under this name.
deadlock = bd.DBLockDeadlockError
8 | ||
class environment(lib.closable):
    """Berkeley DB environment that hands out cached `database` objects.

    The environment is opened with thread support and with the memory
    pool, locking, logging and transaction subsystems initialised.
    Instances can be used as context managers; leaving the ``with`` block
    closes the environment.
    """

    def __init__(self, path, *, create=True, recover=False, mode=0o666):
        """Open the environment at *path*.

        path    -- forwarded to ``DBEnv.open``.
        create  -- when true, ``DB_CREATE`` is added to the open flags.
        recover -- when true, ``DB_RECOVER`` is added to the open flags.
        mode    -- forwarded to ``DBEnv.open`` as its mode argument.
        """
        self.env = bd.DBEnv()
        # Must be configured before the environment is opened.
        self.env.set_lk_detect(bd.DB_LOCK_RANDOM)
        fl = (bd.DB_THREAD | bd.DB_INIT_MPOOL | bd.DB_INIT_LOCK
              | bd.DB_INIT_LOG | bd.DB_INIT_TXN)
        if recover:
            fl |= bd.DB_RECOVER
        if create:
            fl |= bd.DB_CREATE
        self.env.open(path, fl, mode)
        # Timestamps of the last checkpoint / log clean-up done by maint().
        self.lastckp = self.lastarch = time.time()
        # Guards the name -> database cache below.
        self.lock = threading.Lock()
        self.dbs = {}

    def close(self):
        """Close the underlying environment; safe to call more than once.

        Also safe on a partially constructed instance (``__init__`` failed
        before ``self.env`` was assigned), which matters because
        ``__del__`` calls this method unconditionally.
        """
        env = getattr(self, "env", None)
        if env is None:
            return
        # Drop the reference first: even if close() raises, the handle
        # must not be closed a second time from __del__.
        self.env = None
        env.close()

    def maint(self):
        """Run periodic maintenance; intended to be called regularly.

        At most once per 60 seconds a transaction checkpoint is requested,
        and at most once per 3600 seconds ``log_archive(DB_ARCH_REMOVE)``
        is called. A deadlock raised by either step is swallowed; the
        corresponding timestamp is then left untouched, so the step is
        attempted again on the next call.
        """
        now = time.time()
        try:
            if now - self.lastckp > 60:
                # NOTE(review): 1024 is presumably the "kbyte" threshold of
                # txn_checkpoint -- confirm against the bsddb3 docs.
                self.env.txn_checkpoint(1024)
                self.lastckp = now
            if now - self.lastarch > 3600:
                self.env.log_archive(bd.DB_ARCH_REMOVE)
                self.lastarch = now
        except deadlock:
            # Best effort only: maintenance is retried on a later call.
            pass

    def db(self, name, create=True, mode=0o666):
        """Return the `database` called *name*, opening it on first use.

        The object is cached per name, so *create* and *mode* only take
        effect the first time a given name is requested.
        """
        with self.lock:
            if name not in self.dbs:
                self.dbs[name] = database(self, name, create, mode)
            return self.dbs[name]

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *excinfo):
        self.close()
        # Never suppress an exception raised inside the with-block.
        return False
55 | ||
def opendb(env, fnm, dnm, typ, fl, mode):
    """Create a DB handle in *env* and open it, retrying on deadlock.

    env  -- Berkeley DB environment handle the new DB belongs to.
    fnm  -- file name, forwarded to ``DB.open``.
    dnm  -- database name within the file, forwarded to ``DB.open``.
    typ  -- access method constant (e.g. ``bd.DB_HASH``).
    fl   -- open flags.
    mode -- mode argument for ``DB.open``.

    Returns the opened DB handle. The open is repeated for as long as it
    fails with a deadlock error; any other exception propagates.
    """
    ret = bd.DB(env)
    while True:
        try:
            # The original called ``self.main.open`` here, but there is no
            # ``self`` in a module-level function; the handle to open is
            # the one created above.
            ret.open(fnm, dnm, typ, fl, mode)
        except deadlock:
            continue
        return ret
64 | ||
class txn(object):
    """A Berkeley DB transaction with support for post-commit callbacks.

    Used as a context manager, the transaction is aborted on exit unless
    commit() or abort() has already been called.
    """

    def __init__(self, env, flags=bd.DB_TXN_WRITE_NOSYNC):
        """Begin a new top-level transaction in *env* with *flags*."""
        self.tx = env.txn_begin(None, flags)
        # Set once commit() or abort() has been attempted.
        self.done = False
        # Callables to invoke after a successful commit.
        self.pcommit = set()

    def commit(self):
        """Commit the transaction, then run every post-commit callback.

        All registered callbacks are invoked even if some of them raise.
        If any raised, the exception raised last is propagated once all
        callbacks have run, with the earlier failure attached as its
        ``__context__`` when that slot is still free.
        """
        # Marked before the actual commit so that __exit__ does not try to
        # abort a transaction whose commit was attempted and failed.
        self.done = True
        self.tx.commit(0)
        failure = None
        # Iterate over a snapshot so callbacks may register new callbacks.
        # A plain loop avoids the recursion limit and the repeated list
        # slicing of a recursive runner.
        for callback in list(self.pcommit):
            try:
                callback()
            except BaseException as exc:
                if failure is not None and exc.__context__ is None:
                    exc.__context__ = failure
                failure = exc
        if failure is not None:
            raise failure

    def abort(self):
        """Abort the transaction; post-commit callbacks are not run."""
        self.done = True
        self.tx.abort()

    def __enter__(self):
        return self

    def __exit__(self, etype, exc, tb):
        # Leaving the block without an explicit commit/abort rolls back.
        if not self.done:
            self.abort()
        return False

    def postcommit(self, fun):
        """Register *fun* (called without arguments) to run after commit.

        Callbacks are kept in a set, so registering the same callable
        twice runs it once, and the invocation order is unspecified.
        """
        self.pcommit.add(fun)
96 | ||
6efe4e23 FT |
def dloopfun(fun):
    """Decorate a method so that it is re-run whenever it deadlocks.

    The wrapped method is called again, with the same arguments, for as
    long as it raises the module's ``deadlock`` exception. Its return
    value is passed through and any other exception propagates.
    """
    # Imported locally so this block does not depend on the module header.
    import functools

    @functools.wraps(fun)  # keep the name/docstring of the wrapped method
    def wrapper(self, *args, **kwargs):
        while True:
            try:
                return fun(self, *args, **kwargs)
            except deadlock:
                continue
    return wrapper
105 | ||
8950191c FT |
def txnfun(envfun):
    """Build a decorator that supplies a transaction to a method.

    envfun -- callable taking the method's ``self`` and returning the
              environment handle in which a new transaction is begun.

    The decorated method receives the transaction as the keyword argument
    ``tx``. When the caller passes ``tx`` itself, the method simply runs
    inside that transaction and the caller stays responsible for
    committing it. When ``tx`` is omitted (or None), a fresh `txn` is
    started, the method is run, the transaction is committed and the
    method's result returned; if a deadlock is raised, the whole sequence
    is repeated with a new transaction.
    """
    # Imported locally so this block does not depend on the module header.
    import functools

    def fxf(fun):
        @functools.wraps(fun)  # keep the name/docstring of the wrapped method
        def wrapper(self, *args, tx=None, **kwargs):
            if tx is not None:
                # Caller-managed transaction: no commit and no retry here.
                return fun(self, *args, tx=tx, **kwargs)
            while True:
                try:
                    # The context manager aborts the transaction if the
                    # method raises before commit() is reached.
                    with txn(envfun(self)) as ltx:
                        ret = fun(self, *args, tx=ltx, **kwargs)
                        ltx.commit()
                        return ret
                except deadlock:
                    continue
        return wrapper
    return fxf
122 | ||
a95055e8 FT |
class database(object):
    """Object store kept in two hash sub-databases of one file.

    ``cf`` holds configuration values (currently the ``b"seq"`` id
    counter) and ``ob`` holds the stored objects, keyed by their id packed
    as a big-endian unsigned 64-bit integer.

    Every public operation accepts an optional keyword argument ``tx``
    (a `txn`). Without it the operation runs in its own transaction, which
    is committed on success and retried on deadlock.
    """

    def __init__(self, env, name, create, mode):
        """Open (optionally creating) the database file *name* in *env*.

        env    -- the owning `environment` object.
        name   -- file name shared by both sub-databases.
        create -- when true, ``DB_CREATE`` is added to the open flags.
        mode   -- mode argument passed to ``DB.open``.
        """
        self.env = env
        self.mode = mode
        self.fnm = name
        fl = bd.DB_THREAD
        if create:
            fl |= bd.DB_CREATE
        self.cf = self._opendb("cf", bd.DB_HASH, fl)
        self.ob = self._opendb("ob", bd.DB_HASH, fl)

    @txnfun(lambda self: self.env.env)
    def _opendb(self, dnm, typ, fl, init=None, *, tx):
        """Open sub-database *dnm* of this file and return its handle.

        *init*, if given, is called with the fresh handle before it is
        opened, so it can apply settings that must precede the open.
        """
        ret = bd.DB(self.env.env)
        if init:
            init(ret)
        ret.open(self.fnm, dnm, typ, fl, self.mode, txn=tx.tx)
        return ret

    @txnfun(lambda self: self.env.env)
    def _nextseq(self, *, tx):
        """Return the next unused object id and advance the counter.

        Ids start at 1 when no counter has been stored yet.
        """
        # One lookup instead of has_key() followed by get().
        raw = self.cf.get(b"seq", None, txn=tx.tx)
        if raw is None:
            seq = 1
        else:
            seq = struct.unpack(">Q", raw)[0]
        self.cf.put(b"seq", struct.pack(">Q", seq + 1), txn=tx.tx)
        return seq

    @txnfun(lambda self: self.env.env)
    def add(self, ob, *, tx):
        """Store *ob* under a newly allocated id and return that id."""
        seq = self._nextseq(tx=tx)
        # DB_NOOVERWRITE: never replace a record already stored under
        # this id.
        self.ob.put(struct.pack(">Q", seq), ob, txn=tx.tx,
                    flags=bd.DB_NOOVERWRITE)
        return seq

    @txnfun(lambda self: self.env.env)
    def replace(self, id, ob, *, tx):
        """Overwrite the object stored under *id* with *ob*.

        Raises KeyError(id) if no object is stored under *id*.
        """
        key = struct.pack(">Q", id)
        if not self.ob.has_key(key, txn=tx.tx):
            raise KeyError(id)
        self.ob.put(key, ob, txn=tx.tx)

    @txnfun(lambda self: self.env.env)
    def get(self, id, *, tx):
        """Return the object stored under *id*.

        Raises KeyError(id) if no object is stored under *id*.
        """
        # Read inside the supplied transaction, like every other
        # operation; previously the lookup ignored ``tx`` altogether.
        ret = self.ob.get(struct.pack(">Q", id), None, txn=tx.tx)
        if ret is None:
            raise KeyError(id)
        return ret

    @txnfun(lambda self: self.env.env)
    def remove(self, id, *, tx):
        """Delete the object stored under *id*.

        Raises KeyError(id) if no object is stored under *id*.
        """
        key = struct.pack(">Q", id)
        if not self.ob.has_key(key, txn=tx.tx):
            raise KeyError(id)
        self.ob.delete(key, txn=tx.tx)