1 import sys, os, errno, threading, select, traceback
8 self.lock = threading.RLock()
15 return ((select.EPOLLIN if ch.readable else 0) |
16 (select.EPOLLOUT if ch.writable else 0))
19 if self.registered and self.th is None:
20 th = threading.Thread(target=self._run, name="Async epoll thread")
21 th.daemon = self._daemon
25 def exception(self, ch, *exc):
27 if self.exc_handler is None:
28 traceback.print_exception(exc)
30 self.exc_handler(ch, *exc)
32 def _cb(self, ch, nm):
34 m = getattr(ch, nm, None)
36 raise AttributeError("%r has no %s method" % (ch, nm))
38 except Exception as exc:
39 self.exception(ch, *sys.exc_info())
45 for fd, (ob, evs) in self.registered.items():
49 while self.registered:
52 except IOError as exc:
53 if exc.errno == errno.EINTR:
56 for fd, evs in evlist:
58 if fd not in self.registered:
60 ch, cevs = self.registered[fd]
61 if fd in self.registered and evs & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR):
63 if fd in self.registered and evs & select.EPOLLOUT:
65 if fd in self.registered:
66 nevs = self._evsfor(ch)
68 del self.registered[fd]
72 self.registered[fd] = ch, nevs
83 def daemon(self): return self._daemon
85 def daemon(self, value):
86 self._daemon = bool(value)
88 if self.th is not None:
89 self.th = daemon = self._daemon
94 if fd in self.registered:
95 raise KeyError("fd %i is already registered" % fd)
96 evs = self._evsfor(ch)
101 self.registered[fd] = (ch, evs)
103 self.ep.register(fd, evs)
106 def remove(self, ch, ignore=False):
109 if fd not in self.registered:
112 raise KeyError("fd %i is not registered" % fd)
113 pch, cevs = self.registered[fd]
115 raise ValueError("fd %i registered via object %r, cannot remove with %r" % (pch, ch))
116 del self.registered[fd]
118 self.ep.unregister(fd)
121 def update(self, ch, ignore=False):
124 if fd not in self.registered:
127 raise KeyError("fd %i is not registered" % fd)
128 pch, cevs = self.registered[fd]
130 raise ValueError("fd %i registered via object %r, cannot update with %r" % (pch, ch))
131 evs = self._evsfor(ch)
133 del self.registered[fd]
135 self.ep.unregister(fd)
138 self.registered[fd] = ch, evs
140 self.ep.modify(fd, evs)
145 class sockbuffer(object):
146 def __init__(self, sk):
149 self.obuf = bytearray()
153 return self.sk.fileno()
158 def gotdata(self, data):
162 def send(self, data, eof=False):
163 self.obuf.extend(data)
166 if self.watcher is not None:
167 self.watcher.update(self, True)
174 data = self.sk.recv(1024)
182 return bool(self.obuf);
185 ret = self.sk.send(self.obuf)
186 self.obuf[:ret] = b""
191 class callbuffer(object):
194 self.rp, self.wp = os.pipe()
195 self.lock = threading.Lock()
218 data = os.read(self.rp, 1024)
223 cbs = list(self.queue)
233 raise Exception("stopped")
234 self.queue.append(cb)
235 os.write(self.wp, b"a")