python: Start rewriting WSGI handlers with modular handling models.
[ashd.git] / python3 / ashd / serve.py
1 import sys, os, threading, time, logging, select
2 from . import perf
3
4 log = logging.getLogger("ashd.serve")
5 seq = 1
6 seqlk = threading.Lock()
7
8 def reqseq():
9     global seq
10     with seqlk:
11         s = seq
12         seq += 1
13         return s
14
15 class closed(IOError):
16     def __init__(self):
17         super().__init__("The client has closed the connection.")
18
19 class reqthread(threading.Thread):
20     def __init__(self, *, name=None, **kw):
21         if name is None:
22             name = "Request handler %i" % reqseq()
23         super().__init__(name=name, **kw)
24
25 class wsgirequest(object):
26     def __init__(self, handler):
27         self.status = None
28         self.headers = []
29         self.respsent = False
30         self.handler = handler
31         self.buffer = bytearray()
32
33     def handlewsgi(self):
34         raise Exception()
35     def fileno(self):
36         raise Exception()
37     def writehead(self, status, headers):
38         raise Exception()
39     def flush(self):
40         raise Exception()
41     def close(self):
42         pass
43     def writedata(self, data):
44         self.buffer.extend(data)
45
46     def flushreq(self):
47         if not self.respsent:
48             if not self.status:
49                 raise Exception("Cannot send response body before starting response.")
50             self.respsent = True
51             self.writehead(self.status, self.headers)
52
53     def write(self, data):
54         if not data:
55             return
56         self.flushreq()
57         self.writedata(data)
58         self.handler.ckflush(self)
59
60     def startreq(self, status, headers, exc_info=None):
61         if self.status:
62             if exc_info:
63                 try:
64                     if self.respsent:
65                         raise exc_info[1]
66                 finally:
67                     exc_info = None
68             else:
69                 raise Exception("Can only start responding once.")
70         self.status = status
71         self.headers = headers
72         return self.write
73
74 class handler(object):
75     def handle(self, request):
76         raise Exception()
77     def ckflush(self, req):
78         raise Exception()
79     def close(self):
80         pass
81
82 class freethread(handler):
83     def __init__(self, **kw):
84         super().__init__(**kw)
85         self.current = set()
86         self.lk = threading.Lock()
87
88     def handle(self, req):
89         reqthread(target=self.run, args=[req]).start()
90
91     def ckflush(self, req):
92         while len(req.buffer) > 0:
93             rls, wls, els = select.select([], [req], [req])
94             req.flush()
95
96     def run(self, req):
97         try:
98             th = threading.current_thread()
99             with self.lk:
100                 self.current.add(th)
101             try:
102                 env = req.mkenv()
103                 with perf.request(env) as reqevent:
104                     respiter = req.handlewsgi(env, req.startreq)
105                     for data in respiter:
106                         req.write(data)
107                     if req.status:
108                         reqevent.response([req.status, req.headers])
109                         req.flushreq()
110                     self.ckflush(req)
111             except closed:
112                 pass
113             except:
114                 log.error("exception occurred when handling request", exc_info=True)
115             finally:
116                 with self.lk:
117                     self.current.remove(th)
118         finally:
119             req.close()
120
121     def close(self):
122         while True:
123             with self.lk:
124                 if len(self.current) > 0:
125                     th = next(iter(self.current))
126                 else:
127                     th = None
128             th.join()
129
130 names = {"free": freethread}