python: Properly abort when over request limit.
[ashd.git] / python / ashd / serve.py
... / ...
CommitLineData
1import os, threading, time, logging
2
3log = logging.getLogger("ashd.serve")
4seq = 1
5seqlk = threading.Lock()
6
7def reqseq():
8 global seq
9 seqlk.acquire()
10 try:
11 s = seq
12 seq += 1
13 return s
14 finally:
15 seqlk.release()
16
17class reqthread(threading.Thread):
18 def __init__(self, name=None):
19 if name is None:
20 name = "Request handler %i" % reqseq()
21 super(reqthread, self).__init__(name=name)
22
23 def handle(self):
24 raise Exception()
25
26 def run(self):
27 try:
28 self.handle()
29 except:
30 log.error("exception occurred when handling request", exc_info=True)
31
32class closed(IOError):
33 def __init__(self):
34 super(closed, self).__init__("The client has closed the connection.")
35
36class wsgithread(reqthread):
37 def __init__(self, **kwargs):
38 super(wsgithread, self).__init__(**kwargs)
39 self.status = None
40 self.headers = []
41 self.respsent = False
42
43 def handlewsgi(self):
44 raise Exception()
45 def writehead(self, status, headers):
46 raise Exception()
47 def writedata(self, data):
48 raise Exception()
49
50 def write(self, data):
51 if not data:
52 return
53 self.flushreq()
54 self.writedata(data)
55
56 def flushreq(self):
57 if not self.respsent:
58 if not self.status:
59 raise Exception("Cannot send response body before starting response.")
60 self.respsent = True
61 self.writehead(self.status, self.headers)
62
63 def startreq(self, status, headers, exc_info=None):
64 if self.status:
65 if exc_info: # Nice calling convetion ^^
66 try:
67 if self.respsent:
68 raise exc_info[0], exc_info[1], exc_info[2]
69 finally:
70 exc_info = None # CPython GC bug?
71 else:
72 raise Exception("Can only start responding once.")
73 self.status = status
74 self.headers = headers
75 return self.write
76
77 def handle(self):
78 try:
79 respiter = self.handlewsgi()
80 try:
81 for data in respiter:
82 self.write(data)
83 if self.status:
84 self.flushreq()
85 finally:
86 if hasattr(respiter, "close"):
87 respiter.close()
88 except closed:
89 pass
90
91class calllimiter(object):
92 def __init__(self, limit):
93 self.limit = limit
94 self.lock = threading.Condition()
95 self.inflight = 0
96
97 def waited(self, time):
98 if time > 10:
99 raise RuntimeError("Waited too long")
100
101 def __enter__(self):
102 self.lock.acquire()
103 try:
104 start = time.time()
105 while self.inflight >= self.limit:
106 self.lock.wait(10)
107 self.waited(time.time() - start)
108 self.inflight += 1
109 return self
110 finally:
111 self.lock.release()
112
113 def __exit__(self, *excinfo):
114 self.lock.acquire()
115 try:
116 self.inflight -= 1
117 self.lock.notify()
118 finally:
119 self.lock.release()
120 return False
121
122 def call(self, target):
123 self.__enter__()
124 try:
125 return target()
126 finally:
127 self.__exit__()
128
129class abortlimiter(calllimiter):
130 def waited(self, time):
131 if time > 10:
132 os.abort()