From d570c3a5c601a7484761f547e8c655eefca70230 Mon Sep 17 00:00:00 2001 From: Fredrik Tolf Date: Sun, 5 Jan 2014 10:05:06 +0100 Subject: [PATCH] python: Added blocking thread-pool handler. --- python3/ashd/serve.py | 102 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/python3/ashd/serve.py b/python3/ashd/serve.py index 30c835e..d78cec9 100644 --- a/python3/ashd/serve.py +++ b/python3/ashd/serve.py @@ -127,4 +127,104 @@ class freethread(handler): th = None th.join() -names = {"free": freethread} +class threadpool(handler): + def __init__(self, *, min=0, max=20, live=10, **kw): + super().__init__(**kw) + self.current = set() + self.free = set() + self.lk = threading.RLock() + self.pcond = threading.Condition(self.lk) + self.rcond = threading.Condition(self.lk) + self.wreq = None + self.min = min + self.max = max + self.live = live + for i in range(self.min): + self.newthread() + + def newthread(self): + with self.lk: + th = reqthread(target=self.loop) + th.start() + while not th in self.current: + self.pcond.wait() + + def ckflush(self, req): + while len(req.buffer) > 0: + rls, wls, els = select.select([], [req], [req]) + req.flush() + + def _handle(self, req): + try: + env = req.mkenv() + with perf.request(env) as reqevent: + respiter = req.handlewsgi(env, req.startreq) + for data in respiter: + req.write(data) + if req.status: + reqevent.response([req.status, req.headers]) + req.flushreq() + self.ckflush(req) + except closed: + pass + except: + log.error("exception occurred when handling request", exc_info=True) + finally: + req.close() + + def loop(self): + th = threading.current_thread() + with self.lk: + self.current.add(th) + try: + while True: + with self.lk: + self.free.add(th) + try: + self.pcond.notify_all() + now = start = time.time() + while self.wreq is None: + self.rcond.wait(start + self.live - now) + now = time.time() + if now - start > self.live: + if len(self.current) > self.min: + self.current.remove(th) + return + else: + start = now + req, self.wreq = self.wreq, None + self.pcond.notify_all() + finally: + self.free.remove(th) + self._handle(req) + req = None + finally: + with self.lk: + try: + self.current.remove(th) + except KeyError: + pass + self.pcond.notify_all() + + def handle(self, req): + while True: + with self.lk: + if len(self.free) < 1 and len(self.current) < self.max: + self.newthread() + while self.wreq is not None: + self.pcond.wait() + if self.wreq is None: + self.wreq = req + self.rcond.notify(1) + return + + def close(self): + self.live = 0 + self.min = 0 + with self.lk: + while len(self.current) > 0: + self.rcond.notify_all() + self.pcond.wait(1) + +names = {"free": freethread, + "pool": threadpool} -- 2.11.0