Merge branches 'block' and 'py-reserve'
authorFredrik Tolf <fredrik@dolda2000.com>
Sun, 3 Feb 2013 02:29:45 +0000 (03:29 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Sun, 3 Feb 2013 02:29:45 +0000 (03:29 +0100)
14 files changed:
lib/cf.c
lib/proc.c
lib/req.c
python/ashd-wsgi
python/ashd/scgi.py
python/ashd/serve.py [new file with mode: 0644]
python/doc/scgi-wsgi.doc
python/scgi-wsgi
python3/ashd-wsgi3
python3/ashd/scgi.py
python3/ashd/serve.py [new file with mode: 0644]
python3/doc/scgi-wsgi3.doc
python3/scgi-wsgi3
src/userplex.c

index 38d9808..ba63999 100644 (file)
--- a/lib/cf.c
+++ b/lib/cf.c
@@ -23,6 +23,7 @@
 #include <ctype.h>
 #include <glob.h>
 #include <libgen.h>
+#include <sys/socket.h>
 #include <errno.h>
 
 #ifdef HAVE_CONFIG_H
@@ -305,21 +306,25 @@ static struct chandler stdhandler = {
 static int stdhandle(struct child *ch, struct hthead *req, int fd, void (*chinit)(void *), void *idata)
 {
     struct stdchild *i = ch->pdata;
+    int serr;
     
     if(i->type == CH_SOCKET) {
        if(i->fd < 0)
            i->fd = stdmkchild(i->argv, chinit, idata);
-       if(sendreq(i->fd, req, fd)) {
-           if((errno == EPIPE) || (errno == ECONNRESET)) {
+       if(sendreq2(i->fd, req, fd, MSG_NOSIGNAL | MSG_DONTWAIT)) {
+           serr = errno;
+           if((serr == EPIPE) || (serr == ECONNRESET)) {
                /* Assume that the child has crashed and restart it. */
                close(i->fd);
                i->fd = stdmkchild(i->argv, chinit, idata);
-               if(!sendreq(i->fd, req, fd))
+               if(!sendreq2(i->fd, req, fd, MSG_NOSIGNAL | MSG_DONTWAIT))
                    return(0);
            }
-           flog(LOG_ERR, "could not pass on request to child %s: %s", ch->name, strerror(errno));
-           close(i->fd);
-           i->fd = -1;
+           flog(LOG_ERR, "could not pass on request to child %s: %s", ch->name, strerror(serr));
+           if(serr != EAGAIN) {
+               close(i->fd);
+               i->fd = -1;
+           }
            return(-1);
        }
     } else if(i->type == CH_FORK) {
index a1dc336..ac777d4 100644 (file)
@@ -83,7 +83,7 @@ int sendfd2(int sock, int fd, char *data, size_t datalen, int flags)
 
 int sendfd(int sock, int fd, char *data, size_t datalen)
 {
-    return(sendfd2(sock, fd, data, datalen, MSG_NOSIGNAL | MSG_DONTWAIT));
+    return(sendfd2(sock, fd, data, datalen, MSG_NOSIGNAL));
 }
 
 int recvfd(int sock, char **data, size_t *datalen)
index 8ff26e0..ed5aeba 100644 (file)
--- a/lib/req.c
+++ b/lib/req.c
@@ -315,7 +315,7 @@ int sendreq2(int sock, struct hthead *req, int fd, int flags)
 
 int sendreq(int sock, struct hthead *req, int fd)
 {
-    return(sendreq2(sock, req, fd, MSG_NOSIGNAL | MSG_DONTWAIT));
+    return(sendreq2(sock, req, fd, MSG_NOSIGNAL));
 }
 
 int recvreq(int sock, struct hthead **reqp)
index 2e4a2fe..01c158f 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 
 import sys, os, getopt, threading, logging, time
-import ashd.proto, ashd.util, ashd.perf
+import ashd.proto, ashd.util, ashd.perf, ashd.serve
 try:
     import pdm.srv
 except:
@@ -52,10 +52,6 @@ else:
         sys.exit(1)
     handler = handlermod.application
 
-class closed(IOError):
-    def __init__(self):
-        super(closed, self).__init__("The client has closed the connection.")
-
 cwd = os.getcwd()
 def absolutify(path):
     if path[0] != '/':
@@ -95,7 +91,7 @@ def unquoteurl(url):
             buf += c
     return buf
 
-def dowsgi(req):
+def mkenv(req):
     env = {}
     env["wsgi.version"] = 1, 0
     for key, val in req.headers:
@@ -147,103 +143,54 @@ def dowsgi(req):
     env["wsgi.multithread"] = True
     env["wsgi.multiprocess"] = False
     env["wsgi.run_once"] = False
+    return env
 
-    resp = []
-    respsent = []
+if reqlimit != 0:
+    guard = ashd.serve.abortlimiter(reqlimit).call
+else:
+    guard = lambda fun: fun()
 
-    def flushreq():
-        if not respsent:
-            if not resp:
-                raise Exception, "Trying to write data before starting response."
-            status, headers = resp
-            respsent[:] = [True]
-            try:
-                req.sk.write("HTTP/1.1 %s\n" % status)
-                for nm, val in headers:
-                    req.sk.write("%s: %s\n" % (nm, val))
-                req.sk.write("\n")
-            except IOError:
-                raise closed()
+class reqthread(ashd.serve.wsgithread):
+    def __init__(self, req):
+        super(reqthread, self).__init__()
+        self.req = req.dup()
+    
+    def handlewsgi(self):
+        return handler(self.env, self.startreq)
 
-    def write(data):
-        if not data:
-            return
-        flushreq()
+    def writehead(self, status, headers):
         try:
-            req.sk.write(data)
-            req.sk.flush()
+            self.req.sk.write("HTTP/1.1 %s\n" % status)
+            for nm, val in headers:
+                self.req.sk.write("%s: %s\n" % (nm, val))
+            self.req.sk.write("\n")
         except IOError:
-            raise closed()
+            raise ashd.serve.closed()
 
-    def startreq(status, headers, exc_info = None):
-        if resp:
-            if exc_info:                # Interesting, this...
-                try:
-                    if respsent:
-                        raise exc_info[0], exc_info[1], exc_info[2]
-                finally:
-                    exc_info = None     # CPython GC bug?
-            else:
-                raise Exception, "Can only start responding once."
-        resp[:] = status, headers
-        return write
-
-    reqevent = ashd.perf.request(env)
-    exc = (None, None, None)
-    try:
+    def writedata(self, data):
         try:
-            respiter = handler(env, startreq)
-            try:
-                for data in respiter:
-                    write(data)
-                if resp:
-                    flushreq()
-            finally:
-                if hasattr(respiter, "close"):
-                    respiter.close()
-        except closed:
-            pass
-        if resp:
-            reqevent.response(resp)
-    except:
-        exc = sys.exc_info()
-        raise
-    finally:
-        reqevent.__exit__(*exc)
+            self.req.sk.write(data)
+            self.req.sk.flush()
+        except IOError:
+            raise ashd.serve.closed()
 
-flightlock = threading.Condition()
-inflight = 0
+    def handle(self):
+        self.env = mkenv(self.req)
+        reqevent = ashd.perf.request(self.env)
+        exc = (None, None, None)
+        try:
+            super(reqthread, self).handle()
+            if self.status:
+                reqevent.response([self.status, self.headers])
+        except:
+            exc = sys.exc_info()
+            raise
+        finally:
+            reqevent.__exit__(*exc)
 
-class reqthread(threading.Thread):
-    def __init__(self, req):
-        super(reqthread, self).__init__(name = "Request handler")
-        self.req = req.dup()
-    
     def run(self):
-        global inflight
         try:
-            flightlock.acquire()
-            try:
-                if reqlimit != 0:
-                    start = time.time()
-                    while inflight >= reqlimit:
-                        flightlock.wait(10)
-                        if time.time() - start > 10:
-                            os.abort()
-                inflight += 1
-            finally:
-                flightlock.release()
-            try:
-                dowsgi(self.req)
-            finally:
-                flightlock.acquire()
-                try:
-                    inflight -= 1
-                    flightlock.notify()
-                finally:
-                    flightlock.release()
-        except:
-            log.error("exception occurred in handler thread", exc_info=True)
+            guard(super(reqthread, self).run)
         finally:
             self.req.close()
     
index f7ba3a8..1f0c5ab 100644 (file)
@@ -1,13 +1,6 @@
-import sys
-import threading
-
 class protoerr(Exception):
     pass
 
-class closed(IOError):
-    def __init__(self):
-        super(closed, self).__init__("The client has closed the connection.")
-
 def readns(sk):
     hln = 0
     while True:
@@ -33,100 +26,3 @@ def readhead(sk):
         ret[parts[i]] = parts[i + 1]
         i += 2
     return ret
-
-class reqthread(threading.Thread):
-    def __init__(self, sk, handler):
-        super(reqthread, self).__init__(name = "SCGI request handler")
-        self.bsk = sk.dup()
-        self.sk = self.bsk.makefile("r+")
-        self.handler = handler
-
-    def run(self):
-        try:
-            head = readhead(self.sk)
-            self.handler(head, self.sk)
-        finally:
-            self.sk.close()
-            self.bsk.close()
-
-def handlescgi(sk, handler):
-    t = reqthread(sk, handler)
-    t.start()
-
-def servescgi(socket, handler):
-    while True:
-        nsk, addr = socket.accept()
-        try:
-            handlescgi(nsk, handler)
-        finally:
-            nsk.close()
-
-def wrapwsgi(handler):
-    def handle(head, sk):
-        env = dict(head)
-        env["wsgi.version"] = 1, 0
-        if "HTTP_X_ASH_PROTOCOL" in env:
-            env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"]
-        elif "HTTPS" in env:
-            env["wsgi.url_scheme"] = "https"
-        else:
-            env["wsgi.url_scheme"] = "http"
-        env["wsgi.input"] = sk
-        env["wsgi.errors"] = sys.stderr
-        env["wsgi.multithread"] = True
-        env["wsgi.multiprocess"] = False
-        env["wsgi.run_once"] = False
-
-        resp = []
-        respsent = []
-
-        def flushreq():
-            if not respsent:
-                if not resp:
-                    raise Exception, "Trying to write data before starting response."
-                status, headers = resp
-                respsent[:] = [True]
-                try:
-                    sk.write("Status: %s\n" % status)
-                    for nm, val in headers:
-                        sk.write("%s: %s\n" % (nm, val))
-                    sk.write("\n")
-                except IOError:
-                    raise closed()
-
-        def write(data):
-            if not data:
-                return
-            flushreq()
-            try:
-                sk.write(data)
-                sk.flush()
-            except IOError:
-                raise closed()
-
-        def startreq(status, headers, exc_info = None):
-            if resp:
-                if exc_info:                # Interesting, this...
-                    try:
-                        if respsent:
-                            raise exc_info[0], exc_info[1], exc_info[2]
-                    finally:
-                        exc_info = None     # CPython GC bug?
-                else:
-                    raise Exception, "Can only start responding once."
-            resp[:] = status, headers
-            return write
-
-        respiter = handler(env, startreq)
-        try:
-            try:
-                for data in respiter:
-                    write(data)
-                if resp:
-                    flushreq()
-            except closed:
-                pass
-        finally:
-            if hasattr(respiter, "close"):
-                respiter.close()
-    return handle
diff --git a/python/ashd/serve.py b/python/ashd/serve.py
new file mode 100644 (file)
index 0000000..14170d7
--- /dev/null
@@ -0,0 +1,132 @@
+import os, threading, time, logging
+
+log = logging.getLogger("ashd.serve")
+seq = 1
+seqlk = threading.Lock()
+
+def reqseq():
+    global seq
+    seqlk.acquire()
+    try:
+        s = seq
+        seq += 1
+        return s
+    finally:
+        seqlk.release()
+
+class reqthread(threading.Thread):
+    def __init__(self, name=None):
+        if name is None:
+            name = "Request handler %i" % reqseq()
+        super(reqthread, self).__init__(name=name)
+
+    def handle(self):
+        raise Exception()
+
+    def run(self):
+        try:
+            self.handle()
+        except:
+            log.error("exception occurred when handling request", exc_info=True)
+
+class closed(IOError):
+    def __init__(self):
+        super(closed, self).__init__("The client has closed the connection.")
+
+class wsgithread(reqthread):
+    def __init__(self, **kwargs):
+        super(wsgithread, self).__init__(**kwargs)
+        self.status = None
+        self.headers = []
+        self.respsent = False
+
+    def handlewsgi(self):
+        raise Exception()
+    def writehead(self, status, headers):
+        raise Exception()
+    def writedata(self, data):
+        raise Exception()
+
+    def write(self, data):
+        if not data:
+            return
+        self.flushreq()
+        self.writedata(data)
+
+    def flushreq(self):
+        if not self.respsent:
+            if not self.status:
+                raise Exception("Cannot send response body before starting response.")
+            self.respsent = True
+            self.writehead(self.status, self.headers)
+
+    def startreq(self, status, headers, exc_info=None):
+        if self.status:
+            if exc_info:                # Nice calling convetion ^^
+                try:
+                    if self.respsent:
+                        raise exc_info[0], exc_info[1], exc_info[2]
+                finally:
+                    exc_info = None     # CPython GC bug?
+            else:
+                raise Exception("Can only start responding once.")
+        self.status = status
+        self.headers = headers
+        return self.write
+    def handle(self):
+        try:
+            respiter = self.handlewsgi()
+            try:
+                for data in respiter:
+                    self.write(data)
+                if self.status:
+                    self.flushreq()
+            finally:
+                if hasattr(respiter, "close"):
+                    respiter.close()
+        except closed:
+            pass
+
+class calllimiter(object):
+    def __init__(self, limit):
+        self.limit = limit
+        self.lock = threading.Condition()
+        self.inflight = 0
+
+    def waited(self, time):
+        if time > 10:
+            raise RuntimeError("Waited too long")
+
+    def __enter__(self):
+        self.lock.acquire()
+        try:
+            start = time.time()
+            while self.inflight >= self.limit:
+                self.lock.wait(10)
+                self.waited(time.time() - start)
+            self.inflight += 1
+            return self
+        finally:
+            self.lock.release()
+
+    def __exit__(self, *excinfo):
+        self.lock.acquire()
+        try:
+            self.inflight -= 1
+            self.lock.notify()
+        finally:
+            self.lock.release()
+        return False
+
+    def call(self, target):
+        self.__enter__()
+        try:
+            return target()
+        finally:
+            self.__exit__()
+
+class abortlimiter(calllimiter):
+    def waited(self, time):
+        if time > 10:
+            os.abort()
index 1aab621..f67191f 100644 (file)
@@ -7,7 +7,7 @@ scgi-wsgi - WSGI adapter for SCGI
 
 SYNOPSIS
 --------
-*scgi-wsgi* [*-hA*] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...]
+*scgi-wsgi* [*-hA*] [*-m* 'PDM-SPEC'] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...]
 
 DESCRIPTION
 -----------
@@ -53,6 +53,12 @@ OPTIONS
        address listening for connections on 'PORT' instead. If 'HOST'
        is not given, `localhost` is used by default.
 
+*-m* 'PDM-SPEC'::
+
+       If the PDM library is installed on the system, create a
+       listening socket for connection PDM clients according to
+       'PDM-SPEC'.
+
 AUTHOR
 ------
 Fredrik Tolf <fredrik@dolda2000.com>
index e2689d4..2cf715b 100755 (executable)
@@ -2,15 +2,19 @@
 
 import sys, os, getopt, logging
 import socket
-import ashd.scgi
+import ashd.scgi, ashd.perf, ashd.serve
+try:
+    import pdm.srv
+except:
+    pdm = None
 
 def usage(out):
-    out.write("usage: scgi-wsgi [-hAL] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
+    out.write("usage: scgi-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
 
 sk = None
 modwsgi_compat = False
 setlog = True
-opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:")
+opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:m:")
 for o, a in opts:
     if o == "-h":
         usage(sys.stdout)
@@ -33,6 +37,9 @@ for o, a in opts:
         sk.listen(32)
     elif o == "-A":
         modwsgi_compat = True
+    elif o == "-m":
+        if pdm is not None:
+            pdm.srv.listen(a)
 if len(args) < 1:
     usage(sys.stderr)
     sys.exit(1)
@@ -61,4 +68,72 @@ else:
         sys.exit(1)
     handler = handlermod.application
 
-ashd.scgi.servescgi(sk, ashd.scgi.wrapwsgi(handler))
+def mkenv(head, sk):
+    env = dict(head)
+    env["wsgi.version"] = 1, 0
+    if "HTTP_X_ASH_PROTOCOL" in env:
+        env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"]
+    elif "HTTPS" in env:
+        env["wsgi.url_scheme"] = "https"
+    else:
+        env["wsgi.url_scheme"] = "http"
+    env["wsgi.input"] = sk
+    env["wsgi.errors"] = sys.stderr
+    env["wsgi.multithread"] = True
+    env["wsgi.multiprocess"] = False
+    env["wsgi.run_once"] = False
+    return env
+
+class reqthread(ashd.serve.wsgithread):
+    def __init__(self, sk):
+        super(reqthread, self).__init__()
+        self.bsk = sk.dup()
+        self.sk = self.bsk.makefile("r+")
+
+    def handlewsgi(self):
+        return handler(self.env, self.startreq)
+
+    def writehead(self, status, headers):
+        try:
+            self.sk.write("Status: %s\n" % status)
+            for nm, val in headers:
+                self.sk.write("%s: %s\n" % (nm, val))
+            self.sk.write("\n")
+        except IOError:
+            raise ashd.serve.closed()
+
+    def writedata(self, data):
+        try:
+            self.sk.write(data)
+            self.sk.flush()
+        except IOError:
+            raise ashd.serve.closed()
+
+    def handle(self):
+        head = ashd.scgi.readhead(self.sk)
+        self.env = mkenv(head, self.sk)
+        reqevent = ashd.perf.request(self.env)
+        exc = (None, None, None)
+        try:
+            super(reqthread, self).handle()
+            if self.status:
+                reqevent.response([self.status, self.headers])
+        except:
+            exc = sys.exc_info()
+            raise
+        finally:
+            reqevent.__exit__(*exc)
+
+    def run(self):
+        try:
+            super(reqthread, self).run()
+        finally:
+            self.sk.close()
+            self.bsk.close()
+
+while True:
+    nsk, addr = sk.accept()
+    try:
+        reqthread(nsk).start()
+    finally:
+        nsk.close()
index 8944b5c..ba7038d 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python3
 
 import sys, os, getopt, threading, logging, time, locale, collections
-import ashd.proto, ashd.util, ashd.perf
+import ashd.proto, ashd.util, ashd.perf, ashd.serve
 try:
     import pdm.srv
 except:
@@ -52,10 +52,6 @@ else:
         sys.exit(1)
     handler = handlermod.application
 
-class closed(IOError):
-    def __init__(self):
-        super().__init__("The client has closed the connection.")
-
 cwd = os.getcwd()
 def absolutify(path):
     if path[0] != '/':
@@ -95,7 +91,7 @@ def unquoteurl(url):
             buf.append(c)
     return buf
 
-def dowsgi(req):
+def mkenv(req):
     env = {}
     env["wsgi.version"] = 1, 0
     for key, val in req.headers:
@@ -147,98 +143,55 @@ def dowsgi(req):
     env["wsgi.multithread"] = True
     env["wsgi.multiprocess"] = False
     env["wsgi.run_once"] = False
+    return env
 
-    resp = []
-    respsent = []
+if reqlimit != 0:
+    guard = ashd.serve.abortlimiter(reqlimit).call
+else:
+    guard = lambda fun: fun()
 
-    def recode(thing):
-        if isinstance(thing, collections.ByteString):
-            return thing
-        else:
-            return str(thing).encode("latin-1")
+def recode(thing):
+    if isinstance(thing, collections.ByteString):
+        return thing
+    else:
+        return str(thing).encode("latin-1")
+
+class reqthread(ashd.serve.wsgithread):
+    def __init__(self, req):
+        super().__init__()
+        self.req = req.dup()
 
-    def flushreq():
-        if not respsent:
-            if not resp:
-                raise Exception("Trying to write data before starting response.")
-            status, headers = resp
-            respsent[:] = [True]
-            buf = bytearray()
-            buf += b"HTTP/1.1 " + recode(status) + b"\n"
-            for nm, val in headers:
-                buf += recode(nm) + b": " + recode(val) + b"\n"
-            buf += b"\n"
-            try:
-                req.sk.write(buf)
-            except IOError:
-                raise closed()
+    def handlewsgi(self):
+        return handler(self.env, self.startreq)
 
-    def write(data):
-        if not data:
-            return
-        flushreq()
+    def writehead(self, status, headers):
+        buf = bytearray()
+        buf += b"HTTP/1.1 " + recode(status) + b"\n"
+        for nm, val in headers:
+            buf += recode(nm) + b": " + recode(val) + b"\n"
+        buf += b"\n"
         try:
-            req.sk.write(data)
-            req.sk.flush()
+            self.req.sk.write(buf)
         except IOError:
-            raise closed()
+            raise ashd.serve.closed()
 
-    def startreq(status, headers, exc_info = None):
-        if resp:
-            if exc_info:                # Interesting, this...
-                try:
-                    if respsent:
-                        raise exc_info[1]
-                finally:
-                    exc_info = None     # CPython GC bug?
-            else:
-                raise Exception("Can only start responding once.")
-        resp[:] = status, headers
-        return write
-
-    with ashd.perf.request(env) as reqevent:
+    def writedata(self, data):
         try:
-            respiter = handler(env, startreq)
-            try:
-                for data in respiter:
-                    write(data)
-                if resp:
-                    flushreq()
-            finally:
-                if hasattr(respiter, "close"):
-                    respiter.close()
-        except closed:
-            pass
-        if resp:
-            reqevent.response(resp)
-
-flightlock = threading.Condition()
-inflight = 0
-
-class reqthread(threading.Thread):
-    def __init__(self, req):
-        super().__init__(name = "Request handler")
-        self.req = req.dup()
+            self.req.sk.write(data)
+            self.req.sk.flush()
+        except IOError:
+            raise ashd.serve.closed()
+
+    def handle(self):
+        self.env = mkenv(self.req)
+        with ashd.perf.request(self.env) as reqevent:
+            super().handle()
+            if self.status:
+                reqevent.response([self.status, self.headers])
     
     def run(self):
-        global inflight
         try:
-            with flightlock:
-                if reqlimit != 0:
-                    start = time.time()
-                    while inflight >= reqlimit:
-                        flightlock.wait(10)
-                        if time.time() - start > 10:
-                            os.abort()
-                inflight += 1
-            try:
-                dowsgi(self.req)
-            finally:
-                with flightlock:
-                    inflight -= 1
-                    flightlock.notify()
-        except:
-            log.error("exception occurred in handler thread", exc_info=True)
+            guard(super().run)
         finally:
             self.req.close()
             sys.stderr.flush()
index 8fa5767..c00c5a3 100644 (file)
@@ -1,13 +1,6 @@
-import sys, collections
-import threading
-
 class protoerr(Exception):
     pass
 
-class closed(IOError):
-    def __init__(self):
-        super(closed, self).__init__("The client has closed the connection.")
-
 def readns(sk):
     hln = 0
     while True:
@@ -34,115 +27,5 @@ def readhead(sk):
         i += 2
     return ret
 
-class reqthread(threading.Thread):
-    def __init__(self, sk, handler):
-        super(reqthread, self).__init__(name = "SCGI request handler")
-        self.bsk = sk.dup()
-        self.sk = self.bsk.makefile("rwb")
-        self.handler = handler
-
-    def run(self):
-        try:
-            head = readhead(self.sk)
-            self.handler(head, self.sk)
-        finally:
-            self.sk.close()
-            self.bsk.close()
-
-def handlescgi(sk, handler):
-    t = reqthread(sk, handler)
-    t.start()
-
-def servescgi(socket, handler):
-    while True:
-        nsk, addr = socket.accept()
-        try:
-            handlescgi(nsk, handler)
-        finally:
-            nsk.close()
-
 def decodehead(head, coding):
     return {k.decode(coding): v.decode(coding) for k, v in head.items()}
-
-def wrapwsgi(handler):
-    def handle(head, sk):
-        try:
-            env = decodehead(head, "utf-8")
-            env["wsgi.uri_encoding"] = "utf-8"
-        except UnicodeError:
-            env = decodehead(head, "latin-1")
-            env["wsgi.uri_encoding"] = "latin-1"
-        env["wsgi.version"] = 1, 0
-        if "HTTP_X_ASH_PROTOCOL" in env:
-            env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"]
-        elif "HTTPS" in env:
-            env["wsgi.url_scheme"] = "https"
-        else:
-            env["wsgi.url_scheme"] = "http"
-        env["wsgi.input"] = sk
-        env["wsgi.errors"] = sys.stderr
-        env["wsgi.multithread"] = True
-        env["wsgi.multiprocess"] = False
-        env["wsgi.run_once"] = False
-
-        resp = []
-        respsent = []
-
-        def recode(thing):
-            if isinstance(thing, collections.ByteString):
-                return thing
-            else:
-                return str(thing).encode("latin-1")
-
-        def flushreq():
-            if not respsent:
-                if not resp:
-                    raise Exception("Trying to write data before starting response.")
-                status, headers = resp
-                respsent[:] = [True]
-                buf = bytearray()
-                buf += b"Status: " + recode(status) + b"\n"
-                for nm, val in headers:
-                    buf += recode(nm) + b": " + recode(val) + b"\n"
-                buf += b"\n"
-                try:
-                    sk.write(buf)
-                except IOError:
-                    raise closed()
-
-        def write(data):
-            if not data:
-                return
-            flushreq()
-            try:
-                sk.write(data)
-                sk.flush()
-            except IOError:
-                raise closed()
-
-        def startreq(status, headers, exc_info = None):
-            if resp:
-                if exc_info:                # Interesting, this...
-                    try:
-                        if respsent:
-                            raise exc_info[1]
-                    finally:
-                        exc_info = None     # CPython GC bug?
-                else:
-                    raise Exception("Can only start responding once.")
-            resp[:] = status, headers
-            return write
-
-        respiter = handler(env, startreq)
-        try:
-            try:
-                for data in respiter:
-                    write(data)
-                if resp:
-                    flushreq()
-            except closed:
-                pass
-        finally:
-            if hasattr(respiter, "close"):
-                respiter.close()
-    return handle
diff --git a/python3/ashd/serve.py b/python3/ashd/serve.py
new file mode 100644 (file)
index 0000000..fe839a2
--- /dev/null
@@ -0,0 +1,120 @@
+import os, threading, time, logging
+
+log = logging.getLogger("ashd.serve")
+seq = 1
+seqlk = threading.Lock()
+
+def reqseq():
+    global seq
+    with seqlk:
+        s = seq
+        seq += 1
+        return s
+
+class reqthread(threading.Thread):
+    def __init__(self, name=None):
+        if name is None:
+            name = "Request handler %i" % reqseq()
+        super().__init__(name=name)
+
+    def handle(self):
+        raise Exception()
+
+    def run(self):
+        try:
+            self.handle()
+        except:
+            log.error("exception occurred when handling request", exc_info=True)
+
+class closed(IOError):
+    def __init__(self):
+        super().__init__("The client has closed the connection.")
+
+class wsgithread(reqthread):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.status = None
+        self.headers = []
+        self.respsent = False
+
+    def handlewsgi(self):
+        raise Exception()
+    def writehead(self, status, headers):
+        raise Exception()
+    def writedata(self, data):
+        raise Exception()
+
+    def write(self, data):
+        if not data:
+            return
+        self.flushreq()
+        self.writedata(data)
+
+    def flushreq(self):
+        if not self.respsent:
+            if not self.status:
+                raise Exception("Cannot send response body before starting response.")
+            self.respsent = True
+            self.writehead(self.status, self.headers)
+
+    def startreq(self, status, headers, exc_info=None):
+        if self.status:
+            if exc_info:                # Nice calling convetion ^^
+                try:
+                    if self.respsent:
+                        raise exc_info[1]
+                finally:
+                    exc_info = None     # CPython GC bug?
+            else:
+                raise Exception("Can only start responding once.")
+        self.status = status
+        self.headers = headers
+        return self.write
+    def handle(self):
+        try:
+            respiter = self.handlewsgi()
+            try:
+                for data in respiter:
+                    self.write(data)
+                if self.status:
+                    self.flushreq()
+            finally:
+                if hasattr(respiter, "close"):
+                    respiter.close()
+        except closed:
+            pass
+
+class calllimiter(object):
+    def __init__(self, limit):
+        self.limit = limit
+        self.lock = threading.Condition()
+        self.inflight = 0
+
+    def waited(self, time):
+        if time > 10:
+            raise RuntimeError("Waited too long")
+
+    def __enter__(self):
+        with self.lock:
+            start = time.time()
+            while self.inflight >= self.limit:
+                self.lock.wait(10)
+                self.waited(time.time() - start)
+            self.inflight += 1
+            return self
+
+    def __exit__(self, *excinfo):
+        with self.lock:
+            self.inflight -= 1
+            self.lock.notify()
+        return False
+
+    def call(self, target):
+        with self:
+            return target()
+
+class abortlimiter(calllimiter):
+    def waited(self, time):
+        if time > 10:
+            os.abort()
index df91477..24bac3a 100644 (file)
@@ -7,7 +7,7 @@ scgi-wsgi3 - WSGI adapter for SCGI
 
 SYNOPSIS
 --------
-*scgi-wsgi3* [*-hA*] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...]
+*scgi-wsgi3* [*-hA*] [*-m* 'PDM-SPEC'] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...]
 
 DESCRIPTION
 -----------
@@ -53,6 +53,12 @@ OPTIONS
        address listening for connections on 'PORT' instead. If 'HOST'
        is not given, `localhost` is used by default.
 
+*-m* 'PDM-SPEC'::
+
+       If the PDM library is installed on the system, create a
+       listening socket for connection PDM clients according to
+       'PDM-SPEC'.
+
 AUTHOR
 ------
 Fredrik Tolf <fredrik@dolda2000.com>
index c66f6e3..bd625c4 100755 (executable)
@@ -1,16 +1,20 @@
 #!/usr/bin/python3
 
-import sys, os, getopt, logging
+import sys, os, getopt, logging, collections
 import socket
-import ashd.scgi
+import ashd.scgi, ashd.perf, ashd.serve
+try:
+    import pdm.srv
+except:
+    pdm = None
 
 def usage(out):
-    out.write("usage: scgi-wsgi3 [-hAL] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
+    out.write("usage: scgi-wsgi3 [-hAL] [-m PDM-SPEC] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n")
 
 sk = None
 modwsgi_compat = False
 setlog = True
-opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:")
+opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:m:")
 for o, a in opts:
     if o == "-h":
         usage(sys.stdout)
@@ -33,6 +37,9 @@ for o, a in opts:
         sk.listen(32)
     elif o == "-A":
         modwsgi_compat = True
+    elif o == "-m":
+        if pdm is not None:
+            pdm.srv.listen(a)
 if len(args) < 1:
     usage(sys.stderr)
     sys.exit(1)
@@ -61,4 +68,78 @@ else:
         sys.exit(1)
     handler = handlermod.application
 
-ashd.scgi.servescgi(sk, ashd.scgi.wrapwsgi(handler))
+def mkenv(head, sk):
+    try:
+        env = ashd.scgi.decodehead(head, "utf-8")
+        env["wsgi.uri_encoding"] = "utf-8"
+    except UnicodeError:
+        env = ashd.scgi.decodehead(head, "latin-1")
+        env["wsgi.uri_encoding"] = "latin-1"
+    env["wsgi.version"] = 1, 0
+    if "HTTP_X_ASH_PROTOCOL" in env:
+        env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"]
+    elif "HTTPS" in env:
+        env["wsgi.url_scheme"] = "https"
+    else:
+        env["wsgi.url_scheme"] = "http"
+    env["wsgi.input"] = sk
+    env["wsgi.errors"] = sys.stderr
+    env["wsgi.multithread"] = True
+    env["wsgi.multiprocess"] = False
+    env["wsgi.run_once"] = False
+    return env
+
+def recode(thing):
+    if isinstance(thing, collections.ByteString):
+        return thing
+    else:
+        return str(thing).encode("latin-1")
+
+class reqthread(ashd.serve.wsgithread):
+    def __init__(self, sk):
+        super().__init__()
+        self.bsk = sk.dup()
+        self.sk = self.bsk.makefile("rwb")
+
+    def handlewsgi(self):
+        return handler(self.env, self.startreq)
+
+    def writehead(self, status, headers):
+        buf = bytearray()
+        buf += b"Status: " + recode(status) + b"\n"
+        for nm, val in headers:
+            buf += recode(nm) + b": " + recode(val) + b"\n"
+        buf += b"\n"
+        try:
+            self.sk.write(buf)
+        except IOError:
+            raise ashd.serve.closed()
+
+    def writedata(self, data):
+        try:
+            self.sk.write(data)
+            self.sk.flush()
+        except IOError:
+            raise ashd.serve.closed()
+
+    def handle(self):
+        head = ashd.scgi.readhead(self.sk)
+        self.env = mkenv(head, self.sk)
+        with ashd.perf.request(self.env) as reqevent:
+            super().handle()
+            if self.status:
+                reqevent.response([self.status, self.headers])
+
+    def run(self):
+        try:
+            super().run()
+        finally:
+            self.sk.close()
+            self.bsk.close()
+
+while True:
+    nsk, addr = sk.accept()
+    try:
+        reqthread(nsk).start()
+    finally:
+        nsk.close()
index c50a15c..48a8ed3 100644 (file)
@@ -158,19 +158,24 @@ static int forkchild(char *usrnm, struct hthead *forreq, int reqfd)
 
 static void serve2(struct user *usr, struct hthead *req, int fd)
 {
+    int serr;
+    
     if(usr->fd < 0)
        usr->fd = forkchild(usr->name, req, fd);
-    if(sendreq(usr->fd, req, fd)) {
-       if((errno == EPIPE) || (errno == ECONNRESET)) {
+    if(sendreq2(usr->fd, req, fd, MSG_NOSIGNAL | MSG_DONTWAIT)) {
+       serr = errno;
+       if((serr == EPIPE) || (serr == ECONNRESET)) {
            /* Assume that the child has crashed and restart it. */
            close(usr->fd);
            usr->fd = forkchild(usr->name, req, fd);
-           if(!sendreq(usr->fd, req, fd))
+           if(!sendreq2(usr->fd, req, fd, MSG_NOSIGNAL | MSG_DONTWAIT))
                return;
        }
-       flog(LOG_ERR, "could not pass on request to user `%s': %s", usr->name, strerror(errno));
-       close(usr->fd);
-       usr->fd = -1;
+       flog(LOG_ERR, "could not pass on request to user `%s': %s", usr->name, strerror(serr));
+       if(serr != EAGAIN) {
+           close(usr->fd);
+           usr->fd = -1;
+       }
     }
 }