| #!/usr/bin/env python |
| # |
| # Copyright 2011, Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # * Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # * Redistributions in binary form must reproduce the above |
| # copyright notice, this list of conditions and the following disclaimer |
| # in the documentation and/or other materials provided with the |
| # distribution. |
| # * Neither the name of Google Inc. nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| |
| """Tests for handshake.hybi00 module.""" |
| |
| |
| import unittest |
| |
| import set_sys_path # Update sys.path to locate mod_pywebsocket module. |
| |
| from mod_pywebsocket.handshake._base import HandshakeException |
| from mod_pywebsocket.handshake.hybi00 import Handshaker |
| from mod_pywebsocket.handshake.hybi00 import _validate_subprotocol |
| from test import mock |
| |
| |
| _TEST_KEY1 = '4 @1 46546xW%0l 1 5' |
| _TEST_KEY2 = '12998 5 Y3 1 .P00' |
| _TEST_KEY3 = '^n:ds[4U' |
| _TEST_CHALLENGE_RESPONSE = '8jKS\'y:G*Co,Wxa-' |
| |
| |
| _GOOD_REQUEST = ( |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3) |
| |
| _GOOD_REQUEST_CAPITALIZED_HEADER_VALUES = ( |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'UPGRADE', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'Upgrade': 'WEBSOCKET', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3) |
| |
| _GOOD_REQUEST_CASE_MIXED_HEADER_NAMES = ( |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'hOsT': 'example.com', |
| 'cOnNeCtIoN': 'Upgrade', |
| 'sEc-wEbsOcKeT-kEy2': _TEST_KEY2, |
| 'sEc-wEbsOcKeT-pRoToCoL': 'sample', |
| 'uPgRaDe': 'WebSocket', |
| 'sEc-wEbsOcKeT-kEy1': _TEST_KEY1, |
| 'oRiGiN': 'http://example.com', |
| }, |
| _TEST_KEY3) |
| |
| _GOOD_RESPONSE_DEFAULT_PORT = ( |
| 'HTTP/1.1 101 WebSocket Protocol Handshake\r\n' |
| 'Upgrade: WebSocket\r\n' |
| 'Connection: Upgrade\r\n' |
| 'Sec-WebSocket-Location: ws://example.com/demo\r\n' |
| 'Sec-WebSocket-Origin: http://example.com\r\n' |
| 'Sec-WebSocket-Protocol: sample\r\n' |
| '\r\n' + |
| _TEST_CHALLENGE_RESPONSE) |
| |
| _GOOD_RESPONSE_SECURE = ( |
| 'HTTP/1.1 101 WebSocket Protocol Handshake\r\n' |
| 'Upgrade: WebSocket\r\n' |
| 'Connection: Upgrade\r\n' |
| 'Sec-WebSocket-Location: wss://example.com/demo\r\n' |
| 'Sec-WebSocket-Origin: http://example.com\r\n' |
| 'Sec-WebSocket-Protocol: sample\r\n' |
| '\r\n' + |
| _TEST_CHALLENGE_RESPONSE) |
| |
| _GOOD_REQUEST_NONDEFAULT_PORT = ( |
| 8081, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com:8081', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3) |
| |
| _GOOD_RESPONSE_NONDEFAULT_PORT = ( |
| 'HTTP/1.1 101 WebSocket Protocol Handshake\r\n' |
| 'Upgrade: WebSocket\r\n' |
| 'Connection: Upgrade\r\n' |
| 'Sec-WebSocket-Location: ws://example.com:8081/demo\r\n' |
| 'Sec-WebSocket-Origin: http://example.com\r\n' |
| 'Sec-WebSocket-Protocol: sample\r\n' |
| '\r\n' + |
| _TEST_CHALLENGE_RESPONSE) |
| |
| _GOOD_RESPONSE_SECURE_NONDEF = ( |
| 'HTTP/1.1 101 WebSocket Protocol Handshake\r\n' |
| 'Upgrade: WebSocket\r\n' |
| 'Connection: Upgrade\r\n' |
| 'Sec-WebSocket-Location: wss://example.com:8081/demo\r\n' |
| 'Sec-WebSocket-Origin: http://example.com\r\n' |
| 'Sec-WebSocket-Protocol: sample\r\n' |
| '\r\n' + |
| _TEST_CHALLENGE_RESPONSE) |
| |
| _GOOD_REQUEST_NO_PROTOCOL = ( |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3) |
| |
| _GOOD_RESPONSE_NO_PROTOCOL = ( |
| 'HTTP/1.1 101 WebSocket Protocol Handshake\r\n' |
| 'Upgrade: WebSocket\r\n' |
| 'Connection: Upgrade\r\n' |
| 'Sec-WebSocket-Location: ws://example.com/demo\r\n' |
| 'Sec-WebSocket-Origin: http://example.com\r\n' |
| '\r\n' + |
| _TEST_CHALLENGE_RESPONSE) |
| |
| _GOOD_REQUEST_WITH_OPTIONAL_HEADERS = ( |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'EmptyValue': '', |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'AKey': 'AValue', |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3) |
| |
| # TODO(tyoshino): Include \r \n in key3, challenge response. |
| |
| _GOOD_REQUEST_WITH_NONPRINTABLE_KEY = ( |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': 'y R2 48 Q1O4 e|BV3 i5 1 u- 65', |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': '36 7 74 i 92 2\'m 9 0G', |
| 'Origin': 'http://example.com', |
| }, |
| ''.join(map(chr, [0x01, 0xd1, 0xdd, 0x3b, 0xd1, 0x56, 0x63, 0xff]))) |
| |
| _GOOD_RESPONSE_WITH_NONPRINTABLE_KEY = ( |
| 'HTTP/1.1 101 WebSocket Protocol Handshake\r\n' |
| 'Upgrade: WebSocket\r\n' |
| 'Connection: Upgrade\r\n' |
| 'Sec-WebSocket-Location: ws://example.com/demo\r\n' |
| 'Sec-WebSocket-Origin: http://example.com\r\n' |
| 'Sec-WebSocket-Protocol: sample\r\n' |
| '\r\n' + |
| ''.join(map(chr, [0x0b, 0x99, 0xfa, 0x55, 0xbd, 0x01, 0x23, 0x7b, |
| 0x45, 0xa2, 0xf1, 0xd0, 0x87, 0x8a, 0xee, 0xeb]))) |
| |
| _GOOD_REQUEST_WITH_QUERY_PART = ( |
| 80, |
| 'GET', |
| '/demo?e=mc2', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3) |
| |
| _GOOD_RESPONSE_WITH_QUERY_PART = ( |
| 'HTTP/1.1 101 WebSocket Protocol Handshake\r\n' |
| 'Upgrade: WebSocket\r\n' |
| 'Connection: Upgrade\r\n' |
| 'Sec-WebSocket-Location: ws://example.com/demo?e=mc2\r\n' |
| 'Sec-WebSocket-Origin: http://example.com\r\n' |
| 'Sec-WebSocket-Protocol: sample\r\n' |
| '\r\n' + |
| _TEST_CHALLENGE_RESPONSE) |
| |
| _BAD_REQUESTS = ( |
| ( # HTTP request |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'www.google.com', |
| 'User-Agent': 'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.5;' |
| ' en-US; rv:1.9.1.3) Gecko/20090824 Firefox/3.5.3' |
| ' GTB6 GTBA', |
| 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,' |
| '*/*;q=0.8', |
| 'Accept-Language': 'en-us,en;q=0.5', |
| 'Accept-Encoding': 'gzip,deflate', |
| 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', |
| 'Keep-Alive': '300', |
| 'Connection': 'keep-alive', |
| }), |
| ( # Wrong method |
| 80, |
| 'POST', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3), |
| ( # Missing Upgrade |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3), |
| ( # Wrong Upgrade |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'Upgrade': 'NonWebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3), |
| ( # Empty WebSocket-Protocol |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': '', |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3), |
| ( # Wrong port number format |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com:0x50', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3), |
| ( # Header/connection port mismatch |
| 8080, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': 'sample', |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3), |
| ( # Illegal WebSocket-Protocol |
| 80, |
| 'GET', |
| '/demo', |
| { |
| 'Host': 'example.com', |
| 'Connection': 'Upgrade', |
| 'Sec-WebSocket-Key2': _TEST_KEY2, |
| 'Sec-WebSocket-Protocol': 'illegal\x09protocol', |
| 'Upgrade': 'WebSocket', |
| 'Sec-WebSocket-Key1': _TEST_KEY1, |
| 'Origin': 'http://example.com', |
| }, |
| _TEST_KEY3), |
| ) |
| |
| |
| def _create_request(request_def): |
| data = '' |
| if len(request_def) > 4: |
| data = request_def[4] |
| conn = mock.MockConn(data) |
| conn.local_addr = ('0.0.0.0', request_def[0]) |
| return mock.MockRequest( |
| method=request_def[1], |
| uri=request_def[2], |
| headers_in=request_def[3], |
| connection=conn) |
| |
| |
| def _create_get_memorized_lines(lines): |
| """Creates a function that returns the given string.""" |
| |
| def get_memorized_lines(): |
| return lines |
| return get_memorized_lines |
| |
| |
| def _create_requests_with_lines(request_lines_set): |
| requests = [] |
| for lines in request_lines_set: |
| request = _create_request(_GOOD_REQUEST) |
| request.connection.get_memorized_lines = _create_get_memorized_lines( |
| lines) |
| requests.append(request) |
| return requests |
| |
| |
| class HyBi00HandshakerTest(unittest.TestCase): |
| |
| def test_good_request_default_port(self): |
| request = _create_request(_GOOD_REQUEST) |
| handshaker = Handshaker(request, mock.MockDispatcher()) |
| handshaker.do_handshake() |
| self.assertEqual(_GOOD_RESPONSE_DEFAULT_PORT, |
| request.connection.written_data()) |
| self.assertEqual('/demo', request.ws_resource) |
| self.assertEqual('http://example.com', request.ws_origin) |
| self.assertEqual('ws://example.com/demo', request.ws_location) |
| self.assertEqual('sample', request.ws_protocol) |
| |
| def test_good_request_capitalized_header_values(self): |
| request = _create_request(_GOOD_REQUEST_CAPITALIZED_HEADER_VALUES) |
| handshaker = Handshaker(request, mock.MockDispatcher()) |
| handshaker.do_handshake() |
| self.assertEqual(_GOOD_RESPONSE_DEFAULT_PORT, |
| request.connection.written_data()) |
| |
| def test_good_request_case_mixed_header_names(self): |
| request = _create_request(_GOOD_REQUEST_CASE_MIXED_HEADER_NAMES) |
| handshaker = Handshaker(request, mock.MockDispatcher()) |
| handshaker.do_handshake() |
| self.assertEqual(_GOOD_RESPONSE_DEFAULT_PORT, |
| request.connection.written_data()) |
| |
| def test_good_request_secure_default_port(self): |
| request = _create_request(_GOOD_REQUEST) |
| request.connection.local_addr = ('0.0.0.0', 443) |
| request.is_https_ = True |
| handshaker = Handshaker(request, mock.MockDispatcher()) |
| handshaker.do_handshake() |
| self.assertEqual(_GOOD_RESPONSE_SECURE, |
| request.connection.written_data()) |
| self.assertEqual('sample', request.ws_protocol) |
| |
| def test_good_request_nondefault_port(self): |
| request = _create_request(_GOOD_REQUEST_NONDEFAULT_PORT) |
| handshaker = Handshaker(request, |
| mock.MockDispatcher()) |
| handshaker.do_handshake() |
| self.assertEqual(_GOOD_RESPONSE_NONDEFAULT_PORT, |
| request.connection.written_data()) |
| self.assertEqual('sample', request.ws_protocol) |
| |
| def test_good_request_secure_non_default_port(self): |
| request = _create_request(_GOOD_REQUEST_NONDEFAULT_PORT) |
| request.is_https_ = True |
| handshaker = Handshaker(request, mock.MockDispatcher()) |
| handshaker.do_handshake() |
| self.assertEqual(_GOOD_RESPONSE_SECURE_NONDEF, |
| request.connection.written_data()) |
| self.assertEqual('sample', request.ws_protocol) |
| |
| def test_good_request_default_no_protocol(self): |
| request = _create_request(_GOOD_REQUEST_NO_PROTOCOL) |
| handshaker = Handshaker(request, mock.MockDispatcher()) |
| handshaker.do_handshake() |
| self.assertEqual(_GOOD_RESPONSE_NO_PROTOCOL, |
| request.connection.written_data()) |
| self.assertEqual(None, request.ws_protocol) |
| |
| def test_good_request_optional_headers(self): |
| request = _create_request(_GOOD_REQUEST_WITH_OPTIONAL_HEADERS) |
| handshaker = Handshaker(request, mock.MockDispatcher()) |
| handshaker.do_handshake() |
| self.assertEqual('AValue', |
| request.headers_in['AKey']) |
| self.assertEqual('', |
| request.headers_in['EmptyValue']) |
| |
| def test_good_request_with_nonprintable_key(self): |
| request = _create_request(_GOOD_REQUEST_WITH_NONPRINTABLE_KEY) |
| handshaker = Handshaker(request, mock.MockDispatcher()) |
| handshaker.do_handshake() |
| self.assertEqual(_GOOD_RESPONSE_WITH_NONPRINTABLE_KEY, |
| request.connection.written_data()) |
| self.assertEqual('sample', request.ws_protocol) |
| |
| def test_good_request_with_query_part(self): |
| request = _create_request(_GOOD_REQUEST_WITH_QUERY_PART) |
| handshaker = Handshaker(request, mock.MockDispatcher()) |
| handshaker.do_handshake() |
| self.assertEqual(_GOOD_RESPONSE_WITH_QUERY_PART, |
| request.connection.written_data()) |
| self.assertEqual('ws://example.com/demo?e=mc2', request.ws_location) |
| |
| def test_bad_requests(self): |
| for request in map(_create_request, _BAD_REQUESTS): |
| handshaker = Handshaker(request, mock.MockDispatcher()) |
| self.assertRaises(HandshakeException, handshaker.do_handshake) |
| |
| |
| class HyBi00ValidateSubprotocolTest(unittest.TestCase): |
| def test_validate_subprotocol(self): |
| # should succeed. |
| _validate_subprotocol('sample') |
| _validate_subprotocol('Sample') |
| _validate_subprotocol('sample\x7eprotocol') |
| _validate_subprotocol('sample\x20protocol') |
| |
| # should fail. |
| self.assertRaises(HandshakeException, |
| _validate_subprotocol, |
| '') |
| self.assertRaises(HandshakeException, |
| _validate_subprotocol, |
| 'sample\x19protocol') |
| self.assertRaises(HandshakeException, |
| _validate_subprotocol, |
| 'sample\x7fprotocol') |
| self.assertRaises(HandshakeException, |
| _validate_subprotocol, |
| # "Japan" in Japanese |
| u'\u65e5\u672c') |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |
| |
| |
| # vi:sts=4 sw=4 et |