blob: 3629f2968b3bd47573be84494eac718e480e3586 [file] [log] [blame]
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
"""
A filter is a callable that accepts an iterable of test objects and a
dictionary of values, and returns a new iterable of test objects. It is
possible to define custom filters if the built-in ones are not enough.
"""
from collections import defaultdict, MutableSequence
import itertools
import os
from .expression import (
parse,
ParseError,
)
# built-in filters
def skip_if(tests, values):
"""
Sets disabled on all tests containing the `skip-if` tag and whose condition
is True. This filter is added by default.
"""
tag = 'skip-if'
for test in tests:
if tag in test and parse(test[tag], **values):
test.setdefault('disabled', '{}: {}'.format(tag, test[tag]))
yield test
def run_if(tests, values):
"""
Sets disabled on all tests containing the `run-if` tag and whose condition
is False. This filter is added by default.
"""
tag = 'run-if'
for test in tests:
if tag in test and not parse(test[tag], **values):
test.setdefault('disabled', '{}: {}'.format(tag, test[tag]))
yield test
def fail_if(tests, values):
"""
Sets expected to 'fail' on all tests containing the `fail-if` tag and whose
condition is True. This filter is added by default.
"""
tag = 'fail-if'
for test in tests:
if tag in test and parse(test[tag], **values):
test['expected'] = 'fail'
yield test
def enabled(tests, values):
"""
Removes all tests containing the `disabled` key. This filter can be
added by passing `disabled=False` into `active_tests`.
"""
for test in tests:
if 'disabled' not in test:
yield test
def exists(tests, values):
"""
Removes all tests that do not exist on the file system. This filter is
added by default, but can be removed by passing `exists=False` into
`active_tests`.
"""
for test in tests:
if os.path.exists(test['path']):
yield test
# built-in instance filters
class InstanceFilter(object):
"""
Generally only one instance of a class filter should be applied at a time.
Two instances of `InstanceFilter` are considered equal if they have the
same class name. This ensures only a single instance is ever added to
`filterlist`. This class also formats filters' __str__ method for easier
debugging.
"""
unique = True
def __init__(self, *args, **kwargs):
self.fmt_args = ', '.join(itertools.chain(
[str(a) for a in args],
['{}={}'.format(k, v) for k, v in kwargs.iteritems()]))
def __eq__(self, other):
if self.unique:
return self.__class__ == other.__class__
return self.__hash__() == other.__hash__()
def __str__(self):
return "{}({})".format(self.__class__.__name__, self.fmt_args)
class subsuite(InstanceFilter):
"""
If `name` is None, removes all tests that have a `subsuite` key.
Otherwise removes all tests that do not have a subsuite matching `name`.
It is possible to specify conditional subsuite keys using:
subsuite = foo,condition
where 'foo' is the subsuite name, and 'condition' is the same type of
condition used for skip-if. If the condition doesn't evaluate to true,
the subsuite designation will be removed from the test.
:param name: The name of the subsuite to run (default None)
"""
def __init__(self, name=None):
InstanceFilter.__init__(self, name=name)
self.name = name
def __call__(self, tests, values):
# Look for conditional subsuites, and replace them with the subsuite
# itself (if the condition is true), or nothing.
for test in tests:
subsuite = test.get('subsuite', '')
if ',' in subsuite:
try:
subsuite, cond = subsuite.split(',')
except ValueError:
raise ParseError("subsuite condition can't contain commas")
matched = parse(cond, **values)
if matched:
test['subsuite'] = subsuite
else:
test['subsuite'] = ''
# Filter on current subsuite
if self.name is None:
if not test.get('subsuite'):
yield test
else:
if test.get('subsuite') == self.name:
yield test
class chunk_by_slice(InstanceFilter):
"""
Basic chunking algorithm that splits tests evenly across total chunks.
:param this_chunk: the current chunk, 1 <= this_chunk <= total_chunks
:param total_chunks: the total number of chunks
:param disabled: Whether to include disabled tests in the chunking
algorithm. If False, each chunk contains an equal number
of non-disabled tests. If True, each chunk contains an
equal number of tests (default False)
"""
def __init__(self, this_chunk, total_chunks, disabled=False):
assert 1 <= this_chunk <= total_chunks
InstanceFilter.__init__(self, this_chunk, total_chunks,
disabled=disabled)
self.this_chunk = this_chunk
self.total_chunks = total_chunks
self.disabled = disabled
def __call__(self, tests, values):
tests = list(tests)
if self.disabled:
chunk_tests = tests[:]
else:
chunk_tests = [t for t in tests if 'disabled' not in t]
tests_per_chunk = float(len(chunk_tests)) / self.total_chunks
start = int(round((self.this_chunk - 1) * tests_per_chunk))
end = int(round(self.this_chunk * tests_per_chunk))
if not self.disabled:
# map start and end back onto original list of tests. Disabled
# tests will still be included in the returned list, but each
# chunk will contain an equal number of enabled tests.
if self.this_chunk == 1:
start = 0
elif start < len(chunk_tests):
start = tests.index(chunk_tests[start])
if self.this_chunk == self.total_chunks:
end = len(tests)
elif end < len(chunk_tests):
end = tests.index(chunk_tests[end])
return (t for t in tests[start:end])
class chunk_by_dir(InstanceFilter):
"""
Basic chunking algorithm that splits directories of tests evenly at a
given depth.
For example, a depth of 2 means all test directories two path nodes away
from the base are gathered, then split evenly across the total number of
chunks. The number of tests in each of the directories is not taken into
account (so chunks will not contain an even number of tests). All test
paths must be relative to the same root (typically the root of the source
repository).
:param this_chunk: the current chunk, 1 <= this_chunk <= total_chunks
:param total_chunks: the total number of chunks
:param depth: the minimum depth of a subdirectory before it will be
considered unique
"""
def __init__(self, this_chunk, total_chunks, depth):
InstanceFilter.__init__(self, this_chunk, total_chunks, depth)
self.this_chunk = this_chunk
self.total_chunks = total_chunks
self.depth = depth
def __call__(self, tests, values):
tests_by_dir = defaultdict(list)
ordered_dirs = []
for test in tests:
path = test['relpath']
if path.startswith(os.sep):
path = path[1:]
dirs = path.split(os.sep)
dirs = dirs[:min(self.depth, len(dirs)-1)]
path = os.sep.join(dirs)
# don't count directories that only have disabled tests in them,
# but still yield disabled tests that are alongside enabled tests
if path not in ordered_dirs and 'disabled' not in test:
ordered_dirs.append(path)
tests_by_dir[path].append(test)
tests_per_chunk = float(len(ordered_dirs)) / self.total_chunks
start = int(round((self.this_chunk - 1) * tests_per_chunk))
end = int(round(self.this_chunk * tests_per_chunk))
for i in range(start, end):
for test in tests_by_dir.pop(ordered_dirs[i]):
yield test
# find directories that only contain disabled tests. They still need to
# be yielded for reporting purposes. Put them all in chunk 1 for
# simplicity.
if self.this_chunk == 1:
disabled_dirs = [v for k, v in tests_by_dir.iteritems()
if k not in ordered_dirs]
for disabled_test in itertools.chain(*disabled_dirs):
yield disabled_test
class chunk_by_runtime(InstanceFilter):
"""
Chunking algorithm that attempts to group tests into chunks based on their
average runtimes. It keeps manifests of tests together and pairs slow
running manifests with fast ones.
:param this_chunk: the current chunk, 1 <= this_chunk <= total_chunks
:param total_chunks: the total number of chunks
:param runtimes: dictionary of test runtime data, of the form
{<test path>: <average runtime>}
:param default_runtime: value in seconds to assign tests that don't exist
in the runtimes file
"""
def __init__(self, this_chunk, total_chunks, runtimes, default_runtime=0):
InstanceFilter.__init__(self, this_chunk, total_chunks, runtimes,
default_runtime=default_runtime)
self.this_chunk = this_chunk
self.total_chunks = total_chunks
# defaultdict(lambda:<int>) assigns all non-existent keys the value of
# <int>. This means all tests we encounter that don't exist in the
# runtimes file will be assigned `default_runtime`.
self.runtimes = defaultdict(lambda: default_runtime)
self.runtimes.update(runtimes)
def __call__(self, tests, values):
tests = list(tests)
manifests = set(t['manifest'] for t in tests)
def total_runtime(tests):
return sum(self.runtimes[t['relpath']] for t in tests
if 'disabled' not in t)
tests_by_manifest = []
for manifest in manifests:
mtests = [t for t in tests if t['manifest'] == manifest]
tests_by_manifest.append((total_runtime(mtests), mtests))
tests_by_manifest.sort(reverse=True)
tests_by_chunk = [[0, []] for i in range(self.total_chunks)]
for runtime, batch in tests_by_manifest:
# sort first by runtime, then by number of tests in case of a tie.
# This guarantees the chunk with the fastest runtime will always
# get the next batch of tests.
tests_by_chunk.sort(key=lambda x: (x[0], len(x[1])))
tests_by_chunk[0][0] += runtime
tests_by_chunk[0][1].extend(batch)
return (t for t in tests_by_chunk[self.this_chunk-1][1])
class tags(InstanceFilter):
"""
Removes tests that don't contain any of the given tags. This overrides
InstanceFilter's __eq__ method, so multiple instances can be added.
Multiple tag filters is equivalent to joining tags with the AND operator.
To define a tag in a manifest, add a `tags` attribute to a test or DEFAULT
section. Tests can have multiple tags, in which case they should be
whitespace delimited. For example:
[test_foobar.html]
tags = foo bar
:param tags: A tag or list of tags to filter tests on
"""
unique = False
def __init__(self, tags):
InstanceFilter.__init__(self, tags)
if isinstance(tags, basestring):
tags = [tags]
self.tags = tags
def __call__(self, tests, values):
for test in tests:
if 'tags' not in test:
continue
test_tags = [t.strip() for t in test['tags'].split()]
if any(t in self.tags for t in test_tags):
yield test
class pathprefix(InstanceFilter):
"""
Removes tests that don't start with any of the given test paths.
:param paths: A list of test paths to filter on
"""
def __init__(self, paths):
InstanceFilter.__init__(self, paths)
if isinstance(paths, basestring):
paths = [paths]
self.paths = paths
def __call__(self, tests, values):
for test in tests:
for tp in self.paths:
tp = os.path.normpath(tp)
if not os.path.normpath(test['relpath']).startswith(tp):
continue
# any test path that points to a single file will be run no
# matter what, even if it's disabled
if 'disabled' in test and os.path.normpath(test['relpath']) == tp:
del test['disabled']
yield test
break
# filter container
DEFAULT_FILTERS = (
skip_if,
run_if,
fail_if,
)
"""
By default :func:`~.active_tests` will run the :func:`~.skip_if`,
:func:`~.run_if` and :func:`~.fail_if` filters.
"""
class filterlist(MutableSequence):
"""
A MutableSequence that raises TypeError when adding a non-callable and
ValueError if the item is already added.
"""
def __init__(self, items=None):
self.items = []
if items:
self.items = list(items)
def _validate(self, item):
if not callable(item):
raise TypeError("Filters must be callable!")
if item in self:
raise ValueError("Filter {} is already applied!".format(item))
def __getitem__(self, key):
return self.items[key]
def __setitem__(self, key, value):
self._validate(value)
self.items[key] = value
def __delitem__(self, key):
del self.items[key]
def __len__(self):
return len(self.items)
def insert(self, index, value):
self._validate(value)
self.items.insert(index, value)