python3: Added utility module for dealing with asynchronous clients.
[ashd.git] / python3 / ashd / async.py
1 import os, threading, select
2
3 class pool(object):
4     def __init__(self):
5         self.clients = set()
6         self.lock = threading.RLock()
7         self.th = None
8         self.ipipe = -1
9
10     def add(self, cl):
11         with self.lock:
12             self.clients.add(cl)
13             self._ckrun()
14             cl.registered = self
15         self._interrupt()
16
17     def __iter__(self):
18         with self.lock:
19             return iter([cl for cl in self.clients if not cl.closed])
20
21     def broadcast(self, data, eof=False):
22         with self.lock:
23             for cl in self:
24                 cl.obuf.extend(data)
25                 if eof:
26                     cl.closed = True
27         self._interrupt()
28
29     def _ckrun(self):
30         if self.clients and self.th is None:
31             th = threading.Thread(target=self._run, name="Async watcher thread")
32             th.start()
33             self.th = th
34
35     def _interrupt(self):
36         fd = self.ipipe
37         if fd >= 0 and  threading.current_thread() != self.th:
38             os.write(fd, b"a")
39
40     def _remove(self, cl):
41         self.clients.remove(cl)
42         cl.registered = None
43         cl._doclose()
44
45     def _run(self):
46         ipr, ipw = None, None
47         try:
48             ipr, ipw = os.pipe()
49             self.ipipe = ipw
50             while True:
51                 with self.lock:
52                     for cl in list(self.clients):
53                         if cl.closed and not cl.writable:
54                             self._remove(cl)
55                     if not self.clients:
56                         break
57                     rsk = [cl for cl in self.clients if not cl.closed] + [ipr]
58                     wsk = [cl for cl in self.clients if cl.writable]
59                 # XXX: Switch to epoll.
60                 rsk, wsk, esk = select.select(rsk, wsk, [])
61                 for sk in rsk:
62                     if sk == ipr:
63                         os.read(ipr, 1024)
64                     elif sk in self.clients:
65                         sk._doread()
66                 for sk in wsk:
67                     if sk in self.clients:
68                         sk._dowrite()
69         finally:
70             with self.lock:
71                 self.th = None
72                 self.ipipe = -1
73                 self._ckrun()
74             if ipr is not None:
75                 try: os.close(ipr)
76                 except: pass
77             if ipw is not None:
78                 try: os.close(ipw)
79                 except: pass
80
81 class client(object):
82     pool = None
83
84     def __init__(self, sock):
85         self.sk = sock
86         self.obuf = bytearray()
87         self.closed = False
88         self.registered = None
89         p = self.pool
90         if p is not None:
91             p.add(self)
92
93     def fileno(self):
94         return self.sk.fileno()
95
96     def close(self):
97         self.closed = True
98         if self.registered:
99             self.registered._interrupt()
100
101     def write(self, data):
102         self.obuf.extend(data)
103         if self.registered:
104             self.registered._interrupt()
105
106     @property
107     def writable(self):
108         return bool(self.obuf)
109
110     def gotdata(self, data):
111         if data == b"":
112             self.close()
113
114     def _doread(self):
115         try:
116             ret = self.sk.recv(1024)
117         except IOError:
118             self.close()
119         self.gotdata(ret)
120
121     def _dowrite(self):
122         try:
123             if self.obuf:
124                 ret = self.sk.send(self.obuf)
125                 self.obuf[:ret] = b""
126         except IOError:
127             self.close()
128
129     def _doclose(self):
130         try:
131             self.sk.close()
132         except IOError:
133             pass