X-Git-Url: http://www.dolda2000.com/gitweb/?p=ashd.git;a=blobdiff_plain;f=python3%2Fashd%2Fserve.py;h=0927710ae7c7ac90c0411ffb45e6c48d8326d236;hp=0eddc0907cf61137e681d48ae9c37ce50aa56d94;hb=6e8b9b9d6d043ecbcaeb8ef807bd9648424aedd7;hpb=ab3275e99fbdc36e2f46eeaa8b38c65d71583ed5 diff --git a/python3/ashd/serve.py b/python3/ashd/serve.py index 0eddc09..0927710 100644 --- a/python3/ashd/serve.py +++ b/python3/ashd/serve.py @@ -75,8 +75,10 @@ class handler(object): def handle(self, request): raise Exception() def ckflush(self, req): + p = select.poll() + p.register(req, select.POLLOUT) while len(req.buffer) > 0: - rls, wls, els = select.select([], [req], [req]) + p.poll() req.flush() def close(self): pass @@ -204,8 +206,11 @@ class threadpool(handler): self.ccond = threading.Condition(self.clk) self.queue = collections.deque() self.waiting = set() + self.waitlimit = 5 + self.wlstart = 0.0 self.qlk = threading.Lock() - self.qcond = threading.Condition(self.qlk) + self.qfcond = threading.Condition(self.qlk) + self.qecond = threading.Condition(self.qlk) self.max = max self.qsz = qsz self.timeout = timeout @@ -222,23 +227,23 @@ class threadpool(handler): return ret def handle(self, req): - start = False + spawn = False with self.qlk: if self.timeout is not None: now = start = time.time() while len(self.queue) >= self.qsz: - self.qcond.wait(start + self.timeout - now) + self.qecond.wait(start + self.timeout - now) now = time.time() if now - start > self.timeout: os.abort() else: - while len(self.current) >= self.qsz: - self.qcond.wait() + while len(self.queue) >= self.qsz: + self.qecond.wait() self.queue.append(req) - self.qcond.notify() + self.qfcond.notify() if len(self.waiting) < 1: - start = True - if start: + spawn = True + if spawn: with self.clk: if len(self.current) < self.max: th = reqthread(target=self.run) @@ -275,13 +280,20 @@ class threadpool(handler): start = now = time.time() with self.qlk: while len(self.queue) < 1: + if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout: + return self.waiting.add(th) - self.qcond.wait(start + timeout - now) - self.waiting.remove(th) + try: + if len(self.waiting) == self.waitlimit: + self.wlstart = now + self.qfcond.wait(start + timeout - now) + finally: + self.waiting.remove(th) now = time.time() if now - start > timeout: return req = self.queue.popleft() + self.qecond.notify() try: self.handle1(req) finally: @@ -292,7 +304,7 @@ class threadpool(handler): def close(self): while True: - with self.lk: + with self.clk: if len(self.current) > 0: th = next(iter(self.current)) else: