python: Emulate the previous `-l' option to ashd-wsgi3.
[ashd.git] / python3 / ashd / serve.py
CommitLineData
46adc298
FT
1import sys, os, threading, time, logging, select
2from . import perf
552a70bf
FT
3
# Logger shared by everything in this module.
log = logging.getLogger("ashd.serve")
# Next sequence number to hand out; only touched while holding seqlk.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number and advance the counter.

    The shared counter is read and incremented under `seqlk`, so every
    caller receives a distinct number.
    """
    global seq
    seqlk.acquire()
    try:
        current, seq = seq, seq + 1
    finally:
        seqlk.release()
    return current
14
552a70bf
FT
class closed(IOError):
    """Error signalling that the client has closed the connection."""

    def __init__(self):
        IOError.__init__(self, "The client has closed the connection.")
18
46adc298
FT
class reqthread(threading.Thread):
    """Thread that receives a numbered default name when none is given.

    All arguments are keyword-only; everything except `name` is passed
    through to `threading.Thread`.
    """

    def __init__(self, *, name=None, **kw):
        # Only consume a sequence number when the caller did not name us.
        chosen = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=chosen, **kw)
24
class wsgirequest(object):
    """Base class holding the response state of a single WSGI request.

    Subclasses supply the transport-specific parts (`mkenv`, `handlewsgi`,
    `fileno`, `writehead`, `flush`), while this class implements the
    `start_response`/`write` bookkeeping on top of them.  Response body
    data is accumulated in `self.buffer` until the handler flushes it.
    """

    def __init__(self, handler):
        self.status = None          # status passed to startreq(), if any
        self.headers = []           # headers passed to startreq()
        self.respsent = False       # True once writehead() has been called
        self.handler = handler      # handler asked to flush after each write
        self.buffer = bytearray()   # body data not yet flushed

    def mkenv(self):
        """Build the environment passed to `handlewsgi` (subclass hook)."""
        raise NotImplementedError("mkenv() must be implemented by a subclass")

    def handlewsgi(self, env, startreq):
        """Invoke the application and return its response iterable.

        The handlers call this as `req.handlewsgi(env, req.startreq)`, so
        the stub takes the same arguments.
        """
        raise NotImplementedError("handlewsgi() must be implemented by a subclass")

    def fileno(self):
        """Return the descriptor that handlers pass to `select.select`."""
        raise NotImplementedError("fileno() must be implemented by a subclass")

    def writehead(self, status, headers):
        """Send the response head (subclass hook)."""
        raise NotImplementedError("writehead() must be implemented by a subclass")

    def flush(self):
        """Write out buffered body data (subclass hook)."""
        raise NotImplementedError("flush() must be implemented by a subclass")

    def close(self):
        """Release per-request resources; a no-op by default."""
        pass

    def writedata(self, data):
        """Append `data` to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head if it has not been sent yet.

        Raises:
            Exception: if no response has been started via `startreq`.
        """
        if not self.respsent:
            if not self.status:
                raise Exception("Cannot send response body before starting response.")
            # Mark as sent before calling writehead() so it is attempted
            # at most once, even if writehead() itself raises.
            self.respsent = True
            self.writehead(self.status, self.headers)

    def write(self, data):
        """Queue a chunk of response body and let the handler flush it.

        Empty chunks are ignored and do not cause the head to be sent.
        """
        if not data:
            return
        self.flushreq()
        self.writedata(data)
        self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return `self.write`.

        A second call is only accepted when `exc_info` is given.  In that
        case the exception is re-raised if the head has already been sent;
        otherwise the previously recorded status and headers are replaced.

        Raises:
            Exception: on a second call without `exc_info`.
        """
        if self.status:
            if exc_info:
                try:
                    if self.respsent:
                        raise exc_info[1]
                finally:
                    # Drop the local reference to the exception info.
                    exc_info = None
            else:
                raise Exception("Can only start responding once.")
        self.status = status
        self.headers = headers
        return self.write
46adc298
FT
73
class handler(object):
    """Base class for request-dispatching strategies.

    Subclasses implement `handle` and `ckflush`; `close` and `parseargs`
    have usable defaults.
    """

    def handle(self, request):
        """Take charge of serving `request` (subclass hook)."""
        raise NotImplementedError("handle() must be implemented by a subclass")

    def ckflush(self, req):
        """Flush the pending output of `req` (subclass hook)."""
        raise NotImplementedError("ckflush() must be implemented by a subclass")

    def close(self):
        """Shut the handler down; a no-op by default."""
        pass

    @classmethod
    def parseargs(cls, **args):
        """Convert string-valued handler arguments to constructor keywords.

        The base class accepts no arguments at all; subclasses consume the
        ones they know and pass the rest up here.

        Raises:
            ValueError: if any argument is left over, naming the first one.
        """
        if args:
            raise ValueError("unknown handler argument: " + next(iter(args)))
        return {}
87
class freethread(handler):
    """Handler that serves every request on its own newly started thread.

    Args:
        max: optional upper bound on the number of concurrently running
            request threads; `handle` blocks while the bound is reached.
        timeout: optional number of seconds `handle` may wait for a free
            slot before the whole process is aborted with `os.abort()`.
            Only consulted when `max` is set.
    """

    def __init__(self, *, max=None, timeout=None, **kw):
        super().__init__(**kw)
        self.current = set()                        # threads serving a request
        self.lk = threading.Lock()
        self.tcond = threading.Condition(self.lk)   # notified when `current` changes
        self.max = max
        self.timeout = timeout

    @classmethod
    def parseargs(cls, *, max=None, abort=None, **args):
        """Translate the `max` and `abort` string arguments.

        `abort` becomes the `timeout` constructor keyword.  Empty or absent
        values are left out so the constructor defaults apply.
        """
        ret = super().parseargs(**args)
        if max:
            ret["max"] = int(max)
        if abort:
            ret["timeout"] = int(abort)
        return ret

    def handle(self, req):
        """Start a thread for `req`, waiting for a free slot if limited.

        Returns once the new thread has registered itself in
        `self.current`, or has already finished.
        """
        with self.lk:
            if self.max is not None:
                if self.timeout is not None:
                    now = start = time.time()
                    while len(self.current) >= self.max:
                        self.tcond.wait(start + self.timeout - now)
                        now = time.time()
                        if now - start > self.timeout:
                            # No slot was freed within the time limit:
                            # abort the process instead of queueing forever.
                            os.abort()
                else:
                    while len(self.current) >= self.max:
                        self.tcond.wait()
            th = reqthread(target=self.run, args=[req])
            th.start()
            # Wait for the thread to show up in `current`, so that the
            # limit check of the next call sees it.
            while th.is_alive() and th not in self.current:
                self.tcond.wait()

    def ckflush(self, req):
        """Block until all buffered output of `req` has been flushed."""
        while len(req.buffer) > 0:
            select.select([], [req], [req])
            req.flush()

    def run(self, req):
        """Thread body: serve `req`, then unregister and close it."""
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
                self.tcond.notify_all()
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333: the server must call close() on the
                        # response iterable, if it has one, however the
                        # request ended.
                        closeiter = getattr(respiter, "close", None)
                        if closeiter is not None:
                            closeiter()
            except closed:
                # The client went away; nothing left to do.
                pass
            except:
                # Deliberately broad: this is the top of a request thread,
                # so anything that escapes is logged rather than lost.
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
                    self.tcond.notify_all()
        finally:
            req.close()

    def close(self):
        """Wait until every running request thread has finished."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock; the thread needs it to unregister.
            th.join()
164
class threadpool(handler):
    """Handler that hands requests to a pool of reusable worker threads.

    A request is passed to a worker through the single slot `self.wreq`.
    `pcond` is notified when workers register, become free, take a request
    or exit; `rcond` is notified when a request is placed in the slot.

    Args:
        min: number of workers started up front and kept alive when idle.
        max: `handle` starts a new worker only while fewer than this many
            exist.
        live: seconds a worker may stay idle before it retires, provided
            more than `min` workers exist.
    """

    def __init__(self, *, min=0, max=20, live=300, **kw):
        super().__init__(**kw)
        self.current = set()    # all live worker threads
        self.free = set()       # workers currently waiting for a request
        self.lk = threading.RLock()
        self.pcond = threading.Condition(self.lk)
        self.rcond = threading.Condition(self.lk)
        self.wreq = None        # request waiting to be picked up, if any
        self.min = min
        self.max = max
        self.live = live
        for i in range(self.min):
            self.newthread()

    @classmethod
    def parseargs(cls, *, min=None, max=None, live=None, **args):
        """Translate the `min`, `max` and `live` string arguments to ints.

        Empty or absent values are left out so the constructor defaults
        apply.
        """
        ret = super().parseargs(**args)
        if min:
            ret["min"] = int(min)
        if max:
            ret["max"] = int(max)
        if live:
            ret["live"] = int(live)
        return ret

    def newthread(self):
        """Start one worker and wait until it has registered itself."""
        with self.lk:
            th = reqthread(target=self.loop)
            th.start()
            # The is_alive() check keeps this from waiting forever on a
            # worker that has already exited again (e.g. when `live` is 0).
            while th.is_alive() and th not in self.current:
                self.pcond.wait()

    def ckflush(self, req):
        """Block until all buffered output of `req` has been flushed."""
        while len(req.buffer) > 0:
            select.select([], [req], [req])
            req.flush()

    def _handle(self, req):
        """Serve a single request on the calling worker thread."""
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333: the server must call close() on the
                    # response iterable, if it has one, however the
                    # request ended.
                    closeiter = getattr(respiter, "close", None)
                    if closeiter is not None:
                        closeiter()
        except closed:
            # The client went away; nothing left to do.
            pass
        except:
            # Deliberately broad: one failing request must not kill the
            # worker, so anything that escapes is logged instead.
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()

    def loop(self):
        """Worker body: repeatedly wait for a request and serve it."""
        th = threading.current_thread()
        with self.lk:
            self.current.add(th)
        try:
            while True:
                with self.lk:
                    self.free.add(th)
                    try:
                        self.pcond.notify_all()
                        now = start = time.time()
                        while self.wreq is None:
                            self.rcond.wait(start + self.live - now)
                            now = time.time()
                            # Retire only while the slot is still empty.  A
                            # request handed over just as the idle timeout
                            # expired would otherwise be left in `wreq`
                            # with no worker notified to take it.
                            if self.wreq is None and now - start > self.live:
                                if len(self.current) > self.min:
                                    self.current.remove(th)
                                    return
                                else:
                                    start = now
                        req, self.wreq = self.wreq, None
                        # Tell handle() that the slot is free again.
                        self.pcond.notify_all()
                    finally:
                        self.free.remove(th)
                self._handle(req)
                # Drop the reference while idling in the next iteration.
                req = None
        finally:
            with self.lk:
                self.current.discard(th)
                self.pcond.notify_all()

    def handle(self, req):
        """Place `req` in the hand-over slot for a worker to pick up.

        A new worker is started first if none is free and the pool is
        below `max`.  Blocks while a previous request is still waiting in
        the slot.
        """
        with self.lk:
            if len(self.free) < 1 and len(self.current) < self.max:
                self.newthread()
            while self.wreq is not None:
                self.pcond.wait()
            # The lock is held, so the slot is known to be empty here.
            self.wreq = req
            self.rcond.notify(1)

    def close(self):
        """Make idle workers retire and wait until all workers are gone."""
        # With no idle lifetime and no minimum, every woken idle worker
        # takes the retirement branch in loop().
        self.live = 0
        self.min = 0
        with self.lk:
            while len(self.current) > 0:
                self.rcond.notify_all()
                self.pcond.wait(1)
274
# Registry mapping handler names to their implementing classes.
names = dict(free=freethread, pool=threadpool)
8db41888
FT
277
def parsehspec(spec):
    """Split a handler spec of the form `name[:key=val,key=val,...]`.

    Returns a `(name, args)` tuple where `args` maps each key to its
    string value.  A key without `=` maps to the empty string.  A spec
    without a colon yields the whole string as name and no arguments.
    """
    name, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    parts = rest.split(",")
    # Parsing stops once nothing remains, so a single trailing empty
    # piece (from an empty spec or a trailing comma) is not an argument.
    if parts[-1] == "":
        parts.pop()
    args = {}
    for part in parts:
        key, _, val = part.partition("=")
        args[key] = val
    return name, args