X-Git-Url: http://www.dolda2000.com/gitweb/?a=blobdiff_plain;f=python3%2Fashd%2Fasync.py;h=aa52af9d13a9e2976bfe6c4c38d49cce7e8d99cc;hb=c26129cd537a8086d5a9d538ffb3ba2775b619d5;hp=3493959aca15079814eac00406e9378c6ed6fef2;hpb=9000767562d69ddfab2674516146dacf63a6f644;p=ashd.git diff --git a/python3/ashd/async.py b/python3/ashd/async.py index 3493959..aa52af9 100644 --- a/python3/ashd/async.py +++ b/python3/ashd/async.py @@ -3,12 +3,15 @@ import sys, os, errno, threading, select, traceback class epoller(object): exc_handler = None - def __init__(self): + def __init__(self, check=None): self.registered = {} self.lock = threading.RLock() self.ep = None self.th = None self.stopped = False + self.loopcheck = set() + if check is not None: + self.loopcheck.add(check) self._daemon = True @staticmethod @@ -26,7 +29,7 @@ class epoller(object): def exception(self, ch, *exc): self.remove(ch) if self.exc_handler is None: - traceback.print_exception(exc) + traceback.print_exception(*exc) else: self.exc_handler(ch, *exc) @@ -51,11 +54,17 @@ class epoller(object): ep = select.epoll() try: with self.lock: - for fd, (ob, evs) in self.registered.items(): - ep.register(fd, evs) + try: + for fd, (ob, evs) in self.registered.items(): + ep.register(fd, evs) + except: + self.registered.clear() + raise self.ep = ep while self.registered: + for ck in self.loopcheck: + ck(self) if self.stopped: self._closeall() break @@ -165,12 +174,25 @@ class epoller(object): def watcher(): return epoller() -class sockbuffer(object): - def __init__(self, sk): - self.sk = sk +class channel(object): + readable = False + writable = False + + def __init__(self): + self.watcher = None + + def fileno(self): + raise NotImplementedError("fileno()") + + def close(self): + pass + +class sockbuffer(channel): + def __init__(self, socket, **kwargs): + super().__init__(**kwargs) + self.sk = socket self.eof = False self.obuf = bytearray() - self.watcher = None def fileno(self): return self.sk.fileno() @@ -211,8 +233,9 @@ class sockbuffer(object): self.obuf[:] = b"" self.eof = True -class callbuffer(object): - def __init__(self): +class callbuffer(channel): + def __init__(self, **kwargs): + super().__init__(**kwargs) self.queue = [] self.rp, self.wp = os.pipe() self.lock = threading.Lock() @@ -264,8 +287,7 @@ class callbuffer(object): self.wp = -1 def currentwatcher(io, current): - def run(): - while current: - current.wait() - io.stop() - threading.Thread(target=run, name="Current watcher").start() + def check(io): + if not current: + io.stop() + io.loopcheck.add(check)