X-Git-Url: http://www.dolda2000.com/gitweb/?p=ashd.git;a=blobdiff_plain;f=python3%2Fashd%2Fasync.py;fp=python3%2Fashd%2Fasync.py;h=b78920f0f372d84dd84db52d5c69a0a8980fd39c;hp=238f6b72b68d516b2c9d2179a4ac652399aa91cb;hb=bb1ec927b657639ec5907f73623ea4b4d5c6a5fb;hpb=1a53e57ba22759dbb373d7f97625b9a51fa6656a diff --git a/python3/ashd/async.py b/python3/ashd/async.py index 238f6b7..b78920f 100644 --- a/python3/ashd/async.py +++ b/python3/ashd/async.py @@ -1,300 +1 @@ -import sys, os, errno, threading, select, traceback - -class epoller(object): - exc_handler = None - - def __init__(self, check=None): - self.registered = {} - self.fdcache = {} - self.lock = threading.RLock() - self.ep = None - self.th = None - self.stopped = False - self.loopcheck = set() - if check is not None: - self.loopcheck.add(check) - self._daemon = True - - @staticmethod - def _evsfor(ch): - return ((select.EPOLLIN if ch.readable else 0) | - (select.EPOLLOUT if ch.writable else 0)) - - def _ckrun(self): - if self.registered and self.th is None: - th = threading.Thread(target=self._run, name="Async epoll thread") - th.daemon = self._daemon - th.start() - self.th = th - - def exception(self, ch, *exc): - self.remove(ch) - if self.exc_handler is None: - traceback.print_exception(*exc) - else: - self.exc_handler(ch, *exc) - - def _cb(self, ch, nm): - try: - m = getattr(ch, nm, None) - if m is None: - raise AttributeError("%r has no %s method" % (ch, nm)) - m() - except Exception as exc: - self.exception(ch, *sys.exc_info()) - - def _closeall(self): - with self.lock: - while self.registered: - fd, (ch, evs) = next(iter(self.registered.items())) - del self.registered[fd] - self.ep.unregister(fd) - self._cb(ch, "close") - - def _run(self): - ep = select.epoll() - try: - with self.lock: - try: - for fd, (ob, evs) in self.registered.items(): - ep.register(fd, evs) - except: - self.registered.clear() - raise - self.ep = ep - - while self.registered: - for ck in self.loopcheck: - ck(self) - if self.stopped: - self._closeall() - break - try: - evlist = ep.poll(10) - except IOError as exc: - if exc.errno == errno.EINTR: - continue - raise - for fd, evs in evlist: - with self.lock: - if fd not in self.registered: - continue - ch, cevs = self.registered[fd] - if fd in self.registered and evs & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR): - self._cb(ch, "read") - if fd in self.registered and evs & select.EPOLLOUT: - self._cb(ch, "write") - if fd in self.registered: - nevs = self._evsfor(ch) - if nevs == 0: - del self.fdcache[ch] - del self.registered[fd] - ep.unregister(fd) - self._cb(ch, "close") - elif nevs != cevs: - self.registered[fd] = ch, nevs - ep.modify(fd, nevs) - - finally: - with self.lock: - self.th = None - self.ep = None - self._ckrun() - ep.close() - - @property - def daemon(self): return self._daemon - @daemon.setter - def daemon(self, value): - self._daemon = bool(value) - with self.lock: - if self.th is not None: - self.th = daemon = self._daemon - - def add(self, ch): - with self.lock: - fd = ch.fileno() - if fd in self.registered: - raise KeyError("fd %i is already registered" % fd) - evs = self._evsfor(ch) - if evs == 0: - ch.close() - return - ch.watcher = self - self.fdcache[ch] = fd - self.registered[fd] = (ch, evs) - if self.ep: - self.ep.register(fd, evs) - self._ckrun() - - def remove(self, ch, ignore=False): - with self.lock: - try: - fd = self.fdcache[ch] - except KeyError: - if ignore: - return - raise KeyError("fd %i is not registered" % fd) - pch, cevs = self.registered[fd] - if pch is not ch: - raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch)) - del self.fdcache[ch] - del self.registered[fd] - if self.ep: - self.ep.unregister(fd) - ch.close() - - def update(self, ch, ignore=False): - with self.lock: - try: - fd = self.fdcache[ch] - except KeyError: - if ignore: - return - raise KeyError("fd %i is not registered" % fd) - pch, cevs = self.registered[fd] - if pch is not ch: - raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch)) - evs = self._evsfor(ch) - if evs == 0: - del self.fdcache[ch] - del self.registered[fd] - if self.ep: - self.ep.unregister(fd) - ch.close() - elif evs != cevs: - self.registered[fd] = ch, evs - if self.ep: - self.ep.modify(fd, evs) - - def stop(self): - if threading.current_thread() == self.th: - self.stopped = True - else: - def tgt(): - self.stopped = True - cb = callbuffer() - cb.call(tgt) - cb.stop() - self.add(cb) - -def watcher(): - return epoller() - -class channel(object): - readable = False - writable = False - - def __init__(self): - self.watcher = None - - def fileno(self): - raise NotImplementedError("fileno()") - - def close(self): - pass - -class sockbuffer(channel): - def __init__(self, socket, **kwargs): - super().__init__(**kwargs) - self.sk = socket - self.eof = False - self.obuf = bytearray() - - def fileno(self): - return self.sk.fileno() - - def close(self): - self.sk.close() - - def gotdata(self, data): - if data == b"": - self.eof = True - - def send(self, data, eof=False): - self.obuf.extend(data) - if eof: - self.eof = True - if self.watcher is not None: - self.watcher.update(self, True) - - @property - def readable(self): - return not self.eof - def read(self): - try: - data = self.sk.recv(1024) - self.gotdata(data) - except IOError: - self.obuf[:] = b"" - self.eof = True - - @property - def writable(self): - return bool(self.obuf); - def write(self): - try: - ret = self.sk.send(self.obuf) - self.obuf[:ret] = b"" - except IOError: - self.obuf[:] = b"" - self.eof = True - -class callbuffer(channel): - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.queue = [] - self.rp, self.wp = os.pipe() - self.lock = threading.Lock() - self.eof = False - - def fileno(self): - return self.rp - - def close(self): - with self.lock: - try: - if self.wp >= 0: - os.close(self.wp) - self.wp = -1 - finally: - if self.rp >= 0: - os.close(self.rp) - self.rp = -1 - - @property - def readable(self): - return not self.eof - def read(self): - with self.lock: - try: - data = os.read(self.rp, 1024) - if data == b"": - self.eof = True - except IOError: - self.eof = True - cbs = list(self.queue) - self.queue[:] = [] - for cb in cbs: - cb() - - writable = False - - def call(self, cb): - with self.lock: - if self.wp < 0: - raise Exception("stopped") - self.queue.append(cb) - os.write(self.wp, b"a") - - def stop(self): - with self.lock: - if self.wp >= 0: - os.close(self.wp) - self.wp = -1 - -def currentwatcher(io, current): - def check(io): - if not current: - io.stop() - io.loopcheck.add(check) +from .asyncio import *