1 import sys, os, threading, time, logging, select, queue, collections
4 log = logging.getLogger("ashd.serve")
6 seqlk = threading.Lock()
15 class closed(IOError):
17 super().__init__("The client has closed the connection.")
19 class reqthread(threading.Thread):
20 def __init__(self, *, name=None, **kw):
22 name = "Request handler %i" % reqseq()
23 super().__init__(name=name, **kw)
25 class wsgirequest(object):
26 def __init__(self, *, handler):
30 self.handler = handler
31 self.buffer = bytearray()
37 def writehead(self, status, headers):
43 def writedata(self, data):
44 self.buffer.extend(data)
49 raise Exception("Cannot send response body before starting response.")
51 self.writehead(self.status, self.headers)
53 def write(self, data):
58 self.handler.ckflush(self)
60 def startreq(self, status, headers, exc_info=None):
69 raise Exception("Can only start responding once.")
71 self.headers = headers
74 class handler(object):
75 def handle(self, request):
77 def ckflush(self, req):
79 p.register(req, select.POLLOUT)
80 while len(req.buffer) > 0:
87 def parseargs(cls, **args):
89 raise ValueError("unknown handler argument: " + next(iter(args)))
92 class single(handler):
95 def handle(self, req):
98 with perf.request(env) as reqevent:
99 respiter = req.handlewsgi(env, req.startreq)
100 for data in respiter:
103 reqevent.response([req.status, req.headers])
109 log.error("exception occurred when handling request", exc_info=True)
117 sys.stderr.write(" ")
118 sys.stderr.write(str(a))
120 sys.stderr.write("\n")
123 class freethread(handler):
126 def __init__(self, *, max=None, timeout=None, **kw):
127 super().__init__(**kw)
129 self.lk = threading.Lock()
130 self.tcond = threading.Condition(self.lk)
132 self.timeout = timeout
135 def parseargs(cls, *, max=None, abort=None, **args):
136 ret = super().parseargs(**args)
138 ret["max"] = int(max)
140 ret["timeout"] = int(abort)
143 def handle(self, req):
145 if self.max is not None:
146 if self.timeout is not None:
147 now = start = time.time()
148 while len(self.current) >= self.max:
149 self.tcond.wait(start + self.timeout - now)
151 if now - start > self.timeout:
154 while len(self.current) >= self.max:
156 th = reqthread(target=self.run, args=[req])
157 th.registered = False
159 while not th.registered:
164 th = threading.current_thread()
168 self.tcond.notify_all()
171 with perf.request(env) as reqevent:
172 respiter = req.handlewsgi(env, req.startreq)
173 for data in respiter:
176 reqevent.response([req.status, req.headers])
182 log.error("exception occurred when handling request", exc_info=True)
185 self.current.remove(th)
186 self.tcond.notify_all()
193 if len(self.current) > 0:
194 th = next(iter(self.current))
199 class threadpool(handler):
202 def __init__(self, *, max=25, qsz=100, timeout=None, **kw):
203 super().__init__(**kw)
205 self.clk = threading.Lock()
206 self.ccond = threading.Condition(self.clk)
207 self.queue = collections.deque()
211 self.qlk = threading.Lock()
212 self.qfcond = threading.Condition(self.qlk)
213 self.qecond = threading.Condition(self.qlk)
216 self.timeout = timeout
219 def parseargs(cls, *, max=None, queue=None, abort=None, **args):
220 ret = super().parseargs(**args)
222 ret["max"] = int(max)
224 ret["qsz"] = int(queue)
226 ret["timeout"] = int(abort)
229 def handle(self, req):
232 if self.timeout is not None:
233 now = start = time.time()
234 while len(self.queue) >= self.qsz:
235 self.qecond.wait(start + self.timeout - now)
237 if now - start > self.timeout:
240 while len(self.queue) >= self.qsz:
242 self.queue.append(req)
244 if len(self.waiting) < 1:
248 if len(self.current) < self.max:
249 th = reqthread(target=self.run)
250 th.registered = False
252 while not th.registered:
255 def handle1(self, req):
258 with perf.request(env) as reqevent:
259 respiter = req.handlewsgi(env, req.startreq)
260 for data in respiter:
263 reqevent.response([req.status, req.headers])
269 log.error("exception occurred when handling request", exc_info=True)
273 th = threading.current_thread()
277 self.ccond.notify_all()
280 start = now = time.time()
282 while len(self.queue) < 1:
283 if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout:
287 if len(self.waiting) == self.waitlimit:
289 self.qfcond.wait(start + timeout - now)
291 self.waiting.remove(th)
293 if now - start > timeout:
295 req = self.queue.popleft()
303 self.current.remove(th)
308 if len(self.current) > 0:
309 th = next(iter(self.current))
314 class resplex(handler):
317 def __init__(self, *, max=None, **kw):
318 super().__init__(**kw)
320 self.lk = threading.Lock()
321 self.tcond = threading.Condition(self.lk)
323 self.cqueue = queue.Queue(5)
324 self.cnpipe = os.pipe()
325 self.rthread = reqthread(name="Response thread", target=self.handle2)
329 def parseargs(cls, *, max=None, **args):
330 ret = super().parseargs(**args)
332 ret["max"] = int(max)
335 def ckflush(self, req):
336 raise Exception("resplex handler does not support the write() function")
338 def handle(self, req):
340 if self.max is not None:
341 while len(self.current) >= self.max:
343 th = reqthread(target=self.handle1, args=[req])
344 th.registered = False
346 while not th.registered:
349 def handle1(self, req):
351 th = threading.current_thread()
355 self.tcond.notify_all()
358 respobj = req.handlewsgi(env, req.startreq)
359 respiter = iter(respobj)
361 log.error("request handler returned without calling start_request")
362 if hasattr(respiter, "close"):
366 self.cqueue.put((req, respiter))
367 os.write(self.cnpipe[1], b" ")
371 self.current.remove(th)
372 self.tcond.notify_all()
376 log.error("exception occurred when handling request", exc_info=True)
387 respiter = current[req]
389 if respiter is not None and hasattr(respiter, "close"):
392 log.error("exception occurred when closing iterator", exc_info=True)
396 log.error("exception occurred when closing request", exc_info=True)
399 respiter = current[req]
400 if respiter is not None:
403 data = next(respiter)
404 except StopIteration:
409 log.error("exception occurred when handling response data", exc_info=True)
412 log.error("exception occurred when iterating response", exc_info=True)
419 log.error("exception occurred when handling response data", exc_info=True)
424 if hasattr(respiter, "close"):
427 log.error("exception occurred when closing iterator", exc_info=True)
429 if respiter is None and not req.buffer:
433 bufl = list(req for req in current.keys() if req.buffer)
434 rls, wls, els = select.select([rp], bufl, [rp] + bufl)
436 ret = os.read(rp, 1024)
442 req, respiter = self.cqueue.get(False)
443 current[req] = respiter
453 log.error("exception occurred when writing response", exc_info=True)
456 if len(req.buffer) < 65536:
459 log.critical("unexpected exception occurred in response handler thread", exc_info=True)
465 if len(self.current) > 0:
466 th = next(iter(self.current))
470 os.close(self.cnpipe[1])
473 names = {cls.cname: cls for cls in globals().values() if
474 isinstance(cls, type) and
475 issubclass(cls, handler) and
476 hasattr(cls, "cname")}
478 def parsehspec(spec):
481 nm, spec = spec.split(":", 1)
485 part, spec = spec.split(",", 1)
487 part, spec = spec, None
489 key, val = part.split("=", 1)