blob: e7ddf90947cf5f10f21f200bb42c7acb7ac34bf9 [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for subprocess2.py."""
import logging
import optparse
import os
import sys
import time
import unittest
try:
import fcntl # pylint: disable=F0401
except ImportError:
fcntl = None
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import subprocess
import subprocess2
from testing_support import auto_stub
# Method could be a function
# pylint: disable=R0201
# Create aliases for subprocess2 specific tests. They shouldn't be used for
# regression tests.
TIMED_OUT = subprocess2.TIMED_OUT
VOID = subprocess2.VOID
PIPE = subprocess2.PIPE
STDOUT = subprocess2.STDOUT
def convert_to_crlf(string):
"""Unconditionally convert LF to CRLF."""
return string.replace('\n', '\r\n')
def convert_to_cr(string):
"""Unconditionally convert LF to CR."""
return string.replace('\n', '\r')
def convert_win(string):
"""Converts string to CRLF on Windows only."""
if sys.platform == 'win32':
return string.replace('\n', '\r\n')
return string
class DefaultsTest(auto_stub.TestCase):
# TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they
# can be trapped in the child process for better coverage.
def _fake_communicate(self):
"""Mocks subprocess2.communicate()."""
results = {}
def fake_communicate(args, **kwargs):
assert not results
results.update(kwargs)
results['args'] = args
return ('stdout', 'stderr'), 0
self.mock(subprocess2, 'communicate', fake_communicate)
return results
def _fake_Popen(self):
"""Mocks the whole subprocess2.Popen class."""
results = {}
class fake_Popen(object):
returncode = -8
def __init__(self, args, **kwargs):
assert not results
results.update(kwargs)
results['args'] = args
@staticmethod
# pylint: disable=W0622
def communicate(input=None, timeout=None, nag_max=None, nag_timer=None):
return None, None
self.mock(subprocess2, 'Popen', fake_Popen)
return results
def _fake_subprocess_Popen(self):
"""Mocks the base class subprocess.Popen only."""
results = {}
def __init__(self, args, **kwargs):
assert not results
results.update(kwargs)
results['args'] = args
def communicate():
return None, None
self.mock(subprocess.Popen, '__init__', __init__)
self.mock(subprocess.Popen, 'communicate', communicate)
return results
def test_check_call_defaults(self):
results = self._fake_communicate()
self.assertEquals(
('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True))
expected = {
'args': ['foo'],
'a':True,
}
self.assertEquals(expected, results)
def test_capture_defaults(self):
results = self._fake_communicate()
self.assertEquals(
'stdout', subprocess2.capture(['foo'], a=True))
expected = {
'args': ['foo'],
'a':True,
'stdin': subprocess2.VOID,
'stdout': subprocess2.PIPE,
}
self.assertEquals(expected, results)
def test_communicate_defaults(self):
results = self._fake_Popen()
self.assertEquals(
((None, None), -8), subprocess2.communicate(['foo'], a=True))
expected = {
'args': ['foo'],
'a': True,
}
self.assertEquals(expected, results)
def test_Popen_defaults(self):
results = self._fake_subprocess_Popen()
proc = subprocess2.Popen(['foo'], a=True)
# Cleanup code in subprocess.py needs this member to be set.
# pylint: disable=W0201
proc._child_created = None
expected = {
'args': ['foo'],
'a': True,
'shell': bool(sys.platform=='win32'),
}
if sys.platform != 'win32':
env = os.environ.copy()
is_english = lambda name: env.get(name, 'en').startswith('en')
if not is_english('LANG'):
env['LANG'] = 'en_US.UTF-8'
expected['env'] = env
if not is_english('LANGUAGE'):
env['LANGUAGE'] = 'en_US.UTF-8'
expected['env'] = env
self.assertEquals(expected, results)
self.assertTrue(time.time() >= proc.start)
def test_check_output_defaults(self):
results = self._fake_communicate()
# It's discarding 'stderr' because it assumes stderr=subprocess2.STDOUT but
# fake_communicate() doesn't 'implement' that.
self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True))
expected = {
'args': ['foo'],
'a':True,
'stdin': subprocess2.VOID,
'stdout': subprocess2.PIPE,
}
self.assertEquals(expected, results)
class BaseTestCase(unittest.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self.exe_path = __file__
self.exe = [sys.executable, self.exe_path, '--child']
self.states = {}
if fcntl:
for v in (sys.stdin, sys.stdout, sys.stderr):
fileno = v.fileno()
self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL)
def tearDown(self):
for fileno, fl in self.states.iteritems():
self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL))
super(BaseTestCase, self).tearDown()
def _check_res(self, res, stdout, stderr, returncode):
(out, err), code = res
self.assertEquals(stdout, out)
self.assertEquals(stderr, err)
self.assertEquals(returncode, code)
class RegressionTest(BaseTestCase):
# Regression tests to ensure that subprocess and subprocess2 have the same
# behavior.
def _run_test(self, function):
"""Runs tests in 12 combinations:
- LF output with universal_newlines=False
- CR output with universal_newlines=False
- CRLF output with universal_newlines=False
- LF output with universal_newlines=True
- CR output with universal_newlines=True
- CRLF output with universal_newlines=True
Once with subprocess, once with subprocess2.
First |function| argument is the conversion for the original expected LF
string to the right EOL.
Second |function| argument is the executable and initial flag to run, to
control what EOL is used by the child process.
Third |function| argument is universal_newlines value.
"""
noop = lambda x: x
for subp in (subprocess, subprocess2):
function(noop, self.exe, False, subp)
function(convert_to_cr, self.exe + ['--cr'], False, subp)
function(convert_to_crlf, self.exe + ['--crlf'], False, subp)
function(noop, self.exe, True, subp)
function(noop, self.exe + ['--cr'], True, subp)
function(noop, self.exe + ['--crlf'], True, subp)
def _check_exception(self, subp, e, stdout, stderr, returncode):
"""On exception, look if the exception members are set correctly."""
self.assertEquals(returncode, e.returncode)
if subp is subprocess:
# subprocess never save the output.
self.assertFalse(hasattr(e, 'stdout'))
self.assertFalse(hasattr(e, 'stderr'))
elif subp is subprocess2:
self.assertEquals(stdout, e.stdout)
self.assertEquals(stderr, e.stderr)
else:
self.fail()
def test_check_output_no_stdout(self):
try:
subprocess2.check_output(self.exe, stdout=subprocess2.PIPE)
self.fail()
except ValueError:
pass
if (sys.version_info[0] * 10 + sys.version_info[1]) >= 27:
# python 2.7+
try:
# pylint: disable=E1101
subprocess.check_output(self.exe, stdout=subprocess.PIPE)
self.fail()
except ValueError:
pass
def test_check_output_throw_stdout(self):
def fn(c, e, un, subp):
if not hasattr(subp, 'check_output'):
return
try:
subp.check_output(
e + ['--fail', '--stdout'], universal_newlines=un)
self.fail()
except subp.CalledProcessError, exception:
self._check_exception(subp, exception, c('A\nBB\nCCC\n'), None, 64)
self._run_test(fn)
def test_check_output_throw_no_stderr(self):
def fn(c, e, un, subp):
if not hasattr(subp, 'check_output'):
return
try:
subp.check_output(
e + ['--fail', '--stderr'], universal_newlines=un)
self.fail()
except subp.CalledProcessError, exception:
self._check_exception(subp, exception, c(''), None, 64)
self._run_test(fn)
def test_check_output_throw_stderr(self):
def fn(c, e, un, subp):
if not hasattr(subp, 'check_output'):
return
try:
subp.check_output(
e + ['--fail', '--stderr'],
stderr=subp.PIPE,
universal_newlines=un)
self.fail()
except subp.CalledProcessError, exception:
self._check_exception(subp, exception, '', c('a\nbb\nccc\n'), 64)
self._run_test(fn)
def test_check_output_throw_stderr_stdout(self):
def fn(c, e, un, subp):
if not hasattr(subp, 'check_output'):
return
try:
subp.check_output(
e + ['--fail', '--stderr'],
stderr=subp.STDOUT,
universal_newlines=un)
self.fail()
except subp.CalledProcessError, exception:
self._check_exception(subp, exception, c('a\nbb\nccc\n'), None, 64)
self._run_test(fn)
def test_check_call_throw(self):
for subp in (subprocess, subprocess2):
try:
subp.check_call(self.exe + ['--fail', '--stderr'])
self.fail()
except subp.CalledProcessError, exception:
self._check_exception(subp, exception, None, None, 64)
def test_redirect_stderr_to_stdout_pipe(self):
def fn(c, e, un, subp):
# stderr output into stdout.
proc = subp.Popen(
e + ['--stderr'],
stdout=subp.PIPE,
stderr=subp.STDOUT,
universal_newlines=un)
res = proc.communicate(), proc.returncode
self._check_res(res, c('a\nbb\nccc\n'), None, 0)
self._run_test(fn)
def test_redirect_stderr_to_stdout(self):
def fn(c, e, un, subp):
# stderr output into stdout but stdout is not piped.
proc = subp.Popen(
e + ['--stderr'], stderr=STDOUT, universal_newlines=un)
res = proc.communicate(), proc.returncode
self._check_res(res, None, None, 0)
self._run_test(fn)
def test_stderr(self):
cmd = ['expr', '1', '/', '0']
if sys.platform == 'win32':
cmd = ['cmd.exe', '/c', 'exit', '1']
p1 = subprocess.Popen(cmd, stderr=subprocess.PIPE, shell=False)
p2 = subprocess2.Popen(cmd, stderr=subprocess.PIPE, shell=False)
r1 = p1.communicate()
r2 = p2.communicate(timeout=100)
self.assertEquals(r1, r2)
class S2Test(BaseTestCase):
# Tests that can only run in subprocess2, e.g. new functionalities.
# In particular, subprocess2.communicate() doesn't exist in subprocess.
def _run_test(self, function):
"""Runs tests in 6 combinations:
- LF output with universal_newlines=False
- CR output with universal_newlines=False
- CRLF output with universal_newlines=False
- LF output with universal_newlines=True
- CR output with universal_newlines=True
- CRLF output with universal_newlines=True
First |function| argument is the convertion for the origianl expected LF
string to the right EOL.
Second |function| argument is the executable and initial flag to run, to
control what EOL is used by the child process.
Third |function| argument is universal_newlines value.
"""
noop = lambda x: x
function(noop, self.exe, False)
function(convert_to_cr, self.exe + ['--cr'], False)
function(convert_to_crlf, self.exe + ['--crlf'], False)
function(noop, self.exe, True)
function(noop, self.exe + ['--cr'], True)
function(noop, self.exe + ['--crlf'], True)
def _check_exception(self, e, stdout, stderr, returncode):
"""On exception, look if the exception members are set correctly."""
self.assertEquals(returncode, e.returncode)
self.assertEquals(stdout, e.stdout)
self.assertEquals(stderr, e.stderr)
def test_timeout(self):
# timeout doesn't exist in subprocess.
def fn(c, e, un):
res = subprocess2.communicate(
self.exe + ['--sleep_first', '--stdout'],
timeout=0.01,
stdout=PIPE,
shell=False)
self._check_res(res, '', None, TIMED_OUT)
self._run_test(fn)
def test_timeout_shell_throws(self):
def fn(c, e, un):
try:
# With shell=True, it needs a string.
subprocess2.communicate(' '.join(self.exe), timeout=0.01, shell=True)
self.fail()
except TypeError:
pass
self._run_test(fn)
def test_stdin(self):
def fn(c, e, un):
stdin = '0123456789'
res = subprocess2.communicate(
e + ['--read'],
stdin=stdin,
universal_newlines=un)
self._check_res(res, None, None, 10)
self._run_test(fn)
def test_stdin_unicode(self):
def fn(c, e, un):
stdin = u'0123456789'
res = subprocess2.communicate(
e + ['--read'],
stdin=stdin,
universal_newlines=un)
self._check_res(res, None, None, 10)
self._run_test(fn)
def test_stdin_empty(self):
def fn(c, e, un):
stdin = ''
res = subprocess2.communicate(
e + ['--read'],
stdin=stdin,
universal_newlines=un)
self._check_res(res, None, None, 0)
self._run_test(fn)
def test_stdin_void(self):
res = subprocess2.communicate(self.exe + ['--read'], stdin=VOID)
self._check_res(res, None, None, 0)
def test_stdin_void_stdout_timeout(self):
# Make sure a mix of VOID, PIPE and timeout works.
def fn(c, e, un):
res = subprocess2.communicate(
e + ['--stdout', '--read'],
stdin=VOID,
stdout=PIPE,
timeout=10,
universal_newlines=un,
shell=False)
self._check_res(res, c('A\nBB\nCCC\n'), None, 0)
self._run_test(fn)
def test_stdout_void(self):
def fn(c, e, un):
res = subprocess2.communicate(
e + ['--stdout', '--stderr'],
stdout=VOID,
stderr=PIPE,
universal_newlines=un)
self._check_res(res, None, c('a\nbb\nccc\n'), 0)
self._run_test(fn)
def test_stderr_void(self):
def fn(c, e, un):
res = subprocess2.communicate(
e + ['--stdout', '--stderr'],
stdout=PIPE,
stderr=VOID,
universal_newlines=un)
self._check_res(res, c('A\nBB\nCCC\n'), None, 0)
self._run_test(fn)
def test_stdout_void_stderr_redirect(self):
def fn(c, e, un):
res = subprocess2.communicate(
e + ['--stdout', '--stderr'],
stdout=VOID,
stderr=STDOUT,
universal_newlines=un)
self._check_res(res, None, None, 0)
self._run_test(fn)
def test_tee_stderr(self):
def fn(c, e, un):
stderr = []
res = subprocess2.communicate(
e + ['--stderr'], stderr=stderr.append, universal_newlines=un)
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._check_res(res, None, None, 0)
self._run_test(fn)
def test_tee_stdout_stderr(self):
def fn(c, e, un):
stdout = []
stderr = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr'],
stdout=stdout.append,
stderr=stderr.append,
universal_newlines=un)
self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._check_res(res, None, None, 0)
self._run_test(fn)
def test_tee_stdin(self):
def fn(c, e, un):
# Mix of stdin input and stdout callback.
stdout = []
stdin = '0123456789'
res = subprocess2.communicate(
e + ['--stdout', '--read'],
stdin=stdin,
stdout=stdout.append,
universal_newlines=un)
self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
self._check_res(res, None, None, 10)
self._run_test(fn)
def test_tee_throw(self):
def fn(c, e, un):
# Make sure failure still returns stderr completely.
stderr = []
try:
subprocess2.check_output(
e + ['--stderr', '--fail'],
stderr=stderr.append,
universal_newlines=un)
self.fail()
except subprocess2.CalledProcessError, exception:
self._check_exception(exception, '', None, 64)
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._run_test(fn)
def test_tee_timeout_stdout_void(self):
def fn(c, e, un):
stderr = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr', '--fail'],
stdout=VOID,
stderr=stderr.append,
shell=False,
timeout=10,
universal_newlines=un)
self._check_res(res, None, None, 64)
self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr))
self._run_test(fn)
def test_tee_timeout_stderr_void(self):
def fn(c, e, un):
stdout = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr', '--fail'],
stdout=stdout.append,
stderr=VOID,
shell=False,
timeout=10,
universal_newlines=un)
self._check_res(res, None, None, 64)
self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
self._run_test(fn)
def test_tee_timeout_stderr_stdout(self):
def fn(c, e, un):
stdout = []
res = subprocess2.communicate(
e + ['--stdout', '--stderr', '--fail'],
stdout=stdout.append,
stderr=STDOUT,
shell=False,
timeout=10,
universal_newlines=un)
self._check_res(res, None, None, 64)
# Ordering is random due to buffering.
self.assertEquals(
set(c('a\nbb\nccc\nA\nBB\nCCC\n').splitlines(True)),
set(''.join(stdout).splitlines(True)))
self._run_test(fn)
def test_tee_large(self):
stdout = []
# Read 128kb. On my workstation it takes >2s. Welcome to 2011.
res = subprocess2.communicate(self.exe + ['--large'], stdout=stdout.append)
self.assertEquals(128*1024, len(''.join(stdout)))
self._check_res(res, None, None, 0)
def test_tee_large_stdin(self):
stdout = []
# Write 128kb.
stdin = '0123456789abcdef' * (8*1024)
res = subprocess2.communicate(
self.exe + ['--large', '--read'], stdin=stdin, stdout=stdout.append)
self.assertEquals(128*1024, len(''.join(stdout)))
# Windows return code is > 8 bits.
returncode = len(stdin) if sys.platform == 'win32' else 0
self._check_res(res, None, None, returncode)
def test_tee_cb_throw(self):
# Having a callback throwing up should not cause side-effects. It's a bit
# hard to measure.
class Blow(Exception):
pass
def blow(_):
raise Blow()
proc = subprocess2.Popen(self.exe + ['--stdout'], stdout=blow)
try:
proc.communicate()
self.fail()
except Blow:
self.assertNotEquals(0, proc.returncode)
def test_nag_timer(self):
w = []
l = logging.getLogger()
class _Filter(logging.Filter):
def filter(self, record):
if record.levelno == logging.WARNING:
w.append(record.getMessage().lstrip())
return 0
f = _Filter()
l.addFilter(f)
proc = subprocess2.Popen(
self.exe + ['--stdout', '--sleep_first'], stdout=PIPE)
res = proc.communicate(nag_timer=3), proc.returncode
l.removeFilter(f)
self._check_res(res, 'A\nBB\nCCC\n', None, 0)
expected = ['No output for 3 seconds from command:', proc.cmd_str,
'No output for 6 seconds from command:', proc.cmd_str,
'No output for 9 seconds from command:', proc.cmd_str]
self.assertEquals(w, expected)
def child_main(args):
if sys.platform == 'win32':
# Annoying, make sure the output is not translated on Windows.
# pylint: disable=E1101,F0401
import msvcrt
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)
parser = optparse.OptionParser()
parser.add_option(
'--fail',
dest='return_value',
action='store_const',
default=0,
const=64)
parser.add_option(
'--crlf', action='store_const', const='\r\n', dest='eol', default='\n')
parser.add_option(
'--cr', action='store_const', const='\r', dest='eol')
parser.add_option('--stdout', action='store_true')
parser.add_option('--stderr', action='store_true')
parser.add_option('--sleep_first', action='store_true')
parser.add_option('--sleep_last', action='store_true')
parser.add_option('--large', action='store_true')
parser.add_option('--read', action='store_true')
options, args = parser.parse_args(args)
if args:
parser.error('Internal error')
if options.sleep_first:
time.sleep(10)
def do(string):
if options.stdout:
sys.stdout.write(string.upper())
sys.stdout.write(options.eol)
if options.stderr:
sys.stderr.write(string.lower())
sys.stderr.write(options.eol)
do('A')
do('BB')
do('CCC')
if options.large:
# Print 128kb.
string = '0123456789abcdef' * (8*1024)
sys.stdout.write(string)
if options.read:
assert options.return_value is 0
try:
while sys.stdin.read(1):
options.return_value += 1
except OSError:
pass
if options.sleep_last:
time.sleep(10)
return options.return_value
if __name__ == '__main__':
logging.basicConfig(level=
[logging.WARNING, logging.INFO, logging.DEBUG][
min(2, sys.argv.count('-v'))])
if len(sys.argv) > 1 and sys.argv[1] == '--child':
sys.exit(child_main(sys.argv[2:]))
unittest.main()