| #!/usr/bin/env python |
| # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unit tests for upload_to_google_storage.py.""" |
| |
| import optparse |
| import os |
| import Queue |
| import shutil |
| import StringIO |
| import sys |
| import tempfile |
| import threading |
| import unittest |
| |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| |
| import upload_to_google_storage |
| from download_from_google_storage_unittests import GsutilMock |
| |
| # ../third_party/gsutil/gsutil |
| GSUTIL_DEFAULT_PATH = os.path.join( |
| os.path.dirname(os.path.dirname(os.path.abspath(__file__))), |
| 'third_party', 'gsutil', 'gsutil') |
| TEST_DIR = os.path.dirname(os.path.abspath(__file__)) |
| |
| |
| class UploadTests(unittest.TestCase): |
| def setUp(self): |
| self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH, None) |
| self.temp_dir = tempfile.mkdtemp(prefix='gstools_test') |
| self.base_path = os.path.join(self.temp_dir, 'gstools') |
| shutil.copytree(os.path.join(TEST_DIR, 'gstools'), self.base_path) |
| self.base_url = 'gs://sometesturl' |
| self.parser = optparse.OptionParser() |
| self.ret_codes = Queue.Queue() |
| self.stdout_queue = Queue.Queue() |
| self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt') |
| self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f' |
| |
| def cleanUp(self): |
| shutil.rmtree(self.temp_dir) |
| sys.stdin = sys.__stdin__ |
| |
| def test_upload_single_file(self): |
| filenames = [self.lorem_ipsum] |
| output_filename = '%s.sha1' % self.lorem_ipsum |
| code = upload_to_google_storage.upload_to_google_storage( |
| filenames, self.base_url, self.gsutil, True, False, 1, False) |
| self.assertEqual( |
| self.gsutil.history, |
| [('check_call', |
| ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))), |
| ('check_call', |
| ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, |
| self.lorem_ipsum_sha1)))]) |
| self.assertTrue(os.path.exists(output_filename)) |
| self.assertEqual( |
| open(output_filename, 'rb').read(), |
| '7871c8e24da15bad8b0be2c36edc9dc77e37727f') |
| os.remove(output_filename) |
| self.assertEqual(code, 0) |
| |
| def test_upload_single_file_remote_exists(self): |
| filenames = [self.lorem_ipsum] |
| output_filename = '%s.sha1' % self.lorem_ipsum |
| etag_string = 'ETag: 634d7c1ed3545383837428f031840a1e' |
| self.gsutil.add_expected(0, '', '') |
| self.gsutil.add_expected(0, etag_string, '') |
| code = upload_to_google_storage.upload_to_google_storage( |
| filenames, self.base_url, self.gsutil, False, False, 1, False) |
| self.assertEqual( |
| self.gsutil.history, |
| [('check_call', |
| ('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))), |
| ('check_call', |
| ('ls', '-L', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))]) |
| self.assertTrue(os.path.exists(output_filename)) |
| self.assertEqual( |
| open(output_filename, 'rb').read(), |
| '7871c8e24da15bad8b0be2c36edc9dc77e37727f') |
| os.remove(output_filename) |
| self.assertEqual(code, 0) |
| |
| def test_upload_worker_errors(self): |
| work_queue = Queue.Queue() |
| work_queue.put((self.lorem_ipsum, self.lorem_ipsum_sha1)) |
| work_queue.put((None, None)) |
| self.gsutil.add_expected(1, '', '') # For the first ls call. |
| self.gsutil.add_expected(20, '', 'Expected error message') |
| # pylint: disable=W0212 |
| upload_to_google_storage._upload_worker( |
| 0, |
| work_queue, |
| self.base_url, |
| self.gsutil, |
| threading.Lock(), |
| False, |
| False, |
| self.stdout_queue, |
| self.ret_codes) |
| expected_ret_codes = [ |
| (20, |
| 'Encountered error on uploading %s to %s/%s\nExpected error message' % |
| (self.lorem_ipsum, self.base_url, self.lorem_ipsum_sha1))] |
| self.assertEqual(list(self.ret_codes.queue), expected_ret_codes) |
| |
| def test_skip_hashing(self): |
| filenames = [self.lorem_ipsum] |
| output_filename = '%s.sha1' % self.lorem_ipsum |
| fake_hash = '6871c8e24da15bad8b0be2c36edc9dc77e37727f' |
| with open(output_filename, 'wb') as f: |
| f.write(fake_hash) # Fake hash. |
| code = upload_to_google_storage.upload_to_google_storage( |
| filenames, self.base_url, self.gsutil, False, False, 1, True) |
| self.assertEqual( |
| self.gsutil.history, |
| [('check_call', |
| ('ls', '%s/%s' % (self.base_url, fake_hash))), |
| ('check_call', |
| ('ls', '-L', '%s/%s' % (self.base_url, fake_hash))), |
| ('check_call', |
| ('cp', '-q', filenames[0], '%s/%s' % (self.base_url, fake_hash)))]) |
| self.assertEqual( |
| open(output_filename, 'rb').read(), fake_hash) |
| os.remove(output_filename) |
| self.assertEqual(code, 0) |
| |
| def test_get_targets_no_args(self): |
| try: |
| upload_to_google_storage.get_targets([], self.parser, False) |
| self.fail() |
| except SystemExit, e: |
| self.assertEqual(e.code, 2) |
| |
| def test_get_targets_passthrough(self): |
| result = upload_to_google_storage.get_targets( |
| ['a', 'b', 'c', 'd', 'e'], |
| self.parser, |
| False) |
| self.assertEqual(result, ['a', 'b', 'c', 'd', 'e']) |
| |
| def test_get_targets_multiple_stdin(self): |
| inputs = ['a', 'b', 'c', 'd', 'e'] |
| sys.stdin = StringIO.StringIO(os.linesep.join(inputs)) |
| result = upload_to_google_storage.get_targets( |
| ['-'], |
| self.parser, |
| False) |
| self.assertEqual(result, inputs) |
| |
| def test_get_targets_multiple_stdin_null(self): |
| inputs = ['a', 'b', 'c', 'd', 'e'] |
| sys.stdin = StringIO.StringIO('\0'.join(inputs)) |
| result = upload_to_google_storage.get_targets( |
| ['-'], |
| self.parser, |
| True) |
| self.assertEqual(result, inputs) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |