1 import binascii, hashlib, threading, time
2 from . import resp, proto
class unauthorized(resp.httperror):
    """HTTP 401 error that carries one or more authentication challenges.

    `challenge` is either a single challenge string or a collection of
    challenge strings; each one is emitted as its own `WWW-Authenticate`
    header when the error is handled.  `message` and `detail` are passed
    through unchanged to `resp.httperror`.
    """

    def __init__(self, challenge, message=None, detail=None):
        super().__init__(401, message, detail)
        # A lone string is wrapped so that handle() can always iterate.
        self.challenge = [challenge] if isinstance(challenge, str) else challenge

    def handle(self, req):
        """Add every stored challenge to `req.ohead`, then defer to the parent handler."""
        for value in self.challenge:
            req.ohead.add("WWW-Authenticate", value)
        return super().handle(req)
class forbidden(resp.httperror):
    """HTTP 403 error; `message` and `detail` are forwarded to `resp.httperror`."""

    def __init__(self, message=None, detail=None):
        status_code = 403
        super().__init__(status_code, message, detail)
21 h = req.ihead.get("Authorization", None)
27 return h[:p].strip().lower(), h[p + 1:].strip()
30 mech, data = parsemech(req)
34 raw = proto.unb64(data)
35 except binascii.Error:
38 raw = raw.decode("utf-8")
40 raw = raw.decode("latin1")
44 return raw[:p], raw[p + 1:]
46 class basiccache(object):
49 def __init__(self, realm, authfn=None):
50 self._lock = threading.Lock()
53 if authfn is not None:
56 def _obscure(self, nm, pw):
57 dig = hashlib.sha256()
58 dig.update(self.realm.encode("utf-8"))
59 dig.update(nm.encode("utf-8"))
60 dig.update(pw.encode("utf-8"))
64 nm, pw = parsebasic(req)
66 raise unauthorized("Basic Realm=\"%s\"" % self.realm)
67 pwh = self._obscure(nm, pw)
70 if (nm, pwh) in self._cache:
71 lock, atime, res, resob = self._cache[nm, pwh]
72 if now - atime < self.cachetime:
78 lock = threading.Lock()
79 self._cache[nm, pwh] = (lock, now, None, None)
82 ret = self.auth(req, nm, pw)
83 except forbidden as exc:
85 self._cache[nm, pwh] = (lock, now, "f", exc)
90 self._cache[nm, pwh] = (lock, now, "s", ret)
def auth(self, req, nm, pw):
    """Check the credentials supplied with a request.

    This default implementation only signals that no real check has been
    provided.  The cache invokes it as `self.auth(req, nm, pw)` with the
    request and the user name and password parsed from it.
    NOTE(review): the constructor accepts an `authfn` argument that
    presumably replaces this method -- confirm against the full class.

    Raises:
        NotImplementedError: always.  It is a subclass of `Exception`, so
            callers that caught the previously raised bare `Exception`
            keep working.
    """
    raise NotImplementedError("authentication function neither supplied nor overridden")