1 import os, threading, select
6 self.lock = threading.RLock()
19 return iter([cl for cl in self.clients if not cl.closed])
21 def broadcast(self, data, eof=False):
30 if self.clients and self.th is None:
31 th = threading.Thread(target=self._run, name="Async watcher thread")
37 if fd >= 0 and threading.current_thread() != self.th:
40 def _remove(self, cl):
41 self.clients.remove(cl)
52 for cl in list(self.clients):
53 if cl.closed and not cl.writable:
57 rsk = [cl for cl in self.clients if not cl.closed] + [ipr]
58 wsk = [cl for cl in self.clients if cl.writable]
59 # XXX: Switch to epoll.
60 rsk, wsk, esk = select.select(rsk, wsk, [])
64 elif sk in self.clients:
67 if sk in self.clients:
84 def __init__(self, sock):
86 self.obuf = bytearray()
88 self.registered = None
94 return self.sk.fileno()
99 self.registered._interrupt()
101 def write(self, data):
102 self.obuf.extend(data)
104 self.registered._interrupt()
108 return bool(self.obuf)
110 def gotdata(self, data):
116 ret = self.sk.recv(1024)
124 ret = self.sk.send(self.obuf)
125 self.obuf[:ret] = b""