python3: Added a pooled thread handler.
[ashd.git] / python3 / ashd / serve.py
1 import sys, os, threading, time, logging, select, queue, collections
2 from . import perf
3
# Module-level logger; the handlers below use it to report errors raised
# while handling requests and writing responses.
log = logging.getLogger("ashd.serve")
# Next sequence number to hand out; reqseq() reads and advances it while
# holding seqlk.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    The first call returns 1 and every later call returns a value one
    higher than the previous call. The module-level counter is read and
    advanced while `seqlk` is held.
    """
    global seq
    seqlk.acquire()
    try:
        number = seq
        seq = number + 1
    finally:
        seqlk.release()
    return number
14
class closed(IOError):
    """I/O error stating that the client has closed the connection.

    Takes no constructor arguments; the message is fixed.
    """

    def __init__(self):
        message = "The client has closed the connection."
        super(closed, self).__init__(message)
18
class reqthread(threading.Thread):
    """Thread subclass that supplies a numbered default name.

    When `name` is omitted (or None) the thread is called
    "Request handler N", where N comes from reqseq(). All other keyword
    arguments are forwarded to threading.Thread unchanged.
    """

    def __init__(self, *, name=None, **kw):
        # reqseq() is consulted only for unnamed threads, so explicitly
        # named threads do not consume a sequence number.
        chosen = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=chosen, **kw)
24
class wsgirequest(object):
    """Base class for one WSGI request/response exchange.

    Keeps the response status and headers, a flag telling whether the
    response head has been written, and a bytearray of body data waiting
    to be flushed. `handlewsgi`, `fileno`, `writehead` and `flush` only
    raise here and are meant to be provided by subclasses.
    """

    def __init__(self, *, handler):
        # `handler` is the object whose ckflush() is called after each write().
        self.handler = handler
        self.buffer = bytearray()
        self.respsent = False
        self.status = None
        self.headers = []

    def handlewsgi(self):
        raise Exception()

    def fileno(self):
        raise Exception()

    def writehead(self, status, headers):
        raise Exception()

    def flush(self):
        raise Exception()

    def close(self):
        pass

    def writedata(self, data):
        """Append `data` to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Write the response head once, if it has not been written yet.

        Raises Exception if no status has been set by startreq().
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        # Mark as sent before calling writehead(), as the original order does.
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Queue body data and let the handler flush it; empty data is ignored."""
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers and return the write callable.

        A second call is only accepted when `exc_info` is given; if the
        head has already been sent by then, the exception in
        `exc_info[1]` is raised instead.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exception info triple.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
73
class handler(object):
    """Base class for request handlers.

    Subclasses override handle(); this class supplies a blocking flush
    helper, a no-op close() and the base of the parseargs() chain.
    """

    def handle(self, request):
        raise Exception()

    def ckflush(self, req):
        """Block until `req.buffer` has been emptied.

        Repeatedly waits with select() for `req` to be reported (as
        writable or exceptional) and then calls `req.flush()`.
        """
        while len(req.buffer) > 0:
            # The returned ready lists are not needed, only the wait itself.
            select.select([], [req], [req])
            req.flush()

    def close(self):
        pass

    @classmethod
    def parseargs(cls, **args):
        """Reject any leftover handler arguments and return an empty dict.

        Raises ValueError naming the first unrecognised argument.
        """
        for unknown in args:
            raise ValueError("unknown handler argument: " + unknown)
        return {}
89
class single(handler):
    """Handler that processes each request synchronously in the calling thread."""
    cname = "single"

    def handle(self, req):
        """Run the WSGI application for `req` and write out its response.

        The response chunks are written with `req.write()`, the response
        head is flushed even when the body is empty, and any remaining
        buffered output is flushed before returning. A `closed` error is
        ignored; any other exception is logged. `req.close()` is always
        called.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Make sure the head goes out even for an empty body.
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # WSGI requires the server to call close() on the
                    # returned iterable, if it has one, once the request
                    # is finished -- whether or not it completed normally.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            # The client went away; nothing more to do for this request.
            pass
        except:
            # Deliberately broad: this is the outermost boundary for the
            # request, so failures are logged rather than propagated.
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
110
def dbg(*a):
    """Write the arguments to stderr, space-separated, followed by a newline.

    Each argument is converted with str(). The stream is flushed
    afterwards so the output appears immediately.
    """
    # Join the individual arguments; the line is still built from str()
    # of each one in order.
    sys.stderr.write(" ".join(str(o) for o in a))
    sys.stderr.write("\n")
    sys.stderr.flush()
120
class freethread(handler):
    """Handler that starts one new thread for every request.

    `max` optionally caps the number of concurrently running request
    threads. `timeout` is the number of seconds handle() may wait for a
    free slot before the whole process is aborted with os.abort().
    """
    cname = "free"

    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        # Threads currently handling a request; guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        # Signalled whenever a thread registers in or leaves self.current.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Convert string handler arguments into constructor keywords.

        `max` becomes the integer `max` keyword and `abort` becomes the
        integer `timeout` keyword; anything else is passed up the chain,
        which rejects unknown arguments with ValueError.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for `req`, waiting for a free slot if `max` is set.

        Returns once the new thread has registered itself in
        `self.current`.
        """
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    now = start = time.time()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.time()
                        if now - start > self.timeout:
                            # Waited too long for a slot: abort the process.
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.registered = False
            th.start()
            # Wait until the thread has added itself to self.current, so
            # that it is visible there before handle() returns.
            while not th.registered:
                self.tcond.wait()

    def run(self, req):
        """Thread body: register, serve `req`, unregister and close it."""
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            # Send the head even when the body was empty.
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # WSGI requires the server to call close() on the
                        # returned iterable, if it has one, when the
                        # request is finished.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                # The client went away; nothing more to send.
                pass
            except:
                # Deliberately broad: outermost boundary of the request
                # thread, so failures are logged rather than propagated.
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    # Wake handle() callers waiting for a slot.
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Join request threads until none remain registered."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock so the thread can unregister itself.
            th.join()
196
class threadpool(handler):
    """Handler that serves requests from a queue with a pool of threads.

    `max` is the largest number of worker threads, `qsz` the largest
    number of queued requests, and `timeout` the number of seconds
    handle() may wait for queue space before the process is aborted with
    os.abort(). Idle workers exit after ten seconds without work.
    """
    cname = "pool"

    def __init__(self, *, max=25, qsz=100, timeout=None, **kw):
        super().__init__(**kw)
        # Live worker threads; guarded by self.clk.
        self.current = set()
        self.clk = threading.Lock()
        self.ccond = threading.Condition(self.clk)
        # Pending requests and the workers idling for one; guarded by self.qlk.
        self.queue = collections.deque()
        self.waiting = set()
        self.qlk = threading.Lock()
        # Shared by producers waiting for space and workers waiting for work.
        self.qcond = threading.Condition(self.qlk)
        self.max = max
        self.qsz = qsz
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, queue=None, abort=None, **args):
        """Convert string handler arguments into constructor keywords.

        `max`, `queue` and `abort` become the integer keywords `max`,
        `qsz` and `timeout`; anything else is passed up the chain, which
        rejects unknown arguments with ValueError.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if queue:
            ret["qsz"] = int(queue)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Queue `req`, starting another worker if none is idle.

        Blocks while the queue holds `qsz` requests. With a timeout
        configured, waiting longer than that aborts the process.
        """
        # Kept separate from the timing variables below so that the
        # decision to spawn depends only on whether a worker is idle.
        spawn = False
        with self.qlk:
            if self.timeout is not None:
                now = start = time.time()
                while len(self.queue) >= self.qsz:
                    self.qcond.wait(start + self.timeout - now)
                    now = time.time()
                    if now - start > self.timeout:
                        os.abort()
            else:
                # The bound applies to the request queue, which is what
                # self.qlk protects.
                while len(self.queue) >= self.qsz:
                    self.qcond.wait()
            self.queue.append(req)
            # Producers and workers wait on the same condition, so wake
            # everyone; a single notify could wake another producer and
            # leave the request sitting in the queue.
            self.qcond.notify_all()
            if len(self.waiting) < 1:
                spawn = True
        if spawn:
            with self.clk:
                if len(self.current) < self.max:
                    th = reqthread(target=self.run)
                    th.registered = False
                    th.start()
                    # Wait until the worker is visible in self.current.
                    while not th.registered:
                        self.ccond.wait()

    def handle1(self, req):
        """Run the WSGI application for `req` and write out its response.

        A `closed` error is ignored; any other exception is logged.
        Closing `req` is left to the caller.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Send the head even when the body was empty.
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # WSGI requires the server to call close() on the
                    # returned iterable, if it has one, when the request
                    # is finished.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            # Deliberately broad: keeps a failing request from killing
            # the worker thread.
            log.error("exception occurred when handling request", exc_info=True)

    def run(self):
        """Worker body: serve queued requests until idle for too long."""
        timeout = 10.0
        th = threading.current_thread()
        with self.clk:
            self.current.add(th)
            th.registered = True
            self.ccond.notify_all()
        try:
            while True:
                start = now = time.time()
                with self.qlk:
                    # The queue is re-checked before the idle deadline,
                    # so a request that arrived just as the wait expired
                    # is still served instead of being left behind.
                    while len(self.queue) < 1:
                        if now - start > timeout:
                            return
                        self.waiting.add(th)
                        try:
                            self.qcond.wait(start + timeout - now)
                        finally:
                            self.waiting.discard(th)
                        now = time.time()
                    req = self.queue.popleft()
                    # Space was freed: wake producers blocked in handle().
                    self.qcond.notify_all()
                try:
                    self.handle1(req)
                finally:
                    req.close()
        finally:
            with self.clk:
                self.current.remove(th)

    def close(self):
        """Join worker threads until none remain registered."""
        while True:
            # self.current is guarded by self.clk.
            with self.clk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock so the worker can unregister itself.
            th.join()
301
class resplex(handler):
    """Handler that multiplexes all response writing onto one thread.

    Each request gets its own thread only for the call into the WSGI
    application. The resulting response iterator is then handed, through
    a queue and a wake-up pipe, to a single response thread that iterates
    and writes all active responses using select(). The write() callable
    is not supported by this handler.
    """
    cname = "rplex"

    def __init__(self, *, max=None, **kw):
        super().__init__(**kw)
        # Threads currently inside the application call; guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        self.tcond = threading.Condition(self.lk)
        self.max = max
        # Hand-over queue of (request, response iterator) pairs.
        self.cqueue = queue.Queue(5)
        # Pipe used to wake the response thread; closing the write end
        # tells it to exit.
        self.cnpipe = os.pipe()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, *, max=None, **args):
        """Convert the string `max` argument into an integer keyword.

        Anything else is passed up the chain, which rejects unknown
        arguments with ValueError.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a thread for `req`, waiting for a free slot if `max` is set."""
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            th = reqthread(target=self.handle1, args=[req])
            th.registered = False
            th.start()
            # Wait until the thread is visible in self.current.
            while not th.registered:
                self.tcond.wait()

    def handle1(self, req):
        """Request thread body: call the application and hand over its iterator.

        If the application never started a response, the iterator is
        closed and the request is closed here. Otherwise ownership of the
        request passes to the response thread.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if not req.status:
                    log.error("request handler returned without calling start_request")
                    if hasattr(respiter, "close"):
                        respiter.close()
                    return
                else:
                    self.cqueue.put((req, respiter))
                    # Wake the response thread so it picks up the queue entry.
                    os.write(self.cnpipe[1], b" ")
                    # The response thread owns the request from here on.
                    req = None
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response thread body: iterate and write all active responses.

        Runs until the wake-up pipe reports end-of-file. Any unexpected
        exception is logged and aborts the process.
        """
        try:
            rp = self.cnpipe[0]
            # Maps each active request to its response iterator, or to
            # None once the iterator is finished and only buffered data
            # remains to be written.
            current = {}

            def closereq(req):
                # Close the iterator (if still open) and the request, and
                # forget the request.
                respiter = current[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del current[req]

            def ckiter(req):
                # Pull the next chunk from the request's iterator into its
                # buffer, retiring the iterator when it is exhausted or fails.
                respiter = current[req]
                if respiter is not None:
                    rem = False
                    data = None
                    try:
                        # Skip empty chunks: a request is only revisited
                        # while it has buffered output, so stopping on an
                        # empty chunk with an empty buffer would leave it
                        # waiting forever.
                        while not data:
                            data = next(respiter)
                    except StopIteration:
                        rem = True
                        try:
                            # Make sure the head is sent for an empty body.
                            req.flushreq()
                        except:
                            log.error("exception occurred when handling response data", exc_info=True)
                    except:
                        rem = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if not rem:
                        try:
                            req.flushreq()
                            req.writedata(data)
                        except:
                            log.error("exception occurred when handling response data", exc_info=True)
                            rem = True
                    if rem:
                        current[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                if respiter is None and not req.buffer:
                    # Nothing left to produce or to write.
                    closereq(req)

            while True:
                # Only requests with pending output are watched for writability.
                bufl = [req for req in current if req.buffer]
                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
                if rp in rls:
                    ret = os.read(rp, 1024)
                    if not ret:
                        # Write end closed: the handler is shutting down.
                        os.close(rp)
                        return
                    try:
                        # Drain every hand-over that is currently queued.
                        while True:
                            req, respiter = self.cqueue.get(False)
                            current[req] = respiter
                            ckiter(req)
                    except queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Refill only while the buffer is reasonably small.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Join all request threads, then stop and join the response thread."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    break
            # Join outside the lock so the thread can unregister itself.
            th.join()
        # Closing the write end makes the response thread see end-of-file.
        os.close(self.cnpipe[1])
        self.rthread.join()
460
# Registry mapping each handler's `cname` to its class, built from every
# `handler` subclass in this module's globals that has a `cname` attribute.
names = {
    hcls.cname: hcls
    for hcls in globals().values()
    if isinstance(hcls, type)
    and issubclass(hcls, handler)
    and hasattr(hcls, "cname")
}
465
def parsehspec(spec):
    """Split a handler specification into its name and argument dict.

    The expected form is "name" or "name:key=val,key2=val2". A part
    without "=" maps its text to the empty string. Values are returned as
    strings; a spec without ":" yields an empty dict.

    Returns a (name, args) tuple.
    """
    name, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    pieces = rest.split(",")
    # Parsing stops as soon as nothing is left after a comma, so one
    # trailing empty piece (including the sole piece of an empty argument
    # string) is never processed. Empty pieces elsewhere are kept.
    if pieces[-1] == "":
        pieces.pop()
    args = {}
    for piece in pieces:
        key, _, val = piece.partition("=")
        args[key] = val
    return name, args