X-Git-Url: http://www.dolda2000.com/gitweb/?p=ashd.git;a=blobdiff_plain;f=src%2Fratequeue.c;fp=src%2Fratequeue.c;h=dc7e19879cf617a2713aa3b4e1efb5bd7c06ea04;hp=04ea89fbc92e50f651f85967465c009daf267680;hb=57052193f089a4f126fad95e31e6ec5d099ce1e0;hpb=ebe9b5058475128a28805b1f00a3f342f4f0437b diff --git a/src/ratequeue.c b/src/ratequeue.c index 04ea89f..dc7e198 100644 --- a/src/ratequeue.c +++ b/src/ratequeue.c @@ -40,6 +40,7 @@ #define SBUCKETS 7 struct source { + int type; char data[16]; unsigned int len, hash; }; @@ -51,9 +52,9 @@ struct waiting { struct bucket { struct source id; - double level, last, etime; + double level, last, etime, wtime; typedbuf(struct waiting) brim; - int thpos; + int thpos, blocked; }; struct btime { @@ -62,7 +63,7 @@ struct btime { }; struct config { - double size, rate, retain; + double size, rate, retain, warnrate; int brimsize; }; @@ -73,7 +74,7 @@ static typedbuf(struct btime) timeheap; static int child, reload; static double now; static const struct config defcfg = { - .size = 100, .rate = 10, + .size = 100, .rate = 10, .warnrate = 60, .retain = 10, .brimsize = 10, }; static struct config cf; @@ -103,16 +104,51 @@ static struct source reqsource(struct hthead *req) ret = (struct source){}; if((sa = getheader(req, "X-Ash-Address")) != NULL) { if(inet_pton(AF_INET, sa, &a4) == 1) { + ret.type = AF_INET; memcpy(ret.data, &a4, ret.len = sizeof(a4)); } else if(inet_pton(AF_INET6, sa, &a6) == 1) { + ret.type = AF_INET6; memcpy(ret.data, &a6, ret.len = sizeof(a6)); } } - for(i = 0, ret.hash = 0; i < ret.len; i++) + for(i = 0, ret.hash = ret.type; i < ret.len; i++) ret.hash = (ret.hash * 31) + ret.data[i]; return(ret); } +static int srccmp(const struct source *a, const struct source *b) +{ + int c; + + if((c = a->len - b->len) != 0) + return(c); + if((c = a->type - b->type) != 0) + return(c); + return(memcmp(a->data, b->data, a->len)); +} + +static const char *formatsrc(const struct source *src) +{ + static char buf[128]; + struct in_addr a4; + struct in6_addr a6; + + switch(src->type) { + case AF_INET: + memcpy(&a4, src->data, sizeof(a4)); + if(!inet_ntop(AF_INET, &a4, buf, sizeof(buf))) + return(""); + return(buf); + case AF_INET6: + memcpy(&a6, src->data, sizeof(a6)); + if(!inet_ntop(AF_INET6, &a6, buf, sizeof(buf))) + return(""); + return(buf); + default: + return(""); + } +} + static void rehash(int nlen) { int i, o, n, m, pl, nl; @@ -145,7 +181,7 @@ static void rehash(int nlen) hashlen = nlen; } -static struct bucket *hashget(struct source *src) +static struct bucket *hashget(const struct source *src) { unsigned int i, n, N, m; struct bucket *bk; @@ -153,7 +189,7 @@ static struct bucket *hashget(struct source *src) m = (N = (1 << hashlen)) - 1; for(i = src->hash & m, n = 0; n < N; i = (i + 1) & m, n++) { bk = buckets[i]; - if(bk && (bk->id.len == src->len) && !memcmp(bk->id.data, src->data, src->len)) + if(bk && !srccmp(&bk->id, src)) return(bk); } for(i = src->hash & m; buckets[i]; i = (i + 1) & m); @@ -161,6 +197,7 @@ static struct bucket *hashget(struct source *src) memcpy(&bk->id, src, sizeof(*src)); bk->last = bk->etime = now; bk->thpos = -1; + bk->blocked = -1; if(++nbuckets > (1 << (hashlen - 1))) rehash(hashlen + 1); return(bk); @@ -174,7 +211,7 @@ static void hashdel(struct bucket *bk) m = (N = (1 << hashlen)) - 1; for(i = bk->id.hash & m, n = 0; n < N; i = (i + 1) & m, n++) { assert((sb = buckets[i]) != NULL); - if((sb->id.len == bk->id.len) && !memcmp(sb->id.data, bk->id.data, bk->id.len)) + if(!srccmp(&sb->id, &bk->id)) break; } assert(sb == bk); @@ -255,10 +292,14 @@ static void updbtime(struct bucket *bk) double tm, tm2; tm = (bk->level == 0) ? (bk->etime + cf.retain) : (bk->last + (bk->level / cf.rate) + cf.retain); - if(bk->brim.d > 0) { - tm2 = bk->last + ((bk->level - cf.size) / cf.rate); - tm = (tm2 < tm) ? tm2 : tm; - } + if((bk->blocked > 0) && ((tm2 = bk->wtime + cf.warnrate) > tm)) + tm = tm2; + + if((bk->brim.d > 0) && ((tm2 = bk->last + ((bk->level - cf.size) / cf.rate)) < tm)) + tm = tm2; + if((bk->blocked > 0) && ((tm2 = bk->wtime + cf.warnrate) < tm)) + tm = tm2; + if(bk->thpos < 0) { sizebuf(timeheap, ++timeheap.d); thraise((struct btime){bk, tm}, timeheap.d - 1); @@ -289,12 +330,17 @@ static void tickbucket(struct bucket *bk) bufdel(bk->brim, 0); bk->level += 1; } + if((bk->blocked > 0) && (now - bk->wtime >= cf.warnrate)) { + flog(LOG_NOTICE, "ratequeue: blocked %i requests from %s", bk->blocked, formatsrc(&bk->id)); + bk->blocked = 0; + bk->wtime = now; + } } static void checkbtime(struct bucket *bk) { tickbucket(bk); - if((bk->level == 0) && (now >= bk->etime + cf.retain)) { + if((bk->level == 0) && (now >= bk->etime + cf.retain) && (bk->blocked <= 0)) { freebucket(bk); return; } @@ -321,14 +367,20 @@ static void serve(struct hthead *req, int fd) } else if(bk->brim.d < cf.brimsize) { bufadd(bk->brim, ((struct waiting){.req = req, .fd = fd})); } else { + if(bk->blocked < 0) { + flog(LOG_NOTICE, "ratequeue: blocking requests from %s", formatsrc(&bk->id)); + bk->blocked = 0; + bk->wtime = now; + } simpleerror(fd, 429, "Too many requests", "Your client is being throttled for issuing too frequent requests."); freehthead(req); close(fd); + bk->blocked++; } updbtime(bk); } -static int parseint(char *str, int *dst) +static int parseint(const char *str, int *dst) { long buf; char *p; @@ -340,7 +392,7 @@ static int parseint(char *str, int *dst) return(0); } -static int parsefloat(char *str, double *dst) +static int parsefloat(const char *str, double *dst) { double buf; char *p;