python3: Added utility module for dealing with asynchronous clients.
authorFredrik Tolf <fredrik@dolda2000.com>
Sat, 11 Jun 2016 00:59:40 +0000 (02:59 +0200)
committerFredrik Tolf <fredrik@dolda2000.com>
Sat, 11 Jun 2016 00:59:40 +0000 (02:59 +0200)
python3/ashd/async.py [new file with mode: 0644]

diff --git a/python3/ashd/async.py b/python3/ashd/async.py
new file mode 100644 (file)
index 0000000..3c62820
--- /dev/null
@@ -0,0 +1,133 @@
+import os, threading, select
+
+class pool(object):
+    def __init__(self):
+        self.clients = set()
+        self.lock = threading.RLock()
+        self.th = None
+        self.ipipe = -1
+
+    def add(self, cl):
+        with self.lock:
+            self.clients.add(cl)
+            self._ckrun()
+            cl.registered = self
+        self._interrupt()
+
+    def __iter__(self):
+        with self.lock:
+            return iter([cl for cl in self.clients if not cl.closed])
+
+    def broadcast(self, data, eof=False):
+        with self.lock:
+            for cl in self:
+                cl.obuf.extend(data)
+                if eof:
+                    cl.closed = True
+        self._interrupt()
+
+    def _ckrun(self):
+        if self.clients and self.th is None:
+            th = threading.Thread(target=self._run, name="Async watcher thread")
+            th.start()
+            self.th = th
+
+    def _interrupt(self):
+        fd = self.ipipe
+        if fd >= 0 and  threading.current_thread() != self.th:
+            os.write(fd, b"a")
+
+    def _remove(self, cl):
+        self.clients.remove(cl)
+        cl.registered = None
+        cl._doclose()
+
+    def _run(self):
+        ipr, ipw = None, None
+        try:
+            ipr, ipw = os.pipe()
+            self.ipipe = ipw
+            while True:
+                with self.lock:
+                    for cl in list(self.clients):
+                        if cl.closed and not cl.writable:
+                            self._remove(cl)
+                    if not self.clients:
+                        break
+                    rsk = [cl for cl in self.clients if not cl.closed] + [ipr]
+                    wsk = [cl for cl in self.clients if cl.writable]
+                # XXX: Switch to epoll.
+                rsk, wsk, esk = select.select(rsk, wsk, [])
+                for sk in rsk:
+                    if sk == ipr:
+                        os.read(ipr, 1024)
+                    elif sk in self.clients:
+                        sk._doread()
+                for sk in wsk:
+                    if sk in self.clients:
+                        sk._dowrite()
+        finally:
+            with self.lock:
+                self.th = None
+                self.ipipe = -1
+                self._ckrun()
+            if ipr is not None:
+                try: os.close(ipr)
+                except: pass
+            if ipw is not None:
+                try: os.close(ipw)
+                except: pass
+
+class client(object):
+    pool = None
+
+    def __init__(self, sock):
+        self.sk = sock
+        self.obuf = bytearray()
+        self.closed = False
+        self.registered = None
+        p = self.pool
+        if p is not None:
+            p.add(self)
+
+    def fileno(self):
+        return self.sk.fileno()
+
+    def close(self):
+        self.closed = True
+        if self.registered:
+            self.registered._interrupt()
+
+    def write(self, data):
+        self.obuf.extend(data)
+        if self.registered:
+            self.registered._interrupt()
+
+    @property
+    def writable(self):
+        return bool(self.obuf)
+
+    def gotdata(self, data):
+        if data == b"":
+            self.close()
+
+    def _doread(self):
+        try:
+            ret = self.sk.recv(1024)
+        except IOError:
+            self.close()
+        self.gotdata(ret)
+
+    def _dowrite(self):
+        try:
+            if self.obuf:
+                ret = self.sk.send(self.obuf)
+                self.obuf[:ret] = b""
+        except IOError:
+            self.close()
+
+    def _doclose(self):
+        try:
+            self.sk.close()
+        except IOError:
+            pass