# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for file-like access of blobs, usually invoked via Blob.open()."""

import io

from google.api_core.exceptions import RequestRangeNotSatisfiable
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import ConditionalRetryPolicy


# Resumable uploads require a chunk size that is a multiple of 256 KiB.
CHUNK_SIZE_MULTIPLE = 256 * 1024  # 256 KiB
DEFAULT_CHUNK_SIZE = 40 * 1024 * 1024  # 40 MiB
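# For example, 4 * CHUNK_SIZE_MULTIPLE (exactly 1 MiB) is a valid upload chunk
# size, while a "round" 1000000 bytes is not, since 1000000 % 262144 != 0.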

# Valid keyword arguments for download methods, and blob.reload() if needed.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_DOWNLOAD_KWARGS = {
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
    "timeout",
    "retry",
    "raw_download",
    "single_shot_download",
}

# Valid keyword arguments for upload methods.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_UPLOAD_KWARGS = {
    "content_type",
    "predefined_acl",
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
    "timeout",
    "checksum",
    "retry",
}


class BlobReader(io.BufferedIOBase):
    """A file-like object that reads from a blob.

    :type blob: 'google.cloud.storage.blob.Blob'
    :param blob:
        The blob to download.

    :type chunk_size: int
    :param chunk_size:
        (Optional) The minimum number of bytes to read at a time. If fewer
        bytes than the chunk_size are requested, the remainder is buffered.
        The default is the chunk_size of the blob, or 40 MiB.

    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
    :param retry:
        (Optional) How to retry the RPC. A None value will disable
        retries. A google.api_core.retry.Retry value will enable retries,
        and the object will define retriable response codes and errors and
        configure backoff and timeout options.

        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
        Retry object and activates it only if certain conditions are met.
        This class exists to provide safe defaults for RPC calls that are
        not technically safe to retry normally (due to potential data
        duplication or other side-effects) but become safe to retry if a
        condition such as if_metageneration_match is set.

        See the retry.py source code and docstrings in this package
        (google.cloud.storage.retry) for information on retry types and how
        to configure them.

        Media operations (downloads and uploads) do not support non-default
        predicates in a Retry object. The default will always be used. Other
        configuration changes for Retry objects such as delays and deadlines
        are respected.

    :type download_kwargs: dict
    :param download_kwargs:
        Keyword arguments to pass to the underlying API calls.
        The following arguments are supported:

        - ``if_generation_match``
        - ``if_generation_not_match``
        - ``if_metageneration_match``
        - ``if_metageneration_not_match``
        - ``timeout``
        - ``raw_download``
        - ``single_shot_download``

        Note that download_kwargs (excluding ``raw_download`` and
        ``single_shot_download``) are also applied to blob.reload(), if a
        reload is needed during seek().
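
    Example:
        A minimal usage sketch (``bucket`` is assumed to be an existing
        :class:`~google.cloud.storage.bucket.Bucket`; in practice a reader is
        usually obtained via ``blob.open("rb")`` rather than constructed
        directly)::

            blob = bucket.blob("data.bin")
            reader = BlobReader(blob)
            header = reader.read(16)  # Downloads at least one chunk.
            reader.seek(0)  # Rewinds; may call blob.reload() for the size.
            reader.close()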
    """

    def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs):
        for kwarg in download_kwargs:
            if kwarg not in VALID_DOWNLOAD_KWARGS:
                raise ValueError(
                    f"BlobReader does not support keyword argument {kwarg}."
                )

        self._blob = blob
        self._pos = 0
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        self._retry = retry
        self._download_kwargs = download_kwargs

    def read(self, size=-1):
        self._checkClosed()  # Raises ValueError if closed.

        result = self._buffer.read(size)
        # If the read request demands more bytes than are buffered, fetch more.
        remaining_size = size - len(result)
        if remaining_size > 0 or size < 0:
            self._pos += self._buffer.tell()
            read_size = len(result)

            self._buffer.seek(0)
            self._buffer.truncate(0)  # Clear the buffer to make way for new data.
            fetch_start = self._pos
            if size > 0:
                # Fetch the larger of self._chunk_size or the remaining_size.
                fetch_end = fetch_start + max(remaining_size, self._chunk_size)
            else:
                fetch_end = None

            # Download the blob. Checksumming must be disabled as we are using
            # chunked downloads, and the server only knows the checksum of the
            # entire file.
            try:
                result += self._blob.download_as_bytes(
                    start=fetch_start,
                    end=fetch_end,
                    checksum=None,
                    retry=self._retry,
                    **self._download_kwargs,
                )
            except RequestRangeNotSatisfiable:
                # We've reached the end of the file. Python file objects should
                # return an empty response in this case, not raise an error.
                pass

            # If more bytes were read than are immediately needed, buffer the
            # remainder and then trim the result.
            if size > 0 and len(result) > size:
                self._buffer.write(result[size:])
                self._buffer.seek(0)
                result = result[:size]
            # Increment relative offset by true amount read.
            self._pos += len(result) - read_size
        return result

    def read1(self, size=-1):
        return self.read(size)

    def seek(self, pos, whence=0):
        """Seek within the blob.

        This implementation of seek() uses knowledge of the blob size to
        validate that the reported position does not exceed the blob's last
        byte. If the blob size is not already known, it will call
        blob.reload().
        """
        self._checkClosed()  # Raises ValueError if closed.

        if self._blob.size is None:
            reload_kwargs = {
                k: v
                for k, v in self._download_kwargs.items()
                if (k != "raw_download" and k != "single_shot_download")
            }
            self._blob.reload(**reload_kwargs)

        initial_offset = self._pos + self._buffer.tell()

        if whence == 0:
            target_pos = pos
        elif whence == 1:
            target_pos = initial_offset + pos
        elif whence == 2:
            target_pos = self._blob.size + pos
        else:
            raise ValueError("invalid whence value")

        if target_pos > self._blob.size:
            target_pos = self._blob.size

        # Seek or invalidate buffer as needed.
        if target_pos < self._pos:
            # Target position < relative offset <= true offset.
            # As data is not in buffer, invalidate buffer.
            self._buffer.seek(0)
            self._buffer.truncate(0)
            new_pos = target_pos
            self._pos = target_pos
        else:
            # relative offset <= target position <= size of file.
            difference = target_pos - initial_offset
            new_pos = self._pos + self._buffer.seek(difference, 1)
        return new_pos

    def close(self):
        self._buffer.close()

    @property
    def closed(self):
        return self._buffer.closed

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True


class BlobWriter(io.BufferedIOBase):
    """A file-like object that writes to a blob.

    :type blob: 'google.cloud.storage.blob.Blob'
    :param blob:
        The blob to which to write.

    :type chunk_size: int
    :param chunk_size:
        (Optional) The maximum number of bytes to buffer before sending data
        to the server, and the size of each request when data is sent.
        Writes are implemented as a "resumable upload", so chunk_size for
        writes must be exactly a multiple of 256 KiB, as with other resumable
        uploads. The default is the chunk_size of the blob, or 40 MiB.

    :type ignore_flush: bool
    :param ignore_flush:
        Makes flush() do nothing instead of raising an error. flush() without
        closing is not supported by the remote service and therefore calling
        it on this class normally results in io.UnsupportedOperation. However,
        that behavior is incompatible with some consumers and wrappers of file
        objects in Python, such as zipfile.ZipFile or io.TextIOWrapper. Setting
        ignore_flush will cause flush() to successfully do nothing, for
        compatibility with those contexts. The correct way to actually flush
        data to the remote server is to close() (using this object as a context
        manager is recommended).

    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
    :param retry:
        (Optional) How to retry the RPC. A None value will disable
        retries. A google.api_core.retry.Retry value will enable retries,
        and the object will define retriable response codes and errors and
        configure backoff and timeout options.

        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
        Retry object and activates it only if certain conditions are met.
        This class exists to provide safe defaults for RPC calls that are
        not technically safe to retry normally (due to potential data
        duplication or other side-effects) but become safe to retry if a
        condition such as if_metageneration_match is set.

        See the retry.py source code and docstrings in this package
        (google.cloud.storage.retry) for information on retry types and how
        to configure them.

        Media operations (downloads and uploads) do not support non-default
        predicates in a Retry object. The default will always be used. Other
        configuration changes for Retry objects such as delays and deadlines
        are respected.

    :type upload_kwargs: dict
    :param upload_kwargs:
        Keyword arguments to pass to the underlying API
        calls. The following arguments are supported:

        - ``if_generation_match``
        - ``if_generation_not_match``
        - ``if_metageneration_match``
        - ``if_metageneration_not_match``
        - ``timeout``
        - ``content_type``
        - ``predefined_acl``
        - ``checksum``
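
    Example:
        A minimal usage sketch (``bucket`` is assumed to be an existing
        :class:`~google.cloud.storage.bucket.Bucket`; in practice a writer is
        usually obtained via ``blob.open("wb")``). Using the writer as a
        context manager finalizes the upload on a clean exit and cancels it if
        an exception escapes the block::

            blob = bucket.blob("data.bin")
            with BlobWriter(blob, ignore_flush=True) as writer:
                writer.write(b"hello world")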
    """

    def __init__(
        self,
        blob,
        chunk_size=None,
        ignore_flush=False,
        retry=DEFAULT_RETRY,
        **upload_kwargs,
    ):
        for kwarg in upload_kwargs:
            if kwarg not in VALID_UPLOAD_KWARGS:
                raise ValueError(
                    f"BlobWriter does not support keyword argument {kwarg}."
                )
        self._blob = blob
        self._buffer = SlidingBuffer()
        self._upload_and_transport = None
        # Resumable uploads require a chunk size that is a multiple of 256 KiB.
        # self._chunk_size must not be changed after the upload is initiated.
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        self._ignore_flush = ignore_flush
        self._retry = retry
        self._upload_kwargs = upload_kwargs

    @property
    def _chunk_size(self):
        """Get the writer's chunk size.

        :rtype: int or ``NoneType``
        :returns: The current chunk size, if it is set.
        """
        return self.__chunk_size

    @_chunk_size.setter
    def _chunk_size(self, value):
        """Set the writer's chunk size.

        :type value: int
        :param value: (Optional) The new chunk size.

        :raises: :class:`ValueError` if ``value`` is not ``None`` and is not a
                 multiple of 256 KiB.
        """
        if value is not None and value > 0 and value % CHUNK_SIZE_MULTIPLE != 0:
            raise ValueError(
                "Chunk size must be a multiple of %d." % CHUNK_SIZE_MULTIPLE
            )
        self.__chunk_size = value

    def write(self, b):
        self._checkClosed()  # Raises ValueError if closed.

        pos = self._buffer.write(b)

        # If there is enough content, upload chunks.
        num_chunks = len(self._buffer) // self._chunk_size
        if num_chunks:
            self._upload_chunks_from_buffer(num_chunks)

        return pos

    def _initiate_upload(self):
        retry = self._retry
        content_type = self._upload_kwargs.pop("content_type", None)

        # Handle ConditionalRetryPolicy.
        if isinstance(retry, ConditionalRetryPolicy):
            # Conditional retries are designed for non-media calls, which change
            # arguments into query_params dictionaries. Media operations work
            # differently, so here we make a "fake" query_params to feed to the
            # ConditionalRetryPolicy.
            query_params = {
                "ifGenerationMatch": self._upload_kwargs.get("if_generation_match"),
                "ifMetagenerationMatch": self._upload_kwargs.get(
                    "if_metageneration_match"
                ),
            }
            retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

        self._upload_and_transport = self._blob._initiate_resumable_upload(
            self._blob.bucket.client,
            self._buffer,
            content_type,
            None,
            chunk_size=self._chunk_size,
            retry=retry,
            **self._upload_kwargs,
        )

    def _upload_chunks_from_buffer(self, num_chunks):
        """Upload a specified number of chunks."""

        # Initialize the upload if necessary.
        if not self._upload_and_transport:
            self._initiate_upload()

        upload, transport = self._upload_and_transport

        # Attach timeout if specified in the keyword arguments.
        # Otherwise, the default timeout will be used from the media library.
        kwargs = {}
        if "timeout" in self._upload_kwargs:
            kwargs = {"timeout": self._upload_kwargs.get("timeout")}

        # Upload chunks. The SlidingBuffer class will manage seek position.
        for _ in range(num_chunks):
            upload.transmit_next_chunk(transport, **kwargs)

        # Wipe the buffer of chunks uploaded, preserving any remaining data.
        self._buffer.flush()

    def tell(self):
        return self._buffer.tell() + len(self._buffer)

    def flush(self):
        # flush() is not fully supported by the remote service, so raise an
        # error here, unless self._ignore_flush is set.
        if not self._ignore_flush:
            raise io.UnsupportedOperation(
                "Cannot flush without finalizing upload. Use close() instead, "
                "or set ignore_flush=True when constructing this class (see "
                "docstring)."
            )

    def close(self):
        if not self._buffer.closed:
            self._upload_chunks_from_buffer(1)
        self._buffer.close()

    def terminate(self):
        """Cancel the ResumableUpload."""
        if self._upload_and_transport:
            upload, transport = self._upload_and_transport
            transport.delete(upload.upload_url)
        self._buffer.close()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.terminate()
        else:
            self.close()

    @property
    def closed(self):
        return self._buffer.closed

    def readable(self):
        return False

    def writable(self):
        return True

    def seekable(self):
        return False


class SlidingBuffer(object):
    """A non-rewindable buffer that frees memory of chunks already consumed.

    This class is necessary because `google-resumable-media-python` expects
    `tell()` to work relative to the start of the file, not relative to a place
    in an intermediate buffer. Using this class, we present an external
    interface with consistent seek and tell behavior without having to actually
    store bytes already sent.

    Behavior of this class differs from an ordinary BytesIO buffer. `write()`
    will always append to the end of the buffer only, and will not otherwise
    change the seek position. `flush()` will delete all data already read (data
    to the left of the seek position). `tell()` will report the seek position
    of the buffer including all deleted data. Additionally, the class
    implements `__len__()`, which reports the size of the actual underlying
    buffer.

    This class does not attempt to implement the entire Python I/O interface.
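
    Example:
        A brief behavior sketch (illustrative only)::

            buf = SlidingBuffer()
            buf.write(b"abcdef")  # Appends; read position stays at 0.
            buf.read(4)           # b"abcd"; tell() is now 4.
            buf.flush()           # Frees b"abcd"; only b"ef" stays buffered.
            len(buf)              # 2, the bytes still held in memory.
            buf.tell()            # Still 4, as if nothing had been deleted.
            buf.seek(2)           # ValueError: byte 2 was already purged.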
    """

    def __init__(self):
        self._buffer = io.BytesIO()
        self._cursor = 0

    def write(self, b):
        """Append to the end of the buffer without changing the position."""
        self._checkClosed()  # Raises ValueError if closed.

        bookmark = self._buffer.tell()
        self._buffer.seek(0, io.SEEK_END)
        pos = self._buffer.write(b)
        self._buffer.seek(bookmark)
        return pos

    def read(self, size=-1):
        """Read and move the cursor."""
        self._checkClosed()  # Raises ValueError if closed.

        data = self._buffer.read(size)
        self._cursor += len(data)
        return data

    def flush(self):
        """Delete already-read data (all data to the left of the position)."""
        self._checkClosed()  # Raises ValueError if closed.

        # BytesIO can't be deleted from the left, so save any leftover, unread
        # data, truncate at 0, then re-add the leftover data.
        leftover = self._buffer.read()
        self._buffer.seek(0)
        self._buffer.truncate(0)
        self._buffer.write(leftover)
        self._buffer.seek(0)

    def tell(self):
        """Report how many bytes have been read from the buffer in total."""
        return self._cursor

    def seek(self, pos):
        """Seek to a position (backwards only) within the internal buffer.

        This implementation of seek() verifies that the seek destination is
        contained in _buffer. It will raise ValueError if the destination byte
        has already been purged from the buffer.

        The "whence" argument is not supported in this implementation.
        """
        self._checkClosed()  # Raises ValueError if closed.

        buffer_initial_pos = self._buffer.tell()
        difference = pos - self._cursor
        buffer_seek_result = self._buffer.seek(difference, io.SEEK_CUR)
        if (
            not buffer_seek_result - buffer_initial_pos == difference
            or pos > self._cursor
        ):
            # The seek did not arrive at the expected byte because the internal
            # buffer does not (or no longer) contain the byte. Reset and raise.
            self._buffer.seek(buffer_initial_pos)
            raise ValueError("Cannot seek() to that value.")

        self._cursor = pos
        return self._cursor

    def __len__(self):
        """Determine the size of the buffer by seeking to the end."""
        bookmark = self._buffer.tell()
        length = self._buffer.seek(0, io.SEEK_END)
        self._buffer.seek(bookmark)
        return length

    def close(self):
        return self._buffer.close()

    def _checkClosed(self):
        return self._buffer._checkClosed()

    @property
    def closed(self):
        return self._buffer.closed