# blob: c3f46e3fca7b7b02fe1be6b3c293250a28ad662d [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# test/test_daemon.py
# Part of python-daemon, an implementation of PEP 3143.
#
# Copyright © 2008–2010 Ben Finney <ben+python@benfinney.id.au>
#
# This is free software: you may copy, modify, and/or distribute this work
# under the terms of the Python Software Foundation License, version 2 or
# later as published by the Python Software Foundation.
# No warranty expressed or implied. See the file LICENSE.PSF-2 for details.
""" Unit test for daemon module.
"""
import os
import sys
import tempfile
import resource
import errno
import signal
import socket
from types import ModuleType
import atexit
from StringIO import StringIO
import scaffold
from test_pidlockfile import (
FakeFileDescriptorStringIO,
setup_pidfile_fixtures,
)
from daemon import pidlockfile
import daemon
class Exception_TestCase(scaffold.Exception_TestCase):
    """ Test cases for the exception classes of the daemon module. """

    def __init__(self, *args, **kwargs):
        """ Initialise the test case and declare the exceptions under test.

            Populates `valid_exceptions`, mapping each exception class
            to a `min_args` count and a tuple of `types`. How these are
            checked is up to the scaffold base class, which is not
            visible from this file.

            """
        super(Exception_TestCase, self).__init__(*args, **kwargs)

        base_error = daemon.daemon.DaemonError
        # Both OS-related errors declare the same pair of expected types.
        os_error_types = (base_error, OSError)

        self.valid_exceptions = {
            base_error: {
                'min_args': 1,
                'types': (Exception,),
                },
            daemon.daemon.DaemonOSEnvironmentError: {
                'min_args': 1,
                'types': os_error_types,
                },
            daemon.daemon.DaemonProcessDetachError: {
                'min_args': 1,
                'types': os_error_types,
                },
            }
def setup_daemon_context_fixtures(testcase):
    """ Install the fixtures shared by the DaemonContext test cases.

        Attaches to `testcase`: a mock tracker, the stream and PID file
        fixtures, a mock PID lock file with a temporary path, the
        keyword arguments `daemon_context_args`, and `test_instance`,
        a `DaemonContext` built from those arguments. Also replaces
        several `daemon.daemon` and `os` functions with mocks.

        """
    testcase.mock_tracker = scaffold.MockTracker()

    setup_streams_fixtures(testcase)
    setup_pidfile_fixtures(testcase)

    # Read the tracker back only now: the fixture helpers above are not
    # visible here and might have replaced the attribute.
    tracker = testcase.mock_tracker

    testcase.mock_pidfile_path = tempfile.mktemp()
    testcase.mock_pidlockfile = scaffold.Mock(
        "pidlockfile.PIDLockFile", tracker=tracker)
    testcase.mock_pidlockfile.path = testcase.mock_pidfile_path

    scaffold.mock(
        "daemon.daemon.is_detach_process_context_required",
        returns=True, tracker=tracker)

    # Each of these mocks returns its own distinct placeholder object.
    for mock_name in [
            "daemon.daemon.make_default_signal_map",
            "os.getuid",
            "os.getgid",
            ]:
        scaffold.mock(mock_name, returns=object(), tracker=tracker)

    stream_files = testcase.stream_files_by_name
    testcase.daemon_context_args = {
        'stdin': stream_files['stdin'],
        'stdout': stream_files['stdout'],
        'stderr': stream_files['stderr'],
        }

    testcase.test_instance = daemon.DaemonContext(
        **testcase.daemon_context_args)
class DaemonContext_TestCase(scaffold.TestCase):
    """ Test cases for construction of the DaemonContext class. """

    def setUp(self):
        """ Prepare the shared DaemonContext fixtures. """
        setup_daemon_context_fixtures(self)

    def tearDown(self):
        """ Restore everything that was mocked for the test. """
        scaffold.mock_restore()

    def test_instantiate(self):
        """ The fixture instance should be a DaemonContext. """
        self.failUnlessIsInstance(
            self.test_instance, daemon.daemon.DaemonContext)

    def test_minimum_zero_arguments(self):
        """ Constructor should work when given no arguments. """
        created = daemon.daemon.DaemonContext()
        self.failIfIs(None, created)

    def test_has_specified_chroot_directory(self):
        """ Should expose the chroot_directory passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(chroot_directory=sentinel)
        self.failUnlessEqual(sentinel, context.chroot_directory)

    def test_has_specified_working_directory(self):
        """ Should expose the working_directory passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(working_directory=sentinel)
        self.failUnlessEqual(sentinel, context.working_directory)

    def test_has_default_working_directory(self):
        """ Should default working_directory to the root directory. """
        context = daemon.daemon.DaemonContext()
        self.failUnlessEqual('/', context.working_directory)

    def test_has_specified_creation_mask(self):
        """ Should expose the umask passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(umask=sentinel)
        self.failUnlessEqual(sentinel, context.umask)

    def test_has_default_creation_mask(self):
        """ Should default umask to zero. """
        context = daemon.daemon.DaemonContext()
        self.failUnlessEqual(0, context.umask)

    def test_has_specified_uid(self):
        """ Should expose the uid passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(uid=sentinel)
        self.failUnlessEqual(sentinel, context.uid)

    def test_has_derived_uid(self):
        """ Should take uid from `os.getuid` when none is given. """
        # `os.getuid` is mocked by the fixtures to return a fixed object.
        process_uid = os.getuid()
        context = daemon.daemon.DaemonContext()
        self.failUnlessEqual(process_uid, context.uid)

    def test_has_specified_gid(self):
        """ Should expose the gid passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(gid=sentinel)
        self.failUnlessEqual(sentinel, context.gid)

    def test_has_derived_gid(self):
        """ Should take gid from `os.getgid` when none is given. """
        # `os.getgid` is mocked by the fixtures to return a fixed object.
        process_gid = os.getgid()
        context = daemon.daemon.DaemonContext()
        self.failUnlessEqual(process_gid, context.gid)

    def test_has_specified_detach_process(self):
        """ Should expose the detach_process passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(detach_process=sentinel)
        self.failUnlessEqual(sentinel, context.detach_process)

    def test_has_derived_detach_process(self):
        """ Should derive detach_process from the environment check. """
        required = daemon.daemon.is_detach_process_context_required()
        context = daemon.daemon.DaemonContext()
        self.failUnlessEqual(required, context.detach_process)

    def test_has_specified_files_preserve(self):
        """ Should expose the files_preserve passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(files_preserve=sentinel)
        self.failUnlessEqual(sentinel, context.files_preserve)

    def test_has_specified_pidfile(self):
        """ Should expose the pidfile passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(pidfile=sentinel)
        self.failUnlessEqual(sentinel, context.pidfile)

    def test_has_specified_stdin(self):
        """ Should expose the stdin passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(stdin=sentinel)
        self.failUnlessEqual(sentinel, context.stdin)

    def test_has_specified_stdout(self):
        """ Should expose the stdout passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(stdout=sentinel)
        self.failUnlessEqual(sentinel, context.stdout)

    def test_has_specified_stderr(self):
        """ Should expose the stderr passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(stderr=sentinel)
        self.failUnlessEqual(sentinel, context.stderr)

    def test_has_specified_signal_map(self):
        """ Should expose the signal_map passed to the constructor. """
        sentinel = object()
        context = daemon.daemon.DaemonContext(signal_map=sentinel)
        self.failUnlessEqual(sentinel, context.signal_map)

    def test_has_derived_signal_map(self):
        """ Should take signal_map from `make_default_signal_map`. """
        default_map = daemon.daemon.make_default_signal_map()
        context = daemon.daemon.DaemonContext()
        self.failUnlessEqual(default_map, context.signal_map)
class DaemonContext_is_open_TestCase(scaffold.TestCase):
    """ Test cases for the `is_open` property of DaemonContext. """

    def setUp(self):
        """ Prepare the shared DaemonContext fixtures. """
        setup_daemon_context_fixtures(self)

    def tearDown(self):
        """ Restore everything that was mocked for the test. """
        scaffold.mock_restore()

    def test_begin_false(self):
        """ A fresh instance should report is_open as False. """
        self.failUnlessEqual(False, self.test_instance.is_open)

    def test_write_fails(self):
        """ Assigning to is_open should raise AttributeError. """
        new_value = object()
        self.failUnlessRaises(
            AttributeError,
            setattr, self.test_instance, 'is_open', new_value)
class DaemonContext_open_TestCase(scaffold.TestCase):
    """ Test cases for the `open` method of DaemonContext. """

    def setUp(self):
        """ Prepare fixtures and mock every step that `open` performs. """
        setup_daemon_context_fixtures(self)
        tracker = self.mock_tracker
        tracker.clear()

        self.test_instance._is_open = False

        # Module-level helpers of daemon.daemon, mocked without a
        # return value. The loop stays in this frame on purpose.
        for function_name in [
                "detach_process_context",
                "change_working_directory",
                "change_root_directory",
                "change_file_creation_mask",
                "change_process_owner",
                "prevent_core_dump",
                "close_all_open_files",
                "redirect_stream",
                "set_signal_handlers",
                "register_atexit_function",
                ]:
            scaffold.mock(
                "daemon.daemon." + function_name,
                tracker=tracker)

        # Private methods of DaemonContext, mocked to return known
        # placeholder objects the tests can look for.
        self.test_files_preserve_fds = object()
        scaffold.mock(
            "daemon.daemon.DaemonContext._get_exclude_file_descriptors",
            returns=self.test_files_preserve_fds,
            tracker=tracker)

        self.test_signal_handler_map = object()
        scaffold.mock(
            "daemon.daemon.DaemonContext._make_signal_handler_map",
            returns=self.test_signal_handler_map,
            tracker=tracker)

        for stream_name in ["sys.stdin", "sys.stdout", "sys.stderr"]:
            scaffold.mock(stream_name, tracker=tracker)

    def tearDown(self):
        """ Restore everything that was mocked for the test. """
        scaffold.mock_restore()

    def test_performs_steps_in_expected_sequence(self):
        """ Should carry out the daemonisation steps in the expected order. """
        context = self.test_instance
        context.chroot_directory = object()
        context.detach_process = True
        context.pidfile = self.mock_pidlockfile
        expect_mock_output = (
            "Called daemon.daemon.change_root_directory(...)\n"
            "Called daemon.daemon.prevent_core_dump()\n"
            "Called daemon.daemon.change_file_creation_mask(...)\n"
            "Called daemon.daemon.change_working_directory(...)\n"
            "Called daemon.daemon.change_process_owner(...)\n"
            "Called daemon.daemon.detach_process_context()\n"
            "Called daemon.daemon.DaemonContext._make_signal_handler_map()\n"
            "Called daemon.daemon.set_signal_handlers(...)\n"
            "Called daemon.daemon.DaemonContext"
            "._get_exclude_file_descriptors()\n"
            "Called daemon.daemon.close_all_open_files(...)\n"
            "Called daemon.daemon.redirect_stream(...)\n"
            "Called daemon.daemon.redirect_stream(...)\n"
            "Called daemon.daemon.redirect_stream(...)\n"
            "Called pidlockfile.PIDLockFile.__enter__()\n"
            "Called daemon.daemon.register_atexit_function(...)\n"
            )
        self.mock_tracker.clear()
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_returns_immediately_if_is_open(self):
        """ Should do nothing when the context is already open. """
        context = self.test_instance
        context._is_open = True
        expect_mock_output = ""
        self.mock_tracker.clear()
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_changes_root_directory_to_chroot_directory(self):
        """ Should change the root directory to `chroot_directory`. """
        context = self.test_instance
        root_directory = object()
        context.chroot_directory = root_directory
        expect_mock_output = (
            "Called daemon.daemon.change_root_directory(\n"
            "%(chroot_directory)r)\n"
            "...\n"
            ) % {'chroot_directory': root_directory}
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_omits_chroot_if_no_chroot_directory(self):
        """ Should not change the root directory without `chroot_directory`. """
        context = self.test_instance
        context.chroot_directory = None
        unwanted_output = (
            "...Called daemon.daemon.change_root_directory(...)...")
        context.open()
        self.failIfMockCheckerMatch(unwanted_output)

    def test_prevents_core_dump(self):
        """ Should ask for core dumps to be prevented. """
        context = self.test_instance
        expect_mock_output = (
            "Called daemon.daemon.prevent_core_dump()\n"
            "...\n"
            )
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_omits_prevent_core_dump_if_prevent_core_false(self):
        """ Should not prevent core dumps when `prevent_core` is false. """
        context = self.test_instance
        context.prevent_core = False
        unwanted_output = (
            "...Called daemon.daemon.prevent_core_dump()...")
        context.open()
        self.failIfMockCheckerMatch(unwanted_output)

    def test_closes_open_files(self):
        """ Should close all open files except those to be preserved. """
        context = self.test_instance
        expect_mock_output = (
            "...\n"
            "Called daemon.daemon.close_all_open_files(\n"
            "exclude=%(expect_exclude)r)\n"
            "...\n"
            ) % {'expect_exclude': self.test_files_preserve_fds}
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_changes_directory_to_working_directory(self):
        """ Should change the current directory to `working_directory`. """
        context = self.test_instance
        target_directory = object()
        context.working_directory = target_directory
        expect_mock_output = (
            "...\n"
            "Called daemon.daemon.change_working_directory(\n"
            "%(working_directory)r)\n"
            "...\n"
            ) % {'working_directory': target_directory}
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_changes_creation_mask_to_umask(self):
        """ Should change the file creation mask to `umask`. """
        context = self.test_instance
        creation_mask = object()
        context.umask = creation_mask
        expect_mock_output = (
            "...\n"
            "Called daemon.daemon.change_file_creation_mask(%(umask)r)\n"
            "...\n"
            ) % {'umask': creation_mask}
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_changes_owner_to_specified_uid_and_gid(self):
        """ Should change the process owner to `uid` and `gid`. """
        context = self.test_instance
        owner_uid = object()
        owner_gid = object()
        context.uid = owner_uid
        context.gid = owner_gid
        expect_mock_output = (
            "...\n"
            "Called daemon.daemon.change_process_owner(%(uid)r, %(gid)r)\n"
            "...\n"
            ) % {'uid': owner_uid, 'gid': owner_gid}
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_detaches_process_context(self):
        """ Should ask for the process context to be detached. """
        context = self.test_instance
        expect_mock_output = (
            "...\n"
            "Called daemon.daemon.detach_process_context()\n"
            "...\n"
            )
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_omits_process_detach_if_not_required(self):
        """ Should not detach the process when `detach_process` is false. """
        context = self.test_instance
        context.detach_process = False
        unwanted_output = (
            "...Called daemon.daemon.detach_process_context(...)...")
        context.open()
        self.failIfMockCheckerMatch(unwanted_output)

    def test_sets_signal_handlers_from_signal_map(self):
        """ Should install the handlers built from `signal_map`. """
        context = self.test_instance
        context.signal_map = object()
        expect_mock_output = (
            "...\n"
            "Called daemon.daemon.set_signal_handlers(\n"
            "%(expect_signal_handler_map)r)\n"
            "...\n"
            ) % {'expect_signal_handler_map': self.test_signal_handler_map}
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_redirects_standard_streams(self):
        """ Should redirect each standard stream to its configured file. """
        context = self.test_instance
        targets = self.stream_files_by_name
        # `sys.stdin` and friends are the mocks installed by `setUp`.
        substitutions = {
            'system_stdin': sys.stdin,
            'system_stdout': sys.stdout,
            'system_stderr': sys.stderr,
            'target_stdin': targets['stdin'],
            'target_stdout': targets['stdout'],
            'target_stderr': targets['stderr'],
            }
        expect_mock_output = (
            "...\n"
            "Called daemon.daemon.redirect_stream(\n"
            "%(system_stdin)r, %(target_stdin)r)\n"
            "Called daemon.daemon.redirect_stream(\n"
            "%(system_stdout)r, %(target_stdout)r)\n"
            "Called daemon.daemon.redirect_stream(\n"
            "%(system_stderr)r, %(target_stderr)r)\n"
            "...\n"
            ) % substitutions
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_enters_pidfile_context(self):
        """ Should enter the context manager of the PID file. """
        context = self.test_instance
        context.pidfile = self.mock_pidlockfile
        expect_mock_output = (
            "...\n"
            "Called pidlockfile.PIDLockFile.__enter__()\n"
            "...\n"
            )
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_sets_is_open_true(self):
        """ Should leave `is_open` true afterwards. """
        context = self.test_instance
        context.open()
        self.failUnlessEqual(True, context.is_open)

    def test_registers_close_method_for_atexit(self):
        """ Should register the `close` method to run at exit. """
        context = self.test_instance
        expect_mock_output = (
            "...\n"
            "Called daemon.daemon.register_atexit_function(%(close_method)r)\n"
            ) % {'close_method': context.close}
        context.open()
        self.failUnlessMockCheckerMatch(expect_mock_output)
class DaemonContext_close_TestCase(scaffold.TestCase):
    """ Test cases for the `close` method of DaemonContext. """

    def setUp(self):
        """ Prepare fixtures with the context marked as already open. """
        setup_daemon_context_fixtures(self)
        self.mock_tracker.clear()
        self.test_instance._is_open = True

    def tearDown(self):
        """ Restore everything that was mocked for the test. """
        scaffold.mock_restore()

    def test_returns_immediately_if_not_is_open(self):
        """ Should do nothing when the context is not open. """
        context = self.test_instance
        context._is_open = False
        context.pidfile = object()
        expect_mock_output = ""
        self.mock_tracker.clear()
        context.close()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_exits_pidfile_context(self):
        """ Should exit the context manager of the PID file. """
        context = self.test_instance
        context.pidfile = self.mock_pidlockfile
        expect_mock_output = (
            "Called pidlockfile.PIDLockFile.__exit__(None, None, None)\n")
        context.close()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_returns_none(self):
        """ Should return None. """
        outcome = self.test_instance.close()
        self.failUnlessIs(None, outcome)

    def test_sets_is_open_false(self):
        """ Should leave `is_open` false afterwards. """
        context = self.test_instance
        context.close()
        self.failUnlessEqual(False, context.is_open)
class DaemonContext_context_manager_enter_TestCase(scaffold.TestCase):
    """ Test cases for the `__enter__` method of DaemonContext. """

    def setUp(self):
        """ Prepare fixtures and replace `DaemonContext.open` with a mock. """
        setup_daemon_context_fixtures(self)
        tracker = self.mock_tracker
        tracker.clear()
        scaffold.mock("daemon.daemon.DaemonContext.open", tracker=tracker)

    def tearDown(self):
        """ Restore everything that was mocked for the test. """
        scaffold.mock_restore()

    def test_opens_daemon_context(self):
        """ Should call `open` on the DaemonContext. """
        expect_mock_output = "Called daemon.daemon.DaemonContext.open()\n"
        self.test_instance.__enter__()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_returns_self_instance(self):
        """ Should return the DaemonContext instance itself. """
        context = self.test_instance
        entered = context.__enter__()
        self.failUnlessIs(context, entered)
class DaemonContext_context_manager_exit_TestCase(scaffold.TestCase):
    """ Test cases for the `__exit__` method of DaemonContext. """

    def setUp(self):
        """ Prepare fixtures and replace `DaemonContext.close` with a mock. """
        setup_daemon_context_fixtures(self)
        tracker = self.mock_tracker
        tracker.clear()

        # Placeholder exception details, passed to `__exit__` by keyword.
        self.test_args = {
            'exc_type': object(),
            'exc_value': object(),
            'traceback': object(),
            }

        scaffold.mock("daemon.daemon.DaemonContext.close", tracker=tracker)

    def tearDown(self):
        """ Restore everything that was mocked for the test. """
        scaffold.mock_restore()

    def test_closes_daemon_context(self):
        """ Should call `close` on the DaemonContext. """
        expect_mock_output = "Called daemon.daemon.DaemonContext.close()\n"
        self.test_instance.__exit__(**self.test_args)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_returns_none(self):
        """ Should return None, so the exception counts as unhandled. """
        outcome = self.test_instance.__exit__(**self.test_args)
        self.failUnlessIs(None, outcome)
class DaemonContext_terminate_TestCase(scaffold.TestCase):
    """ Test cases for DaemonContext.terminate method. """

    def setUp(self):
        """ Set up test fixtures. """
        setup_daemon_context_fixtures(self)

        # Positional arguments passed to `terminate`: a signal number
        # and a frame placeholder.
        self.test_signal = signal.SIGTERM
        self.test_frame = None
        self.test_args = (self.test_signal, self.test_frame)

    def tearDown(self):
        """ Tear down test fixtures. """
        scaffold.mock_restore()

    def test_raises_system_exit(self):
        """ Should raise SystemExit. """
        instance = self.test_instance
        args = self.test_args
        expect_exception = SystemExit
        self.failUnlessRaises(
            expect_exception,
            instance.terminate, *args)

    def test_exception_message_contains_signal_number(self):
        """ Should raise exception with a message containing signal number. """
        instance = self.test_instance
        args = self.test_args
        signal_number = self.test_signal
        expect_exception = SystemExit
        try:
            instance.terminate(*args)
        except expect_exception:
            # Taken from `sys.exc_info` so this parses whichever
            # `except` binding syntax the running Python accepts.
            exc = sys.exc_info()[1]
        else:
            # Without this branch a missing exception left `exc`
            # unbound and the test errored with NameError.
            self.fail("DaemonContext.terminate did not raise SystemExit")
        self.failUnlessIn(str(exc), str(signal_number))
class DaemonContext_get_exclude_file_descriptors_TestCase(scaffold.TestCase):
    """ Test cases for DaemonContext._get_exclude_file_descriptors function. """

    def setUp(self):
        """ Set up test fixtures. """
        setup_daemon_context_fixtures(self)

        # A mix of fake file objects, bare integers, and None, keyed by
        # the descriptor number each fake file is given below.
        self.test_files = {
            2: FakeFileDescriptorStringIO(),
            5: 5,
            11: FakeFileDescriptorStringIO(),
            17: None,
            23: FakeFileDescriptorStringIO(),
            37: 37,
            42: FakeFileDescriptorStringIO(),
            }
        for (fileno, item) in self.test_files.items():
            if hasattr(item, '_fileno'):
                item._fileno = fileno

        # Expected descriptors: every key whose item is not None, plus
        # the descriptors of the three stream fixtures.
        self.test_file_descriptors = set(
            fd for (fd, item) in self.test_files.items()
            if item is not None)
        self.test_file_descriptors.update(
            self.stream_files_by_name[name].fileno()
            for name in ['stdin', 'stdout', 'stderr']
            )

    def tearDown(self):
        """ Tear down test fixtures. """
        scaffold.mock_restore()

    def test_returns_expected_file_descriptors(self):
        """ Should return expected set of file descriptors. """
        instance = self.test_instance
        instance.files_preserve = self.test_files.values()
        expect_result = self.test_file_descriptors
        result = instance._get_exclude_file_descriptors()
        self.failUnlessEqual(expect_result, result)

    def test_returns_stream_redirects_if_no_files_preserve(self):
        """ Should return only stream redirects if no files_preserve. """
        instance = self.test_instance
        instance.files_preserve = None
        expect_result = set(
            stream.fileno()
            for stream in self.stream_files_by_name.values())
        result = instance._get_exclude_file_descriptors()
        self.failUnlessEqual(expect_result, result)

    def test_returns_empty_set_if_no_files(self):
        """ Should return empty set if no file options. """
        instance = self.test_instance
        for name in ['files_preserve', 'stdin', 'stdout', 'stderr']:
            setattr(instance, name, None)
        expect_result = set()
        result = instance._get_exclude_file_descriptors()
        self.failUnlessEqual(expect_result, result)

    def test_return_set_omits_streams_without_file_descriptors(self):
        """ Should omit any stream without a file descriptor. """
        instance = self.test_instance
        instance.files_preserve = self.test_files.values()
        stream_files = self.stream_files_by_name
        expect_result = self.test_file_descriptors.copy()
        for (pseudo_stream_name, pseudo_stream) in stream_files.items():
            # Replace the stream with a plain StringIO and stop
            # expecting the descriptor of the fixture it displaced.
            setattr(instance, pseudo_stream_name, StringIO())
            stream_fd = pseudo_stream.fileno()
            expect_result.discard(stream_fd)
        result = instance._get_exclude_file_descriptors()
        self.failUnlessEqual(expect_result, result)
class DaemonContext_make_signal_handler_TestCase(scaffold.TestCase):
    """ Test cases for the `_make_signal_handler` method of DaemonContext. """

    def setUp(self):
        """ Prepare the shared DaemonContext fixtures. """
        setup_daemon_context_fixtures(self)

    def tearDown(self):
        """ Restore everything that was mocked for the test. """
        scaffold.mock_restore()

    def test_returns_ignore_for_none(self):
        """ Should map a None target to SIG_IGN. """
        handler = self.test_instance._make_signal_handler(None)
        self.failUnlessEqual(signal.SIG_IGN, handler)

    def test_returns_method_for_name(self):
        """ Should map a method name to that method of the instance. """
        context = self.test_instance
        wanted_handler = context.terminate
        handler = context._make_signal_handler('terminate')
        self.failUnlessEqual(wanted_handler, handler)

    def test_raises_error_for_unknown_name(self):
        """ Should raise AttributeError for a name that is not a method. """
        self.failUnlessRaises(
            AttributeError,
            self.test_instance._make_signal_handler, 'b0gUs')

    def test_returns_object_for_object(self):
        """ Should hand back any other object unchanged. """
        arbitrary_target = object()
        handler = self.test_instance._make_signal_handler(arbitrary_target)
        self.failUnlessEqual(arbitrary_target, handler)
class DaemonContext_make_signal_handler_map_TestCase(scaffold.TestCase):
    """ Test cases for the `_make_signal_handler_map` method. """

    def setUp(self):
        """ Prepare a signal map and a mock `_make_signal_handler`. """
        setup_daemon_context_fixtures(self)

        signal_map = {
            object(): object(),
            object(): object(),
            object(): object(),
            }
        self.test_instance.signal_map = signal_map

        # One placeholder handler per target in the signal map.
        handlers_by_target = {}
        for target in signal_map.values():
            handlers_by_target[target] = object()
        self.test_signal_handlers = handlers_by_target

        # The map expected back: each key paired with its target's handler.
        expected_map = {}
        for (signal_key, target) in signal_map.items():
            expected_map[signal_key] = handlers_by_target[target]
        self.test_signal_handler_map = expected_map

        def lookup_handler(target):
            return self.test_signal_handlers[target]

        scaffold.mock(
            "daemon.daemon.DaemonContext._make_signal_handler",
            returns_func=lookup_handler,
            tracker=self.mock_tracker)

    def tearDown(self):
        """ Restore everything that was mocked for the test. """
        scaffold.mock_restore()

    def test_returns_constructed_signal_handler_items(self):
        """ Should return the handlers produced by `_make_signal_handler`. """
        produced = self.test_instance._make_signal_handler_map()
        self.failUnlessEqual(self.test_signal_handler_map, produced)
class change_working_directory_TestCase(scaffold.TestCase):
    """ Test cases for change_working_directory function. """

    def setUp(self):
        """ Set up test fixtures. """
        self.mock_tracker = scaffold.MockTracker()
        scaffold.mock(
            "os.chdir",
            tracker=self.mock_tracker)

        # Placeholder directory, passed to the function by keyword.
        self.test_directory = object()
        self.test_args = dict(
            directory=self.test_directory,
            )

    def tearDown(self):
        """ Tear down test fixtures. """
        scaffold.mock_restore()

    def test_changes_working_directory_to_specified_directory(self):
        """ Should change working directory to specified directory. """
        args = self.test_args
        directory = self.test_directory
        expect_mock_output = (
            "Called os.chdir(%(directory)r)\n"
            ) % vars()
        daemon.daemon.change_working_directory(**args)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_raises_daemon_error_on_os_error(self):
        """ Should raise a DaemonError on receiving an OSError. """
        args = self.test_args
        test_error = OSError(errno.ENOENT, "No such directory")
        os.chdir.mock_raises = test_error
        expect_error = daemon.daemon.DaemonOSEnvironmentError
        self.failUnlessRaises(
            expect_error,
            daemon.daemon.change_working_directory, **args)

    def test_error_message_contains_original_error_message(self):
        """ Should raise a DaemonError with original message. """
        args = self.test_args
        test_error = OSError(errno.ENOENT, "No such directory")
        os.chdir.mock_raises = test_error
        expect_error = daemon.daemon.DaemonOSEnvironmentError
        try:
            daemon.daemon.change_working_directory(**args)
        except expect_error:
            # Taken from `sys.exc_info` so this parses whichever
            # `except` binding syntax the running Python accepts.
            exc = sys.exc_info()[1]
        else:
            # Without this branch a missing exception left `exc`
            # unbound and the test errored with NameError.
            self.fail(
                "change_working_directory did not raise"
                " DaemonOSEnvironmentError")
        self.failUnlessIn(str(exc), str(test_error))
class change_root_directory_TestCase(scaffold.TestCase):
    """ Test cases for change_root_directory function. """

    def setUp(self):
        """ Set up test fixtures. """
        self.mock_tracker = scaffold.MockTracker()
        scaffold.mock(
            "os.chdir",
            tracker=self.mock_tracker)
        scaffold.mock(
            "os.chroot",
            tracker=self.mock_tracker)

        # Placeholder directory, passed to the function by keyword.
        self.test_directory = object()
        self.test_args = dict(
            directory=self.test_directory,
            )

    def tearDown(self):
        """ Tear down test fixtures. """
        scaffold.mock_restore()

    def test_changes_working_directory_to_specified_directory(self):
        """ Should change working directory to specified directory. """
        args = self.test_args
        directory = self.test_directory
        expect_mock_output = (
            "Called os.chdir(%(directory)r)\n"
            "...\n"
            ) % vars()
        daemon.daemon.change_root_directory(**args)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_changes_root_directory_to_specified_directory(self):
        """ Should change root directory to specified directory. """
        args = self.test_args
        directory = self.test_directory
        expect_mock_output = (
            "...\n"
            "Called os.chroot(%(directory)r)\n"
            ) % vars()
        daemon.daemon.change_root_directory(**args)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_raises_daemon_error_on_os_error_from_chdir(self):
        """ Should raise a DaemonError on receiving an OSError from chdir. """
        args = self.test_args
        test_error = OSError(errno.ENOENT, "No such directory")
        os.chdir.mock_raises = test_error
        expect_error = daemon.daemon.DaemonOSEnvironmentError
        self.failUnlessRaises(
            expect_error,
            daemon.daemon.change_root_directory, **args)

    def test_raises_daemon_error_on_os_error_from_chroot(self):
        """ Should raise a DaemonError on receiving an OSError from chroot. """
        args = self.test_args
        test_error = OSError(errno.EPERM, "No chroot for you!")
        os.chroot.mock_raises = test_error
        expect_error = daemon.daemon.DaemonOSEnvironmentError
        self.failUnlessRaises(
            expect_error,
            daemon.daemon.change_root_directory, **args)

    def test_error_message_contains_original_error_message(self):
        """ Should raise a DaemonError with original message. """
        args = self.test_args
        test_error = OSError(errno.ENOENT, "No such directory")
        os.chdir.mock_raises = test_error
        expect_error = daemon.daemon.DaemonOSEnvironmentError
        try:
            daemon.daemon.change_root_directory(**args)
        except expect_error:
            # Taken from `sys.exc_info` so this parses whichever
            # `except` binding syntax the running Python accepts.
            exc = sys.exc_info()[1]
        else:
            # Without this branch a missing exception left `exc`
            # unbound and the test errored with NameError.
            self.fail(
                "change_root_directory did not raise"
                " DaemonOSEnvironmentError")
        self.failUnlessIn(str(exc), str(test_error))
class change_file_creation_mask_TestCase(scaffold.TestCase):
    """ Test cases for change_file_creation_mask function. """

    def setUp(self):
        """ Set up test fixtures. """
        self.mock_tracker = scaffold.MockTracker()
        scaffold.mock(
            "os.umask",
            tracker=self.mock_tracker)

        # Placeholder mask, passed to the function by keyword.
        self.test_mask = object()
        self.test_args = dict(
            mask=self.test_mask,
            )

    def tearDown(self):
        """ Tear down test fixtures. """
        scaffold.mock_restore()

    def test_changes_umask_to_specified_mask(self):
        """ Should change file creation mask to specified mask. """
        args = self.test_args
        mask = self.test_mask
        expect_mock_output = (
            "Called os.umask(%(mask)r)\n"
            ) % vars()
        daemon.daemon.change_file_creation_mask(**args)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_raises_daemon_error_on_os_error_from_chdir(self):
        """ Should raise a DaemonError on receiving an OSError from umask. """
        # The method name mentions chdir, but the error is raised by
        # the mocked `os.umask`; the name is kept unchanged.
        args = self.test_args
        test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
        os.umask.mock_raises = test_error
        expect_error = daemon.daemon.DaemonOSEnvironmentError
        self.failUnlessRaises(
            expect_error,
            daemon.daemon.change_file_creation_mask, **args)

    def test_error_message_contains_original_error_message(self):
        """ Should raise a DaemonError with original message. """
        args = self.test_args
        test_error = OSError(errno.ENOENT, "No such directory")
        os.umask.mock_raises = test_error
        expect_error = daemon.daemon.DaemonOSEnvironmentError
        try:
            daemon.daemon.change_file_creation_mask(**args)
        except expect_error:
            # Taken from `sys.exc_info` so this parses whichever
            # `except` binding syntax the running Python accepts.
            exc = sys.exc_info()[1]
        else:
            # Without this branch a missing exception left `exc`
            # unbound and the test errored with NameError.
            self.fail(
                "change_file_creation_mask did not raise"
                " DaemonOSEnvironmentError")
        self.failUnlessIn(str(exc), str(test_error))
class change_process_owner_TestCase(scaffold.TestCase):
    """ Test cases for change_process_owner function.

        `setUp` installs tracked mocks in place of `os.setuid` and
        `os.setgid`; each test asserts either on the calls recorded by
        the tracker or on the error raised by the function under test.

        """

    def setUp(self):
        """ Set up test fixtures. """
        self.mock_tracker = scaffold.MockTracker()

        scaffold.mock(
            "os.setuid",
            tracker=self.mock_tracker)
        scaffold.mock(
            "os.setgid",
            tracker=self.mock_tracker)

        # Opaque sentinels: the tests only look for their `repr` in the
        # recorded calls, so no real ids are needed.
        self.test_uid = object()
        self.test_gid = object()
        self.test_args = dict(
            uid=self.test_uid,
            gid=self.test_gid,
            )

    def tearDown(self):
        """ Tear down test fixtures. """
        scaffold.mock_restore()

    def test_changes_gid_and_uid_in_order(self):
        """ Should change process GID and UID in correct order.

            Since the process requires appropriate privilege to use
            either of `setuid` or `setgid`, changing the UID must be
            done last.

            """
        args = self.test_args
        # The template has no placeholders, so it is used as-is.
        expect_mock_output = """\
            Called os.setgid(...)
            Called os.setuid(...)
            """
        daemon.daemon.change_process_owner(**args)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_changes_group_id_to_gid(self):
        """ Should change process GID to specified value. """
        args = self.test_args
        gid = self.test_gid
        expect_mock_output = """\
            Called os.setgid(%(gid)r)
            ...
            """ % vars()
        daemon.daemon.change_process_owner(**args)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_changes_user_id_to_uid(self):
        """ Should change process UID to specified value. """
        args = self.test_args
        uid = self.test_uid
        expect_mock_output = """\
            ...
            Called os.setuid(%(uid)r)
            """ % vars()
        daemon.daemon.change_process_owner(**args)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_raises_daemon_error_on_os_error_from_setgid(self):
        """ Should raise a DaemonError on receiving an OSError from setgid. """
        args = self.test_args
        test_error = OSError(errno.EPERM, "No switching for you!")
        os.setgid.mock_raises = test_error
        expect_error = daemon.daemon.DaemonOSEnvironmentError
        self.failUnlessRaises(
            expect_error,
            daemon.daemon.change_process_owner, **args)

    def test_raises_daemon_error_on_os_error_from_setuid(self):
        """ Should raise a DaemonError on receiving an OSError from setuid. """
        args = self.test_args
        test_error = OSError(errno.EPERM, "No switching for you!")
        os.setuid.mock_raises = test_error
        expect_error = daemon.daemon.DaemonOSEnvironmentError
        self.failUnlessRaises(
            expect_error,
            daemon.daemon.change_process_owner, **args)

    def test_error_message_contains_original_error_message(self):
        """ Should raise a DaemonError with original message. """
        args = self.test_args
        test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
        os.setuid.mock_raises = test_error
        expect_error = daemon.daemon.DaemonOSEnvironmentError
        try:
            daemon.daemon.change_process_owner(**args)
        except expect_error:
            # Fetch the active exception without version-specific
            # `except` binding syntax.
            exc = sys.exc_info()[1]
        else:
            # Without this branch a missing exception would surface as
            # a NameError on `exc` rather than as a test failure.
            self.fail(
                "%s not raised by change_process_owner"
                % expect_error.__name__)
        self.failUnlessIn(str(exc), str(test_error))
class prevent_core_dump_TestCase(scaffold.TestCase):
    """ Test cases for prevent_core_dump function. """

    def setUp(self):
        """ Install tracked stand-ins for the `resource` names used. """
        tracker = scaffold.MockTracker()
        self.mock_tracker = tracker

        # Opaque stand-in for the core-size resource identifier.
        core_resource_id = object()
        self.RLIMIT_CORE = core_resource_id

        scaffold.mock(
            "resource.RLIMIT_CORE", mock_obj=core_resource_id,
            tracker=tracker)
        scaffold.mock(
            "resource.getrlimit", returns=None,
            tracker=tracker)
        scaffold.mock(
            "resource.setrlimit", returns=None,
            tracker=tracker)

    def tearDown(self):
        """ Undo every mock installed by `setUp`. """
        scaffold.mock_restore()

    def test_sets_core_limit_to_zero(self):
        """ The RLIMIT_CORE resource should be set to a zero limit. """
        template = """\
            Called resource.getrlimit(
                %(expect_resource)r)
            Called resource.setrlimit(
                %(expect_resource)r,
                %(expect_limit)r)
            """
        expect_mock_output = template % dict(
            expect_resource=self.RLIMIT_CORE,
            expect_limit=(0, 0),
            )
        daemon.daemon.prevent_core_dump()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_raises_error_when_no_core_resource(self):
        """ A missing RLIMIT_CORE resource should cause a DaemonError. """

        def fake_getrlimit(res):
            # Only a query for the core resource is rejected.
            if res == resource.RLIMIT_CORE:
                raise ValueError("Bogus platform doesn't have RLIMIT_CORE")
            return None

        resource.getrlimit.mock_returns_func = fake_getrlimit
        self.failUnlessRaises(
            daemon.daemon.DaemonOSEnvironmentError,
            daemon.daemon.prevent_core_dump)
class close_file_descriptor_if_open_TestCase(scaffold.TestCase):
    """ Test cases for close_file_descriptor_if_open function. """

    def setUp(self):
        """ Install a tracked stand-in for `os.close`. """
        self.mock_tracker = scaffold.MockTracker()
        self.test_fd = 274

        scaffold.mock(
            "os.close",
            tracker=self.mock_tracker)

    def tearDown(self):
        """ Undo every mock installed by `setUp`. """
        scaffold.mock_restore()

    def _make_os_close_raise(self, error):
        """ Arrange for the mocked `os.close` to raise `error`. """

        def raise_error(fd):
            raise error

        os.close.mock_returns_func = raise_error

    def test_requests_file_descriptor_close(self):
        """ The file descriptor should be passed to `os.close`. """
        expect_mock_output = """\
            Called os.close(%(fd)r)
            """ % dict(fd=self.test_fd)
        daemon.daemon.close_file_descriptor_if_open(self.test_fd)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_ignores_badfd_error_on_close(self):
        """ An OSError with EBADF from `os.close` should be ignored. """
        self._make_os_close_raise(
            OSError(errno.EBADF, "Bad file descriptor"))
        expect_mock_output = """\
            Called os.close(%(fd)r)
            """ % dict(fd=self.test_fd)
        daemon.daemon.close_file_descriptor_if_open(self.test_fd)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_raises_error_if_error_on_close(self):
        """ Any other OSError from `os.close` should cause a DaemonError. """
        # The errno is an opaque object, so it cannot equal EBADF.
        self._make_os_close_raise(
            OSError(object(), "Unexpected error"))
        self.failUnlessRaises(
            daemon.daemon.DaemonOSEnvironmentError,
            daemon.daemon.close_file_descriptor_if_open, self.test_fd)
class maxfd_TestCase(scaffold.TestCase):
    """ Test cases for module MAXFD constant. """

    def test_positive(self):
        """ MAXFD should be greater than zero. """
        self.failUnless(daemon.daemon.MAXFD > 0)

    def test_integer(self):
        """ MAXFD should equal its own integer conversion. """
        value = daemon.daemon.MAXFD
        self.failUnlessEqual(int(value), value)

    def test_reasonably_high(self):
        """ MAXFD should be high enough for a default open files limit.

            When the system reports an "infinity" limit on maximum
            file descriptors, a finite number is still needed in order
            to close "all" of them. This checks the fallback is large
            enough to catch most use cases.

            """
        floor = 2048
        actual = daemon.daemon.MAXFD
        failure_message = (
            "MAXFD should be at least %(expect_minimum)r (got %(maxfd)r)"
            % dict(expect_minimum=floor, maxfd=actual))
        self.failUnless(floor <= actual, msg=failure_message)
class get_maximum_file_descriptors_TestCase(scaffold.TestCase):
    """ Test cases for get_maximum_file_descriptors function. """

    def setUp(self):
        """ Install stand-ins for module MAXFD and the `resource` API. """
        tracker = scaffold.MockTracker()
        self.mock_tracker = tracker

        self.RLIMIT_NOFILE = object()
        self.RLIM_INFINITY = object()
        self.test_rlimit_nofile = 2468
        self.test_maxfd = object()

        def fake_getrlimit(limit_id):
            # The second item is read from the test case on each call,
            # so a test may replace `test_rlimit_nofile` after set-up.
            if limit_id != self.RLIMIT_NOFILE:
                return NotImplemented
            return (object(), self.test_rlimit_nofile)

        scaffold.mock(
            "daemon.daemon.MAXFD", mock_obj=self.test_maxfd,
            tracker=tracker)
        scaffold.mock(
            "resource.RLIMIT_NOFILE", mock_obj=self.RLIMIT_NOFILE,
            tracker=tracker)
        scaffold.mock(
            "resource.RLIM_INFINITY", mock_obj=self.RLIM_INFINITY,
            tracker=tracker)
        scaffold.mock(
            "resource.getrlimit", returns_func=fake_getrlimit,
            tracker=tracker)

    def tearDown(self):
        """ Undo every mock installed by `setUp`. """
        scaffold.mock_restore()

    def test_returns_system_hard_limit(self):
        """ The process hard limit on number of files should be returned. """
        actual = daemon.daemon.get_maximum_file_descriptors()
        self.failUnlessEqual(self.test_rlimit_nofile, actual)

    def test_returns_module_default_if_hard_limit_infinity(self):
        """ Module MAXFD should be returned when the hard limit is infinity. """
        self.test_rlimit_nofile = self.RLIM_INFINITY
        actual = daemon.daemon.get_maximum_file_descriptors()
        self.failUnlessEqual(self.test_maxfd, actual)
class close_all_open_files_TestCase(scaffold.TestCase):
    """ Test cases for close_all_open_files function. """

    def setUp(self):
        """ Install stand-ins for the limit lookup and the close helper. """
        tracker = scaffold.MockTracker()
        self.mock_tracker = tracker

        self.RLIMIT_NOFILE = object()
        self.RLIM_INFINITY = object()
        self.test_rlimit_nofile = self.RLIM_INFINITY
        self.test_maxfd = 8

        def fake_getrlimit(limit_id):
            # The first item is read from the test case on each call.
            if limit_id != self.RLIMIT_NOFILE:
                return NotImplemented
            return (self.test_rlimit_nofile, object())

        scaffold.mock(
            "daemon.daemon.get_maximum_file_descriptors",
            returns=self.test_maxfd,
            tracker=tracker)
        scaffold.mock(
            "resource.RLIMIT_NOFILE", mock_obj=self.RLIMIT_NOFILE,
            tracker=tracker)
        scaffold.mock(
            "resource.RLIM_INFINITY", mock_obj=self.RLIM_INFINITY,
            tracker=tracker)
        scaffold.mock(
            "resource.getrlimit", returns_func=fake_getrlimit,
            tracker=tracker)
        scaffold.mock(
            "daemon.daemon.close_file_descriptor_if_open",
            tracker=tracker)

    def tearDown(self):
        """ Undo every mock installed by `setUp`. """
        scaffold.mock_restore()

    def test_requests_all_open_files_to_close(self):
        """ A close should be requested for every file descriptor. """
        # Descriptors are expected from the highest down to zero.
        expect_lines = ["...\n"]
        for fd in reversed(range(self.test_maxfd)):
            expect_lines.append(
                "Called daemon.daemon.close_file_descriptor_if_open(%(fd)r)\n"
                % dict(fd=fd))
        expect_mock_output = "".join(expect_lines)
        daemon.daemon.close_all_open_files()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_requests_all_but_excluded_files_to_close(self):
        """ Excluded file descriptors should not be requested to close. """
        test_exclude = set([3, 7])
        expect_lines = ["...\n"]
        for fd in reversed(range(self.test_maxfd)):
            if fd in test_exclude:
                continue
            expect_lines.append(
                "Called daemon.daemon.close_file_descriptor_if_open(%(fd)r)\n"
                % dict(fd=fd))
        expect_mock_output = "".join(expect_lines)
        daemon.daemon.close_all_open_files(exclude=test_exclude)
        self.failUnlessMockCheckerMatch(expect_mock_output)
class detach_process_context_TestCase(scaffold.TestCase):
    """ Test cases for detach_process_context function.

        `setUp` installs tracked mocks in place of `os.fork`,
        `os.setsid` and `os._exit`. The `os._exit` mock raises
        `FakeOSExit`, so a test can observe the exit request instead
        of the test process actually exiting.

        """

    class FakeOSExit(SystemExit):
        """ Fake exception raised for os._exit(). """

    def setUp(self):
        """ Set up test fixtures. """
        self.mock_tracker = scaffold.MockTracker()

        # By default both forks return 0, i.e. the code under test
        # sees itself as the child each time.
        test_pids = [0, 0]
        scaffold.mock(
            "os.fork", returns_iter=test_pids,
            tracker=self.mock_tracker)
        scaffold.mock(
            "os.setsid",
            tracker=self.mock_tracker)

        def raise_os_exit(status=None):
            raise self.FakeOSExit(status)

        scaffold.mock(
            "os._exit", returns_func=raise_os_exit,
            tracker=self.mock_tracker)

    def tearDown(self):
        """ Tear down test fixtures. """
        scaffold.mock_restore()

    def _make_mock_fork(self, outcomes):
        """ Make an `os.fork` replacement that plays back `outcomes`.

            Each call of the returned function consumes the next item
            of `outcomes`: an exception instance is raised, any other
            value is returned.

            """
        outcomes_iter = iter(outcomes)

        def mock_fork():
            outcome = outcomes_iter.next()
            if isinstance(outcome, Exception):
                raise outcome
            return outcome

        return mock_fork

    def test_parent_exits(self):
        """ Parent process should exit. """
        parent_pid = 23
        scaffold.mock("os.fork", returns_iter=[parent_pid],
            tracker=self.mock_tracker)
        expect_mock_output = """\
            Called os.fork()
            Called os._exit(0)
            """
        self.failUnlessRaises(
            self.FakeOSExit,
            daemon.daemon.detach_process_context)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_first_fork_error_raises_error(self):
        """ Error on first fork should raise DaemonProcessDetachError. """
        fork_errno = 13
        fork_strerror = "Bad stuff happened"
        fork_error = OSError(fork_errno, fork_strerror)
        mock_fork = self._make_mock_fork([fork_error])
        scaffold.mock("os.fork", returns_func=mock_fork,
            tracker=self.mock_tracker)
        expect_mock_output = """\
            Called os.fork()
            """
        self.failUnlessRaises(
            daemon.daemon.DaemonProcessDetachError,
            daemon.daemon.detach_process_context)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_child_starts_new_process_group(self):
        """ Child should start new process group. """
        expect_mock_output = """\
            Called os.fork()
            Called os.setsid()
            ...
            """
        daemon.daemon.detach_process_context()
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_child_forks_next_parent_exits(self):
        """ Child should fork, then exit if parent. """
        test_pids = [0, 42]
        scaffold.mock("os.fork", returns_iter=test_pids,
            tracker=self.mock_tracker)
        expect_mock_output = """\
            Called os.fork()
            Called os.setsid()
            Called os.fork()
            Called os._exit(0)
            """
        self.failUnlessRaises(
            self.FakeOSExit,
            daemon.daemon.detach_process_context)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_second_fork_error_reports_to_stderr(self):
        """ Error on second fork should raise DaemonProcessDetachError. """
        # NOTE: the method name mentions stderr, but what is asserted
        # is the raised error; the name is kept so any existing test
        # selection by name keeps working.
        fork_errno = 17
        fork_strerror = "Nasty stuff happened"
        fork_error = OSError(fork_errno, fork_strerror)
        mock_fork = self._make_mock_fork([0, fork_error])
        scaffold.mock("os.fork", returns_func=mock_fork,
            tracker=self.mock_tracker)
        expect_mock_output = """\
            Called os.fork()
            Called os.setsid()
            Called os.fork()
            """
        self.failUnlessRaises(
            daemon.daemon.DaemonProcessDetachError,
            daemon.daemon.detach_process_context)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_child_forks_next_child_continues(self):
        """ Child should fork, then continue if child. """
        # The template has no placeholders, so it is used as-is.
        expect_mock_output = """\
            Called os.fork()
            Called os.setsid()
            Called os.fork()
            """
        daemon.daemon.detach_process_context()
        self.failUnlessMockCheckerMatch(expect_mock_output)
class is_process_started_by_init_TestCase(scaffold.TestCase):
    """ Test cases for is_process_started_by_init function. """

    def setUp(self):
        """ Install a tracked stand-in for `os.getppid`. """
        tracker = scaffold.MockTracker()
        self.mock_tracker = tracker
        self.test_ppid = 765

        scaffold.mock(
            "os.getppid",
            returns=self.test_ppid,
            tracker=tracker)

    def tearDown(self):
        """ Undo every mock installed by `setUp`. """
        scaffold.mock_restore()

    def test_returns_false_by_default(self):
        """ The result should be False with the default parent PID. """
        outcome = daemon.daemon.is_process_started_by_init()
        self.failUnlessIs(False, outcome)

    def test_returns_true_if_parent_process_is_init(self):
        """ The result should be True when the parent PID is 1. """
        os.getppid.mock_returns = 1
        outcome = daemon.daemon.is_process_started_by_init()
        self.failUnlessIs(True, outcome)
class is_socket_TestCase(scaffold.TestCase):
    """ Test cases for is_socket function.

        `setUp` makes `socket.fromfd` return a mock socket whose
        `getsockopt` raises an ENOTSOCK error by default; individual
        tests change the `getsockopt` behaviour as needed.

        """

    def setUp(self):
        """ Set up test fixtures. """
        self.mock_tracker = scaffold.MockTracker()

        def mock_getsockopt(level, optname, buflen=None):
            result = object()
            # Compare by value: socket option constants are integers,
            # and identity of equal integers is not guaranteed.
            if optname == socket.SO_TYPE:
                result = socket.SOCK_RAW
            return result

        self.mock_socket_getsockopt_func = mock_getsockopt

        self.mock_socket_error = socket.error(
            errno.ENOTSOCK,
            "Socket operation on non-socket")

        self.mock_socket = scaffold.Mock(
            "socket.socket",
            tracker=self.mock_tracker)
        self.mock_socket.getsockopt.mock_raises = self.mock_socket_error

        def mock_socket_fromfd(fd, family, type, proto=None):
            return self.mock_socket

        scaffold.mock(
            "socket.fromfd",
            returns_func=mock_socket_fromfd,
            tracker=self.mock_tracker)

    def tearDown(self):
        """ Tear down test fixtures. """
        scaffold.mock_restore()

    def test_returns_false_by_default(self):
        """ Should return False under normal circumstances. """
        test_fd = 23
        expect_result = False
        result = daemon.daemon.is_socket(test_fd)
        self.failUnlessIs(expect_result, result)

    def test_returns_true_if_stdin_is_socket(self):
        """ Should return True if `stdin` is a socket. """
        test_fd = 23
        getsockopt = self.mock_socket.getsockopt
        getsockopt.mock_raises = None
        getsockopt.mock_returns_func = self.mock_socket_getsockopt_func
        expect_result = True
        result = daemon.daemon.is_socket(test_fd)
        self.failUnlessIs(expect_result, result)

    def test_returns_false_if_stdin_socket_raises_error(self):
        """ Should return True if socket raises an unrelated error. """
        # NOTE: despite the `returns_false` in the method name, the
        # assertion expects True. The name is kept so any existing
        # test selection by name keeps working.
        test_fd = 23
        getsockopt = self.mock_socket.getsockopt
        # The errno is an opaque object, so it cannot equal ENOTSOCK.
        getsockopt.mock_raises = socket.error(
            object(), "Weird socket stuff")
        expect_result = True
        result = daemon.daemon.is_socket(test_fd)
        self.failUnlessIs(expect_result, result)
class is_process_started_by_superserver_TestCase(scaffold.TestCase):
    """ Test cases for is_process_started_by_superserver function. """

    def setUp(self):
        """ Install a tracked stand-in for `daemon.daemon.is_socket`. """
        self.mock_tracker = scaffold.MockTracker()

        def answer_no():
            return False

        # Tests may replace this hook to change the answer for stdin.
        self.mock_stdin_is_socket_func = answer_no

        def fake_is_socket(fd):
            # Only the original stdin descriptor consults the hook; the
            # hook is read from the test case on each call.
            if sys.__stdin__.fileno() != fd:
                return False
            return self.mock_stdin_is_socket_func()

        scaffold.mock(
            "daemon.daemon.is_socket",
            returns_func=fake_is_socket,
            tracker=self.mock_tracker)

    def tearDown(self):
        """ Undo every mock installed by `setUp`. """
        scaffold.mock_restore()

    def test_returns_false_by_default(self):
        """ The result should be False when stdin is not a socket. """
        outcome = daemon.daemon.is_process_started_by_superserver()
        self.failUnlessIs(False, outcome)

    def test_returns_true_if_stdin_is_socket(self):
        """ The result should be True when stdin is a socket. """

        def answer_yes():
            return True

        self.mock_stdin_is_socket_func = answer_yes
        outcome = daemon.daemon.is_process_started_by_superserver()
        self.failUnlessIs(True, outcome)
class is_detach_process_context_required_TestCase(scaffold.TestCase):
    """ Test cases for is_detach_process_context_required function.

        `setUp` installs tracked mocks in place of the two predicate
        functions `is_process_started_by_init` and
        `is_process_started_by_superserver`; individual tests set a
        mock's return value to simulate each start-up situation.

        """

    def setUp(self):
        """ Set up test fixtures. """
        self.mock_tracker = scaffold.MockTracker()

        scaffold.mock(
            "daemon.daemon.is_process_started_by_init",
            tracker=self.mock_tracker)
        scaffold.mock(
            "daemon.daemon.is_process_started_by_superserver",
            tracker=self.mock_tracker)

    def tearDown(self):
        """ Tear down test fixtures. """
        scaffold.mock_restore()

    def test_returns_true_by_default(self):
        """ Should return True under normal circumstances. """
        expect_result = True
        result = daemon.daemon.is_detach_process_context_required()
        self.failUnlessIs(expect_result, result)

    def test_returns_false_if_started_by_init(self):
        """ Should return False if current process started by init. """
        daemon.daemon.is_process_started_by_init.mock_returns = True
        expect_result = False
        result = daemon.daemon.is_detach_process_context_required()
        self.failUnlessIs(expect_result, result)

    def test_returns_true_if_started_by_superserver(self):
        """ Should return False if current process started by superserver. """
        # NOTE: despite the `returns_true` in the method name, the
        # assertion expects False. The name is kept so any existing
        # test selection by name keeps working.
        daemon.daemon.is_process_started_by_superserver.mock_returns = True
        expect_result = False
        result = daemon.daemon.is_detach_process_context_required()
        self.failUnlessIs(expect_result, result)
def setup_streams_fixtures(testcase):
    """ Attach the shared standard-stream fixtures to `testcase`.

        Sets `mock_tracker`, `stream_file_paths`,
        `stream_files_by_name` and `stream_files_by_path` on the test
        case, and installs a tracked mock for `os.dup2`.

        """
    testcase.mock_tracker = scaffold.MockTracker()

    stream_names = ['stdin', 'stdout', 'stderr']

    # One generated path name per stream.
    paths_by_name = {}
    for stream_name in stream_names:
        paths_by_name[stream_name] = tempfile.mktemp()
    testcase.stream_file_paths = paths_by_name

    # One fake file per stream, reachable by stream name ...
    files_by_name = {}
    for stream_name in stream_names:
        files_by_name[stream_name] = FakeFileDescriptorStringIO()
    testcase.stream_files_by_name = files_by_name

    # ... and the very same fake file objects, reachable by path.
    files_by_path = {}
    for stream_name in stream_names:
        files_by_path[paths_by_name[stream_name]] = files_by_name[stream_name]
    testcase.stream_files_by_path = files_by_path

    scaffold.mock(
        "os.dup2",
        tracker=testcase.mock_tracker)
class redirect_stream_TestCase(scaffold.TestCase):
    """ Test cases for redirect_stream function.

        `setUp` applies the shared stream fixtures and installs a
        tracked mock in place of `os.open`, which hands back the file
        descriptor of a fake null file when asked for `os.devnull`.

        """

    def setUp(self):
        """ Set up test fixtures. """
        setup_streams_fixtures(self)

        self.test_system_stream = FakeFileDescriptorStringIO()
        self.test_target_stream = FakeFileDescriptorStringIO()
        self.test_null_file = FakeFileDescriptorStringIO()

        def mock_open(path, flag, mode=None):
            if path == os.devnull:
                result = self.test_null_file.fileno()
            else:
                # Any other path is reported as missing. The errno
                # constant is `ENOENT`; the `errno` module has no
                # `NOENT` attribute.
                raise OSError(errno.ENOENT, "No such file", path)
            return result

        scaffold.mock(
            "os.open",
            returns_func=mock_open,
            tracker=self.mock_tracker)

    def tearDown(self):
        """ Tear down test fixtures. """
        scaffold.mock_restore()

    def test_duplicates_target_file_descriptor(self):
        """ Should duplicate file descriptor from target to system stream. """
        system_stream = self.test_system_stream
        system_fileno = system_stream.fileno()
        target_stream = self.test_target_stream
        target_fileno = target_stream.fileno()
        expect_mock_output = """\
            Called os.dup2(%(target_fileno)r, %(system_fileno)r)
            """ % vars()
        daemon.daemon.redirect_stream(system_stream, target_stream)
        self.failUnlessMockCheckerMatch(expect_mock_output)

    def test_duplicates_null_file_descriptor_by_default(self):
        """ Should by default duplicate the null file to the system stream. """
        system_stream = self.test_system_stream
        system_fileno = system_stream.fileno()
        # No target stream: the null device is expected to be opened
        # and used as the target instead.
        target_stream = None
        null_path = os.devnull
        null_flag = os.O_RDWR
        null_file = self.test_null_file
        null_fileno = null_file.fileno()
        expect_mock_output = """\
            Called os.open(%(null_path)r, %(null_flag)r)
            Called os.dup2(%(null_fileno)r, %(system_fileno)r)
            """ % vars()
        daemon.daemon.redirect_stream(system_stream, target_stream)
        self.failUnlessMockCheckerMatch(expect_mock_output)
class make_default_signal_map_TestCase(scaffold.TestCase):
    """ Test cases for make_default_signal_map function. """

    def setUp(self):
        """ Swap in a stand-in `signal` module holding opaque signal ids. """
        self.mock_tracker = scaffold.MockTracker()

        fake_signal_module = ModuleType('signal')
        for signal_name in (
                'SIGHUP',
                'SIGCLD',
                'SIGSEGV',
                'SIGTSTP',
                'SIGTTIN',
                'SIGTTOU',
                'SIGTERM',
                ):
            setattr(fake_signal_module, signal_name, object())

        scaffold.mock(
            "signal",
            mock_obj=fake_signal_module,
            tracker=self.mock_tracker)
        scaffold.mock(
            "daemon.daemon.signal",
            mock_obj=fake_signal_module,
            tracker=self.mock_tracker)

        targets_by_name = {
            'SIGTSTP': None,
            'SIGTTIN': None,
            'SIGTTOU': None,
            'SIGTERM': 'terminate',
            }
        # The `signal` name is read only after the mocks above are in
        # place; presumably it now refers to the stand-in module, so
        # the keys are that module's opaque objects.
        expected_map = {}
        for (signal_name, target) in targets_by_name.items():
            expected_map[getattr(signal, signal_name)] = target
        self.default_signal_map = expected_map

    def tearDown(self):
        """ Undo every mock installed by `setUp`. """
        scaffold.mock_restore()

    def test_returns_constructed_signal_map(self):
        """ The returned map should equal the expected default map. """
        actual = daemon.daemon.make_default_signal_map()
        self.failUnlessEqual(self.default_signal_map, actual)

    def test_returns_signal_map_with_only_ids_in_signal_module(self):
        """ The returned map should hold only signals the module defines.

            The `signal` module is documented to define only those
            signals which exist on the running system, so the default
            map should not contain any signal that is missing from
            the `signal` module.

            """
        # Drop SIGTTOU from the expectation first (the attribute is
        # needed as the key), then from the module itself.
        self.default_signal_map.pop(signal.SIGTTOU)
        delattr(signal, 'SIGTTOU')
        actual = daemon.daemon.make_default_signal_map()
        self.failUnlessEqual(self.default_signal_map, actual)
class set_signal_handlers_TestCase(scaffold.TestCase):
    """ Test cases for set_signal_handlers function. """

    def setUp(self):
        """ Install a tracked stand-in for `signal.signal`. """
        self.mock_tracker = scaffold.MockTracker()

        scaffold.mock(
            "signal.signal",
            tracker=self.mock_tracker)

        # Each signal number maps to its own opaque handler object.
        handlers = {}
        for signal_number in (signal.SIGQUIT, signal.SIGSEGV, signal.SIGINT):
            handlers[signal_number] = object()
        self.signal_handler_map = handlers

    def tearDown(self):
        """ Undo every mock installed by `setUp`. """
        scaffold.mock_restore()

    def test_sets_signal_handler_for_each_item(self):
        """ A handler should be set for every item in the map. """
        handlers = self.signal_handler_map
        # The expectation follows the iteration order of the same
        # mapping that is passed to the function under test.
        expect_lines = []
        for (signal_number, handler) in handlers.items():
            expect_lines.append(
                "Called signal.signal(%(signal_number)r, %(handler)r)\n"
                % dict(signal_number=signal_number, handler=handler))
        expect_mock_output = "".join(expect_lines)
        daemon.daemon.set_signal_handlers(handlers)
        self.failUnlessMockCheckerMatch(expect_mock_output)
class register_atexit_function_TestCase(scaffold.TestCase):
    """ Test cases for register_atexit_function function. """

    def setUp(self):
        """ Install a tracked stand-in for `atexit.register`. """
        tracker = scaffold.MockTracker()
        self.mock_tracker = tracker

        scaffold.mock(
            "atexit.register",
            tracker=tracker)

    def tearDown(self):
        """ Undo every mock installed by `setUp`. """
        scaffold.mock_restore()

    def test_registers_function_for_atexit_processing(self):
        """ The given function should be passed to `atexit.register`. """
        # An opaque object stands in for the function to register.
        exit_func = object()
        expect_mock_output = """\
            Called atexit.register(%(func)r)
            """ % dict(func=exit_func)
        daemon.daemon.register_atexit_function(exit_func)
        self.failUnlessMockCheckerMatch(expect_mock_output)