python: Removed threadpool handler again.
[ashd.git] / python3 / ashd / serve.py
index 0a45259..15e2903 100644 (file)
@@ -178,132 +178,44 @@ class freethread(handler):
                     return
             th.join()
 
-class threadpool(handler):
-    def __init__(self, *, min=0, max=20, live=300, **kw):
+class resplex(handler):
+    def __init__(self, *, max=None, **kw):
         super().__init__(**kw)
         self.current = set()
-        self.free = set()
-        self.lk = threading.RLock()
-        self.pcond = threading.Condition(self.lk)
-        self.rcond = threading.Condition(self.lk)
-        self.wreq = None
-        self.min = min
+        self.lk = threading.Lock()
+        self.tcond = threading.Condition(self.lk)
         self.max = max
-        self.live = live
-        for i in range(self.min):
-            self.newthread()
+        self.cqueue = queue.Queue(5)
+        self.cnpipe = os.pipe()
+        self.rthread = reqthread(name="Response thread", target=self.handle2)
+        self.rthread.start()
 
     @classmethod
-    def parseargs(cls, *, min=None, max=None, live=None, **args):
+    def parseargs(cls, *, max=None, **args):
         ret = super().parseargs(**args)
-        if min:
-            ret["min"] = int(min)
         if max:
             ret["max"] = int(max)
-        if live:
-            ret["live"] = int(live)
         return ret
 
-    def newthread(self):
-        with self.lk:
-            th = reqthread(target=self.loop)
-            th.start()
-            while not th in self.current:
-                self.pcond.wait()
-
-    def _handle(self, req):
-        try:
-            env = req.mkenv()
-            with perf.request(env) as reqevent:
-                respiter = req.handlewsgi(env, req.startreq)
-                for data in respiter:
-                    req.write(data)
-                if req.status:
-                    reqevent.response([req.status, req.headers])
-                    req.flushreq()
-                self.ckflush(req)
-        except closed:
-            pass
-        except:
-            log.error("exception occurred when handling request", exc_info=True)
-        finally:
-            req.close()
-
-    def loop(self):
-        th = threading.current_thread()
-        with self.lk:
-            self.current.add(th)
-        try:
-            while True:
-                with self.lk:
-                    self.free.add(th)
-                    try:
-                        self.pcond.notify_all()
-                        now = start = time.time()
-                        while self.wreq is None:
-                            self.rcond.wait(start + self.live - now)
-                            now = time.time()
-                            if now - start > self.live:
-                                if len(self.current) > self.min:
-                                    self.current.remove(th)
-                                    return
-                                else:
-                                    start = now
-                        req, self.wreq = self.wreq, None
-                        self.pcond.notify_all()
-                    finally:
-                        self.free.remove(th)
-                self._handle(req)
-                req = None
-        finally:
-            with self.lk:
-                try:
-                    self.current.remove(th)
-                except KeyError:
-                    pass
-                self.pcond.notify_all()
-
-    def handle(self, req):
-        while True:
-            with self.lk:
-                if len(self.free) < 1 and len(self.current) < self.max:
-                    self.newthread()
-                while self.wreq is not None:
-                    self.pcond.wait()
-                if self.wreq is None:
-                    self.wreq = req
-                    self.rcond.notify(1)
-                    return
-
-    def close(self):
-        self.live = 0
-        self.min = 0
-        with self.lk:
-            while len(self.current) > 0:
-                self.rcond.notify_all()
-                self.pcond.wait(1)
-
-class resplex(handler):
-    def __init__(self, **kw):
-        super().__init__(**kw)
-        self.current = set()
-        self.lk = threading.Lock()
-        self.cqueue = queue.Queue(5)
-        self.cnpipe = os.pipe()
-        self.rthread = reqthread(name="Response thread", target=self.handle2)
-        self.rthread.start()
-
     def ckflush(self, req):
         raise Exception("resplex handler does not support the write() function")
 
     def handle(self, req):
-        reqthread(target=self.handle1, args=[req]).start()
+        with self.lk:
+            if self.max is not None:
+                while len(self.current) >= self.max:
+                    self.tcond.wait()
+            th = reqthread(target=self.handle1, args=[req])
+            th.start()
+            while th.is_alive() and th not in self.current:
+                self.tcond.wait()
 
     def handle1(self, req):
         try:
             th = threading.current_thread()
             with self.lk:
                 self.current.add(th)
+                self.tcond.notify_all()
             try:
                 env = req.mkenv()
                 respobj = req.handlewsgi(env, req.startreq)
@@ -318,7 +230,9 @@ class resplex(handler):
                     os.write(self.cnpipe[1], b" ")
                     req = None
             finally:
-                self.current.remove(th)
+                with self.lk:
+                    self.current.remove(th)
+                    self.tcond.notify_all()
         except closed:
             pass
         except:
@@ -414,7 +328,6 @@ class resplex(handler):
 
 names = {"single": single,
          "free": freethread,
-         "pool": threadpool,
          "rplex": resplex}
 
 def parsehspec(spec):