X-Git-Url: http://www.dolda2000.com/gitweb/?a=blobdiff_plain;f=lib%2Fmtio-kqueue.c;h=7d086e2243867e865f23912e41fe6f0864c3021c;hb=595adb9922885c2a05bc6917ee8f8f02f496e618;hp=12359735bf3cd084db9d5c19273febc3aa769554;hpb=ac612570642ede6790e44483286624f400ab2fba;p=ashd.git diff --git a/lib/mtio-kqueue.c b/lib/mtio-kqueue.c index 1235973..7d086e2 100644 --- a/lib/mtio-kqueue.c +++ b/lib/mtio-kqueue.c @@ -37,7 +37,8 @@ static struct blocker *blockers; struct blocker { struct blocker *n, *p, *n2, *p2; int fd, reg; - int ev; + int ev, rev, id; + int thpos; time_t to; struct muth *th; }; @@ -45,6 +46,7 @@ struct blocker { static int qfd = -1, fdln = 0; static int exitstatus; static struct blocker **fdlist; +static typedbuf(struct blocker *) timeheap; static int regfd(struct blocker *bl) { @@ -137,26 +139,81 @@ static void remfd(struct blocker *bl) bl->reg = 0; } -int block(int fd, int ev, time_t to) +static void thraise(struct blocker *bl, int n) { - struct blocker *bl; - int rv; + int p; - omalloc(bl); - bl->fd = fd; - bl->ev = ev; - if(to > 0) - bl->to = time(NULL) + to; - bl->th = current; - if((qfd >= 0) && regfd(bl)) { - free(bl); - return(-1); + while(n > 0) { + p = (n - 1) >> 1; + if(timeheap.b[p]->to <= bl->to) + break; + timeheap.b[n] = timeheap.b[p]; + timeheap.b[n]->thpos = n; + n = p; + } + timeheap.b[n] = bl; + bl->thpos = n; +} + +static void thlower(struct blocker *bl, int n) +{ + int c; + + while(1) { + c = (n << 1) + 1; + if(c >= timeheap.d) + break; + if((c + 1 < timeheap.d) && (timeheap.b[c + 1]->to < timeheap.b[c]->to)) + c = c + 1; + if(timeheap.b[c]->to > bl->to) + break; + timeheap.b[n] = timeheap.b[c]; + timeheap.b[n]->thpos = n; + n = c; } + timeheap.b[n] = bl; + bl->thpos = n; +} + +static void addtimeout(struct blocker *bl, time_t to) +{ + sizebuf(timeheap, ++timeheap.d); + thraise(bl, timeheap.d - 1); +} + +static void deltimeout(struct blocker *bl) +{ + int n; + + if(bl->thpos == timeheap.d - 1) { + timeheap.d--; + return; + } + n = bl->thpos; + bl = timeheap.b[--timeheap.d]; + if((n > 0) && (timeheap.b[(n - 1) >> 1]->to > bl->to)) + thraise(bl, n); + else + thlower(bl, n); +} + +static int addblock(struct blocker *bl) +{ + if((qfd >= 0) && regfd(bl)) + return(-1); bl->n = blockers; if(blockers) blockers->p = bl; blockers = bl; - rv = yield(); + if(bl->to > 0) + addtimeout(bl, bl->to); + return(0); +} + +static void remblock(struct blocker *bl) +{ + if(bl->to > 0) + deltimeout(bl); if(bl->n) bl->n->p = bl->p; if(bl->p) @@ -164,7 +221,51 @@ int block(int fd, int ev, time_t to) if(bl == blockers) blockers = bl->n; remfd(bl); - free(bl); +} + +struct selected mblock(time_t to, int n, struct selected *spec) +{ + int i, id; + struct blocker bls[n]; + + to = (to > 0)?(time(NULL) + to):0; + for(i = 0; i < n; i++) { + bls[i] = (struct blocker) { + .fd = spec[i].fd, + .ev = spec[i].ev, + .id = i, + .to = to, + .th = current, + }; + if(addblock(&bls[i])) { + for(i--; i >= 0; i--) + remblock(&bls[i]); + return((struct selected){.fd = -1, .ev = -1}); + } + } + id = yield(); + for(i = 0; i < n; i++) + remblock(&bls[i]); + if(id < 0) + return((struct selected){.fd = -1, .ev = -1}); + return((struct selected){.fd = bls[id].fd, .ev = bls[id].rev}); +} + +int block(int fd, int ev, time_t to) +{ + struct blocker bl; + int rv; + + bl = (struct blocker) { + .fd = fd, + .ev = ev, + .id = -1, + .to = (to > 0)?(time(NULL) + to):0, + .th = current, + }; + addblock(&bl); + rv = yield(); + remblock(&bl); return(rv); } @@ -173,7 +274,7 @@ int ioloop(void) struct blocker *bl, *nbl; struct kevent evs[16]; int i, fd, nev, ev; - time_t now, timeout; + time_t now; struct timespec *toval; exitstatus = 0; @@ -185,18 +286,14 @@ int ioloop(void) resume(bl->th, -1); } while(blockers != NULL) { - timeout = 0; - for(bl = blockers; bl; bl = bl->n) { - if((bl->to != 0) && ((timeout == 0) || (timeout > bl->to))) - timeout = bl->to; - } now = time(NULL); - if(timeout == 0) + toval = &(struct timespec){}; + if(timeheap.d == 0) toval = NULL; - else if(timeout > now) - toval = &(struct timespec){.tv_sec = timeout - now}; + else if(timeheap.b[0]->to > now) + *toval = (struct timespec){.tv_sec = timeheap.b[0]->to - now}; else - toval = &(struct timespec){.tv_sec = 1}; + *toval = (struct timespec){.tv_sec = 1}; if(exitstatus) break; nev = kevent(qfd, NULL, 0, evs, sizeof(evs) / sizeof(*evs), toval); @@ -214,15 +311,24 @@ int ioloop(void) ev = (evs[i].filter == EVFILT_READ)?EV_READ:EV_WRITE; for(bl = fdlist[fd]; bl; bl = nbl) { nbl = bl->n2; - if(ev & bl->ev) - resume(bl->th, ev); + if(ev & bl->ev) { + if(bl->id < 0) { + resume(bl->th, ev); + } else { + bl->rev = ev; + resume(bl->th, bl->id); + } + } } } now = time(NULL); - for(bl = blockers; bl; bl = nbl) { - nbl = bl->n; - if((bl->to != 0) && (bl->to <= now)) + while((timeheap.d > 0) && ((bl = timeheap.b[0])->to <= now)) { + if(bl->id < 0) { resume(bl->th, 0); + } else { + bl->rev = 0; + resume(bl->th, bl->id); + } } } for(bl = blockers; bl; bl = bl->n)