1 import sys, os, errno, threading, select, traceback
6 def __init__(self, check=None):
9 self.lock = threading.RLock()
13 self.loopcheck = set()
15 self.loopcheck.add(check)
20 return ((select.EPOLLIN if ch.readable else 0) |
21 (select.EPOLLOUT if ch.writable else 0))
24 if self.registered and self.th is None:
25 th = threading.Thread(target=self._run, name="Async epoll thread")
26 th.daemon = self._daemon
30 def exception(self, ch, *exc):
32 if self.exc_handler is None:
33 traceback.print_exception(*exc)
35 self.exc_handler(ch, *exc)
37 def _cb(self, ch, nm):
39 m = getattr(ch, nm, None)
41 raise AttributeError("%r has no %s method" % (ch, nm))
43 except Exception as exc:
44 self.exception(ch, *sys.exc_info())
48 while self.registered:
49 fd, (ch, evs) = next(iter(self.registered.items()))
50 del self.registered[fd]
51 self.ep.unregister(fd)
59 for fd, (ob, evs) in self.registered.items():
62 self.registered.clear()
66 while self.registered:
67 for ck in self.loopcheck:
74 except IOError as exc:
75 if exc.errno == errno.EINTR:
78 for fd, evs in evlist:
80 if fd not in self.registered:
82 ch, cevs = self.registered[fd]
83 if fd in self.registered and evs & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR):
85 if fd in self.registered and evs & select.EPOLLOUT:
87 if fd in self.registered:
88 nevs = self._evsfor(ch)
91 del self.registered[fd]
95 self.registered[fd] = ch, nevs
106 def daemon(self): return self._daemon
108 def daemon(self, value):
109 self._daemon = bool(value)
111 if self.th is not None:
112 self.th = daemon = self._daemon
117 if fd in self.registered:
118 raise KeyError("fd %i is already registered" % fd)
119 evs = self._evsfor(ch)
124 self.fdcache[ch] = fd
125 self.registered[fd] = (ch, evs)
127 self.ep.register(fd, evs)
130 def remove(self, ch, ignore=False):
133 fd = self.fdcache[ch]
137 raise KeyError("fd %i is not registered" % fd)
138 pch, cevs = self.registered[fd]
140 raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch))
142 del self.registered[fd]
144 self.ep.unregister(fd)
147 def update(self, ch, ignore=False):
150 fd = self.fdcache[ch]
154 raise KeyError("fd %i is not registered" % fd)
155 pch, cevs = self.registered[fd]
157 raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch))
158 evs = self._evsfor(ch)
161 del self.registered[fd]
163 self.ep.unregister(fd)
166 self.registered[fd] = ch, evs
168 self.ep.modify(fd, evs)
171 if threading.current_thread() == self.th:
184 class channel(object):
192 raise NotImplementedError("fileno()")
197 class sockbuffer(channel):
198 def __init__(self, socket, **kwargs):
199 super().__init__(**kwargs)
202 self.obuf = bytearray()
205 return self.sk.fileno()
210 def gotdata(self, data):
214 def send(self, data, eof=False):
215 self.obuf.extend(data)
218 if self.watcher is not None:
219 self.watcher.update(self, True)
226 data = self.sk.recv(1024)
234 return bool(self.obuf);
237 ret = self.sk.send(self.obuf)
238 self.obuf[:ret] = b""
243 class callbuffer(channel):
244 def __init__(self, **kwargs):
245 super().__init__(**kwargs)
247 self.rp, self.wp = os.pipe()
248 self.lock = threading.Lock()
271 data = os.read(self.rp, 1024)
276 cbs = list(self.queue)
286 raise Exception("stopped")
287 self.queue.append(cb)
288 os.write(self.wp, b"a")
296 def currentwatcher(io, current):
300 io.loopcheck.add(check)