python: Added a Python3 version of ashd.serve.
[ashd.git] / python3 / ashd / serve.py
... / ...
CommitLineData
1import threading, time, logging
2
# Logger shared by the request-handling classes in this module.
log = logging.getLogger("ashd.serve")

# Next request sequence number to hand out; read and advanced only
# while holding seqlk.
seq = 1
seqlk = threading.Lock()


def reqseq():
    """Return the next request sequence number.

    The module-level counter starts at 1 and is advanced by one on
    every call; the read-and-increment happens under ``seqlk``.
    """
    global seq
    seqlk.acquire()
    try:
        current = seq
        seq = current + 1
    finally:
        seqlk.release()
    return current
13
class reqthread(threading.Thread):
    """Base class for threads that handle one request.

    Subclasses override handle(); run() calls it and logs any exception
    that escapes it.
    """

    def __init__(self, name=None):
        """Create the thread.

        name -- thread name; when None, "Request handler N" is used,
                where N is obtained from reqseq().
        """
        if name is None:
            name = "Request handler %i" % reqseq()
        super().__init__(name=name)

    def handle(self):
        """Handle the request; subclasses must override this.

        Raises NotImplementedError (an Exception subclass) when not
        overridden.
        """
        raise NotImplementedError("reqthread subclasses must implement handle()")

    def run(self):
        """Thread entry point: call handle() and log whatever it raises."""
        try:
            self.handle()
        except:
            # Deliberate catch-all: anything escaping handle() is logged
            # with its traceback instead of propagating out of run().
            log.error("exception occurred when handling request", exc_info=True)
28
class closed(IOError):
    """IOError subclass whose message says the client closed the connection."""

    def __init__(self):
        reason = "The client has closed the connection."
        super(closed, self).__init__(reason)
32
class wsgithread(reqthread):
    """Request thread that produces a response from an iterable of data.

    Subclasses provide handlewsgi(), writehead() and writedata().  This
    class records the status and headers given to startreq() and makes
    sure writehead() is called once, before the first writedata().
    NOTE(review): startreq() looks like it is meant to serve as the WSGI
    start_response callable -- confirm against the subclasses.
    """

    def __init__(self, **kwargs):
        """Initialise response state; kwargs are passed to reqthread."""
        super().__init__(**kwargs)
        self.status = None      # status given to startreq(); None until then
        self.headers = []       # headers given to startreq()
        self.respsent = False   # set just before writehead() is called

    def handlewsgi(self):
        """Return the iterable of response data; subclasses must override."""
        raise NotImplementedError("wsgithread subclasses must implement handlewsgi()")

    def writehead(self, status, headers):
        """Send the response head; subclasses must override."""
        raise NotImplementedError("wsgithread subclasses must implement writehead()")

    def writedata(self, data):
        """Send a piece of response body; subclasses must override."""
        raise NotImplementedError("wsgithread subclasses must implement writedata()")

    def write(self, data):
        """Send body data, sending the response head first if necessary.

        Falsy data (e.g. an empty string) is ignored entirely, so it does
        not trigger sending the head.
        """
        if not data:
            return
        self.flushreq()
        self.writedata(data)

    def flushreq(self):
        """Send the response head via writehead() unless already done.

        Raises Exception if no status has been set through startreq().
        """
        if not self.respsent:
            if not self.status:
                raise Exception("Cannot send response body before starting response.")
            # Mark as sent before calling writehead(), so the head is not
            # attempted a second time even if writehead() raises.
            self.respsent = True
            self.writehead(self.status, self.headers)

    def startreq(self, status, headers, exc_info=None):
        """Record the response status and headers; return the write callable.

        Calling this again after a status has been set is only accepted
        when exc_info is given.  In that case, if the head has already
        been sent, the exception in exc_info[1] is re-raised; otherwise
        the status and headers are replaced.  Without exc_info a second
        call raises Exception.
        """
        if self.status:
            if exc_info:                # Nice calling convention ^^
                try:
                    if self.respsent:
                        raise exc_info[1]
                finally:
                    exc_info = None     # CPython GC bug?
            else:
                raise Exception("Can only start responding once.")
        self.status = status
        self.headers = headers
        return self.write

    def handle(self):
        """Run handlewsgi() and write out everything it yields.

        After the iteration, the head is flushed if a status was set (so
        a response with no body data still gets its head sent).  The
        iterable's close() is called if it has one, and a `closed`
        exception raised along the way is silently dropped.
        """
        try:
            respiter = self.handlewsgi()
            try:
                for data in respiter:
                    self.write(data)
                if self.status:
                    self.flushreq()
            finally:
                if hasattr(respiter, "close"):
                    respiter.close()
        except closed:
            pass
87
class calllimiter(object):
    """Context manager that limits how many holders are inside it at once.

    Entering blocks while `inflight` is at or above `limit`.  After each
    wait, waited() is called with the number of seconds elapsed since
    the enter began; the default waited() raises RuntimeError once that
    exceeds 10 seconds.
    """

    def __init__(self, limit):
        """limit -- maximum number of simultaneous holders."""
        self.limit = limit
        self.lock = threading.Condition()
        self.inflight = 0       # current number of holders

    def waited(self, time):
        """Hook called after each wait with the elapsed seconds so far.

        Raises RuntimeError when more than 10 seconds have elapsed.
        """
        if time > 10:
            raise RuntimeError("Waited too long")

    def __enter__(self):
        """Wait for a free slot, take it, and return self."""
        with self.lock:
            # Use the monotonic clock so that wall-clock adjustments
            # cannot distort the measured waiting time.
            start = time.monotonic()
            while self.inflight >= self.limit:
                self.lock.wait(10)
                self.waited(time.monotonic() - start)
            self.inflight += 1
            return self

    def __exit__(self, *excinfo):
        """Release the slot and wake one waiter; never suppresses exceptions."""
        with self.lock:
            self.inflight -= 1
            self.lock.notify()
            return False

    def call(self, target):
        """Call target() while holding a slot and return its result."""
        with self:
            return target()
112
113 def call(self, target):
114 with self:
115 return target()