From bb1ec927b657639ec5907f73623ea4b4d5c6a5fb Mon Sep 17 00:00:00 2001 From: Fredrik Tolf Date: Sat, 27 Aug 2022 15:50:28 +0200 Subject: [PATCH] python3: Rename async module to asyncio for Python 3.7 compatibility. ashd.async is still around for now as a compatibility measure, simply re-importing everything from asyncio. --- python3/ashd/async.py | 301 +----------------------------------------------- python3/ashd/asyncio.py | 300 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 301 insertions(+), 300 deletions(-) create mode 100644 python3/ashd/asyncio.py diff --git a/python3/ashd/async.py b/python3/ashd/async.py index 238f6b7..b78920f 100644 --- a/python3/ashd/async.py +++ b/python3/ashd/async.py @@ -1,300 +1 @@ -import sys, os, errno, threading, select, traceback - -class epoller(object): - exc_handler = None - - def __init__(self, check=None): - self.registered = {} - self.fdcache = {} - self.lock = threading.RLock() - self.ep = None - self.th = None - self.stopped = False - self.loopcheck = set() - if check is not None: - self.loopcheck.add(check) - self._daemon = True - - @staticmethod - def _evsfor(ch): - return ((select.EPOLLIN if ch.readable else 0) | - (select.EPOLLOUT if ch.writable else 0)) - - def _ckrun(self): - if self.registered and self.th is None: - th = threading.Thread(target=self._run, name="Async epoll thread") - th.daemon = self._daemon - th.start() - self.th = th - - def exception(self, ch, *exc): - self.remove(ch) - if self.exc_handler is None: - traceback.print_exception(*exc) - else: - self.exc_handler(ch, *exc) - - def _cb(self, ch, nm): - try: - m = getattr(ch, nm, None) - if m is None: - raise AttributeError("%r has no %s method" % (ch, nm)) - m() - except Exception as exc: - self.exception(ch, *sys.exc_info()) - - def _closeall(self): - with self.lock: - while self.registered: - fd, (ch, evs) = next(iter(self.registered.items())) - del self.registered[fd] - self.ep.unregister(fd) - self._cb(ch, "close") - - def _run(self): - ep = select.epoll() - try: - with self.lock: - try: - for fd, (ob, evs) in self.registered.items(): - ep.register(fd, evs) - except: - self.registered.clear() - raise - self.ep = ep - - while self.registered: - for ck in self.loopcheck: - ck(self) - if self.stopped: - self._closeall() - break - try: - evlist = ep.poll(10) - except IOError as exc: - if exc.errno == errno.EINTR: - continue - raise - for fd, evs in evlist: - with self.lock: - if fd not in self.registered: - continue - ch, cevs = self.registered[fd] - if fd in self.registered and evs & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR): - self._cb(ch, "read") - if fd in self.registered and evs & select.EPOLLOUT: - self._cb(ch, "write") - if fd in self.registered: - nevs = self._evsfor(ch) - if nevs == 0: - del self.fdcache[ch] - del self.registered[fd] - ep.unregister(fd) - self._cb(ch, "close") - elif nevs != cevs: - self.registered[fd] = ch, nevs - ep.modify(fd, nevs) - - finally: - with self.lock: - self.th = None - self.ep = None - self._ckrun() - ep.close() - - @property - def daemon(self): return self._daemon - @daemon.setter - def daemon(self, value): - self._daemon = bool(value) - with self.lock: - if self.th is not None: - self.th = daemon = self._daemon - - def add(self, ch): - with self.lock: - fd = ch.fileno() - if fd in self.registered: - raise KeyError("fd %i is already registered" % fd) - evs = self._evsfor(ch) - if evs == 0: - ch.close() - return - ch.watcher = self - self.fdcache[ch] = fd - self.registered[fd] = (ch, evs) - if self.ep: - self.ep.register(fd, evs) - self._ckrun() - - def remove(self, ch, ignore=False): - with self.lock: - try: - fd = self.fdcache[ch] - except KeyError: - if ignore: - return - raise KeyError("fd %i is not registered" % fd) - pch, cevs = self.registered[fd] - if pch is not ch: - raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch)) - del self.fdcache[ch] - del self.registered[fd] - if self.ep: - self.ep.unregister(fd) - ch.close() - - def update(self, ch, ignore=False): - with self.lock: - try: - fd = self.fdcache[ch] - except KeyError: - if ignore: - return - raise KeyError("fd %i is not registered" % fd) - pch, cevs = self.registered[fd] - if pch is not ch: - raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch)) - evs = self._evsfor(ch) - if evs == 0: - del self.fdcache[ch] - del self.registered[fd] - if self.ep: - self.ep.unregister(fd) - ch.close() - elif evs != cevs: - self.registered[fd] = ch, evs - if self.ep: - self.ep.modify(fd, evs) - - def stop(self): - if threading.current_thread() == self.th: - self.stopped = True - else: - def tgt(): - self.stopped = True - cb = callbuffer() - cb.call(tgt) - cb.stop() - self.add(cb) - -def watcher(): - return epoller() - -class channel(object): - readable = False - writable = False - - def __init__(self): - self.watcher = None - - def fileno(self): - raise NotImplementedError("fileno()") - - def close(self): - pass - -class sockbuffer(channel): - def __init__(self, socket, **kwargs): - super().__init__(**kwargs) - self.sk = socket - self.eof = False - self.obuf = bytearray() - - def fileno(self): - return self.sk.fileno() - - def close(self): - self.sk.close() - - def gotdata(self, data): - if data == b"": - self.eof = True - - def send(self, data, eof=False): - self.obuf.extend(data) - if eof: - self.eof = True - if self.watcher is not None: - self.watcher.update(self, True) - - @property - def readable(self): - return not self.eof - def read(self): - try: - data = self.sk.recv(1024) - self.gotdata(data) - except IOError: - self.obuf[:] = b"" - self.eof = True - - @property - def writable(self): - return bool(self.obuf); - def write(self): - try: - ret = self.sk.send(self.obuf) - self.obuf[:ret] = b"" - except IOError: - self.obuf[:] = b"" - self.eof = True - -class callbuffer(channel): - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.queue = [] - self.rp, self.wp = os.pipe() - self.lock = threading.Lock() - self.eof = False - - def fileno(self): - return self.rp - - def close(self): - with self.lock: - try: - if self.wp >= 0: - os.close(self.wp) - self.wp = -1 - finally: - if self.rp >= 0: - os.close(self.rp) - self.rp = -1 - - @property - def readable(self): - return not self.eof - def read(self): - with self.lock: - try: - data = os.read(self.rp, 1024) - if data == b"": - self.eof = True - except IOError: - self.eof = True - cbs = list(self.queue) - self.queue[:] = [] - for cb in cbs: - cb() - - writable = False - - def call(self, cb): - with self.lock: - if self.wp < 0: - raise Exception("stopped") - self.queue.append(cb) - os.write(self.wp, b"a") - - def stop(self): - with self.lock: - if self.wp >= 0: - os.close(self.wp) - self.wp = -1 - -def currentwatcher(io, current): - def check(io): - if not current: - io.stop() - io.loopcheck.add(check) +from .asyncio import * diff --git a/python3/ashd/asyncio.py b/python3/ashd/asyncio.py new file mode 100644 index 0000000..238f6b7 --- /dev/null +++ b/python3/ashd/asyncio.py @@ -0,0 +1,300 @@ +import sys, os, errno, threading, select, traceback + +class epoller(object): + exc_handler = None + + def __init__(self, check=None): + self.registered = {} + self.fdcache = {} + self.lock = threading.RLock() + self.ep = None + self.th = None + self.stopped = False + self.loopcheck = set() + if check is not None: + self.loopcheck.add(check) + self._daemon = True + + @staticmethod + def _evsfor(ch): + return ((select.EPOLLIN if ch.readable else 0) | + (select.EPOLLOUT if ch.writable else 0)) + + def _ckrun(self): + if self.registered and self.th is None: + th = threading.Thread(target=self._run, name="Async epoll thread") + th.daemon = self._daemon + th.start() + self.th = th + + def exception(self, ch, *exc): + self.remove(ch) + if self.exc_handler is None: + traceback.print_exception(*exc) + else: + self.exc_handler(ch, *exc) + + def _cb(self, ch, nm): + try: + m = getattr(ch, nm, None) + if m is None: + raise AttributeError("%r has no %s method" % (ch, nm)) + m() + except Exception as exc: + self.exception(ch, *sys.exc_info()) + + def _closeall(self): + with self.lock: + while self.registered: + fd, (ch, evs) = next(iter(self.registered.items())) + del self.registered[fd] + self.ep.unregister(fd) + self._cb(ch, "close") + + def _run(self): + ep = select.epoll() + try: + with self.lock: + try: + for fd, (ob, evs) in self.registered.items(): + ep.register(fd, evs) + except: + self.registered.clear() + raise + self.ep = ep + + while self.registered: + for ck in self.loopcheck: + ck(self) + if self.stopped: + self._closeall() + break + try: + evlist = ep.poll(10) + except IOError as exc: + if exc.errno == errno.EINTR: + continue + raise + for fd, evs in evlist: + with self.lock: + if fd not in self.registered: + continue + ch, cevs = self.registered[fd] + if fd in self.registered and evs & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR): + self._cb(ch, "read") + if fd in self.registered and evs & select.EPOLLOUT: + self._cb(ch, "write") + if fd in self.registered: + nevs = self._evsfor(ch) + if nevs == 0: + del self.fdcache[ch] + del self.registered[fd] + ep.unregister(fd) + self._cb(ch, "close") + elif nevs != cevs: + self.registered[fd] = ch, nevs + ep.modify(fd, nevs) + + finally: + with self.lock: + self.th = None + self.ep = None + self._ckrun() + ep.close() + + @property + def daemon(self): return self._daemon + @daemon.setter + def daemon(self, value): + self._daemon = bool(value) + with self.lock: + if self.th is not None: + self.th = daemon = self._daemon + + def add(self, ch): + with self.lock: + fd = ch.fileno() + if fd in self.registered: + raise KeyError("fd %i is already registered" % fd) + evs = self._evsfor(ch) + if evs == 0: + ch.close() + return + ch.watcher = self + self.fdcache[ch] = fd + self.registered[fd] = (ch, evs) + if self.ep: + self.ep.register(fd, evs) + self._ckrun() + + def remove(self, ch, ignore=False): + with self.lock: + try: + fd = self.fdcache[ch] + except KeyError: + if ignore: + return + raise KeyError("fd %i is not registered" % fd) + pch, cevs = self.registered[fd] + if pch is not ch: + raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch)) + del self.fdcache[ch] + del self.registered[fd] + if self.ep: + self.ep.unregister(fd) + ch.close() + + def update(self, ch, ignore=False): + with self.lock: + try: + fd = self.fdcache[ch] + except KeyError: + if ignore: + return + raise KeyError("fd %i is not registered" % fd) + pch, cevs = self.registered[fd] + if pch is not ch: + raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch)) + evs = self._evsfor(ch) + if evs == 0: + del self.fdcache[ch] + del self.registered[fd] + if self.ep: + self.ep.unregister(fd) + ch.close() + elif evs != cevs: + self.registered[fd] = ch, evs + if self.ep: + self.ep.modify(fd, evs) + + def stop(self): + if threading.current_thread() == self.th: + self.stopped = True + else: + def tgt(): + self.stopped = True + cb = callbuffer() + cb.call(tgt) + cb.stop() + self.add(cb) + +def watcher(): + return epoller() + +class channel(object): + readable = False + writable = False + + def __init__(self): + self.watcher = None + + def fileno(self): + raise NotImplementedError("fileno()") + + def close(self): + pass + +class sockbuffer(channel): + def __init__(self, socket, **kwargs): + super().__init__(**kwargs) + self.sk = socket + self.eof = False + self.obuf = bytearray() + + def fileno(self): + return self.sk.fileno() + + def close(self): + self.sk.close() + + def gotdata(self, data): + if data == b"": + self.eof = True + + def send(self, data, eof=False): + self.obuf.extend(data) + if eof: + self.eof = True + if self.watcher is not None: + self.watcher.update(self, True) + + @property + def readable(self): + return not self.eof + def read(self): + try: + data = self.sk.recv(1024) + self.gotdata(data) + except IOError: + self.obuf[:] = b"" + self.eof = True + + @property + def writable(self): + return bool(self.obuf); + def write(self): + try: + ret = self.sk.send(self.obuf) + self.obuf[:ret] = b"" + except IOError: + self.obuf[:] = b"" + self.eof = True + +class callbuffer(channel): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.queue = [] + self.rp, self.wp = os.pipe() + self.lock = threading.Lock() + self.eof = False + + def fileno(self): + return self.rp + + def close(self): + with self.lock: + try: + if self.wp >= 0: + os.close(self.wp) + self.wp = -1 + finally: + if self.rp >= 0: + os.close(self.rp) + self.rp = -1 + + @property + def readable(self): + return not self.eof + def read(self): + with self.lock: + try: + data = os.read(self.rp, 1024) + if data == b"": + self.eof = True + except IOError: + self.eof = True + cbs = list(self.queue) + self.queue[:] = [] + for cb in cbs: + cb() + + writable = False + + def call(self, cb): + with self.lock: + if self.wp < 0: + raise Exception("stopped") + self.queue.append(cb) + os.write(self.wp, b"a") + + def stop(self): + with self.lock: + if self.wp >= 0: + os.close(self.wp) + self.wp = -1 + +def currentwatcher(io, current): + def check(io): + if not current: + io.stop() + io.loopcheck.add(check) -- 2.11.0