blob: 8ecf6a9932b3d537683261053fc2780909ebc07d [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for git_common.py"""
import binascii
import collections
import os
import signal
import sys
import tempfile
import time
import unittest
DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, DEPOT_TOOLS_ROOT)
from testing_support import coverage_utils
from testing_support import git_test_utils
class GitCommonTestBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
super(GitCommonTestBase, cls).setUpClass()
import git_common
cls.gc = git_common
class Support(GitCommonTestBase):
def _testMemoizeOneBody(self, threadsafe):
calls = collections.defaultdict(int)
def double_if_even(val):
calls[val] += 1
return val * 2 if val % 2 == 0 else None
# Use this explicitly as a wrapper fn instead of a decorator. Otherwise
# pylint crashes (!!)
double_if_even = self.gc.memoize_one(threadsafe=threadsafe)(double_if_even)
self.assertEqual(4, double_if_even(2))
self.assertEqual(4, double_if_even(2))
self.assertEqual(None, double_if_even(1))
self.assertEqual(None, double_if_even(1))
self.assertDictEqual({1: 2, 2: 1}, calls)
double_if_even.set(10, 20)
self.assertEqual(20, double_if_even(10))
self.assertDictEqual({1: 2, 2: 1}, calls)
double_if_even.clear()
self.assertEqual(4, double_if_even(2))
self.assertEqual(4, double_if_even(2))
self.assertEqual(None, double_if_even(1))
self.assertEqual(None, double_if_even(1))
self.assertEqual(20, double_if_even(10))
self.assertDictEqual({1: 4, 2: 2, 10: 1}, calls)
def testMemoizeOne(self):
self._testMemoizeOneBody(threadsafe=False)
def testMemoizeOneThreadsafe(self):
self._testMemoizeOneBody(threadsafe=True)
def slow_square(i):
"""Helper for ScopedPoolTest.
Must be global because non top-level functions aren't pickleable.
"""
return i ** 2
class ScopedPoolTest(GitCommonTestBase):
CTRL_C = signal.CTRL_C_EVENT if sys.platform == 'win32' else signal.SIGINT
def testThreads(self):
result = []
with self.gc.ScopedPool(kind='threads') as pool:
result = list(pool.imap(slow_square, xrange(10)))
self.assertEqual([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], result)
def testThreadsCtrlC(self):
result = []
with self.assertRaises(KeyboardInterrupt):
with self.gc.ScopedPool(kind='threads') as pool:
# Make sure this pool is interrupted in mid-swing
for i in pool.imap(slow_square, xrange(20)):
if i > 32:
os.kill(os.getpid(), self.CTRL_C)
result.append(i)
self.assertEqual([0, 1, 4, 9, 16, 25], result)
def testProcs(self):
result = []
with self.gc.ScopedPool() as pool:
result = list(pool.imap(slow_square, xrange(10)))
self.assertEqual([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], result)
def testProcsCtrlC(self):
result = []
with self.assertRaises(KeyboardInterrupt):
with self.gc.ScopedPool() as pool:
# Make sure this pool is interrupted in mid-swing
for i in pool.imap(slow_square, xrange(20)):
if i > 32:
os.kill(os.getpid(), self.CTRL_C)
result.append(i)
self.assertEqual([0, 1, 4, 9, 16, 25], result)
class ProgressPrinterTest(GitCommonTestBase):
class FakeStream(object):
def __init__(self):
self.data = set()
self.count = 0
def write(self, line):
self.data.add(line)
def flush(self):
self.count += 1
@unittest.expectedFailure
def testBasic(self):
"""This test is probably racy, but I don't have a better alternative."""
fmt = '%(count)d/10'
stream = self.FakeStream()
pp = self.gc.ProgressPrinter(fmt, enabled=True, stream=stream, period=0.01)
with pp as inc:
for _ in xrange(10):
time.sleep(0.02)
inc()
filtered = set(x.strip() for x in stream.data)
rslt = set(fmt % {'count': i} for i in xrange(11))
self.assertSetEqual(filtered, rslt)
self.assertGreaterEqual(stream.count, 10)
class GitReadOnlyFunctionsTest(git_test_utils.GitRepoReadOnlyTestBase,
GitCommonTestBase):
REPO = """
A B C D
B E D
"""
COMMIT_A = {
'some/files/file1': {'data': 'file1'},
'some/files/file2': {'data': 'file2'},
'some/files/file3': {'data': 'file3'},
'some/other/file': {'data': 'otherfile'},
}
COMMIT_C = {
'some/files/file2': {
'mode': 0755,
'data': 'file2 - vanilla'},
}
COMMIT_E = {
'some/files/file2': {'data': 'file2 - merged'},
}
COMMIT_D = {
'some/files/file2': {'data': 'file2 - vanilla\nfile2 - merged'},
}
def testHashes(self):
ret = self.repo.run(
self.gc.hash_multi, *[
'master',
'master~3',
self.repo['E']+'~',
self.repo['D']+'^2',
'tag_C^{}',
]
)
self.assertEqual([
self.repo['D'],
self.repo['A'],
self.repo['B'],
self.repo['E'],
self.repo['C'],
], ret)
self.assertEquals(
self.repo.run(self.gc.hash_one, 'branch_D'),
self.repo['D']
)
def testCurrentBranch(self):
self.repo.git('checkout', 'branch_D')
self.assertEqual(self.repo.run(self.gc.current_branch), 'branch_D')
def testBranches(self):
self.assertEqual(self.repo.run(set, self.gc.branches()),
set(('branch_D', 'root_A')))
def testTags(self):
self.assertEqual(set(self.repo.run(self.gc.tags)),
{'tag_'+l for l in 'ABCDE'})
def testParseCommitrefs(self):
ret = self.repo.run(
self.gc.parse_commitrefs, *[
'master',
'master~3',
self.repo['E']+'~',
self.repo['D']+'^2',
'tag_C^{}',
]
)
self.assertEqual(ret, map(binascii.unhexlify, [
self.repo['D'],
self.repo['A'],
self.repo['B'],
self.repo['E'],
self.repo['C'],
]))
with self.assertRaisesRegexp(Exception, r"one of \('master', 'bananas'\)"):
self.repo.run(self.gc.parse_commitrefs, 'master', 'bananas')
def testTree(self):
tree = self.repo.run(self.gc.tree, 'master:some/files')
file1 = self.COMMIT_A['some/files/file1']['data']
file2 = self.COMMIT_D['some/files/file2']['data']
file3 = self.COMMIT_A['some/files/file3']['data']
self.assertEquals(
tree['file1'],
('100644', 'blob', git_test_utils.git_hash_data(file1)))
self.assertEquals(
tree['file2'],
('100755', 'blob', git_test_utils.git_hash_data(file2)))
self.assertEquals(
tree['file3'],
('100644', 'blob', git_test_utils.git_hash_data(file3)))
tree = self.repo.run(self.gc.tree, 'master:some')
self.assertEquals(len(tree), 2)
# Don't check the tree hash because we're lazy :)
self.assertEquals(tree['files'][:2], ('040000', 'tree'))
tree = self.repo.run(self.gc.tree, 'master:wat')
self.assertEqual(tree, None)
def testTreeRecursive(self):
tree = self.repo.run(self.gc.tree, 'master:some', recurse=True)
file1 = self.COMMIT_A['some/files/file1']['data']
file2 = self.COMMIT_D['some/files/file2']['data']
file3 = self.COMMIT_A['some/files/file3']['data']
other = self.COMMIT_A['some/other/file']['data']
self.assertEquals(
tree['files/file1'],
('100644', 'blob', git_test_utils.git_hash_data(file1)))
self.assertEquals(
tree['files/file2'],
('100755', 'blob', git_test_utils.git_hash_data(file2)))
self.assertEquals(
tree['files/file3'],
('100644', 'blob', git_test_utils.git_hash_data(file3)))
self.assertEquals(
tree['other/file'],
('100644', 'blob', git_test_utils.git_hash_data(other)))
class GitMutableFunctionsTest(git_test_utils.GitRepoReadWriteTestBase,
GitCommonTestBase):
REPO = ''
def _intern_data(self, data):
with tempfile.TemporaryFile() as f:
f.write(data)
f.seek(0)
return self.repo.run(self.gc.intern_f, f)
def testInternF(self):
data = 'CoolBobcatsBro'
data_hash = self._intern_data(data)
self.assertEquals(git_test_utils.git_hash_data(data), data_hash)
self.assertEquals(data, self.repo.git('cat-file', 'blob', data_hash).stdout)
def testMkTree(self):
tree = {}
for i in 1, 2, 3:
name = 'file%d' % i
tree[name] = ('100644', 'blob', self._intern_data(name))
tree_hash = self.repo.run(self.gc.mktree, tree)
self.assertEquals('37b61866d6e061c4ba478e7eb525be7b5752737d', tree_hash)
def testConfig(self):
self.repo.git('config', '--add', 'happy.derpies', 'food')
self.assertEquals(self.repo.run(self.gc.config_list, 'happy.derpies'),
['food'])
self.assertEquals(self.repo.run(self.gc.config_list, 'sad.derpies'), [])
self.repo.git('config', '--add', 'happy.derpies', 'cat')
self.assertEquals(self.repo.run(self.gc.config_list, 'happy.derpies'),
['food', 'cat'])
def testUpstream(self):
self.repo.git('commit', '--allow-empty', '-am', 'foooooo')
self.assertEquals(self.repo.run(self.gc.upstream, 'bobly'), None)
self.assertEquals(self.repo.run(self.gc.upstream, 'master'), None)
self.repo.git('checkout', '-tb', 'happybranch', 'master')
self.assertEquals(self.repo.run(self.gc.upstream, 'happybranch'),
'master')
if __name__ == '__main__':
sys.exit(coverage_utils.covered_main(
os.path.join(DEPOT_TOOLS_ROOT, 'git_common.py')
))