Commit | Line | Data |
---|---|---|
22aad619 | 1 | import time, sys, io |
d8509ee5 | 2 | |
173e0e9e FT |
def htmlquote(text):
    """Return text with the HTML-significant characters escaped.

    The ampersand, angle brackets and the double quote are replaced by
    their named entities; every other character is passed through
    unchanged.  The result is safe to place in element content or inside
    a double-quoted attribute value.
    """
    entities = {
        "&": "&amp;",
        "<": "&lt;",
        ">": "&gt;",
        '"': "&quot;",
    }
    # Collect the pieces and join once instead of growing a string
    # character by character.
    return "".join(entities.get(c, c) for c in text)
17 | ||
def simpleerror(env, startreq, code, title, msg):
    """Produce a minimal XHTML error page as a complete WSGI response.

    env is accepted but not consulted.  startreq is called once with the
    status line "<code> <title>" and Content-Type / Content-Length
    headers.  title is used for the page title and heading, msg for the
    body paragraph; both are HTML-escaped before being placed in the
    markup.

    Returns a single-element list holding the encoded page.
    """
    # The title ends up in the markup twice, so it needs the same
    # escaping as the message; the status line below uses it unescaped.
    safetitle = htmlquote(title)
    buf = """<?xml version="1.0" encoding="US-ASCII"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en-US">
<head>
<title>%s</title>
</head>
<body>
<h1>%s</h1>
<p>%s</p>
</body>
</html>""" % (safetitle, safetitle, htmlquote(msg))
    # The document declares US-ASCII: emit anything outside that range as
    # a numeric character reference rather than raising UnicodeEncodeError.
    buf = buf.encode("ascii", "xmlcharrefreplace")
    startreq("%i %s" % (code, title), [("Content-Type", "text/html"), ("Content-Length", str(len(buf)))])
    return [buf]
d8509ee5 FT |
33 | |
def httpdate(ts):
    """Format the Unix timestamp ts as an HTTP-style date string in UTC.

    The result looks like "Thu, 01 Jan 1970 00:00:00 +0000".  Day and
    month names are always the English abbreviations, regardless of the
    process locale.
    """
    # strftime's %a and %b follow LC_TIME, so the names are spelled out
    # here to keep the output stable under any locale.
    days = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
    tm = time.gmtime(ts)
    return "%s, %02d %s %04d %02d:%02d:%02d +0000" % (
        days[tm.tm_wday], tm.tm_mday, months[tm.tm_mon - 1], tm.tm_year,
        tm.tm_hour, tm.tm_min, tm.tm_sec)
36 | ||
def phttpdate(dstr):
    """Parse a date string as produced by httpdate into a Unix timestamp.

    dstr must end in a numeric zone offset of the form " +HHMM" or
    " -HHMM"; the part before it is parsed with the format
    "%a, %d %b %Y %H:%M:%S".

    Returns the timestamp as a float, or None when the zone suffix is
    missing or malformed.  Raises ValueError when the date part does not
    match the expected format.
    """
    import calendar

    tz = dstr[-6:]
    dstr = dstr[:-6]
    # The length check also covers inputs too short to index into.
    if len(tz) != 6 or tz[0] != " " or tz[1] not in "+-" or not tz[2:].isdigit():
        return None
    # Split hours and minutes explicitly and apply the sign afterwards;
    # dividing the signed HHMM number gives wrong results for offsets
    # with a minutes part and for negative offsets.
    offset = (int(tz[2:4]) * 60 + int(tz[4:6])) * 60
    if tz[1] == "-":
        offset = -offset
    # timegm interprets the broken-down time as UTC, so the result does
    # not depend on the local time zone or on daylight saving time.
    stamp = calendar.timegm(time.strptime(dstr, "%a, %d %b %Y %H:%M:%S"))
    return float(stamp - offset)
22aad619 FT |
45 | |
46 | def testenviron(uri, qs="", pi="", method=None, filename=None, host="localhost", data=None, ctype=None, head={}): | |
47 | if method is None: | |
48 | method = "GET" if data is None else "POST" | |
49 | if ctype is None and data is not None: | |
50 | ctype = "application/x-www-form-urlencoded" | |
51 | ret = {} | |
52 | ret["wsgi.version"] = 1, 0 | |
53 | ret["SERVER_SOFTWARE"] = "ashd-test/1" | |
54 | ret["GATEWAY_INTERFACE"] = "CGI/1.1" | |
55 | ret["SERVER_PROTOCOL"] = "HTTP/1.1" | |
56 | ret["REQUEST_METHOD"] = method | |
57 | ret["wsgi.uri_encoding"] = "utf-8" | |
58 | ret["SCRIPT_NAME"] = uri | |
59 | ret["PATH_INFO"] = pi | |
60 | ret["QUERY_STRING"] = qs | |
61 | full = uri + pi | |
62 | if qs: | |
63 | full = full + "?" + qs | |
64 | ret["REQUEST_URI"] = full | |
65 | if filename is not None: | |
66 | ret["SCRIPT_FILENAME"] = filename | |
67 | ret["HTTP_HOST"] = ret["SERVER_NAME"] = host | |
68 | ret["wsgi.url_scheme"] = "http" | |
69 | ret["SERVER_ADDR"] = "127.0.0.1" | |
70 | ret["SERVER_PORT"] = "80" | |
71 | ret["REMOTE_ADDR"] = "127.0.0.1" | |
72 | ret["REMOTE_PORT"] = "12345" | |
73 | if data is not None: | |
74 | ret["CONTENT_TYPE"] = ctype | |
75 | ret["CONTENT_LENGTH"] = len(data) | |
76 | ret["wsgi.input"] = io.BytesIO(data) | |
77 | else: | |
78 | ret["wsgi.input"] = io.BytesIO(b"") | |
79 | ret["wsgi.errors"] = sys.stderr | |
80 | ret["wsgi.multithread"] = True | |
81 | ret["wsgi.multiprocess"] = False | |
82 | ret["wsgi.run_once"] = False | |
83 | for key, val in head.items(): | |
84 | ret["HTTP_" + key.upper().replace("-", "_")] = val | |
85 | return ret | |
86 | ||
class testrequest(object):
    """Recording stand-in for a WSGI start_response callable.

    Calling an instance stores the status line and header list it was
    given and returns a write function that appends to an in-memory
    buffer.

    Attributes:
      wbuf    -- io.BytesIO collecting everything passed to the returned
                 write function
      headers -- header list from the most recent call, or None
      status  -- status line from the most recent call, or None
    """

    def __init__(self):
        self.wbuf = io.BytesIO()
        self.headers = None
        self.status = None

    def __call__(self, status, headers, exc_info=None):
        """Record status and headers and return the buffer's write method.

        exc_info is the optional third start_response argument; it is
        accepted so that applications passing it do not fail with a
        TypeError, and is otherwise not used.
        """
        self.status = status
        self.headers = headers
        return self.wbuf.write

    def __repr__(self):
        return "<ashd.wsgiutil.testrequest %r %s %s>" % (self.status,
                                                         "None" if self.headers is None else ("[%i]" % len(self.headers)),
                                                         "(no data)" if len(self.wbuf.getvalue()) == 0 else "(with data)")

    def __str__(self):
        return repr(self)