callcgi: Fixed possible deadlock problem on aborted requests.
[ashd.git] / python3 / ashd / serve.py
1 import sys, os, threading, time, logging, select, queue, collections
2 from . import perf
3
# Logger shared by every handler implementation in this module.
log = logging.getLogger("ashd.serve")
# Next request sequence number; only read or advanced while holding seqlk.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    The module-level counter starts at 1 and is advanced while holding
    `seqlk`, so concurrent callers never receive the same value.
    """
    global seq
    seqlk.acquire()
    try:
        allotted = seq
        seq = allotted + 1
    finally:
        seqlk.release()
    return allotted
14
class closed(IOError):
    """Signals that the client has closed the connection."""
    def __init__(self):
        message = "The client has closed the connection."
        super(closed, self).__init__(message)
18
class reqthread(threading.Thread):
    """Thread that gets a numbered "Request handler" name when none is given."""
    def __init__(self, *, name=None, **kw):
        if name is not None:
            label = name
        else:
            # Unnamed threads are numbered from the module-wide sequence.
            label = "Request handler %i" % reqseq()
        super().__init__(name=label, **kw)
24
class wsgirequest(object):
    """Base class tracking the WSGI response state of one request.

    The transport-specific operations (handlewsgi, fileno, writehead and
    flush) are stubs that raise here and are expected to be supplied by a
    subclass.  Body data written through write() is accumulated in
    `buffer`, and the owning handler's ckflush() is invoked after each
    write.
    """
    def __init__(self, *, handler):
        self.handler = handler
        # Status string and header list as passed to startreq().
        self.status = None
        self.headers = []
        # True once writehead() has been invoked for this request.
        self.respsent = False
        # Response body bytes not yet flushed.
        self.buffer = bytearray()

    def handlewsgi(self):
        raise Exception()

    def fileno(self):
        raise Exception()

    def writehead(self, status, headers):
        raise Exception()

    def flush(self):
        raise Exception()

    def close(self):
        pass

    def writedata(self, data):
        """Append `data` to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head via writehead() unless already sent.

        Raises Exception if no status has been set with startreq().
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        # Marked as sent before the call, so a failing writehead() is not
        # attempted a second time.
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Queue a non-empty chunk of body data and let the handler flush it."""
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return the write callable.

        When a status is already set, a repeated call is only accepted if
        `exc_info` is given.  In that case the exception in `exc_info` is
        re-raised if the head has already been sent; otherwise the new
        status and headers replace the old ones.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exception triple.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
73
class handler(object):
    """Base class for request handlers.

    handle() is a stub that raises here.  ckflush() blocks until the
    request's output buffer is empty, and parseargs() turns string
    arguments into constructor keyword arguments; the base class accepts
    none.
    """
    def handle(self, request):
        raise Exception()

    def ckflush(self, req):
        """Wait for `req` with select() and flush it until its buffer is empty."""
        while len(req.buffer) != 0:
            select.select([], [req], [req])
            req.flush()

    def close(self):
        pass

    @classmethod
    def parseargs(cls, **args):
        """Return constructor keyword arguments for the given string arguments.

        Raises ValueError naming the first argument if any is passed.
        """
        for unknown in args:
            raise ValueError("unknown handler argument: " + unknown)
        return {}
89
class single(handler):
    """Handler that processes each request synchronously in the calling thread."""
    cname = "single"

    def handle(self, req):
        """Run the WSGI application for `req` and write out its response.

        The request is always closed afterwards.  A `closed` exception
        (client went away) is ignored silently; any other exception is
        logged and not propagated.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Make sure the head goes out even when the
                        # application produced no body data.
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333: the iterable's close() must be called when
                    # the request completes, whether normally or not.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            # Deliberately broad: nothing may escape the request handler.
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
110
def dbg(*a):
    """Write the arguments to stderr, space-separated, and flush.

    Each argument is converted with str(); a newline terminates the line.
    With no arguments only the newline is written.
    """
    # The previous loop wrote str(a), the whole argument tuple, once per
    # argument instead of the individual item.
    sys.stderr.write(" ".join(str(o) for o in a))
    sys.stderr.write("\n")
    sys.stderr.flush()
120
class freethread(handler):
    """Handler that runs every request on a newly started thread.

    `max`, when set, bounds the number of concurrently running request
    threads.  `timeout`, when set together with `max`, is the number of
    seconds handle() may wait for a free slot before the whole process is
    aborted with os.abort().
    """
    cname = "free"

    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        # Threads currently handling a request; guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        # Notified whenever a thread registers in or leaves self.current.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Convert the string arguments `max` and `abort` to constructor kwargs.

        `abort` becomes the constructor's `timeout`.  Any other argument
        is rejected by the base class.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for `req`, first waiting for a free slot if limited."""
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    now = start = time.time()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.time()
                        if now - start > self.timeout:
                            # No slot became free in time; give up on the
                            # entire process.
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.registered = False
            th.start()
            # Do not return until the new thread has added itself to
            # self.current, so the limit check above stays accurate for
            # the next request.
            while not th.registered:
                self.tcond.wait()

    def run(self, req):
        """Thread body: register, serve `req`, unregister and close it.

        A `closed` exception is ignored; any other exception raised while
        serving is logged and not propagated.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333: the iterable's close() must be called
                        # when the request completes, whether normally or
                        # not.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                pass
            except:
                # Deliberately broad: nothing may escape the request thread.
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Join request threads until none remain registered."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock; the thread needs it to unregister.
            th.join()
196
class threadpool(handler):
    """Handler that queues requests for a pool of worker threads.

    `max` bounds the number of worker threads and `qsz` the number of
    queued requests.  `timeout`, when set, is the number of seconds
    handle() may wait for queue space before the whole process is aborted
    with os.abort().
    """
    cname = "pool"

    def __init__(self, *, max=25, qsz=100, timeout=None, **kw):
        super().__init__(**kw)
        # Live worker threads; guarded by self.clk.
        self.current = set()
        self.clk = threading.Lock()
        self.ccond = threading.Condition(self.clk)
        # Pending requests and the set of idle workers; guarded by self.qlk.
        self.queue = collections.deque()
        self.waiting = set()
        self.waitlimit = 5
        # Time at which the number of idle workers last reached waitlimit.
        self.wlstart = 0.0
        self.qlk = threading.Lock()
        # qfcond: an entry was queued; qecond: an entry was taken.
        self.qfcond = threading.Condition(self.qlk)
        self.qecond = threading.Condition(self.qlk)
        self.max = max
        self.qsz = qsz
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, queue=None, abort=None, **args):
        """Convert string arguments `max`, `queue` and `abort` to constructor kwargs.

        `queue` becomes `qsz` and `abort` becomes `timeout`.  Any other
        argument is rejected by the base class.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if queue:
            ret["qsz"] = int(queue)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Queue `req`, waiting for queue space, and spawn a worker if needed.

        A new worker is started only when no worker was idle at the time
        the request was queued and fewer than `max` workers exist.
        """
        spawn = False
        with self.qlk:
            if self.timeout is not None:
                now = start = time.time()
                while len(self.queue) >= self.qsz:
                    self.qecond.wait(start + self.timeout - now)
                    now = time.time()
                    if now - start > self.timeout:
                        # The queue stayed full for too long; give up on
                        # the entire process.
                        os.abort()
            else:
                while len(self.queue) >= self.qsz:
                    self.qecond.wait()
            self.queue.append(req)
            self.qfcond.notify()
            if len(self.waiting) < 1:
                spawn = True
        if spawn:
            with self.clk:
                if len(self.current) < self.max:
                    th = reqthread(target=self.run)
                    th.registered = False
                    th.start()
                    # Wait for the worker to add itself to self.current so
                    # the size check above stays accurate.
                    while not th.registered:
                        self.ccond.wait()

    def handle1(self, req):
        """Run the WSGI application for `req` and write out its response.

        A `closed` exception is ignored; any other exception is logged and
        not propagated.  The request itself is closed by the caller.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333: the iterable's close() must be called when
                    # the request completes, whether normally or not.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            # Deliberately broad: nothing may escape and kill the worker.
            log.error("exception occurred when handling request", exc_info=True)

    def run(self):
        """Worker thread body: serve queued requests until idle for too long.

        The worker exits after waiting 10 seconds without getting a
        request.  It also exits when it finds the queue empty while at
        least `waitlimit` workers are already idle and `wlstart` is at
        least 10 seconds in the past.
        """
        timeout = 10.0
        th = threading.current_thread()
        with self.clk:
            self.current.add(th)
            th.registered = True
            self.ccond.notify_all()
        try:
            while True:
                start = now = time.time()
                with self.qlk:
                    while len(self.queue) < 1:
                        if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout:
                            return
                        self.waiting.add(th)
                        try:
                            if len(self.waiting) == self.waitlimit:
                                self.wlstart = now
                            self.qfcond.wait(start + timeout - now)
                        finally:
                            self.waiting.remove(th)
                        now = time.time()
                        if now - start > timeout:
                            return
                    req = self.queue.popleft()
                    self.qecond.notify()
                try:
                    self.handle1(req)
                finally:
                    req.close()
        finally:
            with self.clk:
                self.current.remove(th)

    def close(self):
        """Join worker threads until none remain registered."""
        while True:
            with self.clk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock; the worker needs it to unregister.
            th.join()
311
class resplex(handler):
    """Handler that multiplexes all response output on one response thread.

    Each request gets its own thread that only calls the application
    (handle1).  The resulting response iterator is then handed over,
    through `cqueue` and a wake-up byte on `cnpipe`, to the single
    response thread (handle2), which drives every iterator and writes
    data out as the requests become writable.
    """
    cname = "rplex"

    def __init__(self, *, max=None, **kw):
        super().__init__(**kw)
        # Threads currently inside handle1; guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        self.tcond = threading.Condition(self.lk)
        self.max = max
        # (request, iterator) pairs waiting to be picked up by handle2.
        self.cqueue = queue.Queue(5)
        # Pipe used to wake the response thread out of select().
        self.cnpipe = os.pipe()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, *, max=None, **args):
        """Convert the string argument `max` to a constructor kwarg."""
        parsed = super().parseargs(**args)
        if max:
            parsed["max"] = int(max)
        return parsed

    def ckflush(self, req):
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a thread calling the application for `req`.

        If `max` is set, first wait until fewer than `max` such threads
        are running.
        """
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            worker = reqthread(target=self.handle1, args=[req])
            worker.registered = False
            worker.start()
            # Wait for the thread to add itself to self.current.
            while not worker.registered:
                self.tcond.wait()

    def handle1(self, req):
        """Thread body: call the application and pass its iterator to handle2.

        The request is closed here unless it was handed over successfully.
        """
        try:
            me = threading.current_thread()
            with self.lk:
                self.current.add(me)
                me.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if req.status:
                    self.cqueue.put((req, respiter))
                    os.write(self.cnpipe[1], b" ")
                    # Ownership has passed to the response thread, so the
                    # request must not be closed below.
                    req = None
                else:
                    log.error("request handler returned without calling start_request")
                    if hasattr(respiter, "close"):
                        respiter.close()
            finally:
                with self.lk:
                    self.current.remove(me)
                    self.tcond.notify_all()
        except closed:
            pass
        except BaseException:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response thread body: drive iterators and flush buffered output.

        Runs until the read end of `cnpipe` reports end-of-file.  Any
        unexpected exception is logged and the process is aborted.
        """
        try:
            wakefd = self.cnpipe[0]
            # Maps each live request to its response iterator, or to None
            # once the iterator is finished and only buffered data remains.
            active = {}

            def closereq(req):
                # Close the iterator (if still set) and the request, then
                # forget the request.
                body = active[req]
                try:
                    if body is not None and hasattr(body, "close"):
                        body.close()
                except BaseException:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except BaseException:
                    log.error("exception occurred when closing request", exc_info=True)
                del active[req]

            def ckiter(req):
                # Pull one item from the request's iterator into its
                # buffer; retire the iterator when it ends or fails, and
                # close the request once nothing is left to send.
                body = active[req]
                if body is not None:
                    try:
                        data = next(body)
                    except StopIteration:
                        finished = True
                        try:
                            req.flushreq()
                        except BaseException:
                            log.error("exception occurred when handling response data", exc_info=True)
                    except BaseException:
                        finished = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    else:
                        finished = False
                        if data:
                            try:
                                req.flushreq()
                                req.writedata(data)
                            except BaseException:
                                log.error("exception occurred when handling response data", exc_info=True)
                                finished = True
                    if finished:
                        active[req] = None
                        try:
                            if hasattr(body, "close"):
                                body.close()
                        except BaseException:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        body = None
                if body is None and not req.buffer:
                    closereq(req)

            while True:
                pending = [r for r in active.keys() if r.buffer]
                rls, wls, els = select.select([wakefd], pending, [wakefd] + pending)
                if wakefd in rls:
                    token = os.read(wakefd, 1024)
                    if not token:
                        # Write end was closed: shut down.
                        os.close(wakefd)
                        return
                    try:
                        # Drain everything that has been handed over.
                        while True:
                            req, respiter = self.cqueue.get(False)
                            active[req] = respiter
                            ckiter(req)
                    except queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except BaseException:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Only fetch more data while the buffer is small.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except BaseException:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Join all application threads, then stop and join the response thread."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    straggler = next(iter(self.current))
                else:
                    break
            straggler.join()
        # Closing the write end makes handle2 see end-of-file and return.
        os.close(self.cnpipe[1])
        self.rthread.join()
470
# Registry mapping each handler's `cname` to its class.  Classes without a
# `cname` attribute, such as the `handler` base class, are left out.  This
# must stay a comprehension: a plain module-level loop would bind its loop
# variable in globals() while iterating over it.
names = {
    candidate.cname: candidate
    for candidate in globals().values()
    if isinstance(candidate, type)
    and issubclass(candidate, handler)
    and hasattr(candidate, "cname")
}
475
def parsehspec(spec):
    """Split a handler specification into its name and argument dict.

    The format is "name" or "name:key=val,key2=val2,...".  A part without
    "=" maps its whole text to the empty string.  All values are returned
    as strings.
    """
    nm, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    args = {}
    while rest:
        part, _, rest = rest.partition(",")
        key, _, val = part.partition("=")
        args[key] = val
    return nm, args