| # -*- coding: utf-8 -*- |
| # |
| |
| import sys |
| sys.path[0:0] = [""] |
| |
| import os |
| import os.path |
| import socket |
| |
| import six |
| |
| # websocket-client |
| import websocket as ws |
| from websocket._handshake import _create_sec_websocket_key, \ |
| _validate as _validate_header |
| from websocket._http import read_headers |
| from websocket._url import get_proxy_info, parse_url |
| from websocket._utils import validate_utf8 |
| |
| if six.PY3: |
| from base64 import decodebytes as base64decode |
| else: |
| from base64 import decodestring as base64decode |
| |
| if sys.version_info[0] == 2 and sys.version_info[1] < 7: |
| import unittest2 as unittest |
| else: |
| import unittest |
| |
try:
    from ssl import SSLError
except ImportError:
    class SSLError(Exception):
        """Stand-in for ``ssl.SSLError`` when the ``ssl`` import fails."""
| |
# Tests that need to reach real servers are skipped unless the
# TEST_WITH_INTERNET environment variable is exactly "1".
TEST_WITH_INTERNET = os.getenv("TEST_WITH_INTERNET", "0") == "1"

# Gate for the secure (wss://) test: it is skipped when this is False.
TEST_SECURE_WS = True
# Value handed to ws.enableTrace() by the test cases' setUp methods.
TRACEABLE = True
| |
| |
def create_mask_key(_):
    """Return the fixed mask key ``"abcd"``, ignoring the argument.

    The tests install this through ``WebSocket.set_mask_key`` so that the
    masked frames they compare against are byte-for-byte predictable.
    """
    fixed_key = "abcd"
    return fixed_key
| |
| |
class SockMock(object):
    """In-memory socket substitute that replays queued packets.

    ``data`` holds the items still to be handed out by ``recv`` and ``sent``
    records every payload passed to ``send``, in call order.
    """

    def __init__(self):
        self.data = []
        self.sent = []

    def add_packet(self, data):
        """Queue *data* (bytes, or an exception instance) for ``recv``."""
        self.data.append(data)

    def gettimeout(self):
        """Report that no timeout is configured."""
        return None

    def recv(self, bufsize):
        """Hand out at most *bufsize* bytes of the oldest queued packet.

        A queued exception instance is raised instead of being returned.
        When the packet is longer than *bufsize*, the unread remainder is
        pushed back to the front of the queue.  Returns ``None`` when the
        queue is empty.
        """
        if not self.data:
            return None
        packet = self.data.pop(0)
        if isinstance(packet, Exception):
            raise packet
        if len(packet) > bufsize:
            # Keep the tail so the next recv() continues where this one ends.
            self.data.insert(0, packet[bufsize:])
        return packet[:bufsize]

    def send(self, data):
        """Record *data* and report all of it as written."""
        self.sent.append(data)
        return len(data)

    def close(self):
        """Do nothing; there is no real resource to release."""
        pass
| |
| |
class HeaderSockMock(SockMock):
    """``SockMock`` pre-loaded with the contents of a file.

    *fname* is resolved relative to the directory containing this module and
    the file's raw bytes are queued as a single packet.
    """

    def __init__(self, fname):
        SockMock.__init__(self)
        here = os.path.dirname(__file__)
        with open(os.path.join(here, fname), "rb") as stream:
            payload = stream.read()
        self.add_packet(payload)
| |
| |
class WebSocketTest(unittest.TestCase):
    """Exercise the core ``websocket`` API.

    Most tests drive a ``ws.WebSocket`` through ``SockMock`` so no network is
    needed; tests that talk to real servers are skipped unless
    ``TEST_WITH_INTERNET`` is true.
    """

    def setUp(self):
        ws.enableTrace(TRACEABLE)

    def tearDown(self):
        pass

    def testDefaultTimeout(self):
        """The module-wide default timeout starts as None and can be changed."""
        self.assertEqual(ws.getdefaulttimeout(), None)
        ws.setdefaulttimeout(10)
        try:
            self.assertEqual(ws.getdefaulttimeout(), 10)
        finally:
            # Always restore the global default so a failure here cannot leak
            # a 10 second timeout into the other tests.
            ws.setdefaulttimeout(None)

    def _assertParsedUrls(self, cases):
        """Check the first four items of parse_url(url) for each (url, expected)."""
        for url, expected in cases:
            p = parse_url(url)
            self.assertEqual((p[0], p[1], p[2], p[3]), expected, url)

    def testParseUrl(self):
        """parse_url() splits ws/wss URLs into host, port, resource, secure flag."""
        self._assertParsedUrls([
            ("ws://www.example.com/r", ("www.example.com", 80, "/r", False)),
            ("ws://www.example.com/r/", ("www.example.com", 80, "/r/", False)),
            ("ws://www.example.com/", ("www.example.com", 80, "/", False)),
            ("ws://www.example.com", ("www.example.com", 80, "/", False)),
            ("ws://www.example.com:8080/r", ("www.example.com", 8080, "/r", False)),
            ("ws://www.example.com:8080/", ("www.example.com", 8080, "/", False)),
            ("ws://www.example.com:8080", ("www.example.com", 8080, "/", False)),
            ("wss://www.example.com:8080/r", ("www.example.com", 8080, "/r", True)),
            ("wss://www.example.com:8080/r?key=value",
             ("www.example.com", 8080, "/r?key=value", True)),
        ])

        # Any scheme other than ws/wss is rejected.
        self.assertRaises(ValueError, parse_url, "http://www.example.com/r")

        # The IPv6 literal cases below are not checked on Python < 2.7.
        if sys.version_info[0] == 2 and sys.version_info[1] < 7:
            return

        self._assertParsedUrls([
            ("ws://[2a03:4000:123:83::3]/r", ("2a03:4000:123:83::3", 80, "/r", False)),
            ("ws://[2a03:4000:123:83::3]:8080/r", ("2a03:4000:123:83::3", 8080, "/r", False)),
            ("wss://[2a03:4000:123:83::3]/r", ("2a03:4000:123:83::3", 443, "/r", True)),
            ("wss://[2a03:4000:123:83::3]:8080/r", ("2a03:4000:123:83::3", 8080, "/r", True)),
        ])

    def testWSKey(self):
        """The generated Sec-WebSocket-Key is a single-line, 24-character string."""
        key = _create_sec_websocket_key()
        # A 16-byte nonce (see testNonce) base64-encodes to 24 characters.
        # The previous check, ``key != 24``, compared a string with an int
        # and could never fail.
        self.assertEqual(len(key), 24)
        # The previous check looked for the literal text "¥n", presumably a
        # mis-encoded backslash-n; test for an actual newline instead.
        self.assertTrue("\n" not in key)

    def testWsUtils(self):
        """_validate() accepts only a correct upgrade response and subprotocol."""
        key = "c6b8hTg4EeGb2gQMztV1/g=="
        required_header = {
            "upgrade": "websocket",
            "connection": "upgrade",
            "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0=",
        }
        self.assertEqual(_validate_header(required_header, key, None), (True, None))

        # Every mandatory field must be rejected both when it carries a wrong
        # value and when it is missing altogether.
        for field, wrong_value in (("upgrade", "http"),
                                   ("connection", "something"),
                                   ("sec-websocket-accept", "something")):
            header = required_header.copy()
            header[field] = wrong_value
            self.assertEqual(_validate_header(header, key, None), (False, None))
            del header[field]
            self.assertEqual(_validate_header(header, key, None), (False, None))

        # The subprotocol picked by the server must be one that was requested.
        header = required_header.copy()
        header["sec-websocket-protocol"] = "sub1"
        self.assertEqual(_validate_header(header, key, ["sub1", "sub2"]), (True, "sub1"))
        self.assertEqual(_validate_header(header, key, ["sub2", "sub3"]), (False, None))

        # Subprotocol matching ignores case; the result is reported lower-cased.
        header = required_header.copy()
        header["sec-websocket-protocol"] = "sUb1"
        self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (True, "sub1"))

    def testReadHeader(self):
        """read_headers() parses header01.txt and rejects header02.txt."""
        status, header, status_message = read_headers(HeaderSockMock("data/header01.txt"))
        self.assertEqual(status, 101)
        self.assertEqual(header["connection"], "Upgrade")

        self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt"))

    def testSend(self):
        """send() emits one masked text frame per call, using the fixed mask key."""
        # TODO: add longer frame data
        sock = ws.WebSocket()
        sock.set_mask_key(create_mask_key)
        s = sock.sock = HeaderSockMock("data/header01.txt")
        sock.send("Hello")
        self.assertEqual(s.sent[0], six.b("\x81\x85abcd)\x07\x0f\x08\x0e"))

        sock.send("こんにちは")
        self.assertEqual(s.sent[1], six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc"))

        # The third send is recorded at index 2; checking index 1 again (as
        # this test used to) would only re-verify the previous frame.
        sock.send(u"こんにちは")
        self.assertEqual(s.sent[2], six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc"))

        # Smoke check only: a 127-character payload must be sendable.
        sock.send("x" * 127)

    def testRecv(self):
        """recv() unmasks and decodes complete text frames."""
        # TODO: add longer frame data
        sock = ws.WebSocket()
        s = sock.sock = SockMock()
        something = six.b("\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc")
        s.add_packet(something)
        data = sock.recv()
        self.assertEqual(data, "こんにちは")

        s.add_packet(six.b("\x81\x85abcd)\x07\x0f\x08\x0e"))
        data = sock.recv()
        self.assertEqual(data, "Hello")

    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
    def testIter(self):
        """A connection can be iterated to receive successive messages."""
        count = 2
        conn = ws.create_connection('ws://stream.meetup.com/2/rsvps')
        try:
            for _ in conn:
                count -= 1
                if count == 0:
                    break
        finally:
            # Release the socket even when iteration fails.
            conn.close()

    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
    def testNext(self):
        """next() on a connection returns one message as ``str``."""
        sock = ws.create_connection('ws://stream.meetup.com/2/rsvps')
        try:
            self.assertEqual(str, type(next(sock)))
        finally:
            # Release the socket even when the assertion fails.
            sock.close()

    def testInternalRecvStrict(self):
        """recv_strict() keeps bytes read before a timeout and resumes later."""
        sock = ws.WebSocket()
        s = sock.sock = SockMock()
        s.add_packet(six.b("foo"))
        s.add_packet(socket.timeout())
        s.add_packet(six.b("bar"))
        # An SSLError("The read operation timed out") packet was once queued
        # here as well; that variant is not exercised.
        s.add_packet(six.b("baz"))
        with self.assertRaises(ws.WebSocketTimeoutException):
            sock.frame_buffer.recv_strict(9)
        # The "foo" read before the timeout is still part of the result.
        data = sock.frame_buffer.recv_strict(9)
        self.assertEqual(data, six.b("foobarbaz"))
        with self.assertRaises(ws.WebSocketConnectionClosedException):
            sock.frame_buffer.recv_strict(1)

    def testRecvTimeout(self):
        """A frame interrupted by timeouts is completed by later recv() calls."""
        sock = ws.WebSocket()
        s = sock.sock = SockMock()
        s.add_packet(six.b("\x81"))
        s.add_packet(socket.timeout())
        s.add_packet(six.b("\x8dabcd\x29\x07\x0f\x08\x0e"))
        s.add_packet(socket.timeout())
        s.add_packet(six.b("\x4e\x43\x33\x0e\x10\x0f\x00\x40"))
        with self.assertRaises(ws.WebSocketTimeoutException):
            sock.recv()
        with self.assertRaises(ws.WebSocketTimeoutException):
            sock.recv()
        data = sock.recv()
        self.assertEqual(data, "Hello, World!")
        with self.assertRaises(ws.WebSocketConnectionClosedException):
            sock.recv()

    def testRecvWithSimpleFragmentation(self):
        """recv() joins a two-fragment text message into one string."""
        sock = ws.WebSocket()
        s = sock.sock = SockMock()
        # OPCODE=TEXT, FIN=0, MSG="Brevity is "
        s.add_packet(six.b("\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
        # OPCODE=CONT, FIN=1, MSG="the soul of wit"
        s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17"))
        data = sock.recv()
        self.assertEqual(data, "Brevity is the soul of wit")
        with self.assertRaises(ws.WebSocketConnectionClosedException):
            sock.recv()

    def testRecvWithFireEventOfFragmentation(self):
        """With fire_cont_frame=True each fragment is delivered on its own."""
        sock = ws.WebSocket(fire_cont_frame=True)
        s = sock.sock = SockMock()
        # OPCODE=TEXT, FIN=0, MSG="Brevity is "
        s.add_packet(six.b("\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
        # OPCODE=CONT, FIN=0, MSG="Brevity is "
        s.add_packet(six.b("\x00\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))
        # OPCODE=CONT, FIN=1, MSG="the soul of wit"
        s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17"))

        _, data = sock.recv_data()
        self.assertEqual(data, six.b("Brevity is "))
        _, data = sock.recv_data()
        self.assertEqual(data, six.b("Brevity is "))
        _, data = sock.recv_data()
        self.assertEqual(data, six.b("the soul of wit"))

        # OPCODE=CONT, FIN=1, MSG="Brevity is " -- a continuation frame that
        # arrives after the message above has already finished.
        s.add_packet(six.b("\x80\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C"))

        with self.assertRaises(ws.WebSocketException):
            sock.recv_data()

        with self.assertRaises(ws.WebSocketConnectionClosedException):
            sock.recv()

    def testClose(self):
        """close() and a received close frame both clear ``connected``."""
        sock = ws.WebSocket()
        sock.sock = SockMock()
        sock.connected = True
        sock.close()
        self.assertEqual(sock.connected, False)

        sock = ws.WebSocket()
        s = sock.sock = SockMock()
        sock.connected = True
        # OPCODE=CLOSE, FIN=1, empty masked payload
        s.add_packet(six.b('\x88\x80\x17\x98p\x84'))
        sock.recv()
        self.assertEqual(sock.connected, False)

    def testRecvContFragmentation(self):
        """A continuation frame with no preceding fragment is an error."""
        sock = ws.WebSocket()
        s = sock.sock = SockMock()
        # OPCODE=CONT, FIN=1, MSG="the soul of wit"
        s.add_packet(six.b("\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17"))
        self.assertRaises(ws.WebSocketException, sock.recv)

    def testRecvWithProlongedFragmentation(self):
        """recv() joins a three-fragment text message into one string."""
        sock = ws.WebSocket()
        s = sock.sock = SockMock()
        # OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, "
        s.add_packet(six.b("\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15"
                           "\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC"))
        # OPCODE=CONT, FIN=0, MSG="dear friends, "
        s.add_packet(six.b("\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07"
                           "\x17MB"))
        # OPCODE=CONT, FIN=1, MSG="once more"
        s.add_packet(six.b("\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04"))
        data = sock.recv()
        self.assertEqual(
            data,
            "Once more unto the breach, dear friends, once more")
        with self.assertRaises(ws.WebSocketConnectionClosedException):
            sock.recv()

    def testRecvWithFragmentationAndControlFrame(self):
        """A ping between fragments is answered without breaking the message."""
        sock = ws.WebSocket()
        sock.set_mask_key(create_mask_key)
        s = sock.sock = SockMock()
        # OPCODE=TEXT, FIN=0, MSG="Too much "
        s.add_packet(six.b("\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA"))
        # OPCODE=PING, FIN=1, MSG="Please PONG this"
        s.add_packet(six.b("\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17"))
        # OPCODE=CONT, FIN=1, MSG="of a good thing"
        s.add_packet(six.b("\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c"
                           "\x08\x0c\x04"))
        data = sock.recv()
        self.assertEqual(data, "Too much of a good thing")
        with self.assertRaises(ws.WebSocketConnectionClosedException):
            sock.recv()
        # The reply is the ping's payload sent back with OPCODE=PONG.
        self.assertEqual(
            s.sent[0],
            six.b("\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17"))

    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
    def testWebSocket(self):
        """Text sent to the echo server comes back unchanged."""
        s = ws.create_connection("ws://echo.websocket.org/")
        self.assertNotEqual(s, None)
        s.send("Hello, World")
        result = s.recv()
        self.assertEqual(result, "Hello, World")

        s.send(u"こにゃにゃちは、世界")
        result = s.recv()
        self.assertEqual(result, "こにゃにゃちは、世界")
        s.close()

    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
    def testPingPong(self):
        """ping() and pong() can be sent on an open connection."""
        s = ws.create_connection("ws://echo.websocket.org/")
        self.assertNotEqual(s, None)
        s.ping("Hello")
        s.pong("Hi")
        s.close()

    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
    @unittest.skipUnless(TEST_SECURE_WS, "wss://echo.websocket.org doesn't work well.")
    def testSecureWebSocket(self):
        """A wss:// connection uses an SSL socket and echoes text unchanged."""
        import ssl
        s = ws.create_connection("wss://echo.websocket.org/")
        self.assertNotEqual(s, None)
        self.assertTrue(isinstance(s.sock, ssl.SSLSocket))
        s.send("Hello, World")
        result = s.recv()
        self.assertEqual(result, "Hello, World")
        s.send(u"こにゃにゃちは、世界")
        result = s.recv()
        self.assertEqual(result, "こにゃにゃちは、世界")
        s.close()

    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
    def testWebSocketWihtCustomHeader(self):
        """A connection opened with extra options still echoes text."""
        # NOTE(review): the library's documented option appears to be
        # ``header``; confirm that ``headers`` is actually honoured here.
        s = ws.create_connection("ws://echo.websocket.org/",
                                 headers={"User-Agent": "PythonWebsocketClient"})
        self.assertNotEqual(s, None)
        s.send("Hello, World")
        result = s.recv()
        self.assertEqual(result, "Hello, World")
        s.close()

    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
    def testAfterClose(self):
        """send() and recv() raise once the connection has been closed."""
        s = ws.create_connection("ws://echo.websocket.org/")
        self.assertNotEqual(s, None)
        s.close()
        self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello")
        self.assertRaises(ws.WebSocketConnectionClosedException, s.recv)

    def testNonce(self):
        """ WebSocket key should be a random 16-byte nonce.
        """
        key = _create_sec_websocket_key()
        nonce = base64decode(key.encode("utf-8"))
        self.assertEqual(16, len(nonce))
| |
| |
class WebSocketAppTest(unittest.TestCase):
    """Tests for ``ws.WebSocketApp`` callbacks.

    The callbacks record what they observe in class attributes; those
    attributes hold ``NotSetYet`` instances before and after every test.
    """

    class NotSetYet(object):
        """ A marker class for signalling that a value hasn't been set yet.
        """

    @staticmethod
    def _reset_recorded_values():
        """Point every recording attribute at a fresh ``NotSetYet`` marker."""
        for attr in ("keep_running_open", "keep_running_close", "get_mask_key_id"):
            setattr(WebSocketAppTest, attr, WebSocketAppTest.NotSetYet())

    def setUp(self):
        ws.enableTrace(TRACEABLE)
        self._reset_recorded_values()

    def tearDown(self):
        self._reset_recorded_values()

    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
    def testKeepRunning(self):
        """ A WebSocketApp should keep running as long as its self.keep_running
        is not False (in the boolean context).
        """

        def on_open(wsapp, *args, **kwargs):
            """ Record keep_running as seen on open, then close right away.
            """
            WebSocketAppTest.keep_running_open = wsapp.keep_running
            wsapp.close()

        def on_close(wsapp, *args, **kwargs):
            """ Record keep_running as seen on close.
            """
            WebSocketAppTest.keep_running_close = wsapp.keep_running

        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, on_close=on_close)
        app.run_forever()

        # The checks below are disabled: they fail when numpy is installed.
        # self.assertFalse(isinstance(WebSocketAppTest.keep_running_open,
        #                             WebSocketAppTest.NotSetYet))

        # self.assertFalse(isinstance(WebSocketAppTest.keep_running_close,
        #                             WebSocketAppTest.NotSetYet))

        # self.assertEqual(True, WebSocketAppTest.keep_running_open)
        # self.assertEqual(False, WebSocketAppTest.keep_running_close)

    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
    def testSockMaskKey(self):
        """ A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        """

        def my_mask_key_func():
            pass

        def on_open(wsapp, *args, **kwargs):
            """ Record the id of the app's mask-key function, then close
            right away.
            """
            WebSocketAppTest.get_mask_key_id = id(wsapp.get_mask_key)
            wsapp.close()

        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
        app.run_forever()

        # The check below is disabled: it fails when numpy is installed.
        # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
        # self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
| |
| |
class SockOptTest(unittest.TestCase):
    """Checks that socket options given to create_connection() are applied."""

    @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
    def testSockOpt(self):
        """TCP_NODELAY requested through ``sockopt`` reads back as non-zero."""
        level, option = socket.IPPROTO_TCP, socket.TCP_NODELAY
        requested = ((level, option, 1),)
        conn = ws.create_connection("ws://echo.websocket.org", sockopt=requested)
        self.assertNotEqual(conn.sock.getsockopt(level, option), 0)
        conn.close()
| |
| |
class UtilsTest(unittest.TestCase):
    """Tests for helpers in ``websocket._utils``."""

    def testUtf8Validator(self):
        """validate_utf8() returns True for valid input and False otherwise."""
        expectations = (
            # A well-formed four-byte sequence.
            (six.b('\xf0\x90\x80\x80'), True),
            # Contains \xed\xa0\x80, an encoded surrogate, which is invalid.
            (six.b('\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited'), False),
            # Empty input counts as valid.
            (six.b(''), True),
        )
        for raw, expected in expectations:
            self.assertEqual(validate_utf8(raw), expected)
| |
| |
class ProxyInfoTest(unittest.TestCase):
    """Tests for get_proxy_info() with explicit arguments and with env vars.

    The proxy-related environment variables are removed before each test and
    put back exactly as they were afterwards, so the tests neither depend on
    nor modify the caller's environment.
    """

    # Every environment variable the tests in this class assign to.
    # ``no_proxy`` must be included: testProxyFromEnv sets it, and without
    # save/restore it would stay set after the test run.
    _PROXY_ENV_VARS = ("http_proxy", "https_proxy", "no_proxy")

    def setUp(self):
        """Remember and remove the proxy environment variables."""
        self._saved_env = {}
        for name in self._PROXY_ENV_VARS:
            # pop() returns None when the variable was not set at all.
            self._saved_env[name] = os.environ.pop(name, None)
        # Kept as plain attributes for compatibility with existing users.
        self.http_proxy = self._saved_env["http_proxy"]
        self.https_proxy = self._saved_env["https_proxy"]

    def tearDown(self):
        """Restore the proxy environment variables saved by setUp()."""
        for name, value in self._saved_env.items():
            if value is None:
                # It was unset before the test: drop whatever the test set.
                os.environ.pop(name, None)
            else:
                # Compare against None (not truthiness) so that an originally
                # empty value is restored rather than deleted.
                os.environ[name] = value

    def testProxyFromArgs(self):
        """Explicit proxy_* arguments determine the returned proxy info."""
        self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost"), ("localhost", 0, None))
        self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128), ("localhost", 3128, None))
        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost"), ("localhost", 0, None))
        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128), ("localhost", 3128, None))

        # With credentials.
        self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_auth=("a", "b")),
                         ("localhost", 0, ("a", "b")))
        self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")),
                         ("localhost", 3128, ("a", "b")))
        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_auth=("a", "b")),
                         ("localhost", 0, ("a", "b")))
        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")),
                         ("localhost", 3128, ("a", "b")))

        # A host listed in no_proxy gets no proxy at all.
        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, no_proxy=["example.com"], proxy_auth=("a", "b")),
                         ("localhost", 3128, ("a", "b")))
        self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, no_proxy=["echo.websocket.org"], proxy_auth=("a", "b")),
                         (None, 0, None))

    def testProxyFromEnv(self):
        """http_proxy / https_proxy / no_proxy are honoured when no args are given.

        The steps are order dependent: a variable set in one step stays set
        in the following steps until it is overwritten.
        """
        # Only http_proxy set, insecure connection.
        os.environ["http_proxy"] = "http://localhost/"
        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
        os.environ["http_proxy"] = "http://localhost:3128/"
        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))

        # Both set, insecure connection: http_proxy is used.
        os.environ["http_proxy"] = "http://localhost/"
        os.environ["https_proxy"] = "http://localhost2/"
        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
        os.environ["http_proxy"] = "http://localhost:3128/"
        os.environ["https_proxy"] = "http://localhost2:3128/"
        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))

        # Both set, secure connection: https_proxy is used.
        os.environ["http_proxy"] = "http://localhost/"
        os.environ["https_proxy"] = "http://localhost2/"
        self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, None))
        os.environ["http_proxy"] = "http://localhost:3128/"
        os.environ["https_proxy"] = "http://localhost2:3128/"
        self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, None))

        # Same three scenarios with credentials embedded in the proxy URL.
        os.environ["http_proxy"] = "http://a:b@localhost/"
        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
        os.environ["http_proxy"] = "http://a:b@localhost:3128/"
        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))

        os.environ["http_proxy"] = "http://a:b@localhost/"
        os.environ["https_proxy"] = "http://a:b@localhost2/"
        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
        os.environ["http_proxy"] = "http://a:b@localhost:3128/"
        os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
        self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))

        os.environ["http_proxy"] = "http://a:b@localhost/"
        os.environ["https_proxy"] = "http://a:b@localhost2/"
        self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, ("a", "b")))
        os.environ["http_proxy"] = "http://a:b@localhost:3128/"
        os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
        self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, ("a", "b")))

        # no_proxy by host name: unlisted hosts still get the proxy, listed
        # hosts do not.
        os.environ["http_proxy"] = "http://a:b@localhost/"
        os.environ["https_proxy"] = "http://a:b@localhost2/"
        os.environ["no_proxy"] = "example1.com,example2.com"
        self.assertEqual(get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b")))
        os.environ["http_proxy"] = "http://a:b@localhost:3128/"
        os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
        os.environ["no_proxy"] = "example1.com,example2.com, echo.websocket.org"
        self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None))

        # no_proxy by network range.
        os.environ["http_proxy"] = "http://a:b@localhost:3128/"
        os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
        os.environ["no_proxy"] = "127.0.0.0/8, 192.168.0.0/16"
        self.assertEqual(get_proxy_info("127.0.0.1", False), (None, 0, None))
        self.assertEqual(get_proxy_info("192.168.1.1", False), (None, 0, None))
| |
| |
if __name__ == "__main__":
    # Run every TestCase defined in this module when executed as a script.
    unittest.main()