From 46adc298ac3f6be267d1c7db40c538dbd8d24318 Mon Sep 17 00:00:00 2001 From: Fredrik Tolf Date: Sun, 5 Jan 2014 08:20:25 +0100 Subject: [PATCH] python: Start rewriting WSGI handlers with modular handling models. --- python3/ashd-wsgi3 | 60 +++++++++---------- python3/ashd/serve.py | 156 +++++++++++++++++++++++++++----------------------- 2 files changed, 110 insertions(+), 106 deletions(-) diff --git a/python3/ashd-wsgi3 b/python3/ashd-wsgi3 index ba7038d..79dda81 100755 --- a/python3/ashd-wsgi3 +++ b/python3/ashd-wsgi3 @@ -1,6 +1,6 @@ #!/usr/bin/python3 -import sys, os, getopt, threading, logging, time, locale, collections +import sys, os, getopt, threading, socket, logging, time, locale, collections import ashd.proto, ashd.util, ashd.perf, ashd.serve try: import pdm.srv @@ -156,47 +156,41 @@ def recode(thing): else: return str(thing).encode("latin-1") -class reqthread(ashd.serve.wsgithread): - def __init__(self, req): - super().__init__() - self.req = req.dup() +reqhandler = ashd.serve.freethread() - def handlewsgi(self): - return handler(self.env, self.startreq) +class request(ashd.serve.wsgirequest): + def __init__(self, *, bkreq, **kw): + super().__init__(**kw) + self.bkreq = bkreq.dup() + + def mkenv(self): + return mkenv(self.bkreq) + + def handlewsgi(self, env, startreq): + return handler(env, startreq) + + def fileno(self): + return self.bkreq.bsk.fileno() def writehead(self, status, headers): - buf = bytearray() - buf += b"HTTP/1.1 " + recode(status) + b"\n" + w = self.buffer.extend + w(b"HTTP/1.1 " + recode(status) + b"\n") for nm, val in headers: - buf += recode(nm) + b": " + recode(val) + b"\n" - buf += b"\n" - try: - self.req.sk.write(buf) - except IOError: - raise ashd.serve.closed() + w(recode(nm) + b": " + recode(val) + b"\n") + w(b"\n") - def writedata(self, data): + def flush(self): try: - self.req.sk.write(data) - self.req.sk.flush() + ret = self.bkreq.bsk.send(self.buffer, socket.MSG_DONTWAIT) + self.buffer[:ret] = b"" except IOError: raise ashd.serve.closed() - def handle(self): - self.env = mkenv(self.req) - with ashd.perf.request(self.env) as reqevent: - super().handle() - if self.status: - reqevent.response([self.status, self.headers]) - - def run(self): - try: - guard(super().run) - finally: - self.req.close() - sys.stderr.flush() - + def close(self): + self.bkreq.close() + def handle(req): - reqthread(req).start() + reqhandler.handle(request(bkreq=req, handler=reqhandler)) ashd.util.serveloop(handle) +reqhandler.close() diff --git a/python3/ashd/serve.py b/python3/ashd/serve.py index fe839a2..30c835e 100644 --- a/python3/ashd/serve.py +++ b/python3/ashd/serve.py @@ -1,4 +1,5 @@ -import os, threading, time, logging +import sys, os, threading, time, logging, select +from . import perf log = logging.getLogger("ashd.serve") seq = 1 @@ -11,44 +12,36 @@ def reqseq(): seq += 1 return s -class reqthread(threading.Thread): - def __init__(self, name=None): - if name is None: - name = "Request handler %i" % reqseq() - super().__init__(name=name) - - def handle(self): - raise Exception() - - def run(self): - try: - self.handle() - except: - log.error("exception occurred when handling request", exc_info=True) - class closed(IOError): def __init__(self): super().__init__("The client has closed the connection.") -class wsgithread(reqthread): - def __init__(self, **kwargs): - super().__init__(**kwargs) +class reqthread(threading.Thread): + def __init__(self, *, name=None, **kw): + if name is None: + name = "Request handler %i" % reqseq() + super().__init__(name=name, **kw) + +class wsgirequest(object): + def __init__(self, handler): self.status = None self.headers = [] self.respsent = False + self.handler = handler + self.buffer = bytearray() def handlewsgi(self): raise Exception() + def fileno(self): + raise Exception() def writehead(self, status, headers): raise Exception() - def writedata(self, data): + def flush(self): raise Exception() - - def write(self, data): - if not data: - return - self.flushreq() - self.writedata(data) + def close(self): + pass + def writedata(self, data): + self.buffer.extend(data) def flushreq(self): if not self.respsent: @@ -57,64 +50,81 @@ class wsgithread(reqthread): self.respsent = True self.writehead(self.status, self.headers) + def write(self, data): + if not data: + return + self.flushreq() + self.writedata(data) + self.handler.ckflush(self) + def startreq(self, status, headers, exc_info=None): if self.status: - if exc_info: # Nice calling convetion ^^ + if exc_info: try: if self.respsent: raise exc_info[1] finally: - exc_info = None # CPython GC bug? + exc_info = None else: raise Exception("Can only start responding once.") self.status = status self.headers = headers return self.write - - def handle(self): + +class handler(object): + def handle(self, request): + raise Exception() + def ckflush(self, req): + raise Exception() + def close(self): + pass + +class freethread(handler): + def __init__(self, **kw): + super().__init__(**kw) + self.current = set() + self.lk = threading.Lock() + + def handle(self, req): + reqthread(target=self.run, args=[req]).start() + + def ckflush(self, req): + while len(req.buffer) > 0: + rls, wls, els = select.select([], [req], [req]) + req.flush() + + def run(self, req): try: - respiter = self.handlewsgi() + th = threading.current_thread() + with self.lk: + self.current.add(th) try: - for data in respiter: - self.write(data) - if self.status: - self.flushreq() + env = req.mkenv() + with perf.request(env) as reqevent: + respiter = req.handlewsgi(env, req.startreq) + for data in respiter: + req.write(data) + if req.status: + reqevent.response([req.status, req.headers]) + req.flushreq() + self.ckflush(req) + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) finally: - if hasattr(respiter, "close"): - respiter.close() - except closed: - pass - -class calllimiter(object): - def __init__(self, limit): - self.limit = limit - self.lock = threading.Condition() - self.inflight = 0 - - def waited(self, time): - if time > 10: - raise RuntimeError("Waited too long") - - def __enter__(self): - with self.lock: - start = time.time() - while self.inflight >= self.limit: - self.lock.wait(10) - self.waited(time.time() - start) - self.inflight += 1 - return self - - def __exit__(self, *excinfo): - with self.lock: - self.inflight -= 1 - self.lock.notify() - return False - - def call(self, target): - with self: - return target() - -class abortlimiter(calllimiter): - def waited(self, time): - if time > 10: - os.abort() + with self.lk: + self.current.remove(th) + finally: + req.close() + + def close(self): + while True: + with self.lk: + if len(self.current) > 0: + th = next(iter(self.current)) + else: + th = None + th.join() + +names = {"free": freethread} -- 2.11.0