python: Fixed the real thread-starting problem.
[ashd.git] / python3 / ashd / serve.py
1 import sys, os, threading, time, logging, select, queue
2 from . import perf
3
# Module-wide logger shared by every handler in this file.
log = logging.getLogger("ashd.serve")

# Next request sequence number to hand out; only touched while holding seqlk.
seq = 1
seqlk = threading.Lock()


def reqseq():
    """Return the next request sequence number.

    Numbers start at 1 and increase by one per call. The module-level
    counter is read and advanced while holding ``seqlk``.
    """
    global seq
    with seqlk:
        allocated, seq = seq, seq + 1
    return allocated
14
class closed(IOError):
    """Error signalling that the client has closed the connection.

    The handlers in this module catch it and silently stop processing the
    request instead of logging it as a failure.
    """

    def __init__(self):
        message = "The client has closed the connection."
        super().__init__(message)
18
class reqthread(threading.Thread):
    """Thread subclass that names itself after a request sequence number.

    When no explicit ``name`` is given, the thread is called
    "Request handler N", with N taken from ``reqseq()``. All other keyword
    arguments are forwarded to ``threading.Thread``.
    """

    def __init__(self, *, name=None, **kw):
        thread_name = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=thread_name, **kw)
24
class wsgirequest(object):
    """Base class holding the response state of one WSGI request.

    Output is accumulated in ``self.buffer`` (a bytearray) and the response
    head is emitted lazily through ``writehead()`` the first time it is
    needed. ``handlewsgi``, ``fileno``, ``writehead`` and ``flush`` raise a
    bare ``Exception`` here and have to be supplied by a subclass.
    """

    def __init__(self, *, handler):
        self.handler = handler
        # Status line and header list as given to startreq(); None/[] until then.
        self.status = None
        self.headers = []
        # True once writehead() has been invoked for this request.
        self.respsent = False
        # Response bytes queued by writedata().
        self.buffer = bytearray()

    def handlewsgi(self):
        raise Exception()

    def fileno(self):
        raise Exception()

    def writehead(self, status, headers):
        raise Exception()

    def flush(self):
        raise Exception()

    def close(self):
        pass

    def writedata(self, data):
        """Append ``data`` to the output buffer without flushing."""
        self.buffer.extend(data)

    def flushreq(self):
        """Emit the response head once, if it has not been sent already.

        Raises ``Exception`` when no status has been set by ``startreq()``.
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Queue ``data`` for output and let the handler flush it.

        Empty data is ignored entirely: the head is not sent and the
        handler is not consulted.
        """
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return ``self.write``.

        A second call is only accepted when ``exc_info`` is given; in that
        case the exception in ``exc_info[1]`` is re-raised if the head has
        already been sent. Without ``exc_info`` a second call raises
        ``Exception``.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exception info tuple.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
73
class handler(object):
    """Base class for request handlers.

    Subclasses override ``handle()``; the implementation here raises a bare
    ``Exception``.
    """

    def handle(self, request):
        raise Exception()

    def ckflush(self, req):
        """Block until the whole of ``req.buffer`` has been flushed.

        Waits with select() for ``req`` to become writable (or to signal an
        exceptional condition) and calls ``req.flush()`` until the buffer is
        empty.
        """
        while req.buffer:
            select.select([], [req], [req])
            req.flush()

    def close(self):
        pass

    @classmethod
    def parseargs(cls, **args):
        """Turn handler-spec arguments into constructor keyword arguments.

        The base handler accepts no arguments: any argument raises
        ``ValueError`` naming the first one; otherwise an empty dict is
        returned.
        """
        for unknown in args:
            raise ValueError("unknown handler argument: " + unknown)
        return {}
89
class single(handler):
    """Handler that serves each request synchronously in the calling thread."""

    cname = "single"

    def handle(self, req):
        """Run the WSGI application for ``req`` and write out its response.

        The environment comes from ``req.mkenv()`` and the application is
        invoked through ``req.handlewsgi(env, req.startreq)``. Every chunk
        it yields is passed to ``req.write()``; afterwards the head is
        flushed (if a status was set) and the remaining buffer is drained.

        A ``closed`` error ends processing silently; any other exception is
        logged. ``req.close()`` is always called at the end.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Makes sure the head goes out even for an empty body.
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333: the server must call close() on the returned
                    # iterable, if it has one, whether the request completed
                    # normally or was terminated early.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
110
def dbg(*a):
    """Write the arguments to stderr as one space-separated line.

    Each argument is converted with ``str()``. A newline is appended and
    stderr is flushed, so the output appears immediately.
    """
    # The original loop wrote str(a), the whole argument tuple, once per
    # argument instead of the individual argument.
    sys.stderr.write(" ".join(str(o) for o in a))
    sys.stderr.write("\n")
    sys.stderr.flush()
120
class freethread(handler):
    """Handler that serves every request in a freshly started thread.

    ``max`` optionally caps the number of concurrently running request
    threads. When ``timeout`` is also set, having to wait longer than
    ``timeout`` seconds for a free slot aborts the whole process.
    """

    cname = "free"

    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        # Threads currently handling a request; guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        # Signalled whenever a thread registers in or leaves self.current.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Convert handler-spec arguments into constructor keyword arguments.

        ``max`` and ``abort`` are converted with ``int()``; ``abort``
        becomes the ``timeout`` constructor argument. Any other argument is
        rejected by the base class with ``ValueError``.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for ``req``, waiting for a free slot if needed.

        Returns once the new thread has added itself to ``self.current``.
        """
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    # Monotonic clock: a wall-clock adjustment must neither
                    # trigger nor postpone the abort below.
                    now = start = time.monotonic()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.monotonic()
                        if now - start > self.timeout:
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.registered = False
            th.start()
            # Keep waiting (the condition releases self.lk meanwhile) until
            # run() has added the thread to self.current.
            while not th.registered:
                self.tcond.wait()

    def run(self, req):
        """Thread body: register, serve ``req``, unregister and close it.

        A ``closed`` error ends processing silently; any other exception
        raised while serving is logged.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            # Makes sure the head goes out even for an empty body.
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333: the server must call close() on the
                        # returned iterable, if it has one, whether the
                        # request completed normally or terminated early.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                pass
            except:
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    # Wakes handle() calls waiting for a free slot.
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Block until no request thread is registered any more."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock; the thread needs it to unregister.
            th.join()
196
class resplex(handler):
    """Handler that multiplexes all response output in one shared thread.

    Each request gets its own thread (``handle1``) which only calls the WSGI
    application and obtains the response iterator. The (request, iterator)
    pair is then handed over to a single response thread (``handle2``),
    which uses select() to write the output of all requests.

    ``ckflush()`` always raises, so anything that calls it on this handler
    fails with "resplex handler does not support the write() function".
    """
    cname = "rplex"

    def __init__(self, *, max=None, **kw):
        super().__init__(**kw)
        # Request threads currently inside handle1; guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        # Signalled whenever a thread registers in or leaves self.current.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        # Hand-off queue of (req, respiter) pairs; holds at most 5 entries.
        self.cqueue = queue.Queue(5)
        # Pipe used to wake the response thread: handle1 writes a byte after
        # each hand-off, close() closes the write end to request shutdown.
        self.cnpipe = os.pipe()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, *, max=None, **args):
        """Convert handler-spec arguments into constructor keyword arguments.

        ``max`` is converted with ``int()``; any other argument is passed to
        the parent class's ``parseargs``.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a request thread for ``req``, waiting for a free slot if needed.

        Returns once the new thread has added itself to ``self.current``.
        """
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            th = reqthread(target=self.handle1, args=[req])
            th.registered = False
            th.start()
            # Keep waiting (the condition releases self.lk meanwhile) until
            # handle1() has added the thread to self.current.
            while not th.registered:
                self.tcond.wait()

    def handle1(self, req):
        """Request-thread body: call the application and hand off the result.

        If the application returned without setting a status, the error is
        logged and the request is closed here. Otherwise the request and
        its response iterator are queued for the response thread, which
        then owns the request.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if not req.status:
                    log.error("request handler returned without calling start_request")
                    if hasattr(respiter, "close"):
                        respiter.close()
                    return
                else:
                    self.cqueue.put((req, respiter))
                    # Wake the response thread so that it drains the queue.
                    os.write(self.cnpipe[1], b" ")
                    # Ownership has moved to the response thread; clearing
                    # the name keeps the finally clause below from closing it.
                    req = None
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response-thread body: write out the responses of all queued requests.

        Runs until the read end of the wake-up pipe reports end-of-file.
        Any exception escaping the loop is logged as critical and the
        process is aborted.
        """
        try:
            rp = self.cnpipe[0]
            # Maps each active request to its response iterator, or to None
            # once the iterator is finished and only buffered data remains.
            current = {}

            def closereq(req):
                # Close the iterator (if still present) and the request,
                # logging rather than propagating errors, then forget it.
                respiter = current[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del current[req]
            def ckiter(req):
                # Pull one more chunk from the request's iterator into its
                # buffer; retire the iterator on exhaustion or error, and
                # close the request once nothing is left to send.
                respiter = current[req]
                if respiter is not None:
                    rem = False
                    try:
                        data = next(respiter)
                    except StopIteration:
                        rem = True
                        try:
                            # Send the head even if no body data was produced.
                            req.flushreq()
                        except:
                            log.error("exception occurred when handling response data", exc_info=True)
                    except:
                        rem = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if not rem:
                        if data:
                            try:
                                req.flushreq()
                                req.writedata(data)
                            except:
                                log.error("exception occurred when handling response data", exc_info=True)
                                rem = True
                    if rem:
                        current[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                if respiter is None and not req.buffer:
                    closereq(req)

            while True:
                # Only requests with buffered output are watched for
                # writability; the pipe is watched for new hand-offs.
                bufl = list(req for req in current.keys() if req.buffer)
                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
                if rp in rls:
                    ret = os.read(rp, 1024)
                    if not ret:
                        # End-of-file: close() has closed the write end.
                        # NOTE(review): requests still in `current` are not
                        # closed on this path.
                        os.close(rp)
                        return
                    try:
                        # Drain every queued hand-off; queue.Empty ends the loop.
                        while True:
                            req, respiter = self.cqueue.get(False)
                            current[req] = respiter
                            ckiter(req)
                    except queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Fetch more data only while under 64 KiB is buffered.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Wait for all request threads, then shut down the response thread."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    break
            # Join outside the lock; the thread needs it to unregister.
            th.join()
        # Closing the write end makes handle2 read end-of-file and return.
        os.close(self.cnpipe[1])
        self.rthread.join()
355
# Registry mapping each handler's ``cname`` to its class, built from every
# ``handler`` subclass defined at module level that declares a ``cname``.
# The values are snapshotted with list() first, so the comprehension never
# iterates the live globals() view while the namespace could be written to
# (which would raise "dictionary changed size during iteration").
names = {cls.cname: cls for cls in list(globals().values())
         if isinstance(cls, type)
         and issubclass(cls, handler)
         and hasattr(cls, "cname")}
360
def parsehspec(spec):
    """Split a handler specification into its name and argument dict.

    The format is ``name[:key=val,key=val,...]``. A key without ``=`` maps
    to the empty string. Without a colon the whole of ``spec`` is the name
    and the argument dict is empty.

    Returns a ``(name, args)`` tuple; all argument values are strings.
    """
    nm, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    args = {}
    while rest:
        # Peel off one comma-separated item; `rest` becomes "" after the last.
        item, _, rest = rest.partition(",")
        key, _, val = item.partition("=")
        args[key] = val
    return nm, args