| # This Source Code Form is subject to the terms of the Mozilla Public |
| # License, v. 2.0. If a copy of the MPL was not distributed with this |
| # file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| |
| from mozbuild.util import ensureParentDir |
| |
| from mozpack.errors import ( |
| ErrorMessage, |
| errors, |
| ) |
| from mozpack.files import ( |
| AbsoluteSymlinkFile, |
| ComposedFinder, |
| DeflatedFile, |
| Dest, |
| ExistingFile, |
| FileFinder, |
| File, |
| GeneratedFile, |
| JarFinder, |
| ManifestFile, |
| MercurialFile, |
| MercurialRevisionFinder, |
| MinifiedJavaScript, |
| MinifiedProperties, |
| PreprocessedFile, |
| XPTFile, |
| ) |
| |
| # We don't have hglib installed everywhere. |
| try: |
| import hglib |
| except ImportError: |
| hglib = None |
| |
| try: |
| from mozpack.hg import MercurialNativeRevisionFinder |
| except ImportError: |
| MercurialNativeRevisionFinder = None |
| |
| from mozpack.mozjar import ( |
| JarReader, |
| JarWriter, |
| ) |
| from mozpack.chrome.manifest import ( |
| ManifestContent, |
| ManifestResource, |
| ManifestLocale, |
| ManifestOverride, |
| ) |
| import unittest |
| import mozfile |
| import mozunit |
| import os |
| import random |
| import string |
| import sys |
| import mozpack.path as mozpath |
| from tempfile import mkdtemp |
| from io import BytesIO |
| from StringIO import StringIO |
| from xpt import Typelib |
| |
| |
| class TestWithTmpDir(unittest.TestCase): |
| def setUp(self): |
| self.tmpdir = mkdtemp() |
| |
| self.symlink_supported = False |
| |
| if not hasattr(os, 'symlink'): |
| return |
| |
| dummy_path = self.tmppath('dummy_file') |
| with open(dummy_path, 'a'): |
| pass |
| |
| try: |
| os.symlink(dummy_path, self.tmppath('dummy_symlink')) |
| os.remove(self.tmppath('dummy_symlink')) |
| except EnvironmentError: |
| pass |
| finally: |
| os.remove(dummy_path) |
| |
| self.symlink_supported = True |
| |
| |
| def tearDown(self): |
| mozfile.rmtree(self.tmpdir) |
| |
| def tmppath(self, relpath): |
| return os.path.normpath(os.path.join(self.tmpdir, relpath)) |
| |
| |
| class MockDest(BytesIO, Dest): |
| def __init__(self): |
| BytesIO.__init__(self) |
| self.mode = None |
| |
| def read(self, length=-1): |
| if self.mode != 'r': |
| self.seek(0) |
| self.mode = 'r' |
| return BytesIO.read(self, length) |
| |
| def write(self, data): |
| if self.mode != 'w': |
| self.seek(0) |
| self.truncate(0) |
| self.mode = 'w' |
| return BytesIO.write(self, data) |
| |
| def exists(self): |
| return True |
| |
| def close(self): |
| if self.mode: |
| self.mode = None |
| |
| |
| class DestNoWrite(Dest): |
| def write(self, data): |
| raise RuntimeError |
| |
| |
| class TestDest(TestWithTmpDir): |
| def test_dest(self): |
| dest = Dest(self.tmppath('dest')) |
| self.assertFalse(dest.exists()) |
| dest.write('foo') |
| self.assertTrue(dest.exists()) |
| dest.write('foo') |
| self.assertEqual(dest.read(4), 'foof') |
| self.assertEqual(dest.read(), 'oo') |
| self.assertEqual(dest.read(), '') |
| dest.write('bar') |
| self.assertEqual(dest.read(4), 'bar') |
| dest.close() |
| self.assertEqual(dest.read(), 'bar') |
| dest.write('foo') |
| dest.close() |
| dest.write('qux') |
| self.assertEqual(dest.read(), 'qux') |
| |
| rand = ''.join(random.choice(string.letters) for i in xrange(131597)) |
| samples = [ |
| '', |
| 'test', |
| 'fooo', |
| 'same', |
| 'same', |
| 'Different and longer', |
| rand, |
| rand, |
| rand[:-1] + '_', |
| 'test' |
| ] |
| |
| |
| class TestFile(TestWithTmpDir): |
| def test_file(self): |
| ''' |
| Check that File.copy yields the proper content in the destination file |
| in all situations that trigger different code paths: |
| - different content |
| - different content of the same size |
| - same content |
| - long content |
| ''' |
| src = self.tmppath('src') |
| dest = self.tmppath('dest') |
| |
| for content in samples: |
| with open(src, 'wb') as tmp: |
| tmp.write(content) |
| # Ensure the destination file, when it exists, is older than the |
| # source |
| if os.path.exists(dest): |
| time = os.path.getmtime(src) - 1 |
| os.utime(dest, (time, time)) |
| f = File(src) |
| f.copy(dest) |
| self.assertEqual(content, open(dest, 'rb').read()) |
| self.assertEqual(content, f.open().read()) |
| self.assertEqual(content, f.open().read()) |
| |
| def test_file_dest(self): |
| ''' |
| Similar to test_file, but for a destination object instead of |
| a destination file. This ensures the destination object is being |
| used properly by File.copy, ensuring that other subclasses of Dest |
| will work. |
| ''' |
| src = self.tmppath('src') |
| dest = MockDest() |
| |
| for content in samples: |
| with open(src, 'wb') as tmp: |
| tmp.write(content) |
| f = File(src) |
| f.copy(dest) |
| self.assertEqual(content, dest.getvalue()) |
| |
| def test_file_open(self): |
| ''' |
| Test whether File.open returns an appropriately reset file object. |
| ''' |
| src = self.tmppath('src') |
| content = ''.join(samples) |
| with open(src, 'wb') as tmp: |
| tmp.write(content) |
| |
| f = File(src) |
| self.assertEqual(content[:42], f.open().read(42)) |
| self.assertEqual(content, f.open().read()) |
| |
| def test_file_no_write(self): |
| ''' |
| Test various conditions where File.copy is expected not to write |
| in the destination file. |
| ''' |
| src = self.tmppath('src') |
| dest = self.tmppath('dest') |
| |
| with open(src, 'wb') as tmp: |
| tmp.write('test') |
| |
| # Initial copy |
| f = File(src) |
| f.copy(dest) |
| |
| # Ensure subsequent copies won't trigger writes |
| f.copy(DestNoWrite(dest)) |
| self.assertEqual('test', open(dest, 'rb').read()) |
| |
| # When the source file is newer, but with the same content, no copy |
| # should occur |
| time = os.path.getmtime(src) - 1 |
| os.utime(dest, (time, time)) |
| f.copy(DestNoWrite(dest)) |
| self.assertEqual('test', open(dest, 'rb').read()) |
| |
| # When the source file is older than the destination file, even with |
| # different content, no copy should occur. |
| with open(src, 'wb') as tmp: |
| tmp.write('fooo') |
| time = os.path.getmtime(dest) - 1 |
| os.utime(src, (time, time)) |
| f.copy(DestNoWrite(dest)) |
| self.assertEqual('test', open(dest, 'rb').read()) |
| |
| # Double check that under conditions where a copy occurs, we would get |
| # an exception. |
| time = os.path.getmtime(src) - 1 |
| os.utime(dest, (time, time)) |
| self.assertRaises(RuntimeError, f.copy, DestNoWrite(dest)) |
| |
| # skip_if_older=False is expected to force a copy in this situation. |
| f.copy(dest, skip_if_older=False) |
| self.assertEqual('fooo', open(dest, 'rb').read()) |
| |
| |
| class TestAbsoluteSymlinkFile(TestWithTmpDir): |
| def test_absolute_relative(self): |
| AbsoluteSymlinkFile('/foo') |
| |
| with self.assertRaisesRegexp(ValueError, 'Symlink target not absolute'): |
| AbsoluteSymlinkFile('./foo') |
| |
| def test_symlink_file(self): |
| source = self.tmppath('test_path') |
| with open(source, 'wt') as fh: |
| fh.write('Hello world') |
| |
| s = AbsoluteSymlinkFile(source) |
| dest = self.tmppath('symlink') |
| self.assertTrue(s.copy(dest)) |
| |
| if self.symlink_supported: |
| self.assertTrue(os.path.islink(dest)) |
| link = os.readlink(dest) |
| self.assertEqual(link, source) |
| else: |
| self.assertTrue(os.path.isfile(dest)) |
| content = open(dest).read() |
| self.assertEqual(content, 'Hello world') |
| |
| def test_replace_file_with_symlink(self): |
| # If symlinks are supported, an existing file should be replaced by a |
| # symlink. |
| source = self.tmppath('test_path') |
| with open(source, 'wt') as fh: |
| fh.write('source') |
| |
| dest = self.tmppath('dest') |
| with open(dest, 'a'): |
| pass |
| |
| s = AbsoluteSymlinkFile(source) |
| s.copy(dest, skip_if_older=False) |
| |
| if self.symlink_supported: |
| self.assertTrue(os.path.islink(dest)) |
| link = os.readlink(dest) |
| self.assertEqual(link, source) |
| else: |
| self.assertTrue(os.path.isfile(dest)) |
| content = open(dest).read() |
| self.assertEqual(content, 'source') |
| |
| def test_replace_symlink(self): |
| if not self.symlink_supported: |
| return |
| |
| source = self.tmppath('source') |
| with open(source, 'a'): |
| pass |
| |
| dest = self.tmppath('dest') |
| |
| os.symlink(self.tmppath('bad'), dest) |
| self.assertTrue(os.path.islink(dest)) |
| |
| s = AbsoluteSymlinkFile(source) |
| self.assertTrue(s.copy(dest)) |
| |
| self.assertTrue(os.path.islink(dest)) |
| link = os.readlink(dest) |
| self.assertEqual(link, source) |
| |
| def test_noop(self): |
| if not hasattr(os, 'symlink'): |
| return |
| |
| source = self.tmppath('source') |
| dest = self.tmppath('dest') |
| |
| with open(source, 'a'): |
| pass |
| |
| os.symlink(source, dest) |
| link = os.readlink(dest) |
| self.assertEqual(link, source) |
| |
| s = AbsoluteSymlinkFile(source) |
| self.assertFalse(s.copy(dest)) |
| |
| link = os.readlink(dest) |
| self.assertEqual(link, source) |
| |
| class TestPreprocessedFile(TestWithTmpDir): |
| def test_preprocess(self): |
| ''' |
| Test that copying the file invokes the preprocessor |
| ''' |
| src = self.tmppath('src') |
| dest = self.tmppath('dest') |
| |
| with open(src, 'wb') as tmp: |
| tmp.write('#ifdef FOO\ntest\n#endif') |
| |
| f = PreprocessedFile(src, depfile_path=None, marker='#', defines={'FOO': True}) |
| self.assertTrue(f.copy(dest)) |
| |
| self.assertEqual('test\n', open(dest, 'rb').read()) |
| |
| def test_preprocess_file_no_write(self): |
| ''' |
| Test various conditions where PreprocessedFile.copy is expected not to |
| write in the destination file. |
| ''' |
| src = self.tmppath('src') |
| dest = self.tmppath('dest') |
| depfile = self.tmppath('depfile') |
| |
| with open(src, 'wb') as tmp: |
| tmp.write('#ifdef FOO\ntest\n#endif') |
| |
| # Initial copy |
| f = PreprocessedFile(src, depfile_path=depfile, marker='#', defines={'FOO': True}) |
| self.assertTrue(f.copy(dest)) |
| |
| # Ensure subsequent copies won't trigger writes |
| self.assertFalse(f.copy(DestNoWrite(dest))) |
| self.assertEqual('test\n', open(dest, 'rb').read()) |
| |
| # When the source file is older than the destination file, even with |
| # different content, no copy should occur. |
| with open(src, 'wb') as tmp: |
| tmp.write('#ifdef FOO\nfooo\n#endif') |
| time = os.path.getmtime(dest) - 1 |
| os.utime(src, (time, time)) |
| self.assertFalse(f.copy(DestNoWrite(dest))) |
| self.assertEqual('test\n', open(dest, 'rb').read()) |
| |
| # skip_if_older=False is expected to force a copy in this situation. |
| self.assertTrue(f.copy(dest, skip_if_older=False)) |
| self.assertEqual('fooo\n', open(dest, 'rb').read()) |
| |
| def test_preprocess_file_dependencies(self): |
| ''' |
| Test that the preprocess runs if the dependencies of the source change |
| ''' |
| src = self.tmppath('src') |
| dest = self.tmppath('dest') |
| incl = self.tmppath('incl') |
| deps = self.tmppath('src.pp') |
| |
| with open(src, 'wb') as tmp: |
| tmp.write('#ifdef FOO\ntest\n#endif') |
| |
| with open(incl, 'wb') as tmp: |
| tmp.write('foo bar') |
| |
| # Initial copy |
| f = PreprocessedFile(src, depfile_path=deps, marker='#', defines={'FOO': True}) |
| self.assertTrue(f.copy(dest)) |
| |
| # Update the source so it #includes the include file. |
| with open(src, 'wb') as tmp: |
| tmp.write('#include incl\n') |
| time = os.path.getmtime(dest) + 1 |
| os.utime(src, (time, time)) |
| self.assertTrue(f.copy(dest)) |
| self.assertEqual('foo bar', open(dest, 'rb').read()) |
| |
| # If one of the dependencies changes, the file should be updated. The |
| # mtime of the dependency is set after the destination file, to avoid |
| # both files having the same time. |
| with open(incl, 'wb') as tmp: |
| tmp.write('quux') |
| time = os.path.getmtime(dest) + 1 |
| os.utime(incl, (time, time)) |
| self.assertTrue(f.copy(dest)) |
| self.assertEqual('quux', open(dest, 'rb').read()) |
| |
| # Perform one final copy to confirm that we don't run the preprocessor |
| # again. We update the mtime of the destination so it's newer than the |
| # input files. This would "just work" if we weren't changing |
| time = os.path.getmtime(incl) + 1 |
| os.utime(dest, (time, time)) |
| self.assertFalse(f.copy(DestNoWrite(dest))) |
| |
| def test_replace_symlink(self): |
| ''' |
| Test that if the destination exists, and is a symlink, the target of |
| the symlink is not overwritten by the preprocessor output. |
| ''' |
| if not self.symlink_supported: |
| return |
| |
| source = self.tmppath('source') |
| dest = self.tmppath('dest') |
| pp_source = self.tmppath('pp_in') |
| deps = self.tmppath('deps') |
| |
| with open(source, 'a'): |
| pass |
| |
| os.symlink(source, dest) |
| self.assertTrue(os.path.islink(dest)) |
| |
| with open(pp_source, 'wb') as tmp: |
| tmp.write('#define FOO\nPREPROCESSED') |
| |
| f = PreprocessedFile(pp_source, depfile_path=deps, marker='#', |
| defines={'FOO': True}) |
| self.assertTrue(f.copy(dest)) |
| |
| self.assertEqual('PREPROCESSED', open(dest, 'rb').read()) |
| self.assertFalse(os.path.islink(dest)) |
| self.assertEqual('', open(source, 'rb').read()) |
| |
| class TestExistingFile(TestWithTmpDir): |
| def test_required_missing_dest(self): |
| with self.assertRaisesRegexp(ErrorMessage, 'Required existing file'): |
| f = ExistingFile(required=True) |
| f.copy(self.tmppath('dest')) |
| |
| def test_required_existing_dest(self): |
| p = self.tmppath('dest') |
| with open(p, 'a'): |
| pass |
| |
| f = ExistingFile(required=True) |
| f.copy(p) |
| |
| def test_optional_missing_dest(self): |
| f = ExistingFile(required=False) |
| f.copy(self.tmppath('dest')) |
| |
| def test_optional_existing_dest(self): |
| p = self.tmppath('dest') |
| with open(p, 'a'): |
| pass |
| |
| f = ExistingFile(required=False) |
| f.copy(p) |
| |
| |
| class TestGeneratedFile(TestWithTmpDir): |
| def test_generated_file(self): |
| ''' |
| Check that GeneratedFile.copy yields the proper content in the |
| destination file in all situations that trigger different code paths |
| (see TestFile.test_file) |
| ''' |
| dest = self.tmppath('dest') |
| |
| for content in samples: |
| f = GeneratedFile(content) |
| f.copy(dest) |
| self.assertEqual(content, open(dest, 'rb').read()) |
| |
| def test_generated_file_open(self): |
| ''' |
| Test whether GeneratedFile.open returns an appropriately reset file |
| object. |
| ''' |
| content = ''.join(samples) |
| f = GeneratedFile(content) |
| self.assertEqual(content[:42], f.open().read(42)) |
| self.assertEqual(content, f.open().read()) |
| |
| def test_generated_file_no_write(self): |
| ''' |
| Test various conditions where GeneratedFile.copy is expected not to |
| write in the destination file. |
| ''' |
| dest = self.tmppath('dest') |
| |
| # Initial copy |
| f = GeneratedFile('test') |
| f.copy(dest) |
| |
| # Ensure subsequent copies won't trigger writes |
| f.copy(DestNoWrite(dest)) |
| self.assertEqual('test', open(dest, 'rb').read()) |
| |
| # When using a new instance with the same content, no copy should occur |
| f = GeneratedFile('test') |
| f.copy(DestNoWrite(dest)) |
| self.assertEqual('test', open(dest, 'rb').read()) |
| |
| # Double check that under conditions where a copy occurs, we would get |
| # an exception. |
| f = GeneratedFile('fooo') |
| self.assertRaises(RuntimeError, f.copy, DestNoWrite(dest)) |
| |
| |
| class TestDeflatedFile(TestWithTmpDir): |
| def test_deflated_file(self): |
| ''' |
| Check that DeflatedFile.copy yields the proper content in the |
| destination file in all situations that trigger different code paths |
| (see TestFile.test_file) |
| ''' |
| src = self.tmppath('src.jar') |
| dest = self.tmppath('dest') |
| |
| contents = {} |
| with JarWriter(src) as jar: |
| for content in samples: |
| name = ''.join(random.choice(string.letters) |
| for i in xrange(8)) |
| jar.add(name, content, compress=True) |
| contents[name] = content |
| |
| for j in JarReader(src): |
| f = DeflatedFile(j) |
| f.copy(dest) |
| self.assertEqual(contents[j.filename], open(dest, 'rb').read()) |
| |
| def test_deflated_file_open(self): |
| ''' |
| Test whether DeflatedFile.open returns an appropriately reset file |
| object. |
| ''' |
| src = self.tmppath('src.jar') |
| content = ''.join(samples) |
| with JarWriter(src) as jar: |
| jar.add('content', content) |
| |
| f = DeflatedFile(JarReader(src)['content']) |
| self.assertEqual(content[:42], f.open().read(42)) |
| self.assertEqual(content, f.open().read()) |
| |
| def test_deflated_file_no_write(self): |
| ''' |
| Test various conditions where DeflatedFile.copy is expected not to |
| write in the destination file. |
| ''' |
| src = self.tmppath('src.jar') |
| dest = self.tmppath('dest') |
| |
| with JarWriter(src) as jar: |
| jar.add('test', 'test') |
| jar.add('test2', 'test') |
| jar.add('fooo', 'fooo') |
| |
| jar = JarReader(src) |
| # Initial copy |
| f = DeflatedFile(jar['test']) |
| f.copy(dest) |
| |
| # Ensure subsequent copies won't trigger writes |
| f.copy(DestNoWrite(dest)) |
| self.assertEqual('test', open(dest, 'rb').read()) |
| |
| # When using a different file with the same content, no copy should |
| # occur |
| f = DeflatedFile(jar['test2']) |
| f.copy(DestNoWrite(dest)) |
| self.assertEqual('test', open(dest, 'rb').read()) |
| |
| # Double check that under conditions where a copy occurs, we would get |
| # an exception. |
| f = DeflatedFile(jar['fooo']) |
| self.assertRaises(RuntimeError, f.copy, DestNoWrite(dest)) |
| |
| |
| class TestManifestFile(TestWithTmpDir): |
| def test_manifest_file(self): |
| f = ManifestFile('chrome') |
| f.add(ManifestContent('chrome', 'global', 'toolkit/content/global/')) |
| f.add(ManifestResource('chrome', 'gre-resources', 'toolkit/res/')) |
| f.add(ManifestResource('chrome/pdfjs', 'pdfjs', './')) |
| f.add(ManifestContent('chrome/pdfjs', 'pdfjs', 'pdfjs')) |
| f.add(ManifestLocale('chrome', 'browser', 'en-US', |
| 'en-US/locale/browser/')) |
| |
| f.copy(self.tmppath('chrome.manifest')) |
| self.assertEqual(open(self.tmppath('chrome.manifest')).readlines(), [ |
| 'content global toolkit/content/global/\n', |
| 'resource gre-resources toolkit/res/\n', |
| 'resource pdfjs pdfjs/\n', |
| 'content pdfjs pdfjs/pdfjs\n', |
| 'locale browser en-US en-US/locale/browser/\n', |
| ]) |
| |
| self.assertRaises( |
| ValueError, |
| f.remove, |
| ManifestContent('', 'global', 'toolkit/content/global/') |
| ) |
| self.assertRaises( |
| ValueError, |
| f.remove, |
| ManifestOverride('chrome', 'chrome://global/locale/netError.dtd', |
| 'chrome://browser/locale/netError.dtd') |
| ) |
| |
| f.remove(ManifestContent('chrome', 'global', |
| 'toolkit/content/global/')) |
| self.assertRaises( |
| ValueError, |
| f.remove, |
| ManifestContent('chrome', 'global', 'toolkit/content/global/') |
| ) |
| |
| f.copy(self.tmppath('chrome.manifest')) |
| content = open(self.tmppath('chrome.manifest')).read() |
| self.assertEqual(content[:42], f.open().read(42)) |
| self.assertEqual(content, f.open().read()) |
| |
| # Compiled typelib for the following IDL: |
| # interface foo; |
| # [scriptable, uuid(5f70da76-519c-4858-b71e-e3c92333e2d6)] |
| # interface bar { |
| # void bar(in foo f); |
| # }; |
| # We need to make this [scriptable] so it doesn't get deleted from the |
| # typelib. We don't need to make the foo interfaces below [scriptable], |
| # because they will be automatically included by virtue of being an |
| # argument to a method of |bar|. |
| bar_xpt = GeneratedFile( |
| b'\x58\x50\x43\x4F\x4D\x0A\x54\x79\x70\x65\x4C\x69\x62\x0D\x0A\x1A' + |
| b'\x01\x02\x00\x02\x00\x00\x00\x7B\x00\x00\x00\x24\x00\x00\x00\x5C' + |
| b'\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + |
| b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x5F' + |
| b'\x70\xDA\x76\x51\x9C\x48\x58\xB7\x1E\xE3\xC9\x23\x33\xE2\xD6\x00' + |
| b'\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x0D\x00\x66\x6F\x6F\x00' + |
| b'\x62\x61\x72\x00\x62\x61\x72\x00\x00\x00\x00\x01\x00\x00\x00\x00' + |
| b'\x09\x01\x80\x92\x00\x01\x80\x06\x00\x00\x80' |
| ) |
| |
| # Compiled typelib for the following IDL: |
| # [uuid(3271bebc-927e-4bef-935e-44e0aaf3c1e5)] |
| # interface foo { |
| # void foo(); |
| # }; |
| foo_xpt = GeneratedFile( |
| b'\x58\x50\x43\x4F\x4D\x0A\x54\x79\x70\x65\x4C\x69\x62\x0D\x0A\x1A' + |
| b'\x01\x02\x00\x01\x00\x00\x00\x57\x00\x00\x00\x24\x00\x00\x00\x40' + |
| b'\x80\x00\x00\x32\x71\xBE\xBC\x92\x7E\x4B\xEF\x93\x5E\x44\xE0\xAA' + |
| b'\xF3\xC1\xE5\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x09\x00' + |
| b'\x66\x6F\x6F\x00\x66\x6F\x6F\x00\x00\x00\x00\x01\x00\x00\x00\x00' + |
| b'\x05\x00\x80\x06\x00\x00\x00' |
| ) |
| |
| # Compiled typelib for the following IDL: |
| # [uuid(7057f2aa-fdc2-4559-abde-08d939f7e80d)] |
| # interface foo { |
| # void foo(); |
| # }; |
| foo2_xpt = GeneratedFile( |
| b'\x58\x50\x43\x4F\x4D\x0A\x54\x79\x70\x65\x4C\x69\x62\x0D\x0A\x1A' + |
| b'\x01\x02\x00\x01\x00\x00\x00\x57\x00\x00\x00\x24\x00\x00\x00\x40' + |
| b'\x80\x00\x00\x70\x57\xF2\xAA\xFD\xC2\x45\x59\xAB\xDE\x08\xD9\x39' + |
| b'\xF7\xE8\x0D\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x09\x00' + |
| b'\x66\x6F\x6F\x00\x66\x6F\x6F\x00\x00\x00\x00\x01\x00\x00\x00\x00' + |
| b'\x05\x00\x80\x06\x00\x00\x00' |
| ) |
| |
| |
| def read_interfaces(file): |
| return dict((i.name, i) for i in Typelib.read(file).interfaces) |
| |
| |
| class TestXPTFile(TestWithTmpDir): |
| def test_xpt_file(self): |
| x = XPTFile() |
| x.add(foo_xpt) |
| x.add(bar_xpt) |
| x.copy(self.tmppath('interfaces.xpt')) |
| |
| foo = read_interfaces(foo_xpt.open()) |
| foo2 = read_interfaces(foo2_xpt.open()) |
| bar = read_interfaces(bar_xpt.open()) |
| linked = read_interfaces(self.tmppath('interfaces.xpt')) |
| self.assertEqual(foo['foo'], linked['foo']) |
| self.assertEqual(bar['bar'], linked['bar']) |
| |
| x.remove(foo_xpt) |
| x.copy(self.tmppath('interfaces2.xpt')) |
| linked = read_interfaces(self.tmppath('interfaces2.xpt')) |
| self.assertEqual(bar['foo'], linked['foo']) |
| self.assertEqual(bar['bar'], linked['bar']) |
| |
| x.add(foo_xpt) |
| x.copy(DestNoWrite(self.tmppath('interfaces.xpt'))) |
| linked = read_interfaces(self.tmppath('interfaces.xpt')) |
| self.assertEqual(foo['foo'], linked['foo']) |
| self.assertEqual(bar['bar'], linked['bar']) |
| |
| x = XPTFile() |
| x.add(foo2_xpt) |
| x.add(bar_xpt) |
| x.copy(self.tmppath('interfaces.xpt')) |
| linked = read_interfaces(self.tmppath('interfaces.xpt')) |
| self.assertEqual(foo2['foo'], linked['foo']) |
| self.assertEqual(bar['bar'], linked['bar']) |
| |
| x = XPTFile() |
| x.add(foo_xpt) |
| x.add(foo2_xpt) |
| x.add(bar_xpt) |
| from xpt import DataError |
| self.assertRaises(DataError, x.copy, self.tmppath('interfaces.xpt')) |
| |
| |
| class TestMinifiedProperties(TestWithTmpDir): |
| def test_minified_properties(self): |
| propLines = [ |
| '# Comments are removed', |
| 'foo = bar', |
| '', |
| '# Another comment', |
| ] |
| prop = GeneratedFile('\n'.join(propLines)) |
| self.assertEqual(MinifiedProperties(prop).open().readlines(), |
| ['foo = bar\n', '\n']) |
| open(self.tmppath('prop'), 'wb').write('\n'.join(propLines)) |
| MinifiedProperties(File(self.tmppath('prop'))) \ |
| .copy(self.tmppath('prop2')) |
| self.assertEqual(open(self.tmppath('prop2')).readlines(), |
| ['foo = bar\n', '\n']) |
| |
| |
| class TestMinifiedJavaScript(TestWithTmpDir): |
| orig_lines = [ |
| '// Comment line', |
| 'let foo = "bar";', |
| 'var bar = true;', |
| '', |
| '// Another comment', |
| ] |
| |
| def test_minified_javascript(self): |
| orig_f = GeneratedFile('\n'.join(self.orig_lines)) |
| min_f = MinifiedJavaScript(orig_f) |
| |
| mini_lines = min_f.open().readlines() |
| self.assertTrue(mini_lines) |
| self.assertTrue(len(mini_lines) < len(self.orig_lines)) |
| |
| def _verify_command(self, code): |
| our_dir = os.path.abspath(os.path.dirname(__file__)) |
| return [ |
| sys.executable, |
| os.path.join(our_dir, 'support', 'minify_js_verify.py'), |
| code, |
| ] |
| |
| def test_minified_verify_success(self): |
| orig_f = GeneratedFile('\n'.join(self.orig_lines)) |
| min_f = MinifiedJavaScript(orig_f, |
| verify_command=self._verify_command('0')) |
| |
| mini_lines = min_f.open().readlines() |
| self.assertTrue(mini_lines) |
| self.assertTrue(len(mini_lines) < len(self.orig_lines)) |
| |
| def test_minified_verify_failure(self): |
| orig_f = GeneratedFile('\n'.join(self.orig_lines)) |
| errors.out = StringIO() |
| min_f = MinifiedJavaScript(orig_f, |
| verify_command=self._verify_command('1')) |
| |
| mini_lines = min_f.open().readlines() |
| output = errors.out.getvalue() |
| errors.out = sys.stderr |
| self.assertEqual(output, |
| 'Warning: JS minification verification failed for <unknown>:\n' |
| 'Warning: Error message\n') |
| self.assertEqual(mini_lines, orig_f.open().readlines()) |
| |
| |
| class MatchTestTemplate(object): |
| def prepare_match_test(self, with_dotfiles=False): |
| self.add('bar') |
| self.add('foo/bar') |
| self.add('foo/baz') |
| self.add('foo/qux/1') |
| self.add('foo/qux/bar') |
| self.add('foo/qux/2/test') |
| self.add('foo/qux/2/test2') |
| if with_dotfiles: |
| self.add('foo/.foo') |
| self.add('foo/.bar/foo') |
| |
| def do_match_test(self): |
| self.do_check('', [ |
| 'bar', 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', |
| 'foo/qux/2/test', 'foo/qux/2/test2' |
| ]) |
| self.do_check('*', [ |
| 'bar', 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', |
| 'foo/qux/2/test', 'foo/qux/2/test2' |
| ]) |
| self.do_check('foo/qux', [ |
| 'foo/qux/1', 'foo/qux/bar', 'foo/qux/2/test', 'foo/qux/2/test2' |
| ]) |
| self.do_check('foo/b*', ['foo/bar', 'foo/baz']) |
| self.do_check('baz', []) |
| self.do_check('foo/foo', []) |
| self.do_check('foo/*ar', ['foo/bar']) |
| self.do_check('*ar', ['bar']) |
| self.do_check('*/bar', ['foo/bar']) |
| self.do_check('foo/*ux', [ |
| 'foo/qux/1', 'foo/qux/bar', 'foo/qux/2/test', 'foo/qux/2/test2' |
| ]) |
| self.do_check('foo/q*ux', [ |
| 'foo/qux/1', 'foo/qux/bar', 'foo/qux/2/test', 'foo/qux/2/test2' |
| ]) |
| self.do_check('foo/*/2/test*', ['foo/qux/2/test', 'foo/qux/2/test2']) |
| self.do_check('**/bar', ['bar', 'foo/bar', 'foo/qux/bar']) |
| self.do_check('foo/**/test', ['foo/qux/2/test']) |
| self.do_check('foo', [ |
| 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', |
| 'foo/qux/2/test', 'foo/qux/2/test2' |
| ]) |
| self.do_check('foo/**', [ |
| 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', |
| 'foo/qux/2/test', 'foo/qux/2/test2' |
| ]) |
| self.do_check('**/2/test*', ['foo/qux/2/test', 'foo/qux/2/test2']) |
| self.do_check('**/foo', [ |
| 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', |
| 'foo/qux/2/test', 'foo/qux/2/test2' |
| ]) |
| self.do_check('**/barbaz', []) |
| self.do_check('f**/bar', ['foo/bar']) |
| |
| def do_finder_test(self, finder): |
| self.assertTrue(finder.contains('foo/.foo')) |
| self.assertTrue(finder.contains('foo/.bar')) |
| self.assertTrue('foo/.foo' in [f for f, c in |
| finder.find('foo/.foo')]) |
| self.assertTrue('foo/.bar/foo' in [f for f, c in |
| finder.find('foo/.bar')]) |
| self.assertEqual(sorted([f for f, c in finder.find('foo/.*')]), |
| ['foo/.bar/foo', 'foo/.foo']) |
| for pattern in ['foo', '**', '**/*', '**/foo', 'foo/*']: |
| self.assertFalse('foo/.foo' in [f for f, c in |
| finder.find(pattern)]) |
| self.assertFalse('foo/.bar/foo' in [f for f, c in |
| finder.find(pattern)]) |
| self.assertEqual(sorted([f for f, c in finder.find(pattern)]), |
| sorted([f for f, c in finder |
| if mozpath.match(f, pattern)])) |
| |
| |
| def do_check(test, finder, pattern, result): |
| if result: |
| test.assertTrue(finder.contains(pattern)) |
| else: |
| test.assertFalse(finder.contains(pattern)) |
| test.assertEqual(sorted(list(f for f, c in finder.find(pattern))), |
| sorted(result)) |
| |
| |
| class TestFileFinder(MatchTestTemplate, TestWithTmpDir): |
| def add(self, path): |
| ensureParentDir(self.tmppath(path)) |
| open(self.tmppath(path), 'wb').write(path) |
| |
| def do_check(self, pattern, result): |
| do_check(self, self.finder, pattern, result) |
| |
| def test_file_finder(self): |
| self.prepare_match_test(with_dotfiles=True) |
| self.finder = FileFinder(self.tmpdir) |
| self.do_match_test() |
| self.do_finder_test(self.finder) |
| |
| def test_get(self): |
| self.prepare_match_test() |
| finder = FileFinder(self.tmpdir) |
| |
| self.assertIsNone(finder.get('does-not-exist')) |
| res = finder.get('bar') |
| self.assertIsInstance(res, File) |
| self.assertEqual(mozpath.normpath(res.path), |
| mozpath.join(self.tmpdir, 'bar')) |
| |
| def test_ignored_dirs(self): |
| """Ignored directories should not have results returned.""" |
| self.prepare_match_test() |
| self.add('fooz') |
| |
| # Present to ensure prefix matching doesn't exclude. |
| self.add('foo/quxz') |
| |
| self.finder = FileFinder(self.tmpdir, ignore=['foo/qux']) |
| |
| self.do_check('**', ['bar', 'foo/bar', 'foo/baz', 'foo/quxz', 'fooz']) |
| self.do_check('foo/*', ['foo/bar', 'foo/baz', 'foo/quxz']) |
| self.do_check('foo/**', ['foo/bar', 'foo/baz', 'foo/quxz']) |
| self.do_check('foo/qux/**', []) |
| self.do_check('foo/qux/*', []) |
| self.do_check('foo/qux/bar', []) |
| self.do_check('foo/quxz', ['foo/quxz']) |
| self.do_check('fooz', ['fooz']) |
| |
| def test_ignored_files(self): |
| """Ignored files should not have results returned.""" |
| self.prepare_match_test() |
| |
| # Be sure prefix match doesn't get ignored. |
| self.add('barz') |
| |
| self.finder = FileFinder(self.tmpdir, ignore=['foo/bar', 'bar']) |
| self.do_check('**', ['barz', 'foo/baz', 'foo/qux/1', 'foo/qux/2/test', |
| 'foo/qux/2/test2', 'foo/qux/bar']) |
| self.do_check('foo/**', ['foo/baz', 'foo/qux/1', 'foo/qux/2/test', |
| 'foo/qux/2/test2', 'foo/qux/bar']) |
| |
| def test_ignored_patterns(self): |
| """Ignore entries with patterns should be honored.""" |
| self.prepare_match_test() |
| |
| self.add('foo/quxz') |
| |
| self.finder = FileFinder(self.tmpdir, ignore=['foo/qux/*']) |
| self.do_check('**', ['foo/bar', 'foo/baz', 'foo/quxz', 'bar']) |
| self.do_check('foo/**', ['foo/bar', 'foo/baz', 'foo/quxz']) |
| |
| def test_dotfiles(self): |
| """Finder can find files beginning with . is configured.""" |
| self.prepare_match_test(with_dotfiles=True) |
| self.finder = FileFinder(self.tmpdir, find_dotfiles=True) |
| self.do_check('**', ['bar', 'foo/.foo', 'foo/.bar/foo', |
| 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', |
| 'foo/qux/2/test', 'foo/qux/2/test2']) |
| |
| def test_dotfiles_plus_ignore(self): |
| self.prepare_match_test(with_dotfiles=True) |
| self.finder = FileFinder(self.tmpdir, find_dotfiles=True, |
| ignore=['foo/.bar/**']) |
| self.do_check('foo/**', ['foo/.foo', 'foo/bar', 'foo/baz', |
| 'foo/qux/1', 'foo/qux/bar', 'foo/qux/2/test', 'foo/qux/2/test2']) |
| |
| |
| class TestJarFinder(MatchTestTemplate, TestWithTmpDir): |
| def add(self, path): |
| self.jar.add(path, path, compress=True) |
| |
| def do_check(self, pattern, result): |
| do_check(self, self.finder, pattern, result) |
| |
| def test_jar_finder(self): |
| self.jar = JarWriter(file=self.tmppath('test.jar')) |
| self.prepare_match_test() |
| self.jar.finish() |
| reader = JarReader(file=self.tmppath('test.jar')) |
| self.finder = JarFinder(self.tmppath('test.jar'), reader) |
| self.do_match_test() |
| |
| self.assertIsNone(self.finder.get('does-not-exist')) |
| self.assertIsInstance(self.finder.get('bar'), DeflatedFile) |
| |
| |
| class TestComposedFinder(MatchTestTemplate, TestWithTmpDir): |
| def add(self, path, content=None): |
| # Put foo/qux files under $tmp/b. |
| if path.startswith('foo/qux/'): |
| real_path = mozpath.join('b', path[8:]) |
| else: |
| real_path = mozpath.join('a', path) |
| ensureParentDir(self.tmppath(real_path)) |
| if not content: |
| content = path |
| open(self.tmppath(real_path), 'wb').write(content) |
| |
| def do_check(self, pattern, result): |
| if '*' in pattern: |
| return |
| do_check(self, self.finder, pattern, result) |
| |
| def test_composed_finder(self): |
| self.prepare_match_test() |
| # Also add files in $tmp/a/foo/qux because ComposedFinder is |
| # expected to mask foo/qux entirely with content from $tmp/b. |
| ensureParentDir(self.tmppath('a/foo/qux/hoge')) |
| open(self.tmppath('a/foo/qux/hoge'), 'wb').write('hoge') |
| open(self.tmppath('a/foo/qux/bar'), 'wb').write('not the right content') |
| self.finder = ComposedFinder({ |
| '': FileFinder(self.tmppath('a')), |
| 'foo/qux': FileFinder(self.tmppath('b')), |
| }) |
| self.do_match_test() |
| |
| self.assertIsNone(self.finder.get('does-not-exist')) |
| self.assertIsInstance(self.finder.get('bar'), File) |
| |
| |
| @unittest.skipUnless(hglib, 'hglib not available') |
| class TestMercurialRevisionFinder(MatchTestTemplate, TestWithTmpDir): |
| def setUp(self): |
| super(TestMercurialRevisionFinder, self).setUp() |
| hglib.init(self.tmpdir) |
| |
| def add(self, path): |
| c = hglib.open(self.tmpdir) |
| ensureParentDir(self.tmppath(path)) |
| with open(self.tmppath(path), 'wb') as fh: |
| fh.write(path) |
| c.add(self.tmppath(path)) |
| |
| def do_check(self, pattern, result): |
| do_check(self, self.finder, pattern, result) |
| |
| def _get_finder(self, *args, **kwargs): |
| return MercurialRevisionFinder(*args, **kwargs) |
| |
| def test_default_revision(self): |
| self.prepare_match_test() |
| c = hglib.open(self.tmpdir) |
| c.commit('initial commit') |
| self.finder = self._get_finder(self.tmpdir) |
| self.do_match_test() |
| |
| self.assertIsNone(self.finder.get('does-not-exist')) |
| self.assertIsInstance(self.finder.get('bar'), MercurialFile) |
| |
| def test_old_revision(self): |
| c = hglib.open(self.tmpdir) |
| with open(self.tmppath('foo'), 'wb') as fh: |
| fh.write('foo initial') |
| c.add(self.tmppath('foo')) |
| c.commit('initial') |
| |
| with open(self.tmppath('foo'), 'wb') as fh: |
| fh.write('foo second') |
| with open(self.tmppath('bar'), 'wb') as fh: |
| fh.write('bar second') |
| c.add(self.tmppath('bar')) |
| c.commit('second') |
| # This wipes out the working directory, ensuring the finder isn't |
| # finding anything from the filesystem. |
| c.rawcommand(['update', 'null']) |
| |
| finder = self._get_finder(self.tmpdir, 0) |
| f = finder.get('foo') |
| self.assertEqual(f.read(), 'foo initial') |
| self.assertEqual(f.read(), 'foo initial', 'read again for good measure') |
| self.assertIsNone(finder.get('bar')) |
| |
| finder = MercurialRevisionFinder(self.tmpdir, rev=1) |
| f = finder.get('foo') |
| self.assertEqual(f.read(), 'foo second') |
| f = finder.get('bar') |
| self.assertEqual(f.read(), 'bar second') |
| |
| def test_recognize_repo_paths(self): |
| c = hglib.open(self.tmpdir) |
| with open(self.tmppath('foo'), 'wb') as fh: |
| fh.write('initial') |
| c.add(self.tmppath('foo')) |
| c.commit('initial') |
| c.rawcommand(['update', 'null']) |
| |
| finder = self._get_finder(self.tmpdir, 0, |
| recognize_repo_paths=True) |
| with self.assertRaises(NotImplementedError): |
| list(finder.find('')) |
| |
| with self.assertRaises(ValueError): |
| finder.get('foo') |
| with self.assertRaises(ValueError): |
| finder.get('') |
| |
| f = finder.get(self.tmppath('foo')) |
| self.assertIsInstance(f, MercurialFile) |
| self.assertEqual(f.read(), 'initial') |
| |
| |
| @unittest.skipUnless(MercurialNativeRevisionFinder, 'hgnative not available') |
| class TestMercurialNativeRevisionFinder(TestMercurialRevisionFinder): |
| def _get_finder(self, *args, **kwargs): |
| return MercurialNativeRevisionFinder(*args, **kwargs) |
| |
| |
| if __name__ == '__main__': |
| mozunit.main() |