python*: Use poll instead of select in ckflush.
[ashd.git] / python / ashd / serve.py
1 import sys, os, threading, time, logging, select, Queue
2 import perf
3
4 log = logging.getLogger("ashd.serve")
5 seq = 1
6 seqlk = threading.Lock()
7
8 def reqseq():
9     global seq
10     with seqlk:
11         s = seq
12         seq += 1
13         return s
14
15 class closed(IOError):
16     def __init__(self):
17         super(closed, self).__init__("The client has closed the connection.")
18
19 class reqthread(threading.Thread):
20     def __init__(self, name=None, **kw):
21         if name is None:
22             name = "Request handler %i" % reqseq()
23         super(reqthread, self).__init__(name=name, **kw)
24
25 class wsgirequest(object):
26     def __init__(self, handler):
27         self.status = None
28         self.headers = []
29         self.respsent = False
30         self.handler = handler
31         self.buffer = bytearray()
32
33     def handlewsgi(self):
34         raise Exception()
35     def fileno(self):
36         raise Exception()
37     def writehead(self, status, headers):
38         raise Exception()
39     def flush(self):
40         raise Exception()
41     def close(self):
42         pass
43     def writedata(self, data):
44         self.buffer.extend(data)
45
46     def flushreq(self):
47         if not self.respsent:
48             if not self.status:
49                 raise Exception("Cannot send response body before starting response.")
50             self.respsent = True
51             self.writehead(self.status, self.headers)
52
53     def write(self, data):
54         if not data:
55             return
56         self.flushreq()
57         self.writedata(data)
58         self.handler.ckflush(self)
59
60     def startreq(self, status, headers, exc_info=None):
61         if self.status:
62             if exc_info:
63                 try:
64                     if self.respsent:
65                         raise exc_info[1]
66                 finally:
67                     exc_info = None
68             else:
69                 raise Exception("Can only start responding once.")
70         self.status = status
71         self.headers = headers
72         return self.write
73
74 class handler(object):
75     def handle(self, request):
76         raise Exception()
77     def ckflush(self, req):
78         p = select.poll()
79         p.register(req, select.POLLOUT)
80         while len(req.buffer) > 0:
81             p.poll()
82             req.flush()
83     def close(self):
84         pass
85
86     @classmethod
87     def parseargs(cls, **args):
88         if len(args) > 0:
89             raise ValueError("unknown handler argument: " + iter(args).next())
90         return {}
91
92 class single(handler):
93     cname = "single"
94
95     def handle(self, req):
96         try:
97             env = req.mkenv()
98             with perf.request(env) as reqevent:
99                 respiter = req.handlewsgi(env, req.startreq)
100                 for data in respiter:
101                     req.write(data)
102                 if req.status:
103                     reqevent.response([req.status, req.headers])
104                     req.flushreq()
105                 self.ckflush(req)
106         except closed:
107             pass
108         except:
109             log.error("exception occurred when handling request", exc_info=True)
110         finally:
111             req.close()
112
113 class freethread(handler):
114     cname = "free"
115
116     def __init__(self, max=None, timeout=None, **kw):
117         super(freethread, self).__init__(**kw)
118         self.current = set()
119         self.lk = threading.Lock()
120         self.tcond = threading.Condition(self.lk)
121         self.max = max
122         self.timeout = timeout
123
124     @classmethod
125     def parseargs(cls, max=None, abort=None, **args):
126         ret = super(freethread, cls).parseargs(**args)
127         if max:
128             ret["max"] = int(max)
129         if abort:
130             ret["timeout"] = int(abort)
131         return ret
132
133     def handle(self, req):
134         with self.lk:
135             if self.max is not None:
136                 if self.timeout is not None:
137                     now = start = time.time()
138                     while len(self.current) >= self.max:
139                         self.tcond.wait(start + self.timeout - now)
140                         now = time.time()
141                         if now - start > self.timeout:
142                             os.abort()
143                 else:
144                     while len(self.current) >= self.max:
145                         self.tcond.wait()
146             th = reqthread(target=self.run, args=[req])
147             th.registered = False
148             th.start()
149             while not th.registered:
150                 self.tcond.wait()
151
152     def run(self, req):
153         try:
154             th = threading.current_thread()
155             with self.lk:
156                 self.current.add(th)
157                 th.registered = True
158                 self.tcond.notify_all()
159             try:
160                 env = req.mkenv()
161                 with perf.request(env) as reqevent:
162                     respiter = req.handlewsgi(env, req.startreq)
163                     for data in respiter:
164                         req.write(data)
165                     if req.status:
166                         reqevent.response([req.status, req.headers])
167                         req.flushreq()
168                     self.ckflush(req)
169             except closed:
170                 pass
171             except:
172                 log.error("exception occurred when handling request", exc_info=True)
173             finally:
174                 with self.lk:
175                     self.current.remove(th)
176                     self.tcond.notify_all()
177         finally:
178             req.close()
179
180     def close(self):
181         while True:
182             with self.lk:
183                 if len(self.current) > 0:
184                     th = iter(self.current).next()
185                 else:
186                     return
187             th.join()
188
189 class resplex(handler):
190     cname = "rplex"
191
192     def __init__(self, max=None, **kw):
193         super(resplex, self).__init__(**kw)
194         self.current = set()
195         self.lk = threading.Lock()
196         self.tcond = threading.Condition(self.lk)
197         self.max = max
198         self.cqueue = Queue.Queue(5)
199         self.cnpipe = os.pipe()
200         self.rthread = reqthread(name="Response thread", target=self.handle2)
201         self.rthread.start()
202
203     @classmethod
204     def parseargs(cls, max=None, **args):
205         ret = super(resplex, cls).parseargs(**args)
206         if max:
207             ret["max"] = int(max)
208         return ret
209
210     def ckflush(self, req):
211         raise Exception("resplex handler does not support the write() function")
212
213     def handle(self, req):
214         with self.lk:
215             if self.max is not None:
216                 while len(self.current) >= self.max:
217                     self.tcond.wait()
218             th = reqthread(target=self.handle1, args=[req])
219             th.registered = False
220             th.start()
221             while not th.registered:
222                 self.tcond.wait()
223
224     def handle1(self, req):
225         try:
226             th = threading.current_thread()
227             with self.lk:
228                 self.current.add(th)
229                 th.registered = True
230                 self.tcond.notify_all()
231             try:
232                 env = req.mkenv()
233                 respobj = req.handlewsgi(env, req.startreq)
234                 respiter = iter(respobj)
235                 if not req.status:
236                     log.error("request handler returned without calling start_request")
237                     if hasattr(respiter, "close"):
238                         respiter.close()
239                     return
240                 else:
241                     self.cqueue.put((req, respiter))
242                     os.write(self.cnpipe[1], " ")
243                     req = None
244             finally:
245                 with self.lk:
246                     self.current.remove(th)
247                     self.tcond.notify_all()
248         except closed:
249             pass
250         except:
251             log.error("exception occurred when handling request", exc_info=True)
252         finally:
253             if req is not None:
254                 req.close()
255
256     def handle2(self):
257         try:
258             rp = self.cnpipe[0]
259             current = {}
260
261             def closereq(req):
262                 respiter = current[req]
263                 try:
264                     if respiter is not None and hasattr(respiter, "close"):
265                         respiter.close()
266                 except:
267                     log.error("exception occurred when closing iterator", exc_info=True)
268                 try:
269                     req.close()
270                 except:
271                     log.error("exception occurred when closing request", exc_info=True)
272                 del current[req]
273             def ckiter(req):
274                 respiter = current[req]
275                 if respiter is not None:
276                     rem = False
277                     try:
278                         data = respiter.next()
279                     except StopIteration:
280                         rem = True
281                         try:
282                             req.flushreq()
283                         except:
284                             log.error("exception occurred when handling response data", exc_info=True)
285                     except:
286                         rem = True
287                         log.error("exception occurred when iterating response", exc_info=True)
288                     if not rem:
289                         if data:
290                             try:
291                                 req.flushreq()
292                                 req.writedata(data)
293                             except:
294                                 log.error("exception occurred when handling response data", exc_info=True)
295                                 rem = True
296                     if rem:
297                         current[req] = None
298                         try:
299                             if hasattr(respiter, "close"):
300                                 respiter.close()
301                         except:
302                             log.error("exception occurred when closing iterator", exc_info=True)
303                         respiter = None
304                 if respiter is None and not req.buffer:
305                     closereq(req)
306
307             while True:
308                 bufl = list(req for req in current.iterkeys() if req.buffer)
309                 rls, wls, els = select.select([rp], bufl, [rp] + bufl)
310                 if rp in rls:
311                     ret = os.read(rp, 1024)
312                     if not ret:
313                         os.close(rp)
314                         return
315                     try:
316                         while True:
317                             req, respiter = self.cqueue.get(False)
318                             current[req] = respiter
319                             ckiter(req)
320                     except Queue.Empty:
321                         pass
322                 for req in wls:
323                     try:
324                         req.flush()
325                     except closed:
326                         closereq(req)
327                     except:
328                         log.error("exception occurred when writing response", exc_info=True)
329                         closereq(req)
330                     else:
331                         if len(req.buffer) < 65536:
332                             ckiter(req)
333         except:
334             log.critical("unexpected exception occurred in response handler thread", exc_info=True)
335             os.abort()
336
337     def close(self):
338         while True:
339             with self.lk:
340                 if len(self.current) > 0:
341                     th = iter(self.current).next()
342                 else:
343                     break
344             th.join()
345         os.close(self.cnpipe[1])
346         self.rthread.join()
347
348 names = dict((cls.cname, cls) for cls in globals().itervalues() if
349              isinstance(cls, type) and
350              issubclass(cls, handler) and
351              hasattr(cls, "cname"))
352
353 def parsehspec(spec):
354     if ":" not in spec:
355         return spec, {}
356     nm, spec = spec.split(":", 1)
357     args = {}
358     while spec:
359         if "," in spec:
360             part, spec = spec.split(",", 1)
361         else:
362             part, spec = spec, None
363         if "=" in part:
364             key, val = part.split("=", 1)
365         else:
366             key, val = part, ""
367         args[key] = val
368     return nm, args