blob: 3acaede767c18690c12057081608cbbdce6d192a [file] [log] [blame]
import unittest
import tempfile
import shutil
import os
import subprocess
from util.commands import run_cmd, get_output
import util.git as git
from util.file import touch
def getRevisions(dest, branches=None):
retval = []
cmd = ['git', 'log', '--pretty=oneline']
if branches:
cmd.extend(branches)
for line in get_output(cmd, cwd=dest).split('\n'):
line = line.strip()
if not line:
continue
rev, ref = line.split(" ", 1)
rev = rev.strip()
if not rev:
continue
if rev not in retval:
retval.append(rev)
retval.reverse()
return retval
class TestGitFunctions(unittest.TestCase):
def test_get_repo_name(self):
self.assertEquals(git.get_repo_name("https://git.mozilla.org/releases/gecko.git"),
"git.mozilla.org/releases%2Fgecko.git")
self.assertEquals(git.get_repo_name("git://git.mozilla.org/releases/gecko.git"),
"git.mozilla.org/releases%2Fgecko.git")
self.assertEquals(git.get_repo_name("http://git.mozilla.org/releases/gecko.git"),
"git.mozilla.org/releases%2Fgecko.git")
class TestGit(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.repodir = os.path.join(self.tmpdir, 'repo')
run_cmd(['%s/init_gitrepo.sh' % os.path.dirname(__file__),
self.repodir])
self.revisions = getRevisions(
self.repodir, branches=['master', 'branch2'])
self.wc = os.path.join(self.tmpdir, 'wc')
self.pwd = os.getcwd()
# Make sure we're not setting GIT_SHARE_BASE_DIR
if 'GIT_SHARE_BASE_DIR' in os.environ:
del os.environ['GIT_SHARE_BASE_DIR']
def tearDown(self):
shutil.rmtree(self.tmpdir)
os.chdir(self.pwd)
def testHasRevision(self):
self.assertTrue(git.has_revision(self.repodir, self.revisions[0]))
self.assertFalse(git.has_revision(self.repodir, "foooooooooooooooo"))
def testClean(self):
git.git(self.repodir, self.wc)
try:
f = open(os.path.join(self.wc, 'blaaah'), 'w')
f.write('blah')
finally:
if not f.closed:
f.close()
git.clean(self.wc)
self.assertFalse(os.path.exists(os.path.join(self.wc, 'blaaah')))
def testCleanFail(self):
self.assertRaises(subprocess.CalledProcessError, git.clean, '/tmp')
def testClone(self):
rev = git.clone(self.repodir, self.wc, update_dest=False)
self.assertEquals(rev, None)
# This clones the master branch by default
# This has the 1st and 3rd commit
self.assertEquals(
[self.revisions[0], self.revisions[2]], getRevisions(self.wc))
self.assertEquals(sorted(os.listdir(self.wc)), ['.git'])
def testCloneIntoNonEmptyDir(self):
os.mkdir(self.wc)
open(os.path.join(self.wc, 'test.txt'), 'w').write('hello')
git.clone(self.repodir, self.wc, update_dest=False)
self.failUnless(not os.path.exists(os.path.join(self.wc, 'test.txt')))
def testCloneUpdate(self):
rev = git.clone(self.repodir, self.wc, update_dest=True)
self.assertEquals(rev, self.revisions[-1])
def testCloneBranch(self):
git.clone(self.repodir, self.wc, refname='branch2', update_dest=False)
# This should have just the 1st and 2nd commit
self.assertEquals(
[self.revisions[0], self.revisions[1]], getRevisions(self.wc))
# We should also have no working copy
self.failUnless(not os.path.exists(os.path.join(self.wc, 'hello.txt')))
def testUpdateRevision(self):
rev = git.clone(self.repodir, self.wc, update_dest=False)
self.assertEquals(rev, None)
rev = git.update(self.wc, revision=self.revisions[2])
self.assertEquals(rev, self.revisions[2])
def testUpdateBranch(self):
rev = git.clone(self.repodir, self.wc, update_dest=False)
self.assertEquals(rev, None)
rev = git.update(self.wc, refname='branch2')
self.assertEquals(rev, self.revisions[1])
def testUpdateBranchDefault(self):
rev = git.clone(self.repodir, self.wc, update_dest=False)
self.assertEquals(rev, None)
rev = git.update(self.wc)
self.assertEquals(rev, self.revisions[2])
def testFetchBranch(self):
# Clone just the main branch
git.clone(self.repodir, self.wc, refname='master', update_dest=False)
self.assertEquals(
[self.revisions[0], self.revisions[2]], getRevisions(self.wc))
# Now pull in branch2
git.fetch(self.repodir, self.wc, refname='branch2')
self.assertEquals(getRevisions(self.wc, branches=[
'origin/master', 'origin/branch2']), self.revisions)
def testFetchAll(self):
# Clone just the main branch
git.clone(self.repodir, self.wc, update_dest=False)
# Now pull in branch2
git.fetch(self.repodir, self.wc, refname='branch2')
# Change the original repo
newfile = os.path.join(self.repodir, 'newfile')
touch(newfile)
run_cmd(['git', 'add', 'newfile'], cwd=self.repodir)
run_cmd(['git', 'commit', '-q', '-m', 'add newfile'], cwd=self.repodir)
# Now pull in everything from master branch
git.fetch(self.repodir, self.wc, refname='master')
for branch in 'master', 'branch2':
self.assertEquals(getRevisions(self.wc, branches=['origin/%s' % branch]), getRevisions(self.repodir, branches=[branch]))
# Make sure we actually changed something
self.assertNotEqual(getRevisions(
self.repodir, branches=['master', 'branch2']), self.revisions)
def testGit(self):
rev = git.git(self.repodir, self.wc)
self.assertEquals(rev, self.revisions[-1])
self.assertEquals(getRevisions(self.wc, branches=[
'origin/master', 'origin/branch2']), self.revisions)
self.assertTrue(os.path.exists(os.path.join(self.wc, 'hello.txt')))
self.assertTrue("Is this thing on" in open(
os.path.join(self.wc, 'hello.txt')).read())
# Switch branches, make sure we get the right content
rev = git.git(self.repodir, self.wc, refname='branch2')
self.assertEquals(rev, self.revisions[1])
self.assertFalse("Is this thing on" in open(
os.path.join(self.wc, 'hello.txt')).read())
def testGitCleanDest(self):
git.git(self.repodir, self.wc)
try:
f = open(os.path.join(self.wc, 'blaaah'), 'w')
f.write('blah')
finally:
if not f.closed:
f.close()
git.git(self.repodir, self.wc, clean_dest=True)
self.assertFalse(os.path.exists(os.path.join(self.wc, 'blaaah')))
def testGitRev(self):
rev = git.git(self.repodir, self.wc)
self.assertEquals(rev, self.revisions[-1])
# Update to a different rev
rev = git.git(self.repodir, self.wc, revision=self.revisions[1])
self.assertEquals(rev, self.revisions[1])
# Make a new commit in repodir
newfile = os.path.join(self.repodir, 'newfile')
touch(newfile)
run_cmd(['git', 'add', 'newfile'], cwd=self.repodir)
run_cmd(['git', 'commit', '-q', '-m', 'add newfile'], cwd=self.repodir)
new_rev = getRevisions(self.repodir)[-1]
rev = git.git(self.repodir, self.wc, revision=new_rev)
self.assertEquals(new_rev, rev)
def testGitBranch(self):
rev = git.git(self.repodir, self.wc, refname='branch2')
self.assertEquals(rev, self.revisions[1])
self.assertEquals(getRevisions(self.wc, branches=['origin/branch2']
), getRevisions(self.repodir, branches=['branch2']))
self.assertTrue(os.path.exists(os.path.join(self.wc, 'hello.txt')))
self.assertFalse("Is this thing on" in open(
os.path.join(self.wc, 'hello.txt')).read())
# Switch branches, make sure we get all the revisions
rev = git.git(self.repodir, self.wc, refname='master')
self.assertEquals(rev, self.revisions[-1])
self.assertEquals(getRevisions(self.wc, branches=[
'origin/master', 'origin/branch2']), self.revisions)
self.assertTrue("Is this thing on" in open(
os.path.join(self.wc, 'hello.txt')).read())
def testGitShare(self):
shareBase = os.path.join(self.tmpdir, 'git-repos')
rev = git.git(self.repodir, self.wc, shareBase=shareBase)
shareDir = os.path.join(shareBase, git.get_repo_name(self.repodir))
self.assertEquals(rev, self.revisions[-1])
# We should see all the revisions
revs = getRevisions(
self.wc, branches=['origin/master', 'origin/branch2'])
shared_revs = getRevisions(
shareDir, branches=['origin/master', 'origin/branch2'])
self.assertEquals(revs, shared_revs)
self.assertEquals(revs, self.revisions)
# Update to a different rev
rev = git.git(self.repodir, self.wc,
revision=self.revisions[0], shareBase=shareBase)
self.assertEquals(rev, self.revisions[0])
self.assertFalse(os.path.exists(os.path.join(self.wc, 'newfile')))
# Add a commit to the original repo
newfile = os.path.join(self.repodir, 'newfile')
touch(newfile)
run_cmd(['git', 'add', 'newfile'], cwd=self.repodir)
run_cmd(['git', 'commit', '-q', '-m', 'add newfile'], cwd=self.repodir)
new_rev = getRevisions(self.repodir)[-1]
# Update to the new rev
rev = git.git(
self.repodir, self.wc, revision=new_rev, shareBase=shareBase)
self.assertEquals(rev, new_rev)
self.assertTrue(os.path.exists(os.path.join(self.wc, 'newfile')))
def testGitBadDest(self):
# Create self.wc without .git
os.makedirs(self.wc)
self.assertFalse(git.is_git_repo(self.wc))
rev = git.git(self.repodir, self.wc)
self.assertEquals(rev, self.revisions[-1])
self.assertTrue(os.path.exists(os.path.join(self.wc, 'hello.txt')))
self.assertTrue("Is this thing on" in open(
os.path.join(self.wc, 'hello.txt')).read())
def testGitBadShare(self):
shareBase = os.path.join(self.tmpdir, 'git-repos')
open(shareBase, 'w').write("hello!")
rev = git.git(self.repodir, self.wc, shareBase=shareBase)
self.assertEquals(rev, self.revisions[-1])
def testGitMirrors(self):
# Create a bad mirror and a good mirror
mirror1 = os.path.join(self.tmpdir, 'mirror1')
mirror2 = os.path.join(self.tmpdir, 'mirror2')
git.git(self.repodir, mirror2, update_dest=False)
rev = git.git(self.repodir, self.wc, mirrors=[mirror1, mirror2])
self.assertEquals(rev, self.revisions[-1])
# Add a commit to the mirror
newfile = os.path.join(mirror2, 'newfile')
touch(newfile)
run_cmd(['git', 'add', 'newfile'], cwd=mirror2)
run_cmd(['git', 'commit', '-q', '-m', 'add newfile'], cwd=mirror2)
new_rev = getRevisions(mirror2)[-1]
# Now clone using the mirror. We should get the new commit that's in
# the mirror
rev = git.git(self.repodir, self.wc, mirrors=[mirror1, mirror2])
self.assertEquals(rev, new_rev)
def testGitBadRev(self):
rev = git.git(self.repodir, self.wc)
self.assertEquals(rev, self.revisions[-1])
# Now try and update to a bad revision
self.assertRaises(subprocess.CalledProcessError, git.git,
self.repodir, self.wc, revision="badrev")
def testGitTag(self):
rev = git.git(self.repodir, self.wc)
self.assertEquals(rev, self.revisions[-1])
# Now try and update to a tag
rev = git.git(self.repodir, self.wc, revision="TAG1")
self.assertEquals(rev, self.revisions[0])
# TODO: Test blowing away shared repo and then updating working copy