| import gc |
| import os |
| import weakref |
| |
| from cStringIO import StringIO |
| |
| try: |
| import signal |
| except ImportError: |
| signal = None |
| |
| import unittest2 |
| |
| |
| class TestBreak(unittest2.TestCase): |
| |
| def setUp(self): |
| self._default_handler = signal.getsignal(signal.SIGINT) |
| |
| def tearDown(self): |
| signal.signal(signal.SIGINT, self._default_handler) |
| unittest2.signals._results = weakref.WeakKeyDictionary() |
| unittest2.signals._interrupt_handler = None |
| |
| def testInstallHandler(self): |
| default_handler = signal.getsignal(signal.SIGINT) |
| unittest2.installHandler() |
| self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| try: |
| pid = os.getpid() |
| os.kill(pid, signal.SIGINT) |
| except KeyboardInterrupt: |
| self.fail("KeyboardInterrupt not handled") |
| |
| self.assertTrue(unittest2.signals._interrupt_handler.called) |
| |
| def testRegisterResult(self): |
| result = unittest2.TestResult() |
| unittest2.registerResult(result) |
| |
| for ref in unittest2.signals._results: |
| if ref is result: |
| break |
| elif ref is not result: |
| self.fail("odd object in result set") |
| else: |
| self.fail("result not found") |
| |
| def testInterruptCaught(self): |
| default_handler = signal.getsignal(signal.SIGINT) |
| |
| result = unittest2.TestResult() |
| unittest2.installHandler() |
| unittest2.registerResult(result) |
| |
| self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| def test(result): |
| pid = os.getpid() |
| os.kill(pid, signal.SIGINT) |
| result.breakCaught = True |
| self.assertTrue(result.shouldStop) |
| |
| try: |
| test(result) |
| except KeyboardInterrupt: |
| self.fail("KeyboardInterrupt not handled") |
| self.assertTrue(result.breakCaught) |
| |
| def testSecondInterrupt(self): |
| result = unittest2.TestResult() |
| unittest2.installHandler() |
| unittest2.registerResult(result) |
| |
| def test(result): |
| pid = os.getpid() |
| os.kill(pid, signal.SIGINT) |
| result.breakCaught = True |
| self.assertTrue(result.shouldStop) |
| os.kill(pid, signal.SIGINT) |
| self.fail("Second KeyboardInterrupt not raised") |
| |
| try: |
| test(result) |
| except KeyboardInterrupt: |
| pass |
| else: |
| self.fail("Second KeyboardInterrupt not raised") |
| self.assertTrue(result.breakCaught) |
| |
| def testTwoResults(self): |
| unittest2.installHandler() |
| |
| result = unittest2.TestResult() |
| unittest2.registerResult(result) |
| new_handler = signal.getsignal(signal.SIGINT) |
| |
| result2 = unittest2.TestResult() |
| unittest2.registerResult(result2) |
| self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) |
| |
| result3 = unittest2.TestResult() |
| |
| def test(result): |
| pid = os.getpid() |
| os.kill(pid, signal.SIGINT) |
| |
| try: |
| test(result) |
| except KeyboardInterrupt: |
| self.fail("KeyboardInterrupt not handled") |
| |
| self.assertTrue(result.shouldStop) |
| self.assertTrue(result2.shouldStop) |
| self.assertFalse(result3.shouldStop) |
| |
| def testHandlerReplacedButCalled(self): |
| # If our handler has been replaced (is no longer installed) but is |
| # called by the *new* handler, then it isn't safe to delay the |
| # SIGINT and we should immediately delegate to the default handler |
| unittest2.installHandler() |
| |
| handler = signal.getsignal(signal.SIGINT) |
| |
| def new_handler(frame, signum): |
| handler(frame, signum) |
| signal.signal(signal.SIGINT, new_handler) |
| |
| try: |
| pid = os.getpid() |
| os.kill(pid, signal.SIGINT) |
| except KeyboardInterrupt: |
| pass |
| else: |
| self.fail("replaced but delegated handler doesn't raise interrupt") |
| |
| def testRunner(self): |
| # Creating a TextTestRunner with the appropriate argument should |
| # register the TextTestResult it creates |
| runner = unittest2.TextTestRunner(stream=StringIO()) |
| |
| result = runner.run(unittest2.TestSuite()) |
| self.assertIn(result, unittest2.signals._results) |
| |
| def testWeakReferences(self): |
| # Calling registerResult on a result should not keep it alive |
| result = unittest2.TestResult() |
| unittest2.registerResult(result) |
| |
| ref = weakref.ref(result) |
| del result |
| |
| # For non-reference counting implementations |
| gc.collect() |
| gc.collect() |
| self.assertIsNone(ref()) |
| |
| def testRemoveResult(self): |
| result = unittest2.TestResult() |
| unittest2.registerResult(result) |
| |
| unittest2.installHandler() |
| self.assertTrue(unittest2.removeResult(result)) |
| |
| # Should this raise an error instead? |
| self.assertFalse(unittest2.removeResult(unittest2.TestResult())) |
| |
| try: |
| pid = os.getpid() |
| os.kill(pid, signal.SIGINT) |
| except KeyboardInterrupt: |
| pass |
| |
| self.assertFalse(result.shouldStop) |
| |
| def testMainInstallsHandler(self): |
| failfast = object() |
| test = object() |
| verbosity = object() |
| result = object() |
| default_handler = signal.getsignal(signal.SIGINT) |
| |
| class FakeRunner(object): |
| initArgs = [] |
| runArgs = [] |
| |
| def __init__(self, *args, **kwargs): |
| self.initArgs.append((args, kwargs)) |
| |
| def run(self, test): |
| self.runArgs.append(test) |
| return result |
| |
| class Program(unittest2.TestProgram): |
| |
| def __init__(self, catchbreak): |
| self.exit = False |
| self.verbosity = verbosity |
| self.failfast = failfast |
| self.catchbreak = catchbreak |
| self.testRunner = FakeRunner |
| self.test = test |
| self.result = None |
| |
| p = Program(False) |
| p.runTests() |
| |
| self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity, |
| 'failfast': failfast, |
| 'buffer': None})]) |
| self.assertEqual(FakeRunner.runArgs, [test]) |
| self.assertEqual(p.result, result) |
| |
| self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| FakeRunner.initArgs = [] |
| FakeRunner.runArgs = [] |
| p = Program(True) |
| p.runTests() |
| |
| self.assertEqual(FakeRunner.initArgs, [((), {'verbosity': verbosity, |
| 'failfast': failfast, |
| 'buffer': None})]) |
| self.assertEqual(FakeRunner.runArgs, [test]) |
| self.assertEqual(p.result, result) |
| |
| self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| def testRemoveHandler(self): |
| default_handler = signal.getsignal(signal.SIGINT) |
| unittest2.installHandler() |
| unittest2.removeHandler() |
| self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| # check that calling removeHandler multiple times has no ill-effect |
| unittest2.removeHandler() |
| self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| def testRemoveHandlerAsDecorator(self): |
| default_handler = signal.getsignal(signal.SIGINT) |
| unittest2.installHandler() |
| |
| @unittest2.removeHandler |
| def test(): |
| self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| test() |
| self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) |
| |
| |
| # Should also skip some tests on Jython |
| skipper = unittest2.skipUnless(hasattr(os, 'kill') and signal is not None, |
| "test uses os.kill(...) and the signal module") |
| TestBreak = skipper(TestBreak) |
| |
| if __name__ == '__main__': |
| unittest2.main() |