From: Fredrik Tolf Date: Sat, 11 Jun 2016 00:59:40 +0000 (+0200) Subject: python3: Added utility module for dealing with asynchronous clients. X-Git-Url: http://www.dolda2000.com/gitweb/?p=ashd.git;a=commitdiff_plain;h=c32cd0db68074ae06546406848e85ee2b4a86451 python3: Added utility module for dealing with asynchronous clients. --- diff --git a/python3/ashd/async.py b/python3/ashd/async.py new file mode 100644 index 0000000..3c62820 --- /dev/null +++ b/python3/ashd/async.py @@ -0,0 +1,133 @@ +import os, threading, select + +class pool(object): + def __init__(self): + self.clients = set() + self.lock = threading.RLock() + self.th = None + self.ipipe = -1 + + def add(self, cl): + with self.lock: + self.clients.add(cl) + self._ckrun() + cl.registered = self + self._interrupt() + + def __iter__(self): + with self.lock: + return iter([cl for cl in self.clients if not cl.closed]) + + def broadcast(self, data, eof=False): + with self.lock: + for cl in self: + cl.obuf.extend(data) + if eof: + cl.closed = True + self._interrupt() + + def _ckrun(self): + if self.clients and self.th is None: + th = threading.Thread(target=self._run, name="Async watcher thread") + th.start() + self.th = th + + def _interrupt(self): + fd = self.ipipe + if fd >= 0 and threading.current_thread() != self.th: + os.write(fd, b"a") + + def _remove(self, cl): + self.clients.remove(cl) + cl.registered = None + cl._doclose() + + def _run(self): + ipr, ipw = None, None + try: + ipr, ipw = os.pipe() + self.ipipe = ipw + while True: + with self.lock: + for cl in list(self.clients): + if cl.closed and not cl.writable: + self._remove(cl) + if not self.clients: + break + rsk = [cl for cl in self.clients if not cl.closed] + [ipr] + wsk = [cl for cl in self.clients if cl.writable] + # XXX: Switch to epoll. + rsk, wsk, esk = select.select(rsk, wsk, []) + for sk in rsk: + if sk == ipr: + os.read(ipr, 1024) + elif sk in self.clients: + sk._doread() + for sk in wsk: + if sk in self.clients: + sk._dowrite() + finally: + with self.lock: + self.th = None + self.ipipe = -1 + self._ckrun() + if ipr is not None: + try: os.close(ipr) + except: pass + if ipw is not None: + try: os.close(ipw) + except: pass + +class client(object): + pool = None + + def __init__(self, sock): + self.sk = sock + self.obuf = bytearray() + self.closed = False + self.registered = None + p = self.pool + if p is not None: + p.add(self) + + def fileno(self): + return self.sk.fileno() + + def close(self): + self.closed = True + if self.registered: + self.registered._interrupt() + + def write(self, data): + self.obuf.extend(data) + if self.registered: + self.registered._interrupt() + + @property + def writable(self): + return bool(self.obuf) + + def gotdata(self, data): + if data == b"": + self.close() + + def _doread(self): + try: + ret = self.sk.recv(1024) + except IOError: + self.close() + self.gotdata(ret) + + def _dowrite(self): + try: + if self.obuf: + ret = self.sk.send(self.obuf) + self.obuf[:ret] = b"" + except IOError: + self.close() + + def _doclose(self): + try: + self.sk.close() + except IOError: + pass