| #!/usr/bin/env python |
| # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Tests for Constrained Network Server.""" |
| import os |
| import signal |
| import subprocess |
| import tempfile |
| import time |
| import unittest |
| import urllib2 |
| import cherrypy |
| import cns |
| import traffic_control |
| |
| # The local interface to test on. |
| _INTERFACE = 'lo' |
| |
| |
| class PortAllocatorTest(unittest.TestCase): |
| """Unit tests for the Port Allocator class.""" |
| |
| # Expiration time for ports. In mock time. |
| _EXPIRY_TIME = 6 |
| |
| def setUp(self): |
| # Mock out time.time() to accelerate port expiration testing. |
| self._old_time = time.time |
| self._current_time = 0 |
| time.time = lambda: self._current_time |
| |
| # TODO(dalecurtis): Mock out actual calls to shadi's port setup. |
| self._pa = cns.PortAllocator(cns._DEFAULT_CNS_PORT_RANGE, self._EXPIRY_TIME) |
| self._MockTrafficControl() |
| |
| def tearDown(self): |
| self._pa.Cleanup(all_ports=True) |
| # Ensure ports are cleaned properly. |
| self.assertEquals(self._pa._ports, {}) |
| time.time = self._old_time |
| self._RestoreTrafficControl() |
| |
| def _MockTrafficControl(self): |
| self.old_CreateConstrainedPort = traffic_control.CreateConstrainedPort |
| self.old_DeleteConstrainedPort = traffic_control.DeleteConstrainedPort |
| self.old_TearDown = traffic_control.TearDown |
| |
| traffic_control.CreateConstrainedPort = lambda config: True |
| traffic_control.DeleteConstrainedPort = lambda config: True |
| traffic_control.TearDown = lambda config: True |
| |
| def _RestoreTrafficControl(self): |
| traffic_control.CreateConstrainedPort = self.old_CreateConstrainedPort |
| traffic_control.DeleteConstrainedPort = self.old_DeleteConstrainedPort |
| traffic_control.TearDown = self.old_TearDown |
| |
| def testPortAllocator(self): |
| # Ensure Get() succeeds and returns the correct port. |
| self.assertEquals(self._pa.Get('test'), cns._DEFAULT_CNS_PORT_RANGE[0]) |
| |
| # Call again with the same key and make sure we get the same port. |
| self.assertEquals(self._pa.Get('test'), cns._DEFAULT_CNS_PORT_RANGE[0]) |
| |
| # Call with a different key and make sure we get a different port. |
| self.assertEquals(self._pa.Get('test2'), cns._DEFAULT_CNS_PORT_RANGE[0] + 1) |
| |
| # Update fake time so that ports should expire. |
| self._current_time += self._EXPIRY_TIME + 1 |
| |
| # Test to make sure cache is checked before expiring ports. |
| self.assertEquals(self._pa.Get('test2'), cns._DEFAULT_CNS_PORT_RANGE[0] + 1) |
| |
| # Update fake time so that ports should expire. |
| self._current_time += self._EXPIRY_TIME + 1 |
| |
| # Request a new port, old ports should be expired, so we should get the |
| # first port in the range. Make sure this is the only allocated port. |
| self.assertEquals(self._pa.Get('test3'), cns._DEFAULT_CNS_PORT_RANGE[0]) |
| self.assertEquals(self._pa._ports.keys(), [cns._DEFAULT_CNS_PORT_RANGE[0]]) |
| |
| def testPortAllocatorExpiresOnlyCorrectPorts(self): |
| # Ensure Get() succeeds and returns the correct port. |
| self.assertEquals(self._pa.Get('test'), cns._DEFAULT_CNS_PORT_RANGE[0]) |
| |
| # Stagger port allocation and so we can ensure only ports older than the |
| # expiry time are actually expired. |
| self._current_time += self._EXPIRY_TIME / 2 + 1 |
| |
| # Call with a different key and make sure we get a different port. |
| self.assertEquals(self._pa.Get('test2'), cns._DEFAULT_CNS_PORT_RANGE[0] + 1) |
| |
| # After this sleep the port with key 'test' should expire on the next Get(). |
| self._current_time += self._EXPIRY_TIME / 2 + 1 |
| |
| # Call with a different key and make sure we get the first port. |
| self.assertEquals(self._pa.Get('test3'), cns._DEFAULT_CNS_PORT_RANGE[0]) |
| self.assertEquals(set(self._pa._ports.keys()), set([ |
| cns._DEFAULT_CNS_PORT_RANGE[0], cns._DEFAULT_CNS_PORT_RANGE[0] + 1])) |
| |
| def testPortAllocatorNoExpiration(self): |
| # Setup PortAllocator w/o port expiration. |
| self._pa = cns.PortAllocator(cns._DEFAULT_CNS_PORT_RANGE, 0) |
| |
| # Ensure Get() succeeds and returns the correct port. |
| self.assertEquals(self._pa.Get('test'), cns._DEFAULT_CNS_PORT_RANGE[0]) |
| |
| # Update fake time to see if ports expire. |
| self._current_time += self._EXPIRY_TIME |
| |
| # Send second Get() which would normally cause ports to expire. Ensure that |
| # the ports did not expire. |
| self.assertEquals(self._pa.Get('test2'), cns._DEFAULT_CNS_PORT_RANGE[0] + 1) |
| self.assertEquals(set(self._pa._ports.keys()), set([ |
| cns._DEFAULT_CNS_PORT_RANGE[0], cns._DEFAULT_CNS_PORT_RANGE[0] + 1])) |
| |
| def testPortAllocatorCleanMatchingIP(self): |
| # Setup PortAllocator w/o port expiration. |
| self._pa = cns.PortAllocator(cns._DEFAULT_CNS_PORT_RANGE, 0) |
| |
| # Ensure Get() succeeds and returns the correct port. |
| self.assertEquals(self._pa.Get('ip1', t=1), cns._DEFAULT_CNS_PORT_RANGE[0]) |
| self.assertEquals(self._pa.Get('ip1', t=2), |
| cns._DEFAULT_CNS_PORT_RANGE[0] + 1) |
| self.assertEquals(self._pa.Get('ip1', t=3), |
| cns._DEFAULT_CNS_PORT_RANGE[0] + 2) |
| self.assertEquals(self._pa.Get('ip2', t=1), |
| cns._DEFAULT_CNS_PORT_RANGE[0] + 3) |
| |
| self._pa.Cleanup(all_ports=False, request_ip='ip1') |
| |
| self.assertEquals(self._pa._ports.keys(), |
| [cns._DEFAULT_CNS_PORT_RANGE[0] + 3]) |
| self.assertEquals(self._pa.Get('ip2'), cns._DEFAULT_CNS_PORT_RANGE[0]) |
| self.assertEquals(self._pa.Get('ip1'), cns._DEFAULT_CNS_PORT_RANGE[0] + 1) |
| |
| self._pa.Cleanup(all_ports=False, request_ip='ip2') |
| self.assertEquals(self._pa._ports.keys(), |
| [cns._DEFAULT_CNS_PORT_RANGE[0] + 1]) |
| |
| self._pa.Cleanup(all_ports=False, request_ip='abc') |
| self.assertEquals(self._pa._ports.keys(), |
| [cns._DEFAULT_CNS_PORT_RANGE[0] + 1]) |
| |
| self._pa.Cleanup(all_ports=False, request_ip='ip1') |
| self.assertEquals(self._pa._ports.keys(), []) |
| |
| |
| class ConstrainedNetworkServerTest(unittest.TestCase): |
| """End to end tests for ConstrainedNetworkServer system. |
| |
| These tests require root access and run the cherrypy server along with |
| tc/iptables commands. |
| """ |
| |
| # Amount of time to wait for the CNS to start up. |
| _SERVER_START_SLEEP_SECS = 1 |
| |
| # Sample data used to verify file serving. |
| _TEST_DATA = 'The quick brown fox jumps over the lazy dog' |
| |
| # Server information. |
| _SERVER_URL = ('http://127.0.0.1:%d/ServeConstrained?' % |
| cns._DEFAULT_SERVING_PORT) |
| |
| # Setting for latency testing. |
| _LATENCY_TEST_SECS = 1 |
| |
| def _StartServer(self): |
| """Starts the CNS, returns pid.""" |
| cmd = ['python', 'cns.py', '--interface=%s' % _INTERFACE] |
| process = subprocess.Popen(cmd, stderr=subprocess.PIPE) |
| |
| # Wait for server to startup. |
| line = True |
| while line: |
| line = process.stderr.readline() |
| if 'STARTED' in line: |
| return process.pid |
| |
| self.fail('Failed to start CNS.') |
| |
| def setUp(self): |
| # Start the CNS. |
| self._server_pid = self._StartServer() |
| |
| # Create temp file for serving. Run after server start so if a failure |
| # during setUp() occurs we don't leave junk files around. |
| f, self._file = tempfile.mkstemp(dir=os.getcwd()) |
| os.write(f, self._TEST_DATA) |
| os.close(f) |
| |
| # Strip cwd off so we have a proper relative path. |
| self._relative_fn = self._file[len(os.getcwd()) + 1:] |
| |
| def tearDown(self): |
| os.unlink(self._file) |
| os.kill(self._server_pid, signal.SIGTERM) |
| |
| def testServerServesFiles(self): |
| now = time.time() |
| |
| f = urllib2.urlopen('%sf=%s' % (self._SERVER_URL, self._relative_fn)) |
| |
| # Verify file data is served correctly. |
| self.assertEqual(self._TEST_DATA, f.read()) |
| |
| # For completeness ensure an unconstrained call takes less time than our |
| # artificial constraints checked in the tests below. |
| self.assertTrue(time.time() - now < self._LATENCY_TEST_SECS) |
| |
| def testServerLatencyConstraint(self): |
| """Tests serving a file with a latency network constraint.""" |
| # Abort if does not have root access. |
| self.assertEqual(os.geteuid(), 0, 'You need root access to run this test.') |
| now = time.time() |
| |
| base_url = '%sf=%s' % (self._SERVER_URL, self._relative_fn) |
| url = '%s&latency=%d' % (base_url, self._LATENCY_TEST_SECS * 1000) |
| f = urllib2.urlopen(url) |
| |
| # Verify file data is served correctly. |
| self.assertEqual(self._TEST_DATA, f.read()) |
| |
| # Verify the request took longer than the requested latency. |
| self.assertTrue(time.time() - now > self._LATENCY_TEST_SECS) |
| |
| # Verify the server properly redirected the URL. |
| self.assertTrue(f.geturl().startswith(base_url.replace( |
| str(cns._DEFAULT_SERVING_PORT), str(cns._DEFAULT_CNS_PORT_RANGE[0])))) |
| |
| |
| class ConstrainedNetworkServerUnitTests(unittest.TestCase): |
| """ConstrainedNetworkServer class unit tests.""" |
| |
| def testGetServerURL(self): |
| """Test server URL is correct when using Cherrypy port.""" |
| cns_obj = cns.ConstrainedNetworkServer(self.DummyOptions(), None) |
| |
| self.assertEqual(cns_obj._GetServerURL('ab/xz.webm', port=1234, t=1), |
| 'http://127.0.0.1:1234/ServeConstrained?f=ab/xz.webm&t=1') |
| |
| def testGetServerURLWithLocalServer(self): |
| """Test server URL is correct when using --local-server-port port.""" |
| cns_obj = cns.ConstrainedNetworkServer(self.DummyOptionsWithServer(), None) |
| |
| self.assertEqual(cns_obj._GetServerURL('ab/xz.webm', port=1234, t=1), |
| 'http://127.0.0.1:1234/media/ab/xz.webm?t=1') |
| |
| class DummyOptions(object): |
| www_root = 'media' |
| port = 9000 |
| cherrypy.url = lambda: 'http://127.0.0.1:9000/ServeConstrained' |
| local_server_port = None |
| |
| class DummyOptionsWithServer(object): |
| www_root = 'media' |
| port = 9000 |
| cherrypy.url = lambda: 'http://127.0.0.1:9000/ServeConstrained' |
| local_server_port = 8080 |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |