python3: Added utility module for dealing with asynchronous clients.
[ashd.git] / python3 / ashd / async.py
CommitLineData
c32cd0db
FT
1import os, threading, select
2
3class pool(object):
4 def __init__(self):
5 self.clients = set()
6 self.lock = threading.RLock()
7 self.th = None
8 self.ipipe = -1
9
10 def add(self, cl):
11 with self.lock:
12 self.clients.add(cl)
13 self._ckrun()
14 cl.registered = self
15 self._interrupt()
16
17 def __iter__(self):
18 with self.lock:
19 return iter([cl for cl in self.clients if not cl.closed])
20
21 def broadcast(self, data, eof=False):
22 with self.lock:
23 for cl in self:
24 cl.obuf.extend(data)
25 if eof:
26 cl.closed = True
27 self._interrupt()
28
29 def _ckrun(self):
30 if self.clients and self.th is None:
31 th = threading.Thread(target=self._run, name="Async watcher thread")
32 th.start()
33 self.th = th
34
35 def _interrupt(self):
36 fd = self.ipipe
37 if fd >= 0 and threading.current_thread() != self.th:
38 os.write(fd, b"a")
39
40 def _remove(self, cl):
41 self.clients.remove(cl)
42 cl.registered = None
43 cl._doclose()
44
45 def _run(self):
46 ipr, ipw = None, None
47 try:
48 ipr, ipw = os.pipe()
49 self.ipipe = ipw
50 while True:
51 with self.lock:
52 for cl in list(self.clients):
53 if cl.closed and not cl.writable:
54 self._remove(cl)
55 if not self.clients:
56 break
57 rsk = [cl for cl in self.clients if not cl.closed] + [ipr]
58 wsk = [cl for cl in self.clients if cl.writable]
59 # XXX: Switch to epoll.
60 rsk, wsk, esk = select.select(rsk, wsk, [])
61 for sk in rsk:
62 if sk == ipr:
63 os.read(ipr, 1024)
64 elif sk in self.clients:
65 sk._doread()
66 for sk in wsk:
67 if sk in self.clients:
68 sk._dowrite()
69 finally:
70 with self.lock:
71 self.th = None
72 self.ipipe = -1
73 self._ckrun()
74 if ipr is not None:
75 try: os.close(ipr)
76 except: pass
77 if ipw is not None:
78 try: os.close(ipw)
79 except: pass
80
81class client(object):
82 pool = None
83
84 def __init__(self, sock):
85 self.sk = sock
86 self.obuf = bytearray()
87 self.closed = False
88 self.registered = None
89 p = self.pool
90 if p is not None:
91 p.add(self)
92
93 def fileno(self):
94 return self.sk.fileno()
95
96 def close(self):
97 self.closed = True
98 if self.registered:
99 self.registered._interrupt()
100
101 def write(self, data):
102 self.obuf.extend(data)
103 if self.registered:
104 self.registered._interrupt()
105
106 @property
107 def writable(self):
108 return bool(self.obuf)
109
110 def gotdata(self, data):
111 if data == b"":
112 self.close()
113
114 def _doread(self):
115 try:
116 ret = self.sk.recv(1024)
117 except IOError:
118 self.close()
119 self.gotdata(ret)
120
121 def _dowrite(self):
122 try:
123 if self.obuf:
124 ret = self.sk.send(self.obuf)
125 self.obuf[:ret] = b""
126 except IOError:
127 self.close()
128
129 def _doclose(self):
130 try:
131 self.sk.close()
132 except IOError:
133 pass