From: Fredrik Tolf Date: Mon, 6 Jan 2014 02:53:35 +0000 (+0100) Subject: python: Back-ported the new request handling to Python2. X-Git-Url: http://www.dolda2000.com/gitweb/?p=ashd.git;a=commitdiff_plain;h=8f410ce7bbc467c7c40c92df6b5f22e8a9ee1de8 python: Back-ported the new request handling to Python2. --- diff --git a/python/ashd-wsgi b/python/ashd-wsgi index 01c158f..f1f40b2 100755 --- a/python/ashd-wsgi +++ b/python/ashd-wsgi @@ -1,19 +1,19 @@ #!/usr/bin/python -import sys, os, getopt, threading, logging, time -import ashd.proto, ashd.util, ashd.perf, ashd.serve +import sys, os, getopt, socket, logging, time +import ashd.util, ashd.serve try: import pdm.srv except: pdm = None def usage(out): - out.write("usage: ashd-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-l REQLIMIT] HANDLER-MODULE [ARGS...]\n") + out.write("usage: ashd-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-l REQLIMIT] [-t REQUEST-HANDLER[:PAR[=VAL](,PAR[=VAL])...]] HANDLER-MODULE [ARGS...]\n") -reqlimit = 0 +hspec = "free", {} modwsgi_compat = False setlog = True -opts, args = getopt.getopt(sys.argv[1:], "+hAp:l:m:") +opts, args = getopt.getopt(sys.argv[1:], "+hALp:t:l:m:") for o, a in opts: if o == "-h": usage(sys.stdout) @@ -25,7 +25,9 @@ for o, a in opts: elif o == "-A": modwsgi_compat = True elif o == "-l": - reqlimit = int(a) + hspec = "free", {"max": a, "abort": "10"} + elif o == "-t": + hspec = ashd.serve.parsehspec(a) elif o == "-m": if pdm is not None: pdm.srv.listen(a) @@ -145,56 +147,52 @@ def mkenv(req): env["wsgi.run_once"] = False return env -if reqlimit != 0: - guard = ashd.serve.abortlimiter(reqlimit).call -else: - guard = lambda fun: fun() +class request(ashd.serve.wsgirequest): + def __init__(self, bkreq, **kw): + super(request, self).__init__(**kw) + self.bkreq = bkreq.dup() + + def mkenv(self): + return mkenv(self.bkreq) -class reqthread(ashd.serve.wsgithread): - def __init__(self, req): - super(reqthread, self).__init__() - self.req = req.dup() - - def handlewsgi(self): - return handler(self.env, self.startreq) + def handlewsgi(self, env, startreq): + return handler(env, startreq) + + def fileno(self): + return self.bkreq.bsk.fileno() def writehead(self, status, headers): - try: - self.req.sk.write("HTTP/1.1 %s\n" % status) - for nm, val in headers: - self.req.sk.write("%s: %s\n" % (nm, val)) - self.req.sk.write("\n") - except IOError: - raise ashd.serve.closed() + w = self.buffer.extend + w("HTTP/1.1 %s\n" % status) + for nm, val in headers: + w("%s: %s\n" % (nm, val)) + w("\n") - def writedata(self, data): + def flush(self): try: - self.req.sk.write(data) - self.req.sk.flush() + ret = self.bkreq.bsk.send(self.buffer, socket.MSG_DONTWAIT) + self.buffer[:ret] = "" except IOError: raise ashd.serve.closed() - def handle(self): - self.env = mkenv(self.req) - reqevent = ashd.perf.request(self.env) - exc = (None, None, None) - try: - super(reqthread, self).handle() - if self.status: - reqevent.response([self.status, self.headers]) - except: - exc = sys.exc_info() - raise - finally: - reqevent.__exit__(*exc) - - def run(self): - try: - guard(super(reqthread, self).run) - finally: - self.req.close() - + def close(self): + self.bkreq.close() + def handle(req): - reqthread(req).start() + reqhandler.handle(request(bkreq=req, handler=reqhandler)) -ashd.util.serveloop(handle) +if hspec[0] not in ashd.serve.names: + sys.stderr.write("ashd-wsgi: no such request handler: %s\n" % hspec[0]) + sys.exit(1) +hclass = ashd.serve.names[hspec[0]] +try: + hargs = hclass.parseargs(**hspec[1]) +except ValueError as exc: + sys.stderr.write("ashd-wsgi: %s\n" % exc) + sys.exit(1) + +reqhandler = hclass(**hargs) +try: + ashd.util.serveloop(handle) +finally: + reqhandler.close() diff --git a/python/ashd/serve.py b/python/ashd/serve.py index 14170d7..0197d47 100644 --- a/python/ashd/serve.py +++ b/python/ashd/serve.py @@ -1,4 +1,5 @@ -import os, threading, time, logging +import sys, os, threading, time, logging, select, Queue +import perf log = logging.getLogger("ashd.serve") seq = 1 @@ -6,52 +7,41 @@ seqlk = threading.Lock() def reqseq(): global seq - seqlk.acquire() - try: + with seqlk: s = seq seq += 1 return s - finally: - seqlk.release() - -class reqthread(threading.Thread): - def __init__(self, name=None): - if name is None: - name = "Request handler %i" % reqseq() - super(reqthread, self).__init__(name=name) - - def handle(self): - raise Exception() - - def run(self): - try: - self.handle() - except: - log.error("exception occurred when handling request", exc_info=True) class closed(IOError): def __init__(self): super(closed, self).__init__("The client has closed the connection.") -class wsgithread(reqthread): - def __init__(self, **kwargs): - super(wsgithread, self).__init__(**kwargs) +class reqthread(threading.Thread): + def __init__(self, name=None, **kw): + if name is None: + name = "Request handler %i" % reqseq() + super(reqthread, self).__init__(name=name, **kw) + +class wsgirequest(object): + def __init__(self, handler): self.status = None self.headers = [] self.respsent = False + self.handler = handler + self.buffer = bytearray() def handlewsgi(self): raise Exception() + def fileno(self): + raise Exception() def writehead(self, status, headers): raise Exception() - def writedata(self, data): + def flush(self): raise Exception() - - def write(self, data): - if not data: - return - self.flushreq() - self.writedata(data) + def close(self): + pass + def writedata(self, data): + self.buffer.extend(data) def flushreq(self): if not self.respsent: @@ -60,73 +50,313 @@ class wsgithread(reqthread): self.respsent = True self.writehead(self.status, self.headers) + def write(self, data): + if not data: + return + self.flushreq() + self.writedata(data) + self.handler.ckflush(self) + def startreq(self, status, headers, exc_info=None): if self.status: - if exc_info: # Nice calling convetion ^^ + if exc_info: try: if self.respsent: - raise exc_info[0], exc_info[1], exc_info[2] + raise exc_info[1] finally: - exc_info = None # CPython GC bug? + exc_info = None else: raise Exception("Can only start responding once.") self.status = status self.headers = headers return self.write - - def handle(self): + +class handler(object): + def handle(self, request): + raise Exception() + def ckflush(self, req): + while len(req.buffer) > 0: + rls, wls, els = select.select([], [req], [req]) + req.flush() + def close(self): + pass + + @classmethod + def parseargs(cls, **args): + if len(args) > 0: + raise ValueError("unknown handler argument: " + iter(args).next()) + return {} + +class single(handler): + cname = "single" + + def handle(self, req): try: - respiter = self.handlewsgi() - try: + env = req.mkenv() + with perf.request(env) as reqevent: + respiter = req.handlewsgi(env, req.startreq) for data in respiter: - self.write(data) - if self.status: - self.flushreq() - finally: - if hasattr(respiter, "close"): - respiter.close() + req.write(data) + if req.status: + reqevent.response([req.status, req.headers]) + req.flushreq() + self.ckflush(req) except closed: pass + except: + log.error("exception occurred when handling request", exc_info=True) + finally: + req.close() + +class freethread(handler): + cname = "free" -class calllimiter(object): - def __init__(self, limit): - self.limit = limit - self.lock = threading.Condition() - self.inflight = 0 + def __init__(self, max=None, timeout=None, **kw): + super(freethread, self).__init__(**kw) + self.current = set() + self.lk = threading.Lock() + self.tcond = threading.Condition(self.lk) + self.max = max + self.timeout = timeout - def waited(self, time): - if time > 10: - raise RuntimeError("Waited too long") + @classmethod + def parseargs(cls, max=None, abort=None, **args): + ret = super(freethread, cls).parseargs(**args) + if max: + ret["max"] = int(max) + if abort: + ret["timeout"] = int(abort) + return ret - def __enter__(self): - self.lock.acquire() + def handle(self, req): + with self.lk: + if self.max is not None: + if self.timeout is not None: + now = start = time.time() + while len(self.current) >= self.max: + self.tcond.wait(start + self.timeout - now) + now = time.time() + if now - start > self.timeout: + os.abort() + else: + while len(self.current) >= self.max: + self.tcond.wait() + th = reqthread(target=self.run, args=[req]) + th.start() + while th.is_alive() and th not in self.current: + self.tcond.wait() + + def run(self, req): try: - start = time.time() - while self.inflight >= self.limit: - self.lock.wait(10) - self.waited(time.time() - start) - self.inflight += 1 - return self + th = threading.current_thread() + with self.lk: + self.current.add(th) + self.tcond.notify_all() + try: + env = req.mkenv() + with perf.request(env) as reqevent: + respiter = req.handlewsgi(env, req.startreq) + for data in respiter: + req.write(data) + if req.status: + reqevent.response([req.status, req.headers]) + req.flushreq() + self.ckflush(req) + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) + finally: + with self.lk: + self.current.remove(th) + self.tcond.notify_all() finally: - self.lock.release() + req.close() + + def close(self): + while True: + with self.lk: + if len(self.current) > 0: + th = iter(self.current).next() + else: + return + th.join() + +class resplex(handler): + cname = "rplex" + + def __init__(self, max=None, **kw): + super(resplex, self).__init__(**kw) + self.current = set() + self.lk = threading.Lock() + self.tcond = threading.Condition(self.lk) + self.max = max + self.cqueue = Queue.Queue(5) + self.cnpipe = os.pipe() + self.rthread = reqthread(name="Response thread", target=self.handle2) + self.rthread.start() + + @classmethod + def parseargs(cls, max=None, **args): + ret = super(resplex, cls).parseargs(**args) + if max: + ret["max"] = int(max) + return ret + + def ckflush(self, req): + raise Exception("resplex handler does not support the write() function") - def __exit__(self, *excinfo): - self.lock.acquire() + def handle(self, req): + with self.lk: + if self.max is not None: + while len(self.current) >= self.max: + self.tcond.wait() + th = reqthread(target=self.handle1, args=[req]) + th.start() + while th.is_alive() and th not in self.current: + self.tcond.wait() + + def handle1(self, req): try: - self.inflight -= 1 - self.lock.notify() + th = threading.current_thread() + with self.lk: + self.current.add(th) + self.tcond.notify_all() + try: + env = req.mkenv() + respobj = req.handlewsgi(env, req.startreq) + respiter = iter(respobj) + if not req.status: + log.error("request handler returned without calling start_request") + if hasattr(respiter, "close"): + respiter.close() + return + else: + self.cqueue.put((req, respiter)) + os.write(self.cnpipe[1], " ") + req = None + finally: + with self.lk: + self.current.remove(th) + self.tcond.notify_all() + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) finally: - self.lock.release() - return False + if req is not None: + req.close() - def call(self, target): - self.__enter__() + def handle2(self): try: - return target() - finally: - self.__exit__() + rp = self.cnpipe[0] + current = {} -class abortlimiter(calllimiter): - def waited(self, time): - if time > 10: + def closereq(req): + respiter = current[req] + try: + if respiter is not None and hasattr(respiter, "close"): + respiter.close() + except: + log.error("exception occurred when closing iterator", exc_info=True) + try: + req.close() + except: + log.error("exception occurred when closing request", exc_info=True) + del current[req] + def ckiter(req): + respiter = current[req] + if respiter is not None: + rem = False + try: + data = respiter.next() + except StopIteration: + rem = True + try: + req.flushreq() + except: + log.error("exception occurred when handling response data", exc_info=True) + except: + rem = True + log.error("exception occurred when iterating response", exc_info=True) + if not rem: + if data: + try: + req.flushreq() + req.writedata(data) + except: + log.error("exception occurred when handling response data", exc_info=True) + rem = True + if rem: + current[req] = None + try: + if hasattr(respiter, "close"): + respiter.close() + except: + log.error("exception occurred when closing iterator", exc_info=True) + respiter = None + if respiter is None and not req.buffer: + closereq(req) + + while True: + bufl = list(req for req in current.iterkeys() if req.buffer) + rls, wls, els = select.select([rp], bufl, [rp] + bufl) + if rp in rls: + ret = os.read(rp, 1024) + if not ret: + os.close(rp) + return + try: + while True: + req, respiter = self.cqueue.get(False) + current[req] = respiter + ckiter(req) + except Queue.Empty: + pass + for req in wls: + try: + req.flush() + except closed: + closereq(req) + except: + log.error("exception occurred when writing response", exc_info=True) + closereq(req) + else: + if len(req.buffer) < 65536: + ckiter(req) + except: + log.critical("unexpected exception occurred in response handler thread", exc_info=True) os.abort() + + def close(self): + while True: + with self.lk: + if len(self.current) > 0: + th = iter(self.current).next() + else: + break + th.join() + os.close(self.cnpipe[1]) + self.rthread.join() + +names = dict((cls.cname, cls) for cls in globals().itervalues() if + isinstance(cls, type) and + issubclass(cls, handler) and + hasattr(cls, "cname")) + +def parsehspec(spec): + if ":" not in spec: + return spec, {} + nm, spec = spec.split(":", 1) + args = {} + while spec: + if "," in spec: + part, spec = spec.split(",", 1) + else: + part, spec = spec, None + if "=" in part: + key, val = part.split("=", 1) + else: + key, val = part, "" + args[key] = val + return nm, args diff --git a/python/scgi-wsgi b/python/scgi-wsgi index 2cf715b..6e04f20 100755 --- a/python/scgi-wsgi +++ b/python/scgi-wsgi @@ -1,20 +1,21 @@ #!/usr/bin/python -import sys, os, getopt, logging +import sys, os, getopt, logging, platform import socket -import ashd.scgi, ashd.perf, ashd.serve +import ashd.scgi, ashd.serve try: import pdm.srv except: pdm = None def usage(out): - out.write("usage: scgi-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n") + out.write("usage: scgi-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-t REQUEST-HANDLER[:PAR[=VAL](,PAR[=VAL])...]] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n") sk = None +hspec = "free", {} modwsgi_compat = False setlog = True -opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:m:") +opts, args = getopt.getopt(sys.argv[1:], "+hALp:t:T:m:") for o, a in opts: if o == "-h": usage(sys.stdout) @@ -40,6 +41,8 @@ for o, a in opts: elif o == "-m": if pdm is not None: pdm.srv.listen(a) + elif o == "-t": + hspec = ashd.serve.parsehspec(a) if len(args) < 1: usage(sys.stderr) sys.exit(1) @@ -84,56 +87,68 @@ def mkenv(head, sk): env["wsgi.run_once"] = False return env -class reqthread(ashd.serve.wsgithread): - def __init__(self, sk): - super(reqthread, self).__init__() +class request(ashd.serve.wsgirequest): + def __init__(self, sk, **kw): + super(request, self).__init__(**kw) self.bsk = sk.dup() self.sk = self.bsk.makefile("r+") - def handlewsgi(self): - return handler(self.env, self.startreq) + def mkenv(self): + return mkenv(ashd.scgi.readhead(self.sk), self.sk) + + def handlewsgi(self, env, startreq): + return handler(env, startreq) + + _onjython = None + @staticmethod + def onjython(): + if request._onjython is None: + request._onjython = ("java" in platform.system().lower()) + return request._onjython + + def fileno(self): + if request.onjython(): + self.bsk.setblocking(False) + return self.bsk.fileno() def writehead(self, status, headers): - try: - self.sk.write("Status: %s\n" % status) - for nm, val in headers: - self.sk.write("%s: %s\n" % (nm, val)) - self.sk.write("\n") - except IOError: - raise ashd.serve.closed() + w = self.buffer.extend + w("Status: %s\n" % status) + for nm, val in headers: + w("%s: %s\n" % (nm, val)) + w("\n") - def writedata(self, data): + def flush(self): try: - self.sk.write(data) - self.sk.flush() + if not request.onjython(): + ret = self.bsk.send(self.buffer, socket.MSG_DONTWAIT) + else: + ret = self.bsk.send(self.buffer) + self.buffer[:ret] = "" except IOError: raise ashd.serve.closed() - def handle(self): - head = ashd.scgi.readhead(self.sk) - self.env = mkenv(head, self.sk) - reqevent = ashd.perf.request(self.env) - exc = (None, None, None) - try: - super(reqthread, self).handle() - if self.status: - reqevent.response([self.status, self.headers]) - except: - exc = sys.exc_info() - raise - finally: - reqevent.__exit__(*exc) + def close(self): + self.sk.close() + self.bsk.close() + +if hspec[0] not in ashd.serve.names: + sys.stderr.write("scgi-wsgi: no such request handler: %s\n" % hspec[0]) + sys.exit(1) +hclass = ashd.serve.names[hspec[0]] +try: + hargs = hclass.parseargs(**hspec[1]) +except ValueError as exc: + sys.stderr.write("scgi-wsgi: %s\n" % exc) + sys.exit(1) - def run(self): +reqhandler = hclass(**hargs) +try: + while True: + nsk, addr = sk.accept() try: - super(reqthread, self).run() + reqhandler.handle(request(sk=nsk, handler=reqhandler)) finally: - self.sk.close() - self.bsk.close() - -while True: - nsk, addr = sk.accept() - try: - reqthread(nsk).start() - finally: - nsk.close() + nsk.close() +finally: + reqhandler.close()