callcgi: Fixed possible deadlock problem on aborted requests.
[ashd.git] / python3 / ashd / wsgiutil.py
CommitLineData
22aad619 1import time, sys, io
d8509ee5 2
173e0e9e
FT
def htmlquote(text):
    """Return *text* with HTML-special characters replaced by entities.

    The characters ``&``, ``<``, ``>`` and ``"`` are replaced by
    ``&amp;``, ``&lt;``, ``&gt;`` and ``&quot;`` respectively; every
    other character is passed through unchanged.
    """
    entities = {
        "&": "&amp;",
        "<": "&lt;",
        ">": "&gt;",
        '"': "&quot;",
    }
    # Each character is looked up independently, so an ampersand that is
    # part of a produced entity is never escaped a second time.
    return "".join(entities.get(c, c) for c in text)
17
def simpleerror(env, startreq, code, title, msg):
    """Produce a minimal XHTML error page as a WSGI response.

    Calls *startreq* with the status line ``"<code> <title>"`` and
    ``Content-Type``/``Content-Length`` headers, and returns a
    single-element list holding the encoded page body.

    env      -- the WSGI environment; accepted but not used here.
    startreq -- the WSGI start_response callable.
    code     -- the numeric HTTP status code.
    title    -- short status text; used in the status line and, escaped,
                as the page title and heading.
    msg      -- longer explanation; escaped and placed in the page body.
    """
    # The title goes into markup just like the message does, so it has
    # to be escaped too.
    qtitle = htmlquote(title)
    buf = """<?xml version="1.0" encoding="US-ASCII"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en-US">
<head>
<title>%s</title>
</head>
<body>
<h1>%s</h1>
<p>%s</p>
</body>
</html>""" % (qtitle, qtitle, htmlquote(msg))
    # The document declares US-ASCII; emit non-ASCII characters as
    # numeric character references instead of failing to encode.
    buf = buf.encode("ascii", "xmlcharrefreplace")
    startreq("%i %s" % (code, title), [("Content-Type", "text/html"), ("Content-Length", str(len(buf)))])
    return [buf]
d8509ee5
FT
33
def httpdate(ts):
    """Format the Unix timestamp *ts* as an HTTP-style date string.

    The result is expressed in UTC and looks like
    ``Thu, 01 Jan 1970 00:00:00 +0000``. *ts* is passed straight to
    time.gmtime().
    """
    # Day and month names are spelled out here rather than taken from
    # strftime("%a"/"%b"), whose output follows the process locale.
    days = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
    tm = time.gmtime(ts)
    return "%s, %02i %s %04i %02i:%02i:%02i +0000" % (
        days[tm.tm_wday], tm.tm_mday, months[tm.tm_mon - 1], tm.tm_year,
        tm.tm_hour, tm.tm_min, tm.tm_sec)
36
def phttpdate(dstr):
    """Parse a date string of the form produced by httpdate().

    *dstr* must end in a six-character zone suffix consisting of a
    space, a sign and four digits (e.g. `` +0000`` or `` -0130``).
    Returns the corresponding Unix timestamp as a float, or None if the
    zone suffix is malformed.

    Raises ValueError if the part before the zone suffix does not match
    ``%a, %d %b %Y %H:%M:%S``.
    """
    import calendar

    tz = dstr[-6:]
    dstr = dstr[:-6]
    # The length check also covers strings too short to index into.
    if len(tz) != 6 or tz[0] != " " or tz[1] not in "+-" or not tz[2:].isdigit():
        return None
    # Split HHMM with integer arithmetic on the unsigned digits, then
    # apply the sign, so that e.g. "-0130" becomes exactly -90 minutes.
    hhmm = int(tz[2:])
    offset = ((hhmm // 100) * 60 + (hhmm % 100)) * 60
    if tz[1] == "-":
        offset = -offset
    parsed = time.strptime(dstr, "%a, %d %b %Y %H:%M:%S")
    # timegm() reads the fields as UTC, independent of the local
    # timezone and of whether DST is in effect.
    return float(calendar.timegm(parsed) - offset)
22aad619
FT
45
46def testenviron(uri, qs="", pi="", method=None, filename=None, host="localhost", data=None, ctype=None, head={}):
47 if method is None:
48 method = "GET" if data is None else "POST"
49 if ctype is None and data is not None:
50 ctype = "application/x-www-form-urlencoded"
51 ret = {}
52 ret["wsgi.version"] = 1, 0
53 ret["SERVER_SOFTWARE"] = "ashd-test/1"
54 ret["GATEWAY_INTERFACE"] = "CGI/1.1"
55 ret["SERVER_PROTOCOL"] = "HTTP/1.1"
56 ret["REQUEST_METHOD"] = method
57 ret["wsgi.uri_encoding"] = "utf-8"
58 ret["SCRIPT_NAME"] = uri
59 ret["PATH_INFO"] = pi
60 ret["QUERY_STRING"] = qs
61 full = uri + pi
62 if qs:
63 full = full + "?" + qs
64 ret["REQUEST_URI"] = full
65 if filename is not None:
66 ret["SCRIPT_FILENAME"] = filename
67 ret["HTTP_HOST"] = ret["SERVER_NAME"] = host
68 ret["wsgi.url_scheme"] = "http"
69 ret["SERVER_ADDR"] = "127.0.0.1"
70 ret["SERVER_PORT"] = "80"
71 ret["REMOTE_ADDR"] = "127.0.0.1"
72 ret["REMOTE_PORT"] = "12345"
73 if data is not None:
74 ret["CONTENT_TYPE"] = ctype
75 ret["CONTENT_LENGTH"] = len(data)
76 ret["wsgi.input"] = io.BytesIO(data)
77 else:
78 ret["wsgi.input"] = io.BytesIO(b"")
79 ret["wsgi.errors"] = sys.stderr
80 ret["wsgi.multithread"] = True
81 ret["wsgi.multiprocess"] = False
82 ret["wsgi.run_once"] = False
83 for key, val in head.items():
84 ret["HTTP_" + key.upper().replace("-", "_")] = val
85 return ret
86
class testrequest(object):
    """A start_response callable that records what it is given.

    Instances are meant to be passed to a WSGI application in place of
    the server's start_response. After the call, `status` and `headers`
    hold the values the application supplied, and `wbuf` holds whatever
    was written through the returned write callable.
    """

    def __init__(self):
        self.wbuf = io.BytesIO()   # Collects data passed to write()
        self.headers = None        # Header list from the latest call
        self.status = None         # Status string from the latest call

    def __call__(self, status, headers, exc_info=None):
        """Record *status* and *headers* and return the write callable.

        *exc_info* is accepted so that applications using the
        three-argument form of start_response can be tested; it is not
        otherwise used.
        """
        self.status = status
        self.headers = headers
        return self.wbuf.write

    def __repr__(self):
        if self.headers is None:
            nhead = "None"
        else:
            nhead = "[%i]" % len(self.headers)
        hasdata = "(with data)" if self.wbuf.getvalue() else "(no data)"
        return "<ashd.wsgiutil.testrequest %r %s %s>" % (self.status, nhead, hasdata)

    def __str__(self):
        return repr(self)