1 import sys, os, threading, time, logging, select, queue, collections
# Module-level logger named "ashd.serve"; the handler classes below use it
# to report errors.
4 log = logging.getLogger("ashd.serve")
# Module-level lock. NOTE(review): presumably guards the sequence counter
# behind reqseq() -- its users are not visible here; confirm.
6 seqlk = threading.Lock()
# Exception type (an IOError subclass) signalling that the client has closed
# the connection; the initializer call below hard-codes that message.
15 class closed(IOError):
17 super().__init__("The client has closed the connection.")
# Thread subclass used for the request-handling threads started by the
# handler classes below.
19 class reqthread(threading.Thread):
# `name` is keyword-only; every other keyword argument is passed through to
# threading.Thread.
20 def __init__(self, *, name=None, **kw):
# Default name has the form "Request handler <n>", with <n> from reqseq().
# NOTE(review): presumably applied only when no name was supplied -- the
# guarding condition is not visible here; confirm.
22 name = "Request handler %i" % reqseq()
23 super().__init__(name=name, **kw)
# Per-request state for producing a WSGI response: remembers the owning
# handler and accumulates outgoing bytes in `buffer`.
25 class wsgirequest(object):
# `handler` is keyword-only; it is the object whose ckflush() is called from
# write().
26 def __init__(self, *, handler):
30 self.handler = handler
# Pending output bytes; writedata() appends to this.
31 self.buffer = bytearray()
# Response-head hook taking the status and the headers; its body is not
# visible here. NOTE(review): presumably supplied by a subclass -- confirm.
37 def writehead(self, status, headers):
# Queue `data` for output by appending it to the buffer.
43 def writedata(self, data):
44 self.buffer.extend(data)
# Raised when body output is attempted before the response was started,
# going by the message (the check that triggers it is not visible here).
49 raise Exception("Cannot send response body before starting response.")
# Emit the head using the status and headers stored on this request.
51 self.writehead(self.status, self.headers)
# Write entry point; the visible part hands this request to the handler's
# ckflush().
53 def write(self, data):
58 self.handler.ckflush(self)
# Parameters mirror WSGI's start_response(status, headers, exc_info).
# NOTE(review): the exception below presumably rejects a second call -- the
# guarding condition is not visible here; confirm.
60 def startreq(self, status, headers, exc_info=None):
69 raise Exception("Can only start responding once.")
# Remember the headers; writehead() is later called with self.headers.
71 self.headers = headers
# Base class of the request-dispatch strategies defined below (single,
# freethread, threadpool and resplex all derive from it).
74 class handler(object):
# Entry point for one request; the body is not visible here.
75 def handle(self, request):
# Loops for as long as the request still has buffered output, calling
# select() with `req` in both the writable and the exceptional set. `req`
# therefore has to be an object select.select() accepts.
77 def ckflush(self, req):
78 while len(req.buffer) > 0:
79 rls, wls, els = select.select([], [req], [req])
# Argument parser for handler specifications; subclasses call it through
# super() and add their own keys to the returned mapping. The ValueError
# names one leftover key. NOTE(review): the decorator and the condition
# guarding the raise are not visible here.
85 def parseargs(cls, **args):
87 raise ValueError("unknown handler argument: " + next(iter(args)))
# Handler variant. NOTE(review): no thread creation is visible in this
# class, so requests are presumably served on the calling thread -- confirm.
90 class single(handler):
93 def handle(self, req):
# perf.request(env) is used as a context manager around the WSGI call;
# `perf` and `env` are bound on lines not visible here.
96 with perf.request(env) as reqevent:
# Call the request's WSGI entry point, passing startreq as the
# start_response callable.
97 respiter = req.handlewsgi(env, req.startreq)
# Report the response status and headers to the perf event.
101 reqevent.response([req.status, req.headers])
# exc_info=True attaches the active traceback to the log record.
107 log.error("exception occurred when handling request", exc_info=True)
# Writes a space, str(a) and a newline to stderr.
# NOTE(review): looks like a print-style helper looping over several
# arguments; the enclosing definition and loop are not visible here.
115 sys.stderr.write(" ")
116 sys.stderr.write(str(a))
118 sys.stderr.write("\n")
# Handler variant that starts a new reqthread for each request, optionally
# capped at `max` concurrent threads.
121 class freethread(handler):
# `max` and `timeout` are keyword-only; remaining keywords go to the base
# class.
124 def __init__(self, *, max=None, timeout=None, **kw):
125 super().__init__(**kw)
# One lock plus a condition variable built on that same lock; the condition
# is notified whenever a thread registers or leaves (see notify_all below).
127 self.lk = threading.Lock()
128 self.tcond = threading.Condition(self.lk)
130 self.timeout = timeout
# Converts the textual "max" and "abort" arguments to ints; note that
# "abort" is stored under the "timeout" key.
# NOTE(review): the None checks around the conversions are not visible here.
133 def parseargs(cls, *, max=None, abort=None, **args):
134 ret = super().parseargs(**args)
136 ret["max"] = int(max)
138 ret["timeout"] = int(abort)
# Admit one request: wait for a free slot if a limit is set, then start a
# thread for it.
141 def handle(self, req):
143 if self.max is not None:
144 if self.timeout is not None:
# Bounded wait: each wait gets the time remaining until start + timeout.
145 now = start = time.time()
146 while len(self.current) >= self.max:
147 self.tcond.wait(start + self.timeout - now)
# Deadline check; what happens on expiry is not visible here.
149 if now - start > self.timeout:
# Unbounded variant of the same wait, used when no timeout is configured.
152 while len(self.current) >= self.max:
154 th = reqthread(target=self.run, args=[req])
# Handshake flag; handle() loops until it becomes true.
# NOTE(review): presumably set by the new thread once it has added itself
# to self.current -- the lines doing so are not visible here.
155 th.registered = False
157 while not th.registered:
# From here on the code runs in the spawned thread (target=self.run); the
# `def` line itself is not visible here.
162 th = threading.current_thread()
# Wake handle(), which is waiting for the registration.
166 self.tcond.notify_all()
169 with perf.request(env) as reqevent:
170 respiter = req.handlewsgi(env, req.startreq)
171 for data in respiter:
174 reqevent.response([req.status, req.headers])
180 log.error("exception occurred when handling request", exc_info=True)
# Deregister this thread and wake anyone waiting for a free slot.
183 self.current.remove(th)
184 self.tcond.notify_all()
# Picks an arbitrary still-registered thread.
# NOTE(review): presumably part of a shutdown loop that joins each thread;
# the surrounding lines are not visible here.
191 if len(self.current) > 0:
192 th = next(iter(self.current))
# Handler variant that feeds requests through a bounded queue to a pool of
# worker threads.
197 class threadpool(handler):
# Defaults: at most 25 threads, queue size 100, no timeout.
200 def __init__(self, *, max=25, qsz=100, timeout=None, **kw):
201 super().__init__(**kw)
# clk/ccond: lock and condition notified when a worker registers.
203 self.clk = threading.Lock()
204 self.ccond = threading.Condition(self.clk)
# Pending requests; appended in handle(), popped from the left by workers.
205 self.queue = collections.deque()
# Two conditions sharing the queue lock: qecond is waited on while the queue
# is full (handle), qfcond while it is empty (worker loop).
209 self.qlk = threading.Lock()
210 self.qfcond = threading.Condition(self.qlk)
211 self.qecond = threading.Condition(self.qlk)
214 self.timeout = timeout
# Converts "max", "queue" and "abort" to ints; "queue" is stored as "qsz"
# and "abort" as "timeout".
# NOTE(review): the None checks around the conversions are not visible here.
217 def parseargs(cls, *, max=None, queue=None, abort=None, **args):
218 ret = super().parseargs(**args)
220 ret["max"] = int(max)
222 ret["qsz"] = int(queue)
224 ret["timeout"] = int(abort)
# Enqueue one request, waiting for queue space first, and start another
# worker when conditions below allow it.
227 def handle(self, req):
230 if self.timeout is not None:
# Bounded wait: each wait gets the time remaining until start + timeout.
231 now = start = time.time()
232 while len(self.queue) >= self.qsz:
233 self.qecond.wait(start + self.timeout - now)
# Deadline check; what happens on expiry is not visible here.
235 if now - start > self.timeout:
# Unbounded variant of the same wait.
238 while len(self.queue) >= self.qsz:
240 self.queue.append(req)
# No idle worker is waiting and the pool is below its limit: spawn one.
# NOTE(review): the nesting of these two conditions is not visible here.
242 if len(self.waiting) < 1:
246 if len(self.current) < self.max:
247 th = reqthread(target=self.run)
# Handshake flag; handle() loops until it becomes true.
248 th.registered = False
250 while not th.registered:
# Serve a single request: call the WSGI entry point, iterate the response
# and report status/headers to the perf event.
253 def handle1(self, req):
256 with perf.request(env) as reqevent:
257 respiter = req.handlewsgi(env, req.startreq)
258 for data in respiter:
261 reqevent.response([req.status, req.headers])
267 log.error("exception occurred when handling request", exc_info=True)
# From here on the code belongs to the worker loop (target=self.run); the
# `def` line itself is not visible here.
271 th = threading.current_thread()
275 self.ccond.notify_all()
278 start = now = time.time()
# Wait for work while the queue is empty.
280 while len(self.queue) < 1:
# NOTE(review): waitlimit/wlstart are initialised on lines not visible here;
# this looks like an idle-worker retirement rule -- confirm.
281 if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout:
285 if len(self.waiting) == self.waitlimit:
# Each wait gets the time remaining until start + timeout.
287 self.qfcond.wait(start + timeout - now)
289 self.waiting.remove(th)
291 if now - start > timeout:
# Take the oldest queued request.
293 req = self.queue.popleft()
# Worker deregisters itself.
301 self.current.remove(th)
# Picks an arbitrary still-registered thread.
# NOTE(review): presumably part of a shutdown loop that joins each thread;
# the surrounding lines are not visible here.
306 if len(self.current) > 0:
307 th = next(iter(self.current))
# Handler variant that runs each WSGI call on its own thread but hands the
# resulting response iterators to one shared "Response thread".
312 class resplex(handler):
315 def __init__(self, *, max=None, **kw):
316 super().__init__(**kw)
318 self.lk = threading.Lock()
319 self.tcond = threading.Condition(self.lk)
# Hand-over queue of (req, respiter) pairs, capacity 5.
321 self.cqueue = queue.Queue(5)
# Pipe used as a wake-up channel: a byte is written to cnpipe[1] after each
# hand-over (see handle1).
322 self.cnpipe = os.pipe()
# The single thread that runs handle2.
323 self.rthread = reqthread(name="Response thread", target=self.handle2)
# Converts the textual "max" argument to an int.
# NOTE(review): the None check around the conversion is not visible here.
327 def parseargs(cls, *, max=None, **args):
328 ret = super().parseargs(**args)
330 ret["max"] = int(max)
# Overrides the base ckflush(): wsgirequest.write() calls ckflush(), so
# write() always fails under this handler.
333 def ckflush(self, req):
334 raise Exception("resplex handler does not support the write() function")
# Admit one request: wait for a free slot if a limit is set, then start a
# thread running handle1 for it.
336 def handle(self, req):
338 if self.max is not None:
339 while len(self.current) >= self.max:
341 th = reqthread(target=self.handle1, args=[req])
# Handshake flag; handle() loops until it becomes true.
342 th.registered = False
344 while not th.registered:
# Per-request thread body: register, run the WSGI call, then pass the
# response iterator on to the response thread.
347 def handle1(self, req):
349 th = threading.current_thread()
353 self.tcond.notify_all()
356 respobj = req.handlewsgi(env, req.startreq)
357 respiter = iter(respobj)
# Logged when the application never started a response (condition not
# visible here).
359 log.error("request handler returned without calling start_request")
360 if hasattr(respiter, "close"):
# Hand over, then write one byte to the pipe to wake the response thread.
364 self.cqueue.put((req, respiter))
365 os.write(self.cnpipe[1], b" ")
# Deregister this thread and wake anyone waiting for a free slot.
369 self.current.remove(th)
370 self.tcond.notify_all()
374 log.error("exception occurred when handling request", exc_info=True)
# NOTE(review): the remaining lines presumably belong to handle2 (the
# response thread) and its local helpers; their `def` lines are not visible
# here. `current` maps each request to its response iterator (or None).
385 respiter = current[req]
387 if respiter is not None and hasattr(respiter, "close"):
390 log.error("exception occurred when closing iterator", exc_info=True)
394 log.error("exception occurred when closing request", exc_info=True)
397 respiter = current[req]
398 if respiter is not None:
# Pull the next chunk of response data; StopIteration marks the end.
401 data = next(respiter)
402 except StopIteration:
407 log.error("exception occurred when handling response data", exc_info=True)
410 log.error("exception occurred when iterating response", exc_info=True)
417 log.error("exception occurred when handling response data", exc_info=True)
422 if hasattr(respiter, "close"):
425 log.error("exception occurred when closing iterator", exc_info=True)
# Iterator gone and nothing left buffered for this request.
427 if respiter is None and not req.buffer:
# Wait until the wake-up descriptor is readable or a request with buffered
# output is writable. NOTE(review): `rp` is presumably self.cnpipe[0].
431 bufl = list(req for req in current.keys() if req.buffer)
432 rls, wls, els = select.select([rp], bufl, [rp] + bufl)
# Read up to 1024 bytes from the wake-up descriptor.
434 ret = os.read(rp, 1024)
# Non-blocking get of a handed-over (req, respiter) pair.
440 req, respiter = self.cqueue.get(False)
441 current[req] = respiter
451 log.error("exception occurred when writing response", exc_info=True)
# 65536-byte threshold on the request's buffered output.
# NOTE(review): presumably more data is pulled only below it -- confirm.
454 if len(req.buffer) < 65536:
457 log.critical("unexpected exception occurred in response handler thread", exc_info=True)
# Picks an arbitrary still-registered thread.
# NOTE(review): presumably part of a shutdown loop that joins each thread;
# the surrounding lines are not visible here.
463 if len(self.current) > 0:
464 th = next(iter(self.current))
# Close the write end of the wake-up pipe.
468 os.close(self.cnpipe[1])
# Registry mapping each handler's `cname` to its class. Built by scanning
# the module globals for classes that subclass `handler` and have a `cname`
# attribute, so it only sees classes defined above this statement.
471 names = {cls.cname: cls for cls in globals().values() if
472 isinstance(cls, type) and
473 issubclass(cls, handler) and
474 hasattr(cls, "cname")}
476 def parsehspec(spec):
479 nm, spec = spec.split(":", 1)
483 part, spec = spec.split(",", 1)
485 part, spec = spec, None
487 key, val = part.split("=", 1)