X-Git-Url: http://www.dolda2000.com/gitweb/?a=blobdiff_plain;f=python3%2Fashd%2Fserve.py;h=54fbff1df0e3e0ac3d26dbfc313e6aeeb2f19409;hb=48dc900a804537aa5b36f203c4c4ba6c0bcb2f20;hp=74ccd929e5c9689b116ed3d55745e23e43456551;hpb=3a3b78e3147cab8cdbc4bb2d577013734b73a8af;p=ashd.git diff --git a/python3/ashd/serve.py b/python3/ashd/serve.py index 74ccd92..54fbff1 100644 --- a/python3/ashd/serve.py +++ b/python3/ashd/serve.py @@ -1,4 +1,4 @@ -import sys, os, threading, time, logging, select +import sys, os, threading, time, logging, select, queue from . import perf log = logging.getLogger("ashd.serve") @@ -75,7 +75,9 @@ class handler(object): def handle(self, request): raise Exception() def ckflush(self, req): - raise Exception() + while len(req.buffer) > 0: + rls, wls, els = select.select([], [req], [req]) + req.flush() def close(self): pass @@ -85,6 +87,25 @@ class handler(object): raise ValueError("unknown handler argument: " + next(iter(args))) return {} +class single(handler): + def handle(self, req): + try: + env = req.mkenv() + with perf.request(env) as reqevent: + respiter = req.handlewsgi(env, req.startreq) + for data in respiter: + req.write(data) + if req.status: + reqevent.response([req.status, req.headers]) + req.flushreq() + self.ckflush(req) + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) + finally: + req.close() + class freethread(handler): def __init__(self, *, max=None, timeout=None, **kw): super().__init__(**kw) @@ -121,11 +142,6 @@ class freethread(handler): while th.is_alive() and th not in self.current: self.tcond.wait() - def ckflush(self, req): - while len(req.buffer) > 0: - rls, wls, els = select.select([], [req], [req]) - req.flush() - def run(self, req): try: th = threading.current_thread() @@ -195,11 +211,6 @@ class threadpool(handler): while not th in self.current: self.pcond.wait() - def ckflush(self, req): - while len(req.buffer) > 0: - rls, wls, els = select.select([], [req], [req]) - req.flush() - def _handle(self, req): try: env = req.mkenv() @@ -272,8 +283,158 @@ class threadpool(handler): self.rcond.notify_all() self.pcond.wait(1) -names = {"free": freethread, - "pool": threadpool} +class resplex(handler): + def __init__(self, *, max=None, **kw): + super().__init__(**kw) + self.current = set() + self.lk = threading.Lock() + self.tcond = threading.Condition(self.lk) + self.max = max + self.cqueue = queue.Queue(5) + self.cnpipe = os.pipe() + self.rthread = reqthread(name="Response thread", target=self.handle2) + self.rthread.start() + + @classmethod + def parseargs(cls, *, max=None, **args): + ret = super().parseargs(**args) + if max: + ret["max"] = int(max) + return ret + + def ckflush(self, req): + raise Exception("resplex handler does not support the write() function") + + def handle(self, req): + with self.lk: + if self.max is not None: + while len(self.current) >= self.max: + self.tcond.wait() + th = reqthread(target=self.handle1, args=[req]) + th.start() + while th.is_alive() and th not in self.current: + self.tcond.wait() + + def handle1(self, req): + try: + th = threading.current_thread() + with self.lk: + self.current.add(th) + self.tcond.notify_all() + try: + env = req.mkenv() + respobj = req.handlewsgi(env, req.startreq) + respiter = iter(respobj) + if not req.status: + log.error("request handler returned without calling start_request") + if hasattr(respiter, "close"): + respiter.close() + return + else: + self.cqueue.put((req, respiter)) + os.write(self.cnpipe[1], b" ") + req = None + finally: + with self.lk: + self.current.remove(th) + self.tcond.notify_all() + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) + finally: + if req is not None: + req.close() + + def handle2(self): + try: + rp = self.cnpipe[0] + current = {} + + def closereq(req): + respiter = current[req] + try: + if respiter is not None and hasattr(respiter, "close"): + respiter.close() + except: + log.error("exception occurred when closing iterator", exc_info=True) + try: + req.close() + except: + log.error("exception occurred when closing request", exc_info=True) + del current[req] + def ckiter(req): + respiter = current[req] + if respiter is not None: + rem = False + try: + data = next(respiter) + except StopIteration: + rem = True + req.flushreq() + except: + rem = True + log.error("exception occurred when iterating response", exc_info=True) + if not rem: + if data: + req.flushreq() + req.writedata(data) + else: + current[req] = None + try: + if hasattr(respiter, "close"): + respiter.close() + except: + log.error("exception occurred when closing iterator", exc_info=True) + respiter = None + if respiter is None and not req.buffer: + closereq(req) + + while True: + bufl = list(req for req in current.keys() if req.buffer) + rls, wls, els = select.select([rp], bufl, [rp] + bufl) + if rp in rls: + ret = os.read(rp, 1024) + if not ret: + os.close(rp) + return + try: + while True: + req, respiter = self.cqueue.get(False) + current[req] = respiter + ckiter(req) + except queue.Empty: + pass + for req in wls: + try: + req.flush() + except closed: + closereq(req) + except: + log.error("exception occurred when writing response", exc_info=True) + closereq(req) + else: + if len(req.buffer) < 65536: + ckiter(req) + except: + log.critical("unexpected exception occurred in response handler thread", exc_info=True) + os.abort() + + def close(self): + while True: + with self.lk: + if len(self.current) > 0: + th = next(iter(self.current)) + else: + break + th.join() + os.close(self.cnpipe[1]) + self.rthread.join() + +names = {"single": single, + "free": freethread, + "pool": threadpool, + "rplex": resplex} def parsehspec(spec): if ":" not in spec: