| 1 | import os, threading, select |
| 2 | |
class pool(object):
    """Set of clients serviced by a single background select() thread.

    The watcher thread is started on demand as soon as the pool holds a
    client and exits once the pool is empty again.  Other threads wake the
    watcher out of select() by writing a byte to an internal pipe.

    Client objects must be hashable and provide ``fileno()``, the
    attributes ``closed``, ``writable``, ``obuf`` and ``registered``, and
    the methods ``_doread()``, ``_dowrite()`` and ``_doclose()``.
    """

    def __init__(self):
        """Create an empty pool; no thread runs until a client is added."""
        self.clients = set()
        # Reentrant because broadcast() iterates the pool (which takes the
        # lock in __iter__) while already holding it.
        self.lock = threading.RLock()
        # Watcher thread object, or None while no watcher is running.
        self.th = None
        # Write end of the wake-up pipe, or -1 while no pipe exists.
        self.ipipe = -1

    def add(self, cl):
        """Register *cl* with the pool, starting the watcher if necessary."""
        with self.lock:
            self.clients.add(cl)
            self._ckrun()
            cl.registered = self
        # Make a running watcher rebuild its select() sets.
        self._interrupt()

    def __iter__(self):
        """Iterate over a snapshot of the clients that are not closed."""
        with self.lock:
            return iter([cl for cl in self.clients if not cl.closed])

    def broadcast(self, data, eof=False):
        """Queue *data* for output on every client that is not closed.

        If *eof* is true, each of those clients is also marked closed; the
        watcher removes a closed client once its output buffer is empty.
        """
        with self.lock:
            for cl in self:
                cl.obuf.extend(data)
                if eof:
                    cl.closed = True
        self._interrupt()

    def _ckrun(self):
        """Start the watcher thread if there are clients and none is running.

        Callers in this class hold ``self.lock`` while calling this.
        """
        if self.clients and self.th is None:
            th = threading.Thread(target=self._run, name="Async watcher thread")
            th.start()
            self.th = th

    def _interrupt(self):
        """Wake the watcher out of select(), unless called from the watcher."""
        # NOTE(review): ipipe is read without holding the lock, so the
        # watcher may close the descriptor between this read and the write
        # below -- confirm whether callers need protection against that.
        fd = self.ipipe
        if fd >= 0 and threading.current_thread() != self.th:
            os.write(fd, b"a")

    def _remove(self, cl):
        """Drop *cl* from the pool, unregister it and close it."""
        self.clients.remove(cl)
        cl.registered = None
        cl._doclose()

    def _run(self):
        """Watcher thread body: select() over the clients until none remain."""
        ipr, ipw = None, None
        try:
            ipr, ipw = os.pipe()
            self.ipipe = ipw
            while True:
                with self.lock:
                    # Reap clients that are closed and fully flushed.
                    for cl in list(self.clients):
                        if cl.closed and not cl.writable:
                            self._remove(cl)
                    if not self.clients:
                        break
                    # The pipe's read end is always watched so that other
                    # threads can interrupt the select() call.
                    rsk = [cl for cl in self.clients if not cl.closed] + [ipr]
                    wsk = [cl for cl in self.clients if cl.writable]
                # XXX: Switch to epoll.
                rsk, wsk, _ = select.select(rsk, wsk, [])
                for sk in rsk:
                    if sk == ipr:
                        # Drain pending wake-up bytes.
                        os.read(ipr, 1024)
                    elif sk in self.clients:
                        sk._doread()
                for sk in wsk:
                    if sk in self.clients:
                        sk._dowrite()
        finally:
            with self.lock:
                self.th = None
                self.ipipe = -1
                # Start a replacement watcher if clients are still registered
                # (they were added meanwhile, or this thread is exiting on an
                # error).
                self._ckrun()
            for fd in (ipr, ipw):
                if fd is None:
                    continue
                try:
                    os.close(fd)
                except OSError:
                    # Best-effort cleanup: only descriptor errors are ignored,
                    # so KeyboardInterrupt/SystemExit still propagate.
                    pass
| 80 | |
class client(object):
    """Socket wrapper with an output buffer, meant to be driven by a pool.

    Data passed to write() is collected in ``obuf`` and sent by _dowrite();
    data received by _doread() is handed to gotdata(), which subclasses may
    override.  The object exposes fileno() so it can be passed to select().
    """

    # Pool that new instances register themselves with; None means instances
    # are created unregistered.  The pool object needs an add() method.
    pool = None

    def __init__(self, sock):
        """Wrap *sock* and register with the class-level ``pool`` if set."""
        self.sk = sock
        self.obuf = bytearray()
        self.closed = False
        # Set by the pool that currently owns this client, if any.
        self.registered = None
        p = self.pool
        if p is not None:
            p.add(self)

    def fileno(self):
        """Return the file descriptor of the underlying socket."""
        return self.sk.fileno()

    def close(self):
        """Mark the client closed and notify the owning pool, if any.

        The socket itself is not closed here; that happens in _doclose().
        """
        self.closed = True
        if self.registered:
            self.registered._interrupt()

    def write(self, data):
        """Append *data* to the output buffer and notify the owning pool."""
        self.obuf.extend(data)
        if self.registered:
            self.registered._interrupt()

    @property
    def writable(self):
        """True while there is buffered output waiting to be sent."""
        return bool(self.obuf)

    def gotdata(self, data):
        """Handle received *data*; an empty bytes object closes the client."""
        if data == b"":
            self.close()

    def _doread(self):
        """Receive up to 1024 bytes and pass them to gotdata().

        A receive error closes the client; gotdata() is not called then.
        """
        try:
            ret = self.sk.recv(1024)
        except IOError:
            self.close()
            # Nothing was received, so there is nothing to deliver.  Falling
            # through would reference the unbound `ret`.
            return
        self.gotdata(ret)

    def _dowrite(self):
        """Send as much buffered output as the socket accepts.

        A send error closes the client and discards the remaining output.
        """
        try:
            if self.obuf:
                ret = self.sk.send(self.obuf)
                del self.obuf[:ret]
        except IOError:
            # The data can no longer be delivered.  Dropping it makes
            # `writable` False, so a driver that waits for the buffer to
            # drain before releasing a closed client does not retry the
            # failing send forever.
            del self.obuf[:]
            self.close()

    def _doclose(self):
        """Close the underlying socket, ignoring I/O errors."""
        try:
            self.sk.close()
        except IOError:
            pass