ratequeue: Log request blocking.
[ashd.git] / src / ratequeue.c
index 04ea89f..dc7e198 100644 (file)
@@ -40,6 +40,7 @@
 #define SBUCKETS 7
 
 struct source {
 #define SBUCKETS 7
 
 struct source {
+    int type;
     char data[16];
     unsigned int len, hash;
 };
     char data[16];
     unsigned int len, hash;
 };
@@ -51,9 +52,9 @@ struct waiting {
 
 struct bucket {
     struct source id;
 
 struct bucket {
     struct source id;
-    double level, last, etime;
+    double level, last, etime, wtime;
     typedbuf(struct waiting) brim;
     typedbuf(struct waiting) brim;
-    int thpos;
+    int thpos, blocked;
 };
 
 struct btime {
 };
 
 struct btime {
@@ -62,7 +63,7 @@ struct btime {
 };
 
 struct config {
 };
 
 struct config {
-    double size, rate, retain;
+    double size, rate, retain, warnrate;
     int brimsize;
 };
 
     int brimsize;
 };
 
@@ -73,7 +74,7 @@ static typedbuf(struct btime) timeheap;
 static int child, reload;
 static double now;
 static const struct config defcfg = {
 static int child, reload;
 static double now;
 static const struct config defcfg = {
-    .size = 100, .rate = 10,
+    .size = 100, .rate = 10, .warnrate = 60,
     .retain = 10, .brimsize = 10,
 };
 static struct config cf;
     .retain = 10, .brimsize = 10,
 };
 static struct config cf;
@@ -103,16 +104,51 @@ static struct source reqsource(struct hthead *req)
     ret = (struct source){};
     if((sa = getheader(req, "X-Ash-Address")) != NULL) {
        if(inet_pton(AF_INET, sa, &a4) == 1) {
     ret = (struct source){};
     if((sa = getheader(req, "X-Ash-Address")) != NULL) {
        if(inet_pton(AF_INET, sa, &a4) == 1) {
+           ret.type = AF_INET;
            memcpy(ret.data, &a4, ret.len = sizeof(a4));
        } else if(inet_pton(AF_INET6, sa, &a6) == 1) {
            memcpy(ret.data, &a4, ret.len = sizeof(a4));
        } else if(inet_pton(AF_INET6, sa, &a6) == 1) {
+           ret.type = AF_INET6;
            memcpy(ret.data, &a6, ret.len = sizeof(a6));
        }
     }
            memcpy(ret.data, &a6, ret.len = sizeof(a6));
        }
     }
-    for(i = 0, ret.hash = 0; i < ret.len; i++)
+    for(i = 0, ret.hash = ret.type; i < ret.len; i++)
        ret.hash = (ret.hash * 31) + ret.data[i];
     return(ret);
 }
 
        ret.hash = (ret.hash * 31) + ret.data[i];
     return(ret);
 }
 
+static int srccmp(const struct source *a, const struct source *b)
+{
+    int c;
+    
+    if((c = a->len - b->len) != 0)
+       return(c);
+    if((c = a->type - b->type) != 0)
+       return(c);
+    return(memcmp(a->data, b->data, a->len));
+}
+
+static const char *formatsrc(const struct source *src)
+{
+    static char buf[128];
+    struct in_addr a4;
+    struct in6_addr a6;
+    
+    switch(src->type) {
+    case AF_INET:
+       memcpy(&a4, src->data, sizeof(a4));
+       if(!inet_ntop(AF_INET, &a4, buf, sizeof(buf)))
+           return("<invalid ipv4>");
+       return(buf);
+    case AF_INET6:
+       memcpy(&a6, src->data, sizeof(a6));
+       if(!inet_ntop(AF_INET6, &a6, buf, sizeof(buf)))
+           return("<invalid ipv6>");
+       return(buf);
+    default:
+       return("<invalid source record>");
+    }
+}
+
 static void rehash(int nlen)
 {
     int i, o, n, m, pl, nl;
 static void rehash(int nlen)
 {
     int i, o, n, m, pl, nl;
@@ -145,7 +181,7 @@ static void rehash(int nlen)
     hashlen = nlen;
 }
 
     hashlen = nlen;
 }
 
-static struct bucket *hashget(struct source *src)
+static struct bucket *hashget(const struct source *src)
 {
     unsigned int i, n, N, m;
     struct bucket *bk;
 {
     unsigned int i, n, N, m;
     struct bucket *bk;
@@ -153,7 +189,7 @@ static struct bucket *hashget(struct source *src)
     m = (N = (1 << hashlen)) - 1;
     for(i = src->hash & m, n = 0; n < N; i = (i + 1) & m, n++) {
        bk = buckets[i];
     m = (N = (1 << hashlen)) - 1;
     for(i = src->hash & m, n = 0; n < N; i = (i + 1) & m, n++) {
        bk = buckets[i];
-       if(bk && (bk->id.len == src->len) && !memcmp(bk->id.data, src->data, src->len))
+       if(bk && !srccmp(&bk->id, src))
            return(bk);
     }
     for(i = src->hash & m; buckets[i]; i = (i + 1) & m);
            return(bk);
     }
     for(i = src->hash & m; buckets[i]; i = (i + 1) & m);
@@ -161,6 +197,7 @@ static struct bucket *hashget(struct source *src)
     memcpy(&bk->id, src, sizeof(*src));
     bk->last = bk->etime = now;
     bk->thpos = -1;
     memcpy(&bk->id, src, sizeof(*src));
     bk->last = bk->etime = now;
     bk->thpos = -1;
+    bk->blocked = -1;
     if(++nbuckets > (1 << (hashlen - 1)))
        rehash(hashlen + 1);
     return(bk);
     if(++nbuckets > (1 << (hashlen - 1)))
        rehash(hashlen + 1);
     return(bk);
@@ -174,7 +211,7 @@ static void hashdel(struct bucket *bk)
     m = (N = (1 << hashlen)) - 1;
     for(i = bk->id.hash & m, n = 0; n < N; i = (i + 1) & m, n++) {
        assert((sb = buckets[i]) != NULL);
     m = (N = (1 << hashlen)) - 1;
     for(i = bk->id.hash & m, n = 0; n < N; i = (i + 1) & m, n++) {
        assert((sb = buckets[i]) != NULL);
-       if((sb->id.len == bk->id.len) && !memcmp(sb->id.data, bk->id.data, bk->id.len))
+       if(!srccmp(&sb->id, &bk->id))
            break;
     }
     assert(sb == bk);
            break;
     }
     assert(sb == bk);
@@ -255,10 +292,14 @@ static void updbtime(struct bucket *bk)
     double tm, tm2;
     
     tm = (bk->level == 0) ? (bk->etime + cf.retain) : (bk->last + (bk->level / cf.rate) + cf.retain);
     double tm, tm2;
     
     tm = (bk->level == 0) ? (bk->etime + cf.retain) : (bk->last + (bk->level / cf.rate) + cf.retain);
-    if(bk->brim.d > 0) {
-       tm2 = bk->last + ((bk->level - cf.size) / cf.rate);
-       tm = (tm2 < tm) ? tm2 : tm;
-    }
+    if((bk->blocked > 0) && ((tm2 = bk->wtime + cf.warnrate) > tm))
+       tm = tm2;
+    
+    if((bk->brim.d > 0) && ((tm2 = bk->last + ((bk->level - cf.size) / cf.rate)) < tm))
+       tm = tm2;
+    if((bk->blocked > 0) && ((tm2 = bk->wtime + cf.warnrate) < tm))
+       tm = tm2;
+    
     if(bk->thpos < 0) {
        sizebuf(timeheap, ++timeheap.d);
        thraise((struct btime){bk, tm}, timeheap.d - 1);
     if(bk->thpos < 0) {
        sizebuf(timeheap, ++timeheap.d);
        thraise((struct btime){bk, tm}, timeheap.d - 1);
@@ -289,12 +330,17 @@ static void tickbucket(struct bucket *bk)
        bufdel(bk->brim, 0);
        bk->level += 1;
     }
        bufdel(bk->brim, 0);
        bk->level += 1;
     }
+    if((bk->blocked > 0) && (now - bk->wtime >= cf.warnrate)) {
+       flog(LOG_NOTICE, "ratequeue: blocked %i requests from %s", bk->blocked, formatsrc(&bk->id));
+       bk->blocked = 0;
+       bk->wtime = now;
+    }
 }
 
 static void checkbtime(struct bucket *bk)
 {
     tickbucket(bk);
 }
 
 static void checkbtime(struct bucket *bk)
 {
     tickbucket(bk);
-    if((bk->level == 0) && (now >= bk->etime + cf.retain)) {
+    if((bk->level == 0) && (now >= bk->etime + cf.retain) && (bk->blocked <= 0)) {
        freebucket(bk);
        return;
     }
        freebucket(bk);
        return;
     }
@@ -321,14 +367,20 @@ static void serve(struct hthead *req, int fd)
     } else if(bk->brim.d < cf.brimsize) {
        bufadd(bk->brim, ((struct waiting){.req = req, .fd = fd}));
     } else {
     } else if(bk->brim.d < cf.brimsize) {
        bufadd(bk->brim, ((struct waiting){.req = req, .fd = fd}));
     } else {
+       if(bk->blocked < 0) {
+           flog(LOG_NOTICE, "ratequeue: blocking requests from %s", formatsrc(&bk->id));
+           bk->blocked = 0;
+           bk->wtime = now;
+       }
        simpleerror(fd, 429, "Too many requests", "Your client is being throttled for issuing too frequent requests.");
        freehthead(req);
        close(fd);
        simpleerror(fd, 429, "Too many requests", "Your client is being throttled for issuing too frequent requests.");
        freehthead(req);
        close(fd);
+       bk->blocked++;
     }
     updbtime(bk);
 }
 
     }
     updbtime(bk);
 }
 
-static int parseint(char *str, int *dst)
+static int parseint(const char *str, int *dst)
 {
     long buf;
     char *p;
 {
     long buf;
     char *p;
@@ -340,7 +392,7 @@ static int parseint(char *str, int *dst)
     return(0);
 }
 
     return(0);
 }
 
-static int parsefloat(char *str, double *dst)
+static int parsefloat(const char *str, double *dst)
 {
     double buf;
     char *p;
 {
     double buf;
     char *p;