-import os, threading, time, logging
+import sys, os, threading, time, logging, select, queue, collections
+from . import perf
log = logging.getLogger("ashd.serve")
seq = 1
seq += 1
return s
-class reqthread(threading.Thread):
- def __init__(self, name=None):
- if name is None:
- name = "Request handler %i" % reqseq()
- super().__init__(name=name)
-
- def handle(self):
- raise Exception()
-
- def run(self):
- try:
- self.handle()
- except:
- log.error("exception occurred when handling request", exc_info=True)
-
class closed(IOError):
def __init__(self):
super().__init__("The client has closed the connection.")
-class wsgithread(reqthread):
- def __init__(self, **kwargs):
- super().__init__(**kwargs)
+class reqthread(threading.Thread):
+ def __init__(self, *, name=None, **kw):
+ if name is None:
+ name = "Request handler %i" % reqseq()
+ super().__init__(name=name, **kw)
+
+class wsgirequest(object):
+ def __init__(self, *, handler):
self.status = None
self.headers = []
self.respsent = False
+ self.handler = handler
+ self.buffer = bytearray()
def handlewsgi(self):
raise Exception()
+ def fileno(self):
+ raise Exception()
def writehead(self, status, headers):
raise Exception()
- def writedata(self, data):
+ def flush(self):
raise Exception()
-
- def write(self, data):
- if not data:
- return
- self.flushreq()
- self.writedata(data)
+ def close(self):
+ pass
+ def writedata(self, data):
+ self.buffer.extend(data)
def flushreq(self):
if not self.respsent:
self.respsent = True
self.writehead(self.status, self.headers)
+ def write(self, data):
+ if not data:
+ return
+ self.flushreq()
+ self.writedata(data)
+ self.handler.ckflush(self)
+
def startreq(self, status, headers, exc_info=None):
if self.status:
- if exc_info: # Nice calling convetion ^^
+ if exc_info:
try:
if self.respsent:
raise exc_info[1]
finally:
- exc_info = None # CPython GC bug?
+ exc_info = None
else:
raise Exception("Can only start responding once.")
self.status = status
self.headers = headers
return self.write
-
- def handle(self):
+
+class handler(object):
+ def handle(self, request):
+ raise Exception()
+ def ckflush(self, req):
+ p = select.poll()
+ p.register(req, select.POLLOUT)
+ while len(req.buffer) > 0:
+ p.poll()
+ req.flush()
+ def close(self):
+ pass
+
+ @classmethod
+ def parseargs(cls, **args):
+ if len(args) > 0:
+ raise ValueError("unknown handler argument: " + next(iter(args)))
+ return {}
+
+class single(handler):
+ cname = "single"
+
+ def handle(self, req):
try:
- respiter = self.handlewsgi()
+ env = req.mkenv()
+ with perf.request(env) as reqevent:
+ respiter = req.handlewsgi(env, req.startreq)
+ for data in respiter:
+ req.write(data)
+ if req.status:
+ reqevent.response([req.status, req.headers])
+ req.flushreq()
+ self.ckflush(req)
+ except closed:
+ pass
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
+ finally:
+ req.close()
+
+def dbg(*a):
+ f = True
+ for o in a:
+ if not f:
+ sys.stderr.write(" ")
+ sys.stderr.write(str(a))
+ f = False
+ sys.stderr.write("\n")
+ sys.stderr.flush()
+
+class freethread(handler):
+ cname = "free"
+
+ def __init__(self, *, max=None, timeout=None, **kw):
+ super().__init__(**kw)
+ self.current = set()
+ self.lk = threading.Lock()
+ self.tcond = threading.Condition(self.lk)
+ self.max = max
+ self.timeout = timeout
+
+ @classmethod
+ def parseargs(cls, *, max=None, abort=None, **args):
+ ret = super().parseargs(**args)
+ if max:
+ ret["max"] = int(max)
+ if abort:
+ ret["timeout"] = int(abort)
+ return ret
+
+ def handle(self, req):
+ with self.lk:
+ if self.max is not None:
+ if self.timeout is not None:
+ now = start = time.time()
+ while len(self.current) >= self.max:
+ self.tcond.wait(start + self.timeout - now)
+ now = time.time()
+ if now - start > self.timeout:
+ os.abort()
+ else:
+ while len(self.current) >= self.max:
+ self.tcond.wait()
+ th = reqthread(target=self.run, args=[req])
+ th.registered = False
+ th.start()
+ while not th.registered:
+ self.tcond.wait()
+
+ def run(self, req):
+ try:
+ th = threading.current_thread()
+ with self.lk:
+ self.current.add(th)
+ th.registered = True
+ self.tcond.notify_all()
try:
+ env = req.mkenv()
+ with perf.request(env) as reqevent:
+ respiter = req.handlewsgi(env, req.startreq)
+ for data in respiter:
+ req.write(data)
+ if req.status:
+ reqevent.response([req.status, req.headers])
+ req.flushreq()
+ self.ckflush(req)
+ except closed:
+ pass
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
+ finally:
+ with self.lk:
+ self.current.remove(th)
+ self.tcond.notify_all()
+ finally:
+ req.close()
+
+ def close(self):
+ while True:
+ with self.lk:
+ if len(self.current) > 0:
+ th = next(iter(self.current))
+ else:
+ return
+ th.join()
+
+class threadpool(handler):
+ cname = "pool"
+
+ def __init__(self, *, max=25, qsz=100, timeout=None, **kw):
+ super().__init__(**kw)
+ self.current = set()
+ self.clk = threading.Lock()
+ self.ccond = threading.Condition(self.clk)
+ self.queue = collections.deque()
+ self.waiting = set()
+ self.waitlimit = 5
+ self.wlstart = 0.0
+ self.qlk = threading.Lock()
+ self.qfcond = threading.Condition(self.qlk)
+ self.qecond = threading.Condition(self.qlk)
+ self.max = max
+ self.qsz = qsz
+ self.timeout = timeout
+
+ @classmethod
+ def parseargs(cls, *, max=None, queue=None, abort=None, **args):
+ ret = super().parseargs(**args)
+ if max:
+ ret["max"] = int(max)
+ if queue:
+ ret["qsz"] = int(queue)
+ if abort:
+ ret["timeout"] = int(abort)
+ return ret
+
+ def handle(self, req):
+ spawn = False
+ with self.qlk:
+ if self.timeout is not None:
+ now = start = time.time()
+ while len(self.queue) >= self.qsz:
+ self.qecond.wait(start + self.timeout - now)
+ now = time.time()
+ if now - start > self.timeout:
+ os.abort()
+ else:
+ while len(self.queue) >= self.qsz:
+ self.qecond.wait()
+ self.queue.append(req)
+ self.qfcond.notify()
+ if len(self.waiting) < 1:
+ spawn = True
+ if spawn:
+ with self.clk:
+ if len(self.current) < self.max:
+ th = reqthread(target=self.run)
+ th.registered = False
+ th.start()
+ while not th.registered:
+ self.ccond.wait()
+
+ def handle1(self, req):
+ try:
+ env = req.mkenv()
+ with perf.request(env) as reqevent:
+ respiter = req.handlewsgi(env, req.startreq)
for data in respiter:
- self.write(data)
- if self.status:
- self.flushreq()
+ req.write(data)
+ if req.status:
+ reqevent.response([req.status, req.headers])
+ req.flushreq()
+ self.ckflush(req)
+ except closed:
+ pass
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
+
+ def run(self):
+ timeout = 10.0
+ th = threading.current_thread()
+ with self.clk:
+ self.current.add(th)
+ th.registered = True
+ self.ccond.notify_all()
+ try:
+ while True:
+ start = now = time.time()
+ with self.qlk:
+ while len(self.queue) < 1:
+ if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout:
+ return
+ self.waiting.add(th)
+ try:
+ if len(self.waiting) == self.waitlimit:
+ self.wlstart = now
+ self.qfcond.wait(start + timeout - now)
+ finally:
+ self.waiting.remove(th)
+ now = time.time()
+ if now - start > timeout:
+ return
+ req = self.queue.popleft()
+ self.qecond.notify()
+ try:
+ self.handle1(req)
+ finally:
+ req.close()
+ finally:
+ with self.clk:
+ self.current.remove(th)
+
+ def close(self):
+ while True:
+ with self.clk:
+ if len(self.current) > 0:
+ th = next(iter(self.current))
+ else:
+ return
+ th.join()
+
+class resplex(handler):
+ cname = "rplex"
+
+ def __init__(self, *, max=None, **kw):
+ super().__init__(**kw)
+ self.current = set()
+ self.lk = threading.Lock()
+ self.tcond = threading.Condition(self.lk)
+ self.max = max
+ self.cqueue = queue.Queue(5)
+ self.cnpipe = os.pipe()
+ self.rthread = reqthread(name="Response thread", target=self.handle2)
+ self.rthread.start()
+
+ @classmethod
+ def parseargs(cls, *, max=None, **args):
+ ret = super().parseargs(**args)
+ if max:
+ ret["max"] = int(max)
+ return ret
+
+ def ckflush(self, req):
+ raise Exception("resplex handler does not support the write() function")
+
+ def handle(self, req):
+ with self.lk:
+ if self.max is not None:
+ while len(self.current) >= self.max:
+ self.tcond.wait()
+ th = reqthread(target=self.handle1, args=[req])
+ th.registered = False
+ th.start()
+ while not th.registered:
+ self.tcond.wait()
+
+ def handle1(self, req):
+ try:
+ th = threading.current_thread()
+ with self.lk:
+ self.current.add(th)
+ th.registered = True
+ self.tcond.notify_all()
+ try:
+ env = req.mkenv()
+ respobj = req.handlewsgi(env, req.startreq)
+ respiter = iter(respobj)
+ if not req.status:
+ log.error("request handler returned without calling start_request")
+ if hasattr(respiter, "close"):
+ respiter.close()
+ return
+ else:
+ self.cqueue.put((req, respiter))
+ os.write(self.cnpipe[1], b" ")
+ req = None
finally:
- if hasattr(respiter, "close"):
- respiter.close()
+ with self.lk:
+ self.current.remove(th)
+ self.tcond.notify_all()
except closed:
pass
+ except:
+ log.error("exception occurred when handling request", exc_info=True)
+ finally:
+ if req is not None:
+ req.close()
+
+ def handle2(self):
+ try:
+ rp = self.cnpipe[0]
+ current = {}
+
+ def closereq(req):
+ respiter = current[req]
+ try:
+ if respiter is not None and hasattr(respiter, "close"):
+ respiter.close()
+ except:
+ log.error("exception occurred when closing iterator", exc_info=True)
+ try:
+ req.close()
+ except:
+ log.error("exception occurred when closing request", exc_info=True)
+ del current[req]
+ def ckiter(req):
+ respiter = current[req]
+ if respiter is not None:
+ rem = False
+ try:
+ data = next(respiter)
+ except StopIteration:
+ rem = True
+ try:
+ req.flushreq()
+ except:
+ log.error("exception occurred when handling response data", exc_info=True)
+ except:
+ rem = True
+ log.error("exception occurred when iterating response", exc_info=True)
+ if not rem:
+ if data:
+ try:
+ req.flushreq()
+ req.writedata(data)
+ except:
+ log.error("exception occurred when handling response data", exc_info=True)
+ rem = True
+ if rem:
+ current[req] = None
+ try:
+ if hasattr(respiter, "close"):
+ respiter.close()
+ except:
+ log.error("exception occurred when closing iterator", exc_info=True)
+ respiter = None
+ if respiter is None and not req.buffer:
+ closereq(req)
-class calllimiter(object):
- def __init__(self, limit):
- self.limit = limit
- self.lock = threading.Condition()
- self.inflight = 0
-
- def waited(self, time):
- if time > 10:
- raise RuntimeError("Waited too long")
-
- def __enter__(self):
- with self.lock:
- start = time.time()
- while self.inflight >= self.limit:
- self.lock.wait(10)
- self.waited(time.time() - start)
- self.inflight += 1
- return self
-
- def __exit__(self, *excinfo):
- with self.lock:
- self.inflight -= 1
- self.lock.notify()
- return False
-
- def call(self, target):
- with self:
- return target()
-
-class abortlimiter(calllimiter):
- def waited(self, time):
- if time > 10:
+ while True:
+ bufl = list(req for req in current.keys() if req.buffer)
+ rls, wls, els = select.select([rp], bufl, [rp] + bufl)
+ if rp in rls:
+ ret = os.read(rp, 1024)
+ if not ret:
+ os.close(rp)
+ return
+ try:
+ while True:
+ req, respiter = self.cqueue.get(False)
+ current[req] = respiter
+ ckiter(req)
+ except queue.Empty:
+ pass
+ for req in wls:
+ try:
+ req.flush()
+ except closed:
+ closereq(req)
+ except:
+ log.error("exception occurred when writing response", exc_info=True)
+ closereq(req)
+ else:
+ if len(req.buffer) < 65536:
+ ckiter(req)
+ except:
+ log.critical("unexpected exception occurred in response handler thread", exc_info=True)
os.abort()
+
+ def close(self):
+ while True:
+ with self.lk:
+ if len(self.current) > 0:
+ th = next(iter(self.current))
+ else:
+ break
+ th.join()
+ os.close(self.cnpipe[1])
+ self.rthread.join()
+
+names = {cls.cname: cls for cls in globals().values() if
+ isinstance(cls, type) and
+ issubclass(cls, handler) and
+ hasattr(cls, "cname")}
+
+def parsehspec(spec):
+ if ":" not in spec:
+ return spec, {}
+ nm, spec = spec.split(":", 1)
+ args = {}
+ while spec:
+ if "," in spec:
+ part, spec = spec.split(",", 1)
+ else:
+ part, spec = spec, None
+ if "=" in part:
+ key, val = part.split("=", 1)
+ else:
+ key, val = part, ""
+ args[key] = val
+ return nm, args