lib: Added mblock support for kqueue.
[ashd.git] / python3 / ashd / wsgiutil.py
1 import time, sys, io
2
3 def htmlquote(text):
4     ret = ""
5     for c in text:
6         if c == '&':
7             ret += "&"
8         elif c == '<':
9             ret += "&lt;"
10         elif c == '>':
11             ret += "&gt;"
12         elif c == '"':
13             ret += "&quot;"
14         else:
15             ret += c
16     return ret
17
18 def simpleerror(env, startreq, code, title, msg):
19     buf = """<?xml version="1.0" encoding="US-ASCII"?>
20 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
21 <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en-US">
22 <head>
23 <title>%s</title>
24 </head>
25 <body>
26 <h1>%s</h1>
27 <p>%s</p>
28 </body>
29 </html>""" % (title, title, htmlquote(msg))
30     buf = buf.encode("ascii")
31     startreq("%i %s" % (code, title), [("Content-Type", "text/html"), ("Content-Length", str(len(buf)))])
32     return [buf]
33
34 def httpdate(ts):
35     return time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime(ts))
36
37 def phttpdate(dstr):
38     tz = dstr[-6:]
39     dstr = dstr[:-6]
40     if tz[0] != " " or (tz[1] != "+" and tz[1] != "-") or not tz[2:].isdigit():
41         return None
42     tz = int(tz[1:])
43     tz = (((tz / 100) * 60) + (tz % 100)) * 60
44     return time.mktime(time.strptime(dstr, "%a, %d %b %Y %H:%M:%S")) - tz - time.altzone
45
46 def testenviron(uri, qs="", pi="", method=None, filename=None, host="localhost", data=None, ctype=None, head={}):
47     if method is None:
48         method = "GET" if data is None else "POST"
49     if ctype is None and data is not None:
50         ctype = "application/x-www-form-urlencoded"
51     ret = {}
52     ret["wsgi.version"] = 1, 0
53     ret["SERVER_SOFTWARE"] = "ashd-test/1"
54     ret["GATEWAY_INTERFACE"] = "CGI/1.1"
55     ret["SERVER_PROTOCOL"] = "HTTP/1.1"
56     ret["REQUEST_METHOD"] = method
57     ret["wsgi.uri_encoding"] = "utf-8"
58     ret["SCRIPT_NAME"] = uri
59     ret["PATH_INFO"] = pi
60     ret["QUERY_STRING"] = qs
61     full = uri + pi
62     if qs:
63         full = full + "?" + qs
64     ret["REQUEST_URI"] = full
65     if filename is not None:
66         ret["SCRIPT_FILENAME"] = filename
67     ret["HTTP_HOST"] = ret["SERVER_NAME"] = host
68     ret["wsgi.url_scheme"] = "http"
69     ret["SERVER_ADDR"] = "127.0.0.1"
70     ret["SERVER_PORT"] = "80"
71     ret["REMOTE_ADDR"] = "127.0.0.1"
72     ret["REMOTE_PORT"] = "12345"
73     if data is not None:
74         ret["CONTENT_TYPE"] = ctype
75         ret["CONTENT_LENGTH"] = len(data)
76         ret["wsgi.input"] = io.BytesIO(data)
77     else:
78         ret["wsgi.input"] = io.BytesIO(b"")
79     ret["wsgi.errors"] = sys.stderr
80     ret["wsgi.multithread"] = True
81     ret["wsgi.multiprocess"] = False
82     ret["wsgi.run_once"] = False
83     for key, val in head.items():
84         ret["HTTP_" + key.upper().replace("-", "_")] = val
85     return ret
86
87 class testrequest(object):
88     def __init__(self):
89         self.wbuf = io.BytesIO()
90         self.headers = None
91         self.status = None
92
93     def __call__(self, status, headers):
94         self.status = status
95         self.headers = headers
96         return self.wbuf.write
97
98     def __repr__(self):
99         return "<ashd.wsgiutil.testrequest %r %s %s>" % (self.status,
100                                                          "None" if self.headers is None else ("[%i]" % len(self.headers)),
101                                                          "(no data)" if len(self.wbuf.getvalue()) == 0 else "(with data)")
102
103     def __str__(self):
104         return repr(self)