X-Git-Url: http://www.dolda2000.com/gitweb/?a=blobdiff_plain;f=python3%2Fashd%2Fasync.py;h=b78920f0f372d84dd84db52d5c69a0a8980fd39c;hb=HEAD;hp=02c75a92a9175a61e3f257e05c866be93e305631;hpb=940f9c467d2f2c923468109c529c4902d95f5b9c;p=ashd.git diff --git a/python3/ashd/async.py b/python3/ashd/async.py index 02c75a9..b78920f 100644 --- a/python3/ashd/async.py +++ b/python3/ashd/async.py @@ -1,241 +1 @@ -import sys, os, errno, threading, select, traceback - -class epoller(object): - exc_handler = None - - def __init__(self): - self.registered = {} - self.lock = threading.RLock() - self.ep = None - self.th = None - self._daemon = True - - @staticmethod - def _evsfor(ch): - return ((select.EPOLLIN if ch.readable else 0) | - (select.EPOLLOUT if ch.writable else 0)) - - def _ckrun(self): - if self.registered and self.th is None: - th = threading.Thread(target=self._run, name="Async epoll thread") - th.daemon = self._daemon - th.start() - self.th = th - - def exception(self, ch, *exc): - self.remove(ch) - if self.exc_handler is None: - traceback.print_exception(exc) - else: - self.exc_handler(ch, *exc) - - def _cb(self, ch, nm): - try: - m = getattr(ch, nm, None) - if m is None: - raise AttributeError("%r has no %s method" % (ch, nm)) - m() - except Exception as exc: - self.exception(ch, *sys.exc_info()) - - def _run(self): - ep = select.epoll() - try: - with self.lock: - for fd, (ob, evs) in self.registered.items(): - ep.register(fd, evs) - self.ep = ep - - while self.registered: - try: - evlist = ep.poll(10) - except IOError as exc: - if exc.errno == errno.EINTR: - continue - raise - for fd, evs in evlist: - with self.lock: - if fd not in self.registered: - continue - ch, cevs = self.registered[fd] - if fd in self.registered and evs & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR): - self._cb(ch, "read") - if fd in self.registered and evs & select.EPOLLOUT: - self._cb(ch, "write") - if fd in self.registered: - nevs = self._evsfor(ch) - if nevs == 0: - del self.registered[fd] - ep.unregister(fd) - self._cb(ch, "close") - elif nevs != cevs: - self.registered[fd] = ch, nevs - ep.modify(fd, nevs) - - finally: - with self.lock: - self.th = None - self.ep = None - self._ckrun() - ep.close() - - @property - def daemon(self): return self._daemon - @daemon.setter - def daemon(self, value): - self._daemon = bool(value) - with self.lock: - if self.th is not None: - self.th = daemon = self._daemon - - def add(self, ch): - with self.lock: - fd = ch.fileno() - if fd in self.registered: - raise KeyError("fd %i is already registered" % fd) - evs = self._evsfor(ch) - if evs == 0: - ch.close() - return - ch.watcher = self - self.registered[fd] = (ch, evs) - if self.ep: - self.ep.register(fd, evs) - self._ckrun() - - def remove(self, ch, ignore=False): - with self.lock: - fd = ch.fileno() - if fd not in self.registered: - if ignore: - return - raise KeyError("fd %i is not registered" % fd) - pch, cevs = self.registered[fd] - if pch is not ch: - raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch)) - del self.registered[fd] - if self.ep: - self.ep.unregister(fd) - ch.close() - - def update(self, ch, ignore=False): - with self.lock: - fd = ch.fileno() - if fd not in self.registered: - if ignore: - return - raise KeyError("fd %i is not registered" % fd) - pch, cevs = self.registered[fd] - if pch is not ch: - raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch)) - evs = self._evsfor(ch) - if evs == 0: - del self.registered[fd] - if self.ep: - self.ep.unregister(fd) - ch.close() - elif evs != cevs: - self.registered[fd] = ch, evs - if self.ep: - self.ep.modify(fd, evs) - -def watcher(): - return epoller() - -class sockbuffer(object): - def __init__(self, sk): - self.sk = sk - self.eof = False - self.obuf = bytearray() - self.watcher = None - - def fileno(self): - return self.sk.fileno() - - def close(self): - self.sk.close() - - def gotdata(self, data): - if data == b"": - self.eof = True - - def send(self, data, eof=False): - self.obuf.extend(data) - if eof: - self.eof = True - if self.watcher is not None: - self.watcher.update(self, True) - - @property - def readable(self): - return not self.eof - def read(self): - try: - data = self.sk.recv(1024) - self.gotdata(data) - except IOError: - self.obuf[:] = b"" - self.eof = True - - @property - def writable(self): - return bool(self.obuf); - def write(self): - try: - ret = self.sk.send(self.obuf) - self.obuf[:ret] = b"" - except IOError: - self.obuf[:] = b"" - self.eof = True - -class callbuffer(object): - def __init__(self): - self.queue = [] - self.rp, self.wp = os.pipe() - self.lock = threading.Lock() - self.eof = False - - def fileno(self): - return self.rp - - def close(self): - with self.lock: - try: - if self.wp >= 0: - os.close(self.wp) - self.wp = -1 - finally: - if self.rp >= 0: - os.close(self.rp) - self.rp = -1 - - @property - def readable(self): - return not self.eof - def read(self): - with self.lock: - try: - data = os.read(self.rp, 1024) - if data == b"": - self.eof = True - except IOError: - self.eof = True - cbs = list(self.queue) - self.queue[:] = [] - for cb in cbs: - cb() - - writable = False - - def call(self, cb): - with self.lock: - if self.wp < 0: - raise Exception("stopped") - self.queue.append(cb) - os.write(self.wp, b"a") - - def stop(self): - with self.lock: - if self.wp >= 0: - os.close(self.wp) - self.wp = -1 +from .asyncio import *