python*: Use poll instead of select in ckflush.
[ashd.git] / python / ashd / serve.py
CommitLineData
8f410ce7
FT
1import sys, os, threading, time, logging, select, Queue
2import perf
63090e5a
FT
3
4log = logging.getLogger("ashd.serve")
5seq = 1
6seqlk = threading.Lock()
7
8def reqseq():
9 global seq
8f410ce7 10 with seqlk:
63090e5a
FT
11 s = seq
12 seq += 1
13 return s
63090e5a
FT
14
15class closed(IOError):
16 def __init__(self):
17 super(closed, self).__init__("The client has closed the connection.")
18
8f410ce7
FT
19class reqthread(threading.Thread):
20 def __init__(self, name=None, **kw):
21 if name is None:
22 name = "Request handler %i" % reqseq()
23 super(reqthread, self).__init__(name=name, **kw)
24
25class wsgirequest(object):
26 def __init__(self, handler):
63090e5a
FT
27 self.status = None
28 self.headers = []
29 self.respsent = False
8f410ce7
FT
30 self.handler = handler
31 self.buffer = bytearray()
63090e5a
FT
32
33 def handlewsgi(self):
34 raise Exception()
8f410ce7
FT
35 def fileno(self):
36 raise Exception()
63090e5a
FT
37 def writehead(self, status, headers):
38 raise Exception()
8f410ce7 39 def flush(self):
63090e5a 40 raise Exception()
8f410ce7
FT
41 def close(self):
42 pass
43 def writedata(self, data):
44 self.buffer.extend(data)
63090e5a
FT
45
46 def flushreq(self):
47 if not self.respsent:
48 if not self.status:
49 raise Exception("Cannot send response body before starting response.")
50 self.respsent = True
51 self.writehead(self.status, self.headers)
52
8f410ce7
FT
53 def write(self, data):
54 if not data:
55 return
56 self.flushreq()
57 self.writedata(data)
58 self.handler.ckflush(self)
59
63090e5a
FT
60 def startreq(self, status, headers, exc_info=None):
61 if self.status:
8f410ce7 62 if exc_info:
63090e5a
FT
63 try:
64 if self.respsent:
8f410ce7 65 raise exc_info[1]
63090e5a 66 finally:
8f410ce7 67 exc_info = None
63090e5a
FT
68 else:
69 raise Exception("Can only start responding once.")
70 self.status = status
71 self.headers = headers
72 return self.write
8f410ce7
FT
73
74class handler(object):
75 def handle(self, request):
76 raise Exception()
77 def ckflush(self, req):
6e8b9b9d
FT
78 p = select.poll()
79 p.register(req, select.POLLOUT)
8f410ce7 80 while len(req.buffer) > 0:
6e8b9b9d 81 p.poll()
8f410ce7
FT
82 req.flush()
83 def close(self):
84 pass
85
86 @classmethod
87 def parseargs(cls, **args):
88 if len(args) > 0:
89 raise ValueError("unknown handler argument: " + iter(args).next())
90 return {}
91
92class single(handler):
93 cname = "single"
94
95 def handle(self, req):
63090e5a 96 try:
8f410ce7
FT
97 env = req.mkenv()
98 with perf.request(env) as reqevent:
99 respiter = req.handlewsgi(env, req.startreq)
63090e5a 100 for data in respiter:
8f410ce7
FT
101 req.write(data)
102 if req.status:
103 reqevent.response([req.status, req.headers])
104 req.flushreq()
105 self.ckflush(req)
63090e5a
FT
106 except closed:
107 pass
8f410ce7
FT
108 except:
109 log.error("exception occurred when handling request", exc_info=True)
110 finally:
111 req.close()
112
113class freethread(handler):
114 cname = "free"
63090e5a 115
8f410ce7
FT
116 def __init__(self, max=None, timeout=None, **kw):
117 super(freethread, self).__init__(**kw)
118 self.current = set()
119 self.lk = threading.Lock()
120 self.tcond = threading.Condition(self.lk)
121 self.max = max
122 self.timeout = timeout
63090e5a 123
8f410ce7
FT
124 @classmethod
125 def parseargs(cls, max=None, abort=None, **args):
126 ret = super(freethread, cls).parseargs(**args)
127 if max:
128 ret["max"] = int(max)
129 if abort:
130 ret["timeout"] = int(abort)
131 return ret
63090e5a 132
8f410ce7
FT
133 def handle(self, req):
134 with self.lk:
135 if self.max is not None:
136 if self.timeout is not None:
137 now = start = time.time()
138 while len(self.current) >= self.max:
139 self.tcond.wait(start + self.timeout - now)
140 now = time.time()
141 if now - start > self.timeout:
142 os.abort()
143 else:
144 while len(self.current) >= self.max:
145 self.tcond.wait()
146 th = reqthread(target=self.run, args=[req])
76ff6c4d 147 th.registered = False
8f410ce7 148 th.start()
76ff6c4d
FT
149 while not th.registered:
150 self.tcond.wait()
8f410ce7
FT
151
152 def run(self, req):
63090e5a 153 try:
8f410ce7
FT
154 th = threading.current_thread()
155 with self.lk:
156 self.current.add(th)
76ff6c4d 157 th.registered = True
8f410ce7
FT
158 self.tcond.notify_all()
159 try:
160 env = req.mkenv()
161 with perf.request(env) as reqevent:
162 respiter = req.handlewsgi(env, req.startreq)
163 for data in respiter:
164 req.write(data)
165 if req.status:
166 reqevent.response([req.status, req.headers])
167 req.flushreq()
168 self.ckflush(req)
169 except closed:
170 pass
171 except:
172 log.error("exception occurred when handling request", exc_info=True)
173 finally:
174 with self.lk:
175 self.current.remove(th)
176 self.tcond.notify_all()
63090e5a 177 finally:
8f410ce7
FT
178 req.close()
179
180 def close(self):
181 while True:
182 with self.lk:
183 if len(self.current) > 0:
184 th = iter(self.current).next()
185 else:
186 return
187 th.join()
188
189class resplex(handler):
190 cname = "rplex"
191
192 def __init__(self, max=None, **kw):
193 super(resplex, self).__init__(**kw)
194 self.current = set()
195 self.lk = threading.Lock()
196 self.tcond = threading.Condition(self.lk)
197 self.max = max
198 self.cqueue = Queue.Queue(5)
199 self.cnpipe = os.pipe()
200 self.rthread = reqthread(name="Response thread", target=self.handle2)
201 self.rthread.start()
202
203 @classmethod
204 def parseargs(cls, max=None, **args):
205 ret = super(resplex, cls).parseargs(**args)
206 if max:
207 ret["max"] = int(max)
208 return ret
209
210 def ckflush(self, req):
211 raise Exception("resplex handler does not support the write() function")
63090e5a 212
8f410ce7
FT
213 def handle(self, req):
214 with self.lk:
215 if self.max is not None:
216 while len(self.current) >= self.max:
217 self.tcond.wait()
218 th = reqthread(target=self.handle1, args=[req])
76ff6c4d 219 th.registered = False
8f410ce7 220 th.start()
76ff6c4d
FT
221 while not th.registered:
222 self.tcond.wait()
8f410ce7
FT
223
224 def handle1(self, req):
63090e5a 225 try:
8f410ce7
FT
226 th = threading.current_thread()
227 with self.lk:
228 self.current.add(th)
76ff6c4d 229 th.registered = True
8f410ce7
FT
230 self.tcond.notify_all()
231 try:
232 env = req.mkenv()
233 respobj = req.handlewsgi(env, req.startreq)
234 respiter = iter(respobj)
235 if not req.status:
236 log.error("request handler returned without calling start_request")
237 if hasattr(respiter, "close"):
238 respiter.close()
239 return
240 else:
241 self.cqueue.put((req, respiter))
242 os.write(self.cnpipe[1], " ")
243 req = None
244 finally:
245 with self.lk:
246 self.current.remove(th)
247 self.tcond.notify_all()
248 except closed:
249 pass
250 except:
251 log.error("exception occurred when handling request", exc_info=True)
63090e5a 252 finally:
8f410ce7
FT
253 if req is not None:
254 req.close()
63090e5a 255
8f410ce7 256 def handle2(self):
63090e5a 257 try:
8f410ce7
FT
258 rp = self.cnpipe[0]
259 current = {}
e0c88e73 260
8f410ce7
FT
261 def closereq(req):
262 respiter = current[req]
263 try:
264 if respiter is not None and hasattr(respiter, "close"):
265 respiter.close()
266 except:
267 log.error("exception occurred when closing iterator", exc_info=True)
268 try:
269 req.close()
270 except:
271 log.error("exception occurred when closing request", exc_info=True)
272 del current[req]
273 def ckiter(req):
274 respiter = current[req]
275 if respiter is not None:
276 rem = False
277 try:
278 data = respiter.next()
279 except StopIteration:
280 rem = True
281 try:
282 req.flushreq()
283 except:
284 log.error("exception occurred when handling response data", exc_info=True)
285 except:
286 rem = True
287 log.error("exception occurred when iterating response", exc_info=True)
288 if not rem:
289 if data:
290 try:
291 req.flushreq()
292 req.writedata(data)
293 except:
294 log.error("exception occurred when handling response data", exc_info=True)
295 rem = True
296 if rem:
297 current[req] = None
298 try:
299 if hasattr(respiter, "close"):
300 respiter.close()
301 except:
302 log.error("exception occurred when closing iterator", exc_info=True)
303 respiter = None
304 if respiter is None and not req.buffer:
305 closereq(req)
306
307 while True:
308 bufl = list(req for req in current.iterkeys() if req.buffer)
309 rls, wls, els = select.select([rp], bufl, [rp] + bufl)
310 if rp in rls:
311 ret = os.read(rp, 1024)
312 if not ret:
313 os.close(rp)
314 return
315 try:
316 while True:
317 req, respiter = self.cqueue.get(False)
318 current[req] = respiter
319 ckiter(req)
320 except Queue.Empty:
321 pass
322 for req in wls:
323 try:
324 req.flush()
325 except closed:
326 closereq(req)
327 except:
328 log.error("exception occurred when writing response", exc_info=True)
329 closereq(req)
330 else:
331 if len(req.buffer) < 65536:
332 ckiter(req)
333 except:
334 log.critical("unexpected exception occurred in response handler thread", exc_info=True)
e0c88e73 335 os.abort()
8f410ce7
FT
336
337 def close(self):
338 while True:
339 with self.lk:
340 if len(self.current) > 0:
341 th = iter(self.current).next()
342 else:
343 break
344 th.join()
345 os.close(self.cnpipe[1])
346 self.rthread.join()
347
348names = dict((cls.cname, cls) for cls in globals().itervalues() if
349 isinstance(cls, type) and
350 issubclass(cls, handler) and
351 hasattr(cls, "cname"))
352
353def parsehspec(spec):
354 if ":" not in spec:
355 return spec, {}
356 nm, spec = spec.split(":", 1)
357 args = {}
358 while spec:
359 if "," in spec:
360 part, spec = spec.split(",", 1)
361 else:
362 part, spec = spec, None
363 if "=" in part:
364 key, val = part.split("=", 1)
365 else:
366 key, val = part, ""
367 args[key] = val
368 return nm, args