Merge branches 'block' and 'py-reserve'
[ashd.git] / python / ashd / serve.py
1 import os, threading, time, logging
2
3 log = logging.getLogger("ashd.serve")
4 seq = 1
5 seqlk = threading.Lock()
6
7 def reqseq():
8     global seq
9     seqlk.acquire()
10     try:
11         s = seq
12         seq += 1
13         return s
14     finally:
15         seqlk.release()
16
17 class reqthread(threading.Thread):
18     def __init__(self, name=None):
19         if name is None:
20             name = "Request handler %i" % reqseq()
21         super(reqthread, self).__init__(name=name)
22
23     def handle(self):
24         raise Exception()
25
26     def run(self):
27         try:
28             self.handle()
29         except:
30             log.error("exception occurred when handling request", exc_info=True)
31
32 class closed(IOError):
33     def __init__(self):
34         super(closed, self).__init__("The client has closed the connection.")
35
36 class wsgithread(reqthread):
37     def __init__(self, **kwargs):
38         super(wsgithread, self).__init__(**kwargs)
39         self.status = None
40         self.headers = []
41         self.respsent = False
42
43     def handlewsgi(self):
44         raise Exception()
45     def writehead(self, status, headers):
46         raise Exception()
47     def writedata(self, data):
48         raise Exception()
49
50     def write(self, data):
51         if not data:
52             return
53         self.flushreq()
54         self.writedata(data)
55
56     def flushreq(self):
57         if not self.respsent:
58             if not self.status:
59                 raise Exception("Cannot send response body before starting response.")
60             self.respsent = True
61             self.writehead(self.status, self.headers)
62
63     def startreq(self, status, headers, exc_info=None):
64         if self.status:
65             if exc_info:                # Nice calling convetion ^^
66                 try:
67                     if self.respsent:
68                         raise exc_info[0], exc_info[1], exc_info[2]
69                 finally:
70                     exc_info = None     # CPython GC bug?
71             else:
72                 raise Exception("Can only start responding once.")
73         self.status = status
74         self.headers = headers
75         return self.write
76  
77     def handle(self):
78         try:
79             respiter = self.handlewsgi()
80             try:
81                 for data in respiter:
82                     self.write(data)
83                 if self.status:
84                     self.flushreq()
85             finally:
86                 if hasattr(respiter, "close"):
87                     respiter.close()
88         except closed:
89             pass
90
91 class calllimiter(object):
92     def __init__(self, limit):
93         self.limit = limit
94         self.lock = threading.Condition()
95         self.inflight = 0
96
97     def waited(self, time):
98         if time > 10:
99             raise RuntimeError("Waited too long")
100
101     def __enter__(self):
102         self.lock.acquire()
103         try:
104             start = time.time()
105             while self.inflight >= self.limit:
106                 self.lock.wait(10)
107                 self.waited(time.time() - start)
108             self.inflight += 1
109             return self
110         finally:
111             self.lock.release()
112
113     def __exit__(self, *excinfo):
114         self.lock.acquire()
115         try:
116             self.inflight -= 1
117             self.lock.notify()
118         finally:
119             self.lock.release()
120         return False
121
122     def call(self, target):
123         self.__enter__()
124         try:
125             return target()
126         finally:
127             self.__exit__()
128
129 class abortlimiter(calllimiter):
130     def waited(self, time):
131         if time > 10:
132             os.abort()