blob: 6ab29f2c47b4199912e37bad08a52a64b4a87119 [file] [log] [blame]
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from mozpack.copier import (
FileCopier,
FilePurger,
FileRegistry,
Jarrer,
)
from mozpack.files import GeneratedFile
from mozpack.mozjar import JarReader
import mozpack.path
import unittest
import mozunit
import os
import shutil
from mozpack.errors import ErrorMessage
from tempfile import mkdtemp
from mozpack.test.test_files import (
MockDest,
MatchTestTemplate,
TestWithTmpDir,
)
class TestFileRegistry(MatchTestTemplate, unittest.TestCase):
def add(self, path):
self.registry.add(path, GeneratedFile(path))
def do_check(self, pattern, result):
self.checked = True
if result:
self.assertTrue(self.registry.contains(pattern))
else:
self.assertFalse(self.registry.contains(pattern))
self.assertEqual(self.registry.match(pattern), result)
def test_file_registry(self):
self.registry = FileRegistry()
self.registry.add('foo', GeneratedFile('foo'))
bar = GeneratedFile('bar')
self.registry.add('bar', bar)
self.assertEqual(self.registry.paths(), ['foo', 'bar'])
self.assertEqual(self.registry['bar'], bar)
self.assertRaises(ErrorMessage, self.registry.add, 'foo',
GeneratedFile('foo2'))
self.assertRaises(ErrorMessage, self.registry.remove, 'qux')
self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar',
GeneratedFile('foobar'))
self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar/baz',
GeneratedFile('foobar'))
self.assertEqual(self.registry.paths(), ['foo', 'bar'])
self.registry.remove('foo')
self.assertEqual(self.registry.paths(), ['bar'])
self.registry.remove('bar')
self.assertEqual(self.registry.paths(), [])
self.prepare_match_test()
self.do_match_test()
self.assertTrue(self.checked)
self.assertEqual(self.registry.paths(), [
'bar',
'foo/bar',
'foo/baz',
'foo/qux/1',
'foo/qux/bar',
'foo/qux/2/test',
'foo/qux/2/test2',
])
self.registry.remove('foo/qux')
self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz'])
self.registry.add('foo/qux', GeneratedFile('fooqux'))
self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz',
'foo/qux'])
self.registry.remove('foo/b*')
self.assertEqual(self.registry.paths(), ['bar', 'foo/qux'])
self.assertEqual([f for f, c in self.registry], ['bar', 'foo/qux'])
self.assertEqual(len(self.registry), 2)
self.add('foo/.foo')
self.assertTrue(self.registry.contains('foo/.foo'))
class TestFileCopier(unittest.TestCase):
def setUp(self):
self.tmpdir = mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def all_dirs(self, base):
all_dirs = set()
for root, dirs, files in os.walk(base):
if not dirs:
all_dirs.add(mozpack.path.relpath(root, base))
return all_dirs
def all_files(self, base):
all_files = set()
for root, dirs, files in os.walk(base):
for f in files:
all_files.add(
mozpack.path.join(mozpack.path.relpath(root, base), f))
return all_files
def test_file_copier(self):
copier = FileCopier()
copier.add('foo/bar', GeneratedFile('foobar'))
copier.add('foo/qux', GeneratedFile('fooqux'))
copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
copier.add('bar', GeneratedFile('bar'))
copier.add('qux/foo', GeneratedFile('quxfoo'))
copier.add('qux/bar', GeneratedFile(''))
copier.copy(self.tmpdir)
self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
self.assertEqual(self.all_dirs(self.tmpdir),
set(['foo/deep/nested/directory', 'qux']))
copier.remove('foo')
copier.add('test', GeneratedFile('test'))
copier.copy(self.tmpdir)
self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
self.assertEqual(self.all_dirs(self.tmpdir), set(['qux']))
class TestFilePurger(TestWithTmpDir):
def test_file_purger(self):
existing = os.path.join(self.tmpdir, 'existing')
extra = os.path.join(self.tmpdir, 'extra')
empty_dir = os.path.join(self.tmpdir, 'dir')
with open(existing, 'a'):
pass
with open(extra, 'a'):
pass
os.mkdir(empty_dir)
with open(os.path.join(empty_dir, 'foo'), 'a'):
pass
self.assertTrue(os.path.exists(existing))
self.assertTrue(os.path.exists(extra))
purger = FilePurger()
purger.add('existing')
result = purger.purge(self.tmpdir)
self.assertEqual(result.removed_files_count, 2)
self.assertEqual(result.removed_directories_count, 1)
self.assertTrue(os.path.exists(existing))
self.assertFalse(os.path.exists(extra))
self.assertFalse(os.path.exists(empty_dir))
class TestJarrer(unittest.TestCase):
def check_jar(self, dest, copier):
jar = JarReader(fileobj=dest)
self.assertEqual([f.filename for f in jar], copier.paths())
for f in jar:
self.assertEqual(f.uncompressed_data.read(),
copier[f.filename].content)
def test_jarrer(self):
copier = Jarrer()
copier.add('foo/bar', GeneratedFile('foobar'))
copier.add('foo/qux', GeneratedFile('fooqux'))
copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
copier.add('bar', GeneratedFile('bar'))
copier.add('qux/foo', GeneratedFile('quxfoo'))
copier.add('qux/bar', GeneratedFile(''))
dest = MockDest()
copier.copy(dest)
self.check_jar(dest, copier)
copier.remove('foo')
copier.add('test', GeneratedFile('test'))
copier.copy(dest)
self.check_jar(dest, copier)
copier.remove('test')
copier.add('test', GeneratedFile('replaced-content'))
copier.copy(dest)
self.check_jar(dest, copier)
copier.copy(dest)
self.check_jar(dest, copier)
preloaded = ['qux/bar', 'bar']
copier.preload(preloaded)
copier.copy(dest)
dest.seek(0)
jar = JarReader(fileobj=dest)
self.assertEqual([f.filename for f in jar], preloaded +
[p for p in copier.paths() if not p in preloaded])
self.assertEqual(jar.last_preloaded, preloaded[-1])
if __name__ == '__main__':
mozunit.main()