|  | # -*- coding: utf-8 -*- | 
|  | """ | 
|  | proxy.py | 
|  | ~~~~~~~~ | 
|  |  | 
|  | HTTP Proxy Server in Python. | 
|  |  | 
|  | :copyright: (c) 2013-2018 by Abhinav Singh. | 
|  | :license: BSD, see LICENSE for more details. | 
|  | """ | 
|  | import sys | 
|  | import base64 | 
|  | import socket | 
|  | import logging | 
|  | import unittest | 
|  | from threading import Thread | 
|  | from contextlib import closing | 
|  | from proxy import Proxy, ChunkParser, HttpParser, Client | 
|  | from proxy import ProxyAuthenticationFailed, ProxyConnectionFailed | 
|  | from proxy import CRLF, version, PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT | 
|  |  | 
|  | # logging.basicConfig(level=logging.DEBUG, | 
|  | #                     format='%(asctime)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s') | 
|  |  | 
|  | # True if we are running on Python 3. | 
|  | if sys.version_info[0] == 3: | 
|  | from http.server import HTTPServer, BaseHTTPRequestHandler | 
|  | else: | 
|  | from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler | 
|  |  | 
|  |  | 
|  | class TestChunkParser(unittest.TestCase): | 
|  |  | 
|  | def setUp(self): | 
|  | self.parser = ChunkParser() | 
|  |  | 
|  | def test_chunk_parse_basic(self): | 
|  | self.parser.parse(b''.join([ | 
|  | b'4\r\n', | 
|  | b'Wiki\r\n', | 
|  | b'5\r\n', | 
|  | b'pedia\r\n', | 
|  | b'E\r\n', | 
|  | b' in\r\n\r\nchunks.\r\n', | 
|  | b'0\r\n', | 
|  | b'\r\n' | 
|  | ])) | 
|  | self.assertEqual(self.parser.chunk, b'') | 
|  | self.assertEqual(self.parser.size, None) | 
|  | self.assertEqual(self.parser.body, b'Wikipedia in\r\n\r\nchunks.') | 
|  | self.assertEqual(self.parser.state, ChunkParser.states.COMPLETE) | 
|  |  | 
|  | def test_chunk_parse_issue_27(self): | 
|  | """Case when data ends with the chunk size but without CRLF.""" | 
|  | self.parser.parse(b'3') | 
|  | self.assertEqual(self.parser.chunk, b'3') | 
|  | self.assertEqual(self.parser.size, None) | 
|  | self.assertEqual(self.parser.body, b'') | 
|  | self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_SIZE) | 
|  | self.parser.parse(b'\r\n') | 
|  | self.assertEqual(self.parser.chunk, b'') | 
|  | self.assertEqual(self.parser.size, 3) | 
|  | self.assertEqual(self.parser.body, b'') | 
|  | self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_DATA) | 
|  | self.parser.parse(b'abc') | 
|  | self.assertEqual(self.parser.chunk, b'') | 
|  | self.assertEqual(self.parser.size, None) | 
|  | self.assertEqual(self.parser.body, b'abc') | 
|  | self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_SIZE) | 
|  | self.parser.parse(b'\r\n') | 
|  | self.assertEqual(self.parser.chunk, b'') | 
|  | self.assertEqual(self.parser.size, None) | 
|  | self.assertEqual(self.parser.body, b'abc') | 
|  | self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_SIZE) | 
|  | self.parser.parse(b'4\r\n') | 
|  | self.assertEqual(self.parser.chunk, b'') | 
|  | self.assertEqual(self.parser.size, 4) | 
|  | self.assertEqual(self.parser.body, b'abc') | 
|  | self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_DATA) | 
|  | self.parser.parse(b'defg\r\n0') | 
|  | self.assertEqual(self.parser.chunk, b'0') | 
|  | self.assertEqual(self.parser.size, None) | 
|  | self.assertEqual(self.parser.body, b'abcdefg') | 
|  | self.assertEqual(self.parser.state, ChunkParser.states.WAITING_FOR_SIZE) | 
|  | self.parser.parse(b'\r\n\r\n') | 
|  | self.assertEqual(self.parser.chunk, b'') | 
|  | self.assertEqual(self.parser.size, None) | 
|  | self.assertEqual(self.parser.body, b'abcdefg') | 
|  | self.assertEqual(self.parser.state, ChunkParser.states.COMPLETE) | 
|  |  | 
|  |  | 
|  | class TestHttpParser(unittest.TestCase): | 
|  |  | 
|  | def setUp(self): | 
|  | self.parser = HttpParser(HttpParser.types.REQUEST_PARSER) | 
|  |  | 
|  | def test_build_header(self): | 
|  | self.assertEqual(HttpParser.build_header(b'key', b'value'), b'key: value') | 
|  |  | 
|  | def test_split(self): | 
|  | self.assertEqual(HttpParser.split(b'CONNECT python.org:443 HTTP/1.0\r\n\r\n'), | 
|  | (b'CONNECT python.org:443 HTTP/1.0', b'\r\n')) | 
|  |  | 
|  | def test_split_false_line(self): | 
|  | self.assertEqual(HttpParser.split(b'CONNECT python.org:443 HTTP/1.0'), | 
|  | (False, b'CONNECT python.org:443 HTTP/1.0')) | 
|  |  | 
|  | def test_get_full_parse(self): | 
|  | raw = CRLF.join([ | 
|  | b'GET %s HTTP/1.1', | 
|  | b'Host: %s', | 
|  | CRLF | 
|  | ]) | 
|  | self.parser.parse(raw % (b'https://example.com/path/dir/?a=b&c=d#p=q', b'example.com')) | 
|  | self.assertEqual(self.parser.build_url(), b'/path/dir/?a=b&c=d#p=q') | 
|  | self.assertEqual(self.parser.method, b'GET') | 
|  | self.assertEqual(self.parser.url.hostname, b'example.com') | 
|  | self.assertEqual(self.parser.url.port, None) | 
|  | self.assertEqual(self.parser.version, b'HTTP/1.1') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.COMPLETE) | 
|  | self.assertDictContainsSubset({b'host': (b'Host', b'example.com')}, self.parser.headers) | 
|  | self.assertEqual(raw % (b'/path/dir/?a=b&c=d#p=q', b'example.com'), | 
|  | self.parser.build(del_headers=[b'host'], add_headers=[(b'Host', b'example.com')])) | 
|  |  | 
|  | def test_build_url_none(self): | 
|  | self.assertEqual(self.parser.build_url(), b'/None') | 
|  |  | 
|  | def test_line_rcvd_to_rcving_headers_state_change(self): | 
|  | self.parser.parse(b'GET http://localhost HTTP/1.1') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.INITIALIZED) | 
|  | self.parser.parse(CRLF) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.LINE_RCVD) | 
|  | self.parser.parse(CRLF) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS) | 
|  |  | 
|  | def test_get_partial_parse1(self): | 
|  | self.parser.parse(CRLF.join([ | 
|  | b'GET http://localhost:8080 HTTP/1.1' | 
|  | ])) | 
|  | self.assertEqual(self.parser.method, None) | 
|  | self.assertEqual(self.parser.url, None) | 
|  | self.assertEqual(self.parser.version, None) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.INITIALIZED) | 
|  |  | 
|  | self.parser.parse(CRLF) | 
|  | self.assertEqual(self.parser.method, b'GET') | 
|  | self.assertEqual(self.parser.url.hostname, b'localhost') | 
|  | self.assertEqual(self.parser.url.port, 8080) | 
|  | self.assertEqual(self.parser.version, b'HTTP/1.1') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.LINE_RCVD) | 
|  |  | 
|  | self.parser.parse(b'Host: localhost:8080') | 
|  | self.assertDictEqual(self.parser.headers, dict()) | 
|  | self.assertEqual(self.parser.buffer, b'Host: localhost:8080') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.LINE_RCVD) | 
|  |  | 
|  | self.parser.parse(CRLF * 2) | 
|  | self.assertDictContainsSubset({b'host': (b'Host', b'localhost:8080')}, self.parser.headers) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.COMPLETE) | 
|  |  | 
|  | def test_get_partial_parse2(self): | 
|  | self.parser.parse(CRLF.join([ | 
|  | b'GET http://localhost:8080 HTTP/1.1', | 
|  | b'Host: ' | 
|  | ])) | 
|  | self.assertEqual(self.parser.method, b'GET') | 
|  | self.assertEqual(self.parser.url.hostname, b'localhost') | 
|  | self.assertEqual(self.parser.url.port, 8080) | 
|  | self.assertEqual(self.parser.version, b'HTTP/1.1') | 
|  | self.assertEqual(self.parser.buffer, b'Host: ') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.LINE_RCVD) | 
|  |  | 
|  | self.parser.parse(b'localhost:8080' + CRLF) | 
|  | self.assertDictContainsSubset({b'host': (b'Host', b'localhost:8080')}, self.parser.headers) | 
|  | self.assertEqual(self.parser.buffer, b'') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS) | 
|  |  | 
|  | self.parser.parse(b'Content-Type: text/plain' + CRLF) | 
|  | self.assertEqual(self.parser.buffer, b'') | 
|  | self.assertDictContainsSubset({b'content-type': (b'Content-Type', b'text/plain')}, self.parser.headers) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS) | 
|  |  | 
|  | self.parser.parse(CRLF) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.COMPLETE) | 
|  |  | 
|  | def test_post_full_parse(self): | 
|  | raw = CRLF.join([ | 
|  | b'POST %s HTTP/1.1', | 
|  | b'Host: localhost', | 
|  | b'Content-Length: 7', | 
|  | b'Content-Type: application/x-www-form-urlencoded' + CRLF, | 
|  | b'a=b&c=d' | 
|  | ]) | 
|  | self.parser.parse(raw % b'http://localhost') | 
|  | self.assertEqual(self.parser.method, b'POST') | 
|  | self.assertEqual(self.parser.url.hostname, b'localhost') | 
|  | self.assertEqual(self.parser.url.port, None) | 
|  | self.assertEqual(self.parser.version, b'HTTP/1.1') | 
|  | self.assertDictContainsSubset({b'content-type': (b'Content-Type', b'application/x-www-form-urlencoded')}, | 
|  | self.parser.headers) | 
|  | self.assertDictContainsSubset({b'content-length': (b'Content-Length', b'7')}, self.parser.headers) | 
|  | self.assertEqual(self.parser.body, b'a=b&c=d') | 
|  | self.assertEqual(self.parser.buffer, b'') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.COMPLETE) | 
|  | self.assertEqual(len(self.parser.build()), len(raw % b'/')) | 
|  |  | 
|  | def test_post_partial_parse(self): | 
|  | self.parser.parse(CRLF.join([ | 
|  | b'POST http://localhost HTTP/1.1', | 
|  | b'Host: localhost', | 
|  | b'Content-Length: 7', | 
|  | b'Content-Type: application/x-www-form-urlencoded' | 
|  | ])) | 
|  | self.assertEqual(self.parser.method, b'POST') | 
|  | self.assertEqual(self.parser.url.hostname, b'localhost') | 
|  | self.assertEqual(self.parser.url.port, None) | 
|  | self.assertEqual(self.parser.version, b'HTTP/1.1') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS) | 
|  |  | 
|  | self.parser.parse(CRLF) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS) | 
|  |  | 
|  | self.parser.parse(CRLF) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.HEADERS_COMPLETE) | 
|  |  | 
|  | self.parser.parse(b'a=b') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.RCVING_BODY) | 
|  | self.assertEqual(self.parser.body, b'a=b') | 
|  | self.assertEqual(self.parser.buffer, b'') | 
|  |  | 
|  | self.parser.parse(b'&c=d') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.COMPLETE) | 
|  | self.assertEqual(self.parser.body, b'a=b&c=d') | 
|  | self.assertEqual(self.parser.buffer, b'') | 
|  |  | 
|  | def test_connect_request_without_host_header_request_parse(self): | 
|  | """Case where clients can send CONNECT request without a Host header field. | 
|  |  | 
|  | Example: | 
|  | 1. pip3 --proxy http://localhost:8899 install <package name> | 
|  | Uses HTTP/1.0, Host header missing with CONNECT requests | 
|  | 2. Android Emulator | 
|  | Uses HTTP/1.1, Host header missing with CONNECT requests | 
|  |  | 
|  | See https://github.com/abhinavsingh/proxy.py/issues/5 for details. | 
|  | """ | 
|  | self.parser.parse(b'CONNECT pypi.org:443 HTTP/1.0\r\n\r\n') | 
|  | self.assertEqual(self.parser.method, b'CONNECT') | 
|  | self.assertEqual(self.parser.version, b'HTTP/1.0') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.COMPLETE) | 
|  |  | 
|  | def test_request_parse_without_content_length(self): | 
|  | """Case when incoming request doesn't contain a content-length header. | 
|  |  | 
|  | From http://w3-org.9356.n7.nabble.com/POST-with-empty-body-td103965.html | 
|  | 'A POST with no content-length and no body is equivalent to a POST with Content-Length: 0 | 
|  | and nothing following, as could perfectly happen when you upload an empty file for instance.' | 
|  |  | 
|  | See https://github.com/abhinavsingh/proxy.py/issues/20 for details. | 
|  | """ | 
|  | self.parser.parse(CRLF.join([ | 
|  | b'POST http://localhost HTTP/1.1', | 
|  | b'Host: localhost', | 
|  | b'Content-Type: application/x-www-form-urlencoded', | 
|  | CRLF | 
|  | ])) | 
|  | self.assertEqual(self.parser.method, b'POST') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.COMPLETE) | 
|  |  | 
|  | def test_response_parse_without_content_length(self): | 
|  | """Case when server response doesn't contain a content-length header for non-chunk response types. | 
|  |  | 
|  | HttpParser by itself has no way to know if more data should be expected. | 
|  | In example below, parser reaches state HttpParser.states.HEADERS_COMPLETE | 
|  | and it is responsibility of callee to change state to HttpParser.states.COMPLETE | 
|  | when server stream closes. | 
|  |  | 
|  | See https://github.com/abhinavsingh/proxy.py/issues/20 for details. | 
|  | """ | 
|  | self.parser.type = HttpParser.types.RESPONSE_PARSER | 
|  | self.parser.parse(b'HTTP/1.0 200 OK' + CRLF) | 
|  | self.assertEqual(self.parser.code, b'200') | 
|  | self.assertEqual(self.parser.version, b'HTTP/1.0') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.LINE_RCVD) | 
|  | self.parser.parse(CRLF.join([ | 
|  | b'Server: BaseHTTP/0.3 Python/2.7.10', | 
|  | b'Date: Thu, 13 Dec 2018 16:24:09 GMT', | 
|  | CRLF | 
|  | ])) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.HEADERS_COMPLETE) | 
|  |  | 
|  | def test_response_parse(self): | 
|  | self.parser.type = HttpParser.types.RESPONSE_PARSER | 
|  | self.parser.parse(b''.join([ | 
|  | b'HTTP/1.1 301 Moved Permanently\r\n', | 
|  | b'Location: http://www.google.com/\r\n', | 
|  | b'Content-Type: text/html; charset=UTF-8\r\n', | 
|  | b'Date: Wed, 22 May 2013 14:07:29 GMT\r\n', | 
|  | b'Expires: Fri, 21 Jun 2013 14:07:29 GMT\r\n', | 
|  | b'Cache-Control: public, max-age=2592000\r\n', | 
|  | b'Server: gws\r\n', | 
|  | b'Content-Length: 219\r\n', | 
|  | b'X-XSS-Protection: 1; mode=block\r\n', | 
|  | b'X-Frame-Options: SAMEORIGIN\r\n\r\n', | 
|  | b'<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">\n' + | 
|  | b'<TITLE>301 Moved</TITLE></HEAD>', | 
|  | b'<BODY>\n<H1>301 Moved</H1>\nThe document has moved\n' + | 
|  | b'<A HREF="http://www.google.com/">here</A>.\r\n</BODY></HTML>\r\n' | 
|  | ])) | 
|  | self.assertEqual(self.parser.code, b'301') | 
|  | self.assertEqual(self.parser.reason, b'Moved Permanently') | 
|  | self.assertEqual(self.parser.version, b'HTTP/1.1') | 
|  | self.assertEqual(self.parser.body, | 
|  | b'<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">\n' + | 
|  | b'<TITLE>301 Moved</TITLE></HEAD><BODY>\n<H1>301 Moved</H1>\nThe document has moved\n' + | 
|  | b'<A HREF="http://www.google.com/">here</A>.\r\n</BODY></HTML>\r\n') | 
|  | self.assertDictContainsSubset({b'content-length': (b'Content-Length', b'219')}, self.parser.headers) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.COMPLETE) | 
|  |  | 
|  | def test_response_partial_parse(self): | 
|  | self.parser.type = HttpParser.types.RESPONSE_PARSER | 
|  | self.parser.parse(b''.join([ | 
|  | b'HTTP/1.1 301 Moved Permanently\r\n', | 
|  | b'Location: http://www.google.com/\r\n', | 
|  | b'Content-Type: text/html; charset=UTF-8\r\n', | 
|  | b'Date: Wed, 22 May 2013 14:07:29 GMT\r\n', | 
|  | b'Expires: Fri, 21 Jun 2013 14:07:29 GMT\r\n', | 
|  | b'Cache-Control: public, max-age=2592000\r\n', | 
|  | b'Server: gws\r\n', | 
|  | b'Content-Length: 219\r\n', | 
|  | b'X-XSS-Protection: 1; mode=block\r\n', | 
|  | b'X-Frame-Options: SAMEORIGIN\r\n' | 
|  | ])) | 
|  | self.assertDictContainsSubset({b'x-frame-options': (b'X-Frame-Options', b'SAMEORIGIN')}, self.parser.headers) | 
|  | self.assertEqual(self.parser.state, HttpParser.states.RCVING_HEADERS) | 
|  | self.parser.parse(b'\r\n') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.HEADERS_COMPLETE) | 
|  | self.parser.parse( | 
|  | b'<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">\n' + | 
|  | b'<TITLE>301 Moved</TITLE></HEAD>') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.RCVING_BODY) | 
|  | self.parser.parse( | 
|  | b'<BODY>\n<H1>301 Moved</H1>\nThe document has moved\n' + | 
|  | b'<A HREF="http://www.google.com/">here</A>.\r\n</BODY></HTML>\r\n') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.COMPLETE) | 
|  |  | 
|  | def test_chunked_response_parse(self): | 
|  | self.parser.type = HttpParser.types.RESPONSE_PARSER | 
|  | self.parser.parse(b''.join([ | 
|  | b'HTTP/1.1 200 OK\r\n', | 
|  | b'Content-Type: application/json\r\n', | 
|  | b'Date: Wed, 22 May 2013 15:08:15 GMT\r\n', | 
|  | b'Server: gunicorn/0.16.1\r\n', | 
|  | b'transfer-encoding: chunked\r\n', | 
|  | b'Connection: keep-alive\r\n\r\n', | 
|  | b'4\r\n', | 
|  | b'Wiki\r\n', | 
|  | b'5\r\n', | 
|  | b'pedia\r\n', | 
|  | b'E\r\n', | 
|  | b' in\r\n\r\nchunks.\r\n', | 
|  | b'0\r\n', | 
|  | b'\r\n' | 
|  | ])) | 
|  | self.assertEqual(self.parser.body, b'Wikipedia in\r\n\r\nchunks.') | 
|  | self.assertEqual(self.parser.state, HttpParser.states.COMPLETE) | 
|  |  | 
|  |  | 
|  | class MockConnection(object): | 
|  |  | 
|  | def __init__(self, b=b''): | 
|  | self.buffer = b | 
|  |  | 
|  | def recv(self, b=8192): | 
|  | data = self.buffer[:b] | 
|  | self.buffer = self.buffer[b:] | 
|  | return data | 
|  |  | 
|  | def send(self, data): | 
|  | return len(data) | 
|  |  | 
|  | def queue(self, data): | 
|  | self.buffer += data | 
|  |  | 
|  |  | 
|  | class HTTPRequestHandler(BaseHTTPRequestHandler): | 
|  |  | 
|  | def do_GET(self): | 
|  | self.send_response(200) | 
|  | # TODO(abhinavsingh): Proxy should work just fine even without content-length header | 
|  | self.send_header('content-length', 2) | 
|  | self.end_headers() | 
|  | self.wfile.write(b'OK') | 
|  |  | 
|  |  | 
|  | class TestProxy(unittest.TestCase): | 
|  |  | 
|  | http_server = None | 
|  | http_server_port = None | 
|  | http_server_thread = None | 
|  |  | 
|  | @staticmethod | 
|  | def get_available_port(): | 
|  | with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: | 
|  | sock.bind(('', 0)) | 
|  | _, port = sock.getsockname() | 
|  | return port | 
|  |  | 
|  | @classmethod | 
|  | def setUpClass(cls): | 
|  | cls.http_server_port = cls.get_available_port() | 
|  | cls.http_server = HTTPServer(('127.0.0.1', cls.http_server_port), HTTPRequestHandler) | 
|  | cls.http_server_thread = Thread(target=cls.http_server.serve_forever) | 
|  | cls.http_server_thread.setDaemon(True) | 
|  | cls.http_server_thread.start() | 
|  |  | 
|  | @classmethod | 
|  | def tearDownClass(cls): | 
|  | cls.http_server.shutdown() | 
|  | cls.http_server.server_close() | 
|  | cls.http_server_thread.join() | 
|  |  | 
|  | def setUp(self): | 
|  | self._conn = MockConnection() | 
|  | self._addr = ('127.0.0.1', 54382) | 
|  | self.proxy = Proxy(Client(self._conn, self._addr)) | 
|  |  | 
|  | def test_http_get(self): | 
|  | # Send request line | 
|  | self.proxy.client.conn.queue((b'GET http://localhost:%d HTTP/1.1' % self.http_server_port) + CRLF) | 
|  | self.proxy._process_request(self.proxy.client.recv()) | 
|  | self.assertNotEqual(self.proxy.request.state, HttpParser.states.COMPLETE) | 
|  | # Send headers and blank line, thus completing HTTP request | 
|  | self.proxy.client.conn.queue(CRLF.join([ | 
|  | b'User-Agent: proxy.py/%s' % version, | 
|  | b'Host: localhost:%d' % self.http_server_port, | 
|  | b'Accept: */*', | 
|  | b'Proxy-Connection: Keep-Alive', | 
|  | CRLF | 
|  | ])) | 
|  | self.proxy._process_request(self.proxy.client.recv()) | 
|  | self.assertEqual(self.proxy.request.state, HttpParser.states.COMPLETE) | 
|  | self.assertEqual(self.proxy.server.addr, (b'localhost', self.http_server_port)) | 
|  | # Flush data queued for server | 
|  | self.proxy.server.flush() | 
|  | self.assertEqual(self.proxy.server.buffer_size(), 0) | 
|  | # Receive full response from server | 
|  | data = self.proxy.server.recv() | 
|  | while data: | 
|  | self.proxy._process_response(data) | 
|  | logging.info(self.proxy.response.state) | 
|  | if self.proxy.response.state == HttpParser.states.COMPLETE: | 
|  | break | 
|  | data = self.proxy.server.recv() | 
|  | # Verify 200 success response code | 
|  | self.assertEqual(self.proxy.response.state, HttpParser.states.COMPLETE) | 
|  | self.assertEqual(int(self.proxy.response.code), 200) | 
|  |  | 
|  | def test_http_tunnel(self): | 
|  | self.proxy.client.conn.queue(CRLF.join([ | 
|  | b'CONNECT localhost:%d HTTP/1.1' % self.http_server_port, | 
|  | b'Host: localhost:%d' % self.http_server_port, | 
|  | b'User-Agent: proxy.py/%s' % version, | 
|  | b'Proxy-Connection: Keep-Alive', | 
|  | CRLF | 
|  | ])) | 
|  | self.proxy._process_request(self.proxy.client.recv()) | 
|  | self.assertFalse(self.proxy.server is None) | 
|  | self.assertEqual(self.proxy.client.buffer, PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT) | 
|  |  | 
|  | parser = HttpParser(HttpParser.types.RESPONSE_PARSER) | 
|  | parser.parse(self.proxy.client.buffer) | 
|  | self.assertEqual(parser.state, HttpParser.states.HEADERS_COMPLETE) | 
|  | self.assertEqual(int(parser.code), 200) | 
|  |  | 
|  | self.proxy.client.flush() | 
|  | self.assertEqual(self.proxy.client.buffer_size(), 0) | 
|  |  | 
|  | self.proxy.client.conn.queue(CRLF.join([ | 
|  | b'GET / HTTP/1.1', | 
|  | b'Host: localhost:%d' % self.http_server_port, | 
|  | b'User-Agent: proxy.py/%s' % version, | 
|  | CRLF | 
|  | ])) | 
|  | self.proxy._process_request(self.proxy.client.recv()) | 
|  | self.proxy.server.flush() | 
|  | self.assertEqual(self.proxy.server.buffer_size(), 0) | 
|  |  | 
|  | parser = HttpParser(HttpParser.types.RESPONSE_PARSER) | 
|  | data = self.proxy.server.recv() | 
|  | while data: | 
|  | parser.parse(data) | 
|  | if parser.state == HttpParser.states.COMPLETE: | 
|  | break | 
|  | data = self.proxy.server.recv() | 
|  |  | 
|  | self.assertEqual(parser.state, HttpParser.states.COMPLETE) | 
|  | self.assertEqual(int(parser.code), 200) | 
|  |  | 
|  | def test_proxy_connection_failed(self): | 
|  | with self.assertRaises(ProxyConnectionFailed): | 
|  | self.proxy._process_request(CRLF.join([ | 
|  | b'GET http://unknown.domain HTTP/1.1', | 
|  | b'Host: unknown.domain', | 
|  | CRLF | 
|  | ])) | 
|  |  | 
|  | def test_proxy_authentication_failed(self): | 
|  | self.proxy = Proxy(Client(self._conn, self._addr), b'Basic %s' % base64.b64encode(b'user:pass')) | 
|  |  | 
|  | with self.assertRaises(ProxyAuthenticationFailed): | 
|  | self.proxy._process_request(CRLF.join([ | 
|  | b'GET http://abhinavsingh.com HTTP/1.1', | 
|  | b'Host: abhinavsingh.com', | 
|  | CRLF | 
|  | ])) | 
|  |  | 
|  | def test_authenticated_proxy_http_get(self): | 
|  | self.proxy = Proxy(Client(self._conn, self._addr), b'Basic %s' % base64.b64encode(b'user:pass')) | 
|  |  | 
|  | self.proxy.client.conn.queue((b'GET http://localhost:%d HTTP/1.1' % self.http_server_port) + CRLF) | 
|  | self.proxy._process_request(self.proxy.client.recv()) | 
|  | self.assertNotEqual(self.proxy.request.state, HttpParser.states.COMPLETE) | 
|  |  | 
|  | self.proxy.client.conn.queue(CRLF.join([ | 
|  | b'User-Agent: proxy.py/%s' % version, | 
|  | b'Host: localhost:%d' % self.http_server_port, | 
|  | b'Accept: */*', | 
|  | b'Proxy-Connection: Keep-Alive', | 
|  | b'Proxy-Authorization: Basic dXNlcjpwYXNz', | 
|  | CRLF | 
|  | ])) | 
|  |  | 
|  | self.proxy._process_request(self.proxy.client.recv()) | 
|  | self.assertEqual(self.proxy.request.state, HttpParser.states.COMPLETE) | 
|  | self.assertEqual(self.proxy.server.addr, (b'localhost', self.http_server_port)) | 
|  |  | 
|  | self.proxy.server.flush() | 
|  | self.assertEqual(self.proxy.server.buffer_size(), 0) | 
|  |  | 
|  | data = self.proxy.server.recv() | 
|  | while data: | 
|  | self.proxy._process_response(data) | 
|  | if self.proxy.response.state == HttpParser.states.COMPLETE: | 
|  | break | 
|  | data = self.proxy.server.recv() | 
|  |  | 
|  | self.assertEqual(self.proxy.response.state, HttpParser.states.COMPLETE) | 
|  | self.assertEqual(int(self.proxy.response.code), 200) | 
|  |  | 
|  | def test_authenticated_proxy_http_tunnel(self): | 
|  | self.proxy = Proxy(Client(self._conn, self._addr), b'Basic %s' % base64.b64encode(b'user:pass')) | 
|  |  | 
|  | self.proxy.client.conn.queue(CRLF.join([ | 
|  | b'CONNECT localhost:%d HTTP/1.1' % self.http_server_port, | 
|  | b'Host: localhost:%d' % self.http_server_port, | 
|  | b'User-Agent: proxy.py/%s' % version, | 
|  | b'Proxy-Connection: Keep-Alive', | 
|  | b'Proxy-Authorization: Basic dXNlcjpwYXNz', | 
|  | CRLF | 
|  | ])) | 
|  | self.proxy._process_request(self.proxy.client.recv()) | 
|  | self.assertFalse(self.proxy.server is None) | 
|  | self.assertEqual(self.proxy.client.buffer, PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT) | 
|  |  | 
|  | parser = HttpParser(HttpParser.types.RESPONSE_PARSER) | 
|  | parser.parse(self.proxy.client.buffer) | 
|  | self.assertEqual(parser.state, HttpParser.states.HEADERS_COMPLETE) | 
|  | self.assertEqual(int(parser.code), 200) | 
|  |  | 
|  | self.proxy.client.flush() | 
|  | self.assertEqual(self.proxy.client.buffer_size(), 0) | 
|  |  | 
|  | self.proxy.client.conn.queue(CRLF.join([ | 
|  | b'GET / HTTP/1.1', | 
|  | b'Host: localhost:%d' % self.http_server_port, | 
|  | b'User-Agent: proxy.py/%s' % version, | 
|  | CRLF | 
|  | ])) | 
|  | self.proxy._process_request(self.proxy.client.recv()) | 
|  | self.proxy.server.flush() | 
|  | self.assertEqual(self.proxy.server.buffer_size(), 0) | 
|  |  | 
|  | parser = HttpParser(HttpParser.types.RESPONSE_PARSER) | 
|  | data = self.proxy.server.recv() | 
|  | while data: | 
|  | parser.parse(data) | 
|  | if parser.state == HttpParser.states.COMPLETE: | 
|  | break | 
|  | data = self.proxy.server.recv() | 
|  |  | 
|  | self.assertEqual(parser.state, HttpParser.states.COMPLETE) | 
|  | self.assertEqual(int(parser.code), 200) | 
|  |  | 
|  |  | 
|  | if __name__ == '__main__': | 
|  | unittest.main() |