python: Added blocking thread-pool handler.
authorFredrik Tolf <fredrik@dolda2000.com>
Sun, 5 Jan 2014 09:05:06 +0000 (10:05 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Sun, 5 Jan 2014 09:05:06 +0000 (10:05 +0100)
python3/ashd/serve.py

index 30c835e..d78cec9 100644 (file)
@@ -127,4 +127,104 @@ class freethread(handler):
                     th = None
             th.join()
 
-names = {"free": freethread}
+class threadpool(handler):
+    def __init__(self, *, min=0, max=20, live=10, **kw):
+        super().__init__(**kw)
+        self.current = set()
+        self.free = set()
+        self.lk = threading.RLock()
+        self.pcond = threading.Condition(self.lk)
+        self.rcond = threading.Condition(self.lk)
+        self.wreq = None
+        self.min = min
+        self.max = max
+        self.live = live
+        for i in range(self.min):
+            self.newthread()
+
+    def newthread(self):
+        with self.lk:
+            th = reqthread(target=self.loop)
+            th.start()
+            while not th in self.current:
+                self.pcond.wait()
+
+    def ckflush(self, req):
+        while len(req.buffer) > 0:
+            rls, wls, els = select.select([], [req], [req])
+            req.flush()
+
+    def _handle(self, req):
+        try:
+            env = req.mkenv()
+            with perf.request(env) as reqevent:
+                respiter = req.handlewsgi(env, req.startreq)
+                for data in respiter:
+                    req.write(data)
+                if req.status:
+                    reqevent.response([req.status, req.headers])
+                    req.flushreq()
+                self.ckflush(req)
+        except closed:
+            pass
+        except:
+            log.error("exception occurred when handling request", exc_info=True)
+        finally:
+            req.close()
+
+    def loop(self):
+        th = threading.current_thread()
+        with self.lk:
+            self.current.add(th)
+        try:
+            while True:
+                with self.lk:
+                    self.free.add(th)
+                    try:
+                        self.pcond.notify_all()
+                        now = start = time.time()
+                        while self.wreq is None:
+                            self.rcond.wait(start + self.live - now)
+                            now = time.time()
+                            if now - start > self.live:
+                                if len(self.current) > self.min:
+                                    self.current.remove(th)
+                                    return
+                                else:
+                                    start = now
+                        req, self.wreq = self.wreq, None
+                        self.pcond.notify_all()
+                    finally:
+                        self.free.remove(th)
+                self._handle(req)
+                req = None
+        finally:
+            with self.lk:
+                try:
+                    self.current.remove(th)
+                except KeyError:
+                    pass
+                self.pcond.notify_all()
+
+    def handle(self, req):
+        while True:
+            with self.lk:
+                if len(self.free) < 1 and len(self.current) < self.max:
+                    self.newthread()
+                while self.wreq is not None:
+                    self.pcond.wait()
+                if self.wreq is None:
+                    self.wreq = req
+                    self.rcond.notify(1)
+                    return
+
+    def close(self):
+        self.live = 0
+        self.min = 0
+        with self.lk:
+            while len(self.current) > 0:
+                self.rcond.notify_all()
+                self.pcond.wait(1)
+
+names = {"free": freethread,
+         "pool": threadpool}