python: Properly abort when over request limit.
[ashd.git] / python3 / ashd / serve.py
1 import os, threading, time, logging
2
# Logger shared by the request-serving code in this module.
log = logging.getLogger("ashd.serve")
# Next sequence number to hand out; only touched while holding seqlk.
seq = 1
seqlk = threading.Lock()

def reqseq():
    """Return the next request sequence number and advance the counter.

    The module-level counter is read and incremented under seqlk, so
    concurrent callers never receive the same number.
    """
    global seq
    seqlk.acquire()
    try:
        current = seq
        seq = current + 1
    finally:
        seqlk.release()
    return current
13
class reqthread(threading.Thread):
    """Base class for threads that each serve one request.

    Subclasses override handle(); run() calls it and logs any exception
    that escapes, so a failing request never propagates out of the thread.
    """

    def __init__(self, name=None):
        """Create the thread.

        name -- thread name; when None, a name of the form
                "Request handler <n>" is generated using reqseq().
        """
        if name is None:
            name = "Request handler %i" % reqseq()
        super().__init__(name=name)

    def handle(self):
        """Serve the request. Subclasses must override this method."""
        raise NotImplementedError("reqthread subclasses must implement handle()")

    def run(self):
        """Thread entry point: run handle() and log whatever it raises."""
        try:
            self.handle()
        except BaseException:
            # Deliberately as broad as a bare except: this is the outermost
            # frame of the thread, so everything is logged rather than
            # propagated.
            log.error("exception occurred when handling request", exc_info=True)
28
class closed(IOError):
    """Signals that the client has closed the connection."""

    def __init__(self):
        message = "The client has closed the connection."
        IOError.__init__(self, message)
32
class wsgithread(reqthread):
    """Request thread that drives a WSGI-style application.

    Subclasses provide handlewsgi(), writehead() and writedata(). This
    class tracks the response status and headers and makes sure the
    response head is written only once: right before the first non-empty
    body chunk, or after the body iterator is exhausted if a response was
    started but no data was written.
    """

    def __init__(self, **kwargs):
        """Forward keyword arguments to reqthread and reset response state."""
        super().__init__(**kwargs)
        self.status = None      # status passed to startreq(), once started
        self.headers = []       # headers passed to startreq()
        self.respsent = False   # True once writehead() has been invoked

    def handlewsgi(self):
        """Run the application and return its response iterable (abstract)."""
        raise NotImplementedError("wsgithread subclasses must implement handlewsgi()")

    def writehead(self, status, headers):
        """Send the response status and headers to the client (abstract)."""
        raise NotImplementedError("wsgithread subclasses must implement writehead()")

    def writedata(self, data):
        """Send one chunk of response body to the client (abstract)."""
        raise NotImplementedError("wsgithread subclasses must implement writedata()")

    def write(self, data):
        """Write a body chunk, sending the response head first if needed.

        Empty chunks are ignored and do not trigger the head.
        """
        if not data:
            return
        self.flushreq()
        self.writedata(data)

    def flushreq(self):
        """Send the response head unless it has already been sent.

        Raises Exception if no response has been started yet.
        """
        if not self.respsent:
            if not self.status:
                raise Exception("Cannot send response body before starting response.")
            # Mark as sent before writing so the head is not retried if
            # writehead() itself raises.
            self.respsent = True
            self.writehead(self.status, self.headers)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return the write callable.

        Has the shape of the WSGI start_response callable. Calling it a
        second time is only permitted when exc_info is given; in that
        case, if the head has already been sent, the exception in
        exc_info is re-raised, otherwise the recorded status and headers
        are replaced.
        """
        if self.status:
            if exc_info:
                try:
                    if self.respsent:
                        raise exc_info[1]
                finally:
                    # Drop the reference so the traceback is not kept alive
                    # through this frame (as recommended by PEP 3333).
                    exc_info = None
            else:
                raise Exception("Can only start responding once.")
        self.status = status
        self.headers = headers
        return self.write

    def handle(self):
        """Run the application and stream its response to the client.

        The response iterable is closed if it has a close() method, and a
        `closed` exception (client went away) ends the request silently.
        """
        try:
            respiter = self.handlewsgi()
            try:
                for data in respiter:
                    self.write(data)
                # A started response with an empty body still needs its head.
                if self.status:
                    self.flushreq()
            finally:
                if hasattr(respiter, "close"):
                    respiter.close()
        except closed:
            pass
87
class calllimiter(object):
    """Context manager that caps the number of calls in flight at once.

    Entering blocks while `limit` calls are already in flight; leaving
    frees the slot and wakes one waiter. After each wait, waited() is
    called with the total time spent waiting so far and may raise (or
    otherwise react) to give up.
    """

    def __init__(self, limit):
        """limit -- maximum number of concurrently held slots."""
        self.limit = limit
        self.lock = threading.Condition()
        self.inflight = 0

    def waited(self, time):
        """Hook called with the seconds waited so far for a free slot.

        Raises RuntimeError once the wait has exceeded 10 seconds.
        Subclasses may override this to react differently.
        """
        if time > 10:
            raise RuntimeError("Waited too long")

    def __enter__(self):
        """Acquire a slot, blocking while the limit is reached."""
        with self.lock:
            # Monotonic clock: a wall-clock adjustment while waiting must
            # not inflate (or hide) the measured wait time.
            start = time.monotonic()
            while self.inflight >= self.limit:
                self.lock.wait(10)
                self.waited(time.monotonic() - start)
            self.inflight += 1
            return self

    def __exit__(self, *excinfo):
        """Release the slot and wake one waiter; never suppresses exceptions."""
        with self.lock:
            self.inflight -= 1
            self.lock.notify()
        return False

    def call(self, target):
        """Call target() while holding a slot and return its result."""
        with self:
            return target()
116
class abortlimiter(calllimiter):
    """calllimiter variant that aborts the process instead of raising."""

    def waited(self, time):
        """Abort the process once the wait has exceeded 10 seconds."""
        overdue = time > 10
        if overdue:
            os.abort()