X-Git-Url: http://www.dolda2000.com/gitweb/?p=ashd.git;a=blobdiff_plain;f=python3%2Fashd%2Fserve.py;h=30c835ee9ef75251e42ec53c789d52009287c99a;hp=fe839a2ba442904bc362e92fa16208c97c515df4;hb=46adc298ac3f6be267d1c7db40c538dbd8d24318;hpb=689d6a77a4c080389e1f7b1c0609d3b385354b8c diff --git a/python3/ashd/serve.py b/python3/ashd/serve.py index fe839a2..30c835e 100644 --- a/python3/ashd/serve.py +++ b/python3/ashd/serve.py @@ -1,4 +1,5 @@ -import os, threading, time, logging +import sys, os, threading, time, logging, select +from . import perf log = logging.getLogger("ashd.serve") seq = 1 @@ -11,44 +12,36 @@ def reqseq(): seq += 1 return s -class reqthread(threading.Thread): - def __init__(self, name=None): - if name is None: - name = "Request handler %i" % reqseq() - super().__init__(name=name) - - def handle(self): - raise Exception() - - def run(self): - try: - self.handle() - except: - log.error("exception occurred when handling request", exc_info=True) - class closed(IOError): def __init__(self): super().__init__("The client has closed the connection.") -class wsgithread(reqthread): - def __init__(self, **kwargs): - super().__init__(**kwargs) +class reqthread(threading.Thread): + def __init__(self, *, name=None, **kw): + if name is None: + name = "Request handler %i" % reqseq() + super().__init__(name=name, **kw) + +class wsgirequest(object): + def __init__(self, handler): self.status = None self.headers = [] self.respsent = False + self.handler = handler + self.buffer = bytearray() def handlewsgi(self): raise Exception() + def fileno(self): + raise Exception() def writehead(self, status, headers): raise Exception() - def writedata(self, data): + def flush(self): raise Exception() - - def write(self, data): - if not data: - return - self.flushreq() - self.writedata(data) + def close(self): + pass + def writedata(self, data): + self.buffer.extend(data) def flushreq(self): if not self.respsent: @@ -57,64 +50,81 @@ class wsgithread(reqthread): self.respsent = True self.writehead(self.status, self.headers) + def write(self, data): + if not data: + return + self.flushreq() + self.writedata(data) + self.handler.ckflush(self) + def startreq(self, status, headers, exc_info=None): if self.status: - if exc_info: # Nice calling convetion ^^ + if exc_info: try: if self.respsent: raise exc_info[1] finally: - exc_info = None # CPython GC bug? + exc_info = None else: raise Exception("Can only start responding once.") self.status = status self.headers = headers return self.write - - def handle(self): + +class handler(object): + def handle(self, request): + raise Exception() + def ckflush(self, req): + raise Exception() + def close(self): + pass + +class freethread(handler): + def __init__(self, **kw): + super().__init__(**kw) + self.current = set() + self.lk = threading.Lock() + + def handle(self, req): + reqthread(target=self.run, args=[req]).start() + + def ckflush(self, req): + while len(req.buffer) > 0: + rls, wls, els = select.select([], [req], [req]) + req.flush() + + def run(self, req): try: - respiter = self.handlewsgi() + th = threading.current_thread() + with self.lk: + self.current.add(th) try: - for data in respiter: - self.write(data) - if self.status: - self.flushreq() + env = req.mkenv() + with perf.request(env) as reqevent: + respiter = req.handlewsgi(env, req.startreq) + for data in respiter: + req.write(data) + if req.status: + reqevent.response([req.status, req.headers]) + req.flushreq() + self.ckflush(req) + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) finally: - if hasattr(respiter, "close"): - respiter.close() - except closed: - pass - -class calllimiter(object): - def __init__(self, limit): - self.limit = limit - self.lock = threading.Condition() - self.inflight = 0 - - def waited(self, time): - if time > 10: - raise RuntimeError("Waited too long") - - def __enter__(self): - with self.lock: - start = time.time() - while self.inflight >= self.limit: - self.lock.wait(10) - self.waited(time.time() - start) - self.inflight += 1 - return self - - def __exit__(self, *excinfo): - with self.lock: - self.inflight -= 1 - self.lock.notify() - return False - - def call(self, target): - with self: - return target() - -class abortlimiter(calllimiter): - def waited(self, time): - if time > 10: - os.abort() + with self.lk: + self.current.remove(th) + finally: + req.close() + + def close(self): + while True: + with self.lk: + if len(self.current) > 0: + th = next(iter(self.current)) + else: + th = None + th.join() + +names = {"free": freethread}