Commit | Line | Data |
---|---|---|
c32cd0db FT |
1 | import os, threading, select |
2 | ||
3 | class pool(object): | |
4 | def __init__(self): | |
5 | self.clients = set() | |
6 | self.lock = threading.RLock() | |
7 | self.th = None | |
8 | self.ipipe = -1 | |
9 | ||
10 | def add(self, cl): | |
11 | with self.lock: | |
12 | self.clients.add(cl) | |
13 | self._ckrun() | |
14 | cl.registered = self | |
15 | self._interrupt() | |
16 | ||
17 | def __iter__(self): | |
18 | with self.lock: | |
19 | return iter([cl for cl in self.clients if not cl.closed]) | |
20 | ||
21 | def broadcast(self, data, eof=False): | |
22 | with self.lock: | |
23 | for cl in self: | |
24 | cl.obuf.extend(data) | |
25 | if eof: | |
26 | cl.closed = True | |
27 | self._interrupt() | |
28 | ||
29 | def _ckrun(self): | |
30 | if self.clients and self.th is None: | |
31 | th = threading.Thread(target=self._run, name="Async watcher thread") | |
32 | th.start() | |
33 | self.th = th | |
34 | ||
35 | def _interrupt(self): | |
36 | fd = self.ipipe | |
37 | if fd >= 0 and threading.current_thread() != self.th: | |
38 | os.write(fd, b"a") | |
39 | ||
40 | def _remove(self, cl): | |
41 | self.clients.remove(cl) | |
42 | cl.registered = None | |
43 | cl._doclose() | |
44 | ||
45 | def _run(self): | |
46 | ipr, ipw = None, None | |
47 | try: | |
48 | ipr, ipw = os.pipe() | |
49 | self.ipipe = ipw | |
50 | while True: | |
51 | with self.lock: | |
52 | for cl in list(self.clients): | |
53 | if cl.closed and not cl.writable: | |
54 | self._remove(cl) | |
55 | if not self.clients: | |
56 | break | |
57 | rsk = [cl for cl in self.clients if not cl.closed] + [ipr] | |
58 | wsk = [cl for cl in self.clients if cl.writable] | |
59 | # XXX: Switch to epoll. | |
60 | rsk, wsk, esk = select.select(rsk, wsk, []) | |
61 | for sk in rsk: | |
62 | if sk == ipr: | |
63 | os.read(ipr, 1024) | |
64 | elif sk in self.clients: | |
65 | sk._doread() | |
66 | for sk in wsk: | |
67 | if sk in self.clients: | |
68 | sk._dowrite() | |
69 | finally: | |
70 | with self.lock: | |
71 | self.th = None | |
72 | self.ipipe = -1 | |
73 | self._ckrun() | |
74 | if ipr is not None: | |
75 | try: os.close(ipr) | |
76 | except: pass | |
77 | if ipw is not None: | |
78 | try: os.close(ipw) | |
79 | except: pass | |
80 | ||
81 | class client(object): | |
82 | pool = None | |
83 | ||
84 | def __init__(self, sock): | |
85 | self.sk = sock | |
86 | self.obuf = bytearray() | |
87 | self.closed = False | |
88 | self.registered = None | |
89 | p = self.pool | |
90 | if p is not None: | |
91 | p.add(self) | |
92 | ||
93 | def fileno(self): | |
94 | return self.sk.fileno() | |
95 | ||
96 | def close(self): | |
97 | self.closed = True | |
98 | if self.registered: | |
99 | self.registered._interrupt() | |
100 | ||
101 | def write(self, data): | |
102 | self.obuf.extend(data) | |
103 | if self.registered: | |
104 | self.registered._interrupt() | |
105 | ||
106 | @property | |
107 | def writable(self): | |
108 | return bool(self.obuf) | |
109 | ||
110 | def gotdata(self, data): | |
111 | if data == b"": | |
112 | self.close() | |
113 | ||
114 | def _doread(self): | |
115 | try: | |
116 | ret = self.sk.recv(1024) | |
117 | except IOError: | |
118 | self.close() | |
119 | self.gotdata(ret) | |
120 | ||
121 | def _dowrite(self): | |
122 | try: | |
123 | if self.obuf: | |
124 | ret = self.sk.send(self.obuf) | |
125 | self.obuf[:ret] = b"" | |
126 | except IOError: | |
127 | self.close() | |
128 | ||
129 | def _doclose(self): | |
130 | try: | |
131 | self.sk.close() | |
132 | except IOError: | |
133 | pass |