Merge branch 'master' into timeheap
[ashd.git] / python / ashd / serve.py
CommitLineData
8f410ce7
FT
1import sys, os, threading, time, logging, select, Queue
2import perf
63090e5a
FT
3
log = logging.getLogger("ashd.serve")

# Next request sequence number to hand out; only touched under seqlk.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the current sequence number and advance the counter by one.

    The read-and-increment is performed while holding ``seqlk``.
    """
    global seq
    with seqlk:
        allocated = seq
        seq = allocated + 1
    return allocated
63090e5a
FT
14
class closed(IOError):
    """IOError raised with a fixed "client has closed the connection" message."""

    def __init__(self):
        message = "The client has closed the connection."
        super(closed, self).__init__(message)
18
8f410ce7
FT
class reqthread(threading.Thread):
    """Thread that defaults its name to "Request handler N".

    N is taken from reqseq() and is only consumed when no explicit
    name is given. All other keyword arguments are passed through to
    threading.Thread unchanged.
    """

    def __init__(self, name=None, **kw):
        thread_name = name if name is not None else "Request handler %i" % reqseq()
        super(reqthread, self).__init__(name=thread_name, **kw)
24
class wsgirequest(object):
    """Base state and response bookkeeping for one WSGI request.

    Attributes:
      status   -- status passed to startreq(), or None before that
      headers  -- headers passed to startreq()
      respsent -- True once writehead() has been invoked by flushreq()
      handler  -- the handler object whose ckflush() is called by write()
      buffer   -- bytearray collecting data queued through writedata()

    handlewsgi(), fileno(), writehead() and flush() raise a bare
    Exception here; presumably concrete request classes override them.
    The handlers in this file also call req.mkenv(), which is not
    defined on this class.
    """

    def __init__(self, handler):
        self.handler = handler
        self.buffer = bytearray()
        self.status = None
        self.headers = []
        self.respsent = False

    def handlewsgi(self):
        raise Exception()

    def fileno(self):
        raise Exception()

    def writehead(self, status, headers):
        raise Exception()

    def flush(self):
        raise Exception()

    def close(self):
        pass

    def writedata(self, data):
        """Append data to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Emit the response head once; later calls are no-ops.

        Raises Exception if no status has been set yet.
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Queue a body chunk and let the handler flush it.

        Empty data is ignored entirely (the head is not sent either).
        """
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record status and headers; return self.write.

        Calling it a second time without exc_info raises Exception.
        With exc_info, the stored status/headers are replaced unless the
        head has already been sent, in which case exc_info[1] is raised.
        """
        already_started = bool(self.status)
        if already_started and not exc_info:
            raise Exception("Can only start responding once.")
        if already_started:
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exception info.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
8f410ce7
FT
73
class handler(object):
    """Base class for request handlers.

    handle() raises a bare Exception here; the subclasses in this file
    override it.
    """

    def handle(self, request):
        raise Exception()

    def ckflush(self, req):
        """Block until req.buffer has been emptied.

        Waits with select() for req to become writable (or to enter an
        exceptional state) and then calls req.flush(), repeating for as
        long as the buffer is non-empty.
        """
        while req.buffer:
            select.select([], [req], [req])
            req.flush()

    def close(self):
        pass

    @classmethod
    def parseargs(cls, **args):
        """Validate handler arguments; the base class accepts none.

        Returns an empty dict. Raises ValueError naming one of the
        unknown arguments if any are given.
        """
        if args:
            # Builtin next() rather than the Python-2-only .next() method.
            raise ValueError("unknown handler argument: " + next(iter(args)))
        return {}
89
class single(handler):
    """Handler that processes each request inline, in the calling thread."""

    # Key under which this class is looked up by name.
    cname = "single"

    def handle(self, req):
        """Run the WSGI application for req and write out its response.

        A `closed` exception is ignored; any other exception is logged.
        The response iterable's close() method, if it has one, is called
        once iteration is over (normally or through an error), and
        req.close() is always called last.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333: the server must call close() on the
                    # returned iterable when the request is finished,
                    # whether it completed or failed.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()
110
class freethread(handler):
    """Handler that serves every request in a thread of its own.

    max     -- if not None, handle() blocks while len(current) >= max
    timeout -- if not None (and max is set), number of seconds handle()
               may wait for a free slot before calling os.abort()
    """

    # Key under which this class is looked up by name.
    cname = "free"

    def __init__(self, max=None, timeout=None, **kw):
        super(freethread, self).__init__(**kw)
        self.current = set()                        # threads currently running
        self.lk = threading.Lock()                  # guards self.current
        self.tcond = threading.Condition(self.lk)   # signalled when current changes
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, max=None, abort=None, **args):
        """Convert string arguments to constructor keywords.

        "max" maps to max and "abort" maps to timeout, both through
        int(); anything else is passed to the base class parseargs().
        """
        ret = super(freethread, cls).parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a request thread for req, waiting for a free slot first.

        Returns once the new thread has added itself to self.current.
        """
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    now = start = time.time()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.time()
                        if now - start > self.timeout:
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.registered = False
            th.start()
            # Wait for run() to register the thread, so that the slot
            # accounting above is accurate for the next call.
            while not th.registered:
                self.tcond.wait()

    def run(self, req):
        """Thread body: run the WSGI application and write the response.

        A `closed` exception is ignored; other exceptions are logged.
        The response iterable's close() method, if any, is called when
        iteration is over. The thread is removed from self.current and
        req.close() is called in all cases.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333: the server must call close() on the
                        # returned iterable when the request is finished,
                        # whether it completed or failed.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                pass
            except:
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Join request threads until none are registered any more."""
        while True:
            with self.lk:
                if not self.current:
                    return
                # Builtin next() rather than the Python-2-only .next() method.
                th = next(iter(self.current))
            # Join outside the lock: run() needs it to deregister itself.
            th.join()
186
class resplex(handler):
    """Handler that multiplexes all response output on one thread.

    Each request gets a thread (handle1) that calls the WSGI
    application; the resulting iterator is then handed over, through
    self.cqueue, to the single "Response thread" (handle2), which
    iterates and flushes all responses using select().

    max -- if not None, handle() blocks while len(current) >= max
    """

    # Key under which this class is looked up by name.
    cname = "rplex"

    def __init__(self, max=None, **kw):
        super(resplex, self).__init__(**kw)
        self.current = set()                        # handle1 threads currently running
        self.lk = threading.Lock()                  # guards self.current
        self.tcond = threading.Condition(self.lk)   # signalled when current changes
        self.max = max
        self.cqueue = Queue.Queue(5)                # (req, respiter) pairs for handle2
        self.cnpipe = os.pipe()                     # wakes handle2 up; EOF stops it
        self.rthread = reqthread(name="Response thread", target=self.handle2)
        self.rthread.start()

    @classmethod
    def parseargs(cls, max=None, **args):
        """Convert the "max" string argument to an int keyword.

        Anything else is passed to the base class parseargs().
        """
        ret = super(resplex, cls).parseargs(**args)
        if max:
            ret["max"] = int(max)
        return ret

    def ckflush(self, req):
        raise Exception("resplex handler does not support the write() function")

    def handle(self, req):
        """Start a handle1 thread for req, waiting for a free slot first.

        Returns once the new thread has added itself to self.current.
        """
        with self.lk:
            if self.max is not None:
                while len(self.current) >= self.max:
                    self.tcond.wait()
            th = reqthread(target=self.handle1, args=[req])
            th.registered = False
            th.start()
            while not th.registered:
                self.tcond.wait()

    def handle1(self, req):
        """Thread body: call the application and queue its iterator.

        If the application returned without a status having been set,
        the error is logged, the iterator is closed and req is closed.
        Otherwise ownership of req passes to the response thread.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                th.registered = True
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                respobj = req.handlewsgi(env, req.startreq)
                respiter = iter(respobj)
                if not req.status:
                    log.error("request handler returned without calling start_request")
                    if hasattr(respiter, "close"):
                        respiter.close()
                    return
                else:
                    self.cqueue.put((req, respiter))
                    # Wake the response thread; a bytes literal is what
                    # os.write() expects.
                    os.write(self.cnpipe[1], b" ")
                    # The response thread closes req from here on.
                    req = None
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        except closed:
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            if req is not None:
                req.close()

    def handle2(self):
        """Response thread body: iterate and flush all queued responses.

        Runs until the read end of self.cnpipe reports EOF. Any
        unexpected exception is logged and followed by os.abort().
        """
        try:
            rp = self.cnpipe[0]
            # Maps each live request to its response iterator, or to
            # None once the iterator is exhausted or has failed.
            current = {}

            def closereq(req):
                # Close the iterator (if still present) and the request,
                # logging rather than propagating errors, then forget it.
                respiter = current[req]
                try:
                    if respiter is not None and hasattr(respiter, "close"):
                        respiter.close()
                except:
                    log.error("exception occurred when closing iterator", exc_info=True)
                try:
                    req.close()
                except:
                    log.error("exception occurred when closing request", exc_info=True)
                del current[req]

            def ckiter(req):
                # Pull the next chunk from the request's iterator into
                # its buffer; close the request once the iterator is
                # done and nothing is left to send.
                respiter = current[req]
                if respiter is not None:
                    rem = False
                    data = None
                    try:
                        data = next(respiter)
                        # An empty chunk queues nothing. If the buffer is
                        # empty as well, the request would not be in the
                        # select() write set and would never be looked at
                        # again, so keep pulling until there is something
                        # to send or the iterator ends.
                        while not data and not req.buffer:
                            data = next(respiter)
                    except StopIteration:
                        rem = True
                        try:
                            req.flushreq()
                        except:
                            log.error("exception occurred when handling response data", exc_info=True)
                    except:
                        rem = True
                        log.error("exception occurred when iterating response", exc_info=True)
                    if not rem:
                        if data:
                            try:
                                req.flushreq()
                                req.writedata(data)
                            except:
                                log.error("exception occurred when handling response data", exc_info=True)
                                rem = True
                    if rem:
                        current[req] = None
                        try:
                            if hasattr(respiter, "close"):
                                respiter.close()
                        except:
                            log.error("exception occurred when closing iterator", exc_info=True)
                        respiter = None
                if respiter is None and not req.buffer:
                    closereq(req)

            while True:
                bufl = [r for r in current if r.buffer]
                rls, wls, els = select.select([rp], bufl, [rp] + bufl)
                if rp in rls:
                    ret = os.read(rp, 1024)
                    if not ret:
                        # Write end closed (see close()): shut down.
                        os.close(rp)
                        return
                    try:
                        while True:
                            req, respiter = self.cqueue.get(False)
                            current[req] = respiter
                            ckiter(req)
                    except Queue.Empty:
                        pass
                for req in wls:
                    try:
                        req.flush()
                    except closed:
                        closereq(req)
                    except:
                        log.error("exception occurred when writing response", exc_info=True)
                        closereq(req)
                    else:
                        # Only fetch more data while the backlog is small.
                        if len(req.buffer) < 65536:
                            ckiter(req)
        except:
            log.critical("unexpected exception occurred in response handler thread", exc_info=True)
            os.abort()

    def close(self):
        """Join all handle1 threads, then stop and join the response thread."""
        while True:
            with self.lk:
                if not self.current:
                    break
                # Builtin next() rather than the Python-2-only .next() method.
                th = next(iter(self.current))
            # Join outside the lock: handle1() needs it to deregister itself.
            th.join()
        # Closing the write end makes handle2 see EOF and return.
        os.close(self.cnpipe[1])
        self.rthread.join()
345
# Registry of handler classes keyed by their `cname` attribute, built
# from every handler subclass in this module's globals that has one.
names = dict(
    (candidate.cname, candidate)
    for candidate in globals().values()
    if isinstance(candidate, type)
    and issubclass(candidate, handler)
    and hasattr(candidate, "cname")
)
350
def parsehspec(spec):
    """Split a handler spec of the form "name[:key[=val][,key[=val]]...]".

    Returns (name, args), where args maps each key to its value string;
    a key given without "=" maps to "". Without a ":" the whole spec is
    the name and args is empty. Values are split on the first "=" only.
    """
    name, colon, remainder = spec.partition(":")
    if not colon:
        return spec, {}
    parsed = {}
    while remainder:
        # Peel off one comma-separated item at a time.
        item, _, remainder = remainder.partition(",")
        key, _, value = item.partition("=")
        parsed[key] = value
    return name, parsed