Skipping shell invocations is good, when possible. This wrapper around subprocess does dirty work of
parsing command lines into argv and making sure that no shell magic is being used.
#TODO: ship pyprocessing?
import multiprocessing
import subprocess, shlex, re, logging, sys, traceback, os, imp, glob
import site
from collections import deque
# XXXkhuey Work around
subprocess._cleanup = lambda: None
import command, util
if sys.platform=='win32':
import win32process
_log = logging.getLogger('pymake.process')
_escapednewlines = re.compile(r'\\\n')
def tokens2re(tokens):
# Create a pattern for non-escaped tokens, in the form:
# (?<!\\)(?:a|b|c...)
# This is meant to match patterns a, b, or c, or ... if they are not
# preceded by a backslash.
# where a, b, c... are in the form
# (?P<name>pattern)
# which matches the pattern and captures it in a named match group.
# The group names and patterns come are given as a dict in the function
# argument.
nonescaped = r'(?<!\\)(?:%s)' % '|'.join('(?P<%s>%s)' % (name, value) for name, value in tokens.iteritems())
# The final pattern matches either the above pattern, or an escaped
# backslash, captured in the "escape" match group.
return re.compile('(?:%s|%s)' % (nonescaped, r'(?P<escape>\\\\)'))
_unquoted_tokens = tokens2re({
'whitespace': r'[\t\r\n ]+',
'quote': r'[\'"]',
'comment': '#',
'special': r'[<>&|`~(){}$;]',
'backslashed': r'\\[^\\]',
'glob': r'[\*\?]',
_doubly_quoted_tokens = tokens2re({
'quote': '"',
'backslashedquote': r'\\"',
'special': '\$',
'backslashed': r'\\[^\\"]',
class MetaCharacterException(Exception):
def __init__(self, char):
self.char = char
class ClineSplitter(list):
Parses a given command line string and creates a list of command
and arguments, with wildcard expansion.
def __init__(self, cline, cwd):
self.cwd = cwd
self.arg = None
self.cline = cline
self.glob = False
def _push(self, str):
Push the given string as part of the current argument
if self.arg is None:
self.arg = ''
self.arg += str
def _next(self):
Finalize current argument, effectively adding it to the list.
Perform globbing if needed.
if self.arg is None:
if self.glob:
if os.path.isabs(self.arg):
path = self.arg
path = os.path.join(self.cwd, self.arg)
globbed = glob.glob(path)
if not globbed:
# If globbing doesn't find anything, the literal string is
# used.
self.extend(f[len(path)-len(self.arg):] for f in globbed)
self.glob = False
self.arg = None
def _parse_unquoted(self):
Parse command line remainder in the context of an unquoted string.
while self.cline:
# Find the next token
m =
# If we find none, the remainder of the string can be pushed to
# the current argument and the argument finalized
if not m:
# The beginning of the string, up to the found token, is part of
# the current argument
if m.start():
self.cline = self.cline[m.end():]
match = dict([(name, value) for name, value in m.groupdict().items() if value])
if 'quote' in match:
# " or ' start a quoted string
if match['quote'] == '"':
elif 'comment' in match:
# Comments are ignored. The current argument can be finalized,
# and parsing stopped.
elif 'special' in match:
# Unquoted, non-escaped special characters need to be sent to a
# shell.
raise MetaCharacterException, match['special']
elif 'whitespace' in match:
# Whitespaces terminate current argument.
elif 'escape' in match:
# Escaped backslashes turn into a single backslash
elif 'backslashed' in match:
# Backslashed characters are unbackslashed
# e.g. echo \a -> a
elif 'glob' in match:
# ? or * will need globbing
self.glob = True
raise Exception, "Shouldn't reach here"
if self.arg:
def _parse_quoted(self):
# Single quoted strings are preserved, except for the final quote
index = self.cline.find("'")
if index == -1:
raise Exception, 'Unterminated quoted string in command'
self.cline = self.cline[index+1:]
def _parse_doubly_quoted(self):
if not self.cline:
raise Exception, 'Unterminated quoted string in command'
while self.cline:
m =
if not m:
raise Exception, 'Unterminated quoted string in command'
self.cline = self.cline[m.end():]
match = dict([(name, value) for name, value in m.groupdict().items() if value])
if 'quote' in match:
# a double quote ends the quoted string, so go back to
# unquoted parsing
elif 'special' in match:
# Unquoted, non-escaped special characters in a doubly quoted
# string still have a special meaning and need to be sent to a
# shell.
raise MetaCharacterException, match['special']
elif 'escape' in match:
# Escaped backslashes turn into a single backslash
elif 'backslashedquote' in match:
# Backslashed double quotes are un-backslashed
elif 'backslashed' in match:
# Backslashed characters are kept backslashed
def clinetoargv(cline, cwd):
If this command line can safely skip the shell, return an argv array.
@returns argv, badchar
str = _escapednewlines.sub('', cline)
args = ClineSplitter(str, cwd)
except MetaCharacterException, e:
return None, e.char
if len(args) and args[0].find('=') != -1:
return None, '='
return args, None
# shellwords contains a set of shell builtin commands that need to be
# executed within a shell. It also contains a set of commands that are known
# to be giving problems when run directly instead of through the msys shell.
shellwords = (':', '.', 'break', 'cd', 'continue', 'exec', 'exit', 'export',
'getopts', 'hash', 'pwd', 'readonly', 'return', 'shift',
'test', 'times', 'trap', 'umask', 'unset', 'alias',
'set', 'bind', 'builtin', 'caller', 'command', 'declare',
'echo', 'enable', 'help', 'let', 'local', 'logout',
'printf', 'read', 'shopt', 'source', 'type', 'typeset',
'ulimit', 'unalias', 'set', 'find')
def prepare_command(cline, cwd, loc):
Returns a list of command and arguments for the given command line string.
If the command needs to be run through a shell for some reason, the
returned list contains the shell invocation.
#TODO: call this once up-front somewhere and save the result?
shell, msys = util.checkmsyscompat()
shellreason = None
executable = None
if msys and cline.startswith('/'):
shellreason = "command starts with /"
argv, badchar = clinetoargv(cline, cwd)
if argv is None:
shellreason = "command contains shell-special character '%s'" % (badchar,)
elif len(argv) and argv[0] in shellwords:
shellreason = "command starts with shell primitive '%s'" % (argv[0],)
elif argv and (os.sep in argv[0] or os.altsep and os.altsep in argv[0]):
executable = util.normaljoin(cwd, argv[0])
# Avoid "%1 is not a valid Win32 application" errors, assuming
# that if the executable path is to be resolved with PATH, it will
# be a Win32 executable.
if sys.platform == 'win32' and os.path.isfile(executable) and open(executable, 'rb').read(2) == "#!":
shellreason = "command executable starts with a hashbang"
if shellreason is not None:
_log.debug("%s: using shell: %s: '%s'", loc, shellreason, cline)
if msys:
if len(cline) > 3 and cline[1] == ':' and cline[2] == '/':
cline = '/' + cline[0] + cline[2:]
argv = [shell, "-c", cline]
executable = None
return executable, argv
def call(cline, env, cwd, loc, cb, context, echo, justprint=False):
executable, argv = prepare_command(cline, cwd, loc)
if not len(argv):
if argv[0] == command.makepypath:
command.main(argv[1:], env, cwd, cb)
if argv[0:2] == [sys.executable.replace('\\', '/'),
command.makepypath.replace('\\', '/')]:
command.main(argv[2:], env, cwd, cb)
return, executable=executable, shell=False, env=env, cwd=cwd, cb=cb,
echo=echo, justprint=justprint)
def call_native(module, method, argv, env, cwd, loc, cb, context, echo, justprint=False,
context.call_native(module, method, argv, env=env, cwd=cwd, cb=cb,
echo=echo, justprint=justprint, pycommandpath=pycommandpath)
def statustoresult(status):
Convert the status returned from waitpid into a prettier numeric result.
sig = status & 0xFF
if sig:
return -sig
return status >>8
class Job(object):
A single job to be executed on the process pool.
done = False # set to true when the job completes
def __init__(self):
self.exitcode = -127
def notify(self, condition, result):
self.done = True
self.exitcode = result
def get_callback(self, condition):
return lambda result: self.notify(condition, result)
class PopenJob(Job):
A job that executes a command using subprocess.Popen.
def __init__(self, argv, executable, shell, env, cwd):
self.argv = argv
self.executable = executable = shell
self.env = env
self.cwd = cwd
self.parentpid = os.getpid()
def run(self):
assert os.getpid() != self.parentpid
# subprocess.Popen doesn't use the PATH set in the env argument for
# finding the executable on some platforms (but strangely it does on
# others!), so set os.environ['PATH'] explicitly. This is parallel-
# safe because pymake uses separate processes for parallelism, and
# each process is serial. See for a
# general overview of "subprocess PATH semantics and portability".
oldpath = os.environ['PATH']
if self.env is not None and self.env.has_key('PATH'):
os.environ['PATH'] = self.env['PATH']
p = subprocess.Popen(self.argv, executable=self.executable,, env=self.env, cwd=self.cwd)
return p.wait()
except OSError, e:
print >>sys.stderr, e
return -127
os.environ['PATH'] = oldpath
class PythonException(Exception):
def __init__(self, message, exitcode):
self.message = message
self.exitcode = exitcode
def __str__(self):
return self.message
class PythonJob(Job):
A job that calls a Python method.
def __init__(self, module, method, argv, env, cwd, pycommandpath=None):
self.module = module
self.method = method
self.argv = argv
self.env = env
self.cwd = cwd
self.pycommandpath = pycommandpath or []
self.parentpid = os.getpid()
def run(self):
assert os.getpid() != self.parentpid
# os.environ is a magic dictionary. Setting it to something else
# doesn't affect the environment of subprocesses, so use clear/update
oldenv = dict(os.environ)
# sys.path is adjusted for the entire lifetime of the command
# execution. This ensures any delayed imports will still work.
oldsyspath = list(sys.path)
sys.path = []
for p in sys.path + self.pycommandpath:
if self.module not in sys.modules:
except Exception as e:
print >>sys.stderr, 'Error importing %s: %s' % (
self.module, e)
return -127
m = sys.modules[self.module]
if self.method not in m.__dict__:
print >>sys.stderr, "No method named '%s' in module %s" % (self.method, self.module)
return -127
rv = m.__dict__[self.method](self.argv)
if rv != 0 and rv is not None:
print >>sys.stderr, (
"Native command '%s %s' returned value '%s'" %
(self.module, self.method, rv))
return (rv if isinstance(rv, int) else 1)
except PythonException, e:
print >>sys.stderr, e
return e.exitcode
e = sys.exc_info()[1]
if isinstance(e, SystemExit) and (e.code == 0 or e.code is None):
pass # sys.exit(0) is not a failure
print >>sys.stderr, e
return -127
sys.path = oldsyspath
# multiprocessing exits via os._exit, make sure that all output
# from command gets written out before that happens.
return 0
def job_runner(job):
Run a job. Called in a Process pool.
class ParallelContext(object):
Manages the parallel execution of processes.
_allcontexts = set()
_condition = multiprocessing.Condition()
def __init__(self, jcount):
self.jcount = jcount
self.exit = False
self.processpool = multiprocessing.Pool(processes=jcount)
self.pending = deque() # deque of (cb, args, kwargs)
self.running = [] # list of (subprocess, cb)
def finish(self):
assert len(self.pending) == 0 and len(self.running) == 0, "pending: %i running: %i" % (len(self.pending), len(self.running))
def run(self):
while len(self.pending) and len(self.running) < self.jcount:
cb, args, kwargs = self.pending.popleft()
cb(*args, **kwargs)
def defer(self, cb, *args, **kwargs):
assert self.jcount > 1 or not len(self.pending), "Serial execution error defering %r %r %r: currently pending %r" % (cb, args, kwargs, self.pending)
self.pending.append((cb, args, kwargs))
def _docall_generic(self, pool, job, cb, echo, justprint):
if echo is not None:
print echo
processcb = job.get_callback(ParallelContext._condition)
if justprint:
pool.apply_async(job_runner, args=(job,), callback=processcb)
self.running.append((job, cb))
def call(self, argv, shell, env, cwd, cb, echo, justprint=False, executable=None):
Asynchronously call the process
job = PopenJob(argv, executable=executable, shell=shell, env=env, cwd=cwd)
self.defer(self._docall_generic, self.processpool, job, cb, echo, justprint)
def call_native(self, module, method, argv, env, cwd, cb,
echo, justprint=False, pycommandpath=None):
Asynchronously call the native function
job = PythonJob(module, method, argv, env, cwd, pycommandpath)
self.defer(self._docall_generic, self.processpool, job, cb, echo, justprint)
def _waitany(condition):
def _checkdone():
jobs = []
for c in ParallelContext._allcontexts:
for i in xrange(0, len(c.running)):
if c.running[i][0].done:
for j in jobs:
if j in c.running:
return jobs
# We must acquire the lock, and then check to see if any jobs have
# finished. If we don't check after acquiring the lock it's possible
# that all outstanding jobs will have completed before we wait and we'll
# wait for notifications that have already occurred.
jobs = _checkdone()
if jobs == []:
jobs = _checkdone()
return jobs
def spin():
Spin the 'event loop', and never return.
while True:
clist = list(ParallelContext._allcontexts)
for c in clist:
dowait = util.any((len(c.running) for c in ParallelContext._allcontexts))
if dowait:
# Wait on local jobs first for perf
for job, cb in ParallelContext._waitany(ParallelContext._condition):
assert any(len(c.pending) for c in ParallelContext._allcontexts)
def makedeferrable(usercb, **userkwargs):
def cb(*args, **kwargs):
return usercb(*args, **kwargs)
return cb
_serialContext = None
_parallelContext = None
def getcontext(jcount):
global _serialContext, _parallelContext
if jcount == 1:
if _serialContext is None:
_serialContext = ParallelContext(1)
return _serialContext
if _parallelContext is None:
_parallelContext = ParallelContext(jcount)
return _parallelContext