X-Git-Url: http://www.dolda2000.com/gitweb/?a=blobdiff_plain;f=python3%2Fashd%2Fasync.py;h=b78920f0f372d84dd84db52d5c69a0a8980fd39c;hb=HEAD;hp=3c6282084dc106ca86d3db58bf53ceb4958d15b8;hpb=c32cd0db68074ae06546406848e85ee2b4a86451;p=ashd.git diff --git a/python3/ashd/async.py b/python3/ashd/async.py index 3c62820..b78920f 100644 --- a/python3/ashd/async.py +++ b/python3/ashd/async.py @@ -1,133 +1 @@ -import os, threading, select - -class pool(object): - def __init__(self): - self.clients = set() - self.lock = threading.RLock() - self.th = None - self.ipipe = -1 - - def add(self, cl): - with self.lock: - self.clients.add(cl) - self._ckrun() - cl.registered = self - self._interrupt() - - def __iter__(self): - with self.lock: - return iter([cl for cl in self.clients if not cl.closed]) - - def broadcast(self, data, eof=False): - with self.lock: - for cl in self: - cl.obuf.extend(data) - if eof: - cl.closed = True - self._interrupt() - - def _ckrun(self): - if self.clients and self.th is None: - th = threading.Thread(target=self._run, name="Async watcher thread") - th.start() - self.th = th - - def _interrupt(self): - fd = self.ipipe - if fd >= 0 and threading.current_thread() != self.th: - os.write(fd, b"a") - - def _remove(self, cl): - self.clients.remove(cl) - cl.registered = None - cl._doclose() - - def _run(self): - ipr, ipw = None, None - try: - ipr, ipw = os.pipe() - self.ipipe = ipw - while True: - with self.lock: - for cl in list(self.clients): - if cl.closed and not cl.writable: - self._remove(cl) - if not self.clients: - break - rsk = [cl for cl in self.clients if not cl.closed] + [ipr] - wsk = [cl for cl in self.clients if cl.writable] - # XXX: Switch to epoll. - rsk, wsk, esk = select.select(rsk, wsk, []) - for sk in rsk: - if sk == ipr: - os.read(ipr, 1024) - elif sk in self.clients: - sk._doread() - for sk in wsk: - if sk in self.clients: - sk._dowrite() - finally: - with self.lock: - self.th = None - self.ipipe = -1 - self._ckrun() - if ipr is not None: - try: os.close(ipr) - except: pass - if ipw is not None: - try: os.close(ipw) - except: pass - -class client(object): - pool = None - - def __init__(self, sock): - self.sk = sock - self.obuf = bytearray() - self.closed = False - self.registered = None - p = self.pool - if p is not None: - p.add(self) - - def fileno(self): - return self.sk.fileno() - - def close(self): - self.closed = True - if self.registered: - self.registered._interrupt() - - def write(self, data): - self.obuf.extend(data) - if self.registered: - self.registered._interrupt() - - @property - def writable(self): - return bool(self.obuf) - - def gotdata(self, data): - if data == b"": - self.close() - - def _doread(self): - try: - ret = self.sk.recv(1024) - except IOError: - self.close() - self.gotdata(ret) - - def _dowrite(self): - try: - if self.obuf: - ret = self.sk.send(self.obuf) - self.obuf[:ret] = b"" - except IOError: - self.close() - - def _doclose(self): - try: - self.sk.close() - except IOError: - pass +from .asyncio import *