python: Somewhat integrate async watchers with wsgidir currency.
authorFredrik Tolf <fredrik@dolda2000.com>
Sat, 18 Jun 2016 01:07:41 +0000 (03:07 +0200)
committerFredrik Tolf <fredrik@dolda2000.com>
Sat, 18 Jun 2016 01:07:41 +0000 (03:07 +0200)
python3/ashd/async.py

index 02c75a9..3493959 100644 (file)
@@ -8,6 +8,7 @@ class epoller(object):
         self.lock = threading.RLock()
         self.ep = None
         self.th = None
+        self.stopped = False
         self._daemon = True
 
     @staticmethod
@@ -38,6 +39,14 @@ class epoller(object):
         except Exception as exc:
             self.exception(ch, *sys.exc_info())
 
+    def _closeall(self):
+        with self.lock:
+            while self.registered:
+                fd, (ch, evs) = next(iter(self.registered.items()))
+                del self.registered[fd]
+                self.ep.unregister(fd)
+                self._cb(ch, "close")
+
     def _run(self):
         ep = select.epoll()
         try:
@@ -47,6 +56,9 @@ class epoller(object):
                 self.ep = ep
 
             while self.registered:
+                if self.stopped:
+                    self._closeall()
+                    break
                 try:
                     evlist = ep.poll(10)
                 except IOError as exc:
@@ -139,6 +151,17 @@ class epoller(object):
                 if self.ep:
                     self.ep.modify(fd, evs)
 
+    def stop(self):
+        if threading.current_thread() == self.th:
+            self.stopped = True
+        else:
+            def tgt():
+                self.stopped = True
+            cb = callbuffer()
+            cb.call(tgt)
+            cb.stop()
+            self.add(cb)
+
 def watcher():
     return epoller()
 
@@ -239,3 +262,10 @@ class callbuffer(object):
             if self.wp >= 0:
                 os.close(self.wp)
                 self.wp = -1
+
+def currentwatcher(io, current):
+    def run():
+        while current:
+            current.wait()
+        io.stop()
+    threading.Thread(target=run, name="Current watcher").start()