blob: 71a916864c8bee54d8d4fdf9dd28697529f2a1de [file] [log] [blame]
import unittest
import tempfile
import shutil
import os
import subprocess
import time
import util.hg as hg
from util.hg import clone, pull, update, hg_ver, mercurial, _make_absolute, \
share, push, apply_and_push, HgUtilError, make_hg_url, get_branch, purge, \
get_branches, path, init, unbundle, adjust_paths, is_hg_cset, commit, tag, \
get_hg_output, has_rev
from util.file import touch
from util.commands import run_cmd, get_output
from mock import patch
def getRevisions(dest):
retval = []
for rev in get_output(['hg', 'log', '-R', dest, '--template', '{node|short}\n']).split('\n'):
rev = rev.strip()
if not rev:
continue
retval.append(rev)
return retval
def getRevInfo(dest, rev):
output = get_output(['hg', 'log', '-R', dest, '-r', rev, '--template',
'{author}\n{desc}\n{tags}']).splitlines()
info = {
'user': output[0],
'msg': output[1],
'tags': []
}
if len(output) > 2:
info['tags'] = output[2].split()
return info
def getTags(dest):
tags = []
for t in get_output(['hg', 'tags', '-R', dest]).splitlines():
tags.append(t.split()[0])
return tags
def yesterday_timestamp():
"""returns a valid yesterday timestamp for touch"""
yesterday = time.time() - 86400
return (yesterday, yesterday)
class TestMakeAbsolute(unittest.TestCase):
def testAbsolutePath(self):
self.assertEquals(_make_absolute("/foo/bar"), "/foo/bar")
def testStripTrailingSlash(self):
self.assertEquals(_make_absolute("/foo/bar/"), "/foo/bar")
def testRelativePath(self):
self.assertEquals(
_make_absolute("foo/bar"), os.path.abspath("foo/bar"))
def testHTTPPaths(self):
self.assertEquals(_make_absolute("http://foo/bar"), "http://foo/bar")
def testAbsoluteFilePath(self):
self.assertEquals(_make_absolute("file:///foo/bar"), "file:///foo/bar")
def testRelativeFilePath(self):
self.assertEquals(_make_absolute(
"file://foo/bar"), "file://%s/foo/bar" % os.getcwd())
class TestIsHgCset(unittest.TestCase):
def testShortCset(self):
self.assertTrue(is_hg_cset('fd06332733e5'))
def testLongCset(self):
self.assertTrue(is_hg_cset('bf37aabfd9367aec573487ebe1f784108bbef73f'))
def testMediumCset(self):
self.assertTrue(is_hg_cset('1e3391794bac9d0e707a7681de3'))
def testInt(self):
self.assertFalse(is_hg_cset(1234567890))
def testTag(self):
self.assertFalse(is_hg_cset('FIREFOX_50_0_RELEASE'))
def testDefault(self):
self.assertFalse(is_hg_cset('default'))
def testTip(self):
self.assertFalse(is_hg_cset('tip'))
def testBranch(self):
self.assertFalse(is_hg_cset('GECKO77_203512230833_RELBRANCH'))
class TestHg(unittest.TestCase):
def setUp(self):
tmpdir = None
if os.path.isdir("/dev/shm"):
tmpdir = "/dev/shm"
self.tmpdir = tempfile.mkdtemp(dir=tmpdir)
self.pwd = os.getcwd()
os.chdir(self.tmpdir)
self.repodir = os.path.join(self.tmpdir, 'repo')
# Have a stable hgrc to test with
os.environ['HGRCPATH'] = os.path.join(os.path.dirname(__file__), "hgrc")
run_cmd(['%s/init_hgrepo.sh' % os.path.dirname(__file__),
self.repodir])
self.revisions = getRevisions(self.repodir)
self.wc = os.path.join(self.tmpdir, 'wc')
self.sleep_patcher = patch('time.sleep')
self.sleep_patcher.start()
hg.RETRY_ATTEMPTS = 2
def tearDown(self):
shutil.rmtree(self.tmpdir)
os.chdir(self.pwd)
self.sleep_patcher.stop()
def testGetBranch(self):
clone(self.repodir, self.wc)
b = get_branch(self.wc)
self.assertEquals(b, 'default')
def testGetBranches(self):
clone(self.repodir, self.wc)
branches = get_branches(self.wc)
self.assertEquals(sorted(branches), sorted(["branch2", "default"]))
def testClone(self):
rev = clone(self.repodir, self.wc, update_dest=False)
self.assertEquals(rev, None)
self.assertEquals(self.revisions, getRevisions(self.wc))
self.assertEquals(sorted(os.listdir(self.wc)), ['.hg'])
def testCloneIntoNonEmptyDir(self):
os.mkdir(self.wc)
open(os.path.join(self.wc, 'test.txt'), 'w').write('hello')
clone(self.repodir, self.wc, update_dest=False)
self.failUnless(not os.path.exists(os.path.join(self.wc, 'test.txt')))
def testCloneUpdate(self):
rev = clone(self.repodir, self.wc, update_dest=True)
self.assertEquals(rev, self.revisions[0])
def testCloneBranch(self):
clone(self.repodir, self.wc, branch='branch2',
update_dest=False, clone_by_rev=True)
# On hg 1.6, we should only have a subset of the revisions
if hg_ver() >= (1, 6, 0):
self.assertEquals(self.revisions[1:],
getRevisions(self.wc))
else:
self.assertEquals(self.revisions,
getRevisions(self.wc))
def testCloneUpdateBranch(self):
rev = clone(self.repodir, os.path.join(self.tmpdir, 'wc'),
branch="branch2", update_dest=True, clone_by_rev=True)
self.assertEquals(rev, self.revisions[1], self.revisions)
def testCloneRevision(self):
clone(self.repodir, self.wc,
revision=self.revisions[0], update_dest=False,
clone_by_rev=True)
# We'll only get a subset of the revisions
self.assertEquals(self.revisions[:1] + self.revisions[2:],
getRevisions(self.wc))
def testUpdateRevision(self):
rev = clone(self.repodir, self.wc, update_dest=False)
self.assertEquals(rev, None)
rev = update(self.wc, revision=self.revisions[1])
self.assertEquals(rev, self.revisions[1])
def testPull(self):
# Clone just the first rev
clone(self.repodir, self.wc, revision=self.revisions[-1],
update_dest=False, clone_by_rev=True)
self.assertEquals(getRevisions(self.wc), self.revisions[-1:])
# Now pull in new changes
rev = pull(self.repodir, self.wc, update_dest=False)
self.assertEquals(rev, None)
self.assertEquals(getRevisions(self.wc), self.revisions)
def testPullRevision(self):
# Clone just the first rev
clone(self.repodir, self.wc, revision=self.revisions[-1],
update_dest=False, clone_by_rev=True)
self.assertEquals(getRevisions(self.wc), self.revisions[-1:])
# Now pull in just the last revision
rev = pull(self.repodir, self.wc, revision=self.revisions[
0], update_dest=False)
self.assertEquals(rev, None)
# We'll be missing the middle revision (on another branch)
self.assertEquals(
getRevisions(self.wc), self.revisions[:1] + self.revisions[2:])
def testPullBranch(self):
# Clone just the first rev
clone(self.repodir, self.wc, revision=self.revisions[-1],
update_dest=False, clone_by_rev=True)
self.assertEquals(getRevisions(self.wc), self.revisions[-1:])
# Now pull in the other branch
rev = pull(self.repodir, self.wc, branch="branch2", update_dest=False)
self.assertEquals(rev, None)
# On hg 1.6, we'll be missing the last revision (on another branch)
if hg_ver() >= (1, 6, 0):
self.assertEquals(getRevisions(self.wc), self.revisions[1:])
else:
self.assertEquals(getRevisions(self.wc), self.revisions)
def testPullTag(self):
clone(self.repodir, self.wc)
run_cmd(['hg', 'tag', '-f', 'TAG1'], cwd=self.repodir)
revisions = getRevisions(self.repodir)
rev = pull(self.repodir, self.wc, revision='TAG1')
self.assertEquals(rev, revisions[1])
self.assertEquals(getRevisions(self.wc), revisions)
def testPullTip(self):
clone(self.repodir, self.wc)
run_cmd(['hg', 'tag', '-f', 'TAG1'], cwd=self.repodir)
revisions = getRevisions(self.repodir)
rev = pull(self.repodir, self.wc, revision='tip')
self.assertEquals(rev, revisions[0])
self.assertEquals(getRevisions(self.wc), revisions)
def testPullDefault(self):
clone(self.repodir, self.wc)
run_cmd(['hg', 'tag', '-f', 'TAG1'], cwd=self.repodir)
revisions = getRevisions(self.repodir)
rev = pull(self.repodir, self.wc, revision='default')
self.assertEquals(rev, revisions[0])
self.assertEquals(getRevisions(self.wc), revisions)
def testPullUnrelated(self):
# Create a new repo
repo2 = os.path.join(self.tmpdir, 'repo2')
run_cmd(['%s/init_hgrepo.sh' % os.path.dirname(__file__), repo2],
env={'extra': 'unrelated'})
self.assertNotEqual(self.revisions, getRevisions(repo2))
# Clone the original repo
clone(self.repodir, self.wc, update_dest=False)
# Try and pull in changes from the new repo
self.assertRaises(subprocess.CalledProcessError, pull,
repo2, self.wc, update_dest=False)
def testShareUnrelated(self):
# Create a new repo
repo2 = os.path.join(self.tmpdir, 'repo2')
run_cmd(['%s/init_hgrepo.sh' % os.path.dirname(__file__), repo2],
env={'extra': 'share_unrelated'})
self.assertNotEqual(self.revisions, getRevisions(repo2))
shareBase = os.path.join(self.tmpdir, 'share')
# Clone the original repo
mercurial(self.repodir, self.wc, shareBase=shareBase)
# Clone the new repo
mercurial(repo2, self.wc, shareBase=shareBase)
self.assertEquals(getRevisions(self.wc), getRevisions(repo2))
def testShareExtraFiles(self):
shareBase = os.path.join(self.tmpdir, 'share')
backup = os.path.join(self.tmpdir, 'backup')
# Clone the original repo
mercurial(self.repodir, self.wc, shareBase=shareBase)
clone(self.repodir, backup)
# Make the working repo have a new file. We need it to have an earlier
# timestamp (yesterday) to trigger the odd behavior in hg
newfile = os.path.join(self.wc, 'newfile')
touch(newfile, timestamp=yesterday_timestamp())
run_cmd(['hg', 'add', 'newfile'], cwd=self.wc)
run_cmd(['hg', 'commit', '-m', '"add newfile"'], cwd=self.wc)
# Reset the share base to remove the 'add newfile' commit. We
# overwrite repodir with the backup that doesn't have the commit,
# then clone the repodir to a throwaway dir to create the new
# shareBase. Now self.wc still points to shareBase, but the
# changeset that self.wc was on is lost.
shutil.rmtree(self.repodir)
shutil.rmtree(shareBase)
clone(backup, self.repodir)
throwaway = os.path.join(self.tmpdir, 'throwaway')
mercurial(self.repodir, throwaway, shareBase=shareBase)
# Try and update our working copy
mercurial(self.repodir, self.wc, shareBase=shareBase)
self.assertFalse(os.path.exists(os.path.join(self.wc, 'newfile')))
def testShareExtraFilesReset(self):
shareBase = os.path.join(self.tmpdir, 'share')
# Clone the original repo
mercurial(self.repodir, self.wc, shareBase=shareBase)
# Reset the repo
run_cmd(
['%s/init_hgrepo.sh' % os.path.dirname(__file__), self.repodir],
env={'extra': 'extra_files_reset'})
# Make the working repo have a new file. We need it to have an earlier
# timestamp (yesterday) to trigger the odd behavior in hg
newfile = os.path.join(self.wc, 'newfile')
touch(newfile, timestamp=yesterday_timestamp())
run_cmd(['hg', 'add', 'newfile'], cwd=self.wc)
run_cmd(['hg', 'commit', '-m', '"add newfile"'], cwd=self.wc)
# Try and update our working copy
mercurial(self.repodir, self.wc, shareBase=shareBase)
self.assertFalse(os.path.exists(os.path.join(self.wc, 'newfile')))
def testShareReset(self):
shareBase = os.path.join(self.tmpdir, 'share')
# Clone the original repo
mercurial(self.repodir, self.wc, shareBase=shareBase)
old_revs = self.revisions[:]
# Reset the repo
run_cmd(
['%s/init_hgrepo.sh' % os.path.dirname(__file__), self.repodir],
env={'extra': 'share_reset'})
self.assertNotEqual(old_revs, getRevisions(self.repodir))
# Try and update our working copy
mercurial(self.repodir, self.wc, shareBase=shareBase)
self.assertEquals(getRevisions(self.repodir), getRevisions(self.wc))
self.assertNotEqual(old_revs, getRevisions(self.wc))
def testPush(self):
clone(self.repodir, self.wc, revision=self.revisions[-2],
clone_by_rev=True)
push(src=self.repodir, remote=self.wc)
self.assertEquals(getRevisions(self.wc), self.revisions)
def testPushWithBranch(self):
clone(self.repodir, self.wc, revision=self.revisions[-1],
clone_by_rev=True)
push(src=self.repodir, remote=self.wc, branch='branch2')
push(src=self.repodir, remote=self.wc, branch='default')
self.assertEquals(getRevisions(self.wc), self.revisions)
def testPushWithRevision(self):
clone(self.repodir, self.wc, revision=self.revisions[-1],
clone_by_rev=True)
push(src=self.repodir, remote=self.wc, revision=self.revisions[-2])
self.assertEquals(getRevisions(self.wc), self.revisions[-2:])
def testPurgeUntrackedFile(self):
clone(self.repodir, self.wc)
fileToPurge = os.path.join(self.wc, 'fileToPurge')
with file(fileToPurge, 'a') as f:
f.write('purgeme')
purge(self.wc)
self.assertFalse(os.path.exists(fileToPurge))
def testPurgeUntrackedDirectory(self):
clone(self.repodir, self.wc)
directoryToPurge = os.path.join(self.wc, 'directoryTopPurge')
os.makedirs(directoryToPurge)
purge(directoryToPurge)
self.assertFalse(os.path.isdir(directoryToPurge))
def testPurgeVeryLongPath(self):
clone(self.repodir, self.wc)
# now create a very long path name
longPath = self.wc
for new_dir in xrange(1, 64):
longPath = os.path.join(longPath, str(new_dir))
os.makedirs(longPath)
self.assertTrue(os.path.isdir(longPath))
purge(self.wc)
self.assertFalse(os.path.isdir(longPath))
def testPurgeAFreshClone(self):
clone(self.repodir, self.wc)
purge(self.wc)
self.assertTrue(os.path.exists(os.path.join(self.wc, 'hello.txt')))
def testPurgeTrackedFile(self):
clone(self.repodir, self.wc)
fileToModify = os.path.join(self.wc, 'hello.txt')
with open(fileToModify, 'w') as f:
f.write('hello!')
purge(self.wc)
with open(fileToModify, 'r') as f:
content = f.read()
self.assertEqual(content, 'hello!')
def testMercurial(self):
rev = mercurial(self.repodir, self.wc)
self.assertEquals(rev, self.revisions[0])
def testPushNewBranchesNotAllowed(self):
clone(self.repodir, self.wc, revision=self.revisions[0],
clone_by_rev=True)
self.assertRaises(Exception, push, self.repodir, self.wc,
push_new_branches=False)
def testPushWithForce(self):
clone(self.repodir, self.wc, revision=self.revisions[0],
clone_by_rev=True)
newfile = os.path.join(self.wc, 'newfile')
touch(newfile)
run_cmd(['hg', 'add', 'newfile'], cwd=self.wc)
run_cmd(['hg', 'commit', '-m', '"re-add newfile"'], cwd=self.wc)
push(self.repodir, self.wc, push_new_branches=False, force=True)
def testPushForceFail(self):
clone(self.repodir, self.wc, revision=self.revisions[0],
clone_by_rev=True)
newfile = os.path.join(self.wc, 'newfile')
touch(newfile)
run_cmd(['hg', 'add', 'newfile'], cwd=self.wc)
run_cmd(['hg', 'commit', '-m', '"add newfile"'], cwd=self.wc)
self.assertRaises(Exception, push, self.repodir, self.wc,
push_new_branches=False, force=False)
def testMercurialWithNewShare(self):
shareBase = os.path.join(self.tmpdir, 'share')
sharerepo = os.path.join(shareBase, self.repodir.lstrip("/"))
os.mkdir(shareBase)
mercurial(self.repodir, self.wc, shareBase=shareBase)
self.assertEquals(getRevisions(self.repodir), getRevisions(self.wc))
self.assertEquals(getRevisions(self.repodir), getRevisions(sharerepo))
def testMercurialWithShareBaseInEnv(self):
shareBase = os.path.join(self.tmpdir, 'share')
sharerepo = os.path.join(shareBase, self.repodir.lstrip("/"))
os.mkdir(shareBase)
try:
os.environ['HG_SHARE_BASE_DIR'] = shareBase
mercurial(self.repodir, self.wc)
self.assertEquals(
getRevisions(self.repodir), getRevisions(self.wc))
self.assertEquals(
getRevisions(self.repodir), getRevisions(sharerepo))
finally:
del os.environ['HG_SHARE_BASE_DIR']
def testMercurialWithExistingShare(self):
shareBase = os.path.join(self.tmpdir, 'share')
sharerepo = os.path.join(shareBase, self.repodir.lstrip("/"))
os.mkdir(shareBase)
mercurial(self.repodir, sharerepo)
open(os.path.join(self.repodir, 'test.txt'), 'w').write('hello!')
run_cmd(['hg', 'add', 'test.txt'], cwd=self.repodir)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=self.repodir)
mercurial(self.repodir, self.wc, shareBase=shareBase)
self.assertEquals(getRevisions(self.repodir), getRevisions(self.wc))
self.assertEquals(getRevisions(self.repodir), getRevisions(sharerepo))
def testMercurialRelativeDir(self):
os.chdir(os.path.dirname(self.repodir))
repo = os.path.basename(self.repodir)
wc = os.path.basename(self.wc)
rev = mercurial(repo, wc, revision=self.revisions[-1])
self.assertEquals(rev, self.revisions[-1])
open(os.path.join(self.wc, 'test.txt'), 'w').write("hello!")
rev = mercurial(repo, wc)
self.assertEquals(rev, self.revisions[0])
# Make sure our local file didn't go away
self.failUnless(os.path.exists(os.path.join(self.wc, 'test.txt')))
def testMercurialUpdateTip(self):
rev = mercurial(self.repodir, self.wc, revision=self.revisions[-1])
self.assertEquals(rev, self.revisions[-1])
open(os.path.join(self.wc, 'test.txt'), 'w').write("hello!")
rev = mercurial(self.repodir, self.wc)
self.assertEquals(rev, self.revisions[0])
# Make sure our local file didn't go away
self.failUnless(os.path.exists(os.path.join(self.wc, 'test.txt')))
def testMercurialUpdateRev(self):
rev = mercurial(self.repodir, self.wc, revision=self.revisions[-1])
self.assertEquals(rev, self.revisions[-1])
open(os.path.join(self.wc, 'test.txt'), 'w').write("hello!")
rev = mercurial(self.repodir, self.wc, revision=self.revisions[0])
self.assertEquals(rev, self.revisions[0])
# Make sure our local file didn't go away
self.failUnless(os.path.exists(os.path.join(self.wc, 'test.txt')))
# TODO: this test doesn't seem to be compatible with mercurial()'s
# share() usage, and fails when HG_SHARE_BASE_DIR is set
def testMercurialChangeRepo(self):
# Create a new repo
old_env = os.environ.copy()
if 'HG_SHARE_BASE_DIR' in os.environ:
del os.environ['HG_SHARE_BASE_DIR']
try:
repo2 = os.path.join(self.tmpdir, 'repo2')
run_cmd(['%s/init_hgrepo.sh' % os.path.dirname(__file__), repo2],
env={'extra': 'change_repo'})
self.assertNotEqual(self.revisions, getRevisions(repo2))
# Clone the original repo
mercurial(self.repodir, self.wc)
self.assertEquals(getRevisions(self.wc), self.revisions)
open(os.path.join(self.wc, 'test.txt'), 'w').write("hello!")
# Clone the new one
mercurial(repo2, self.wc)
self.assertEquals(getRevisions(self.wc), getRevisions(repo2))
# Make sure our local file went away
self.failUnless(
not os.path.exists(os.path.join(self.wc, 'test.txt')))
finally:
os.environ.clear()
os.environ.update(old_env)
def testMakeHGUrl(self):
# construct an hg url specific to revision, branch and filename and try
# to pull it down
file_url = make_hg_url(
"hg.mozilla.org",
'//build/tools/',
revision='FIREFOX_3_6_12_RELEASE',
filename="/lib/python/util/hg.py"
)
expected_url = "https://hg.mozilla.org/build/tools/raw-file/FIREFOX_3_6_12_RELEASE/lib/python/util/hg.py"
self.assertEquals(file_url, expected_url)
def testMakeHGUrlNoFilename(self):
file_url = make_hg_url(
"hg.mozilla.org",
"/build/tools",
revision="default"
)
expected_url = "https://hg.mozilla.org/build/tools/rev/default"
self.assertEquals(file_url, expected_url)
def testMakeHGUrlNoRevisionNoFilename(self):
repo_url = make_hg_url(
"hg.mozilla.org",
"/build/tools"
)
expected_url = "https://hg.mozilla.org/build/tools"
self.assertEquals(repo_url, expected_url)
def testMakeHGUrlDifferentProtocol(self):
repo_url = make_hg_url(
"hg.mozilla.org",
"/build/tools",
protocol='ssh'
)
expected_url = "ssh://hg.mozilla.org/build/tools"
self.assertEquals(repo_url, expected_url)
def testShareRepo(self):
repo3 = os.path.join(self.tmpdir, 'repo3')
share(self.repodir, repo3)
# make sure shared history is identical
self.assertEquals(self.revisions, getRevisions(repo3))
def testMercurialShareOutgoing(self):
# ensure that outgoing changesets in a shared clone affect the shared
# history
repo5 = os.path.join(self.tmpdir, 'repo5')
repo6 = os.path.join(self.tmpdir, 'repo6')
mercurial(self.repodir, repo5)
share(repo5, repo6)
open(os.path.join(repo6, 'test.txt'), 'w').write("hello!")
# modify the history of the new clone
run_cmd(['hg', 'add', 'test.txt'], cwd=repo6)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=repo6)
self.assertNotEquals(self.revisions, getRevisions(repo6))
self.assertNotEquals(self.revisions, getRevisions(repo5))
self.assertEquals(getRevisions(repo5), getRevisions(repo6))
def testApplyAndPush(self):
clone(self.repodir, self.wc)
def c(repo, attempt):
run_cmd(['hg', 'tag', '-f', 'TEST'], cwd=repo)
apply_and_push(self.wc, self.repodir, c)
self.assertEquals(getRevisions(self.wc), getRevisions(self.repodir))
def testApplyAndPushFail(self):
clone(self.repodir, self.wc)
def c(repo, attempt, remote):
run_cmd(['hg', 'tag', '-f', 'TEST'], cwd=repo)
run_cmd(['hg', 'tag', '-f', 'CONFLICTING_TAG'], cwd=remote)
self.assertRaises(HgUtilError, apply_and_push, self.wc, self.repodir,
lambda r, a: c(r, a, self.repodir), max_attempts=2)
def testApplyAndPushWithRebase(self):
clone(self.repodir, self.wc)
def c(repo, attempt, remote):
run_cmd(['hg', 'tag', '-f', 'TEST'], cwd=repo)
if attempt == 1:
run_cmd(['hg', 'rm', 'hello.txt'], cwd=remote)
run_cmd(['hg', 'commit', '-m', 'test'], cwd=remote)
apply_and_push(self.wc, self.repodir,
lambda r, a: c(r, a, self.repodir), max_attempts=2)
self.assertEquals(getRevisions(self.wc), getRevisions(self.repodir))
def testApplyAndPushRebaseFails(self):
clone(self.repodir, self.wc)
def c(repo, attempt, remote):
run_cmd(['hg', 'tag', '-f', 'TEST'], cwd=repo)
if attempt in (1, 2):
run_cmd(['hg', 'tag', '-f', 'CONFLICTING_TAG'], cwd=remote)
apply_and_push(self.wc, self.repodir,
lambda r, a: c(r, a, self.repodir), max_attempts=3)
self.assertEquals(getRevisions(self.wc), getRevisions(self.repodir))
def testApplyAndPushOnBranch(self):
clone(self.repodir, self.wc)
def c(repo, attempt):
run_cmd(['hg', 'branch', 'branch3'], cwd=repo)
run_cmd(['hg', 'tag', '-f', 'TEST'], cwd=repo)
apply_and_push(self.wc, self.repodir, c)
self.assertEquals(getRevisions(self.wc), getRevisions(self.repodir))
def testApplyAndPushWithNoChange(self):
clone(self.repodir, self.wc)
def c(r, a):
pass
self.assertRaises(
HgUtilError, apply_and_push, self.wc, self.repodir, c)
def testApplyAndPushForce(self):
clone(self.repodir, self.wc)
def c(repo, attempt, remote, local):
newfile_remote = os.path.join(remote, 'newfile')
newfile_local = os.path.join(local, 'newfile')
touch(newfile_remote)
run_cmd(['hg', 'add', 'newfile'], cwd=remote)
run_cmd(['hg', 'commit', '-m', '"add newfile"'], cwd=remote)
touch(newfile_local)
run_cmd(['hg', 'add', 'newfile'], cwd=local)
run_cmd(['hg', 'commit', '-m', '"re-add newfile"'], cwd=local)
apply_and_push(self.wc, self.repodir,
(lambda r, a: c(r, a, self.repodir, self.wc)), force=True)
def testApplyAndPushForceFail(self):
clone(self.repodir, self.wc)
def c(repo, attempt, remote, local):
newfile_remote = os.path.join(remote, 'newfile')
newfile_local = os.path.join(local, 'newfile')
touch(newfile_remote)
run_cmd(['hg', 'add', 'newfile'], cwd=remote)
run_cmd(['hg', 'commit', '-m', '"add newfile"'], cwd=remote)
touch(newfile_local)
run_cmd(['hg', 'add', 'newfile'], cwd=local)
run_cmd(['hg', 'commit', '-m', '"re-add newfile"'], cwd=local)
self.assertRaises(HgUtilError,
apply_and_push, self.wc, self.repodir,
(lambda r, a: c(r, a, self.repodir, self.wc)), force=False)
def testPath(self):
clone(self.repodir, self.wc)
p = path(self.wc)
self.assertEquals(p, self.repodir)
def testBustedHgrcWithShare(self):
# Test that we can recover from hgrc being lost
shareBase = os.path.join(self.tmpdir, 'share')
sharerepo = os.path.join(shareBase, self.repodir.lstrip("/"))
os.mkdir(shareBase)
mercurial(self.repodir, self.wc, shareBase=shareBase)
# Delete .hg/hgrc
for d in sharerepo, self.wc:
f = os.path.join(d, '.hg', 'hgrc')
os.unlink(f)
# path is busted now
p = path(self.wc)
self.assertEquals(p, None)
# cloning again should fix this up
mercurial(self.repodir, self.wc, shareBase=shareBase)
p = path(self.wc)
self.assertEquals(p, self.repodir)
def testBustedHgrc(self):
# Test that we can recover from hgrc being lost
mercurial(self.repodir, self.wc)
# Delete .hg/hgrc
os.unlink(os.path.join(self.wc, '.hg', 'hgrc'))
# path is busted now
p = path(self.wc)
self.assertEquals(p, None)
# cloning again should fix this up
mercurial(self.repodir, self.wc)
p = path(self.wc)
self.assertEquals(p, self.repodir)
def testInit(self):
tmpdir = os.path.join(self.tmpdir, 'new')
self.assertEquals(False, os.path.exists(tmpdir))
init(tmpdir)
self.assertEquals(True, os.path.exists(tmpdir))
self.assertEquals(True, os.path.exists(os.path.join(tmpdir, '.hg')))
def testUnbundle(self):
# First create the bundle
bundle = os.path.join(self.tmpdir, 'bundle')
run_cmd(['hg', 'bundle', '-a', bundle], cwd=self.repodir)
# Now unbundle it in a new place
newdir = os.path.join(self.tmpdir, 'new')
init(newdir)
unbundle(bundle, newdir)
self.assertEquals(self.revisions, getRevisions(newdir))
def testCloneWithBundle(self):
# First create the bundle
bundle = os.path.join(self.tmpdir, 'bundle')
run_cmd(['hg', 'bundle', '-a', bundle], cwd=self.repodir)
# Wrap unbundle so we can tell if it got called
orig_unbundle = unbundle
try:
called = []
def new_unbundle(*args, **kwargs):
called.append(True)
return orig_unbundle(*args, **kwargs)
hg.unbundle = new_unbundle
# Now clone it using the bundle
clone(self.repodir, self.wc, bundles=[bundle])
self.assertEquals(self.revisions, getRevisions(self.wc))
self.assertEquals(called, [True])
self.assertEquals(path(self.wc), self.repodir)
finally:
hg.unbundle = orig_unbundle
def testCloneWithUnrelatedBundle(self):
# First create the bundle
bundle = os.path.join(self.tmpdir, 'bundle')
run_cmd(['hg', 'bundle', '-a', bundle], cwd=self.repodir)
# Create an unrelated repo
repo2 = os.path.join(self.tmpdir, 'repo2')
run_cmd(['%s/init_hgrepo.sh' % os.path.dirname(__file__),
repo2], env={'extra': 'clone_unrelated'})
self.assertNotEqual(self.revisions, getRevisions(repo2))
# Clone repo2 using the unrelated bundle
clone(repo2, self.wc, bundles=[bundle])
# Make sure we don't have unrelated revisions
self.assertEquals(getRevisions(repo2), getRevisions(self.wc))
self.assertEquals(set(),
set(getRevisions(self.repodir)).intersection(set(getRevisions(self.wc))))
def testCloneWithBadBundle(self):
# First create the bad bundle
bundle = os.path.join(self.tmpdir, 'bundle')
open(bundle, 'w').write('ruh oh!')
# Wrap unbundle so we can tell if it got called
orig_unbundle = unbundle
try:
called = []
def new_unbundle(*args, **kwargs):
called.append(True)
return orig_unbundle(*args, **kwargs)
hg.unbundle = new_unbundle
# Now clone it using the bundle
clone(self.repodir, self.wc, bundles=[bundle])
self.assertEquals(self.revisions, getRevisions(self.wc))
self.assertEquals(called, [True])
finally:
hg.unbundle = orig_unbundle
def testCloneWithBundleMissingRevs(self):
# First create the bundle
bundle = os.path.join(self.tmpdir, 'bundle')
run_cmd(['hg', 'bundle', '-a', bundle], cwd=self.repodir)
# Create a commit
open(os.path.join(self.repodir, 'test.txt'), 'w').write('hello!')
run_cmd(['hg', 'add', 'test.txt'], cwd=self.repodir)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=self.repodir)
# Wrap unbundle so we can tell if it got called
orig_unbundle = unbundle
try:
called = []
def new_unbundle(*args, **kwargs):
called.append(True)
return orig_unbundle(*args, **kwargs)
hg.unbundle = new_unbundle
# Now clone it using the bundle
clone(self.repodir, self.wc, bundles=[bundle])
self.assertEquals(
getRevisions(self.repodir), getRevisions(self.wc))
self.assertEquals(called, [True])
finally:
hg.unbundle = orig_unbundle
def testCloneWithMirror(self):
mirror = os.path.join(self.tmpdir, 'repo2')
clone(self.repodir, mirror)
# Create a commit in the original repo
open(os.path.join(self.repodir, 'test.txt'), 'w').write('hello!')
run_cmd(['hg', 'add', 'test.txt'], cwd=self.repodir)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=self.repodir)
# Now clone from the mirror
clone(self.repodir, self.wc, mirrors=[mirror])
# We'll be missing the new revision from repodir
self.assertNotEquals(getRevisions(self.repodir), getRevisions(self.wc))
# But we should have everything from the mirror
self.assertEquals(getRevisions(mirror), getRevisions(self.wc))
# Our default path should point to the original repo though.
self.assertEquals(self.repodir, path(self.wc))
def testCloneWithBadMirror(self):
mirror = os.path.join(self.tmpdir, 'repo2')
# Now clone from the mirror
clone(self.repodir, self.wc, mirrors=[mirror])
# We still end up with a valid repo
self.assertEquals(self.revisions, getRevisions(self.wc))
self.assertEquals(self.repodir, path(self.wc))
def testCloneWithOneBadMirror(self):
mirror1 = os.path.join(self.tmpdir, 'mirror1')
mirror2 = os.path.join(self.tmpdir, 'mirror2')
# Mirror 2 is ok
clone(self.repodir, mirror2)
# Create a commit in the original repo
open(os.path.join(self.repodir, 'test.txt'), 'w').write('hello!')
run_cmd(['hg', 'add', 'test.txt'], cwd=self.repodir)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=self.repodir)
# Now clone from the mirror
clone(self.repodir, self.wc, mirrors=[mirror1, mirror2])
# We'll be missing the new revision from repodir
self.assertNotEquals(getRevisions(self.repodir), getRevisions(self.wc))
# But we should have everything from the mirror
self.assertEquals(getRevisions(mirror2), getRevisions(self.wc))
# Our default path should point to the original repo though.
self.assertEquals(self.repodir, path(self.wc))
def testCloneWithRevAndMirror(self):
mirror = os.path.join(self.tmpdir, 'repo2')
clone(self.repodir, mirror)
# Create a commit in the original repo
open(os.path.join(self.repodir, 'test.txt'), 'w').write('hello!')
run_cmd(['hg', 'add', 'test.txt'], cwd=self.repodir)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=self.repodir)
# Now clone from the mirror
revisions = getRevisions(self.repodir)
clone(self.repodir, self.wc, revision=revisions[0],
mirrors=[mirror], clone_by_rev=True)
# We'll be missing the middle revision (on another branch)
self.assertEquals(revisions[:2] + revisions[3:], getRevisions(self.wc))
# But not from the mirror
self.assertNotEquals(getRevisions(mirror), getRevisions(self.wc))
# Our default path should point to the original repo though.
self.assertEquals(self.repodir, path(self.wc))
def testPullWithMirror(self):
mirror = os.path.join(self.tmpdir, 'repo2')
clone(self.repodir, mirror)
# Create a new commit in the mirror repo
open(os.path.join(mirror, 'test.txt'), 'w').write('hello!')
run_cmd(['hg', 'add', 'test.txt'], cwd=mirror)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=mirror)
# Now clone from the original
clone(self.repodir, self.wc)
# Pull using the mirror
pull(self.repodir, self.wc, mirrors=[mirror])
self.assertEquals(getRevisions(self.wc), getRevisions(mirror))
# Our default path should point to the original repo
self.assertEquals(self.repodir, path(self.wc))
def testPullWithBadMirror(self):
mirror = os.path.join(self.tmpdir, 'repo2')
# Now clone from the original
clone(self.repodir, self.wc)
# Create a new commit in the original repo
open(os.path.join(self.repodir, 'test.txt'), 'w').write('hello!')
run_cmd(['hg', 'add', 'test.txt'], cwd=self.repodir)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=self.repodir)
# Pull using the mirror
pull(self.repodir, self.wc, mirrors=[mirror])
self.assertEquals(getRevisions(self.wc), getRevisions(self.repodir))
# Our default path should point to the original repo
self.assertEquals(self.repodir, path(self.wc))
def testPullWithUnrelatedMirror(self):
mirror = os.path.join(self.tmpdir, 'repo2')
run_cmd(['%s/init_hgrepo.sh' % os.path.dirname(__file__), mirror],
env={'extra': 'pull_unrealted'})
# Now clone from the original
clone(self.repodir, self.wc)
# Create a new commit in the original repo
open(os.path.join(self.repodir, 'test.txt'), 'w').write('hello!')
run_cmd(['hg', 'add', 'test.txt'], cwd=self.repodir)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=self.repodir)
# Pull using the mirror
pull(self.repodir, self.wc, mirrors=[mirror])
self.assertEquals(getRevisions(self.wc), getRevisions(self.repodir))
# We shouldn't have anything from the unrelated mirror
self.assertEquals(set(),
set(getRevisions(mirror)).intersection(set(getRevisions(self.wc))))
# Our default path should point to the original repo
self.assertEquals(self.repodir, path(self.wc))
def testMercurialWithShareAndBundle(self):
# First create the bundle
bundle = os.path.join(self.tmpdir, 'bundle')
run_cmd(['hg', 'bundle', '-a', bundle], cwd=self.repodir)
# Create a commit
open(os.path.join(self.repodir, 'test.txt'), 'w').write('hello!')
run_cmd(['hg', 'add', 'test.txt'], cwd=self.repodir)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=self.repodir)
# Wrap unbundle so we can tell if it got called
orig_unbundle = unbundle
try:
called = []
def new_unbundle(*args, **kwargs):
called.append(True)
return orig_unbundle(*args, **kwargs)
hg.unbundle = new_unbundle
shareBase = os.path.join(self.tmpdir, 'share')
sharerepo = os.path.join(shareBase, self.repodir.lstrip("/"))
os.mkdir(shareBase)
mercurial(
self.repodir, self.wc, shareBase=shareBase, bundles=[bundle])
self.assertEquals(called, [True])
self.assertEquals(
getRevisions(self.repodir), getRevisions(self.wc))
self.assertEquals(
getRevisions(self.repodir), getRevisions(sharerepo))
finally:
hg.unbundle = orig_unbundle
def testMercurialWithShareAndMirror(self):
# First create the mirror
mirror = os.path.join(self.tmpdir, 'repo2')
clone(self.repodir, mirror)
# Create a commit
open(os.path.join(self.repodir, 'test.txt'), 'w').write('hello!')
run_cmd(['hg', 'add', 'test.txt'], cwd=self.repodir)
run_cmd(['hg', 'commit', '-m', 'adding changeset'], cwd=self.repodir)
shareBase = os.path.join(self.tmpdir, 'share')
sharerepo = os.path.join(shareBase, self.repodir.lstrip("/"))
os.mkdir(shareBase)
mercurial(self.repodir, self.wc, shareBase=shareBase, mirrors=[mirror])
# Since we used the mirror, we should be missing a commit
self.assertNotEquals(getRevisions(self.repodir), getRevisions(self.wc))
self.assertNotEquals(
getRevisions(self.repodir), getRevisions(sharerepo))
self.assertEquals(getRevisions(mirror), getRevisions(self.wc))
def testMercurialByRevWithShareAndMirror(self):
# First create the mirror
mirror = os.path.join(self.tmpdir, 'repo2')
clone(self.repodir, mirror)
shareBase = os.path.join(self.tmpdir, 'share')
sharerepo = os.path.join(shareBase, self.repodir.lstrip("/"))
os.mkdir(shareBase)
mercurial(self.repodir, self.wc, shareBase=shareBase, mirrors=[
mirror], clone_by_rev=True, revision=self.revisions[-1])
# We should only have the one revision
self.assertEquals(getRevisions(sharerepo), self.revisions[-1:])
self.assertEquals(getRevisions(self.wc), self.revisions[-1:])
def testMercurialSkipPull(self):
# Clone once into our working copy
mercurial(self.repodir, self.wc)
# The second clone should avoid calling pull()
with patch('util.hg.pull') as patched_pull:
mercurial(self.repodir, self.wc, revision=self.revisions[-1])
self.assertEquals(patched_pull.call_count, 0)
def testAdjustPaths(self):
mercurial(self.repodir, self.wc)
# Make sure our default path is correct
self.assertEquals(path(self.wc), self.repodir)
# Add a comment, make sure it's still there if we don't change
# anything
hgrc = os.path.join(self.wc, '.hg', 'hgrc')
open(hgrc, 'a').write("# Hello world")
adjust_paths(self.wc, default=self.repodir)
self.assert_("Hello world" in open(hgrc).read())
# Add a path, and the comment goes away
adjust_paths(self.wc, test=self.repodir)
self.assert_("Hello world" not in open(hgrc).read())
# Make sure our paths are correct
self.assertEquals(path(self.wc), self.repodir)
self.assertEquals(path(self.wc, 'test'), self.repodir)
def testOutofSyncMirrorFailingMaster(self):
# First create the mirror
mirror = os.path.join(self.tmpdir, 'repo2')
clone(self.repodir, mirror)
shareBase = os.path.join(self.tmpdir, 'share')
os.mkdir(shareBase)
mercurial(self.repodir, self.wc, shareBase=shareBase, mirrors=[mirror])
# Create a bundle
bundle = os.path.join(self.tmpdir, 'bundle')
run_cmd(['hg', 'bundle', '-a', bundle], cwd=self.repodir)
# Move our repodir out of the way so that pulling/cloning from it fails
os.rename(self.repodir, self.repodir + "-bad")
# Try and update to a non-existent revision using our mirror and
# bundle, with the master failing. We should fail
self.assertRaises(subprocess.CalledProcessError, mercurial,
self.repodir, self.wc, shareBase=shareBase,
mirrors=[mirror], bundles=[bundle],
revision="1234567890")
def testCommit(self):
newfile = os.path.join(self.repodir, 'newfile')
touch(newfile)
run_cmd(['hg', 'add', 'newfile'], cwd=self.repodir)
rev = commit(self.repodir, user='unittest', msg='gooooood')
info = getRevInfo(self.repodir, rev)
self.assertEquals(info['msg'], 'gooooood')
# can't test for user, because it depends on local hg configs.
def testCommitWithUser(self):
newfile = os.path.join(self.repodir, 'newfile')
touch(newfile)
run_cmd(['hg', 'add', 'newfile'], cwd=self.repodir)
rev = commit(self.repodir, user='unittest', msg='new stuff!')
info = getRevInfo(self.repodir, rev)
self.assertEquals(info['user'], 'unittest')
self.assertEquals(info['msg'], 'new stuff!')
def testTag(self):
tag(self.repodir, ['test_tag'])
self.assertTrue('test_tag' in getTags(self.repodir))
def testMultitag(self):
tag(self.repodir, ['tag1', 'tag2'])
tags = getTags(self.repodir)
self.assertTrue('tag1' in tags)
self.assertTrue('tag2' in tags)
def testMultitagSet(self):
tag(self.repodir, set(['tag1', 'tag5']))
tags = getTags(self.repodir)
self.assertTrue('tag1' in tags)
self.assertTrue('tag5' in tags)
def testTagWithMsg(self):
rev = tag(self.repodir, ['tag'], msg='I made a tag!')
info = getRevInfo(self.repodir, rev)
self.assertEquals('I made a tag!', info['msg'])
def testTagWithUser(self):
rev = tag(self.repodir, ['taggy'], user='tagger')
info = getRevInfo(self.repodir, rev)
self.assertEquals('tagger', info['user'])
def testTagWithRevision(self):
tag(self.repodir, ['tag'], rev='1')
info = getRevInfo(self.repodir, '1')
self.assertEquals(['tag'], info['tags'])
def testMultitagWithRevision(self):
tag(self.repodir, ['tag1', 'tag2'], rev='1')
info = getRevInfo(self.repodir, '1')
self.assertEquals(['tag1', 'tag2'], info['tags'])
def testTagFailsIfExists(self):
run_cmd(['hg', 'tag', '-R', self.repodir, 'tagg'])
self.assertRaises(
subprocess.CalledProcessError, tag, self.repodir, 'tagg')
def testForcedTag(self):
run_cmd(['hg', 'tag', '-R', self.repodir, 'tag'])
tag(self.repodir, ['tag'], force=True)
self.assertTrue('tag' in getTags(self.repodir))
def testFailingBackend(self):
# Test that remote failures get retried
num_calls = [0]
def _my_get_hg_output(cmd, **kwargs):
num_calls[0] += 1
if cmd[0] == 'clone':
e = subprocess.CalledProcessError(-1, cmd)
e.output = "error: Name or service not known"
raise e
else:
return get_hg_output(cmd, **kwargs)
# Two retries are forced in setUp() above
with patch('util.hg.get_hg_output', new=_my_get_hg_output):
self.assertRaises(subprocess.CalledProcessError,
clone, "http://nxdomain.nxnx", self.wc)
self.assertEquals(num_calls, [2])
@patch("util.commands.get_output")
def testGetHgOutputAlwaysLogsException(self, get_output):
get_output.side_effect = subprocess.CalledProcessError(255, "kaboom!")
with patch("util.hg.log") as mocked_log:
try:
get_hg_output(["status"])
self.fail("CalledProcessError not raised")
except subprocess.CalledProcessError:
self.assertTrue(mocked_log.exception.called, "log.exception not called")
def testHasRev(self):
self.assertTrue(has_rev(self.repodir, self.revisions[0]))
self.assertFalse(has_rev(self.repodir, self.revisions[0] + 'g'))