X-Git-Url: http://www.dolda2000.com/gitweb/?a=blobdiff_plain;f=python3%2Fashd%2Fasync.py;h=238f6b72b68d516b2c9d2179a4ac652399aa91cb;hb=295f91786e8606bed03ca004157adbeb84565094;hp=e30f858a3a80b8866b312c627f2a38dd425a027f;hpb=8963785b76b15436bbbf06a26fc1c560ee57496e;p=ashd.git diff --git a/python3/ashd/async.py b/python3/ashd/async.py index e30f858..238f6b7 100644 --- a/python3/ashd/async.py +++ b/python3/ashd/async.py @@ -3,12 +3,16 @@ import sys, os, errno, threading, select, traceback class epoller(object): exc_handler = None - def __init__(self): + def __init__(self, check=None): self.registered = {} + self.fdcache = {} self.lock = threading.RLock() self.ep = None self.th = None self.stopped = False + self.loopcheck = set() + if check is not None: + self.loopcheck.add(check) self._daemon = True @staticmethod @@ -51,11 +55,17 @@ class epoller(object): ep = select.epoll() try: with self.lock: - for fd, (ob, evs) in self.registered.items(): - ep.register(fd, evs) + try: + for fd, (ob, evs) in self.registered.items(): + ep.register(fd, evs) + except: + self.registered.clear() + raise self.ep = ep while self.registered: + for ck in self.loopcheck: + ck(self) if self.stopped: self._closeall() break @@ -77,6 +87,7 @@ class epoller(object): if fd in self.registered: nevs = self._evsfor(ch) if nevs == 0: + del self.fdcache[ch] del self.registered[fd] ep.unregister(fd) self._cb(ch, "close") @@ -110,6 +121,7 @@ class epoller(object): ch.close() return ch.watcher = self + self.fdcache[ch] = fd self.registered[fd] = (ch, evs) if self.ep: self.ep.register(fd, evs) @@ -117,14 +129,16 @@ class epoller(object): def remove(self, ch, ignore=False): with self.lock: - fd = ch.fileno() - if fd not in self.registered: + try: + fd = self.fdcache[ch] + except KeyError: if ignore: return raise KeyError("fd %i is not registered" % fd) pch, cevs = self.registered[fd] if pch is not ch: raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch)) + del self.fdcache[ch] del self.registered[fd] if self.ep: self.ep.unregister(fd) @@ -132,8 +146,9 @@ class epoller(object): def update(self, ch, ignore=False): with self.lock: - fd = ch.fileno() - if fd not in self.registered: + try: + fd = self.fdcache[ch] + except KeyError: if ignore: return raise KeyError("fd %i is not registered" % fd) @@ -142,6 +157,7 @@ class epoller(object): raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch)) evs = self._evsfor(ch) if evs == 0: + del self.fdcache[ch] del self.registered[fd] if self.ep: self.ep.unregister(fd) @@ -278,8 +294,7 @@ class callbuffer(channel): self.wp = -1 def currentwatcher(io, current): - def run(): - while current: - current.wait() - io.stop() - threading.Thread(target=run, name="Current watcher").start() + def check(io): + if not current: + io.stop() + io.loopcheck.add(check)