python*: Use poll instead of select in ckflush.
[ashd.git] / python3 / ashd / serve.py
CommitLineData
ab3275e9 1import sys, os, threading, time, logging, select, queue, collections
46adc298 2from . import perf
552a70bf
FT
3
# Logger used by all request handlers in this module.
log = logging.getLogger("ashd.serve")
# Next request-handler sequence number, and the lock that guards it.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    The module-level counter `seq` is read and incremented while holding
    `seqlk`, so concurrent callers each get a distinct number.
    """
    global seq
    seqlk.acquire()
    try:
        current = seq
        seq = current + 1
    finally:
        seqlk.release()
    return current
14
552a70bf
FT
class closed(IOError):
    """Raised to signal that the client has closed the connection."""

    def __init__(self):
        message = "The client has closed the connection."
        super().__init__(message)
18
46adc298
FT
class reqthread(threading.Thread):
    """Thread subclass that picks a default name for request handlers.

    When no `name` is given, the thread is named "Request handler N",
    where N is obtained from reqseq().  All other keyword arguments are
    passed through to threading.Thread.
    """

    def __init__(self, *, name=None, **kw):
        thread_name = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=thread_name, **kw)
24
class wsgirequest(object):
    """Base class holding the response state of a single WSGI request.

    Subclasses supply the transport-specific parts (handlewsgi, fileno,
    writehead and flush); this class implements the WSGI start_response
    and write callables on top of them, buffering body data in
    `self.buffer`.

    The handler classes in this module also call `req.mkenv()`, which is
    not declared here -- presumably subclasses provide it as well.
    """

    def __init__(self, *, handler):
        self.status = None          # Status passed to startreq(), if any
        self.headers = []           # Headers passed to startreq()
        self.respsent = False       # True once writehead() has been called
        self.handler = handler      # Handler whose ckflush() drains the buffer
        self.buffer = bytearray()   # Response data not yet flushed

    # The following methods must be overridden by subclasses.  They
    # raise NotImplementedError (an Exception subclass, so existing
    # callers that catch Exception are unaffected).

    def handlewsgi(self):
        # Handlers invoke the override as handlewsgi(env, start_response).
        raise NotImplementedError("handlewsgi() must be implemented by a subclass")

    def fileno(self):
        raise NotImplementedError("fileno() must be implemented by a subclass")

    def writehead(self, status, headers):
        raise NotImplementedError("writehead() must be implemented by a subclass")

    def flush(self):
        raise NotImplementedError("flush() must be implemented by a subclass")

    def close(self):
        """Release the request; the base implementation does nothing."""
        pass

    def writedata(self, data):
        """Append `data` to the output buffer without flushing it."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head via writehead() if not already sent.

        Raises Exception if no status has been set by startreq().
        """
        if not self.respsent:
            if not self.status:
                raise Exception("Cannot send response body before starting response.")
            self.respsent = True
            self.writehead(self.status, self.headers)

    def write(self, data):
        """WSGI write callable: buffer `data` and let the handler flush it.

        Empty data is ignored entirely (the response head is not sent).
        """
        if not data:
            return
        self.flushreq()
        self.writedata(data)
        self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """WSGI start_response callable.

        Records `status` and `headers` and returns the write callable.
        A second call is only permitted with `exc_info`; in that case the
        exception is re-raised if the head has already been sent.
        Otherwise a second call raises Exception.
        """
        if self.status:
            if exc_info:
                try:
                    if self.respsent:
                        raise exc_info[1]
                finally:
                    # Drop the reference to avoid a traceback cycle.
                    exc_info = None
            else:
                raise Exception("Can only start responding once.")
        self.status = status
        self.headers = headers
        return self.write
46adc298
FT
73
class handler(object):
    """Base class for request handlers.

    Subclasses implement handle() to process a request object.  This
    class provides the shared blocking flush helper and the default
    argument parser.
    """

    def handle(self, request):
        """Process `request`; must be overridden by subclasses."""
        # NotImplementedError is an Exception subclass, so callers that
        # catch Exception still work.
        raise NotImplementedError("handle() must be implemented by a subclass")

    def ckflush(self, req):
        """Block until all of `req.buffer` has been flushed.

        Polls `req` for writability and calls req.flush() each time,
        until the buffer is empty.
        """
        p = select.poll()
        p.register(req, select.POLLOUT)
        while len(req.buffer) > 0:
            p.poll()
            req.flush()

    def close(self):
        """Shut the handler down; the base implementation does nothing."""
        pass

    @classmethod
    def parseargs(cls, **args):
        """Convert string handler arguments into constructor keywords.

        The base class accepts no arguments: any argument raises
        ValueError, otherwise an empty dict is returned.
        """
        if len(args) > 0:
            raise ValueError("unknown handler argument: " + next(iter(args)))
        return {}
91
class single(handler):
    """Handler that processes each request synchronously in the caller."""

    cname = "single"

    def handle(self, req):
        """Run the WSGI application for `req` and write out its response.

        A `closed` exception (client disconnected) is silently ignored;
        any other exception is logged.  The request is always closed.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333 requires the server to call close() on the
                    # response iterable, if present, whether or not the
                    # request completed normally.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
112
76ff6c4d
FT
def dbg(*a):
    """Write the arguments to standard error for debugging.

    Each argument is converted with str(); the results are separated by
    single spaces and followed by a newline, after which stderr is
    flushed.
    """
    # The original loop wrote str(a) -- the whole argument tuple -- once
    # per argument instead of each individual argument.
    sys.stderr.write(" ".join(str(o) for o in a))
    sys.stderr.write("\n")
    sys.stderr.flush()
122
class freethread(handler):
    """Handler that runs every request in a newly started thread.

    `max`, if given, caps the number of concurrently running request
    threads; handle() blocks while the cap is reached.  `timeout`, if
    given together with `max`, aborts the whole process when handle()
    has been blocked for more than that many seconds.
    """

    cname = "free"

    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        self.current = set()                        # Running request threads
        self.lk = threading.Lock()                  # Guards self.current
        self.tcond = threading.Condition(self.lk)   # Signalled when self.current changes
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Parse string arguments: `max` -> max, `abort` -> timeout (as ints)."""
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for `req`, waiting first if the thread cap is reached."""
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    now = start = time.time()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.time()
                        if now - start > self.timeout:
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.registered = False
            th.start()
            # Do not return until the thread has added itself to
            # self.current, so the cap check above stays accurate.
            while not th.registered:
                self.tcond.wait()

    def run(self, req):
        """Thread body: register, serve `req`, unregister and close it."""
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333 requires the server to call close() on
                        # the response iterable, if present, whether or
                        # not the request completed normally.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                pass
            except:
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Wait for all running request threads to finish."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock so the thread can unregister itself.
            th.join()
198
ab3275e9
FT
class threadpool(handler):
    """Handler that queues requests for a pool of worker threads.

    `max` caps the number of worker threads and `qsz` the length of the
    request queue; handle() blocks while the queue is full.  `timeout`,
    if given, aborts the whole process when handle() has been blocked
    on a full queue for more than that many seconds.
    """

    cname = "pool"

    def __init__(self, *, max=25, qsz=100, timeout=None, **kw):
        super().__init__(**kw)
        self.current = set()                        # All live worker threads
        self.clk = threading.Lock()                 # Guards self.current
        self.ccond = threading.Condition(self.clk)  # Signalled when a worker registers
        self.queue = collections.deque()            # Pending requests
        self.waiting = set()                        # Workers idle in run()
        self.waitlimit = 5                          # Idle-worker count used by run()
        self.wlstart = 0.0                          # When waitlimit idle workers was last reached
        self.qlk = threading.Lock()                 # Guards queue, waiting and wlstart
        self.qfcond = threading.Condition(self.qlk) # Signalled when a request is queued
        self.qecond = threading.Condition(self.qlk) # Signalled when a request is dequeued
        self.max = max
        self.qsz = qsz
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, queue=None, abort=None, **args):
        """Parse string arguments: `max` -> max, `queue` -> qsz, `abort` -> timeout."""
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if queue:
            ret["qsz"] = int(queue)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Queue `req`, starting a new worker if none is idle and the cap allows."""
        spawn = False
        with self.qlk:
            if self.timeout is not None:
                now = start = time.time()
                while len(self.queue) >= self.qsz:
                    self.qecond.wait(start + self.timeout - now)
                    now = time.time()
                    if now - start > self.timeout:
                        os.abort()
            else:
                while len(self.queue) >= self.qsz:
                    self.qecond.wait()
            self.queue.append(req)
            self.qfcond.notify()
            if len(self.waiting) < 1:
                spawn = True
        if spawn:
            with self.clk:
                if len(self.current) < self.max:
                    th = reqthread(target=self.run)
                    th.registered = False
                    th.start()
                    # Wait until the worker has added itself to
                    # self.current before returning.
                    while not th.registered:
                        self.ccond.wait()

    def handle1(self, req):
        """Run the WSGI application for `req` and write out its response.

        A `closed` exception is silently ignored; any other exception is
        logged.  The caller is responsible for closing `req`.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333 requires the server to call close() on the
                    # response iterable, if present, whether or not the
                    # request completed normally.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)

    def run(self):
        """Worker thread body: serve queued requests until idle for too long."""
        timeout = 10.0
        th = threading.current_thread()
        with self.clk:
            self.current.add(th)
            th.registered = True
            self.ccond.notify_all()
        try:
            while True:
                start = now = time.time()
                with self.qlk:
                    while len(self.queue) < 1:
                        # Exit early if at least waitlimit workers have
                        # been idle since wlstart for `timeout` seconds.
                        if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout:
                            return
                        self.waiting.add(th)
                        try:
                            if len(self.waiting) == self.waitlimit:
                                self.wlstart = now
                            self.qfcond.wait(start + timeout - now)
                        finally:
                            self.waiting.remove(th)
                        now = time.time()
                        if now - start > timeout:
                            return
                    req = self.queue.popleft()
                    self.qecond.notify()
                try:
                    self.handle1(req)
                finally:
                    req.close()
        finally:
            with self.clk:
                self.current.remove(th)

    def close(self):
        """Wait for all worker threads to exit."""
        while True:
            with self.clk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock so the worker can unregister itself.
            th.join()
313
class resplex(handler):
    """Handler that multiplexes all response output in a single thread.

    Each request runs the WSGI application in its own thread (handle1),
    but the resulting response iterator is handed over to one shared
    "Response thread" (handle2), which iterates and writes all active
    responses using select().  The write() callable is not supported.
    """

    cname = "rplex"

    def __init__(self, *, max=None, **kw):
        super().__init__(**kw)
        self.current = set()                        # Threads currently in handle1()
        self.lk = threading.Lock()                  # Guards self.current
        self.tcond = threading.Condition(self.lk)   # Signalled when self.current changes
        self.max = max                              # Cap on concurrent handle1() threads, or None
        self.cqueue = queue.Queue(5)                # (req, respiter) pairs for the response thread
        self.cnpipe = os.pipe()                     # Wakes the response thread up from select()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, *, max=None, **args):
        """Parse string arguments: `max` -> max (as int)."""
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        """Always raises: request.write() ends up here and is unsupported."""
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a handle1() thread for `req`, waiting if the cap is reached."""
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            th = reqthread(target=self.handle1, args=[req])
            th.registered = False
            th.start()
            # Do not return until the thread has registered itself.
            while not th.registered:
                self.tcond.wait()

    def handle1(self, req):
        """Call the WSGI application and pass its iterator to the response thread."""
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if not req.status:
                    log.error("request handler returned without calling start_request")
                    if hasattr(respiter, "close"):
                        respiter.close()
                    return
                else:
                    self.cqueue.put((req, respiter))
                    # Wake the response thread so it picks up the queue entry.
                    os.write(self.cnpipe[1], b" ")
                    # Ownership has passed to the response thread; make
                    # sure the finally clause below does not close it.
                    req = None
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response thread body: iterate and write all active responses.

        Returns when the notification pipe reaches end-of-file.  Any
        unexpected exception is logged and the process is aborted.
        """
        try:
            rp = self.cnpipe[0]
            # Maps each active request to its response iterator, or to
            # None once the iterator is finished but data is still buffered.
            current = {}

            def closereq(req):
                # Close the iterator (if still present) and the request,
                # logging rather than propagating errors, then forget it.
                respiter = current[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del current[req]
            def ckiter(req):
                # Fetch the next chunk from the request's iterator into its
                # buffer; retire the iterator when it ends or fails.
                respiter = current[req]
                if respiter is not None:
                    rem = False
                    try:
                        data = next(respiter)
                    except StopIteration:
                        rem = True
                        # Make sure the head is sent even for an empty body.
                        try:
                            req.flushreq()
                        except:
                            log.error("exception occurred when handling response data", exc_info=True)
                    except:
                        rem = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if not rem:
                        # NOTE(review): when the iterator yields an empty
                        # chunk and nothing is buffered, the request is not
                        # selected for writing and ckiter() does not appear
                        # to be called for it again -- confirm whether such
                        # a response can stall.
                        if data:
                            try:
                                req.flushreq()
                                req.writedata(data)
                            except:
                                log.error("exception occurred when handling response data", exc_info=True)
                                rem = True
                    if rem:
                        current[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                # Iterator finished and nothing left to write: all done.
                if respiter is None and not req.buffer:
                    closereq(req)

            while True:
                # Only requests with buffered data are watched for writability.
                bufl = list(req for req in current.keys() if req.buffer)
                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
                if rp in rls:
                    ret = os.read(rp, 1024)
                    if not ret:
                        # Write end closed (see close()): shut down.
                        os.close(rp)
                        return
                    try:
                        # Drain all newly queued requests without blocking.
                        while True:
                            req, respiter = self.cqueue.get(False)
                            current[req] = respiter
                            ckiter(req)
                    except queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Refill only while less than 64 KiB is buffered.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Wait for handle1() threads, then stop and join the response thread."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    break
            # Join outside the lock so the thread can unregister itself.
            th.join()
        # Closing the write end makes handle2() see end-of-file and return.
        os.close(self.cnpipe[1])
        self.rthread.join()
472
5345dcaf
FT
# Registry mapping each handler's `cname` to its class.  It is built from
# every class in this module's globals that derives from `handler` and
# defines a `cname` attribute.
names = {
    klass.cname: klass
    for klass in globals().values()
    if isinstance(klass, type) and issubclass(klass, handler) and hasattr(klass, "cname")
}
8db41888
FT
477
def parsehspec(spec):
    """Split a handler specification into a name and an argument dict.

    The specification has the form "name" or "name:key=val,key2=val2".
    A part without "=" maps its key to the empty string.  Returns the
    tuple (name, args), where all keys and values are strings.
    """
    name, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    pieces = rest.split(",")
    # Nothing following the final comma (or the colon) is not an argument.
    if pieces[-1] == "":
        pieces.pop()
    args = {}
    for piece in pieces:
        key, _, val = piece.partition("=")
        args[key] = val
    return name, args
493 return nm, args