python: Added request limiter to rplex handler.
[ashd.git] / python3 / ashd / serve.py
CommitLineData
a962c944 1import sys, os, threading, time, logging, select, queue
46adc298 2from . import perf
552a70bf
FT
3
# Logger shared by the handler implementations in this module.
log = logging.getLogger("ashd.serve")
# Next value handed out by reqseq(); only touched while holding seqlk.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    The module-level counter `seq` is read and incremented while
    holding `seqlk`, so every caller receives a distinct value.  The
    first value handed out is 1.
    """
    global seq
    with seqlk:
        current, seq = seq, seq + 1
    return current
14
552a70bf
FT
class closed(IOError):
    """Error indicating that the client has closed the connection.

    The constructor takes no arguments; the message is fixed.
    """

    def __init__(self):
        IOError.__init__(self, "The client has closed the connection.")
18
46adc298
FT
class reqthread(threading.Thread):
    """Thread subclass with a default name based on reqseq().

    When `name` is not given, the thread is named "Request handler N",
    where N is the value returned by reqseq().  All other keyword
    arguments are passed through to threading.Thread unchanged.
    """

    def __init__(self, *, name=None, **kw):
        thread_name = ("Request handler %i" % reqseq()) if name is None else name
        super().__init__(name=thread_name, **kw)
24
class wsgirequest(object):
    """Base class tracking the response state of one WSGI request.

    handlewsgi(), fileno(), writehead() and flush() only raise here and
    are meant to be supplied by subclasses.  Body data passed to
    writedata() is appended to `buffer`.
    """

    def __init__(self, handler):
        # The handler whose ckflush() is invoked after each write().
        self.handler = handler
        # Status and headers as last passed to startreq().
        self.status = None
        self.headers = []
        # True once flushreq() has passed the head on to writehead().
        self.respsent = False
        # Response body data appended by writedata().
        self.buffer = bytearray()

    def handlewsgi(self):
        raise Exception()

    def fileno(self):
        raise Exception()

    def writehead(self, status, headers):
        raise Exception()

    def flush(self):
        raise Exception()

    def close(self):
        pass

    def writedata(self, data):
        """Append `data` to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head through writehead(), at most once.

        Raises Exception if no status has been set by startreq() yet.
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Queue a chunk of body data; this is what startreq() returns.

        Empty chunks are ignored.  Otherwise the head is sent if it has
        not been already, the data is buffered, and the handler's
        ckflush() is called for this request.
        """
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers and return write().

        Calling this again after a status has been set is only allowed
        when `exc_info` is given; in that case exc_info[1] is re-raised
        if the head has already been sent, and otherwise the stored
        status and headers are replaced.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exception info.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
46adc298
FT
73
class handler(object):
    """Base class for the request handlers defined in this module."""

    def handle(self, request):
        raise Exception()

    def ckflush(self, req):
        """Flush `req` repeatedly until its buffer is empty.

        Before each flush, select() is used to wait until `req` is
        reported as writable or in an exceptional state.
        """
        while req.buffer:
            select.select([], [req], [req])
            req.flush()

    def close(self):
        pass

    @classmethod
    def parseargs(cls, **args):
        """Convert string handler arguments to constructor keywords.

        The base handler accepts no arguments: any argument given
        raises ValueError naming the first one, otherwise an empty
        dict is returned.
        """
        if args:
            raise ValueError("unknown handler argument: " + next(iter(args)))
        return {}
89
dd1e6b98
FT
class single(handler):
    """Handler that processes each request directly in the calling thread."""

    def handle(self, req):
        """Run the WSGI application for `req` and write out its response.

        The environment comes from req.mkenv(), and the application is
        invoked through req.handlewsgi() inside a perf.request()
        context.  Each chunk it yields is passed to req.write(); if a
        status was set, it is reported to the perf event and the
        response head is sent.  Any remaining buffered data is then
        flushed with ckflush().

        A `closed` exception is dropped silently; any other exception
        is logged.  req.close() is always called at the end.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333 requires the server to call close() on the
                    # application's iterable, if it has one, whether the
                    # request completed normally or was cut short.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
108
class freethread(handler):
    """Handler that starts a new thread for every request.

    Constructor keywords:
      max     -- upper bound on the number of request threads running
                 at once; None (the default) means no limit.
      timeout -- when both `max` and `timeout` are set, handle() calls
                 os.abort() if it has waited longer than this many
                 seconds for a free slot.
    """

    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        # Threads currently running run(); guarded by lk.
        self.current = set()
        self.lk = threading.Lock()
        # Notified whenever a thread enters or leaves `current`.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Convert string arguments to constructor keywords.

        `max` is converted to int and passed as "max"; `abort` is
        converted to int and passed as "timeout".  Empty or missing
        values are left out.  Any other argument is handed to the base
        class, which rejects it with ValueError.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for `req`, waiting for a free slot if limited.

        Returns once the new thread has registered itself in `current`
        or is no longer alive.
        """
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    now = start = time.time()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.time()
                        if now - start > self.timeout:
                            # No slot became free within the timeout:
                            # abort the whole process.
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.start()
            while th.is_alive() and th not in self.current:
                self.tcond.wait()

    def run(self, req):
        """Thread body: run the WSGI application for `req`.

        The thread registers itself in `current` for the duration of
        the request.  A `closed` exception is dropped silently, any
        other exception is logged, and req.close() is always called.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333 requires the server to call close()
                        # on the application's iterable, if it has one,
                        # whether the request completed normally or not.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                pass
            except:
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Join request threads until none remain registered."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock so the thread can deregister itself.
            th.join()
180
class threadpool(handler):
    """Handler that dispatches requests to a pool of worker threads.

    Constructor keywords:
      min  -- number of worker threads started up front; idle workers
              do not exit while the pool holds no more than this many.
      max  -- handle() starts a new worker only while the pool holds
              fewer than this many threads.
      live -- seconds an idle worker waits for a request before it
              considers exiting.
    """

    def __init__(self, *, min=0, max=20, live=300, **kw):
        super().__init__(**kw)
        # All worker threads, and the subset currently waiting for work.
        self.current = set()
        self.free = set()
        # Reentrant: handle() calls newthread() while holding the lock.
        self.lk = threading.RLock()
        # pcond: notified by workers (registered, took a request, exited).
        # rcond: notified by handle()/close() to wake waiting workers.
        self.pcond = threading.Condition(self.lk)
        self.rcond = threading.Condition(self.lk)
        # Single-slot hand-off of a request from handle() to a worker.
        self.wreq = None
        self.min = min
        self.max = max
        self.live = live
        for i in range(self.min):
            self.newthread()

    @classmethod
    def parseargs(cls, *, min=None, max=None, live=None, **args):
        """Convert string arguments to constructor keywords.

        `min`, `max` and `live` are converted to int when given and
        non-empty.  Any other argument is handed to the base class,
        which rejects it with ValueError.
        """
        ret = super().parseargs(**args)
        if min:
            ret["min"] = int(min)
        if max:
            ret["max"] = int(max)
        if live:
            ret["live"] = int(live)
        return ret

    def newthread(self):
        """Start a worker and wait until it has registered in `current`."""
        with self.lk:
            th = reqthread(target=self.loop)
            th.start()
            while not th in self.current:
                self.pcond.wait()

    def _handle(self, req):
        """Run the WSGI application for `req` and write out its response.

        A `closed` exception is dropped silently, any other exception
        is logged, and req.close() is always called.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333 requires the server to call close() on
                    # the application's iterable, if it has one, whether
                    # the request completed normally or not.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()

    def loop(self):
        """Worker thread body: repeatedly take a request and handle it.

        The worker exits once it has been idle for more than `live`
        seconds while the pool holds more than `min` threads.
        """
        th = threading.current_thread()
        with self.lk:
            self.current.add(th)
        try:
            while True:
                with self.lk:
                    self.free.add(th)
                    try:
                        self.pcond.notify_all()
                        now = start = time.time()
                        while self.wreq is None:
                            self.rcond.wait(start + self.live - now)
                            now = time.time()
                            if now - start > self.live:
                                if len(self.current) > self.min:
                                    self.current.remove(th)
                                    return
                                else:
                                    # Pool is at its minimum: restart
                                    # the idle timer and keep waiting.
                                    start = now
                        req, self.wreq = self.wreq, None
                        self.pcond.notify_all()
                    finally:
                        self.free.remove(th)
                self._handle(req)
                req = None
        finally:
            with self.lk:
                try:
                    self.current.remove(th)
                except KeyError:
                    # Already removed on the idle-timeout path above.
                    pass
                self.pcond.notify_all()

    def handle(self, req):
        """Hand `req` over to a worker, starting one first if needed.

        A new worker is started when no worker is free and the pool is
        below `max`.  The call blocks while a previously posted request
        is still waiting to be picked up.
        """
        while True:
            with self.lk:
                if len(self.free) < 1 and len(self.current) < self.max:
                    self.newthread()
                while self.wreq is not None:
                    self.pcond.wait()
                if self.wreq is None:
                    self.wreq = req
                    self.rcond.notify(1)
                    return

    def close(self):
        """Let all workers time out and wait until the pool is empty."""
        # With live and min at zero, every woken idle worker exits.
        self.live = 0
        self.min = 0
        with self.lk:
            while len(self.current) > 0:
                self.rcond.notify_all()
                self.pcond.wait(1)
285
class resplex(handler):
    """Handler that multiplexes response output in one dedicated thread.

    Each request gets its own thread (handle1) that calls the WSGI
    application.  The resulting (request, iterator) pair is then queued
    to a single response thread (handle2), which iterates the responses
    and writes them out using select().

    Constructor keyword `max` limits the number of request threads
    running at once; None (the default) means no limit.
    """

    def __init__(self, *, max=None, **kw):
        super().__init__(**kw)
        # Threads currently running handle1(); guarded by lk.
        self.current = set()
        self.lk = threading.Lock()
        # Notified whenever a thread enters or leaves `current`.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        # Hands (req, respiter) pairs from handle1() to handle2().
        self.cqueue = queue.Queue(5)
        # Pipe used to wake up the select() call in handle2(); closing
        # the write end makes handle2() return.
        self.cnpipe = os.pipe()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, *, max=None, **args):
        """Convert string arguments to constructor keywords.

        `max` is converted to int when given and non-empty; any other
        argument is handed to the base class.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        # wsgirequest.write() calls ckflush(), so this is what makes
        # write() fail under this handler.
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a handle1() thread for `req`, honoring the `max` limit.

        Returns once the new thread has registered itself in `current`
        or is no longer alive.
        """
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            th = reqthread(target=self.handle1, args=[req])
            th.start()
            while th.is_alive() and th not in self.current:
                self.tcond.wait()

    def handle1(self, req):
        """Request thread body: call the application and queue its response.

        If the application returned without setting a status, an error
        is logged, the iterator is closed and the request is closed
        here.  Otherwise the request is passed to the response thread.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if not req.status:
                    log.error("request handler returned without calling start_request")
                    if hasattr(respiter, "close"):
                        respiter.close()
                    return
                else:
                    self.cqueue.put((req, respiter))
                    # Wake up the response thread's select().
                    os.write(self.cnpipe[1], b" ")
                    # The response thread now owns the request, so the
                    # finally clause below must not close it.
                    req = None
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response thread body: iterate and write all queued responses.

        Runs until the read end of `cnpipe` reports end-of-file.  Any
        unexpected exception is logged and the process is aborted.
        """
        try:
            rp = self.cnpipe[0]
            # Maps each active request to its response iterator, or to
            # None once the iterator is finished and only buffered data
            # remains to be written.
            current = {}

            def closereq(req):
                # Close the iterator (if still present) and the request,
                # logging rather than propagating errors, then forget
                # the request.
                respiter = current[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del current[req]
            def ckiter(req):
                # Pull the next chunk from the request's iterator into
                # its buffer.  Once the iterator is finished and the
                # buffer is empty, the request is closed.
                respiter = current[req]
                if respiter is not None:
                    rem = False
                    try:
                        data = next(respiter)
                    except StopIteration:
                        rem = True
                        # Send the head even if no body data was produced.
                        req.flushreq()
                    except:
                        rem = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if not rem:
                        if data:
                            req.flushreq()
                            req.writedata(data)
                    else:
                        current[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                if respiter is None and not req.buffer:
                    closereq(req)

            while True:
                # Only requests with pending output are polled for
                # writability.
                bufl = list(req for req in current.keys() if req.buffer)
                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
                if rp in rls:
                    ret = os.read(rp, 1024)
                    if not ret:
                        # End-of-file: close() has closed the write end.
                        os.close(rp)
                        return
                    try:
                        # Drain every request queued by handle1().
                        while True:
                            req, respiter = self.cqueue.get(False)
                            current[req] = respiter
                            ckiter(req)
                    except queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Fetch more data only while the buffer holds
                        # less than 64 KiB.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Wait for all request threads, then stop the response thread."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    break
            th.join()
        # Closing the write end makes handle2() see end-of-file and return.
        os.close(self.cnpipe[1])
        self.rthread.join()
433
dd1e6b98
FT
# Maps the handler name used in a handler specification string to the
# class implementing it.
names = dict(
    single=single,
    free=freethread,
    pool=threadpool,
    rplex=resplex,
)
8db41888
FT
438
def parsehspec(spec):
    """Split a handler specification into a name and an argument dict.

    The format is "NAME" or "NAME:KEY=VAL,KEY=VAL,...".  A part without
    "=" yields the key with an empty-string value, and only the first
    "=" of a part separates key from value.  All values are returned as
    strings.

    Returns a (name, args) tuple.
    """
    nm, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    args = {}
    while rest:
        part, _, rest = rest.partition(",")
        key, _, val = part.partition("=")
        args[key] = val
    return nm, args