python: Added command-line parsing of request handler and its arguments.
[ashd.git] / python3 / ashd / serve.py
1 import sys, os, threading, time, logging, select
2 from . import perf
3
# Logger shared by the request handlers in this module.
log = logging.getLogger("ashd.serve")
# Next request sequence number to hand out; only touched while seqlk is held.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number.

    The module-level counter is read and advanced while holding seqlk, so
    each call returns a distinct, consecutive integer starting at 1.
    """
    global seq
    with seqlk:
        current, seq = seq, seq + 1
    return current
14
class closed(IOError):
    """IOError carrying a fixed message that the client closed the connection."""

    def __init__(self):
        message = "The client has closed the connection."
        super().__init__(message)
18
class reqthread(threading.Thread):
    """Thread whose name defaults to a numbered "Request handler" label.

    All arguments are keyword-only and, apart from name, are passed through
    unchanged to threading.Thread.
    """

    def __init__(self, *, name=None, **kw):
        # Only draw a sequence number when the caller did not supply a name.
        label = name if name is not None else "Request handler %i" % reqseq()
        super().__init__(name=label, **kw)
24
class wsgirequest(object):
    """Base class holding the response state of a single WSGI request.

    handlewsgi(), fileno(), writehead() and flush() raise a bare Exception
    here and are expected to be supplied by subclasses. Response body data
    is collected in self.buffer, and self.handler.ckflush() is called after
    every non-empty write.
    """

    def __init__(self, handler):
        self.handler = handler
        self.buffer = bytearray()
        # Status and headers recorded by startreq(); status stays None until
        # a response has been started.
        self.status = None
        self.headers = []
        # Becomes True once flushreq() has passed the head to writehead().
        self.respsent = False

    def handlewsgi(self):
        raise Exception()

    def fileno(self):
        raise Exception()

    def writehead(self, status, headers):
        raise Exception()

    def flush(self):
        raise Exception()

    def close(self):
        pass

    def writedata(self, data):
        """Append data to the pending output buffer."""
        self.buffer.extend(data)

    def flushreq(self):
        """Send the response head through writehead() if not already sent.

        Raises Exception when no status has been set by startreq() yet.
        """
        if self.respsent:
            return
        if not self.status:
            raise Exception("Cannot send response body before starting response.")
        # Marked as sent before writehead() runs, as in the original ordering.
        self.respsent = True
        self.writehead(self.status, self.headers)

    def write(self, data):
        """Buffer a chunk of body data and let the handler flush it.

        Empty data is ignored entirely: the head is not sent and the handler
        is not invoked.
        """
        if data:
            self.flushreq()
            self.writedata(data)
            self.handler.ckflush(self)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers and return self.write.

        When a status is already set, a second call is accepted only if
        exc_info is given; in that case the exception in exc_info[1] is
        re-raised if the head has already been sent. Without exc_info a
        second call raises Exception.
        """
        if self.status:
            if not exc_info:
                raise Exception("Can only start responding once.")
            try:
                if self.respsent:
                    raise exc_info[1]
            finally:
                # Drop the local reference to the exception info triple.
                exc_info = None
        self.status = status
        self.headers = headers
        return self.write
73
class handler(object):
    """Base class for request handlers.

    handle() and ckflush() raise a bare Exception here and are expected to
    be provided by subclasses; close() does nothing.
    """

    def handle(self, request):
        raise Exception()

    def ckflush(self, req):
        raise Exception()

    def close(self):
        pass

    @classmethod
    def parseargs(cls, **args):
        """Return constructor keyword arguments parsed from string options.

        The base class accepts no options: any keyword that reaches it is
        reported with a ValueError naming the first such keyword.
        """
        if not args:
            return {}
        unknown = next(iter(args))
        raise ValueError("unknown handler argument: " + unknown)
87
class freethread(handler):
    """Handler that runs every request on its own newly started thread.

    The set of threads currently inside run() is tracked in self.current
    (guarded by self.lk) so that close() can wait for all of them.
    """

    def __init__(self, **kw):
        super().__init__(**kw)
        # Threads currently executing run(); only touched while holding lk.
        self.current = set()
        self.lk = threading.Lock()

    def handle(self, req):
        """Start a new reqthread that serves req via run()."""
        reqthread(target=self.run, args=[req]).start()

    def ckflush(self, req):
        """Block until req.buffer is empty, flushing as the request allows."""
        while len(req.buffer) > 0:
            # Wait until select() reports req as writable or in an
            # exceptional condition, then let it flush its buffer.
            select.select([], [req], [req])
            req.flush()

    def run(self, req):
        """Serve one request on the calling thread.

        Builds the environment, invokes the WSGI application, writes each
        chunk of the response, and flushes the output. A `closed` exception
        ends the request silently; any other exception is logged. The
        request is always closed at the end.
        """
        try:
            th = threading.current_thread()
            with self.lk:
                self.current.add(th)
            try:
                env = req.mkenv()
                with perf.request(env) as reqevent:
                    respiter = req.handlewsgi(env, req.startreq)
                    try:
                        for data in respiter:
                            req.write(data)
                        if req.status:
                            reqevent.response([req.status, req.headers])
                            req.flushreq()
                        self.ckflush(req)
                    finally:
                        # PEP 3333: if the application's iterable has a
                        # close() method, the server must call it when the
                        # request ends, whether normally or through an error.
                        closeiter = getattr(respiter, "close", None)
                        if closeiter is not None:
                            closeiter()
            except closed:
                pass
            except:
                # Outermost boundary of the request thread: log anything
                # that escaped rather than let it go unreported.
                log.error("exception occurred when handling request", exc_info=True)
            finally:
                with self.lk:
                    self.current.remove(th)
        finally:
            req.close()

    def close(self):
        """Wait until no request threads remain in self.current."""
        while True:
            with self.lk:
                if len(self.current) > 0:
                    th = next(iter(self.current))
                else:
                    return
            # Join outside the lock: the thread needs lk to deregister itself.
            th.join()
135
class threadpool(handler):
    """Handler that serves requests on a bounded pool of worker threads.

    Keyword arguments:
      min  -- number of workers started up front; idle workers only exit
              while more than this many exist.
      max  -- handle() starts a new worker only when none is free and fewer
              than this many exist.
      live -- seconds an idle worker waits for a request before it
              considers exiting.

    Requests are handed over through the single slot self.wreq. All shared
    state is guarded by self.lk, on which two conditions are built: rcond
    is notified when a request is placed in the slot (and by close()), and
    pcond is notified by workers when they become free, take a request, or
    exit.
    """

    def __init__(self, *, min=0, max=20, live=300, **kw):
        super().__init__(**kw)
        self.current = set()    # all live worker threads
        self.free = set()       # workers currently waiting for a request
        self.lk = threading.RLock()
        self.pcond = threading.Condition(self.lk)
        self.rcond = threading.Condition(self.lk)
        self.wreq = None        # request waiting to be picked up, if any
        self.min = min
        self.max = max
        self.live = live
        for _ in range(self.min):
            self.newthread()

    @classmethod
    def parseargs(cls, *, min=None, max=None, live=None, **args):
        """Convert string options to constructor keyword arguments.

        min, max and live are converted with int() when given and non-empty;
        remaining options are passed to the parent class, which rejects
        them. Raises ValueError for unknown options or non-integer values.
        """
        ret = super().parseargs(**args)
        if min:
            ret["min"] = int(min)
        if max:
            ret["max"] = int(max)
        if live:
            ret["live"] = int(live)
        return ret

    def newthread(self):
        """Start a worker and wait until it has registered in self.current."""
        with self.lk:
            th = reqthread(target=self.loop)
            th.start()
            # The worker notifies pcond once it has marked itself as free.
            while th not in self.current:
                self.pcond.wait()

    def ckflush(self, req):
        """Block until req.buffer is empty, flushing as the request allows."""
        while len(req.buffer) > 0:
            # Wait until select() reports req as writable or in an
            # exceptional condition, then let it flush its buffer.
            select.select([], [req], [req])
            req.flush()

    def _handle(self, req):
        """Serve one request on the calling worker thread.

        A `closed` exception ends the request silently; any other exception
        is logged. The request is always closed at the end.
        """
        try:
            env = req.mkenv()
            with perf.request(env) as reqevent:
                respiter = req.handlewsgi(env, req.startreq)
                try:
                    for data in respiter:
                        req.write(data)
                    if req.status:
                        reqevent.response([req.status, req.headers])
                        req.flushreq()
                    self.ckflush(req)
                finally:
                    # PEP 3333: if the application's iterable has a close()
                    # method, the server must call it when the request
                    # ends, whether normally or through an error.
                    closeiter = getattr(respiter, "close", None)
                    if closeiter is not None:
                        closeiter()
        except closed:
            pass
        except:
            # Outermost boundary for a request: log anything that escaped
            # so the worker thread survives to serve further requests.
            log.error("exception occurred when handling request", exc_info=True)
        finally:
            req.close()

    def loop(self):
        """Worker thread body: repeatedly wait for a request and serve it.

        A worker that has waited longer than self.live seconds without
        receiving a request exits if more than self.min workers exist;
        otherwise it restarts its idle timer.
        """
        th = threading.current_thread()
        with self.lk:
            self.current.add(th)
        try:
            while True:
                with self.lk:
                    self.free.add(th)
                    try:
                        # Wake newthread()/handle() waiters: a worker is free.
                        self.pcond.notify_all()
                        now = start = time.time()
                        while self.wreq is None:
                            self.rcond.wait(start + self.live - now)
                            now = time.time()
                            if now - start > self.live:
                                if len(self.current) > self.min:
                                    self.current.remove(th)
                                    return
                                else:
                                    start = now
                        # Take the request and free the slot for handle().
                        req, self.wreq = self.wreq, None
                        self.pcond.notify_all()
                    finally:
                        self.free.remove(th)
                self._handle(req)
                req = None
        finally:
            with self.lk:
                try:
                    self.current.remove(th)
                except KeyError:
                    # Already removed on the idle-timeout path above.
                    pass
                self.pcond.notify_all()

    def handle(self, req):
        """Hand req over to a worker, starting one if none is free.

        Blocks while a previously submitted request is still waiting in the
        hand-off slot.
        """
        while True:
            with self.lk:
                if len(self.free) < 1 and len(self.current) < self.max:
                    self.newthread()
                while self.wreq is not None:
                    self.pcond.wait()
                if self.wreq is None:
                    self.wreq = req
                    self.rcond.notify(1)
                    return

    def close(self):
        """Let idle workers exit and wait until no workers remain."""
        # With live and min at zero, every idle worker times out and exits.
        self.live = 0
        self.min = 0
        with self.lk:
            while len(self.current) > 0:
                self.rcond.notify_all()
                self.pcond.wait(1)
245
# Handler classes keyed by short name.
# NOTE(review): presumably looked up with the name returned by parsehspec();
# confirm against the callers.
names = dict(free=freethread, pool=threadpool)
248
def parsehspec(spec):
    """Split a handler specification into a name and a dict of string options.

    The format is "name" or "name:key=val,key2=val2,...". An item without
    "=" maps its whole text to the empty string, and only the first "=" in
    an item separates key from value. Returns a (name, options) tuple.
    """
    nm, colon, rest = spec.partition(":")
    if not colon:
        return spec, {}
    items = rest.split(",")
    # An empty final item (empty option list or trailing comma) is ignored;
    # empty items elsewhere are kept and yield an empty key.
    if items[-1] == "":
        items.pop()
    args = {}
    for item in items:
        key, _, val = item.partition("=")
        args[key] = val
    return nm, args
263         args[key] = val
264     return nm, args