1 import sys, os, threading, time, logging, select, queue
# Module-level logger registered under the name "ashd.serve"; the handler
# classes below report request failures through it.
4 log = logging.getLogger("ashd.serve")
# Module-level lock. NOTE(review): the name suggests it guards the sequence
# counter behind reqseq() (used below to name request threads), but the code
# that takes this lock is not part of this listing -- confirm.
6 seqlk = threading.Lock()
# IOError subclass whose instances always carry the fixed message
# "The client has closed the connection."
# NOTE(review): the constructor's def line is not part of this listing; only
# the super().__init__ call inside it is shown.
15 class closed(IOError):
17 super().__init__("The client has closed the connection.")
# threading.Thread subclass used for the threads that serve requests.
19 class reqthread(threading.Thread):
# `name` is keyword-only; every other keyword argument is forwarded to
# threading.Thread unchanged.
20 def __init__(self, *, name=None, **kw):
# Builds a thread name from the number returned by reqseq().
# NOTE(review): presumably this only runs when no name was supplied -- the
# guarding condition is not visible in this listing; confirm.
22 name = "Request handler %i" % reqseq()
23 super().__init__(name=name, **kw)
# State of one WSGI request/response: remembers the handler that owns it and
# accumulates outgoing bytes in a buffer.
# NOTE(review): several lines of this class are absent from this listing; the
# comments below describe only what is shown.
25 class wsgirequest(object):
# handler: the handler object that write() later asks to flush this request.
26 def __init__(self, handler):
30 self.handler = handler
# Pending response bytes; filled by writedata().
31 self.buffer = bytearray()
# Called with the response status and headers (see the call further down).
# NOTE(review): the body is not shown -- presumably supplied by a subclass or
# omitted from the listing; confirm.
37 def writehead(self, status, headers):
# Appends `data` to the pending output buffer.
43 def writedata(self, data):
44 self.buffer.extend(data)
# NOTE(review): the next two statements belong to a method whose def line is
# not shown. Visible behaviour: an Exception is raised for sending body data
# before the response was started, and the stored status/headers are passed
# to writehead(). The conditions guarding each statement are not visible.
49 raise Exception("Cannot send response body before starting response.")
51 self.writehead(self.status, self.headers)
# Write entry point taking body data; the visible part hands this request to
# the owning handler's ckflush().
53 def write(self, data):
58 self.handler.ckflush(self)
# Takes (status, headers, exc_info=None), the argument shape of WSGI's
# start_response callable; it is passed to handlewsgi() by the handlers below.
60 def startreq(self, status, headers, exc_info=None):
# Raised when the response has already been started (condition not shown).
69 raise Exception("Can only start responding once.")
71 self.headers = headers
# Base class of the request handlers defined below (single, freethread,
# threadpool, resplex).
74 class handler(object):
# Entry point that receives a request. NOTE(review): the body is not shown in
# this listing; every subclass below defines its own handle().
75 def handle(self, request):
# Loops for as long as the request still has buffered output.
77 def ckflush(self, req):
78 while len(req.buffer) > 0:
# Block until `req` is reported writable or in an exceptional state. `req` is
# passed to select() directly, so it must be a file descriptor or expose
# fileno(). What is done with the result is not visible here.
79 rls, wls, els = select.select([], [req], [req])
# Receives handler arguments as keywords. NOTE(review): takes `cls`, so this
# is presumably a @classmethod -- the decorator line is not shown.
85 def parseargs(cls, **args):
# Reports the first leftover argument name as unknown. NOTE(review):
# presumably guarded by a check that `args` is non-empty -- not visible.
87 raise ValueError("unknown handler argument: " + next(iter(args)))
# Handler subclass whose visible handle() code runs the WSGI call directly,
# with no thread creation among the shown lines.
90 class single(handler):
91 def handle(self, req):
# NOTE(review): `env` and `perf` are defined on lines not shown in this
# listing. The request is processed inside the perf.request(env) context.
94 with perf.request(env) as reqevent:
# Invoke the application through the request object, passing req.startreq as
# the start-response callback; the result is kept as the response iterable.
95 respiter = req.handlewsgi(env, req.startreq)
# Report the response status and headers to the perf event.
99 reqevent.response([req.status, req.headers])
# Logged with traceback (exc_info=True); the surrounding try/except is not
# visible in this listing.
105 log.error("exception occurred when handling request", exc_info=True)
# Handler that starts a new reqthread for each request, optionally waiting
# while the set of current threads is at `max`.
# NOTE(review): many lines (including some def lines) are missing from this
# listing; comments describe only the statements shown.
109 class freethread(handler):
# max / timeout are keyword-only and default to None; other keywords go to the
# base class.
110 def __init__(self, *, max=None, timeout=None, **kw):
111 super().__init__(**kw)
# One lock, with a condition variable built on it; tcond is notified whenever
# a thread is added to or removed from the current set (see below).
113 self.lk = threading.Lock()
114 self.tcond = threading.Condition(self.lk)
116 self.timeout = timeout
# Converts textual arguments to constructor keywords: "max" -> int max,
# "abort" -> int timeout. NOTE(review): presumably a @classmethod whose
# conversions are each guarded by an `is not None` check -- not visible.
119 def parseargs(cls, *, max=None, abort=None, **args):
120 ret = super().parseargs(**args)
122 ret["max"] = int(max)
124 ret["timeout"] = int(abort)
# Accepts a request and starts a thread for it.
127 def handle(self, req):
# Only throttle when a maximum is configured.
129 if self.max is not None:
# With a timeout: wait on tcond for the remaining time while at capacity.
130 if self.timeout is not None:
# NOTE(review): time.time() is wall-clock time; a clock adjustment would skew
# this timeout (time.monotonic() would not). How `now` is refreshed inside the
# loop is not shown.
131 now = start = time.time()
132 while len(self.current) >= self.max:
133 self.tcond.wait(start + self.timeout - now)
# Timeout exceeded; the action taken is not visible in this listing.
135 if now - start > self.timeout:
# Without a timeout: wait for as long as the set is at capacity (loop body not
# shown).
138 while len(self.current) >= self.max:
# Worker thread that will execute self.run(req).
140 th = reqthread(target=self.run, args=[req])
# Loops while the new thread is alive but has not yet appeared in
# self.current (loop body not shown).
142 while th.is_alive() and th not in self.current:
# NOTE(review): from here on the code runs in the worker thread (it asks for
# threading.current_thread()); presumably this is the body of run(), whose def
# line is not shown.
147 th = threading.current_thread()
# Wake waiters on tcond; handle() above loops on membership in self.current.
150 self.tcond.notify_all()
# `env` and `perf` come from lines not shown in this listing.
153 with perf.request(env) as reqevent:
154 respiter = req.handlewsgi(env, req.startreq)
# Iterate over the response chunks (per-chunk action not shown).
155 for data in respiter:
158 reqevent.response([req.status, req.headers])
# Logged with traceback; the enclosing try/except is not visible.
164 log.error("exception occurred when handling request", exc_info=True)
# Drop this thread from the current set and wake anyone waiting for capacity.
167 self.current.remove(th)
168 self.tcond.notify_all()
# NOTE(review): picks an arbitrary remaining thread while any exist --
# presumably part of a shutdown method that joins them; its def line and the
# join are not shown.
175 if len(self.current) > 0:
176 th = next(iter(self.current))
# Handler that hands requests to a pool of worker threads. Requests are passed
# through the single slot self.wreq, coordinated by two condition variables
# that share one re-entrant lock.
# NOTE(review): many lines (including some def lines) are missing from this
# listing; comments describe only the statements shown.
181 class threadpool(handler):
# Keyword-only settings with defaults min=0, max=20, live=300; other keywords
# go to the base class.
182 def __init__(self, *, min=0, max=20, live=300, **kw):
183 super().__init__(**kw)
# pcond is notified by workers (thread-set changes, slot emptied); rcond is
# what idle workers wait on for a request.
186 self.lk = threading.RLock()
187 self.pcond = threading.Condition(self.lk)
188 self.rcond = threading.Condition(self.lk)
# Runs self.min times at construction. NOTE(review): presumably starts the
# initial workers -- the loop body is not shown.
193 for i in range(self.min):
# Converts textual "min"/"max"/"live" arguments to ints for the constructor.
# NOTE(review): presumably a @classmethod with each conversion guarded by an
# `is not None` check -- not visible.
197 def parseargs(cls, *, min=None, max=None, live=None, **args):
198 ret = super().parseargs(**args)
200 ret["min"] = int(min)
202 ret["max"] = int(max)
204 ret["live"] = int(live)
# NOTE(review): the next two statements belong to a method whose def line is
# not shown; it creates a worker running self.loop and then loops until that
# worker appears in self.current.
209 th = reqthread(target=self.loop)
211 while not th in self.current:
# Processes one request (same visible shape as the other handlers).
214 def _handle(self, req):
# `env` and `perf` come from lines not shown in this listing.
217 with perf.request(env) as reqevent:
218 respiter = req.handlewsgi(env, req.startreq)
# Iterate over the response chunks (per-chunk action not shown).
219 for data in respiter:
222 reqevent.response([req.status, req.headers])
# Logged with traceback; the enclosing try/except is not visible.
228 log.error("exception occurred when handling request", exc_info=True)
# NOTE(review): from here the code runs in a worker thread -- presumably the
# body of loop() (the reqthread target above), whose def line is not shown.
233 th = threading.current_thread()
241 self.pcond.notify_all()
# Idle wait: block on rcond until a request is placed in self.wreq, for at
# most self.live seconds measured from `start`.
# NOTE(review): time.time() is wall-clock time; how `now` is refreshed inside
# the loop is not shown.
242 now = start = time.time()
243 while self.wreq is None:
244 self.rcond.wait(start + self.live - now)
# Idle for longer than `live`: the worker removes itself, but only while the
# pool is larger than `min`.
246 if now - start > self.live:
247 if len(self.current) > self.min:
248 self.current.remove(th)
# Take the pending request and empty the slot in one step, then wake threads
# waiting on pcond (handle() below loops while the slot is occupied).
252 req, self.wreq = self.wreq, None
253 self.pcond.notify_all()
# Worker removes itself from the current set and notifies pcond (the path
# leading here is not visible).
261 self.current.remove(th)
264 self.pcond.notify_all()
# Submits a request to the pool.
266 def handle(self, req):
# No idle worker and room to grow. NOTE(review): presumably a new worker is
# started here -- the action is not shown.
269 if len(self.free) < 1 and len(self.current) < self.max:
# Loops while the hand-off slot still holds an earlier request (body not
# shown).
271 while self.wreq is not None:
273 if self.wreq is None:
# NOTE(review): presumably part of a shutdown method whose def line is not
# shown: while workers remain, wake those waiting on rcond.
282 while len(self.current) > 0:
283 self.rcond.notify_all()
# Handler that runs each request's WSGI call in its own thread (handle1) and
# passes the resulting response iterators to one dedicated "Response thread"
# (handle2) through a bounded queue plus a notification pipe.
# NOTE(review): many lines (including the def line of the response-thread
# function) are missing from this listing; comments describe only the
# statements shown.
286 class resplex(handler):
287 def __init__(self, **kw):
288 super().__init__(**kw)
290 self.lk = threading.Lock()
# Hand-off queue of (request, response iterator) pairs, limited to 5 entries.
291 self.cqueue = queue.Queue(5)
# Pipe used for notification: handle1 writes one byte to cnpipe[1] after
# queueing a request.
292 self.cnpipe = os.pipe()
# The single response thread; it runs self.handle2.
293 self.rthread = reqthread(name="Response thread", target=self.handle2)
# This handler always rejects flushes requested through write().
296 def ckflush(self, req):
297 raise Exception("resplex handler does not support the write() function")
# Start a fresh request thread running handle1 for every incoming request.
299 def handle(self, req):
300 reqthread(target=self.handle1, args=[req]).start()
# Per-request thread body: runs the WSGI call and queues the response.
302 def handle1(self, req):
304 th = threading.current_thread()
# `env` comes from a line not shown in this listing.
309 respobj = req.handlewsgi(env, req.startreq)
310 respiter = iter(respobj)
# NOTE(review): the message says "start_request" while the callback passed to
# handlewsgi() above is req.startreq -- confirm the intended wording. The
# condition that triggers this branch is not visible.
312 log.error("request handler returned without calling start_request")
# Only iterators that provide close() are closed (the call itself is not
# shown).
313 if hasattr(respiter, "close"):
# Queue the pair for the response thread, then write one byte to the pipe so
# the response thread's select() on the read end returns.
317 self.cqueue.put((req, respiter))
318 os.write(self.cnpipe[1], b" ")
321 self.current.remove(th)
# Logged with traceback; the enclosing try/except is not visible.
325 log.error("exception occurred when handling request", exc_info=True)
# NOTE(review): the remaining statements up to the critical-level log
# presumably belong to handle2 (the response thread target) and helpers nested
# in it; their def lines are not shown. `current` is used as a mapping from
# request to its response iterator (None once iteration is over).
336 respiter = current[req]
338 if respiter is not None and hasattr(respiter, "close"):
341 log.error("exception occurred when closing iterator", exc_info=True)
345 log.error("exception occurred when closing request", exc_info=True)
348 respiter = current[req]
349 if respiter is not None:
# Pull the next chunk; StopIteration marks the end of the response body.
352 data = next(respiter)
353 except StopIteration:
358 log.error("exception occurred when iterating response", exc_info=True)
366 if hasattr(respiter, "close"):
369 log.error("exception occurred when closing iterator", exc_info=True)
# Iterator finished and nothing left buffered for this request (the action
# taken is not shown).
371 if respiter is None and not req.buffer:
# Requests that still have buffered output are watched for writability; `rp`
# is watched for readability. NOTE(review): `rp` is presumably the read end
# self.cnpipe[0] -- its assignment is not shown.
375 bufl = list(req for req in current.keys() if req.buffer)
376 rls, wls, els = select.select([rp], bufl, [rp] + bufl)
# Read up to 1024 pending notification bytes from the pipe.
378 ret = os.read(rp, 1024)
# Non-blocking get of a queued (request, iterator) pair, which is then
# registered in `current`.
384 req, respiter = self.cqueue.get(False)
385 current[req] = respiter
395 log.error("exception occurred when writing response", exc_info=True)
# Buffer below 65536 bytes. NOTE(review): presumably the point at which more
# data is pulled from the iterator -- the action is not shown.
398 if len(req.buffer) < 65536:
401 log.critical("unexpected exception occurred in response handler thread", exc_info=True)
# NOTE(review): presumably part of a shutdown method whose def line is not
# shown: picks an arbitrary remaining request thread while any exist, and
# closes the write end of the notification pipe.
407 if len(self.current) > 0:
408 th = next(iter(self.current))
412 os.close(self.cnpipe[1])
# Registry mapping a handler name string to its handler class.
# NOTE(review): only the first entry ("single") is visible; the rest of the
# dict literal, including its closing brace, is not part of this listing.
415 names = {"single": single,
# Parses a handler specification string.
# NOTE(review): the function continues past the end of this listing and
# several of its lines are missing; only the visible steps are described.
420 def parsehspec(spec):
# Split off the part before the first ":" as `nm`; the rest stays in `spec`.
423 nm, spec = spec.split(":", 1)
# Take the next comma-separated part, leaving the remainder in `spec` ...
427 part, spec = spec.split(",", 1)
# ... or, for the last part, use all of `spec` and mark it exhausted.
429 part, spec = spec, None
# Each part is split at its first "=" into key and value.
431 key, val = part.split("=", 1)