X-Git-Url: http://www.dolda2000.com/gitweb/?p=ashd.git;a=blobdiff_plain;f=python3%2Fashd%2Fasync.py;fp=python3%2Fashd%2Fasync.py;h=3493959aca15079814eac00406e9378c6ed6fef2;hp=02c75a92a9175a61e3f257e05c866be93e305631;hb=9000767562d69ddfab2674516146dacf63a6f644;hpb=940f9c467d2f2c923468109c529c4902d95f5b9c diff --git a/python3/ashd/async.py b/python3/ashd/async.py index 02c75a9..3493959 100644 --- a/python3/ashd/async.py +++ b/python3/ashd/async.py @@ -8,6 +8,7 @@ class epoller(object): self.lock = threading.RLock() self.ep = None self.th = None + self.stopped = False self._daemon = True @staticmethod @@ -38,6 +39,14 @@ class epoller(object): except Exception as exc: self.exception(ch, *sys.exc_info()) + def _closeall(self): + with self.lock: + while self.registered: + fd, (ch, evs) = next(iter(self.registered.items())) + del self.registered[fd] + self.ep.unregister(fd) + self._cb(ch, "close") + def _run(self): ep = select.epoll() try: @@ -47,6 +56,9 @@ class epoller(object): self.ep = ep while self.registered: + if self.stopped: + self._closeall() + break try: evlist = ep.poll(10) except IOError as exc: @@ -139,6 +151,17 @@ class epoller(object): if self.ep: self.ep.modify(fd, evs) + def stop(self): + if threading.current_thread() == self.th: + self.stopped = True + else: + def tgt(): + self.stopped = True + cb = callbuffer() + cb.call(tgt) + cb.stop() + self.add(cb) + def watcher(): return epoller() @@ -239,3 +262,10 @@ class callbuffer(object): if self.wp >= 0: os.close(self.wp) self.wp = -1 + +def currentwatcher(io, current): + def run(): + while current: + current.wait() + io.stop() + threading.Thread(target=run, name="Current watcher").start()