python: Added request limiter to rplex handler.
[ashd.git] / python3 / ashd / serve.py
1 import sys, os, threading, time, logging, select, queue
2 from . import perf
3
# Logger used for all error reporting in this module.
log = logging.getLogger("ashd.serve")
# Next sequence number to be handed out by reqseq(); only read and
# updated while holding seqlk.
seq = 1
seqlk = threading.Lock()
7
def reqseq():
    """Return the current value of the module-level counter `seq` and
    advance it by one, holding `seqlk` for the duration."""
    global seq
    seqlk.acquire()
    try:
        ticket = seq
        seq = ticket + 1
    finally:
        seqlk.release()
    return ticket
14
class closed(IOError):
    """Exception signalling that the client has closed the connection.

    The handlers in this module catch it and silently give up on the
    response instead of logging an error.
    """
    def __init__(self):
        message = "The client has closed the connection."
        super().__init__(message)
18
class reqthread(threading.Thread):
    """Thread that names itself "Request handler N" (N taken from
    reqseq()) unless an explicit name is supplied."""
    def __init__(self, *, name=None, **kw):
        # reqseq() is only consulted when the caller gave no name.
        label = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=label, **kw)
24
class wsgirequest(object):
    """Base class tracking the response state of a single WSGI request.

    Subclasses must supply the transport-specific hooks handlewsgi(),
    fileno(), writehead() and flush(), which raise NotImplementedError
    here.  Response body data is accumulated in `buffer`; after every
    write() the owning handler's ckflush() is invoked with this request.
    """
    def __init__(self, handler):
        # Status and header list as last passed to startreq().
        self.status = None
        self.headers = []
        # Becomes True once flushreq() has called writehead().
        self.respsent = False
        # Handler whose ckflush() is called from write().
        self.handler = handler
        # Body data appended by writedata().
        self.buffer = bytearray()

    def handlewsgi(self):
        """Hook the handlers call as handlewsgi(env, startreq); must be
        overridden."""
        raise NotImplementedError("handlewsgi() must be overridden by a subclass")
    def fileno(self):
        """Hook needed so the request can be passed to select(); must be
        overridden."""
        raise NotImplementedError("fileno() must be overridden by a subclass")
    def writehead(self, status, headers):
        """Hook called once by flushreq() with the response status and
        headers; must be overridden."""
        raise NotImplementedError("writehead() must be overridden by a subclass")
    def flush(self):
        """Hook called by the handlers while `buffer` is non-empty; must
        be overridden."""
        raise NotImplementedError("flush() must be overridden by a subclass")
    def close(self):
        """Called by the handlers when they are done with the request.
        No-op by default."""
        pass
    def writedata(self, data):
        """Append `data` to the response buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Call writehead() with the stored status/headers, at most once.

        Raises Exception if startreq() has not set a status yet.
        """
        if not self.respsent:
            if not self.status:
                raise Exception("Cannot send response body before starting response.")
            self.respsent = True
            self.writehead(self.status, self.headers)

    def write(self, data):
        """Buffer `data` and let the handler flush it.

        Empty data is ignored entirely, so it does not trigger
        flushreq().
        """
        if not data:
            return
        self.flushreq()
        self.writedata(data)
        self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers and return write().

        May be called again only with `exc_info`; if writehead() has
        already been called by then, the original exception is
        re-raised.  Without `exc_info` a second call raises Exception.
        """
        if self.status:
            if exc_info:
                try:
                    if self.respsent:
                        # Re-raise with the original traceback, as PEP
                        # 3333 prescribes, so the failure stays traceable.
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    # Drop the reference to avoid a traceback cycle.
                    exc_info = None
            else:
                raise Exception("Can only start responding once.")
        self.status = status
        self.headers = headers
        return self.write
73
class handler(object):
    """Base class for request handlers.

    Subclasses implement handle(); ckflush() and close() have usable
    defaults and parseargs() validates handler arguments.
    """
    def handle(self, request):
        """Process `request`; must be overridden."""
        raise NotImplementedError("handle() must be overridden by a subclass")
    def ckflush(self, req):
        """Block until all of `req.buffer` has been flushed.

        Waits with select() for the request to become writable (or to
        signal an exceptional condition) and then calls req.flush(),
        repeating until the buffer is empty.
        """
        while req.buffer:
            select.select([], [req], [req])
            req.flush()
    def close(self):
        """Shut the handler down.  No-op by default."""
        pass

    @classmethod
    def parseargs(cls, **args):
        """Convert string handler arguments into constructor keywords.

        The base class accepts no arguments: it returns an empty dict
        and raises ValueError naming the first argument that is left
        over.
        """
        if args:
            raise ValueError("unknown handler argument: " + next(iter(args)))
        return {}
89
class single(handler):
    """Handler that serves each request synchronously in the thread
    that calls handle()."""
    def handle(self, req):
        """Run the WSGI application for `req` and write out its response.

        A `closed` exception ends the request silently; any other
        exception is logged.  The request is always closed afterwards.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Make sure the head is sent even for an empty body.
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333 requires the server to call close() on the
                    # returned iterable, whether or not iteration
                    # completed normally.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            # Deliberately broad: nothing must escape the handler.
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
108
class freethread(handler):
    """Handler that serves every request in a newly started thread.

    `max` optionally caps the number of concurrently running request
    threads; handle() blocks while the cap is reached.  If `timeout` is
    also set and no slot frees up within that many seconds, the process
    is aborted with os.abort().
    """
    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        # Threads currently serving a request; guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        # Notified whenever self.current changes.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Parse the string arguments "max" and "abort" (the latter
        becomes the `timeout` constructor keyword) into integers."""
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for `req`, first waiting for a free slot if a
        thread limit is configured."""
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    # Use the monotonic clock: a wall-clock adjustment
                    # must not be able to trigger (or postpone) the abort.
                    now = start = time.monotonic()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.monotonic()
                        if now - start > self.timeout:
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.start()
            # Wait until the thread has registered itself (or died), so
            # the limit check above sees it on the next call.
            while th.is_alive() and th not in self.current:
                self.tcond.wait()

    def run(self, req):
        """Thread body: register in self.current, serve `req`, then
        deregister and close the request."""
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333 requires the server to call close() on
                        # the returned iterable in every case.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                pass
            except:
                # Deliberately broad: nothing must escape the thread.
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Wait for all running request threads to finish."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock; the thread needs it to deregister.
            th.join()
180
class threadpool(handler):
    """Handler that serves requests from a pool of reusable threads.

    At least `min` and at most `max` threads are kept; a thread that has
    been idle for more than `live` seconds exits while more than `min`
    threads exist.  Requests are handed over one at a time through the
    single slot `wreq`.
    """
    def __init__(self, *, min=0, max=20, live=300, **kw):
        super().__init__(**kw)
        # All pool threads, and the subset currently waiting for work;
        # both guarded by self.lk.
        self.current = set()
        self.free = set()
        self.lk = threading.RLock()
        # pcond: pool state changed (thread registered/exited, slot
        # emptied).  rcond: a request has been placed in wreq.
        self.pcond = threading.Condition(self.lk)
        self.rcond = threading.Condition(self.lk)
        # Request waiting to be picked up by a pool thread, or None.
        self.wreq = None
        self.min = min
        self.max = max
        self.live = live
        for i in range(self.min):
            self.newthread()

    @classmethod
    def parseargs(cls, *, min=None, max=None, live=None, **args):
        """Parse the string arguments "min", "max" and "live" into
        integers."""
        ret = super().parseargs(**args)
        if min:
            ret["min"] = int(min)
        if max:
            ret["max"] = int(max)
        if live:
            ret["live"] = int(live)
        return ret

    def newthread(self):
        """Start a pool thread and wait until it has registered itself
        in self.current."""
        with self.lk:
            th = reqthread(target=self.loop)
            th.start()
            while th not in self.current:
                self.pcond.wait()

    def _handle(self, req):
        """Run the WSGI application for `req` and write out its response.

        A `closed` exception ends the request silently; any other
        exception is logged.  The request is always closed afterwards.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333 requires the server to call close() on the
                    # returned iterable in every case.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            # Deliberately broad: a pool thread must survive any request.
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()

    def loop(self):
        """Pool thread body: repeatedly wait for a request in wreq and
        serve it, exiting after being idle for too long."""
        th = threading.current_thread()
        with self.lk:
            self.current.add(th)
        try:
            while True:
                with self.lk:
                    self.free.add(th)
                    try:
                        self.pcond.notify_all()
                        now = start = time.time()
                        while self.wreq is None:
                            self.rcond.wait(start + self.live - now)
                            now = time.time()
                            # Only retire when no request is pending.  A
                            # request may have been handed over (and this
                            # thread's notification consumed) just as the
                            # idle time ran out; exiting then would leave
                            # it stranded in wreq.
                            if self.wreq is None and now - start > self.live:
                                if len(self.current) > self.min:
                                    self.current.remove(th)
                                    return
                                else:
                                    start = now
                        req, self.wreq = self.wreq, None
                        # Tell handle() that the slot is free again.
                        self.pcond.notify_all()
                    finally:
                        self.free.remove(th)
                self._handle(req)
                req = None
        finally:
            with self.lk:
                try:
                    self.current.remove(th)
                except KeyError:
                    # Already removed on the idle-timeout path.
                    pass
                self.pcond.notify_all()

    def handle(self, req):
        """Hand `req` to a pool thread, starting a new one if none is
        free and the pool is below `max`; blocks while the hand-over
        slot is occupied."""
        with self.lk:
            if len(self.free) < 1 and len(self.current) < self.max:
                self.newthread()
            while self.wreq is not None:
                self.pcond.wait()
            # The lock is held, so the slot is guaranteed empty here.
            self.wreq = req
            self.rcond.notify(1)

    def close(self):
        """Let all pool threads expire and wait until they have exited."""
        # With live and min at zero every idle thread retires as soon as
        # it wakes up.
        self.live = 0
        self.min = 0
        with self.lk:
            while len(self.current) > 0:
                self.rcond.notify_all()
                self.pcond.wait(1)
285
class resplex(handler):
    """Handler that runs each application call in its own thread but
    multiplexes the writing of all responses in one response thread.

    handle1() (one thread per request, optionally capped at `max`) calls
    the application and queues the resulting iterator; handle2() (the
    single response thread) iterates and flushes all queued responses,
    driven by select().  The write() callable is not supported.
    """

    class _closingiter(object):
        """Iterator over a WSGI response whose close() also closes the
        iterable the application returned.

        PEP 3333 requires close() to be called on the returned iterable
        itself; iter() of it may be a different object (for instance a
        plain list iterator) that has no close() at all.
        """
        def __init__(self, respobj):
            self.respobj = respobj
            self.iter = iter(respobj)
        def __iter__(self):
            return self
        def __next__(self):
            return next(self.iter)
        def close(self):
            try:
                if self.iter is not self.respobj and hasattr(self.iter, "close"):
                    self.iter.close()
            finally:
                if hasattr(self.respobj, "close"):
                    self.respobj.close()

    def __init__(self, *, max=None, **kw):
        super().__init__(**kw)
        # Threads currently inside handle1(); guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        # Notified whenever self.current changes.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        # Hand-over of (request, response iterator) pairs to handle2().
        self.cqueue = queue.Queue(5)
        # Pipe used to wake handle2() out of select(); closing the write
        # end tells it to exit.
        self.cnpipe = os.pipe()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, *, max=None, **args):
        """Parse the string argument "max" into an integer."""
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        """Always raises: reached through wsgirequest.write(), which
        this handler does not support."""
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a handle1() thread for `req`, first waiting for a free
        slot if a thread limit is configured."""
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            th = reqthread(target=self.handle1, args=[req])
            th.start()
            # Wait until the thread has registered itself (or died).
            while th.is_alive() and th not in self.current:
                self.tcond.wait()

    def handle1(self, req):
        """Thread body: call the application for `req` and pass the
        response iterator on to the response thread."""
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = self._closingiter(respobj)
                if not req.status:
                    log.error("request handler returned without calling start_request")
                    respiter.close()
                    return
                else:
                    self.cqueue.put((req, respiter))
                    # Wake the response thread.
                    os.write(self.cnpipe[1], b" ")
                    # Ownership has moved to handle2(); do not close here.
                    req = None
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response thread body: iterate and flush all queued responses
        until the notification pipe is closed.

        Any unexpected exception here is logged and aborts the process.
        """
        try:
            rp = self.cnpipe[0]
            # Maps each active request to its response iterator, or to
            # None once the iterator is exhausted and only buffered data
            # remains to be flushed.
            current = {}

            def closereq(req):
                # Close the iterator (if still open) and the request, and
                # forget about the request.
                respiter = current[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del current[req]
            def ckiter(req):
                # Pull one item from the response iterator into the
                # request buffer; finish the request once the iterator is
                # done and nothing is left to flush.
                respiter = current[req]
                if respiter is not None:
                    rem = False
                    try:
                        data = next(respiter)
                    except StopIteration:
                        rem = True
                        # Send the head even if the body was empty.
                        req.flushreq()
                    except:
                        rem = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if not rem:
                        if data:
                            req.flushreq()
                            req.writedata(data)
                    else:
                        current[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                if respiter is None and not req.buffer:
                    closereq(req)

            while True:
                # Only requests with buffered data need to be writable.
                bufl = [req for req in current if req.buffer]
                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
                if rp in rls:
                    ret = os.read(rp, 1024)
                    if not ret:
                        # Write end closed by close(): shut down.
                        os.close(rp)
                        return
                    try:
                        while True:
                            req, respiter = self.cqueue.get(False)
                            current[req] = respiter
                            ckiter(req)
                    except queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Refill only while the buffer is reasonably small.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Wait for all handle1() threads, then stop the response
        thread by closing the notification pipe and join it."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    break
            # Join outside the lock; the thread needs it to deregister.
            th.join()
        os.close(self.cnpipe[1])
        self.rthread.join()
433
# Handler classes by short name -- presumably the name returned by
# parsehspec(); confirm against the caller.
names = {
    "single": single,
    "free": freethread,
    "pool": threadpool,
    "rplex": resplex,
}
438
def parsehspec(spec):
    """Split a handler spec of the form "name:key=val,key2=val2".

    Returns (name, args), where args maps each key to its string value;
    a part without "=" maps to the empty string.  A spec without ":" is
    returned whole with an empty dict.
    """
    name, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    args = {}
    # Consume one comma-separated part per round until nothing is left.
    while rest:
        part, _, rest = rest.partition(",")
        key, _, val = part.partition("=")
        args[key] = val
    return name, args