python: Fixed the real thread-starting problem.
[ashd.git] / python3 / ashd / serve.py
... / ...
CommitLineData
1import sys, os, threading, time, logging, select, queue
2from . import perf
3
4log = logging.getLogger("ashd.serve")
5seq = 1
6seqlk = threading.Lock()
7
8def reqseq():
9 global seq
10 with seqlk:
11 s = seq
12 seq += 1
13 return s
14
15class closed(IOError):
16 def __init__(self):
17 super().__init__("The client has closed the connection.")
18
19class reqthread(threading.Thread):
20 def __init__(self, *, name=None, **kw):
21 if name is None:
22 name = "Request handler %i" % reqseq()
23 super().__init__(name=name, **kw)
24
25class wsgirequest(object):
26 def __init__(self, *, handler):
27 self.status = None
28 self.headers = []
29 self.respsent = False
30 self.handler = handler
31 self.buffer = bytearray()
32
33 def handlewsgi(self):
34 raise Exception()
35 def fileno(self):
36 raise Exception()
37 def writehead(self, status, headers):
38 raise Exception()
39 def flush(self):
40 raise Exception()
41 def close(self):
42 pass
43 def writedata(self, data):
44 self.buffer.extend(data)
45
46 def flushreq(self):
47 if not self.respsent:
48 if not self.status:
49 raise Exception("Cannot send response body before starting response.")
50 self.respsent = True
51 self.writehead(self.status, self.headers)
52
53 def write(self, data):
54 if not data:
55 return
56 self.flushreq()
57 self.writedata(data)
58 self.handler.ckflush(self)
59
60 def startreq(self, status, headers, exc_info=None):
61 if self.status:
62 if exc_info:
63 try:
64 if self.respsent:
65 raise exc_info[1]
66 finally:
67 exc_info = None
68 else:
69 raise Exception("Can only start responding once.")
70 self.status = status
71 self.headers = headers
72 return self.write
73
74class handler(object):
75 def handle(self, request):
76 raise Exception()
77 def ckflush(self, req):
78 while len(req.buffer) > 0:
79 rls, wls, els = select.select([], [req], [req])
80 req.flush()
81 def close(self):
82 pass
83
84 @classmethod
85 def parseargs(cls, **args):
86 if len(args) > 0:
87 raise ValueError("unknown handler argument: " + next(iter(args)))
88 return {}
89
90class single(handler):
91 cname = "single"
92
93 def handle(self, req):
94 try:
95 env = req.mkenv()
96 with perf.request(env) as reqevent:
97 respiter = req.handlewsgi(env, req.startreq)
98 for data in respiter:
99 req.write(data)
100 if req.status:
101 reqevent.response([req.status, req.headers])
102 req.flushreq()
103 self.ckflush(req)
104 except closed:
105 pass
106 except:
107 log.error("exception occurred when handling request", exc_info=True)
108 finally:
109 req.close()
110
111def dbg(*a):
112 f = True
113 for o in a:
114 if not f:
115 sys.stderr.write(" ")
116 sys.stderr.write(str(a))
117 f = False
118 sys.stderr.write("\n")
119 sys.stderr.flush()
120
121class freethread(handler):
122 cname = "free"
123
124 def __init__(self, *, max=None, timeout=None, **kw):
125 super().__init__(**kw)
126 self.current = set()
127 self.lk = threading.Lock()
128 self.tcond = threading.Condition(self.lk)
129 self.max = max
130 self.timeout = timeout
131
132 @classmethod
133 def parseargs(cls, *, max=None, abort=None, **args):
134 ret = super().parseargs(**args)
135 if max:
136 ret["max"] = int(max)
137 if abort:
138 ret["timeout"] = int(abort)
139 return ret
140
141 def handle(self, req):
142 with self.lk:
143 if self.max is not None:
144 if self.timeout is not None:
145 now = start = time.time()
146 while len(self.current) >= self.max:
147 self.tcond.wait(start + self.timeout - now)
148 now = time.time()
149 if now - start > self.timeout:
150 os.abort()
151 else:
152 while len(self.current) >= self.max:
153 self.tcond.wait()
154 th = reqthread(target=self.run, args=[req])
155 th.registered = False
156 th.start()
157 while not th.registered:
158 self.tcond.wait()
159
160 def run(self, req):
161 try:
162 th = threading.current_thread()
163 with self.lk:
164 self.current.add(th)
165 th.registered = True
166 self.tcond.notify_all()
167 try:
168 env = req.mkenv()
169 with perf.request(env) as reqevent:
170 respiter = req.handlewsgi(env, req.startreq)
171 for data in respiter:
172 req.write(data)
173 if req.status:
174 reqevent.response([req.status, req.headers])
175 req.flushreq()
176 self.ckflush(req)
177 except closed:
178 pass
179 except:
180 log.error("exception occurred when handling request", exc_info=True)
181 finally:
182 with self.lk:
183 self.current.remove(th)
184 self.tcond.notify_all()
185 finally:
186 req.close()
187
188 def close(self):
189 while True:
190 with self.lk:
191 if len(self.current) > 0:
192 th = next(iter(self.current))
193 else:
194 return
195 th.join()
196
197class resplex(handler):
198 cname = "rplex"
199
200 def __init__(self, *, max=None, **kw):
201 super().__init__(**kw)
202 self.current = set()
203 self.lk = threading.Lock()
204 self.tcond = threading.Condition(self.lk)
205 self.max = max
206 self.cqueue = queue.Queue(5)
207 self.cnpipe = os.pipe()
208 self.rthread = reqthread(name="Response thread", target=self.handle2)
209 self.rthread.start()
210
211 @classmethod
212 def parseargs(cls, *, max=None, **args):
213 ret = super().parseargs(**args)
214 if max:
215 ret["max"] = int(max)
216 return ret
217
218 def ckflush(self, req):
219 raise Exception("resplex handler does not support the write() function")
220
221 def handle(self, req):
222 with self.lk:
223 if self.max is not None:
224 while len(self.current) >= self.max:
225 self.tcond.wait()
226 th = reqthread(target=self.handle1, args=[req])
227 th.registered = False
228 th.start()
229 while not th.registered:
230 self.tcond.wait()
231
232 def handle1(self, req):
233 try:
234 th = threading.current_thread()
235 with self.lk:
236 self.current.add(th)
237 th.registered = True
238 self.tcond.notify_all()
239 try:
240 env = req.mkenv()
241 respobj = req.handlewsgi(env, req.startreq)
242 respiter = iter(respobj)
243 if not req.status:
244 log.error("request handler returned without calling start_request")
245 if hasattr(respiter, "close"):
246 respiter.close()
247 return
248 else:
249 self.cqueue.put((req, respiter))
250 os.write(self.cnpipe[1], b" ")
251 req = None
252 finally:
253 with self.lk:
254 self.current.remove(th)
255 self.tcond.notify_all()
256 except closed:
257 pass
258 except:
259 log.error("exception occurred when handling request", exc_info=True)
260 finally:
261 if req is not None:
262 req.close()
263
264 def handle2(self):
265 try:
266 rp = self.cnpipe[0]
267 current = {}
268
269 def closereq(req):
270 respiter = current[req]
271 try:
272 if respiter is not None and hasattr(respiter, "close"):
273 respiter.close()
274 except:
275 log.error("exception occurred when closing iterator", exc_info=True)
276 try:
277 req.close()
278 except:
279 log.error("exception occurred when closing request", exc_info=True)
280 del current[req]
281 def ckiter(req):
282 respiter = current[req]
283 if respiter is not None:
284 rem = False
285 try:
286 data = next(respiter)
287 except StopIteration:
288 rem = True
289 try:
290 req.flushreq()
291 except:
292 log.error("exception occurred when handling response data", exc_info=True)
293 except:
294 rem = True
295 log.error("exception occurred when iterating response", exc_info=True)
296 if not rem:
297 if data:
298 try:
299 req.flushreq()
300 req.writedata(data)
301 except:
302 log.error("exception occurred when handling response data", exc_info=True)
303 rem = True
304 if rem:
305 current[req] = None
306 try:
307 if hasattr(respiter, "close"):
308 respiter.close()
309 except:
310 log.error("exception occurred when closing iterator", exc_info=True)
311 respiter = None
312 if respiter is None and not req.buffer:
313 closereq(req)
314
315 while True:
316 bufl = list(req for req in current.keys() if req.buffer)
317 rls, wls, els = select.select([rp], bufl, [rp] + bufl)
318 if rp in rls:
319 ret = os.read(rp, 1024)
320 if not ret:
321 os.close(rp)
322 return
323 try:
324 while True:
325 req, respiter = self.cqueue.get(False)
326 current[req] = respiter
327 ckiter(req)
328 except queue.Empty:
329 pass
330 for req in wls:
331 try:
332 req.flush()
333 except closed:
334 closereq(req)
335 except:
336 log.error("exception occurred when writing response", exc_info=True)
337 closereq(req)
338 else:
339 if len(req.buffer) < 65536:
340 ckiter(req)
341 except:
342 log.critical("unexpected exception occurred in response handler thread", exc_info=True)
343 os.abort()
344
345 def close(self):
346 while True:
347 with self.lk:
348 if len(self.current) > 0:
349 th = next(iter(self.current))
350 else:
351 break
352 th.join()
353 os.close(self.cnpipe[1])
354 self.rthread.join()
355
356names = {cls.cname: cls for cls in globals().values() if
357 isinstance(cls, type) and
358 issubclass(cls, handler) and
359 hasattr(cls, "cname")}
360
361def parsehspec(spec):
362 if ":" not in spec:
363 return spec, {}
364 nm, spec = spec.split(":", 1)
365 args = {}
366 while spec:
367 if "," in spec:
368 part, spec = spec.split(",", 1)
369 else:
370 part, spec = spec, None
371 if "=" in part:
372 key, val = part.split("=", 1)
373 else:
374 key, val = part, ""
375 args[key] = val
376 return nm, args