python: Added command-line parsing of request handler and its arguments.
[ashd.git] / python3 / ashd / serve.py
CommitLineData
46adc298
FT
1import sys, os, threading, time, logging, select
2from . import perf
552a70bf
FT
3
# Logger used by the request handlers in this module.
log = logging.getLogger("ashd.serve")

# Lock guarding `seq`, followed by the next number reqseq() will hand out.
seqlk = threading.Lock()
seq = 1
7
def reqseq():
    """Return the current value of the module-wide sequence counter and
    advance it by one.

    The read and the increment happen while holding `seqlk`, so
    concurrent callers each receive a distinct number.
    """
    global seq
    with seqlk:
        current, seq = seq, seq + 1
    return current
14
552a70bf
FT
class closed(IOError):
    """Error signalling that the client has closed the connection.

    The request handlers in this module catch it and drop the request
    without logging anything.
    """

    def __init__(self):
        message = "The client has closed the connection."
        super().__init__(message)
18
46adc298
FT
class reqthread(threading.Thread):
    """Thread subclass that names itself "Request handler N" by default.

    N comes from reqseq(); an explicitly passed `name` is used
    unchanged and does not consume a sequence number.  All other
    keyword arguments are forwarded to threading.Thread.
    """

    def __init__(self, *, name=None, **kw):
        thread_name = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=thread_name, **kw)
24
class wsgirequest(object):
    """Base class holding the response state of a single WSGI request.

    Subclasses provide the transport-specific pieces (handlewsgi,
    fileno, writehead and flush); this class implements the
    start_response/write protocol on top of them.

    Attributes:
        status:   status passed to startreq(), or None before that.
        headers:  header list passed to startreq().
        respsent: True once writehead() has been invoked by flushreq().
        handler:  the handler object whose ckflush() is called after
                  every non-empty write().
        buffer:   bytearray collecting body data passed to writedata().
    """

    def __init__(self, handler):
        self.status = None
        self.headers = []
        self.respsent = False
        self.handler = handler
        self.buffer = bytearray()

    # --- Hooks to be supplied by subclasses ---------------------------

    def handlewsgi(self):
        # NOTE(review): the handlers in this module call
        # handlewsgi(env, startreq), so overrides presumably accept those
        # two arguments -- confirm against the subclasses.
        raise NotImplementedError("handlewsgi() must be implemented by a subclass")

    def fileno(self):
        raise NotImplementedError("fileno() must be implemented by a subclass")

    def writehead(self, status, headers):
        raise NotImplementedError("writehead() must be implemented by a subclass")

    def flush(self):
        raise NotImplementedError("flush() must be implemented by a subclass")

    def close(self):
        """Release resources held by the request; a no-op by default."""
        pass

    def writedata(self, data):
        """Append `data` to the pending output buffer."""
        self.buffer.extend(data)

    # --- Response protocol --------------------------------------------

    def flushreq(self):
        """Send the response head via writehead() if not already sent.

        Raises:
            Exception: if no status has been set with startreq() yet.
        """
        if not self.respsent:
            if not self.status:
                raise Exception("Cannot send response body before starting response.")
            # Mark as sent before calling writehead() so the head is
            # never attempted twice, even if writehead() raises.
            self.respsent = True
            self.writehead(self.status, self.headers)

    def write(self, data):
        """Queue body `data` and let the handler flush it.

        Empty data is ignored entirely.  Otherwise the response head is
        sent first if necessary, the data is buffered, and the owning
        handler's ckflush() is invoked with this request.
        """
        if not data:
            return
        self.flushreq()
        self.writedata(data)
        self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return the write callable.

        A second call is only accepted when `exc_info` is given.  In
        that case, if the head has already been sent, the exception in
        exc_info[1] is re-raised; otherwise the previously recorded
        status and headers are replaced.

        Raises:
            Exception: when called again without `exc_info`.
        """
        if self.status:
            if exc_info:
                try:
                    if self.respsent:
                        raise exc_info[1]
                finally:
                    # Drop the local reference to the exception info, as
                    # PEP 3333 recommends, so it is not kept alive here.
                    exc_info = None
            else:
                raise Exception("Can only start responding once.")
        self.status = status
        self.headers = headers
        return self.write
46adc298
FT
73
class handler(object):
    """Base class for request-dispatch strategies.

    Subclasses implement handle() to process a request and ckflush()
    to push a request's buffered output to the client.
    """

    def handle(self, request):
        """Take over processing of `request`; must be overridden."""
        raise NotImplementedError("handle() must be implemented by a subclass")

    def ckflush(self, req):
        """Flush buffered output of `req`; must be overridden."""
        raise NotImplementedError("ckflush() must be implemented by a subclass")

    def close(self):
        """Shut the handler down; a no-op by default."""
        pass

    @classmethod
    def parseargs(cls, **args):
        """Convert string arguments into constructor keyword arguments.

        The base class accepts no arguments: whatever reaches it is
        unknown.  Subclasses consume their own keywords and pass the
        remainder up.

        Returns:
            An empty dict.

        Raises:
            ValueError: naming the first leftover argument, if any.
        """
        if len(args) > 0:
            raise ValueError("unknown handler argument: " + next(iter(args)))
        return {}
87
46adc298
FT
class freethread(handler):
    """Handler that processes every request on its own new thread."""

    def __init__(self, **kw):
        super().__init__(**kw)
        # Threads currently executing run(); guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()

    def handle(self, req):
        """Start a new reqthread that processes `req` through run()."""
        reqthread(target=self.run, args=[req]).start()

    def ckflush(self, req):
        """Block until `req.buffer` is empty.

        Waits with select() for the request to become writable (or to
        report an exceptional condition) and then calls req.flush(),
        which is expected to drain the buffer.
        """
        while len(req.buffer) > 0:
            select.select([], [req], [req])
            req.flush()

    def run(self, req):
        """Run the WSGI application for `req` and send its response.

        A `closed` error ends the request silently; any other error is
        logged.  The request is always closed at the end.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333: the server must call close() on the
                        # response iterable, if it has one, when the
                        # request is finished -- even after an error.
                        if hasattr(respiter, "close"):
                            respiter.close()
            except closed:
                # The client went away; nothing left to do.
                pass
            except:
                # Thread boundary: log whatever went wrong instead of
                # letting it escape the thread.
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
        finally:
            req.close()

    def close(self):
        """Wait for all running request threads to finish."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock so the thread can deregister itself.
            th.join()
135
class threadpool(handler):
    """Handler that processes requests on a pool of worker threads.

    Requests are handed to workers one at a time through the
    single-slot `wreq` attribute.

    Args:
        min:  number of workers started up front; idle workers only
              exit while more than this many exist.
        max:  handle() starts no new worker once this many exist.
        live: seconds a worker may stay idle before it considers
              exiting.
    """

    def __init__(self, *, min=0, max=20, live=300, **kw):
        super().__init__(**kw)
        # All live workers, and the subset waiting for a request.
        self.current = set()
        self.free = set()
        # Re-entrant because handle() calls newthread() while already
        # holding the lock, and newthread() acquires it again.
        self.lk = threading.RLock()
        # pcond: notified by workers (registered, took a request, exited).
        # rcond: notified by handle() when a request is placed in wreq.
        self.pcond = threading.Condition(self.lk)
        self.rcond = threading.Condition(self.lk)
        self.wreq = None
        self.min = min
        self.max = max
        self.live = live
        for i in range(self.min):
            self.newthread()

    @classmethod
    def parseargs(cls, *, min=None, max=None, live=None, **args):
        """Convert string arguments to constructor keyword arguments.

        `min`, `max` and `live` are converted with int(); falsy values
        (None or an empty string) are left out so the constructor
        defaults apply.  Remaining arguments go to the parent class.
        """
        ret = super().parseargs(**args)
        if min:
            ret["min"] = int(min)
        if max:
            ret["max"] = int(max)
        if live:
            ret["live"] = int(live)
        return ret

    def newthread(self):
        """Start one worker and wait until it has registered itself."""
        with self.lk:
            th = reqthread(target=self.loop)
            th.start()
            while not th in self.current:
                self.pcond.wait()

    def ckflush(self, req):
        """Block until `req.buffer` is empty.

        Waits with select() for the request to become writable (or to
        report an exceptional condition) and then calls req.flush(),
        which is expected to drain the buffer.
        """
        while len(req.buffer) > 0:
            select.select([], [req], [req])
            req.flush()

    def _handle(self, req):
        """Run the WSGI application for `req` and send its response.

        A `closed` error ends the request silently; any other error is
        logged.  The request is always closed at the end.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333: the server must call close() on the
                    # response iterable, if it has one, when the request
                    # is finished -- even after an error.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except closed:
            # The client went away; nothing left to do.
            pass
        except:
            # Keep the worker alive: log whatever went wrong.
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()

    def loop(self):
        """Worker body: repeatedly wait for a request and process it."""
        th = threading.current_thread()
        with self.lk:
            self.current.add(th)
        try:
            while True:
                with self.lk:
                    self.free.add(th)
                    try:
                        # Tell waiters (e.g. newthread()) that this
                        # worker is registered and free.
                        self.pcond.notify_all()
                        now = start = time.time()
                        while self.wreq is None:
                            self.rcond.wait(start + self.live - now)
                            now = time.time()
                            if now - start > self.live:
                                # Idle for too long: exit if the pool is
                                # above its minimum, else restart the
                                # idle timer.
                                if len(self.current) > self.min:
                                    self.current.remove(th)
                                    return
                                else:
                                    start = now
                        # Take the request and free the slot for the
                        # next handle() call.
                        req, self.wreq = self.wreq, None
                        self.pcond.notify_all()
                    finally:
                        self.free.remove(th)
                # Process outside the lock.
                self._handle(req)
                req = None
        finally:
            with self.lk:
                try:
                    self.current.remove(th)
                except KeyError:
                    # Already removed on the idle-timeout path above.
                    pass
                self.pcond.notify_all()

    def handle(self, req):
        """Hand `req` to a worker, starting a new one if none is free.

        Blocks while the hand-off slot is still occupied by an earlier
        request.
        """
        while True:
            with self.lk:
                if len(self.free) < 1 and len(self.current) < self.max:
                    self.newthread()
                while self.wreq is not None:
                    self.pcond.wait()
                if self.wreq is None:
                    self.wreq = req
                    self.rcond.notify(1)
                    return

    def close(self):
        """Make idle workers exit and wait until none remain."""
        # With live and min at zero, every woken idle worker sees itself
        # as timed out and above the minimum.
        self.live = 0
        self.min = 0
        with self.lk:
            while len(self.current) > 0:
                self.rcond.notify_all()
                self.pcond.wait(1)
245
# Handler classes by the short name used to select them.
names = {
    "free": freethread,
    "pool": threadpool,
}
8db41888
FT
248
def parsehspec(spec):
    """Split a handler specification into its name and argument dict.

    The accepted form is "name" or "name:key=val,key2=val2,...".  An
    item without "=" maps its whole text to the empty string, and only
    the first "=" of an item separates key from value.  All values are
    returned as strings.

    Returns:
        A (name, args) tuple; args is empty when `spec` has no ":".
    """
    name, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    parsed = {}
    while rest:
        # Peel off the next comma-separated item; `rest` becomes empty
        # once the last item has been consumed.
        item, _, rest = rest.partition(",")
        key, _, value = item.partition("=")
        parsed[key] = value
    return name, parsed