X-Git-Url: http://www.dolda2000.com/gitweb/?p=ashd.git;a=blobdiff_plain;f=python3%2Fashd%2Fserve.py;h=0927710ae7c7ac90c0411ffb45e6c48d8326d236;hp=db68a5f6c4ab2cf509a874cacde0ec7343ef9078;hb=6e8b9b9d6d043ecbcaeb8ef807bd9648424aedd7;hpb=76ff6c4dcf0bed028475ef646f4d637d8e91d1a7 diff --git a/python3/ashd/serve.py b/python3/ashd/serve.py index db68a5f..0927710 100644 --- a/python3/ashd/serve.py +++ b/python3/ashd/serve.py @@ -1,4 +1,4 @@ -import sys, os, threading, time, logging, select, queue +import sys, os, threading, time, logging, select, queue, collections from . import perf log = logging.getLogger("ashd.serve") @@ -75,8 +75,10 @@ class handler(object): def handle(self, request): raise Exception() def ckflush(self, req): + p = select.poll() + p.register(req, select.POLLOUT) while len(req.buffer) > 0: - rls, wls, els = select.select([], [req], [req]) + p.poll() req.flush() def close(self): pass @@ -194,6 +196,121 @@ class freethread(handler): return th.join() +class threadpool(handler): + cname = "pool" + + def __init__(self, *, max=25, qsz=100, timeout=None, **kw): + super().__init__(**kw) + self.current = set() + self.clk = threading.Lock() + self.ccond = threading.Condition(self.clk) + self.queue = collections.deque() + self.waiting = set() + self.waitlimit = 5 + self.wlstart = 0.0 + self.qlk = threading.Lock() + self.qfcond = threading.Condition(self.qlk) + self.qecond = threading.Condition(self.qlk) + self.max = max + self.qsz = qsz + self.timeout = timeout + + @classmethod + def parseargs(cls, *, max=None, queue=None, abort=None, **args): + ret = super().parseargs(**args) + if max: + ret["max"] = int(max) + if queue: + ret["qsz"] = int(queue) + if abort: + ret["timeout"] = int(abort) + return ret + + def handle(self, req): + spawn = False + with self.qlk: + if self.timeout is not None: + now = start = time.time() + while len(self.queue) >= self.qsz: + self.qecond.wait(start + self.timeout - now) + now = time.time() + if now - start > self.timeout: + os.abort() + else: + while len(self.queue) >= self.qsz: + self.qecond.wait() + self.queue.append(req) + self.qfcond.notify() + if len(self.waiting) < 1: + spawn = True + if spawn: + with self.clk: + if len(self.current) < self.max: + th = reqthread(target=self.run) + th.registered = False + th.start() + while not th.registered: + self.ccond.wait() + + def handle1(self, req): + try: + env = req.mkenv() + with perf.request(env) as reqevent: + respiter = req.handlewsgi(env, req.startreq) + for data in respiter: + req.write(data) + if req.status: + reqevent.response([req.status, req.headers]) + req.flushreq() + self.ckflush(req) + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) + + def run(self): + timeout = 10.0 + th = threading.current_thread() + with self.clk: + self.current.add(th) + th.registered = True + self.ccond.notify_all() + try: + while True: + start = now = time.time() + with self.qlk: + while len(self.queue) < 1: + if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout: + return + self.waiting.add(th) + try: + if len(self.waiting) == self.waitlimit: + self.wlstart = now + self.qfcond.wait(start + timeout - now) + finally: + self.waiting.remove(th) + now = time.time() + if now - start > timeout: + return + req = self.queue.popleft() + self.qecond.notify() + try: + self.handle1(req) + finally: + req.close() + finally: + with self.clk: + self.current.remove(th) + + def close(self): + while True: + with self.clk: + if len(self.current) > 0: + th = next(iter(self.current)) + else: + return + th.join() + class resplex(handler): cname = "rplex"