Added the `ratequeue' program.
[ashd.git] / python3 / ashd / serve.py
1 import sys, os, threading, time, logging, select, queue, collections
2 from . import perf
3
log = logging.getLogger("ashd.serve")
# Next sequence number to hand out; only touched while holding `seqlk`.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    The module-level counter `seq` is read and incremented while holding
    `seqlk`, so concurrent callers each receive a distinct value.
    """
    global seq
    seqlk.acquire()
    try:
        issued, seq = seq, seq + 1
    finally:
        seqlk.release()
    return issued
14
class closed(IOError):
    """Raised to signal that the client has closed the connection."""

    def __init__(self):
        message = "The client has closed the connection."
        super().__init__(message)
18
class reqthread(threading.Thread):
    """Thread subclass that gets a numbered default name.

    When no `name` is given, the thread is called "Request handler N",
    where N comes from `reqseq()`. All other keyword arguments are passed
    straight through to `threading.Thread`.
    """

    def __init__(self, *, name=None, **kw):
        thread_name = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=thread_name, **kw)
24
class wsgirequest(object):
    """Base class holding the response state of one WSGI request.

    Subclasses are expected to override `handlewsgi`, `fileno`,
    `writehead` and `flush`; the versions here only raise. Response data
    is accumulated in `self.buffer` (a bytearray) by `writedata`.
    """

    def __init__(self, *, handler):
        # `handler` is the object whose ckflush() is invoked after each write().
        self.handler = handler
        self.status = None
        self.headers = []
        # Set once the status/headers have been handed to writehead().
        self.respsent = False
        self.buffer = bytearray()

    # --- hooks to be provided by subclasses ---
    def handlewsgi(self):
        raise Exception()
    def fileno(self):
        raise Exception()
    def writehead(self, status, headers):
        raise Exception()
    def flush(self):
        raise Exception()
    def close(self):
        pass
    def writedata(self, data):
        """Append `data` to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head through writehead() if not yet sent.

        Raises Exception if no status has been set by startreq() yet.
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Buffer a chunk of body data; empty chunks are ignored.

        Makes sure the head is sent first, then lets the handler's
        ckflush() decide how to drain the buffer.
        """
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; returns `self.write`.

        Calling this again after a status is set is only accepted when
        `exc_info` is given; in that case, if the head has already been
        sent, the exception in `exc_info[1]` is re-raised.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the reference to the exception info triple.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
73
class handler(object):
    """Base class for request handling strategies."""

    def handle(self, request):
        """Process one request; must be overridden by subclasses."""
        raise Exception()

    def ckflush(self, req):
        """Block until all data buffered on `req` has been flushed.

        Polls `req` for writability and calls its flush() repeatedly
        for as long as `req.buffer` is non-empty.
        """
        poller = select.poll()
        poller.register(req, select.POLLOUT)
        while len(req.buffer) > 0:
            poller.poll()
            req.flush()

    def close(self):
        """Release handler resources; nothing to do in the base class."""
        pass

    @classmethod
    def parseargs(cls, **args):
        """Turn string handler arguments into constructor keyword arguments.

        The base class accepts no arguments: any argument left over
        raises ValueError naming the first unknown key.
        """
        for unknown in args:
            raise ValueError("unknown handler argument: " + unknown)
        return {}
91
class single(handler):
    """Handler that serves each request synchronously in the calling thread."""
    cname = "single"

    def handle(self, req):
        """Run the WSGI application for `req` and write out its response.

        A `closed` exception (client went away) is silently ignored; any
        other exception is logged. The request is always closed at the end.
        """
        try:
            environ = req.mkenv()
            with perf.request(environ) as event:
                body = req.handlewsgi(environ, req.startreq)
                for chunk in body:
                    req.write(chunk)
                if req.status:
                    event.response([req.status, req.headers])
                    # Make sure the head goes out even for an empty body.
                    req.flushreq()
                self.ckflush(req)
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
112
def dbg(*a):
    """Write the arguments to stderr, space-separated, followed by a newline.

    Each argument is converted with str(). The stream is flushed so that
    the message shows up immediately.
    """
    # Stringify each argument individually (not the whole argument tuple).
    sys.stderr.write(" ".join(str(o) for o in a))
    sys.stderr.write("\n")
    sys.stderr.flush()
122
class freethread(handler):
    """Handler that starts a new thread for every request.

    `max`, when set, caps the number of concurrently running request
    threads; `handle()` blocks until a slot is free. `timeout`, when set
    together with `max`, bounds that wait: exceeding it aborts the
    process via os.abort().
    """
    cname = "free"

    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        self.max = max
        self.timeout = timeout
        # Threads currently handling a request; guarded by `lk`.
        self.current = set()
        self.lk = threading.Lock()
        # Signalled whenever a thread registers in or leaves `current`.
        self.tcond = threading.Condition(self.lk)

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Convert the string arguments `max` and `abort` to constructor kwargs.

        `abort` is passed to the constructor as `timeout`. Remaining
        arguments are handed to the parent class's parseargs().
        """
        ret = super().parseargs(**args)
        for target, raw in (("max", max), ("timeout", abort)):
            if raw:
                ret[target] = int(raw)
        return ret

    def handle(self, req):
        """Start a thread for `req`, waiting for a free slot if `max` is set.

        Returns only after the new thread has registered itself in
        `self.current`.
        """
        with self.lk:
            if self.max is not None:
                if self.timeout is None:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
                else:
                    began = time.time()
                    moment = began
                    while len(self.current) >= self.max:
                        self.tcond.wait(began + self.timeout - moment)
                        moment = time.time()
                        if moment - began > self.timeout:
                            os.abort()
            worker = reqthread(target=self.run, args=[req])
            worker.registered = False
            worker.start()
            # run() sets `registered` and notifies `tcond` under the same lock.
            while not worker.registered:
                self.tcond.wait()

    def run(self, req):
        """Thread body: register, serve `req`, unregister, close `req`."""
        try:
            me = threading.current_thread()
            with self.lk:
                self.current.add(me)
                me.registered = True
                self.tcond.notify_all()
            try:
                environ = req.mkenv()
                with perf.request(environ) as event:
                    body = req.handlewsgi(environ, req.startreq)
                    for chunk in body:
                        req.write(chunk)
                    if req.status:
                        event.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
            except closed:
                pass
            except:
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                # Free the slot and wake anyone waiting in handle().
                with self.lk:
                    self.current.remove(me)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Join request threads until none remain registered."""
        while True:
            with self.lk:
                if not len(self.current) > 0:
                    return
                pending = next(iter(self.current))
            # Join outside the lock so the thread can unregister itself.
            pending.join()
198
class threadpool(handler):
    """Handler that queues requests and serves them from a pool of threads.

    Constructor arguments:
      max     -- maximum number of worker threads (default 25).
      qsz     -- maximum number of queued requests (default 100);
                 handle() blocks while the queue is full.
      timeout -- if set, the longest handle() may wait on a full queue
                 before the process is aborted with os.abort().
    """
    cname = "pool"

    def __init__(self, *, max=25, qsz=100, timeout=None, **kw):
        super().__init__(**kw)
        # Live worker threads; guarded by `clk`.
        self.current = set()
        self.clk = threading.Lock()
        # Signalled when a newly started worker has registered itself.
        self.ccond = threading.Condition(self.clk)
        # Pending requests and the set of idle workers; guarded by `qlk`.
        self.queue = collections.deque()
        self.waiting = set()
        # Idle-worker count at which surplus workers may start retiring,
        # and the time at which that count was last reached.
        self.waitlimit = 5
        self.wlstart = 0.0
        self.qlk = threading.Lock()
        # qfcond: a request was queued; qecond: a request was dequeued.
        self.qfcond = threading.Condition(self.qlk)
        self.qecond = threading.Condition(self.qlk)
        self.max = max
        self.qsz = qsz
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, queue=None, abort=None, **args):
        """Convert string arguments `max`, `queue` and `abort` to constructor
        kwargs (`max`, `qsz` and `timeout` respectively)."""
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if queue:
            ret["qsz"] = int(queue)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Queue `req` for a worker, starting a new worker if none is idle.

        Blocks while the queue holds `qsz` requests. With a timeout
        configured, waiting longer than that aborts the process.
        """
        spawn = False
        with self.qlk:
            if self.timeout is not None:
                now = start = time.time()
                while len(self.queue) >= self.qsz:
                    self.qecond.wait(start + self.timeout - now)
                    now = time.time()
                    if now - start > self.timeout:
                        os.abort()
            else:
                while len(self.queue) >= self.qsz:
                    self.qecond.wait()
            self.queue.append(req)
            self.qfcond.notify()
            # Only consider spawning when no worker is idle to take it.
            if len(self.waiting) < 1:
                spawn = True
        if spawn:
            with self.clk:
                if len(self.current) < self.max:
                    th = reqthread(target=self.run)
                    th.registered = False
                    th.start()
                    # Wait until run() has added the thread to `current`.
                    while not th.registered:
                        self.ccond.wait()

    def handle1(self, req):
        """Run the WSGI application for `req` and write out its response.

        A `closed` exception is ignored; other exceptions are logged.
        Closing the request is left to the caller.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                for data in respiter:
                    req.write(data)
                if req.status:
                    reqevent.response([req.status, req.headers])
                    req.flushreq()
                self.ckflush(req)
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)

    def run(self):
        """Worker thread body: serve queued requests until idle too long.

        A worker retires after waiting 10 seconds without getting a
        request, or immediately when at least `waitlimit` workers have
        been idle for that long already.
        """
        timeout = 10.0
        th = threading.current_thread()
        with self.clk:
            self.current.add(th)
            th.registered = True
            self.ccond.notify_all()
        try:
            while True:
                start = now = time.time()
                with self.qlk:
                    while len(self.queue) < 1:
                        if len(self.waiting) >= self.waitlimit and now - self.wlstart >= timeout:
                            return
                        self.waiting.add(th)
                        try:
                            if len(self.waiting) == self.waitlimit:
                                self.wlstart = now
                            self.qfcond.wait(start + timeout - now)
                        finally:
                            self.waiting.remove(th)
                        now = time.time()
                        # Retire on timeout only if there is still nothing to
                        # do. handle() may have queued a request and counted
                        # on this (then still waiting) worker instead of
                        # spawning a new one; returning here regardless would
                        # leave that request stranded in the queue.
                        if now - start > timeout and len(self.queue) < 1:
                            return
                    req = self.queue.popleft()
                    self.qecond.notify()
                try:
                    self.handle1(req)
                finally:
                    req.close()
        finally:
            with self.clk:
                self.current.remove(th)

    def close(self):
        """Join worker threads until none remain registered."""
        while True:
            with self.clk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock so the worker can unregister itself.
            th.join()
313
class resplex(handler):
    """Handler that calls the WSGI application in a thread per request, but
    writes all response bodies from one shared response thread.

    Request threads (handle1) only obtain the response iterator; they then
    pass it through `cqueue` to the response thread (handle2), which
    iterates and flushes every active response using select().
    """
    cname = "rplex"

    def __init__(self, *, max=None, **kw):
        """Set up state and start the response thread.

        `max`, when not None, caps the number of concurrent request threads.
        """
        super().__init__(**kw)
        # Request threads currently running handle1; guarded by `lk`.
        self.current = set()
        self.lk = threading.Lock()
        self.tcond = threading.Condition(self.lk)
        self.max = max
        # Hand-over queue of (req, respiter) pairs, bounded to 5 entries.
        self.cqueue = queue.Queue(5)
        # Pipe used to wake the response thread: handle1 writes a byte to
        # cnpipe[1] after queueing, handle2 select()s on cnpipe[0].
        self.cnpipe = os.pipe()
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, *, max=None, **args):
        """Convert the string argument `max` to an int constructor kwarg;
        other arguments go to the parent class's parseargs()."""
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        """Always raises: this is what wsgirequest.write() calls, and this
        handler does not support write()."""
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a request thread for `req`, waiting for a free slot if `max`
        is set, and return once the thread has registered itself."""
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            th = reqthread(target=self.handle1, args=[req])
            th.registered = False
            th.start()
            # handle1 sets `registered` and notifies `tcond` under `lk`.
            while not th.registered:
                self.tcond.wait()

    def handle1(self, req):
        """Request thread body: call the application and hand the response
        iterator over to the response thread.

        If the application did not set a status, the iterator is closed and
        the request is closed here instead.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if not req.status:
                    log.error("request handler returned without calling start_request")
                    if hasattr(respiter, "close"):
                        respiter.close()
                    return
                else:
                    self.cqueue.put((req, respiter))
                    # Wake the response thread so it picks up the new entry.
                    os.write(self.cnpipe[1], b" ")
                    # The response thread now owns the request; clearing the
                    # local keeps the outer `finally` from closing it.
                    req = None
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response thread body: multiplex writing of all active responses.

        Runs until the read end of `cnpipe` reports end-of-file. Any
        unexpected exception is logged and the process is aborted.
        """
        try:
            rp = self.cnpipe[0]
            # Maps each active request to its response iterator, or to None
            # once the iterator is finished and only buffered data remains.
            current = {}

            def closereq(req):
                # Close the iterator (if still present) and the request,
                # logging rather than propagating errors, then forget it.
                respiter = current[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del current[req]
            def ckiter(req):
                # Pull at most one item from the request's iterator into its
                # buffer; finish the request once the iterator is done and
                # the buffer is empty.
                respiter = current[req]
                if respiter is not None:
                    # `rem` marks the iterator as finished (exhausted or failed).
                    rem = False
                    try:
                        data = next(respiter)
                    except StopIteration:
                        rem = True
                        try:
                            # Send the head even if no body data was produced.
                            req.flushreq()
                        except:
                            log.error("exception occurred when handling response data", exc_info=True)
                    except:
                        rem = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if not rem:
                        if data:
                            try:
                                req.flushreq()
                                req.writedata(data)
                            except:
                                log.error("exception occurred when handling response data", exc_info=True)
                                rem = True
                    if rem:
                        current[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                if respiter is None and not req.buffer:
                    closereq(req)

            while True:
                # Wait for the wake-up pipe, or for writability of any
                # request that has buffered data.
                bufl = list(req for req in current.keys() if req.buffer)
                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
                if rp in rls:
                    ret = os.read(rp, 1024)
                    if not ret:
                        # End-of-file: close() has closed the write end.
                        os.close(rp)
                        return
                    try:
                        # Drain everything the request threads have queued.
                        while True:
                            req, respiter = self.cqueue.get(False)
                            current[req] = respiter
                            ckiter(req)
                    except queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Fetch more data only while under 65536 buffered bytes.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Join all request threads, then stop and join the response thread."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    break
            th.join()
        # Closing the write end makes handle2 see end-of-file and return.
        os.close(self.cnpipe[1])
        self.rthread.join()
472
# Registry mapping each handler's `cname` to its class, built by scanning the
# module globals for `handler` subclasses that have a `cname` attribute. Only
# classes already defined when this statement runs are included.
# NOTE(review): this relies on the comprehension variable not being stored in
# the module globals while globals() is being iterated -- keep it a
# comprehension rather than a module-level for loop.
names = {cls.cname: cls for cls in globals().values() if
         isinstance(cls, type) and
         issubclass(cls, handler) and
         hasattr(cls, "cname")}
477
def parsehspec(spec):
    """Parse a handler specification of the form "name:key=val,key2=val2".

    Returns a (name, args) tuple, where args maps each key to its string
    value. A part without "=" maps to the empty string, and only the
    first "=" in a part separates key from value. A spec without ":"
    yields the whole spec as the name and no arguments.
    """
    nm, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    parts = rest.split(",")
    # An empty remainder after the last comma (or an entirely empty
    # argument string) does not count as an argument.
    if parts[-1] == "":
        parts.pop()
    args = {}
    for part in parts:
        key, _, val = part.partition("=")
        args[key] = val
    return nm, args
492         args[key] = val
493     return nm, args