python: Gather handler names better.
[ashd.git] / python3 / ashd / serve.py
CommitLineData
a962c944 1import sys, os, threading, time, logging, select, queue
46adc298 2from . import perf
552a70bf
FT
3
# Module-wide logger for the request-serving machinery.
log = logging.getLogger("ashd.serve")

# Next request sequence number to hand out, and the lock guarding it.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    The module-level counter is read and incremented while holding
    `seqlk`, so concurrent callers each receive a distinct value.
    """
    global seq
    with seqlk:
        current, seq = seq, seq + 1
    return current
14
552a70bf
FT
class closed(IOError):
    """Raised to signal that the client has closed the connection."""

    def __init__(self):
        message = "The client has closed the connection."
        super(closed, self).__init__(message)
18
46adc298
FT
class reqthread(threading.Thread):
    """Thread subclass that picks a default name for request handlers.

    When no `name` is given, the thread is called "Request handler N",
    where N comes from reqseq(). All other keyword arguments are passed
    on to threading.Thread unchanged.
    """

    def __init__(self, *, name=None, **kw):
        thread_name = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=thread_name, **kw)
24
class wsgirequest(object):
    """Base class holding the response state of one WSGI request.

    Subclasses supply the transport-specific pieces (handlewsgi, fileno,
    writehead, flush); this class tracks the response status, headers and
    pending output buffer, and provides the start_response/write callables.
    """

    def __init__(self, handler):
        # Response status line and headers as given to startreq().
        self.status = None
        self.headers = []
        # Becomes True once writehead() has been invoked by flushreq().
        self.respsent = False
        # Handler whose ckflush() is called after each write().
        self.handler = handler
        # Response body data accumulated by writedata().
        self.buffer = bytearray()

    def handlewsgi(self):
        """Placeholder; the base implementation always raises."""
        raise Exception()

    def fileno(self):
        """Placeholder; the base implementation always raises."""
        raise Exception()

    def writehead(self, status, headers):
        """Placeholder; the base implementation always raises."""
        raise Exception()

    def flush(self):
        """Placeholder; the base implementation always raises."""
        raise Exception()

    def close(self):
        """Release the request; nothing to do in the base class."""
        pass

    def writedata(self, data):
        """Append `data` to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head via writehead() if not already sent.

        Raises Exception if no status has been set yet.
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        # Mark as sent before calling writehead(), as the original does.
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Queue body `data` and let the handler check for flushing.

        Empty data is ignored entirely (the head is not sent either).
        """
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return the write callable.

        A second call is only accepted when `exc_info` is given. In that
        case, if the head has already been sent, exc_info[1] is re-raised;
        otherwise the stored status and headers are replaced.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exc_info tuple.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
46adc298
FT
73
class handler(object):
    """Base class for request handlers.

    Subclasses override handle(); the base class provides a blocking
    ckflush(), a no-op close() and argument parsing that accepts no
    arguments.
    """

    def handle(self, request):
        """Placeholder; the base implementation always raises."""
        raise Exception()

    def ckflush(self, req):
        """Block until the request's output buffer has been drained.

        Repeatedly waits with select() on `req` and then calls
        req.flush(), for as long as req.buffer is non-empty.
        """
        while req.buffer:
            select.select([], [req], [req])
            req.flush()

    def close(self):
        """Shut the handler down; nothing to do in the base class."""
        pass

    @classmethod
    def parseargs(cls, **args):
        """Convert string handler arguments into constructor keywords.

        The base class knows no arguments: any argument left over raises
        ValueError naming the first one, otherwise an empty dict is
        returned.
        """
        if args:
            raise ValueError("unknown handler argument: " + next(iter(args)))
        return {}
89
class single(handler):
    """Handler that runs each request to completion in the calling thread."""

    cname = "single"

    def handle(self, req):
        """Run the WSGI application for `req` and write out its response.

        The response iterable is iterated and each chunk passed to
        req.write(). A `closed` exception (client went away) is silently
        ignored; any other exception is logged. The request is always
        closed at the end.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333 requires the server to call close() on the
                    # returned iterable, if it has one, whether the request
                    # finished normally or was cut short by an error.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            # Deliberately broad: nothing raised by the application may
            # escape into the caller; it is logged instead.
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
110
class freethread(handler):
    """Handler that runs every request in a freshly started thread.

    `max` optionally caps the number of concurrently running request
    threads; `timeout` optionally bounds (in seconds) how long handle()
    may wait for a free slot before the process is aborted.
    """

    cname = "free"

    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        # Set of currently running request threads, guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        # Notified whenever a thread is added to or removed from current.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Parse the string arguments `max` and `abort`.

        `max` becomes the integer `max` constructor argument and `abort`
        the integer `timeout` argument. Remaining arguments are passed to
        the parent class, which rejects unknown ones.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for `req`, waiting for a free slot if capped.

        When both `max` and `timeout` are set and no slot frees up within
        `timeout` seconds, the process is aborted with os.abort().
        Returns once the new thread has registered itself (or has already
        terminated).
        """
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    # Use the monotonic clock so that wall-clock adjustments
                    # can neither trigger nor suppress the abort.
                    now = start = time.monotonic()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.monotonic()
                        if now - start > self.timeout:
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.start()
            # Wait until the thread has added itself to self.current, so
            # the cap above is accurate for the next call.
            while th.is_alive() and th not in self.current:
                self.tcond.wait()

    def run(self, req):
        """Thread body: run the WSGI application for `req`.

        Registers the current thread in self.current for the duration of
        the request. A `closed` exception is ignored, any other exception
        is logged, and the request is always closed at the end.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333 requires the server to call close() on
                        # the returned iterable, if it has one, on both
                        # normal and abnormal completion.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                pass
            except:
                # Deliberately broad: log whatever the application raised
                # rather than letting it kill the thread unreported.
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Wait for all currently running request threads to finish."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock; the thread needs it to unregister.
            th.join()
184
class resplex(handler):
    """Handler that multiplexes all response writing onto one thread.

    Each request's WSGI application is called in its own thread
    (handle1). The resulting response iterator is then handed over,
    through a queue and a wake-up pipe, to a single response thread
    (handle2) that iterates and flushes all responses using select().
    """

    cname = "rplex"

    def __init__(self, *, max=None, **kw):
        super().__init__(**kw)
        # Threads currently inside handle1, guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        self.tcond = threading.Condition(self.lk)
        self.max = max
        # Hand-over queue of (request, response iterator) pairs.
        self.cqueue = queue.Queue(5)
        # Pipe used to wake the response thread after a queue put.
        self.cnpipe = os.pipe()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, *, max=None, **args):
        """Parse the string argument `max` into an integer keyword."""
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        """Always raises: write() is not usable with this handler."""
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a handle1 thread for `req`, honouring the `max` cap.

        Returns once the new thread has registered itself in
        self.current (or has already terminated).
        """
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            worker = reqthread(target=self.handle1, args=[req])
            worker.start()
            while worker.is_alive() and worker not in self.current:
                self.tcond.wait()

    def handle1(self, req):
        """Thread body: call the WSGI application and queue its response.

        If the application did not start a response, this is logged and
        the request is closed here. Otherwise the request is passed on to
        the response thread, which takes over responsibility for it.
        """
        try:
            me = threading.current_thread()
            with self.lk:
                self.current.add(me)
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if req.status:
                    self.cqueue.put((req, respiter))
                    os.write(self.cnpipe[1], b" ")
                    # Ownership moved to the response thread; do not
                    # close the request in the outer finally clause.
                    req = None
                else:
                    log.error("request handler returned without calling start_request")
                    if hasattr(respiter, "close"):
                        respiter.close()
                    return
            finally:
                with self.lk:
                    self.current.remove(me)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response thread body: iterate and flush all queued responses.

        Runs until the read end of the wake-up pipe reports end-of-file.
        Any unexpected exception escaping the loop is logged and the
        process is aborted.
        """
        try:
            wakefd = self.cnpipe[0]
            # Maps each active request to its response iterator, or to
            # None once the iterator is finished but data remains buffered.
            active = {}

            def closereq(req):
                # Close the iterator (if still held) and the request,
                # logging but otherwise ignoring errors, then forget it.
                respiter = active[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del active[req]

            def ckiter(req):
                # Pull one item from the response iterator, buffering it
                # or retiring the iterator; close the request when nothing
                # is left to iterate or to write.
                respiter = active[req]
                if respiter is not None:
                    finished = False
                    try:
                        data = next(respiter)
                    except StopIteration:
                        finished = True
                        req.flushreq()
                    except:
                        finished = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if finished:
                        active[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                    elif data:
                        req.flushreq()
                        req.writedata(data)
                if respiter is None and not req.buffer:
                    closereq(req)

            while True:
                # Only requests with buffered data are polled for writing.
                bufl = [r for r in active.keys() if r.buffer]
                rls, wls, els = select.select([wakefd], bufl, [wakefd] + bufl)
                if wakefd in rls:
                    ret = os.read(wakefd, 1024)
                    if not ret:
                        # Write end closed: shut the response thread down.
                        os.close(wakefd)
                        return
                    try:
                        # Drain everything handle1 threads have queued.
                        while True:
                            req, respiter = self.cqueue.get(False)
                            active[req] = respiter
                            ckiter(req)
                    except queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Fetch more data only while the buffer is small.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Wait for handle1 threads, then stop the response thread.

        Closing the write end of the wake-up pipe makes the response
        thread see end-of-file and return; it is then joined.
        """
        while True:
            with self.lk:
                if len(self.current) > 0:
                    worker = next(iter(self.current))
                else:
                    break
            worker.join()
        os.close(self.cnpipe[1])
        self.rthread.join()
334
5345dcaf
FT
# Registry mapping each handler's `cname` to its class, gathered from all
# module-level handler subclasses that define a `cname`. Kept as a
# comprehension so the loop variable is not stored into globals() while
# globals() is being iterated.
names = {
    hcls.cname: hcls
    for hcls in globals().values()
    if isinstance(hcls, type) and issubclass(hcls, handler) and hasattr(hcls, "cname")
}
8db41888
FT
339
def parsehspec(spec):
    """Split a handler specification into a name and an argument dict.

    The format is "name" or "name:key=val,key2=val2,...". Without a
    colon, the whole string is the name and the dict is empty. Each
    comma-separated part is split at its first "="; a part without "="
    maps its key to the empty string. All values are returned as strings.
    """
    name, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    parsed = {}
    while rest:
        part, _, rest = rest.partition(",")
        key, _, val = part.partition("=")
        parsed[key] = val
    return name, parsed