From: Fredrik Tolf Date: Sun, 3 Feb 2013 02:29:45 +0000 (+0100) Subject: Merge branches 'block' and 'py-reserve' X-Git-Tag: 0.12~6 X-Git-Url: http://www.dolda2000.com/gitweb/?p=ashd.git;a=commitdiff_plain;h=27ca317cc6d19aa6080eae9a446212e928696bf5;hp=92db680bd8875d87b47a8cafd53bb96ca8d2d71f Merge branches 'block' and 'py-reserve' --- diff --git a/examples/python/dynhosts/dynhosts b/examples/python/dynhosts/dynhosts index 9a69a70..0e10b73 100755 --- a/examples/python/dynhosts/dynhosts +++ b/examples/python/dynhosts/dynhosts @@ -20,6 +20,6 @@ def serve(req): children[dname] = util.pchild(["dirplex", path], autorespawn = True) children[dname].passreq(req) return - util.respond(req, "No such host in configured.\n", status = "404 Not Found", ctype = "text/plain") + util.respond(req, "No such host is configured.\n", status = "404 Not Found", ctype = "text/plain") util.serveloop(serve) diff --git a/examples/python/dynhosts/run b/examples/python/dynhosts/run index d4203ea..07f6673 100755 --- a/examples/python/dynhosts/run +++ b/examples/python/dynhosts/run @@ -4,9 +4,5 @@ set -e cd "$(dirname "$0")" -# Start htparser running this dynhosts script. The setsid command -# ensures that SIGINT is only received by htparser and not by -# dynhosts; it is not of grave importance, but makes shutdown slightly -# more clean, and hides the KeyboardInterrupt otherwise raised by -# Python. -htparser plain:port=8080 -- setsid ./dynhosts . +# Start htparser running this dynhosts script. +htparser plain:port=8080 -- ./dynhosts . diff --git a/examples/python/wsgidir/run b/examples/python/wsgidir/run index 5dbe5d2..8931d4c 100755 --- a/examples/python/wsgidir/run +++ b/examples/python/wsgidir/run @@ -7,9 +7,4 @@ cd "$(dirname "$0")" # Invoke dirplex running in this directory, loading the wsgidir.rc # configuration file. The same configuration can be put in # e.g. /etc/ashd/dirplex.d or in any .htrc file. - -# The setsid command ensures that SIGINT is only received by htparser -# and not by dirplex or its children; it is not of any importance, but -# makes shutdown slightly cleaner, and hides the KeyboardInterrupt -# otherwise raised by Python. -htparser plain:port=8080 -- setsid dirplex -c ./wsgidir.rc . +htparser plain:port=8080 -- dirplex -c ./wsgidir.rc . diff --git a/python/ashd-wsgi b/python/ashd-wsgi index 2e4a2fe..01c158f 100755 --- a/python/ashd-wsgi +++ b/python/ashd-wsgi @@ -1,7 +1,7 @@ #!/usr/bin/python import sys, os, getopt, threading, logging, time -import ashd.proto, ashd.util, ashd.perf +import ashd.proto, ashd.util, ashd.perf, ashd.serve try: import pdm.srv except: @@ -52,10 +52,6 @@ else: sys.exit(1) handler = handlermod.application -class closed(IOError): - def __init__(self): - super(closed, self).__init__("The client has closed the connection.") - cwd = os.getcwd() def absolutify(path): if path[0] != '/': @@ -95,7 +91,7 @@ def unquoteurl(url): buf += c return buf -def dowsgi(req): +def mkenv(req): env = {} env["wsgi.version"] = 1, 0 for key, val in req.headers: @@ -147,103 +143,54 @@ def dowsgi(req): env["wsgi.multithread"] = True env["wsgi.multiprocess"] = False env["wsgi.run_once"] = False + return env - resp = [] - respsent = [] +if reqlimit != 0: + guard = ashd.serve.abortlimiter(reqlimit).call +else: + guard = lambda fun: fun() - def flushreq(): - if not respsent: - if not resp: - raise Exception, "Trying to write data before starting response." - status, headers = resp - respsent[:] = [True] - try: - req.sk.write("HTTP/1.1 %s\n" % status) - for nm, val in headers: - req.sk.write("%s: %s\n" % (nm, val)) - req.sk.write("\n") - except IOError: - raise closed() +class reqthread(ashd.serve.wsgithread): + def __init__(self, req): + super(reqthread, self).__init__() + self.req = req.dup() + + def handlewsgi(self): + return handler(self.env, self.startreq) - def write(data): - if not data: - return - flushreq() + def writehead(self, status, headers): try: - req.sk.write(data) - req.sk.flush() + self.req.sk.write("HTTP/1.1 %s\n" % status) + for nm, val in headers: + self.req.sk.write("%s: %s\n" % (nm, val)) + self.req.sk.write("\n") except IOError: - raise closed() + raise ashd.serve.closed() - def startreq(status, headers, exc_info = None): - if resp: - if exc_info: # Interesting, this... - try: - if respsent: - raise exc_info[0], exc_info[1], exc_info[2] - finally: - exc_info = None # CPython GC bug? - else: - raise Exception, "Can only start responding once." - resp[:] = status, headers - return write - - reqevent = ashd.perf.request(env) - exc = (None, None, None) - try: + def writedata(self, data): try: - respiter = handler(env, startreq) - try: - for data in respiter: - write(data) - if resp: - flushreq() - finally: - if hasattr(respiter, "close"): - respiter.close() - except closed: - pass - if resp: - reqevent.response(resp) - except: - exc = sys.exc_info() - raise - finally: - reqevent.__exit__(*exc) + self.req.sk.write(data) + self.req.sk.flush() + except IOError: + raise ashd.serve.closed() -flightlock = threading.Condition() -inflight = 0 + def handle(self): + self.env = mkenv(self.req) + reqevent = ashd.perf.request(self.env) + exc = (None, None, None) + try: + super(reqthread, self).handle() + if self.status: + reqevent.response([self.status, self.headers]) + except: + exc = sys.exc_info() + raise + finally: + reqevent.__exit__(*exc) -class reqthread(threading.Thread): - def __init__(self, req): - super(reqthread, self).__init__(name = "Request handler") - self.req = req.dup() - def run(self): - global inflight try: - flightlock.acquire() - try: - if reqlimit != 0: - start = time.time() - while inflight >= reqlimit: - flightlock.wait(10) - if time.time() - start > 10: - os.abort() - inflight += 1 - finally: - flightlock.release() - try: - dowsgi(self.req) - finally: - flightlock.acquire() - try: - inflight -= 1 - flightlock.notify() - finally: - flightlock.release() - except: - log.error("exception occurred in handler thread", exc_info=True) + guard(super(reqthread, self).run) finally: self.req.close() diff --git a/python/ashd/scgi.py b/python/ashd/scgi.py index f7ba3a8..1f0c5ab 100644 --- a/python/ashd/scgi.py +++ b/python/ashd/scgi.py @@ -1,13 +1,6 @@ -import sys -import threading - class protoerr(Exception): pass -class closed(IOError): - def __init__(self): - super(closed, self).__init__("The client has closed the connection.") - def readns(sk): hln = 0 while True: @@ -33,100 +26,3 @@ def readhead(sk): ret[parts[i]] = parts[i + 1] i += 2 return ret - -class reqthread(threading.Thread): - def __init__(self, sk, handler): - super(reqthread, self).__init__(name = "SCGI request handler") - self.bsk = sk.dup() - self.sk = self.bsk.makefile("r+") - self.handler = handler - - def run(self): - try: - head = readhead(self.sk) - self.handler(head, self.sk) - finally: - self.sk.close() - self.bsk.close() - -def handlescgi(sk, handler): - t = reqthread(sk, handler) - t.start() - -def servescgi(socket, handler): - while True: - nsk, addr = socket.accept() - try: - handlescgi(nsk, handler) - finally: - nsk.close() - -def wrapwsgi(handler): - def handle(head, sk): - env = dict(head) - env["wsgi.version"] = 1, 0 - if "HTTP_X_ASH_PROTOCOL" in env: - env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"] - elif "HTTPS" in env: - env["wsgi.url_scheme"] = "https" - else: - env["wsgi.url_scheme"] = "http" - env["wsgi.input"] = sk - env["wsgi.errors"] = sys.stderr - env["wsgi.multithread"] = True - env["wsgi.multiprocess"] = False - env["wsgi.run_once"] = False - - resp = [] - respsent = [] - - def flushreq(): - if not respsent: - if not resp: - raise Exception, "Trying to write data before starting response." - status, headers = resp - respsent[:] = [True] - try: - sk.write("Status: %s\n" % status) - for nm, val in headers: - sk.write("%s: %s\n" % (nm, val)) - sk.write("\n") - except IOError: - raise closed() - - def write(data): - if not data: - return - flushreq() - try: - sk.write(data) - sk.flush() - except IOError: - raise closed() - - def startreq(status, headers, exc_info = None): - if resp: - if exc_info: # Interesting, this... - try: - if respsent: - raise exc_info[0], exc_info[1], exc_info[2] - finally: - exc_info = None # CPython GC bug? - else: - raise Exception, "Can only start responding once." - resp[:] = status, headers - return write - - respiter = handler(env, startreq) - try: - try: - for data in respiter: - write(data) - if resp: - flushreq() - except closed: - pass - finally: - if hasattr(respiter, "close"): - respiter.close() - return handle diff --git a/python/ashd/serve.py b/python/ashd/serve.py new file mode 100644 index 0000000..14170d7 --- /dev/null +++ b/python/ashd/serve.py @@ -0,0 +1,132 @@ +import os, threading, time, logging + +log = logging.getLogger("ashd.serve") +seq = 1 +seqlk = threading.Lock() + +def reqseq(): + global seq + seqlk.acquire() + try: + s = seq + seq += 1 + return s + finally: + seqlk.release() + +class reqthread(threading.Thread): + def __init__(self, name=None): + if name is None: + name = "Request handler %i" % reqseq() + super(reqthread, self).__init__(name=name) + + def handle(self): + raise Exception() + + def run(self): + try: + self.handle() + except: + log.error("exception occurred when handling request", exc_info=True) + +class closed(IOError): + def __init__(self): + super(closed, self).__init__("The client has closed the connection.") + +class wsgithread(reqthread): + def __init__(self, **kwargs): + super(wsgithread, self).__init__(**kwargs) + self.status = None + self.headers = [] + self.respsent = False + + def handlewsgi(self): + raise Exception() + def writehead(self, status, headers): + raise Exception() + def writedata(self, data): + raise Exception() + + def write(self, data): + if not data: + return + self.flushreq() + self.writedata(data) + + def flushreq(self): + if not self.respsent: + if not self.status: + raise Exception("Cannot send response body before starting response.") + self.respsent = True + self.writehead(self.status, self.headers) + + def startreq(self, status, headers, exc_info=None): + if self.status: + if exc_info: # Nice calling convetion ^^ + try: + if self.respsent: + raise exc_info[0], exc_info[1], exc_info[2] + finally: + exc_info = None # CPython GC bug? + else: + raise Exception("Can only start responding once.") + self.status = status + self.headers = headers + return self.write + + def handle(self): + try: + respiter = self.handlewsgi() + try: + for data in respiter: + self.write(data) + if self.status: + self.flushreq() + finally: + if hasattr(respiter, "close"): + respiter.close() + except closed: + pass + +class calllimiter(object): + def __init__(self, limit): + self.limit = limit + self.lock = threading.Condition() + self.inflight = 0 + + def waited(self, time): + if time > 10: + raise RuntimeError("Waited too long") + + def __enter__(self): + self.lock.acquire() + try: + start = time.time() + while self.inflight >= self.limit: + self.lock.wait(10) + self.waited(time.time() - start) + self.inflight += 1 + return self + finally: + self.lock.release() + + def __exit__(self, *excinfo): + self.lock.acquire() + try: + self.inflight -= 1 + self.lock.notify() + finally: + self.lock.release() + return False + + def call(self, target): + self.__enter__() + try: + return target() + finally: + self.__exit__() + +class abortlimiter(calllimiter): + def waited(self, time): + if time > 10: + os.abort() diff --git a/python/doc/scgi-wsgi.doc b/python/doc/scgi-wsgi.doc index 1aab621..f67191f 100644 --- a/python/doc/scgi-wsgi.doc +++ b/python/doc/scgi-wsgi.doc @@ -7,7 +7,7 @@ scgi-wsgi - WSGI adapter for SCGI SYNOPSIS -------- -*scgi-wsgi* [*-hA*] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...] +*scgi-wsgi* [*-hA*] [*-m* 'PDM-SPEC'] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...] DESCRIPTION ----------- @@ -53,6 +53,12 @@ OPTIONS address listening for connections on 'PORT' instead. If 'HOST' is not given, `localhost` is used by default. +*-m* 'PDM-SPEC':: + + If the PDM library is installed on the system, create a + listening socket for connection PDM clients according to + 'PDM-SPEC'. + AUTHOR ------ Fredrik Tolf diff --git a/python/scgi-wsgi b/python/scgi-wsgi index e2689d4..2cf715b 100755 --- a/python/scgi-wsgi +++ b/python/scgi-wsgi @@ -2,15 +2,19 @@ import sys, os, getopt, logging import socket -import ashd.scgi +import ashd.scgi, ashd.perf, ashd.serve +try: + import pdm.srv +except: + pdm = None def usage(out): - out.write("usage: scgi-wsgi [-hAL] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n") + out.write("usage: scgi-wsgi [-hAL] [-m PDM-SPEC] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n") sk = None modwsgi_compat = False setlog = True -opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:") +opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:m:") for o, a in opts: if o == "-h": usage(sys.stdout) @@ -33,6 +37,9 @@ for o, a in opts: sk.listen(32) elif o == "-A": modwsgi_compat = True + elif o == "-m": + if pdm is not None: + pdm.srv.listen(a) if len(args) < 1: usage(sys.stderr) sys.exit(1) @@ -61,4 +68,72 @@ else: sys.exit(1) handler = handlermod.application -ashd.scgi.servescgi(sk, ashd.scgi.wrapwsgi(handler)) +def mkenv(head, sk): + env = dict(head) + env["wsgi.version"] = 1, 0 + if "HTTP_X_ASH_PROTOCOL" in env: + env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"] + elif "HTTPS" in env: + env["wsgi.url_scheme"] = "https" + else: + env["wsgi.url_scheme"] = "http" + env["wsgi.input"] = sk + env["wsgi.errors"] = sys.stderr + env["wsgi.multithread"] = True + env["wsgi.multiprocess"] = False + env["wsgi.run_once"] = False + return env + +class reqthread(ashd.serve.wsgithread): + def __init__(self, sk): + super(reqthread, self).__init__() + self.bsk = sk.dup() + self.sk = self.bsk.makefile("r+") + + def handlewsgi(self): + return handler(self.env, self.startreq) + + def writehead(self, status, headers): + try: + self.sk.write("Status: %s\n" % status) + for nm, val in headers: + self.sk.write("%s: %s\n" % (nm, val)) + self.sk.write("\n") + except IOError: + raise ashd.serve.closed() + + def writedata(self, data): + try: + self.sk.write(data) + self.sk.flush() + except IOError: + raise ashd.serve.closed() + + def handle(self): + head = ashd.scgi.readhead(self.sk) + self.env = mkenv(head, self.sk) + reqevent = ashd.perf.request(self.env) + exc = (None, None, None) + try: + super(reqthread, self).handle() + if self.status: + reqevent.response([self.status, self.headers]) + except: + exc = sys.exc_info() + raise + finally: + reqevent.__exit__(*exc) + + def run(self): + try: + super(reqthread, self).run() + finally: + self.sk.close() + self.bsk.close() + +while True: + nsk, addr = sk.accept() + try: + reqthread(nsk).start() + finally: + nsk.close() diff --git a/python3/ashd-wsgi3 b/python3/ashd-wsgi3 index 8944b5c..ba7038d 100755 --- a/python3/ashd-wsgi3 +++ b/python3/ashd-wsgi3 @@ -1,7 +1,7 @@ #!/usr/bin/python3 import sys, os, getopt, threading, logging, time, locale, collections -import ashd.proto, ashd.util, ashd.perf +import ashd.proto, ashd.util, ashd.perf, ashd.serve try: import pdm.srv except: @@ -52,10 +52,6 @@ else: sys.exit(1) handler = handlermod.application -class closed(IOError): - def __init__(self): - super().__init__("The client has closed the connection.") - cwd = os.getcwd() def absolutify(path): if path[0] != '/': @@ -95,7 +91,7 @@ def unquoteurl(url): buf.append(c) return buf -def dowsgi(req): +def mkenv(req): env = {} env["wsgi.version"] = 1, 0 for key, val in req.headers: @@ -147,98 +143,55 @@ def dowsgi(req): env["wsgi.multithread"] = True env["wsgi.multiprocess"] = False env["wsgi.run_once"] = False + return env - resp = [] - respsent = [] +if reqlimit != 0: + guard = ashd.serve.abortlimiter(reqlimit).call +else: + guard = lambda fun: fun() - def recode(thing): - if isinstance(thing, collections.ByteString): - return thing - else: - return str(thing).encode("latin-1") +def recode(thing): + if isinstance(thing, collections.ByteString): + return thing + else: + return str(thing).encode("latin-1") + +class reqthread(ashd.serve.wsgithread): + def __init__(self, req): + super().__init__() + self.req = req.dup() - def flushreq(): - if not respsent: - if not resp: - raise Exception("Trying to write data before starting response.") - status, headers = resp - respsent[:] = [True] - buf = bytearray() - buf += b"HTTP/1.1 " + recode(status) + b"\n" - for nm, val in headers: - buf += recode(nm) + b": " + recode(val) + b"\n" - buf += b"\n" - try: - req.sk.write(buf) - except IOError: - raise closed() + def handlewsgi(self): + return handler(self.env, self.startreq) - def write(data): - if not data: - return - flushreq() + def writehead(self, status, headers): + buf = bytearray() + buf += b"HTTP/1.1 " + recode(status) + b"\n" + for nm, val in headers: + buf += recode(nm) + b": " + recode(val) + b"\n" + buf += b"\n" try: - req.sk.write(data) - req.sk.flush() + self.req.sk.write(buf) except IOError: - raise closed() + raise ashd.serve.closed() - def startreq(status, headers, exc_info = None): - if resp: - if exc_info: # Interesting, this... - try: - if respsent: - raise exc_info[1] - finally: - exc_info = None # CPython GC bug? - else: - raise Exception("Can only start responding once.") - resp[:] = status, headers - return write - - with ashd.perf.request(env) as reqevent: + def writedata(self, data): try: - respiter = handler(env, startreq) - try: - for data in respiter: - write(data) - if resp: - flushreq() - finally: - if hasattr(respiter, "close"): - respiter.close() - except closed: - pass - if resp: - reqevent.response(resp) - -flightlock = threading.Condition() -inflight = 0 - -class reqthread(threading.Thread): - def __init__(self, req): - super().__init__(name = "Request handler") - self.req = req.dup() + self.req.sk.write(data) + self.req.sk.flush() + except IOError: + raise ashd.serve.closed() + + def handle(self): + self.env = mkenv(self.req) + with ashd.perf.request(self.env) as reqevent: + super().handle() + if self.status: + reqevent.response([self.status, self.headers]) def run(self): - global inflight try: - with flightlock: - if reqlimit != 0: - start = time.time() - while inflight >= reqlimit: - flightlock.wait(10) - if time.time() - start > 10: - os.abort() - inflight += 1 - try: - dowsgi(self.req) - finally: - with flightlock: - inflight -= 1 - flightlock.notify() - except: - log.error("exception occurred in handler thread", exc_info=True) + guard(super().run) finally: self.req.close() sys.stderr.flush() diff --git a/python3/ashd/scgi.py b/python3/ashd/scgi.py index 8fa5767..c00c5a3 100644 --- a/python3/ashd/scgi.py +++ b/python3/ashd/scgi.py @@ -1,13 +1,6 @@ -import sys, collections -import threading - class protoerr(Exception): pass -class closed(IOError): - def __init__(self): - super(closed, self).__init__("The client has closed the connection.") - def readns(sk): hln = 0 while True: @@ -34,115 +27,5 @@ def readhead(sk): i += 2 return ret -class reqthread(threading.Thread): - def __init__(self, sk, handler): - super(reqthread, self).__init__(name = "SCGI request handler") - self.bsk = sk.dup() - self.sk = self.bsk.makefile("rwb") - self.handler = handler - - def run(self): - try: - head = readhead(self.sk) - self.handler(head, self.sk) - finally: - self.sk.close() - self.bsk.close() - -def handlescgi(sk, handler): - t = reqthread(sk, handler) - t.start() - -def servescgi(socket, handler): - while True: - nsk, addr = socket.accept() - try: - handlescgi(nsk, handler) - finally: - nsk.close() - def decodehead(head, coding): return {k.decode(coding): v.decode(coding) for k, v in head.items()} - -def wrapwsgi(handler): - def handle(head, sk): - try: - env = decodehead(head, "utf-8") - env["wsgi.uri_encoding"] = "utf-8" - except UnicodeError: - env = decodehead(head, "latin-1") - env["wsgi.uri_encoding"] = "latin-1" - env["wsgi.version"] = 1, 0 - if "HTTP_X_ASH_PROTOCOL" in env: - env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"] - elif "HTTPS" in env: - env["wsgi.url_scheme"] = "https" - else: - env["wsgi.url_scheme"] = "http" - env["wsgi.input"] = sk - env["wsgi.errors"] = sys.stderr - env["wsgi.multithread"] = True - env["wsgi.multiprocess"] = False - env["wsgi.run_once"] = False - - resp = [] - respsent = [] - - def recode(thing): - if isinstance(thing, collections.ByteString): - return thing - else: - return str(thing).encode("latin-1") - - def flushreq(): - if not respsent: - if not resp: - raise Exception("Trying to write data before starting response.") - status, headers = resp - respsent[:] = [True] - buf = bytearray() - buf += b"Status: " + recode(status) + b"\n" - for nm, val in headers: - buf += recode(nm) + b": " + recode(val) + b"\n" - buf += b"\n" - try: - sk.write(buf) - except IOError: - raise closed() - - def write(data): - if not data: - return - flushreq() - try: - sk.write(data) - sk.flush() - except IOError: - raise closed() - - def startreq(status, headers, exc_info = None): - if resp: - if exc_info: # Interesting, this... - try: - if respsent: - raise exc_info[1] - finally: - exc_info = None # CPython GC bug? - else: - raise Exception("Can only start responding once.") - resp[:] = status, headers - return write - - respiter = handler(env, startreq) - try: - try: - for data in respiter: - write(data) - if resp: - flushreq() - except closed: - pass - finally: - if hasattr(respiter, "close"): - respiter.close() - return handle diff --git a/python3/ashd/serve.py b/python3/ashd/serve.py new file mode 100644 index 0000000..fe839a2 --- /dev/null +++ b/python3/ashd/serve.py @@ -0,0 +1,120 @@ +import os, threading, time, logging + +log = logging.getLogger("ashd.serve") +seq = 1 +seqlk = threading.Lock() + +def reqseq(): + global seq + with seqlk: + s = seq + seq += 1 + return s + +class reqthread(threading.Thread): + def __init__(self, name=None): + if name is None: + name = "Request handler %i" % reqseq() + super().__init__(name=name) + + def handle(self): + raise Exception() + + def run(self): + try: + self.handle() + except: + log.error("exception occurred when handling request", exc_info=True) + +class closed(IOError): + def __init__(self): + super().__init__("The client has closed the connection.") + +class wsgithread(reqthread): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.status = None + self.headers = [] + self.respsent = False + + def handlewsgi(self): + raise Exception() + def writehead(self, status, headers): + raise Exception() + def writedata(self, data): + raise Exception() + + def write(self, data): + if not data: + return + self.flushreq() + self.writedata(data) + + def flushreq(self): + if not self.respsent: + if not self.status: + raise Exception("Cannot send response body before starting response.") + self.respsent = True + self.writehead(self.status, self.headers) + + def startreq(self, status, headers, exc_info=None): + if self.status: + if exc_info: # Nice calling convetion ^^ + try: + if self.respsent: + raise exc_info[1] + finally: + exc_info = None # CPython GC bug? + else: + raise Exception("Can only start responding once.") + self.status = status + self.headers = headers + return self.write + + def handle(self): + try: + respiter = self.handlewsgi() + try: + for data in respiter: + self.write(data) + if self.status: + self.flushreq() + finally: + if hasattr(respiter, "close"): + respiter.close() + except closed: + pass + +class calllimiter(object): + def __init__(self, limit): + self.limit = limit + self.lock = threading.Condition() + self.inflight = 0 + + def waited(self, time): + if time > 10: + raise RuntimeError("Waited too long") + + def __enter__(self): + with self.lock: + start = time.time() + while self.inflight >= self.limit: + self.lock.wait(10) + self.waited(time.time() - start) + self.inflight += 1 + return self + + def __exit__(self, *excinfo): + with self.lock: + self.inflight -= 1 + self.lock.notify() + return False + + def call(self, target): + with self: + return target() + +class abortlimiter(calllimiter): + def waited(self, time): + if time > 10: + os.abort() diff --git a/python3/doc/scgi-wsgi3.doc b/python3/doc/scgi-wsgi3.doc index df91477..24bac3a 100644 --- a/python3/doc/scgi-wsgi3.doc +++ b/python3/doc/scgi-wsgi3.doc @@ -7,7 +7,7 @@ scgi-wsgi3 - WSGI adapter for SCGI SYNOPSIS -------- -*scgi-wsgi3* [*-hA*] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...] +*scgi-wsgi3* [*-hA*] [*-m* 'PDM-SPEC'] [*-p* 'MODPATH'] [*-T* \[HOST:]'PORT'] 'HANDLER-MODULE' ['ARGS'...] DESCRIPTION ----------- @@ -53,6 +53,12 @@ OPTIONS address listening for connections on 'PORT' instead. If 'HOST' is not given, `localhost` is used by default. +*-m* 'PDM-SPEC':: + + If the PDM library is installed on the system, create a + listening socket for connection PDM clients according to + 'PDM-SPEC'. + AUTHOR ------ Fredrik Tolf diff --git a/python3/scgi-wsgi3 b/python3/scgi-wsgi3 index c66f6e3..bd625c4 100755 --- a/python3/scgi-wsgi3 +++ b/python3/scgi-wsgi3 @@ -1,16 +1,20 @@ #!/usr/bin/python3 -import sys, os, getopt, logging +import sys, os, getopt, logging, collections import socket -import ashd.scgi +import ashd.scgi, ashd.perf, ashd.serve +try: + import pdm.srv +except: + pdm = None def usage(out): - out.write("usage: scgi-wsgi3 [-hAL] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n") + out.write("usage: scgi-wsgi3 [-hAL] [-m PDM-SPEC] [-p MODPATH] [-T [HOST:]PORT] HANDLER-MODULE [ARGS...]\n") sk = None modwsgi_compat = False setlog = True -opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:") +opts, args = getopt.getopt(sys.argv[1:], "+hALp:T:m:") for o, a in opts: if o == "-h": usage(sys.stdout) @@ -33,6 +37,9 @@ for o, a in opts: sk.listen(32) elif o == "-A": modwsgi_compat = True + elif o == "-m": + if pdm is not None: + pdm.srv.listen(a) if len(args) < 1: usage(sys.stderr) sys.exit(1) @@ -61,4 +68,78 @@ else: sys.exit(1) handler = handlermod.application -ashd.scgi.servescgi(sk, ashd.scgi.wrapwsgi(handler)) +def mkenv(head, sk): + try: + env = ashd.scgi.decodehead(head, "utf-8") + env["wsgi.uri_encoding"] = "utf-8" + except UnicodeError: + env = ashd.scgi.decodehead(head, "latin-1") + env["wsgi.uri_encoding"] = "latin-1" + env["wsgi.version"] = 1, 0 + if "HTTP_X_ASH_PROTOCOL" in env: + env["wsgi.url_scheme"] = env["HTTP_X_ASH_PROTOCOL"] + elif "HTTPS" in env: + env["wsgi.url_scheme"] = "https" + else: + env["wsgi.url_scheme"] = "http" + env["wsgi.input"] = sk + env["wsgi.errors"] = sys.stderr + env["wsgi.multithread"] = True + env["wsgi.multiprocess"] = False + env["wsgi.run_once"] = False + return env + +def recode(thing): + if isinstance(thing, collections.ByteString): + return thing + else: + return str(thing).encode("latin-1") + +class reqthread(ashd.serve.wsgithread): + def __init__(self, sk): + super().__init__() + self.bsk = sk.dup() + self.sk = self.bsk.makefile("rwb") + + def handlewsgi(self): + return handler(self.env, self.startreq) + + def writehead(self, status, headers): + buf = bytearray() + buf += b"Status: " + recode(status) + b"\n" + for nm, val in headers: + buf += recode(nm) + b": " + recode(val) + b"\n" + buf += b"\n" + try: + self.sk.write(buf) + except IOError: + raise ashd.serve.closed() + + def writedata(self, data): + try: + self.sk.write(data) + self.sk.flush() + except IOError: + raise ashd.serve.closed() + + def handle(self): + head = ashd.scgi.readhead(self.sk) + self.env = mkenv(head, self.sk) + with ashd.perf.request(self.env) as reqevent: + super().handle() + if self.status: + reqevent.response([self.status, self.headers]) + + def run(self): + try: + super().run() + finally: + self.sk.close() + self.bsk.close() + +while True: + nsk, addr = sk.accept() + try: + reqthread(nsk).start() + finally: + nsk.close() diff --git a/src/htparser.c b/src/htparser.c index 8d0b515..1dbb8ee 100644 --- a/src/htparser.c +++ b/src/htparser.c @@ -405,8 +405,9 @@ static void plexwatch(struct muth *muth, va_list args) { vavar(int, fd); char *buf; - int i, ret; + int i, s, ret; + s = 0; while(1) { if(block(fd, EV_READ, 0) == 0) break; @@ -416,6 +417,7 @@ static void plexwatch(struct muth *muth, va_list args) flog(LOG_WARNING, "received error on rootplex read channel: %s", strerror(errno)); exit(1); } else if(ret == 0) { + s = 1; free(buf); break; } @@ -423,15 +425,16 @@ static void plexwatch(struct muth *muth, va_list args) * some day... */ free(buf); } - close(plex); - plex = -1; + shutdown(plex, SHUT_RDWR); for(i = 0; i < listeners.d; i++) { if(listeners.b[i] == muth) bufdel(listeners, i); } - flog(LOG_INFO, "root handler exited, so shutting down listening..."); - while(listeners.d > 0) - resume(listeners.b[0], 0); + if(s) { + flog(LOG_INFO, "root handler exited, so shutting down listening..."); + while(listeners.d > 0) + resume(listeners.b[0], 0); + } } static void initroot(void *uu)