blob: 215b505793661d4219fa7ef13db6767c72867798 [file] [log] [blame]
# Copyright 2011 Google Inc. All Rights Reserved.
# Copyright 2011, Nexenta Systems Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import boto
import errno
import gzip
import hashlib
import mimetypes
import os
import platform
import re
import subprocess
import stat
import sys
import tempfile
import threading
import time
from boto import config
from boto.exception import GSResponseError
from boto.exception import ResumableUploadException
from boto.gs.resumable_upload_handler import ResumableUploadHandler
from boto.s3.keyfile import KeyFile
from boto.s3.resumable_download_handler import ResumableDownloadHandler
from boto.storage_uri import BucketStorageUri
from gslib.command import COMMAND_NAME
from gslib.command import COMMAND_NAME_ALIASES
from gslib.command import CONFIG_REQUIRED
from gslib.command import Command
from gslib.command import FILE_URIS_OK
from gslib.command import MAX_ARGS
from gslib.command import MIN_ARGS
from gslib.command import PROVIDER_URIS_OK
from gslib.command import SUPPORTED_SUB_ARGS
from gslib.command import URIS_START_ARG
from gslib.exception import CommandException
from gslib.help_provider import HELP_NAME
from gslib.help_provider import HELP_NAME_ALIASES
from gslib.help_provider import HELP_ONE_LINE_SUMMARY
from gslib.help_provider import HELP_TEXT
from gslib.help_provider import HELP_TYPE
from gslib.help_provider import HelpType
from gslib.name_expansion import NameExpansionIterator
from gslib.util import ExtractErrorDetail
from gslib.util import IS_WINDOWS
from gslib.util import MakeHumanReadable
from gslib.util import NO_MAX
from gslib.util import TWO_MB
from gslib.wildcard_iterator import ContainsWildcard
_detailed_help_text = ("""
<B>SYNOPSIS</B>
gsutil cp [OPTION]... src_uri dst_uri
- or -
gsutil cp [OPTION]... src_uri... dst_uri
- or -
gsutil cp [OPTION]... -I dst_uri
<B>DESCRIPTION</B>
The gsutil cp command allows you to copy data between your local file
system and the cloud, copy data within the cloud, and copy data between
cloud storage providers. For example, to copy all text files from the
local directory to a bucket you could do:
gsutil cp *.txt gs://my_bucket
Similarly, you can download text files from a bucket by doing:
gsutil cp gs://my_bucket/*.txt .
If you want to copy an entire directory tree you need to use the -R option:
gsutil cp -R dir gs://my_bucket
If you have a large number of files to upload you might want to use the
gsutil -m option, to perform a parallel (multi-threaded/multi-processing)
copy:
gsutil -m cp -R dir gs://my_bucket
You can pass a list of URIs to copy on STDIN instead of as command line
arguments by using the -I option. This allows you to use gsutil in a
pipeline to copy files and objects as generated by a program, such as:
some_program | gsutil -m cp -I gs://my_bucket
The contents of STDIN can name files, cloud URIs, and wildcards of files
and cloud URIs.
<B>HOW NAMES ARE CONSTRUCTED</B>
The gsutil cp command strives to name objects in a way consistent with how
Linux cp works, which causes names to be constructed in varying ways depending
on whether you're performing a recursive directory copy or copying
individually named objects; and whether you're copying to an existing or
non-existent directory.
When performing recursive directory copies, object names are constructed
that mirror the source directory structure starting at the point of
recursive processing. For example, the command:
gsutil cp -R dir1/dir2 gs://my_bucket
will create objects named like gs://my_bucket/dir2/a/b/c, assuming
dir1/dir2 contains the file a/b/c.
In contrast, copying individually named files will result in objects named
by the final path component of the source files. For example, the command:
gsutil cp dir1/dir2/** gs://my_bucket
will create objects named like gs://my_bucket/c.
The same rules apply for downloads: recursive copies of buckets and
bucket subdirectories produce a mirrored filename structure, while copying
individually (or wildcard) named objects produce flatly named files.
Note that in the above example the '**' wildcard matches all names
anywhere under dir. The wildcard '*' will match names just one level deep. For
more details see 'gsutil help wildcards'.
There's an additional wrinkle when working with subdirectories: the resulting
names depend on whether the destination subdirectory exists. For example,
if gs://my_bucket/subdir exists as a subdirectory, the command:
gsutil cp -R dir1/dir2 gs://my_bucket/subdir
will create objects named like gs://my_bucket/subdir/dir2/a/b/c. In contrast,
if gs://my_bucket/subdir does not exist, this same gsutil cp command will
create objects named like gs://my_bucket/subdir/a/b/c.
<B>COPYING TO/FROM SUBDIRECTORIES; DISTRIBUTING TRANSFERS ACROSS MACHINES</B>
You can use gsutil to copy to and from subdirectories by using a command like:
gsutil cp -R dir gs://my_bucket/data
This will cause dir and all of its files and nested subdirectories to be
copied under the specified destination, resulting in objects with names like
gs://my_bucket/data/dir/a/b/c. Similarly you can download from bucket
subdirectories by using a command like:
gsutil cp -R gs://my_bucket/data dir
This will cause everything nested under gs://my_bucket/data to be downloaded
into dir, resulting in files with names like dir/data/a/b/c.
Copying subdirectories is useful if you want to add data to an existing
bucket directory structure over time. It's also useful if you want
to parallelize uploads and downloads across multiple machines (often
reducing overall transfer time compared with simply running gsutil -m
cp on one machine). For example, if your bucket contains this structure:
gs://my_bucket/data/result_set_01/
gs://my_bucket/data/result_set_02/
...
gs://my_bucket/data/result_set_99/
you could perform concurrent downloads across 3 machines by running these
commands on each machine, respectively:
gsutil -m cp -R gs://my_bucket/data/result_set_[0-3]* dir
gsutil -m cp -R gs://my_bucket/data/result_set_[4-6]* dir
gsutil -m cp -R gs://my_bucket/data/result_set_[7-9]* dir
Note that dir could be a local directory on each machine, or it could
be a directory mounted off of a shared file server; whether the latter
performs acceptably may depend on a number of things, so we recommend
you experiment and find out what works best for you.
<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
If both the source and destination URI are cloud URIs from the same
provider, gsutil copies data "in the cloud" (i.e., without downloading
to and uploading from the machine where you run gsutil). In addition to
the performance and cost advantages of doing this, copying in the cloud
preserves metadata (like Content-Type and Cache-Control). In contrast,
when you download data from the cloud it ends up in a file, which has
no associated metadata. Thus, unless you have some way to hold on to
or re-create that metadata, downloading to a file will not retain the
metadata.
Note that by default, the gsutil cp command does not copy the object
ACL to the new object, and instead will use the default bucket ACL (see
"gsutil help setdefacl"). You can override this behavior with the -p
option (see OPTIONS below).
gsutil does not preserve metadata when copying objects between providers.
<B>RESUMABLE TRANSFERS</B>
gsutil automatically uses the Google Cloud Storage resumable upload
feature whenever you use the cp command to upload an object that is larger
than 2 MB. You do not need to specify any special command line options
to make this happen. If your upload is interrupted you can restart the
upload by running the same cp command that you ran to start the upload.
Similarly, gsutil automatically performs resumable downloads (using HTTP
standard Range GET operations) whenever you use the cp command to download an
object larger than 2 MB.
Resumable uploads and downloads store some state information in a file
in ~/.gsutil named by the destination object or file. If you attempt to
resume a transfer from a machine with a different directory, the transfer
will start over from scratch.
See also "gsutil help prod" for details on using resumable transfers
in production.
<B>STREAMING TRANSFERS</B>
Use '-' in place of src_uri or dst_uri to perform a streaming
transfer. For example:
long_running_computation | gsutil cp - gs://my_bucket/obj
Streaming transfers do not support resumable uploads/downloads.
(The Google resumable transfer protocol has a way to support streaming
transers, but gsutil doesn't currently implement support for this.)
<B>CHANGING TEMP DIRECTORIES</B>
gsutil writes data to a temporary directory in several cases:
- when compressing data to be uploaded (see the -z option)
- when decompressing data being downloaded (when the data has
Content-Encoding:gzip, e.g., as happens when uploaded using gsutil cp -z)
- when running integration tests (using the gsutil test command)
In these cases it's possible the temp file location on your system that
gsutil selects by default may not have enough space. If you find that
gsutil runs out of space during one of these operations (e.g., raising
"CommandException: Inadequate temp space available to compress <your file>"
during a gsutil cp -z operation), you can change where it writes these
temp files by setting the TMPDIR environment variable. On Linux and MacOS
you can do this either by running gsutil this way:
TMPDIR=/some/directory gsutil cp ...
or by adding this line to your ~/.bashrc file and then restarting the shell
before running gsutil:
export TMPDIR=/some/directory
On Windows 7 you can change the TMPDIR environment variable from Start ->
Computer -> System -> Advanced System Settings -> Environment Variables.
You need to reboot after making this change for it to take effect. (Rebooting
is not necessary after running the export command on Linux and MacOS.)
<B>OPTIONS</B>
-a canned_acl Sets named canned_acl when uploaded objects created. See
'gsutil help acls' for further details.
-c If an error occurrs, continue to attempt to copy the remaining
files.
-D Copy in "daisy chain" mode, i.e., copying between two buckets by
hooking a download to an upload, via the machine where gsutil is
run. By default, data are copied between two buckets "in the
cloud", i.e., without needing to copy via the machine where
gsutil runs. However, copy-in-the-cloud is not supported when
copying between different locations (like US and EU) or between
different storage classes (like STANDARD and
DURABLE_REDUCED_AVAILABILITY). For these cases, you can use the
-D option to copy data between buckets.
Note: Daisy chain mode is automatically used when copying
between providers (e.g., to copy data from Google Cloud Storage
to another provider).
-e Exclude symlinks. When specified, symbolic links will not be
copied.
-n No-clobber. When specified, existing files or objects at the
destination will not be overwritten. Any items that are skipped
by this option will be reported as being skipped. This option
will perform an additional HEAD request to check if an item
exists before attempting to upload the data. This will save
retransmitting data, but the additional HTTP requests may make
small object transfers slower and more expensive.
This option can be combined with the -c option to build a script
that copies a large number of objects, allowing retries when
some failures occur from which gsutil doesn't automatically
recover, using a bash script like the following:
status=1
while [ $status -ne 0 ] ; do
gsutil cp -c -n -R ./dir gs://bucket
status=$?
done
The -c option will cause copying to continue after failures
occur, and the -n option will cause objects already copied to be
skipped on subsequent iterations. The loop will continue running
as long as gsutil exits with a non-zero status (such a status
indicates there was at least one failure during the gsutil run).
-p Causes ACLs to be preserved when copying in the cloud. Note that
this option has performance and cost implications, because it
is essentially performing three requests (getacl, cp, setacl).
(The performance issue can be mitigated to some degree by
using gsutil -m cp to cause parallel copying.)
You can avoid the additional performance and cost of using cp -p
if you want all objects in the destination bucket to end up with
the same ACL by setting a default ACL on that bucket instead of
using cp -p. See "help gsutil setdefacl".
Note that it's not valid to specify both the -a and -p options
together.
-q Causes copies to be performed quietly, i.e., without reporting
progress indicators of files being copied. Errors are still
reported. This option can be useful for running gsutil from a
cron job that logs its output to a file, for which the only
information desired in the log is failures.
-R, -r Causes directories, buckets, and bucket subdirectories to be
copied recursively. If you neglect to use this option for
an upload, gsutil will copy any files it finds and skip any
directories. Similarly, neglecting to specify -R for a download
will cause gsutil to copy any objects at the current bucket
directory level, and skip any subdirectories.
-v Requests that the version-specific URI for each uploaded object
be printed. Given this URI you can make future upload requests
that are safe in the face of concurrent updates, because Google
Cloud Storage will refuse to perform the update if the current
object version doesn't match the version-specific URI. See
'gsutil help versioning' for more details. Note: at present this
option does not work correctly for objects copied "in the cloud"
(e.g., gsutil cp gs://bucket/obj1 gs://bucket/obj2).
-z ext1,... Compresses file uploads with the given extensions. If you are
uploading a large file with compressible content, such as
a .js, .css, or .html file, you can gzip-compress the file
during the upload process by specifying the -z <extensions>
option. Compressing data before upload saves on usage charges
because you are uploading a smaller amount of data.
When you specify the -z option, the data from your files is
compressed before it is uploaded, but your actual files are left
uncompressed on the local disk. The uploaded objects retain the
original content type and name as the original files but are
given a Content-Encoding header with the value "gzip" to
indicate that the object data stored are compressed on the
Google Cloud Storage servers.
For example, the following command:
gsutil cp -z html -a public-read cattypes.html gs://mycats
will do all of the following:
- Upload as the object gs://mycats/cattypes.html (cp command)
- Set the Content-Type to text/html (based on file extension)
- Compress the data in the file cattypes.html (-z option)
- Set the Content-Encoding to gzip (-z option)
- Set the ACL to public-read (-a option)
- If a user tries to view cattypes.html in a browser, the
browser will know to uncompress the data based on the
Content-Encoding header, and to render it as HTML based on
the Content-Type header.
""")
class CpCommand(Command):
"""
Implementation of gsutil cp command.
Note that CpCommand is run for both gsutil cp and gsutil mv. The latter
happens by MvCommand calling CpCommand and passing the hidden (undocumented)
-M option. This allows the copy and remove needed for each mv to run
together (rather than first running all the cp's and then all the rm's, as
we originally had implemented), which in turn avoids the following problem
with removing the wrong objects: starting with a bucket containing only
the object gs://bucket/obj, say the user does:
gsutil mv gs://bucket/* gs://bucket/d.txt
If we ran all the cp's and then all the rm's and we didn't expand the wildcard
first, the cp command would first copy gs://bucket/obj to gs://bucket/d.txt,
and the rm command would then remove that object. In the implementation
prior to gsutil release 3.12 we avoided this by building a list of objects
to process and then running the copies and then the removes; but building
the list up front limits scalability (compared with the current approach
of processing the bucket listing iterator on the fly).
"""
# Set default Content-Type type.
DEFAULT_CONTENT_TYPE = 'application/octet-stream'
USE_MAGICFILE = boto.config.getbool('GSUtil', 'use_magicfile', False)
# Command specification (processed by parent class).
command_spec = {
# Name of command.
COMMAND_NAME : 'cp',
# List of command name aliases.
COMMAND_NAME_ALIASES : ['copy'],
# Min number of args required by this command.
MIN_ARGS : 1,
# Max number of args required by this command, or NO_MAX.
MAX_ARGS : NO_MAX,
# Getopt-style string specifying acceptable sub args.
# -t is deprecated but leave intact for now to avoid breakage.
SUPPORTED_SUB_ARGS : 'a:cDeIMNnpqrRtvz:',
# True if file URIs acceptable for this command.
FILE_URIS_OK : True,
# True if provider-only URIs acceptable for this command.
PROVIDER_URIS_OK : False,
# Index in args of first URI arg.
URIS_START_ARG : 0,
# True if must configure gsutil before running command.
CONFIG_REQUIRED : True,
}
help_spec = {
# Name of command or auxiliary help info for which this help applies.
HELP_NAME : 'cp',
# List of help name aliases.
HELP_NAME_ALIASES : ['copy'],
# Type of help:
HELP_TYPE : HelpType.COMMAND_HELP,
# One line summary of this help.
HELP_ONE_LINE_SUMMARY : 'Copy files and objects',
# The full help text.
HELP_TEXT : _detailed_help_text,
}
def _CheckFinalMd5(self, key, file_name):
"""
Checks that etag from server agrees with md5 computed after the
download completes.
"""
obj_md5 = key.etag.strip('"\'')
file_md5 = None
if hasattr(key, 'md5') and key.md5:
file_md5 = key.md5
else:
print 'Computing MD5 from scratch for resumed download'
# Open file in binary mode to avoid surprises in Windows.
fp = open(file_name, 'rb')
try:
file_md5 = key.compute_md5(fp)[0]
finally:
fp.close()
if self.debug:
print 'Checking file md5 against etag. (%s/%s)' % (file_md5, obj_md5)
if file_md5 != obj_md5:
# Checksums don't match - remove file and raise exception.
os.unlink(file_name)
raise CommandException(
'File changed during download: md5 signature doesn\'t match '
'etag (incorrect downloaded file deleted)')
def _CheckForDirFileConflict(self, exp_src_uri, dst_uri):
"""Checks whether copying exp_src_uri into dst_uri is not possible.
This happens if a directory exists in local file system where a file
needs to go or vice versa. In that case we print an error message and
exits. Example: if the file "./x" exists and you try to do:
gsutil cp gs://mybucket/x/y .
the request can't succeed because it requires a directory where
the file x exists.
Note that we don't enforce any corresponding restrictions for buckets,
because the flat namespace semantics for buckets doesn't prohibit such
cases the way hierarchical file systems do. For example, if a bucket
contains an object called gs://bucket/dir and then you run the command:
gsutil cp file1 file2 gs://bucket/dir
you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
gs://bucket/dir/file2.
Args:
exp_src_uri: Expanded source StorageUri of copy.
dst_uri: Destination URI.
Raises:
CommandException: if errors encountered.
"""
if dst_uri.is_cloud_uri():
# The problem can only happen for file destination URIs.
return
dst_path = dst_uri.object_name
final_dir = os.path.dirname(dst_path)
if os.path.isfile(final_dir):
raise CommandException('Cannot retrieve %s because a file exists '
'where a directory needs to be created (%s).' %
(exp_src_uri, final_dir))
if os.path.isdir(dst_path):
raise CommandException('Cannot retrieve %s because a directory exists '
'(%s) where the file needs to be created.' %
(exp_src_uri, dst_path))
def _InsistDstUriNamesContainer(self, exp_dst_uri,
have_existing_dst_container, command_name):
"""
Raises an exception if URI doesn't name a directory, bucket, or bucket
subdir, with special exception for cp -R (see comments below).
Args:
exp_dst_uri: Wildcard-expanding dst_uri.
have_existing_dst_container: bool indicator of whether exp_dst_uri
names a container (directory, bucket, or existing bucket subdir).
command_name: Name of command making call. May not be the same as
self.command_name in the case of commands implemented atop other
commands (like mv command).
Raises:
CommandException: if the URI being checked does not name a container.
"""
if exp_dst_uri.is_file_uri():
ok = exp_dst_uri.names_directory()
else:
if have_existing_dst_container:
ok = True
else:
# It's ok to specify a non-existing bucket subdir, for example:
# gsutil cp -R dir gs://bucket/abc
# where gs://bucket/abc isn't an existing subdir.
ok = exp_dst_uri.names_object()
if not ok:
raise CommandException('Destination URI must name a directory, bucket, '
'or bucket\nsubdirectory for the multiple '
'source form of the %s command.' % command_name)
class _FileCopyCallbackHandler(object):
"""Outputs progress info for large copy requests."""
def __init__(self, upload):
if upload:
self.announce_text = 'Uploading'
else:
self.announce_text = 'Downloading'
def call(self, total_bytes_transferred, total_size):
sys.stderr.write('%s: %s/%s \r' % (
self.announce_text,
MakeHumanReadable(total_bytes_transferred),
MakeHumanReadable(total_size)))
if total_bytes_transferred == total_size:
sys.stderr.write('\n')
class _StreamCopyCallbackHandler(object):
"""Outputs progress info for Stream copy to cloud.
Total Size of the stream is not known, so we output
only the bytes transferred.
"""
def call(self, total_bytes_transferred, total_size):
sys.stderr.write('Uploading: %s \r' % (
MakeHumanReadable(total_bytes_transferred)))
if total_size and total_bytes_transferred == total_size:
sys.stderr.write('\n')
def _GetTransferHandlers(self, dst_uri, size, upload):
"""
Selects upload/download and callback handlers.
We use a callback handler that shows a simple textual progress indicator
if size is above the configurable threshold.
We use a resumable transfer handler if size is >= the configurable
threshold and resumable transfers are supported by the given provider.
boto supports resumable downloads for all providers, but resumable
uploads are currently only supported by GS.
Args:
dst_uri: the destination URI.
size: size of file (object) being uploaded (downloaded).
upload: bool indication of whether transfer is an upload.
"""
config = boto.config
resumable_threshold = config.getint('GSUtil', 'resumable_threshold', TWO_MB)
transfer_handler = None
cb = None
num_cb = None
# Checks whether the destination file is a "special" file, like /dev/null on
# Linux platforms or null on Windows platforms, so we can disable resumable
# download support since the file size of the destination won't ever be
# correct.
dst_is_special = False
if dst_uri.is_file_uri():
# Check explicitly first because os.stat doesn't work on 'nul' in Windows.
if dst_uri.object_name == os.devnull:
dst_is_special = True
try:
mode = os.stat(dst_uri.object_name).st_mode
if stat.S_ISCHR(mode):
dst_is_special = True
except OSError:
pass
if size >= resumable_threshold and not dst_is_special:
if not self.quiet:
cb = self._FileCopyCallbackHandler(upload).call
num_cb = int(size / TWO_MB)
resumable_tracker_dir = config.get(
'GSUtil', 'resumable_tracker_dir',
os.path.expanduser('~' + os.sep + '.gsutil'))
if not os.path.exists(resumable_tracker_dir):
os.makedirs(resumable_tracker_dir)
if upload:
# Encode the dest bucket and object name into the tracker file name.
res_tracker_file_name = (
re.sub('[/\\\\]', '_', 'resumable_upload__%s__%s.url' %
(dst_uri.bucket_name, dst_uri.object_name)))
else:
# Encode the fully-qualified dest file name into the tracker file name.
res_tracker_file_name = (
re.sub('[/\\\\]', '_', 'resumable_download__%s.etag' %
(os.path.realpath(dst_uri.object_name))))
res_tracker_file_name = _hash_filename(res_tracker_file_name)
tracker_file = '%s%s%s' % (resumable_tracker_dir, os.sep,
res_tracker_file_name)
if upload:
if dst_uri.scheme == 'gs':
transfer_handler = ResumableUploadHandler(tracker_file)
else:
transfer_handler = ResumableDownloadHandler(tracker_file)
return (cb, num_cb, transfer_handler)
def _LogCopyOperation(self, src_uri, dst_uri, headers):
"""
Logs copy operation being performed, including Content-Type if appropriate.
"""
if self.quiet:
return
if 'Content-Type' in headers and dst_uri.is_cloud_uri():
content_type_msg = ' [Content-Type=%s]' % headers['Content-Type']
else:
content_type_msg = ''
if src_uri.is_stream():
self.THREADED_LOGGER.info('Copying from <STDIN>%s...', content_type_msg)
else:
self.THREADED_LOGGER.info('Copying %s%s...', src_uri, content_type_msg)
# We pass the headers explicitly to this call instead of using self.headers
# so we can set different metadata (like Content-Type type) for each object.
def _CopyObjToObjInTheCloud(self, src_key, src_uri, dst_uri, headers):
"""Performs copy-in-the cloud from specified src to dest object.
Args:
src_key: Source Key.
src_uri: Source StorageUri.
dst_uri: Destination StorageUri.
headers: A copy of the headers dictionary.
Returns:
(elapsed_time, bytes_transferred, dst_uri) excluding overhead like initial
HEAD. Note: At present copy-in-the-cloud doesn't return the generation of
the created object, so the returned URI is actually not version-specific
(unlike other cp cases).
Raises:
CommandException: if errors encountered.
"""
self._SetContentTypeHeader(src_uri, headers)
self._LogCopyOperation(src_uri, dst_uri, headers)
# Do Object -> object copy within same provider (uses
# x-<provider>-copy-source metadata HTTP header to request copying at the
# server).
src_bucket = src_uri.get_bucket(False, headers)
preserve_acl = False
canned_acl = None
if self.sub_opts:
for o, a in self.sub_opts:
if o == '-a':
canned_acls = dst_uri.canned_acls()
if a not in canned_acls:
raise CommandException('Invalid canned ACL "%s".' % a)
canned_acl = a
headers[dst_uri.get_provider().acl_header] = canned_acl
if o == '-p':
preserve_acl = True
if preserve_acl and canned_acl:
raise CommandException(
'Specifying both the -p and -a options together is invalid.')
start_time = time.time()
# Pass headers in headers param not metadata param, so boto will copy
# existing key's metadata and just set the additional headers specified
# in the headers param (rather than using the headers to override existing
# metadata). In particular this allows us to copy the existing key's
# Content-Type and other metadata users need while still being able to
# set headers the API needs (like x-goog-project-id). Note that this means
# you can't do something like:
# gsutil cp -t Content-Type text/html gs://bucket/* gs://bucket2
# to change the Content-Type while copying.
try:
dst_key = dst_uri.copy_key(
src_bucket.name, src_uri.object_name, preserve_acl=False,
headers=headers, src_version_id=src_uri.version_id,
src_generation=src_uri.generation)
except GSResponseError as e:
exc_name, error_detail = ExtractErrorDetail(e)
if (exc_name == 'GSResponseError'
and ('Copy-in-the-cloud disallowed' in error_detail)):
raise CommandException('%s.\nNote: you can copy between locations '
'and between storage classes by using the '
'gsutil cp -D option.' % error_detail)
else:
raise
end_time = time.time()
return (end_time - start_time, src_key.size,
dst_uri.clone_replace_key(dst_key))
def _CheckFreeSpace(self, path):
"""Return path/drive free space (in bytes)."""
if platform.system() == 'Windows':
from ctypes import c_int, c_uint64, c_wchar_p, windll, POINTER, WINFUNCTYPE, WinError
try:
GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_wchar_p, POINTER(c_uint64),
POINTER(c_uint64), POINTER(c_uint64))
GetDiskFreeSpaceEx = GetDiskFreeSpaceEx(
('GetDiskFreeSpaceExW', windll.kernel32), (
(1, 'lpszPathName'),
(2, 'lpFreeUserSpace'),
(2, 'lpTotalSpace'),
(2, 'lpFreeSpace'),))
except AttributeError:
GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_char_p, POINTER(c_uint64),
POINTER(c_uint64), POINTER(c_uint64))
GetDiskFreeSpaceEx = GetDiskFreeSpaceEx(
('GetDiskFreeSpaceExA', windll.kernel32), (
(1, 'lpszPathName'),
(2, 'lpFreeUserSpace'),
(2, 'lpTotalSpace'),
(2, 'lpFreeSpace'),))
def GetDiskFreeSpaceEx_errcheck(result, func, args):
if not result:
raise WinError()
return args[1].value
GetDiskFreeSpaceEx.errcheck = GetDiskFreeSpaceEx_errcheck
return GetDiskFreeSpaceEx(os.getenv('SystemDrive'))
else:
(_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
return f_frsize * f_bavail
def _PerformResumableUploadIfApplies(self, fp, dst_uri, canned_acl, headers):
"""
Performs resumable upload if supported by provider and file is above
threshold, else performs non-resumable upload.
Returns (elapsed_time, bytes_transferred, version-specific dst_uri).
"""
start_time = time.time()
# Determine file size different ways for case where fp is actually a wrapper
# around a Key vs an actual file.
if isinstance(fp, KeyFile):
file_size = fp.getkey().size
else:
file_size = os.path.getsize(fp.name)
(cb, num_cb, res_upload_handler) = self._GetTransferHandlers(
dst_uri, file_size, True)
if dst_uri.scheme == 'gs':
# Resumable upload protocol is Google Cloud Storage-specific.
dst_uri.set_contents_from_file(fp, headers, policy=canned_acl,
cb=cb, num_cb=num_cb,
res_upload_handler=res_upload_handler)
else:
dst_uri.set_contents_from_file(fp, headers, policy=canned_acl,
cb=cb, num_cb=num_cb)
if res_upload_handler:
# ResumableUploadHandler does not update upload_start_point from its
# initial value of -1 if transferring the whole file, so clamp at 0
bytes_transferred = file_size - max(
res_upload_handler.upload_start_point, 0)
else:
bytes_transferred = file_size
end_time = time.time()
return (end_time - start_time, bytes_transferred, dst_uri)
def _PerformStreamingUpload(self, fp, dst_uri, headers, canned_acl=None):
"""
Performs a streaming upload to the cloud.
Args:
fp: The file whose contents to upload.
dst_uri: Destination StorageUri.
headers: A copy of the headers dictionary.
canned_acl: Optional canned ACL to set on the object.
Returns (elapsed_time, bytes_transferred, version-specific dst_uri).
"""
start_time = time.time()
if self.quiet:
cb = None
else:
cb = self._StreamCopyCallbackHandler().call
dst_uri.set_contents_from_stream(
fp, headers, policy=canned_acl, cb=cb)
try:
bytes_transferred = fp.tell()
except:
bytes_transferred = 0
end_time = time.time()
return (end_time - start_time, bytes_transferred, dst_uri)
def _SetContentTypeHeader(self, src_uri, headers):
"""
Sets content type header to value specified in '-h Content-Type' option (if
specified); else sets using Content-Type detection.
"""
if 'Content-Type' in headers:
# If empty string specified (i.e., -h "Content-Type:") set header to None,
# which will inhibit boto from sending the CT header. Otherwise, boto will
# pass through the user specified CT header.
if not headers['Content-Type']:
headers['Content-Type'] = None
# else we'll keep the value passed in via -h option (not performing
# content type detection).
else:
# Only do content type recognition is src_uri is a file. Object-to-object
# copies with no -h Content-Type specified re-use the content type of the
# source object.
if src_uri.is_file_uri():
object_name = src_uri.object_name
content_type = None
# Streams (denoted by '-') are expected to be 'application/octet-stream'
# and 'file' would partially consume them.
if object_name != '-':
if self.USE_MAGICFILE:
p = subprocess.Popen(['file', '--mime-type', object_name],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = p.communicate()
if p.returncode != 0 or error:
raise CommandException(
'Encountered error running "file --mime-type %s" '
'(returncode=%d).\n%s' % (object_name, p.returncode, error))
# Parse output by removing line delimiter and splitting on last ":
content_type = output.rstrip().rpartition(': ')[2]
else:
content_type = mimetypes.guess_type(object_name)[0]
if not content_type:
content_type = self.DEFAULT_CONTENT_TYPE
headers['Content-Type'] = content_type
def _UploadFileToObject(self, src_key, src_uri, dst_uri, headers,
should_log=True):
"""Uploads a local file to an object.
Args:
src_key: Source StorageUri. Must be a file URI.
src_uri: Source StorageUri.
dst_uri: Destination StorageUri.
headers: The headers dictionary.
should_log: bool indicator whether we should log this operation.
Returns:
(elapsed_time, bytes_transferred, version-specific dst_uri), excluding
overhead like initial HEAD.
Raises:
CommandException: if errors encountered.
"""
gzip_exts = []
canned_acl = None
if self.sub_opts:
for o, a in self.sub_opts:
if o == '-a':
canned_acls = dst_uri.canned_acls()
if a not in canned_acls:
raise CommandException('Invalid canned ACL "%s".' % a)
canned_acl = a
elif o == '-t':
print('Warning: -t is deprecated, and will be removed in the future. '
'Content type\ndetection is '
'now performed by default, unless inhibited by specifying '
'a\nContent-Type header via the -h option.')
elif o == '-z':
gzip_exts = a.split(',')
self._SetContentTypeHeader(src_uri, headers)
if should_log:
self._LogCopyOperation(src_uri, dst_uri, headers)
if 'Content-Language' not in headers:
content_language = config.get_value('GSUtil', 'content_language')
if content_language:
headers['Content-Language'] = content_language
fname_parts = src_uri.object_name.split('.')
if len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
if self.debug:
print 'Compressing %s (to tmp)...' % src_key
(gzip_fh, gzip_path) = tempfile.mkstemp()
gzip_fp = None
try:
# Check for temp space. Assume the compressed object is at most 2x
# the size of the object (normally should compress to smaller than
# the object)
if (self._CheckFreeSpace(gzip_path)
< 2*int(os.path.getsize(src_key.name))):
raise CommandException('Inadequate temp space available to compress '
'%s' % src_key.name)
gzip_fp = gzip.open(gzip_path, 'wb')
gzip_fp.writelines(src_key.fp)
finally:
if gzip_fp:
gzip_fp.close()
os.close(gzip_fh)
headers['Content-Encoding'] = 'gzip'
gzip_fp = open(gzip_path, 'rb')
try:
(elapsed_time, bytes_transferred, result_uri) = (
self._PerformResumableUploadIfApplies(gzip_fp, dst_uri,
canned_acl, headers))
finally:
gzip_fp.close()
try:
os.unlink(gzip_path)
# Windows sometimes complains the temp file is locked when you try to
# delete it.
except Exception, e:
pass
elif (src_key.is_stream()
and dst_uri.get_provider().supports_chunked_transfer()):
(elapsed_time, bytes_transferred, result_uri) = (
self._PerformStreamingUpload(src_key.fp, dst_uri, headers,
canned_acl))
else:
if src_key.is_stream():
# For Providers that doesn't support chunked Transfers
tmp = tempfile.NamedTemporaryFile()
file_uri = self.suri_builder.StorageUri('file://%s' % tmp.name)
try:
file_uri.new_key(False, headers).set_contents_from_file(
src_key.fp, headers)
src_key = file_uri.get_key()
finally:
file_uri.close()
try:
(elapsed_time, bytes_transferred, result_uri) = (
self._PerformResumableUploadIfApplies(src_key.fp, dst_uri,
canned_acl, headers))
finally:
if src_key.is_stream():
tmp.close()
else:
src_key.close()
return (elapsed_time, bytes_transferred, result_uri)
def _DownloadObjectToFile(self, src_key, src_uri, dst_uri, headers,
should_log=True):
"""Downloads an object to a local file.
Args:
src_key: Source StorageUri. Must be a file URI.
src_uri: Source StorageUri.
dst_uri: Destination StorageUri.
headers: The headers dictionary.
should_log: bool indicator whether we should log this operation.
Returns:
(elapsed_time, bytes_transferred, dst_uri), excluding overhead like
initial HEAD.
Raises:
CommandException: if errors encountered.
"""
if should_log:
self._LogCopyOperation(src_uri, dst_uri, headers)
(cb, num_cb, res_download_handler) = self._GetTransferHandlers(
dst_uri, src_key.size, False)
file_name = dst_uri.object_name
dir_name = os.path.dirname(file_name)
if dir_name and not os.path.exists(dir_name):
# Do dir creation in try block so can ignore case where dir already
# exists. This is needed to avoid a race condition when running gsutil
# -m cp.
try:
os.makedirs(dir_name)
except OSError, e:
if e.errno != errno.EEXIST:
raise
# For gzipped objects not named *.gz download to a temp file and unzip.
if (hasattr(src_key, 'content_encoding')
and src_key.content_encoding == 'gzip'
and not file_name.endswith('.gz')):
# We can't use tempfile.mkstemp() here because we need a predictable
# filename for resumable downloads.
download_file_name = '%s_.gztmp' % file_name
need_to_unzip = True
else:
download_file_name = file_name
need_to_unzip = False
fp = None
try:
if res_download_handler:
fp = open(download_file_name, 'ab')
else:
fp = open(download_file_name, 'wb')
start_time = time.time()
src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb,
res_download_handler=res_download_handler)
# If a custom test method is defined, call it here. For the copy command,
# test methods are expected to take one argument: an open file pointer,
# and are used to perturb the open file during download to exercise
# download error detection.
if self.test_method:
self.test_method(fp)
end_time = time.time()
finally:
if fp:
fp.close()
# Discard the md5 if we are resuming a partial download.
if res_download_handler and res_download_handler.download_start_point:
src_key.md5 = None
# Verify downloaded file checksum matched source object's checksum.
self._CheckFinalMd5(src_key, download_file_name)
if res_download_handler:
bytes_transferred = (
src_key.size - res_download_handler.download_start_point)
else:
bytes_transferred = src_key.size
if need_to_unzip:
# Log that we're uncompressing if the file is big enough that
# decompressing would make it look like the transfer "stalled" at the end.
if not self.quiet and bytes_transferred > 10 * 1024 * 1024:
self.THREADED_LOGGER.info('Uncompressing downloaded tmp file to %s...',
file_name)
# Downloaded gzipped file to a filename w/o .gz extension, so unzip.
f_in = gzip.open(download_file_name, 'rb')
f_out = open(file_name, 'wb')
try:
while True:
data = f_in.read(8192)
if not data:
break
f_out.write(data)
finally:
f_out.close()
f_in.close()
os.unlink(download_file_name)
return (end_time - start_time, bytes_transferred, dst_uri)
def _PerformDownloadToStream(self, src_key, src_uri, str_fp, headers):
(cb, num_cb, res_download_handler) = self._GetTransferHandlers(
src_uri, src_key.size, False)
start_time = time.time()
src_key.get_contents_to_file(str_fp, headers, cb=cb, num_cb=num_cb)
end_time = time.time()
bytes_transferred = src_key.size
end_time = time.time()
return (end_time - start_time, bytes_transferred)
def _CopyFileToFile(self, src_key, src_uri, dst_uri, headers):
"""Copies a local file to a local file.
Args:
src_key: Source StorageUri. Must be a file URI.
src_uri: Source StorageUri.
dst_uri: Destination StorageUri.
headers: The headers dictionary.
Returns:
(elapsed_time, bytes_transferred, dst_uri), excluding
overhead like initial HEAD.
Raises:
CommandException: if errors encountered.
"""
self._LogCopyOperation(src_uri, dst_uri, headers)
dst_key = dst_uri.new_key(False, headers)
start_time = time.time()
dst_key.set_contents_from_file(src_key.fp, headers)
end_time = time.time()
return (end_time - start_time, os.path.getsize(src_key.fp.name), dst_uri)
def _CopyObjToObjDaisyChainMode(self, src_key, src_uri, dst_uri, headers):
"""Copies from src_uri to dst_uri in "daisy chain" mode.
See -D OPTION documentation about what daisy chain mode is.
Args:
src_key: Source Key.
src_uri: Source StorageUri.
dst_uri: Destination StorageUri.
headers: A copy of the headers dictionary.
Returns:
(elapsed_time, bytes_transferred, version-specific dst_uri) excluding
overhead like initial HEAD.
Raises:
CommandException: if errors encountered.
"""
self._SetContentTypeHeader(src_uri, headers)
self._LogCopyOperation(src_uri, dst_uri, headers)
canned_acl = None
if self.sub_opts:
for o, a in self.sub_opts:
if o == '-a':
canned_acls = dst_uri.canned_acls()
if a not in canned_acls:
raise CommandException('Invalid canned ACL "%s".' % a)
canned_acl = a
elif o == '-p':
# We don't attempt to preserve ACLs across providers because
# GCS and S3 support different ACLs and disjoint principals.
raise NotImplementedError('Cross-provider cp -p not supported')
return self._PerformResumableUploadIfApplies(KeyFile(src_key), dst_uri,
canned_acl, headers)
def _PerformCopy(self, src_uri, dst_uri):
"""Performs copy from src_uri to dst_uri, handling various special cases.
Args:
src_uri: Source StorageUri.
dst_uri: Destination StorageUri.
Returns:
(elapsed_time, bytes_transferred, version-specific dst_uri) excluding
overhead like initial HEAD.
Raises:
CommandException: if errors encountered.
"""
# Make a copy of the input headers each time so we can set a different
# content type for each object.
if self.headers:
headers = self.headers.copy()
else:
headers = {}
src_key = src_uri.get_key(False, headers)
if not src_key:
raise CommandException('"%s" does not exist.' % src_uri)
# On Windows, stdin is opened as text mode instead of binary which causes
# problems when piping a binary file, so this switches it to binary mode.
if IS_WINDOWS and src_uri.is_file_uri() and src_key.is_stream():
import msvcrt
msvcrt.setmode(src_key.fp.fileno(), os.O_BINARY)
if self.no_clobber:
# There are two checks to prevent clobbering:
# 1) The first check is to see if the item
# already exists at the destination and prevent the upload/download
# from happening. This is done by the exists() call.
# 2) The second check is only relevant if we are writing to gs. We can
# enforce that the server only writes the object if it doesn't exist
# by specifying the header below. This check only happens at the
# server after the complete file has been uploaded. We specify this
# header to prevent a race condition where a destination file may
# be created after the first check and before the file is fully
# uploaded.
# In order to save on unnecessary uploads/downloads we perform both
# checks. However, this may come at the cost of additional HTTP calls.
if dst_uri.exists(headers):
if not self.quiet:
self.THREADED_LOGGER.info('Skipping existing item: %s' %
dst_uri.uri)
return (0, 0, None)
if dst_uri.is_cloud_uri() and dst_uri.scheme == 'gs':
headers['x-goog-if-generation-match'] = '0'
if src_uri.is_cloud_uri() and dst_uri.is_cloud_uri():
if src_uri.scheme == dst_uri.scheme and not self.daisy_chain:
return self._CopyObjToObjInTheCloud(src_key, src_uri, dst_uri, headers)
else:
return self._CopyObjToObjDaisyChainMode(src_key, src_uri, dst_uri,
headers)
elif src_uri.is_file_uri() and dst_uri.is_cloud_uri():
return self._UploadFileToObject(src_key, src_uri, dst_uri, headers)
elif src_uri.is_cloud_uri() and dst_uri.is_file_uri():
return self._DownloadObjectToFile(src_key, src_uri, dst_uri, headers)
elif src_uri.is_file_uri() and dst_uri.is_file_uri():
return self._CopyFileToFile(src_key, src_uri, dst_uri, headers)
else:
raise CommandException('Unexpected src/dest case')
def _ExpandDstUri(self, dst_uri_str):
"""
Expands wildcard if present in dst_uri_str.
Args:
dst_uri_str: String representation of requested dst_uri.
Returns:
(exp_dst_uri, have_existing_dst_container)
where have_existing_dst_container is a bool indicating whether
exp_dst_uri names an existing directory, bucket, or bucket subdirectory.
Raises:
CommandException: if dst_uri_str matched more than 1 URI.
"""
dst_uri = self.suri_builder.StorageUri(dst_uri_str)
# Handle wildcarded dst_uri case.
if ContainsWildcard(dst_uri):
blr_expansion = list(self.WildcardIterator(dst_uri))
if len(blr_expansion) != 1:
raise CommandException('Destination (%s) must match exactly 1 URI' %
dst_uri_str)
blr = blr_expansion[0]
uri = blr.GetUri()
if uri.is_cloud_uri():
return (uri, uri.names_bucket() or blr.HasPrefix()
or blr.GetKey().endswith('/'))
else:
return (uri, uri.names_directory())
# Handle non-wildcarded dst_uri:
if dst_uri.is_file_uri():
return (dst_uri, dst_uri.names_directory())
if dst_uri.names_bucket():
return (dst_uri, True)
# For object URIs check 3 cases: (a) if the name ends with '/' treat as a
# subdir; else, perform a wildcard expansion with dst_uri + "*" and then
# find if (b) there's a Prefix matching dst_uri, or (c) name is of form
# dir_$folder$ (and in both these cases also treat dir as a subdir).
if dst_uri.is_cloud_uri() and dst_uri_str.endswith('/'):
return (dst_uri, True)
blr_expansion = list(self.WildcardIterator(
'%s*' % dst_uri_str.rstrip(dst_uri.delim)))
for blr in blr_expansion:
if blr.GetRStrippedUriString().endswith('_$folder$'):
return (dst_uri, True)
if blr.GetRStrippedUriString() == dst_uri_str.rstrip(dst_uri.delim):
return (dst_uri, blr.HasPrefix())
return (dst_uri, False)
def _ConstructDstUri(self, src_uri, exp_src_uri,
src_uri_names_container, src_uri_expands_to_multi,
have_multiple_srcs, exp_dst_uri,
have_existing_dest_subdir):
"""
Constructs the destination URI for a given exp_src_uri/exp_dst_uri pair,
using context-dependent naming rules that mimic Linux cp and mv behavior.
Args:
src_uri: src_uri to be copied.
exp_src_uri: Single StorageUri from wildcard expansion of src_uri.
src_uri_names_container: True if src_uri names a container (including the
case of a wildcard-named bucket subdir (like gs://bucket/abc,
where gs://bucket/abc/* matched some objects). Note that this is
additional semantics tha src_uri.names_container() doesn't understand
because the latter only understands StorageUris, not wildcards.
src_uri_expands_to_multi: True if src_uri expanded to multiple URIs.
have_multiple_srcs: True if this is a multi-source request. This can be
true if src_uri wildcard-expanded to multiple URIs or if there were
multiple source URIs in the request.
exp_dst_uri: the expanded StorageUri requested for the cp destination.
Final written path is constructed from this plus a context-dependent
variant of src_uri.
have_existing_dest_subdir: bool indicator whether dest is an existing
subdirectory.
Returns:
StorageUri to use for copy.
Raises:
CommandException if destination object name not specified for
source and source is a stream.
"""
if self._ShouldTreatDstUriAsSingleton(
have_multiple_srcs, have_existing_dest_subdir, exp_dst_uri):
# We're copying one file or object to one file or object.
return exp_dst_uri
if exp_src_uri.is_stream():
if exp_dst_uri.names_container():
raise CommandException('Destination object name needed when '
'source is a stream')
return exp_dst_uri
if not self.recursion_requested and not have_multiple_srcs:
# We're copying one file or object to a subdirectory. Append final comp
# of exp_src_uri to exp_dst_uri.
src_final_comp = exp_src_uri.object_name.rpartition(src_uri.delim)[-1]
return self.suri_builder.StorageUri('%s%s%s' % (
exp_dst_uri.uri.rstrip(exp_dst_uri.delim), exp_dst_uri.delim,
src_final_comp))
# Else we're copying multiple sources to a directory, bucket, or a bucket
# "sub-directory".
# Ensure exp_dst_uri ends in delim char if we're doing a multi-src copy or
# a copy to a directory. (The check for copying to a directory needs
# special-case handling so that the command:
# gsutil cp gs://bucket/obj dir
# will turn into file://dir/ instead of file://dir -- the latter would cause
# the file "dirobj" to be created.)
# Note: need to check have_multiple_srcs or src_uri.names_container()
# because src_uri could be a bucket containing a single object, named
# as gs://bucket.
if ((have_multiple_srcs or src_uri.names_container()
or os.path.isdir(exp_dst_uri.object_name))
and not exp_dst_uri.uri.endswith(exp_dst_uri.delim)):
exp_dst_uri = exp_dst_uri.clone_replace_name(
'%s%s' % (exp_dst_uri.object_name, exp_dst_uri.delim)
)
# Making naming behavior match how things work with local Linux cp and mv
# operations depends on many factors, including whether the destination is a
# container, the plurality of the source(s), and whether the mv command is
# being used:
# 1. For the "mv" command that specifies a non-existent destination subdir,
# renaming should occur at the level of the src subdir, vs appending that
# subdir beneath the dst subdir like is done for copying. For example:
# gsutil rm -R gs://bucket
# gsutil cp -R dir1 gs://bucket
# gsutil cp -R dir2 gs://bucket/subdir1
# gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
# would (if using cp naming behavior) end up with paths like:
# gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
# whereas mv naming behavior should result in:
# gs://bucket/subdir2/dir2/.svn/all-wcprops
# 2. Copying from directories, buckets, or bucket subdirs should result in
# objects/files mirroring the source directory hierarchy. For example:
# gsutil cp dir1/dir2 gs://bucket
# should create the object gs://bucket/dir2/file2, assuming dir1/dir2
# contains file2).
# To be consistent with Linux cp behavior, there's one more wrinkle when
# working with subdirs: The resulting object names depend on whether the
# destination subdirectory exists. For example, if gs://bucket/subdir
# exists, the command:
# gsutil cp -R dir1/dir2 gs://bucket/subdir
# should create objects named like gs://bucket/subdir/dir2/a/b/c. In
# contrast, if gs://bucket/subdir does not exist, this same command
# should create objects named like gs://bucket/subdir/a/b/c.
# 3. Copying individual files or objects to dirs, buckets or bucket subdirs
# should result in objects/files named by the final source file name
# component. Example:
# gsutil cp dir1/*.txt gs://bucket
# should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
# assuming dir1 contains f1.txt and f2.txt.
if (self.perform_mv and self.recursion_requested
and src_uri_expands_to_multi and not have_existing_dest_subdir):
# Case 1. Handle naming rules for bucket subdir mv. Here we want to
# line up the src_uri against its expansion, to find the base to build
# the new name. For example, running the command:
# gsutil mv gs://bucket/abcd gs://bucket/xyz
# when processing exp_src_uri=gs://bucket/abcd/123
# exp_src_uri_tail should become /123
# Note: mv.py code disallows wildcard specification of source URI.
exp_src_uri_tail = exp_src_uri.uri[len(src_uri.uri):]
dst_key_name = '%s/%s' % (exp_dst_uri.object_name.rstrip('/'),
exp_src_uri_tail.strip('/'))
return exp_dst_uri.clone_replace_name(dst_key_name)
if src_uri_names_container and not exp_dst_uri.names_file():
# Case 2. Build dst_key_name from subpath of exp_src_uri past
# where src_uri ends. For example, for src_uri=gs://bucket/ and
# exp_src_uri=gs://bucket/src_subdir/obj, dst_key_name should be
# src_subdir/obj.
src_uri_path_sans_final_dir = _GetPathBeforeFinalDir(src_uri)
dst_key_name = exp_src_uri.uri[
len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim)
# Handle case where dst_uri is a non-existent subdir.
if not have_existing_dest_subdir:
dst_key_name = dst_key_name.partition(src_uri.delim)[-1]
# Handle special case where src_uri was a directory named with '.' or
# './', so that running a command like:
# gsutil cp -r . gs://dest
# will produce obj names of the form gs://dest/abc instead of
# gs://dest/./abc.
if dst_key_name.startswith('.%s' % os.sep):
dst_key_name = dst_key_name[2:]
else:
# Case 3.
dst_key_name = exp_src_uri.object_name.rpartition(src_uri.delim)[-1]
if (exp_dst_uri.is_file_uri()
or self._ShouldTreatDstUriAsBucketSubDir(
have_multiple_srcs, exp_dst_uri, have_existing_dest_subdir)):
if exp_dst_uri.object_name.endswith(exp_dst_uri.delim):
dst_key_name = '%s%s%s' % (
exp_dst_uri.object_name.rstrip(exp_dst_uri.delim),
exp_dst_uri.delim, dst_key_name)
else:
delim = exp_dst_uri.delim if exp_dst_uri.object_name else ''
dst_key_name = '%s%s%s' % (exp_dst_uri.object_name, delim, dst_key_name)
return exp_dst_uri.clone_replace_name(dst_key_name)
def _FixWindowsNaming(self, src_uri, dst_uri):
"""
Rewrites the destination URI built by _ConstructDstUri() to translate
Windows pathnames to cloud pathnames if needed.
Args:
src_uri: Source URI to be copied.
dst_uri: The destination URI built by _ConstructDstUri().
Returns:
StorageUri to use for copy.
"""
if (src_uri.is_file_uri() and src_uri.delim == '\\'
and dst_uri.is_cloud_uri()):
trans_uri_str = re.sub(r'\\', '/', dst_uri.uri)
dst_uri = self.suri_builder.StorageUri(trans_uri_str)
return dst_uri
# Command entry point.
def RunCommand(self):
# Inner funcs.
def _CopyExceptionHandler(e):
"""Simple exception handler to allow post-completion status."""
self.THREADED_LOGGER.error(str(e))
self.copy_failure_count += 1
def _CopyFunc(name_expansion_result):
"""Worker function for performing the actual copy (and rm, for mv)."""
if self.perform_mv:
cmd_name = 'mv'
else:
cmd_name = self.command_name
src_uri = self.suri_builder.StorageUri(
name_expansion_result.GetSrcUriStr())
exp_src_uri = self.suri_builder.StorageUri(
name_expansion_result.GetExpandedUriStr())
src_uri_names_container = name_expansion_result.NamesContainer()
src_uri_expands_to_multi = name_expansion_result.NamesContainer()
have_multiple_srcs = name_expansion_result.IsMultiSrcRequest()
have_existing_dest_subdir = (
name_expansion_result.HaveExistingDstContainer())
if src_uri.names_provider():
raise CommandException(
'The %s command does not allow provider-only source URIs (%s)' %
(cmd_name, src_uri))
if have_multiple_srcs:
self._InsistDstUriNamesContainer(exp_dst_uri,
have_existing_dst_container,
cmd_name)
if self.perform_mv:
if name_expansion_result.NamesContainer():
# Use recursion_requested when performing name expansion for the
# directory mv case so we can determine if any of the source URIs are
# directories (and then use cp -R and rm -R to perform the move, to
# match the behavior of Linux mv (which when moving a directory moves
# all the contained files).
self.recursion_requested = True
# Disallow wildcard src URIs when moving directories, as supporting it
# would make the name transformation too complex and would also be
# dangerous (e.g., someone could accidentally move many objects to the
# wrong name, or accidentally overwrite many objects).
if ContainsWildcard(src_uri):
raise CommandException('The mv command disallows naming source '
'directories using wildcards')
if (exp_dst_uri.is_file_uri()
and not os.path.exists(exp_dst_uri.object_name)
and have_multiple_srcs):
os.makedirs(exp_dst_uri.object_name)
dst_uri = self._ConstructDstUri(src_uri, exp_src_uri,
src_uri_names_container,
src_uri_expands_to_multi,
have_multiple_srcs, exp_dst_uri,
have_existing_dest_subdir)
dst_uri = self._FixWindowsNaming(src_uri, dst_uri)
self._CheckForDirFileConflict(exp_src_uri, dst_uri)
if self._SrcDstSame(exp_src_uri, dst_uri):
raise CommandException('%s: "%s" and "%s" are the same file - '
'abort.' % (cmd_name, exp_src_uri, dst_uri))
if dst_uri.is_cloud_uri() and dst_uri.is_version_specific:
raise CommandException('%s: a version-specific URI\n(%s)\ncannot be '
'the destination for gsutil cp - abort.'
% (cmd_name, dst_uri))
elapsed_time = bytes_transferred = 0
try:
(elapsed_time, bytes_transferred, result_uri) = (
self._PerformCopy(exp_src_uri, dst_uri))
except Exception, e:
if self._IsNoClobberServerException(e):
if not self.quiet:
self.THREADED_LOGGER.info('Rejected (noclobber): %s' % dst_uri.uri)
elif self.continue_on_error:
if not self.quiet:
self.THREADED_LOGGER.error('Error copying %s: %s' % (src_uri.uri,
str(e)))
self.copy_failure_count += 1
else:
raise
if self.print_ver:
# Some cases don't return a version-specific URI (e.g., if destination
# is a file).
if hasattr(result_uri, 'version_specific_uri'):
self.THREADED_LOGGER.info('Created: %s' %
result_uri.version_specific_uri)
else:
self.THREADED_LOGGER.info('Created: %s' % result_uri.uri)
# TODO: If we ever use -n (noclobber) with -M (move) (not possible today
# since we call copy internally from move and don't specify the -n flag)
# we'll need to only remove the source when we have not skipped the
# destination.
if self.perform_mv:
if not self.quiet:
self.THREADED_LOGGER.info('Removing %s...', exp_src_uri)
exp_src_uri.delete_key(validate=False, headers=self.headers)
stats_lock.acquire()
self.total_elapsed_time += elapsed_time
self.total_bytes_transferred += bytes_transferred
stats_lock.release()
# Start of RunCommand code.
self._ParseArgs()
self.total_elapsed_time = self.total_bytes_transferred = 0
if self.args[-1] == '-' or self.args[-1] == 'file://-':
self._HandleStreamingDownload()
return 0
if self.read_args_from_stdin:
if len(self.args) != 1:
raise CommandException('Source URIs cannot be specified with -I option')
uri_strs = self._StdinIterator()
else:
if len(self.args) < 2:
raise CommandException('Wrong number of arguments for "cp" command.')
uri_strs = self.args[0:len(self.args)-1]
(exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri(
self.args[-1])
name_expansion_iterator = NameExpansionIterator(
self.command_name, self.proj_id_handler, self.headers, self.debug,
self.bucket_storage_uri_class, uri_strs,
self.recursion_requested or self.perform_mv,
have_existing_dst_container)
# Use a lock to ensure accurate statistics in the face of
# multi-threading/multi-processing.
stats_lock = threading.Lock()
# Tracks if any copies failed.
self.copy_failure_count = 0
# Start the clock.
start_time = time.time()
# Tuple of attributes to share/manage across multiple processes in
# parallel (-m) mode.
shared_attrs = ('copy_failure_count', 'total_bytes_transferred')
# Perform copy requests in parallel (-m) mode, if requested, using
# configured number of parallel processes and threads. Otherwise,
# perform requests with sequential function calls in current process.
self.Apply(_CopyFunc, name_expansion_iterator, _CopyExceptionHandler,
shared_attrs)
if self.debug:
print 'total_bytes_transferred:' + str(self.total_bytes_transferred)
end_time = time.time()
self.total_elapsed_time = end_time - start_time
# Sometimes, particularly when running unit tests, the total elapsed time
# is really small. On Windows, the timer resolution is too small and
# causes total_elapsed_time to be zero.
try:
float(self.total_bytes_transferred) / float(self.total_elapsed_time)
except ZeroDivisionError:
self.total_elapsed_time = 0.01
self.total_bytes_per_second = (float(self.total_bytes_transferred) /
float(self.total_elapsed_time))
if self.debug == 3:
# Note that this only counts the actual GET and PUT bytes for the copy
# - not any transfers for doing wildcard expansion, the initial HEAD
# request boto performs when doing a bucket.get_key() operation, etc.
if self.total_bytes_transferred != 0:
self.THREADED_LOGGER.info(
'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)',
self.total_bytes_transferred, self.total_elapsed_time,
MakeHumanReadable(self.total_bytes_per_second))
if self.copy_failure_count:
plural_str = ''
if self.copy_failure_count > 1:
plural_str = 's'
raise CommandException('%d file%s/object%s could not be transferred.' % (
self.copy_failure_count, plural_str, plural_str))
return 0
def _ParseArgs(self):
self.perform_mv = False
self.exclude_symlinks = False
self.quiet = False
self.no_clobber = False
self.continue_on_error = False
self.daisy_chain = False
self.read_args_from_stdin = False
self.print_ver = False
# self.recursion_requested initialized in command.py (so can be checked
# in parent class for all commands).
if self.sub_opts:
for o, unused_a in self.sub_opts:
if o == '-c':
self.continue_on_error = True
elif o == '-D':
self.daisy_chain = True
elif o == '-e':
self.exclude_symlinks = True
elif o == '-I':
self.read_args_from_stdin = True
elif o == '-M':
# Note that we signal to the cp command to perform a move (copy
# followed by remove) and use directory-move naming rules by passing
# the undocumented (for internal use) -M option when running the cp
# command from mv.py.
self.perform_mv = True
elif o == '-n':
self.no_clobber = True
elif o == '-q':
self.quiet = True
elif o == '-r' or o == '-R':
self.recursion_requested = True
elif o == '-v':
self.print_ver = True
def _HandleStreamingDownload(self):
# Destination is <STDOUT>. Manipulate sys.stdout so as to redirect all
# debug messages to <STDERR>.
stdout_fp = sys.stdout
sys.stdout = sys.stderr
did_some_work = False
for uri_str in self.args[0:len(self.args)-1]:
for uri in self.WildcardIterator(uri_str).IterUris():
did_some_work = True
key = uri.get_key(False, self.headers)
(elapsed_time, bytes_transferred) = self._PerformDownloadToStream(
key, uri, stdout_fp, self.headers)
self.total_elapsed_time += elapsed_time
self.total_bytes_transferred += bytes_transferred
if not did_some_work:
raise CommandException('No URIs matched')
if self.debug == 3:
if self.total_bytes_transferred != 0:
self.THREADED_LOGGER.info(
'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)',
self.total_bytes_transferred, self.total_elapsed_time,
MakeHumanReadable(float(self.total_bytes_transferred) /
float(self.total_elapsed_time)))
def _StdinIterator(self):
"""A generator function that returns lines from stdin."""
for line in sys.stdin:
# Strip CRLF.
yield line.rstrip()
def _SrcDstSame(self, src_uri, dst_uri):
"""Checks if src_uri and dst_uri represent the same object or file.
We don't handle anything about hard or symbolic links.
Args:
src_uri: Source StorageUri.
dst_uri: Destination StorageUri.
Returns:
Bool indicator.
"""
if src_uri.is_file_uri() and dst_uri.is_file_uri():
# Translate a/b/./c to a/b/c, so src=dst comparison below works.
new_src_path = os.path.normpath(src_uri.object_name)
new_dst_path = os.path.normpath(dst_uri.object_name)
return (src_uri.clone_replace_name(new_src_path).uri ==
dst_uri.clone_replace_name(new_dst_path).uri)
else:
return (src_uri.uri == dst_uri.uri and
src_uri.generation == dst_uri.generation and
src_uri.version_id == dst_uri.version_id)
def _ShouldTreatDstUriAsBucketSubDir(self, have_multiple_srcs, dst_uri,
have_existing_dest_subdir):
"""
Checks whether dst_uri should be treated as a bucket "sub-directory". The
decision about whether something constitutes a bucket "sub-directory"
depends on whether there are multiple sources in this request and whether
there is an existing bucket subdirectory. For example, when running the
command:
gsutil cp file gs://bucket/abc
if there's no existing gs://bucket/abc bucket subdirectory we should copy
file to the object gs://bucket/abc. In contrast, if
there's an existing gs://bucket/abc bucket subdirectory we should copy
file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc
exists, when running the command:
gsutil cp file1 file2 gs://bucket/abc
we should copy file1 to gs://bucket/abc/file1 (and similarly for file2).
Note that we don't disallow naming a bucket "sub-directory" where there's
already an object at that URI. For example it's legitimate (albeit
confusing) to have an object called gs://bucket/dir and
then run the command
gsutil cp file1 file2 gs://bucket/dir
Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1,
and gs://bucket/dir/file2.
Args:
have_multiple_srcs: Bool indicator of whether this is a multi-source
operation.
dst_uri: StorageUri to check.
have_existing_dest_subdir: bool indicator whether dest is an existing
subdirectory.
Returns:
bool indicator.
"""
return ((have_multiple_srcs and dst_uri.is_cloud_uri())
or (have_existing_dest_subdir))
def _ShouldTreatDstUriAsSingleton(self, have_multiple_srcs,
have_existing_dest_subdir, dst_uri):
"""
Checks that dst_uri names a singleton (file or object) after
dir/wildcard expansion. The decision is more nuanced than simply
dst_uri.names_singleton()) because of the possibility that an object path
might name a bucket sub-directory.
Args:
have_multiple_srcs: Bool indicator of whether this is a multi-source
operation.
have_existing_dest_subdir: bool indicator whether dest is an existing
subdirectory.
dst_uri: StorageUri to check.
Returns:
bool indicator.
"""
if have_multiple_srcs:
# Only a file meets the criteria in this case.
return dst_uri.names_file()
return not have_existing_dest_subdir and dst_uri.names_singleton()
def _IsNoClobberServerException(self, e):
"""
Checks to see if the server attempted to clobber a file after we specified
in the header that we didn't want the file clobbered.
Args:
e: The Exception that was generated by a failed copy operation
Returns:
bool indicator - True indicates that the server did attempt to clobber
an existing file.
"""
return self.no_clobber and (
(isinstance(e, GSResponseError) and e.status==412) or
(isinstance(e, ResumableUploadException) and 'code 412' in e.message))
def _GetPathBeforeFinalDir(uri):
"""
Returns the part of the path before the final directory component for the
given URI, handling cases for file system directories, bucket, and bucket
subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
and for file://dir we'll return file://
Args:
uri: StorageUri.
Returns:
String name of above-described path, sans final path separator.
"""
sep = uri.delim
assert not uri.names_file()
if uri.names_directory():
past_scheme = uri.uri[len('file://'):]
if past_scheme.find(sep) == -1:
return 'file://'
else:
return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0]
if uri.names_bucket():
return '%s://' % uri.scheme
# Else it names a bucket subdir.
return uri.uri.rstrip(sep).rpartition(sep)[0]
def _hash_filename(filename):
"""
Apply a hash function (SHA1) to shorten the passed file name. The spec
for the hashed file name is as follows:
TRACKER_<hash>_<trailing>
where hash is a SHA1 hash on the original file name and trailing is
the last 16 chars from the original file name. Max file name lengths
vary by operating system so the goal of this function is to ensure
the hashed version takes fewer than 100 characters.
Args:
filename: file name to be hashed.
Returns:
shorter, hashed version of passed file name
"""
if not isinstance(filename, unicode):
filename = unicode(filename, 'utf8').encode('utf-8')
m = hashlib.sha1(filename)
return "TRACKER_" + m.hexdigest() + '.' + filename[-16:]