python: Added an early version of a response-multiplexing handler.
authorFredrik Tolf <fredrik@dolda2000.com>
Sun, 5 Jan 2014 20:26:04 +0000 (21:26 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Sun, 5 Jan 2014 21:11:03 +0000 (22:11 +0100)
python3/ashd/serve.py

index ecc8d46..cddf815 100644 (file)
@@ -1,4 +1,4 @@
-import sys, os, threading, time, logging, select
+import sys, os, threading, time, logging, select, queue
 from . import perf
 
 log = logging.getLogger("ashd.serve")
@@ -283,9 +283,139 @@ class threadpool(handler):
                 self.rcond.notify_all()
                 self.pcond.wait(1)
 
+class resplex(handler):
+    def __init__(self, **kw):
+        super().__init__(**kw)
+        self.current = set()
+        self.lk = threading.Lock()
+        self.cqueue = queue.Queue(5)
+        self.cnpipe = os.pipe()
+        self.rthread = reqthread(name="Response thread", target=self.handle2)
+        self.rthread.start()
+
+    def ckflush(self, req):
+        raise Exception("resplex handler does not support the write() function")
+
+    def handle(self, req):
+        reqthread(target=self.handle1, args=[req]).start()
+
+    def handle1(self, req):
+        try:
+            th = threading.current_thread()
+            with self.lk:
+                self.current.add(th)
+            try:
+                env = req.mkenv()
+                respobj = req.handlewsgi(env, req.startreq)
+                respiter = iter(respobj)
+                if not req.status:
+                    log.error("request handler returned without calling start_request")
+                    if hasattr(respiter, "close"):
+                        respiter.close()
+                    return
+                else:
+                    self.cqueue.put((req, respiter))
+                    os.write(self.cnpipe[1], b" ")
+                    req = None
+            finally:
+                self.current.remove(th)
+        except closed:
+            pass
+        except:
+            log.error("exception occurred when handling request", exc_info=True)
+        finally:
+            if req is not None:
+                req.close()
+
+    def handle2(self):
+        try:
+            rp = self.cnpipe[0]
+            current = {}
+
+            def closereq(req):
+                respiter = current[req]
+                try:
+                    if respiter is not None and hasattr(respiter, "close"):
+                        respiter.close()
+                except:
+                    log.error("exception occurred when closing iterator", exc_info=True)
+                try:
+                    req.close()
+                except:
+                    log.error("exception occurred when closing request", exc_info=True)
+                del current[req]
+            def ckiter(req):
+                respiter = current[req]
+                if respiter is not None:
+                    rem = False
+                    try:
+                        data = next(respiter)
+                    except StopIteration:
+                        rem = True
+                        req.flushreq()
+                    except:
+                        rem = True
+                        log.error("exception occurred when iterating response", exc_info=True)
+                    if not rem:
+                        if data:
+                            req.flushreq()
+                            req.writedata(data)
+                    else:
+                        current[req] = None
+                        try:
+                            if hasattr(respiter, "close"):
+                                respiter.close()
+                        except:
+                            log.error("exception occurred when closing iterator", exc_info=True)
+                        respiter = None
+                if respiter is None and not req.buffer:
+                    closereq(req)
+
+            while True:
+                bufl = list(req for req in current.keys() if req.buffer)
+                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
+                if rp in rls:
+                    ret = os.read(rp, 1024)
+                    if not ret:
+                        os.close(rp)
+                        return
+                    try:
+                        while True:
+                            req, respiter = self.cqueue.get(False)
+                            current[req] = respiter
+                            ckiter(req)
+                    except queue.Empty:
+                        pass
+                for req in wls:
+                    try:
+                        req.flush()
+                    except closed:
+                        closereq(req)
+                    except:
+                        log.error("exception occurred when writing response", exc_info=True)
+                        closereq(req)
+                    else:
+                        if len(req.buffer) < 65536:
+                            ckiter(req)
+        except:
+            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
+            sys.exit(1)
+
+    def close(self):
+        while True:
+            with self.lk:
+                if len(self.current) > 0:
+                    th = next(iter(self.current))
+                else:
+                    break
+            th.join()
+        os.close(self.cnpipe[1])
+        self.rthread.join()
+
 names = {"single": single,
          "free": freethread,
-         "pool": threadpool}
+         "pool": threadpool,
+         "rplex": resplex}
 
 def parsehspec(spec):
     if ":" not in spec: