blob: 617d1e8c65a01f50966dc5a6936153f0790f075c [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2019 The Cobalt Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tools for extracting a Cobalt Archive.
This no-dependency tool extracts a Cobalt Archive on all platforms.
Extraction is slightly complicated on Windows, which has poor support for
path names longer than 255 characters.
"""
import argparse
import logging
import os
import shutil
import subprocess
import sys
import zipfile
################################################################################
# API #
################################################################################
def ExtractTo(archive_zip_path, output_dir, outstream=None):
  """Public entry point that extracts a Cobalt Archive into |output_dir|.

  This is a thin wrapper that forwards its arguments unchanged to
  _ExtractTo().

  Args:
    archive_zip_path: Path of the archive zip file to extract.
    output_dir: Directory that receives the extracted files.
    outstream: Optional stream forwarded to _ExtractTo(); None by default.

  Returns:
    The value returned by _ExtractTo().
  """
  result = _ExtractTo(archive_zip_path, output_dir, outstream)
  return result
def ToWinUncPath(dos_path, encoding=None):
  """Returns a Windows UNC path, which enables long path names in win32 APIs.

  This is a thin wrapper that forwards its arguments unchanged to
  _ToWinUncPath().

  Args:
    dos_path: The path to convert.
    encoding: Optional encoding forwarded to _ToWinUncPath().

  Returns:
    The value returned by _ToWinUncPath().
  """
  unc_path = _ToWinUncPath(dos_path, encoding=encoding)
  return unc_path
################################################################################
# IMPL #
################################################################################
# Locations inside an extracted archive, expressed relative to the output
# directory and always joined with forward slashes.
_OUT_ARCHIVE_ROOT = '__cobalt_archive'
_OUT_FINALIZE_DECOMPRESSION_PATH = '/'.join(
    (_OUT_ARCHIVE_ROOT, 'finalize_decompression'))
_OUT_DECOMP_PY = '/'.join((_OUT_FINALIZE_DECOMPRESSION_PATH, 'decompress.py'))
_OUT_DECOMP_JSON = '/'.join(
    (_OUT_FINALIZE_DECOMPRESSION_PATH, 'decompress.json'))
# True when sys.platform reports 'win32' or 'cygwin'.
_IS_WINDOWS = sys.platform in ('win32', 'cygwin')
def _ToWinUncPath(dos_path, encoding=None):
  r"""Converts a path to the "\\?\" form, which Windows accepts for long names.

  Works on both Python 2 and Python 3: the byte-string check uses `bytes`
  (an alias of `str` on Python 2) instead of the Python-2-only `unicode`
  builtin.

  Args:
    dos_path: The path to convert. A byte string is decoded first when
      |encoding| is given.
    encoding: Optional encoding used to decode a byte-string |dos_path|.
      When None, |dos_path| is not decoded.

  Returns:
    The absolute form of |dos_path| with a "\\?\" prefix, or a "\\?\UNC\"
    prefix when the absolute path starts with two backslashes (a network
    share). A path that already starts with "\\?\" is returned without a
    second prefix.

  Raises:
    AssertionError: If called on a platform other than Windows.
  """
  assert _IS_WINDOWS
  if isinstance(dos_path, bytes) and encoding is not None:
    dos_path = dos_path.decode(encoding)
  path = os.path.abspath(dos_path)
  if path.startswith(u'\\\\?\\'):
    # Already in extended-length form; prefixing again would yield an invalid
    # "\\?\UNC\?\..." path.
    prefix = u''
  elif path.startswith(u'\\\\'):
    # Network share: drop the two leading backslashes, then add the UNC form.
    prefix = u'\\\\?\\UNC\\'
    path = path[2:]
  else:
    prefix = u'\\\\?\\'
  return prefix + path
def _GetZipFileClass():
  """Returns the ZipFile class to use for extraction on this platform.

  On non-Windows platforms this is zipfile.ZipFile itself. On Windows it is a
  subclass that rewrites each member's target path with _ToWinUncPath()
  before delegating to the stock extraction code.
  """
  if _IS_WINDOWS:

    class ZipFileLongPaths(zipfile.ZipFile):
      """Handles extracting to paths longer than 255 characters."""

      def _extract_member(self, member, targetpath, pwd):
        long_targetpath = _ToWinUncPath(targetpath)
        return zipfile.ZipFile._extract_member(self, member, long_targetpath,
                                               pwd)

    return ZipFileLongPaths

  return zipfile.ZipFile
def _UnzipFiles(input_zip_path, output_dir, outstream):
  """Extracts every member of a zip file, reporting failures to a stream.

  A member that fails to extract does not abort the run: the error text is
  written to the stream and extraction continues with the next member.

  Args:
    input_zip_path: Path of the zip file to read.
    output_dir: Directory the members are extracted into.
    outstream: Stream that receives error messages; sys.stdout when None.

  Returns:
    True if all files were extracted, else False.
  """
  report = sys.stdout if outstream is None else outstream
  failure_count = 0
  archive_class = _GetZipFileClass()
  with archive_class(input_zip_path, 'r', allowZip64=True) as archive:
    for member in archive.infolist():
      try:
        logging.debug('Extracting: %s -> %s', member.filename, output_dir)
        archive.extract(member, path=output_dir)
      except Exception as err:  # pylint: disable=broad-except
        report.write(''.join(
            ['Exception happend during bundle extraction: ', str(err), '\n']))
        failure_count += 1
  return failure_count == 0
def _ExtractTo(archive_zip_path, output_dir, outstream=None):
  """Unzips a Cobalt Archive, then runs its bundled final decompression script.

  Args:
    archive_zip_path: Path of the archive zip file; it must exist.
    output_dir: Directory the archive is extracted into.
    outstream: Stream that receives status and error text as well as the
      stdout/stderr of the decompression script. Defaults to sys.stdout.

  Returns:
    True if all files were extracted and the decompression script exited with
    code 0, False otherwise.

  Raises:
    AssertionError: If the archive does not exist, or if the decompression
      script is missing from the extracted tree.
  """
  outstream = outstream if outstream else sys.stdout
  assert os.path.exists(archive_zip_path), 'No archive at %s' % archive_zip_path
  logging.info('UNZIPPING %s -> %s', archive_zip_path, output_dir)
  ok = _UnzipFiles(archive_zip_path, output_dir, outstream)
  # Now that all files have been extracted, execute the final decompress
  # step.
  decomp_py = os.path.abspath(os.path.join(output_dir, _OUT_DECOMP_PY))
  assert os.path.isfile(decomp_py), decomp_py
  # Run the script as an argument list without a shell, so an output path
  # containing spaces or shell metacharacters is passed through verbatim.
  # Prefer the interpreter running this tool over whatever "python" resolves
  # to on PATH; fall back to "python" if the interpreter path is unavailable.
  cmd = [sys.executable or 'python', decomp_py]
  outstream.write('Executing: %s\n' % ' '.join(cmd))
  # The child writes straight to the underlying file descriptor, so flush our
  # buffered text first to keep the output in chronological order.
  flush = getattr(outstream, 'flush', None)
  if flush is not None:
    flush()
  rc = subprocess.call(cmd, stdout=outstream, stderr=outstream)
  return ok and rc == 0
def _CreateArgumentParser():
  """Builds the command-line parser for the archive extraction tool.

  Returns:
    An argparse.ArgumentParser with two positional arguments (archive_path,
    output_path) and two boolean flags (--delete, --verbose).
  """
  # Declared in the order they are registered, which is also the order they
  # appear in the generated help text.
  argument_specs = (
      ('archive_path', {'help': 'Archive to extract.'}),
      ('output_path', {'help': 'Output path to extract the archive.'}),
      ('--delete', {'action': 'store_true',
                    'help': 'Deletes output_path if it exists.'}),
      ('--verbose', {'action': 'store_true'}),
  )
  cli = argparse.ArgumentParser()
  for arg_name, arg_options in argument_specs:
    cli.add_argument(arg_name, **arg_options)
  return cli
def main():
  """Command-line entry point: parses arguments and extracts the archive.

  Exits the process with status 0 when ExtractTo() reports success and with
  status 1 otherwise.
  """
  # parse_known_args() keeps this tool forward compatible: flags it does not
  # know about produce a warning instead of a fatal error, so callers may pass
  # flags introduced in the future.
  args, unknown_args = _CreateArgumentParser().parse_known_args()
  logging.basicConfig(
      format='[%(filename)s:%(lineno)s:%(levelname)s] %(message)s',
      level=logging.DEBUG if args.verbose else logging.INFO)
  if unknown_args:
    logging.warning('Unknown (ignored) args: %s', unknown_args)
  if args.delete:
    logging.info('Removing previous folder at %s', args.output_path)
    shutil.rmtree(args.output_path, ignore_errors=True)
  if not ExtractTo(args.archive_path, args.output_path):
    logging.critical('Errors happened.')
    sys.exit(1)
  sys.exit(0)


if __name__ == "__main__":
  main()