python: Added blocking thread-pool handler.
[ashd.git] / python3 / ashd / serve.py
CommitLineData
46adc298
FT
1import sys, os, threading, time, logging, select
2from . import perf
552a70bf
FT
3
# Module-wide logger used by the request handlers in this file.
log = logging.getLogger("ashd.serve")

# Next sequence number to hand out, and the lock that serialises access to it.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    Numbers start at 1 and increase by one per call; the counter is
    advanced while holding `seqlk`.
    """
    global seq
    with seqlk:
        seq += 1
        return seq - 1
14
552a70bf
FT
class closed(IOError):
    """Signals that the client has closed the connection.

    The request handlers in this file catch this exception and drop it
    silently instead of logging it as an error.
    """
    def __init__(self):
        message = "The client has closed the connection."
        super().__init__(message)
18
46adc298
FT
class reqthread(threading.Thread):
    """Thread that gets a numbered default name.

    All arguments are keyword-only and are forwarded to
    `threading.Thread`.  When no `name` is given, the thread is called
    "Request handler N", where N comes from `reqseq()`.
    """
    def __init__(self, *, name=None, **kw):
        thread_name = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=thread_name, **kw)
24
class wsgirequest(object):
    """Response state and output buffer for a single WSGI request.

    This base class tracks the response status and headers, whether the
    response head has been sent, and a buffer of pending body bytes.
    The transport-specific operations (`handlewsgi`, `fileno`,
    `writehead`, `flush`) raise in this class and are meant to be
    supplied by subclasses.
    NOTE(review): the handlers in this file also call `req.mkenv()`,
    which is not defined here -- presumably subclasses provide it.
    """
    def __init__(self, handler):
        # The handler is consulted after every write via handler.ckflush().
        self.handler = handler
        # Body bytes accepted by writedata() but not yet flushed.
        self.buffer = bytearray()
        # Response line and headers as passed to startreq().
        self.status = None
        self.headers = []
        # Becomes True once writehead() has been invoked.
        self.respsent = False

    # -- Operations a concrete request class has to provide. --

    def handlewsgi(self):
        raise Exception()

    def fileno(self):
        raise Exception()

    def writehead(self, status, headers):
        raise Exception()

    def flush(self):
        raise Exception()

    def close(self):
        """Release the request; a no-op in the base class."""
        return None

    def writedata(self, data):
        """Append `data` to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head if it has not been sent yet.

        Raises Exception if no status has been set by `startreq`.
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        # Mark the head as sent before calling writehead().
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Queue body data, sending the head first if needed.

        Empty data is ignored entirely.  After buffering, the handler's
        `ckflush` is called with this request.
        """
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return `self.write`.

        Calling this a second time is only permitted when `exc_info` is
        given.  In that case, if the head has already been sent, the
        exception in `exc_info[1]` is re-raised; otherwise the stored
        status and headers are replaced.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exception info.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
46adc298
FT
73
class handler(object):
    """Base class for the request handlers defined in this file.

    `handle` and `ckflush` raise in this class and are overridden by
    the concrete handlers; `close` does nothing by default.
    """
    def handle(self, request):
        """Take charge of `request`; must be overridden."""
        raise Exception()

    def ckflush(self, req):
        """Called by a request after it has buffered output; must be overridden."""
        raise Exception()

    def close(self):
        """Shut the handler down; nothing to do in the base class."""
        return None
81
class freethread(handler):
    """Handler that serves every request on a newly started thread.

    The set of threads currently serving a request is tracked so that
    `close` can wait for all of them to finish.
    """
    def __init__(self, **kw):
        super().__init__(**kw)
        # Threads currently inside run(); guarded by self.lk.
        self.current = set()
        self.lk = threading.Lock()

    def handle(self, req):
        """Start a new `reqthread` running `run(req)` and return at once."""
        reqthread(target=self.run, args=[req]).start()

    def ckflush(self, req):
        """Block until the request's output buffer is empty.

        Repeatedly waits in select() for the request to be reported as
        writable (or in an exceptional condition) and then calls
        `req.flush()`.
        """
        while len(req.buffer) > 0:
            select.select([], [req], [req])
            req.flush()

    def run(self, req):
        """Serve `req` on the calling thread, then close it.

        The WSGI iterable returned by `req.handlewsgi` is written out
        chunk by chunk inside a `perf.request(env)` context.  A `closed`
        exception is ignored; any other exception is logged.  The
        request is always closed at the end.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    # NOTE(review): PEP 3333 requires calling close() on the
                    # returned iterable if it has one; that is not done here.
                    # Confirm whether req.close() takes care of it.
                    respiter = req.handlewsgi(env, req.startreq)
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        # Make sure the head goes out even for an empty body.
                        req.flushreq()
                    self.ckflush(req)
            except closed:
                # The client went away; nothing worth reporting.
                pass
            except:
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
        finally:
            req.close()

    def close(self):
        """Wait until every in-flight request thread has finished."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    # Nothing left to wait for.
                    return
            # Join outside the lock: the thread needs self.lk in order to
            # remove itself from self.current before it can exit.
            th.join()
129
d570c3a5
FT
class threadpool(handler):
    """Handler that serves requests from a bounded pool of worker threads.

    Requests are passed to workers through a single hand-off slot
    (`self.wreq`): `handle` blocks until the slot is empty, stores the
    request there and wakes one worker.

    Keyword arguments:
    min  -- number of workers started up front; idle workers do not
            exit while the pool holds no more than this many threads.
    max  -- no new worker is started once the pool has this many threads.
    live -- seconds a worker may stay idle before it considers exiting.
    """
    def __init__(self, *, min=0, max=20, live=10, **kw):
        super().__init__(**kw)
        # All worker threads, and the subset currently waiting for work.
        self.current = set()
        self.free = set()
        # Re-entrant because handle() calls newthread() with the lock held.
        self.lk = threading.RLock()
        # pcond: pool state changed (worker registered/left, slot emptied).
        self.pcond = threading.Condition(self.lk)
        # rcond: a request has been placed in the hand-off slot.
        self.rcond = threading.Condition(self.lk)
        # The hand-off slot; None when empty.
        self.wreq = None
        self.min = min
        self.max = max
        self.live = live
        for _ in range(self.min):
            self.newthread()

    def newthread(self):
        """Start a worker and wait until it has registered itself."""
        with self.lk:
            th = reqthread(target=self.loop)
            th.start()
            while th not in self.current:
                self.pcond.wait()

    def ckflush(self, req):
        """Block until the request's output buffer is empty.

        Repeatedly waits in select() for the request to be reported as
        writable (or in an exceptional condition) and then calls
        `req.flush()`.
        """
        while len(req.buffer) > 0:
            select.select([], [req], [req])
            req.flush()

    def _handle(self, req):
        """Serve `req` on the calling thread, then close it.

        A `closed` exception is ignored; any other exception is logged.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                # NOTE(review): PEP 3333 requires calling close() on the
                # returned iterable if it has one; that is not done here.
                # Confirm whether req.close() takes care of it.
                respiter = req.handlewsgi(env, req.startreq)
                for data in respiter:
                    req.write(data)
                if req.status:
                    reqevent.response([req.status, req.headers])
                    # Make sure the head goes out even for an empty body.
                    req.flushreq()
                self.ckflush(req)
        except closed:
            # The client went away; nothing worth reporting.
            pass
        except:
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()

    def loop(self):
        """Worker thread body: take requests from the slot until idle too long."""
        th = threading.current_thread()
        with self.lk:
            self.current.add(th)
        try:
            while True:
                with self.lk:
                    self.free.add(th)
                    try:
                        # Tells newthread() that this worker is registered.
                        self.pcond.notify_all()
                        now = start = time.time()
                        while self.wreq is None:
                            self.rcond.wait(start + self.live - now)
                            now = time.time()
                            # Only give up when no request is pending.  A
                            # request may arrive just as the idle timer runs
                            # out; exiting then would leave it in the slot
                            # with nobody to serve it.
                            if self.wreq is None and now - start > self.live:
                                if len(self.current) > self.min:
                                    self.current.remove(th)
                                    return
                                else:
                                    # Pool is at its minimum; restart the timer.
                                    start = now
                        req, self.wreq = self.wreq, None
                        # Tells handle() that the slot is empty again.
                        self.pcond.notify_all()
                    finally:
                        self.free.remove(th)
                self._handle(req)
                # Drop the reference while idle.
                req = None
        finally:
            with self.lk:
                try:
                    self.current.remove(th)
                except KeyError:
                    # Already removed on the idle-exit path above.
                    pass
                self.pcond.notify_all()

    def handle(self, req):
        """Hand `req` to the pool, blocking while the slot is occupied.

        A new worker is started first if none is free and the pool is
        below `max`.
        """
        with self.lk:
            if len(self.free) < 1 and len(self.current) < self.max:
                self.newthread()
            while self.wreq is not None:
                self.pcond.wait()
            self.wreq = req
            self.rcond.notify(1)

    def close(self):
        """Let all workers expire and wait until none remain."""
        # With live and min at zero, every idle worker exits on wake-up.
        self.live = 0
        self.min = 0
        with self.lk:
            while len(self.current) > 0:
                self.rcond.notify_all()
                self.pcond.wait(1)
228
# Maps short handler names to the handler classes defined above.
names = dict(free=freethread, pool=threadpool)