python3: Fixed some threadpool handler bugs.
[ashd.git] / python3 / ashd / serve.py
CommitLineData
ab3275e9 1import sys, os, threading, time, logging, select, queue, collections
46adc298 2from . import perf
552a70bf
FT
3
# Module-wide logger and the shared state behind reqseq().
log = logging.getLogger("ashd.serve")
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    The module-level counter `seq` is read and advanced while holding
    `seqlk`, so concurrent callers each receive a distinct number.
    """
    global seq
    seqlk.acquire()
    try:
        current, seq = seq, seq + 1
    finally:
        seqlk.release()
    return current
14
552a70bf
FT
class closed(IOError):
    """Error signalling that the client has closed the connection."""

    def __init__(self):
        message = "The client has closed the connection."
        super().__init__(message)
18
46adc298
FT
class reqthread(threading.Thread):
    """Thread whose name defaults to "Request handler <n>".

    <n> is taken from reqseq() when no explicit name is given. All other
    keyword arguments are forwarded to threading.Thread unchanged.
    """

    def __init__(self, *, name=None, **kw):
        thread_name = ("Request handler %i" % reqseq()) if name is None else name
        super().__init__(name=thread_name, **kw)
24
class wsgirequest(object):
    """Base class holding the response state of a single request.

    Subclasses are expected to override handlewsgi(), fileno(), writehead()
    and flush(); the versions here only raise. Outgoing body data is
    accumulated in `self.buffer` (a bytearray).
    """

    def __init__(self, *, handler):
        self.handler = handler
        self.buffer = bytearray()
        # Response state: nothing started and nothing sent yet.
        self.status = None
        self.headers = []
        self.respsent = False

    # Hooks that concrete request types must provide.
    def handlewsgi(self):
        raise Exception()

    def fileno(self):
        raise Exception()

    def writehead(self, status, headers):
        raise Exception()

    def flush(self):
        raise Exception()

    def close(self):
        """Release the request; the base implementation does nothing."""
        pass

    def writedata(self, data):
        """Append *data* to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head once, on first use.

        Raises Exception if no status has been set via startreq() yet.
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        # Mark as sent before calling writehead(), exactly once per request.
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Queue a body chunk and let the handler flush it.

        Empty chunks are ignored and do not trigger sending the head.
        """
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return the write callable.

        A second call is only accepted when *exc_info* is given. In that
        case, if the head has already been sent, the exception in
        exc_info[1] is raised; otherwise the status and headers are replaced.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exception info.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
46adc298
FT
73
class handler(object):
    """Base class for request handlers."""

    def handle(self, request):
        """Process *request*; must be overridden (the base version raises)."""
        raise Exception()

    def ckflush(self, req):
        """Block until the output buffer of *req* has been drained.

        While data remains, wait with select() for *req* to become writable
        (or to report an exceptional condition), then call req.flush().
        """
        while len(req.buffer) > 0:
            select.select([], [req], [req])
            req.flush()

    def close(self):
        """Shut the handler down; the base implementation does nothing."""
        pass

    @classmethod
    def parseargs(cls, **args):
        """Turn string handler options into constructor keyword arguments.

        The base class accepts no options: the first leftover key raises
        ValueError, otherwise an empty dict is returned.
        """
        for key in args:
            raise ValueError("unknown handler argument: " + key)
        return {}
89
class single(handler):
    """Handler that serves each request synchronously in the calling thread."""

    cname = "single"

    def handle(self, req):
        """Run the application for *req* and write out its response.

        The environment comes from req.mkenv(); the iterable returned by
        req.handlewsgi() is written chunk by chunk with req.write(). A
        `closed` error (client went away) is ignored, any other error is
        logged, and req.close() is always called at the end.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Make sure the head goes out even for empty bodies.
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333 requires the server to call close() on the
                    # returned iterable when the request completes, whether
                    # it finished normally or was cut short by an error.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
110
76ff6c4d
FT
def dbg(*a):
    """Write the arguments to stderr, space-separated, on one line.

    Each argument is converted with str(); the line ends with a newline
    and stderr is flushed afterwards.
    """
    # Join the individual items, not the argument tuple as a whole.
    sys.stderr.write(" ".join(str(o) for o in a))
    sys.stderr.write("\n")
    sys.stderr.flush()
120
class freethread(handler):
    """Handler that starts a new thread for every request.

    `max` optionally caps the number of concurrently running request
    threads. With `timeout` also set, a request that has to wait longer
    than that many seconds for a free slot aborts the whole process.
    """

    cname = "free"

    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        self.current = set()            # request threads currently running
        self.lk = threading.Lock()      # protects self.current
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Convert the string options `max` and `abort` to constructor args.

        `abort` maps to the `timeout` constructor argument. Remaining
        options are passed to the parent class, which rejects unknown ones.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for *req*, waiting for a free slot if capped.

        Returns once the new thread has registered itself in self.current.
        """
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    now = start = time.time()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.time()
                        if now - start > self.timeout:
                            # Waited too long for a slot: kill the process.
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.registered = False
            th.start()
            # Wait for run() to add the thread to self.current so that the
            # slot accounting is correct before the next request arrives.
            while not th.registered:
                self.tcond.wait()

    def run(self, req):
        """Thread body: register, serve *req*, deregister, close *req*."""
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333 requires the server to call close() on
                        # the returned iterable once the request is done,
                        # including when it was cut short by an error.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                pass
            except:
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    # Wake handle() callers waiting for a free slot.
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Join request threads one at a time until none remain."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock; run() needs it to deregister itself.
            th.join()
196
ab3275e9
FT
class threadpool(handler):
    """Handler that serves requests from a queue with a pool of threads.

    Up to `max` worker threads are started on demand. Requests wait in a
    queue of at most `qsz` entries. With `timeout` set, a request that
    cannot be queued within that many seconds aborts the whole process.
    """

    cname = "pool"

    def __init__(self, *, max=25, qsz=100, timeout=None, **kw):
        super().__init__(**kw)
        self.current = set()            # all live worker threads
        self.clk = threading.Lock()     # protects self.current
        self.ccond = threading.Condition(self.clk)
        self.queue = collections.deque()
        self.waiting = set()            # workers currently idle on qfcond
        self.waitlimit = 5
        self.wlstart = 0.0              # when the idle count last hit waitlimit
        self.qlk = threading.Lock()     # protects queue, waiting and wlstart
        self.qfcond = threading.Condition(self.qlk)   # a request was queued
        self.qecond = threading.Condition(self.qlk)   # a request was dequeued
        self.max = max
        self.qsz = qsz
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, queue=None, abort=None, **args):
        """Convert the string options `max`, `queue`, `abort` to constructor args.

        `queue` maps to `qsz` and `abort` to `timeout`. Remaining options
        are passed to the parent class, which rejects unknown ones.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if queue:
            ret["qsz"] = int(queue)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Queue *req*, starting another worker if none is idle.

        Blocks while the queue is full. A new worker is only started when
        no worker is waiting and fewer than `max` workers exist.
        """
        spawn = False
        with self.qlk:
            if self.timeout is not None:
                now = start = time.time()
                while len(self.queue) >= self.qsz:
                    self.qecond.wait(start + self.timeout - now)
                    now = time.time()
                    if now - start > self.timeout:
                        # Queue stayed full for too long: kill the process.
                        os.abort()
            else:
                while len(self.queue) >= self.qsz:
                    self.qecond.wait()
            self.queue.append(req)
            self.qfcond.notify()
            if len(self.waiting) < 1:
                spawn = True
        if spawn:
            with self.clk:
                if len(self.current) < self.max:
                    th = reqthread(target=self.run)
                    th.registered = False
                    th.start()
                    # Wait until run() has registered the thread.
                    while not th.registered:
                        self.ccond.wait()

    def handle1(self, req):
        """Serve one request; errors are logged rather than propagated."""
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333 requires the server to call close() on the
                    # returned iterable once the request is done, including
                    # when it was cut short by an error.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)

    def run(self):
        """Worker body: take requests off the queue until idle for too long.

        A worker exits after waiting more than 10 seconds without work, or
        immediately when at least `waitlimit` workers are already idle and
        that has been the case for at least 10 seconds.
        """
        timeout = 10.0
        th = threading.current_thread()
        with self.clk:
            self.current.add(th)
            th.registered = True
            self.ccond.notify_all()
        try:
            while True:
                start = now = time.time()
                with self.qlk:
                    while len(self.queue) < 1:
                        if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout:
                            return
                        self.waiting.add(th)
                        try:
                            if len(self.waiting) == self.waitlimit:
                                self.wlstart = now
                            self.qfcond.wait(start + timeout - now)
                        finally:
                            self.waiting.remove(th)
                        now = time.time()
                        # Only give up while there is still nothing to do.
                        # handle() skips spawning when it saw this worker
                        # waiting, so exiting with a request queued could
                        # leave that request without any worker.
                        if now - start > timeout and len(self.queue) < 1:
                            return
                    req = self.queue.popleft()
                    # A slot was freed; wake one producer blocked in handle().
                    self.qecond.notify()
                try:
                    self.handle1(req)
                finally:
                    req.close()
        finally:
            with self.clk:
                self.current.remove(th)

    def close(self):
        """Join worker threads one at a time until none remain."""
        while True:
            with self.clk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock; run() needs it to deregister itself.
            th.join()
311
class resplex(handler):
    """Handler that multiplexes all response output on one thread.

    Each request gets its own thread (handle1) that calls the application
    and obtains the response iterator. The iterator is then handed over,
    through `cqueue`, to a single response thread (handle2) that iterates
    and writes the responses of all requests using select().
    """

    cname = "rplex"

    def __init__(self, *, max=None, **kw):
        super().__init__(**kw)
        self.current = set()            # request threads currently in handle1
        self.lk = threading.Lock()      # protects self.current
        self.tcond = threading.Condition(self.lk)
        self.max = max
        # Hand-over queue of (req, respiter) pairs; holds at most 5 entries.
        self.cqueue = queue.Queue(5)
        # Pipe used to wake the response thread when something was queued;
        # closing the write end tells it to exit (see close()).
        self.cnpipe = os.pipe()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, *, max=None, **args):
        """Convert the string option `max` to a constructor argument.

        Remaining options are passed to the parent class, which rejects
        unknown ones.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        """Always raises: request.write() ends up here and is unsupported."""
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a thread running handle1 for *req*.

        If `max` is set, first waits until fewer than `max` request threads
        are registered. Returns once the new thread has registered itself.
        """
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            th = reqthread(target=self.handle1, args=[req])
            th.registered = False
            th.start()
            # Wait for handle1() to add the thread to self.current.
            while not th.registered:
                self.tcond.wait()

    def handle1(self, req):
        """Request-thread body: call the application and hand off the response.

        On successful hand-over the local `req` is set to None so that the
        final clause does not close a request now owned by the response
        thread.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if not req.status:
                    log.error("request handler returned without calling start_request")
                    # NOTE(review): close() is looked up on the iterator, not
                    # on respobj itself; for iterables whose iterator is a
                    # separate object this may miss respobj.close() — confirm.
                    if hasattr(respiter, "close"):
                        respiter.close()
                    return
                else:
                    self.cqueue.put((req, respiter))
                    # Wake the response thread so it picks up the new entry.
                    os.write(self.cnpipe[1], b" ")
                    req = None
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response-thread body: drive every handed-over response to completion.

        `current` maps each active request to its response iterator, or to
        None once the iterator is finished and only buffered data remains.
        Any unexpected exception here is logged and aborts the process.
        """
        try:
            rp = self.cnpipe[0]
            current = {}

            def closereq(req):
                # Close the iterator (if still present) and the request,
                # logging but otherwise ignoring errors, then forget the request.
                respiter = current[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del current[req]
            def ckiter(req):
                # Pull at most one chunk from the request's iterator into its
                # buffer; retire the iterator when it ends or fails.
                respiter = current[req]
                if respiter is not None:
                    rem = False         # True when the iterator must be retired
                    try:
                        data = next(respiter)
                    except StopIteration:
                        rem = True
                        # Make sure the head is sent even for an empty body.
                        try:
                            req.flushreq()
                        except:
                            log.error("exception occurred when handling response data", exc_info=True)
                    except:
                        rem = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if not rem:
                        # NOTE(review): an empty chunk adds nothing to the
                        # buffer, and the main loop only revisits requests
                        # with buffered data — confirm this cannot stall a
                        # request whose buffer is empty at this point.
                        if data:
                            try:
                                req.flushreq()
                                req.writedata(data)
                            except:
                                log.error("exception occurred when handling response data", exc_info=True)
                                rem = True
                    if rem:
                        current[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                # Iterator finished and nothing left to write: request is done.
                if respiter is None and not req.buffer:
                    closereq(req)

            while True:
                # Wait for a wake-up on the pipe or for any request with
                # pending output to become writable.
                bufl = list(req for req in current.keys() if req.buffer)
                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
                if rp in rls:
                    ret = os.read(rp, 1024)
                    if not ret:
                        # EOF: the write end was closed, so shut down.
                        os.close(rp)
                        return
                    # Adopt every request queued so far.
                    try:
                        while True:
                            req, respiter = self.cqueue.get(False)
                            current[req] = respiter
                            ckiter(req)
                    except queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Fetch more data only while the buffer is below 64 KiB.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Join all request threads, then stop and join the response thread."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    break
            # Join outside the lock; handle1() needs it to deregister itself.
            th.join()
        # Closing the write end makes handle2() see EOF on the pipe and return.
        os.close(self.cnpipe[1])
        self.rthread.join()
470
5345dcaf
FT
# Registry mapping each handler's `cname` to its class.
# Iterate over a snapshot of the module namespace: the live globals() view
# raises "dictionary changed size during iteration" if the namespace gains
# a name while the comprehension runs (this can happen under tracing tools
# on Python 3.12+, where comprehensions are inlined).
names = {cls.cname: cls for cls in list(globals().values()) if
         isinstance(cls, type) and
         issubclass(cls, handler) and
         hasattr(cls, "cname")}
8db41888
FT
475
def parsehspec(spec):
    """Split a handler spec "name:key=val,key2=val2" into (name, args).

    Without a colon the whole string is the name and args is empty. Each
    comma-separated item is split at its first "="; an item without "="
    gets the empty string as its value. An empty remainder after the last
    comma is ignored.
    """
    name, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    items = rest.split(",")
    # Only the final empty remainder is dropped; empty items elsewhere
    # are kept and end up under the empty key.
    if items[-1] == "":
        items.pop()
    opts = {}
    for item in items:
        key, _, val = item.partition("=")
        opts[key] = val
    return name, opts