python3: Rename async module to asyncio for Python 3.7 compatibility.
[ashd.git] / python3 / ashd / async.py
index 3c62820..b78920f 100644 (file)
@@ -1,133 +1 @@
-import os, threading, select
-
-class pool(object):
-    def __init__(self):
-        self.clients = set()
-        self.lock = threading.RLock()
-        self.th = None
-        self.ipipe = -1
-
-    def add(self, cl):
-        with self.lock:
-            self.clients.add(cl)
-            self._ckrun()
-            cl.registered = self
-        self._interrupt()
-
-    def __iter__(self):
-        with self.lock:
-            return iter([cl for cl in self.clients if not cl.closed])
-
-    def broadcast(self, data, eof=False):
-        with self.lock:
-            for cl in self:
-                cl.obuf.extend(data)
-                if eof:
-                    cl.closed = True
-        self._interrupt()
-
-    def _ckrun(self):
-        if self.clients and self.th is None:
-            th = threading.Thread(target=self._run, name="Async watcher thread")
-            th.start()
-            self.th = th
-
-    def _interrupt(self):
-        fd = self.ipipe
-        if fd >= 0 and  threading.current_thread() != self.th:
-            os.write(fd, b"a")
-
-    def _remove(self, cl):
-        self.clients.remove(cl)
-        cl.registered = None
-        cl._doclose()
-
-    def _run(self):
-        ipr, ipw = None, None
-        try:
-            ipr, ipw = os.pipe()
-            self.ipipe = ipw
-            while True:
-                with self.lock:
-                    for cl in list(self.clients):
-                        if cl.closed and not cl.writable:
-                            self._remove(cl)
-                    if not self.clients:
-                        break
-                    rsk = [cl for cl in self.clients if not cl.closed] + [ipr]
-                    wsk = [cl for cl in self.clients if cl.writable]
-                # XXX: Switch to epoll.
-                rsk, wsk, esk = select.select(rsk, wsk, [])
-                for sk in rsk:
-                    if sk == ipr:
-                        os.read(ipr, 1024)
-                    elif sk in self.clients:
-                        sk._doread()
-                for sk in wsk:
-                    if sk in self.clients:
-                        sk._dowrite()
-        finally:
-            with self.lock:
-                self.th = None
-                self.ipipe = -1
-                self._ckrun()
-            if ipr is not None:
-                try: os.close(ipr)
-                except: pass
-            if ipw is not None:
-                try: os.close(ipw)
-                except: pass
-
-class client(object):
-    pool = None
-
-    def __init__(self, sock):
-        self.sk = sock
-        self.obuf = bytearray()
-        self.closed = False
-        self.registered = None
-        p = self.pool
-        if p is not None:
-            p.add(self)
-
-    def fileno(self):
-        return self.sk.fileno()
-
-    def close(self):
-        self.closed = True
-        if self.registered:
-            self.registered._interrupt()
-
-    def write(self, data):
-        self.obuf.extend(data)
-        if self.registered:
-            self.registered._interrupt()
-
-    @property
-    def writable(self):
-        return bool(self.obuf)
-
-    def gotdata(self, data):
-        if data == b"":
-            self.close()
-
-    def _doread(self):
-        try:
-            ret = self.sk.recv(1024)
-        except IOError:
-            self.close()
-        self.gotdata(ret)
-
-    def _dowrite(self):
-        try:
-            if self.obuf:
-                ret = self.sk.send(self.obuf)
-                self.obuf[:ret] = b""
-        except IOError:
-            self.close()
-
-    def _doclose(self):
-        try:
-            self.sk.close()
-        except IOError:
-            pass
+from .asyncio import *