python*: Use poll instead of select in ckflush.
[ashd.git] / python3 / ashd / serve.py
index 0eddc09..0927710 100644 (file)
@@ -75,8 +75,10 @@ class handler(object):
     def handle(self, request):
         raise Exception()
     def ckflush(self, req):
+        p = select.poll()
+        p.register(req, select.POLLOUT)
         while len(req.buffer) > 0:
-            rls, wls, els = select.select([], [req], [req])
+            p.poll()
             req.flush()
     def close(self):
         pass
@@ -204,8 +206,11 @@ class threadpool(handler):
         self.ccond = threading.Condition(self.clk)
         self.queue = collections.deque()
         self.waiting = set()
+        self.waitlimit = 5
+        self.wlstart = 0.0
         self.qlk = threading.Lock()
-        self.qcond = threading.Condition(self.qlk)
+        self.qfcond = threading.Condition(self.qlk)
+        self.qecond = threading.Condition(self.qlk)
         self.max = max
         self.qsz = qsz
         self.timeout = timeout
@@ -222,23 +227,23 @@ class threadpool(handler):
         return ret
 
     def handle(self, req):
-        start = False
+        spawn = False
         with self.qlk:
             if self.timeout is not None:
                 now = start = time.time()
                 while len(self.queue) >= self.qsz:
-                    self.qcond.wait(start + self.timeout - now)
+                    self.qecond.wait(start + self.timeout - now)
                     now = time.time()
                     if now - start > self.timeout:
                         os.abort()
             else:
-                while len(self.current) >= self.qsz:
-                    self.qcond.wait()
+                while len(self.queue) >= self.qsz:
+                    self.qecond.wait()
             self.queue.append(req)
-            self.qcond.notify()
+            self.qfcond.notify()
             if len(self.waiting) < 1:
-                start = True
-        if start:
+                spawn = True
+        if spawn:
             with self.clk:
                 if len(self.current) < self.max:
                     th = reqthread(target=self.run)
@@ -275,13 +280,20 @@ class threadpool(handler):
                 start = now = time.time()
                 with self.qlk:
                     while len(self.queue) < 1:
+                        if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout:
+                            return
                         self.waiting.add(th)
-                        self.qcond.wait(start + timeout - now)
-                        self.waiting.remove(th)
+                        try:
+                            if len(self.waiting) == self.waitlimit:
+                                self.wlstart = now
+                            self.qfcond.wait(start + timeout - now)
+                        finally:
+                            self.waiting.remove(th)
                         now = time.time()
                         if now - start > timeout:
                             return
                     req = self.queue.popleft()
+                    self.qecond.notify()
                 try:
                     self.handle1(req)
                 finally:
@@ -292,7 +304,7 @@ class threadpool(handler):
 
     def close(self):
         while True:
-            with self.lk:
+            with self.clk:
                 if len(self.current) > 0:
                     th = next(iter(self.current))
                 else: