python: Added an abort timeout for the freethread handler.
[ashd.git] / python3 / ashd / serve.py
1 import sys, os, threading, time, logging, select
2 from . import perf
3
# Module-level logger; the handlers below use it to report exceptions
# raised while a request is being processed.
log = logging.getLogger("ashd.serve")
# Next request sequence number to hand out; guarded by seqlk.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    Numbers start at 1 and increase by one per call. The counter is
    read and advanced while holding seqlk, so concurrent callers never
    receive the same value.
    """
    global seq
    with seqlk:
        allocated = seq
        seq = allocated + 1
    return allocated
14
class closed(IOError):
    """Error signalling that the client has closed the connection.

    The request handlers in this module catch it and silently drop the
    request instead of logging it as a failure.
    """
    def __init__(self):
        IOError.__init__(self, "The client has closed the connection.")
18
class reqthread(threading.Thread):
    """Thread subclass that gets a default "Request handler N" name.

    All arguments are keyword-only. When no name is given, one is built
    from the next value of reqseq(); every other keyword argument is
    passed to threading.Thread unchanged.
    """
    def __init__(self, *, name=None, **kw):
        thread_name = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=thread_name, **kw)
24
class wsgirequest(object):
    """Base class holding the response state of one WSGI request.

    It tracks the response status and headers, whether the response
    head has been sent, and a byte buffer of body data waiting to be
    flushed. The methods that raise a bare Exception are placeholders;
    presumably concrete request classes override them (and also provide
    the mkenv() method that the handlers in this module call) -- verify
    against the subclasses.
    """
    def __init__(self, handler):
        # Handler whose ckflush() is called after body data is buffered.
        self.handler = handler
        # Body data written but not yet flushed.
        self.buffer = bytearray()
        # Response state: nothing started, nothing sent.
        self.status = None
        self.headers = []
        self.respsent = False

    # NOTE(review): the handlers in this module call
    # handlewsgi(env, startreq); this placeholder takes no such
    # arguments, so overrides presumably use the wider signature.
    def handlewsgi(self):
        raise Exception()
    def fileno(self):
        raise Exception()
    def writehead(self, status, headers):
        raise Exception()
    def flush(self):
        raise Exception()
    def close(self):
        pass
    def writedata(self, data):
        """Append data to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head once, if it has not been sent yet.

        Raises Exception when no status has been set through startreq().
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        # Mark the head as sent before calling writehead(), as the
        # flag must already be set while writehead() runs.
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Buffer a chunk of body data and let the handler flush it.

        Empty chunks are ignored entirely: they neither trigger the
        response head nor a flush check.
        """
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return the write callable.

        Calling it again after a status has been set is only accepted
        when exc_info is given. In that case the exception exc_info[1]
        is re-raised if the head has already been sent; otherwise the
        new status and headers replace the old ones. A second call
        without exc_info raises Exception.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exception info.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
73
class handler(object):
    """Base class for request handlers.

    handle() and ckflush() raise a bare Exception here and are
    overridden by the handler classes in this module; close() does
    nothing by default.
    """
    def handle(self, request):
        raise Exception()
    def ckflush(self, req):
        raise Exception()
    def close(self):
        pass

    @classmethod
    def parseargs(cls, **args):
        """Turn string handler arguments into constructor keyword arguments.

        The base class accepts no arguments: any keyword that reaches
        it is reported as unknown with a ValueError naming the first
        one. Returns an empty dict otherwise.
        """
        for key in args:
            raise ValueError("unknown handler argument: " + key)
        return {}
87
class freethread(handler):
    """Handler that runs every request on a newly started thread.

    max, when not None, caps the number of request threads registered
    at the same time; handle() blocks while the cap is reached.
    timeout, when not None (and max is set), is the number of seconds
    handle() may spend waiting for a free slot before the whole process
    is terminated with os.abort().
    """
    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        # Threads currently processing a request; guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()
        # Notified whenever a thread registers in or leaves self.current.
        self.tcond = threading.Condition(self.lk)
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Convert string arguments into constructor keyword arguments.

        max is converted to the integer "max" argument and abort to
        the integer "timeout" argument; empty or missing values are
        left out. Any other keyword is passed to the base class, which
        rejects it with ValueError.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for req and return once it has registered.

        Blocks while the thread cap is reached. When a timeout is
        configured and the wait for a free slot exceeds it, the process
        is aborted.
        """
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    now = start = time.time()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.time()
                        if now - start > self.timeout:
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            # Sticky flag set by run() under self.lk. Waiting on membership
            # in self.current instead is racy: a fast request can add and
            # remove its thread before this thread reacquires the lock,
            # while the thread is still alive in req.close(); this loop
            # would then wait for a notification that never comes.
            th.registered = False
            th.start()
            while th.is_alive() and not th.registered:
                self.tcond.wait()

    def ckflush(self, req):
        """Block until all buffered output of req has been flushed."""
        while len(req.buffer) > 0:
            # Wait until the request is reported writable (or in an
            # exceptional state) before each flush attempt.
            rls, wls, els = select.select([], [req], [req])
            req.flush()

    def run(self, req):
        """Thread body: process one request, then close it.

        A closed exception is ignored; any other exception is logged.
        The thread is removed from self.current before req.close() is
        called.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                # Tell handle() that registration has happened; unlike
                # membership in self.current this stays true afterwards.
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Make sure the head is sent even when the
                        # application produced no body data.
                        req.flushreq()
                    self.ckflush(req)
            except closed:
                pass
            except:
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Wait until no request threads are registered any more."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock so the thread can unregister itself.
            th.join()
164
class threadpool(handler):
    """Handler that dispatches requests to a pool of worker threads.

    min worker threads are started immediately. More are started on
    demand while fewer than max exist and none is free. A worker that
    has been idle for more than live seconds exits as long as more than
    min workers remain.
    """
    def __init__(self, *, min=0, max=20, live=300, **kw):
        super().__init__(**kw)
        # All worker threads / workers currently waiting for a request.
        self.current = set()
        self.free = set()
        # Re-entrant because handle() calls newthread() with the lock held.
        self.lk = threading.RLock()
        # pcond: pool state changed (worker registered, exited, became
        # free, or picked up the pending request).
        self.pcond = threading.Condition(self.lk)
        # rcond: a request was placed in self.wreq (or the pool is closing).
        self.rcond = threading.Condition(self.lk)
        # Single-slot hand-over from handle() to a worker.
        self.wreq = None
        self.min = min
        self.max = max
        self.live = live
        for i in range(self.min):
            self.newthread()

    @classmethod
    def parseargs(cls, *, min=None, max=None, live=None, **args):
        """Convert string arguments into constructor keyword arguments.

        min, max and live are converted to integers; empty or missing
        values are left out. Any other keyword is passed to the base
        class, which rejects it with ValueError.
        """
        ret = super().parseargs(**args)
        if min:
            ret["min"] = int(min)
        if max:
            ret["max"] = int(max)
        if live:
            ret["live"] = int(live)
        return ret

    def newthread(self):
        """Start one worker and wait until it is in self.current."""
        with self.lk:
            th = reqthread(target=self.loop)
            th.start()
            while not th in self.current:
                self.pcond.wait()

    def ckflush(self, req):
        """Block until all buffered output of req has been flushed."""
        while len(req.buffer) > 0:
            # Wait until the request is reported writable (or in an
            # exceptional state) before each flush attempt.
            rls, wls, els = select.select([], [req], [req])
            req.flush()

    def _handle(self, req):
        """Process one request on the calling worker, then close it.

        A closed exception is ignored; any other exception is logged.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                for data in respiter:
                    req.write(data)
                if req.status:
                    reqevent.response([req.status, req.headers])
                    # Make sure the head is sent even when the
                    # application produced no body data.
                    req.flushreq()
                self.ckflush(req)
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()

    def loop(self):
        """Worker thread body: repeatedly wait for a request and process it."""
        th = threading.current_thread()
        with self.lk:
            self.current.add(th)
        try:
            while True:
                with self.lk:
                    self.free.add(th)
                    try:
                        # Wakes newthread() (worker registered) and
                        # anything else waiting for pool changes.
                        self.pcond.notify_all()
                        now = start = time.time()
                        while self.wreq is None:
                            self.rcond.wait(start + self.live - now)
                            if self.wreq is not None:
                                # A request is pending: take it even if the
                                # idle deadline has passed meanwhile. Exiting
                                # here could consume the only notification
                                # and leave the request without a worker.
                                break
                            now = time.time()
                            if now - start > self.live:
                                if len(self.current) > self.min:
                                    self.current.remove(th)
                                    return
                                else:
                                    # Needed to keep min workers alive;
                                    # restart the idle period.
                                    start = now
                        req, self.wreq = self.wreq, None
                        # The hand-over slot is free again.
                        self.pcond.notify_all()
                    finally:
                        self.free.remove(th)
                self._handle(req)
                # Do not keep the finished request alive while idle.
                req = None
        finally:
            with self.lk:
                try:
                    self.current.remove(th)
                except KeyError:
                    # Already removed on the idle-exit path above.
                    pass
                self.pcond.notify_all()

    def handle(self, req):
        """Hand req over to a worker thread.

        Starts a new worker first when none is free and the pool is
        below max. Blocks until the single hand-over slot is empty,
        then stores the request there and wakes one worker.
        """
        with self.lk:
            if len(self.free) < 1 and len(self.current) < self.max:
                self.newthread()
            while self.wreq is not None:
                self.pcond.wait()
            self.wreq = req
            self.rcond.notify(1)

    def close(self):
        """Make all workers exit and wait until none is left."""
        with self.lk:
            # With live and min at zero, every idle worker exits the
            # next time its wait ends.
            self.live = 0
            self.min = 0
            while len(self.current) > 0:
                self.rcond.notify_all()
                self.pcond.wait(1)
274
# Handler classes by short name. Presumably looked up with the name
# part returned by parsehspec() -- verify against the caller.
names = dict(free=freethread, pool=threadpool)
277
def parsehspec(spec):
    """Split a handler specification into a name and an argument dict.

    The format is "name" or "name:key=val,key2=val2,...". A part
    without "=" yields an empty-string value, and only the first "=" in
    a part separates key from value. Parsing stops once nothing is left
    after a comma, so a trailing comma does not add an entry.

    Returns a (name, args) tuple; all keys and values are strings.
    """
    name, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    parts = rest.split(",")
    # An empty remainder ends parsing: drop the empty final piece that
    # split() produces for "" or for a trailing comma.
    if parts[-1] == "":
        parts.pop()
    args = {}
    for part in parts:
        key, _, val = part.partition("=")
        args[key] = val
    return name, args