X-Git-Url: http://www.dolda2000.com/gitweb/?p=ashd.git;a=blobdiff_plain;f=python3%2Fashd%2Fserve.py;h=0927710ae7c7ac90c0411ffb45e6c48d8326d236;hp=9f9fe7fe04388a896f78927004c72bc479843e95;hb=6e8b9b9d6d043ecbcaeb8ef807bd9648424aedd7;hpb=552a70bf49c3798c301ecc1b8c2d9118aa325beb diff --git a/python3/ashd/serve.py b/python3/ashd/serve.py index 9f9fe7f..0927710 100644 --- a/python3/ashd/serve.py +++ b/python3/ashd/serve.py @@ -1,4 +1,5 @@ -import threading, time, logging +import sys, os, threading, time, logging, select, queue, collections +from . import perf log = logging.getLogger("ashd.serve") seq = 1 @@ -11,44 +12,36 @@ def reqseq(): seq += 1 return s -class reqthread(threading.Thread): - def __init__(self, name=None): - if name is None: - name = "Request handler %i" % reqseq() - super().__init__(name=name) - - def handle(self): - raise Exception() - - def run(self): - try: - self.handle() - except: - log.error("exception occurred when handling request", exc_info=True) - class closed(IOError): def __init__(self): super().__init__("The client has closed the connection.") -class wsgithread(reqthread): - def __init__(self, **kwargs): - super().__init__(**kwargs) +class reqthread(threading.Thread): + def __init__(self, *, name=None, **kw): + if name is None: + name = "Request handler %i" % reqseq() + super().__init__(name=name, **kw) + +class wsgirequest(object): + def __init__(self, *, handler): self.status = None self.headers = [] self.respsent = False + self.handler = handler + self.buffer = bytearray() def handlewsgi(self): raise Exception() + def fileno(self): + raise Exception() def writehead(self, status, headers): raise Exception() - def writedata(self, data): + def flush(self): raise Exception() - - def write(self, data): - if not data: - return - self.flushreq() - self.writedata(data) + def close(self): + pass + def writedata(self, data): + self.buffer.extend(data) def flushreq(self): if not self.respsent: @@ -57,59 +50,444 @@ class wsgithread(reqthread): self.respsent = True self.writehead(self.status, self.headers) + def write(self, data): + if not data: + return + self.flushreq() + self.writedata(data) + self.handler.ckflush(self) + def startreq(self, status, headers, exc_info=None): if self.status: - if exc_info: # Nice calling convetion ^^ + if exc_info: try: if self.respsent: raise exc_info[1] finally: - exc_info = None # CPython GC bug? + exc_info = None else: raise Exception("Can only start responding once.") self.status = status self.headers = headers return self.write - - def handle(self): + +class handler(object): + def handle(self, request): + raise Exception() + def ckflush(self, req): + p = select.poll() + p.register(req, select.POLLOUT) + while len(req.buffer) > 0: + p.poll() + req.flush() + def close(self): + pass + + @classmethod + def parseargs(cls, **args): + if len(args) > 0: + raise ValueError("unknown handler argument: " + next(iter(args))) + return {} + +class single(handler): + cname = "single" + + def handle(self, req): try: - respiter = self.handlewsgi() + env = req.mkenv() + with perf.request(env) as reqevent: + respiter = req.handlewsgi(env, req.startreq) + for data in respiter: + req.write(data) + if req.status: + reqevent.response([req.status, req.headers]) + req.flushreq() + self.ckflush(req) + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) + finally: + req.close() + +def dbg(*a): + f = True + for o in a: + if not f: + sys.stderr.write(" ") + sys.stderr.write(str(a)) + f = False + sys.stderr.write("\n") + sys.stderr.flush() + +class freethread(handler): + cname = "free" + + def __init__(self, *, max=None, timeout=None, **kw): + super().__init__(**kw) + self.current = set() + self.lk = threading.Lock() + self.tcond = threading.Condition(self.lk) + self.max = max + self.timeout = timeout + + @classmethod + def parseargs(cls, *, max=None, abort=None, **args): + ret = super().parseargs(**args) + if max: + ret["max"] = int(max) + if abort: + ret["timeout"] = int(abort) + return ret + + def handle(self, req): + with self.lk: + if self.max is not None: + if self.timeout is not None: + now = start = time.time() + while len(self.current) >= self.max: + self.tcond.wait(start + self.timeout - now) + now = time.time() + if now - start > self.timeout: + os.abort() + else: + while len(self.current) >= self.max: + self.tcond.wait() + th = reqthread(target=self.run, args=[req]) + th.registered = False + th.start() + while not th.registered: + self.tcond.wait() + + def run(self, req): + try: + th = threading.current_thread() + with self.lk: + self.current.add(th) + th.registered = True + self.tcond.notify_all() try: + env = req.mkenv() + with perf.request(env) as reqevent: + respiter = req.handlewsgi(env, req.startreq) + for data in respiter: + req.write(data) + if req.status: + reqevent.response([req.status, req.headers]) + req.flushreq() + self.ckflush(req) + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) + finally: + with self.lk: + self.current.remove(th) + self.tcond.notify_all() + finally: + req.close() + + def close(self): + while True: + with self.lk: + if len(self.current) > 0: + th = next(iter(self.current)) + else: + return + th.join() + +class threadpool(handler): + cname = "pool" + + def __init__(self, *, max=25, qsz=100, timeout=None, **kw): + super().__init__(**kw) + self.current = set() + self.clk = threading.Lock() + self.ccond = threading.Condition(self.clk) + self.queue = collections.deque() + self.waiting = set() + self.waitlimit = 5 + self.wlstart = 0.0 + self.qlk = threading.Lock() + self.qfcond = threading.Condition(self.qlk) + self.qecond = threading.Condition(self.qlk) + self.max = max + self.qsz = qsz + self.timeout = timeout + + @classmethod + def parseargs(cls, *, max=None, queue=None, abort=None, **args): + ret = super().parseargs(**args) + if max: + ret["max"] = int(max) + if queue: + ret["qsz"] = int(queue) + if abort: + ret["timeout"] = int(abort) + return ret + + def handle(self, req): + spawn = False + with self.qlk: + if self.timeout is not None: + now = start = time.time() + while len(self.queue) >= self.qsz: + self.qecond.wait(start + self.timeout - now) + now = time.time() + if now - start > self.timeout: + os.abort() + else: + while len(self.queue) >= self.qsz: + self.qecond.wait() + self.queue.append(req) + self.qfcond.notify() + if len(self.waiting) < 1: + spawn = True + if spawn: + with self.clk: + if len(self.current) < self.max: + th = reqthread(target=self.run) + th.registered = False + th.start() + while not th.registered: + self.ccond.wait() + + def handle1(self, req): + try: + env = req.mkenv() + with perf.request(env) as reqevent: + respiter = req.handlewsgi(env, req.startreq) for data in respiter: - self.write(data) - if self.status: - self.flushreq() + req.write(data) + if req.status: + reqevent.response([req.status, req.headers]) + req.flushreq() + self.ckflush(req) + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) + + def run(self): + timeout = 10.0 + th = threading.current_thread() + with self.clk: + self.current.add(th) + th.registered = True + self.ccond.notify_all() + try: + while True: + start = now = time.time() + with self.qlk: + while len(self.queue) < 1: + if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout: + return + self.waiting.add(th) + try: + if len(self.waiting) == self.waitlimit: + self.wlstart = now + self.qfcond.wait(start + timeout - now) + finally: + self.waiting.remove(th) + now = time.time() + if now - start > timeout: + return + req = self.queue.popleft() + self.qecond.notify() + try: + self.handle1(req) + finally: + req.close() + finally: + with self.clk: + self.current.remove(th) + + def close(self): + while True: + with self.clk: + if len(self.current) > 0: + th = next(iter(self.current)) + else: + return + th.join() + +class resplex(handler): + cname = "rplex" + + def __init__(self, *, max=None, **kw): + super().__init__(**kw) + self.current = set() + self.lk = threading.Lock() + self.tcond = threading.Condition(self.lk) + self.max = max + self.cqueue = queue.Queue(5) + self.cnpipe = os.pipe() + self.rthread = reqthread(name="Response thread", target=self.handle2) + self.rthread.start() + + @classmethod + def parseargs(cls, *, max=None, **args): + ret = super().parseargs(**args) + if max: + ret["max"] = int(max) + return ret + + def ckflush(self, req): + raise Exception("resplex handler does not support the write() function") + + def handle(self, req): + with self.lk: + if self.max is not None: + while len(self.current) >= self.max: + self.tcond.wait() + th = reqthread(target=self.handle1, args=[req]) + th.registered = False + th.start() + while not th.registered: + self.tcond.wait() + + def handle1(self, req): + try: + th = threading.current_thread() + with self.lk: + self.current.add(th) + th.registered = True + self.tcond.notify_all() + try: + env = req.mkenv() + respobj = req.handlewsgi(env, req.startreq) + respiter = iter(respobj) + if not req.status: + log.error("request handler returned without calling start_request") + if hasattr(respiter, "close"): + respiter.close() + return + else: + self.cqueue.put((req, respiter)) + os.write(self.cnpipe[1], b" ") + req = None finally: - if hasattr(respiter, "close"): - respiter.close() + with self.lk: + self.current.remove(th) + self.tcond.notify_all() except closed: pass + except: + log.error("exception occurred when handling request", exc_info=True) + finally: + if req is not None: + req.close() + + def handle2(self): + try: + rp = self.cnpipe[0] + current = {} + + def closereq(req): + respiter = current[req] + try: + if respiter is not None and hasattr(respiter, "close"): + respiter.close() + except: + log.error("exception occurred when closing iterator", exc_info=True) + try: + req.close() + except: + log.error("exception occurred when closing request", exc_info=True) + del current[req] + def ckiter(req): + respiter = current[req] + if respiter is not None: + rem = False + try: + data = next(respiter) + except StopIteration: + rem = True + try: + req.flushreq() + except: + log.error("exception occurred when handling response data", exc_info=True) + except: + rem = True + log.error("exception occurred when iterating response", exc_info=True) + if not rem: + if data: + try: + req.flushreq() + req.writedata(data) + except: + log.error("exception occurred when handling response data", exc_info=True) + rem = True + if rem: + current[req] = None + try: + if hasattr(respiter, "close"): + respiter.close() + except: + log.error("exception occurred when closing iterator", exc_info=True) + respiter = None + if respiter is None and not req.buffer: + closereq(req) + + while True: + bufl = list(req for req in current.keys() if req.buffer) + rls, wls, els = select.select([rp], bufl, [rp] + bufl) + if rp in rls: + ret = os.read(rp, 1024) + if not ret: + os.close(rp) + return + try: + while True: + req, respiter = self.cqueue.get(False) + current[req] = respiter + ckiter(req) + except queue.Empty: + pass + for req in wls: + try: + req.flush() + except closed: + closereq(req) + except: + log.error("exception occurred when writing response", exc_info=True) + closereq(req) + else: + if len(req.buffer) < 65536: + ckiter(req) + except: + log.critical("unexpected exception occurred in response handler thread", exc_info=True) + os.abort() + + def close(self): + while True: + with self.lk: + if len(self.current) > 0: + th = next(iter(self.current)) + else: + break + th.join() + os.close(self.cnpipe[1]) + self.rthread.join() + +names = {cls.cname: cls for cls in globals().values() if + isinstance(cls, type) and + issubclass(cls, handler) and + hasattr(cls, "cname")} -class calllimiter(object): - def __init__(self, limit): - self.limit = limit - self.lock = threading.Condition() - self.inflight = 0 - - def waited(self, time): - if time > 10: - raise RuntimeError("Waited too long") - - def __enter__(self): - with self.lock: - start = time.time() - while self.inflight >= self.limit: - self.lock.wait(10) - self.waited(time.time() - start) - self.inflight += 1 - return self - - def __exit__(self, *excinfo): - with self.lock: - self.inflight -= 1 - self.lock.notify() - return False - - def call(self, target): - with self: - return target() +def parsehspec(spec): + if ":" not in spec: + return spec, {} + nm, spec = spec.split(":", 1) + args = {} + while spec: + if "," in spec: + part, spec = spec.split(",", 1) + else: + part, spec = spec, None + if "=" in part: + key, val = part.split("=", 1) + else: + key, val = part, "" + args[key] = val + return nm, args