Merge branch 'master' into timeheap
[ashd.git] / python3 / ashd / serve.py
1 import sys, os, threading, time, logging, select, queue, collections
2 from . import perf
3
# Logger used by the request handlers in this module.
log = logging.getLogger("ashd.serve")
# Next sequence number returned by reqseq(); only touched while holding seqlk.
seq = 1
seqlk = threading.Lock()
7
def reqseq():
    """Return the next request sequence number.

    The module-level counter is read and advanced while holding seqlk,
    so concurrent callers each receive a distinct number.
    """
    global seq
    seqlk.acquire()
    try:
        allotted = seq
        seq = allotted + 1
    finally:
        seqlk.release()
    return allotted
14
class closed(IOError):
    """Error signalling that the client has closed the connection."""

    def __init__(self):
        IOError.__init__(self, "The client has closed the connection.")
18
class reqthread(threading.Thread):
    """Thread subclass that gets a numbered default name.

    When no name is given, the thread is called "Request handler N",
    where N comes from reqseq().  All other keyword arguments are passed
    on to threading.Thread unchanged.
    """

    def __init__(self, *, name=None, **kw):
        # Only consume a sequence number when the caller supplied no name.
        label = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=label, **kw)
24
class wsgirequest(object):
    """Response-side state of a single WSGI request.

    The base versions of handlewsgi, fileno, writehead and flush only
    raise; concrete request classes are expected to replace them.  Body
    data is collected in self.buffer, and the owning handler is asked to
    drain it through handler.ckflush().
    """

    def __init__(self, *, handler):
        self.handler = handler
        self.buffer = bytearray()
        # Response head as given to startreq(); respsent flips to True
        # once writehead() has been invoked for it.
        self.status = None
        self.headers = []
        self.respsent = False

    def handlewsgi(self):
        raise Exception()

    def fileno(self):
        raise Exception()

    def writehead(self, status, headers):
        raise Exception()

    def flush(self):
        raise Exception()

    def close(self):
        pass

    def writedata(self, data):
        """Append data to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head if it has not been sent yet.

        Raises Exception when no status has been set by startreq().
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        # Mark as sent before calling out, exactly once per request.
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Buffer a chunk of body data and let the handler flush it.

        Empty data is ignored entirely (the head is not sent either).
        """
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return the write callable.

        A second call is only accepted when exc_info is given.  In that
        case the exception in exc_info[1] is re-raised if the head has
        already been sent, otherwise the stored head is replaced.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exception triple.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
73
class handler(object):
    """Base class for request handlers.

    Subclasses override handle(); the base version only raises.
    """

    def handle(self, request):
        raise Exception()

    def ckflush(self, req):
        """Keep flushing req until its output buffer is empty.

        Before every flush, select() is used to wait until req is
        reported writable or in an exceptional state.
        """
        while len(req.buffer) > 0:
            select.select([], [req], [req])
            req.flush()

    def close(self):
        pass

    @classmethod
    def parseargs(cls, **args):
        """Turn handler-spec arguments into constructor keyword arguments.

        The base handler takes no arguments, so any leftover argument
        raises ValueError naming the first one; otherwise an empty dict
        is returned.
        """
        for unknown in args:
            raise ValueError("unknown handler argument: " + unknown)
        return {}
89
class single(handler):
    """Handler that serves each request synchronously in the calling thread."""
    cname = "single"

    def handle(self, req):
        """Run the WSGI application for req and write out its response.

        The request is always closed afterwards.  A `closed` error (client
        went away) is ignored silently; any other exception is logged.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Make sure the head goes out even for an empty body.
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # WSGI requires the server to call close() on the
                    # returned iterable, if it has one, however the
                    # request ended.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
110
def dbg(*a):
    """Write the arguments to stderr on one line, separated by spaces.

    Each argument is converted with str().  A newline is always written
    (even with no arguments) and the stream is flushed afterwards.
    """
    # Convert each argument individually, not the whole argument tuple.
    sys.stderr.write(" ".join(str(o) for o in a))
    sys.stderr.write("\n")
    sys.stderr.flush()
120
class freethread(handler):
    """Handler that serves every request in a newly started thread.

    max, when not None, caps the number of concurrently running request
    threads; handle() blocks until a slot is free.  timeout, when not
    None, is the number of seconds handle() may wait for a slot before
    the whole process is aborted with os.abort().
    """
    cname = "free"

    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        # Threads currently serving a request; guarded by lk.
        self.current = set()
        self.lk = threading.Lock()
        # Signalled whenever a thread registers in or leaves current.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Convert the spec arguments "max" and "abort" to constructor kwargs.

        Both values are converted with int(); "abort" becomes the
        constructor's timeout.  Unknown arguments raise ValueError via the
        base class.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for req, waiting for a free slot if max is set."""
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    now = start = time.time()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.time()
                        if now - start > self.timeout:
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.registered = False
            th.start()
            # Do not return until the new thread has added itself to
            # current, so the limit check above sees it on the next call.
            while not th.registered:
                self.tcond.wait()

    def run(self, req):
        """Thread body: register, serve req, unregister, close req."""
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            # Make sure the head goes out even for an empty body.
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # WSGI requires the server to call close() on the
                        # returned iterable, if it has one, however the
                        # request ended.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                pass
            except:
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Join request threads one at a time until none remain."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock so the thread can unregister itself.
            th.join()
196
class threadpool(handler):
    """Handler that queues requests for a pool of worker threads.

    max is the largest number of worker threads, qsz the largest number
    of queued requests before handle() blocks.  timeout, when not None,
    is the number of seconds handle() may wait for queue space before the
    whole process is aborted with os.abort().
    """
    cname = "pool"

    def __init__(self, *, max=25, qsz=100, timeout=None, **kw):
        super().__init__(**kw)
        # Live worker threads; guarded by clk.
        self.current = set()
        self.clk = threading.Lock()
        self.ccond = threading.Condition(self.clk)
        # Pending requests and the set of idle workers; guarded by qlk.
        self.queue = collections.deque()
        self.waiting = set()
        self.waitlimit = 5
        self.wlstart = 0.0
        self.qlk = threading.Lock()
        self.qcond = threading.Condition(self.qlk)
        self.max = max
        self.qsz = qsz
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, queue=None, abort=None, **args):
        """Convert the spec arguments "max", "queue", "abort" to kwargs.

        Values are converted with int(); "queue" becomes qsz and "abort"
        becomes timeout.  Unknown arguments raise ValueError via the base
        class.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if queue:
            ret["qsz"] = int(queue)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Queue req, blocking while the queue is full.

        A new worker is started when no worker was idle at the time the
        request was queued and fewer than max workers exist.
        """
        spawn = False
        with self.qlk:
            if self.timeout is not None:
                now = start = time.time()
                while len(self.queue) >= self.qsz:
                    self.qcond.wait(start + self.timeout - now)
                    now = time.time()
                    if now - start > self.timeout:
                        os.abort()
            else:
                # The limit applies to the request queue, as in the
                # timeout branch above, not to the number of workers.
                while len(self.queue) >= self.qsz:
                    self.qcond.wait()
            self.queue.append(req)
            self.qcond.notify()
            if len(self.waiting) < 1:
                spawn = True
        if spawn:
            with self.clk:
                if len(self.current) < self.max:
                    th = reqthread(target=self.run)
                    th.registered = False
                    th.start()
                    # Wait until the worker has added itself to current so
                    # the max check stays accurate.
                    while not th.registered:
                        self.ccond.wait()

    def handle1(self, req):
        """Run the WSGI application for req and write out its response.

        A `closed` error is ignored silently; other exceptions are logged.
        Closing req itself is left to the caller.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Make sure the head goes out even for an empty body.
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # WSGI requires the server to call close() on the
                    # returned iterable, if it has one, however the
                    # request ended.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)

    def run(self):
        """Worker thread body: serve queued requests until idle too long.

        A worker exits after waiting `timeout` seconds without getting a
        request, or straight away when at least waitlimit workers are
        already idle and wlstart is at least `timeout` seconds old.
        """
        timeout = 10.0
        th = threading.current_thread()
        with self.clk:
            self.current.add(th)
            th.registered = True
            self.ccond.notify_all()
        try:
            while True:
                start = now = time.time()
                with self.qlk:
                    while len(self.queue) < 1:
                        if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout:
                            return
                        self.waiting.add(th)
                        try:
                            if len(self.waiting) == self.waitlimit:
                                self.wlstart = now
                            self.qcond.wait(start + timeout - now)
                        finally:
                            self.waiting.remove(th)
                        now = time.time()
                        if now - start > timeout:
                            return
                    req = self.queue.popleft()
                    # Queue space was just freed: wake any handle() call
                    # blocked on a full queue.  Idle workers woken by this
                    # simply re-check the queue and go back to waiting.
                    self.qcond.notify_all()
                try:
                    self.handle1(req)
                finally:
                    req.close()
        finally:
            with self.clk:
                self.current.remove(th)

    def close(self):
        """Join worker threads one at a time until none remain."""
        while True:
            with self.clk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock so the worker can unregister itself.
            th.join()
309
class resplex(handler):
    """Handler that multiplexes all response output in one thread.

    Each request gets its own thread (handle1) that calls the WSGI
    application and then passes the request and its response iterator to
    a single response thread (handle2), which uses select() to feed data
    to every client as it becomes writable.
    """
    cname = "rplex"

    def __init__(self, *, max=None, **kw):
        super().__init__(**kw)
        # Threads currently inside handle1; guarded by lk.
        self.current = set()
        self.lk = threading.Lock()
        self.tcond = threading.Condition(self.lk)
        self.max = max
        # (req, respiter) pairs handed from handle1 to handle2.
        self.cqueue = queue.Queue(5)
        # Pipe used to wake handle2's select() when cqueue has new entries;
        # closing the write end tells handle2 to exit.
        self.cnpipe = os.pipe()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, *, max=None, **args):
        # Convert the spec argument "max" with int(); anything else is
        # rejected by the base class.
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        # wsgirequest.write() calls handler.ckflush(), so raising here is
        # what makes write() unusable with this handler.
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a handle1 thread for req, waiting for a slot if max is set."""
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            th = reqthread(target=self.handle1, args=[req])
            th.registered = False
            th.start()
            # Wait until the thread has added itself to current.
            while not th.registered:
                self.tcond.wait()

    def handle1(self, req):
        """Call the WSGI application and hand its iterator to handle2.

        If the application returned without setting a status, the error is
        logged, the iterator is closed and the request is closed here.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if not req.status:
                    log.error("request handler returned without calling start_request")
                    if hasattr(respiter, "close"):
                        respiter.close()
                    return
                else:
                    self.cqueue.put((req, respiter))
                    # Wake the response thread's select().
                    os.write(self.cnpipe[1], b" ")
                    # The response thread now owns req; clearing the local
                    # name keeps the finally clause below from closing it.
                    req = None
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response thread body: drive all active responses with select().

        Runs until the read end of cnpipe reports end-of-file.  Any
        unexpected exception is logged and the process is aborted.
        """
        try:
            rp = self.cnpipe[0]
            # Maps each active request to its response iterator, or to None
            # once the iterator is finished but buffered data remains.
            current = {}

            def closereq(req):
                # Close the iterator (if still present) and the request,
                # logging rather than propagating errors, then forget req.
                respiter = current[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del current[req]
            def ckiter(req):
                # Pull at most one chunk from req's iterator into its
                # buffer; retire the iterator when it ends or fails, and
                # close the request once nothing is left to write.
                respiter = current[req]
                if respiter is not None:
                    rem = False
                    try:
                        data = next(respiter)
                    except StopIteration:
                        rem = True
                        try:
                            # Send the head even if no body data was produced.
                            req.flushreq()
                        except:
                            log.error("exception occurred when handling response data", exc_info=True)
                    except:
                        rem = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if not rem:
                        if data:
                            try:
                                req.flushreq()
                                req.writedata(data)
                            except:
                                log.error("exception occurred when handling response data", exc_info=True)
                                rem = True
                    if rem:
                        current[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                if respiter is None and not req.buffer:
                    closereq(req)

            while True:
                # Only requests with pending output are watched for writability.
                bufl = list(req for req in current.keys() if req.buffer)
                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
                if rp in rls:
                    ret = os.read(rp, 1024)
                    if not ret:
                        # End-of-file: the write end of the pipe was closed.
                        os.close(rp)
                        return
                    try:
                        # Take over every request queued by handle1 so far.
                        while True:
                            req, respiter = self.cqueue.get(False)
                            current[req] = respiter
                            ckiter(req)
                    except queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Fetch more data only while less than 64 KiB is buffered.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Join all handle1 threads, then stop and join the response thread."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    break
            th.join()
        # Closing the write end makes handle2 see end-of-file and return.
        os.close(self.cnpipe[1])
        self.rthread.join()
468
# Registry mapping each handler's cname to its class, collected from every
# handler subclass among this module's globals that has a cname attribute.
names = {cls.cname: cls for cls in globals().values() if
         isinstance(cls, type) and
         issubclass(cls, handler) and
         hasattr(cls, "cname")}
473
def parsehspec(spec):
    """Split a handler specification into its name and argument dict.

    spec is either "name" or "name:key=val,key2=val2,...".  A piece
    without "=" maps its whole text to the empty string.  Returns the
    tuple (name, args), where args is a dict of strings.
    """
    nm, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    pieces = rest.split(",")
    # Parsing stops as soon as nothing remains after a comma, so a single
    # trailing empty piece (or an empty argument string) yields no entry.
    if pieces[-1] == "":
        pieces.pop()
    args = {}
    for piece in pieces:
        key, _, val = piece.partition("=")
        args[key] = val
    return nm, args
488         args[key] = val
489     return nm, args