-
Notifications
You must be signed in to change notification settings - Fork 1
/
server.py
executable file
·170 lines (146 loc) · 6.07 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#!/usr/bin/env python3
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
import validators as valid
from urllib.parse import urljoin, urlsplit, urlunsplit
from bs4 import BeautifulSoup
from requests.structures import CaseInsensitiveDict
from http.cookies import SimpleCookie
from chisel.session import ChiselSession
import re
import json
class ChiselProxy(BaseHTTPRequestHandler):
    """Rewriting HTTP proxy handler.

    Clients request ``/<mode>/<absolute-url>`` (mode is e.g. ``browser``); the
    handler forwards the request upstream through a :class:`ChiselSession`,
    rewrites links/redirects in the response so they point back at the proxy,
    and relays the result. ``/headers`` is served locally as a JSON echo of the
    client's request headers.
    """

    # Keep-alive so a browser can reuse one connection for many rewrites.
    protocol_version = 'HTTP/1.1'

    def __getattribute__(self, item):
        # Route every HTTP verb handler (do_GET, do_POST, ...) to proxy(),
        # and silence all of BaseHTTPRequestHandler's log_* methods.
        if item.startswith('do_'):
            return self.proxy
        elif item.startswith('log_'):
            return self.disabled
        else:
            return super().__getattribute__(item)

    @staticmethod
    def disabled(*args, **kwargs):
        """No-op stand-in substituted for every ``log_*`` method."""
        pass

    def handle_one_request(self):
        """Handle one request, tolerating clients that abort mid-request."""
        try:
            super().handle_one_request()
        except ConnectionAbortedError:
            pass
        # Re-open the request stream so the next keep-alive request on this
        # connection can still be read even if the handler consumed rfile.
        self.rfile = self.connection.makefile('rb', self.rbufsize)

    def proxy(self):
        """Forward the current request upstream and relay the response.

        Handles every HTTP verb (wired up via ``__getattribute__``). Sends
        400 for unparseable targets, 307 to re-anchor relative links via the
        referer, and 502 when the upstream session returns nothing.
        """
        # Special case: echo the client's request headers back as JSON.
        if self.path == '/headers':
            response = json.dumps({
                key.lower(): value for key, value in self.headers.items()
            }).encode('UTF8')
            self.send_response(200)
            self.send_header('content-type', 'application/json')
            self.send_header('content-length', str(len(response)))
            self.end_headers()
            self.wfile.write(response)
            return
        # Parse '/<mode>/<url>' out of the request path and the referer.
        c_mode, c_target = self.process_url(self.path)
        p_mode, p_target = self.process_url(self.headers['referer'])
        # Invalid target: try to recover a relative link via the referer's
        # target, otherwise reject the request outright.
        if c_mode is None or c_target is None:
            if p_mode is None or p_target is None:
                self.send_error(400)
            else:
                self.send_response(307)
                self.send_header('location', '/' + p_mode + '/' + urljoin(p_target, self.path))
                self.send_header('vary', 'referer')
                self.send_header('content-length', '0')
                self.end_headers()
            return
        # Only split once the target is known valid (urlsplit(None) raises).
        parsed = urlsplit(c_target)
        # Read the request body, if any.
        content = None
        if 'content-length' in self.headers:
            content = self.rfile.read(int(self.headers['content-length']))
        # Rebuild the outgoing header set: drop hop-by-hop and fingerprinting
        # headers, then point origin/referer at the real upstream site.
        headers = CaseInsensitiveDict(self.headers)
        for dropped in ('user-agent', 'accept-encoding', 'te', 'connection', 'host'):
            headers.pop(dropped, None)
        headers['origin'] = parsed.scheme + '://' + parsed.netloc
        headers['referer'] = p_target
        # Extract client cookies; strip Cloudflare's own cookies so the
        # chisel session can manage clearance itself.
        cookies = {key: morsel.value for key, morsel in SimpleCookie(headers.pop('cookie', None)).items()}
        cookies.pop('__cfduid', None)
        cookies.pop('cf_clearance', None)
        # Forward upstream; redirects are not followed because we rewrite
        # the Location header ourselves below.
        resp = ChiselSession().request(
            method=self.command,
            url=c_target,
            data=content,
            headers=headers,
            cookies=cookies,
            allow_redirects=False,
        )
        if resp is None:
            self.send_error(502)
            return
        # Relay status plus the response headers worth keeping.
        self.send_response(resp.status_code)
        for keep in ('set-cookie', 'vary'):
            if keep in resp.headers:
                self.send_header(keep, resp.headers[keep])
        if 'location' in resp.headers:
            # Rewrite redirects so the client stays on the proxy.
            self.send_header('location', '/' + c_mode + '/' + urljoin(c_target, resp.headers['location']))
        # HEAD responses carry no body.
        if self.command == 'HEAD':
            self.end_headers()
            return
        body = resp.content
        # Upstream may omit content-type entirely; fall back to a generic
        # binary type instead of raising KeyError.
        ctype = resp.headers.get('content-type', 'application/octet-stream')
        # Rewrite URLs inside textual bodies so follow-up requests come back here.
        if ctype.startswith('text/html'):
            if c_mode == 'browser':
                soup = self.make_tasty_soup(resp, True)
                for tag in soup('script'):
                    if tag.string:
                        tag.string = self.expand_urls_in_text(tag.string, parsed.scheme)
                with open('intercept.js', 'r') as fp:
                    tag = soup.new_tag('script')
                    tag.append(fp.read())
                soup.insert(0, tag)
                body = soup.encode()
            else:
                body = self.make_tasty_soup(resp, False).encode()
        elif c_mode == 'browser' and ctype.startswith('text/') and (
            resp.encoding or resp.apparent_encoding
        ):
            body = self.expand_urls_in_text(resp.text, parsed.scheme).encode(resp.encoding or resp.apparent_encoding)
        # Always advertise the length of the body we actually send: the
        # upstream content-length may be missing, or describe the compressed
        # wire payload rather than the decoded/rewritten body.
        self.send_header('content-type', ctype)
        self.send_header('content-length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    @staticmethod
    def make_tasty_soup(resp, browser):
        """Parse *resp* as HTML and absolutize every href/src attribute.

        When *browser* is true each rewritten URL is additionally prefixed
        with '/browser/' so it routes back through this proxy.
        """
        soup = BeautifulSoup(resp.content, 'lxml')
        # Honor an explicit <base href> if present, else resolve against the
        # final response URL.
        base = soup.find('base')
        base = base['href'] if base else resp.url
        for tag in soup(href=True):
            tag['href'] = ('/browser/' if browser else '') + urljoin(base, tag['href'])
        for tag in soup(src=True):
            tag['src'] = ('/browser/' if browser else '') + urljoin(base, tag['src'])
        return soup

    @staticmethod
    def process_url(url):
        """Split a proxy path into (mode, target-url).

        Returns ``(None, None)`` unless *url* looks like
        '/<mode>/<valid-http(s)-url>'.
        """
        if not url:
            return None, None
        # Strip any scheme/netloc the client sent, keep path+query+fragment,
        # then split into ('', mode, target).
        split = urlunsplit(('', '') + urlsplit(url)[2:]).split('/', 2)
        if len(split) != 3 or not valid.url(split[2], True) or urlsplit(split[2]).scheme not in ('http', 'https'):
            return None, None
        return split[1], split[2]

    @staticmethod
    def expand_urls_in_text(text, scheme):
        """Rewrite quoted absolute / protocol-relative URLs in *text* to
        '/browser/<scheme>://...' so in-script URLs route via the proxy."""
        return re.sub(r'([\"\'])(.*?)(?:https?:)?//(.*?[^\\])?\1', '\\1\\2/browser/' + scheme + '://\\3\\1', text)
if __name__ == '__main__':
    # Entry point: serve the proxy on all interfaces, port 1234, handling
    # each connection on its own thread.
    print('Starting HTTP server on port 1234 ...')
    server = ThreadingHTTPServer(('', 1234), ChiselProxy)
    server.serve_forever()