Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/storage/fileio.py: 52%
193 statements
coverage.py v7.3.1, created at 2023-09-25 06:17 +0000
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for file-like access of blobs, usually invoked via Blob.open()."""

import io
import warnings

from google.api_core.exceptions import RequestRangeNotSatisfiable
from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED
from google.cloud.storage.retry import ConditionalRetryPolicy
# Resumable uploads require a chunk size that is an exact multiple of 256 KiB.
CHUNK_SIZE_MULTIPLE = 256 * 1024  # 256 KiB
DEFAULT_CHUNK_SIZE = 40 * 1024 * 1024  # 40 MiB
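
# Example (illustrative, not part of the module): a caller who wants a custom
# chunk size must pick a multiple of CHUNK_SIZE_MULTIPLE, e.g.
#
#     custom_chunk_size = 4 * CHUNK_SIZE_MULTIPLE  # 1 MiB
#
# DEFAULT_CHUNK_SIZE itself is 160 * CHUNK_SIZE_MULTIPLE, so it satisfies the
# same constraint.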

# Valid keyword arguments for download methods, and blob.reload() if needed.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_DOWNLOAD_KWARGS = {
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
    "timeout",
    "retry",
    "raw_download",
}

# Valid keyword arguments for upload methods.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_UPLOAD_KWARGS = {
    "content_type",
    "predefined_acl",
    "num_retries",
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
    "timeout",
    "checksum",
    "retry",
}
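
# Example (illustrative, not part of the module): these sets gate which keyword
# arguments callers may pass through blob.open() into BlobReader/BlobWriter;
# anything else raises ValueError in the constructors below. A minimal sketch,
# assuming `blob` is an existing google.cloud.storage.blob.Blob:
#
#     with blob.open("rb", if_generation_match=1234, timeout=60) as f:
#         data = f.read()  # kwargs are forwarded to the download call
#
#     # BlobReader(blob, not_a_kwarg=1) would raise ValueError.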


class BlobReader(io.BufferedIOBase):
    """A file-like object that reads from a blob.

    :type blob: 'google.cloud.storage.blob.Blob'
    :param blob:
        The blob to download.

    :type chunk_size: long
    :param chunk_size:
        (Optional) The minimum number of bytes to read at a time. If fewer
        bytes than the chunk_size are requested, the remainder is buffered.
        The default is the chunk_size of the blob, or 40 MiB.

    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
    :param retry:
        (Optional) How to retry the RPC. A None value will disable
        retries. A google.api_core.retry.Retry value will enable retries,
        and the object will define retriable response codes and errors and
        configure backoff and timeout options.

        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
        Retry object and activates it only if certain conditions are met.
        This class exists to provide safe defaults for RPC calls that are
        not technically safe to retry normally (due to potential data
        duplication or other side-effects) but become safe to retry if a
        condition such as if_metageneration_match is set.

        See the retry.py source code and docstrings in this package
        (google.cloud.storage.retry) for information on retry types and how
        to configure them.

        Media operations (downloads and uploads) do not support non-default
        predicates in a Retry object. The default will always be used. Other
        configuration changes for Retry objects such as delays and deadlines
        are respected.

    :param download_kwargs:
        Keyword arguments to pass to the underlying API calls.
        The following arguments are supported:

        - ``if_generation_match``
        - ``if_generation_not_match``
        - ``if_metageneration_match``
        - ``if_metageneration_not_match``
        - ``timeout``

        Note that download_kwargs are also applied to blob.reload(), if a reload
        is needed during seek().
    """

    def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs):
        for kwarg in download_kwargs:
            if kwarg not in VALID_DOWNLOAD_KWARGS:
                raise ValueError(
                    f"BlobReader does not support keyword argument {kwarg}."
                )

        self._blob = blob
        self._pos = 0
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        self._retry = retry
        self._download_kwargs = download_kwargs

    def read(self, size=-1):
        self._checkClosed()  # Raises ValueError if closed.

        result = self._buffer.read(size)
        # If the read request demands more bytes than are buffered, fetch more.
        remaining_size = size - len(result)
        if remaining_size > 0 or size < 0:
            self._pos += self._buffer.tell()
            read_size = len(result)

            self._buffer.seek(0)
            self._buffer.truncate(0)  # Clear the buffer to make way for new data.
            fetch_start = self._pos
            if size > 0:
                # Fetch the larger of self._chunk_size or the remaining_size.
                fetch_end = fetch_start + max(remaining_size, self._chunk_size)
            else:
                fetch_end = None

            # Download the blob. Checksumming must be disabled as we are using
            # chunked downloads, and the server only knows the checksum of the
            # entire file.
            try:
                result += self._blob.download_as_bytes(
                    start=fetch_start,
                    end=fetch_end,
                    checksum=None,
                    retry=self._retry,
                    **self._download_kwargs,
                )
            except RequestRangeNotSatisfiable:
                # We've reached the end of the file. Python file objects should
                # return an empty response in this case, not raise an error.
                pass

            # If more bytes were read than are immediately needed, buffer the
            # remainder and then trim the result.
            if size > 0 and len(result) > size:
                self._buffer.write(result[size:])
                self._buffer.seek(0)
                result = result[:size]
            # Increment relative offset by true amount read.
            self._pos += len(result) - read_size

        return result
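
    # Example (illustrative): a small read fetches at least one full chunk and
    # buffers the excess, so subsequent small reads are served locally.
    # Assuming the object is larger than 1 MiB:
    #
    #     reader = BlobReader(blob, chunk_size=1024 * 1024)
    #     first = reader.read(10)   # one ~1 MiB ranged download, returns 10 bytes
    #     second = reader.read(10)  # served from the buffer, no new request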

    def read1(self, size=-1):
        return self.read(size)

    def seek(self, pos, whence=0):
        """Seek within the blob.

        This implementation of seek() uses knowledge of the blob size to
        validate that the reported position does not exceed the blob's last
        byte. If the blob size is not already known it will call blob.reload().
        """
        self._checkClosed()  # Raises ValueError if closed.

        if self._blob.size is None:
            self._blob.reload(**self._download_kwargs)

        initial_offset = self._pos + self._buffer.tell()

        if whence == 0:
            target_pos = pos
        elif whence == 1:
            target_pos = initial_offset + pos
        elif whence == 2:
            target_pos = self._blob.size + pos
        if whence not in {0, 1, 2}:
            raise ValueError("invalid whence value")

        if target_pos > self._blob.size:
            target_pos = self._blob.size

        # Seek or invalidate buffer as needed.
        if target_pos < self._pos:
            # Target position < relative offset <= true offset.
            # As data is not in buffer, invalidate buffer.
            self._buffer.seek(0)
            self._buffer.truncate(0)
            new_pos = target_pos
            self._pos = target_pos
        else:
            # relative offset <= target position <= size of file.
            difference = target_pos - initial_offset
            new_pos = self._pos + self._buffer.seek(difference, 1)
        return new_pos
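
    # Example (illustrative): seek() accepts the standard whence values and
    # clamps positions beyond the end of the object to the blob size. Seeking
    # relative to the end may trigger a blob.reload() to learn the size.
    #
    #     reader = BlobReader(blob)
    #     reader.seek(100)            # absolute position
    #     reader.seek(-10, whence=2)  # 10 bytes before the end of the object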

    def close(self):
        self._buffer.close()

    @property
    def closed(self):
        return self._buffer.closed

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True


class BlobWriter(io.BufferedIOBase):
    """A file-like object that writes to a blob.

    :type blob: 'google.cloud.storage.blob.Blob'
    :param blob:
        The blob to which to write.

    :type chunk_size: long
    :param chunk_size:
        (Optional) The maximum number of bytes to buffer before sending data
        to the server, and the size of each request when data is sent.
        Writes are implemented as a "resumable upload", so chunk_size for
        writes must be exactly a multiple of 256 KiB as with other resumable
        uploads. The default is the chunk_size of the blob, or 40 MiB.

    :type text_mode: bool
    :param text_mode:
        (Deprecated) A synonym for ignore_flush. For backwards-compatibility,
        if True, sets ignore_flush to True. Use ignore_flush instead. This
        parameter will be removed in a future release.

    :type ignore_flush: bool
    :param ignore_flush:
        Makes flush() do nothing instead of raising an error. flush() without
        closing is not supported by the remote service and therefore calling it
        on this class normally results in io.UnsupportedOperation. However, that
        behavior is incompatible with some consumers and wrappers of file
        objects in Python, such as zipfile.ZipFile or io.TextIOWrapper. Setting
        ignore_flush will cause flush() to successfully do nothing, for
        compatibility with those contexts. The correct way to actually flush
        data to the remote server is to close() (using this object as a context
        manager is recommended).

    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
    :param retry:
        (Optional) How to retry the RPC. A None value will disable
        retries. A google.api_core.retry.Retry value will enable retries,
        and the object will define retriable response codes and errors and
        configure backoff and timeout options.

        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
        Retry object and activates it only if certain conditions are met.
        This class exists to provide safe defaults for RPC calls that are
        not technically safe to retry normally (due to potential data
        duplication or other side-effects) but become safe to retry if a
        condition such as if_metageneration_match is set.

        See the retry.py source code and docstrings in this package
        (google.cloud.storage.retry) for information on retry types and how
        to configure them.

        Media operations (downloads and uploads) do not support non-default
        predicates in a Retry object. The default will always be used. Other
        configuration changes for Retry objects such as delays and deadlines
        are respected.

    :param upload_kwargs:
        Keyword arguments to pass to the underlying API
        calls. The following arguments are supported:

        - ``if_generation_match``
        - ``if_generation_not_match``
        - ``if_metageneration_match``
        - ``if_metageneration_not_match``
        - ``timeout``
        - ``content_type``
        - ``num_retries``
        - ``predefined_acl``
        - ``checksum``
    """

    def __init__(
        self,
        blob,
        chunk_size=None,
        text_mode=False,
        ignore_flush=False,
        retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
        **upload_kwargs,
    ):
        for kwarg in upload_kwargs:
            if kwarg not in VALID_UPLOAD_KWARGS:
                raise ValueError(
                    f"BlobWriter does not support keyword argument {kwarg}."
                )
        self._blob = blob
        self._buffer = SlidingBuffer()
        self._upload_and_transport = None
        # Resumable uploads require a chunk size that is a multiple of 256 KiB.
        # self._chunk_size must not be changed after the upload is initiated.
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        # text_mode is a deprecated synonym for ignore_flush
        self._ignore_flush = ignore_flush or text_mode
        self._retry = retry
        self._upload_kwargs = upload_kwargs

    @property
    def _chunk_size(self):
        """Get the blob's default chunk size.

        :rtype: int or ``NoneType``
        :returns: The current blob's chunk size, if it is set.
        """
        return self.__chunk_size

    @_chunk_size.setter
    def _chunk_size(self, value):
        """Set the blob's default chunk size.

        :type value: int
        :param value: (Optional) The current blob's chunk size, if it is set.

        :raises: :class:`ValueError` if ``value`` is not ``None`` and is not a
            multiple of 256 KiB.
        """
        if value is not None and value > 0 and value % CHUNK_SIZE_MULTIPLE != 0:
            raise ValueError(
                "Chunk size must be a multiple of %d." % CHUNK_SIZE_MULTIPLE
            )
        self.__chunk_size = value
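
    # Example (illustrative): the setter rejects chunk sizes that are not
    # multiples of 256 KiB.
    #
    #     BlobWriter(blob, chunk_size=4 * 256 * 1024)  # OK: 1 MiB
    #     BlobWriter(blob, chunk_size=1000)            # raises ValueError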

    def write(self, b):
        self._checkClosed()  # Raises ValueError if closed.

        pos = self._buffer.write(b)

        # If there is enough content, upload chunks.
        num_chunks = len(self._buffer) // self._chunk_size
        if num_chunks:
            self._upload_chunks_from_buffer(num_chunks)

        return pos
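
    # Example (illustrative): writes accumulate in the SlidingBuffer until at
    # least one full chunk is available, and only then is data sent. With a
    # 1 MiB chunk size:
    #
    #     writer = BlobWriter(blob, chunk_size=1024 * 1024)
    #     writer.write(b"x" * (300 * 1024))  # buffered, nothing uploaded yet
    #     writer.write(b"x" * (800 * 1024))  # crosses 1 MiB: one chunk uploaded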

    def _initiate_upload(self):
        # num_retries is only supported for backwards-compatibility reasons.
        num_retries = self._upload_kwargs.pop("num_retries", None)
        retry = self._retry
        content_type = self._upload_kwargs.pop("content_type", None)

        if num_retries is not None:
            warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2)
            # num_retries and retry are mutually exclusive. If num_retries is
            # set and retry is exactly the default, then nullify retry for
            # backwards compatibility.
            if retry is DEFAULT_RETRY_IF_GENERATION_SPECIFIED:
                retry = None

        # Handle ConditionalRetryPolicy.
        if isinstance(retry, ConditionalRetryPolicy):
            # Conditional retries are designed for non-media calls, which change
            # arguments into query_params dictionaries. Media operations work
            # differently, so here we make a "fake" query_params to feed to the
            # ConditionalRetryPolicy.
            query_params = {
                "ifGenerationMatch": self._upload_kwargs.get("if_generation_match"),
                "ifMetagenerationMatch": self._upload_kwargs.get(
                    "if_metageneration_match"
                ),
            }
            retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

        self._upload_and_transport = self._blob._initiate_resumable_upload(
            self._blob.bucket.client,
            self._buffer,
            content_type,
            None,
            num_retries,
            chunk_size=self._chunk_size,
            retry=retry,
            **self._upload_kwargs,
        )
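
    # Example (illustrative): with the default ConditionalRetryPolicy
    # (DEFAULT_RETRY_IF_GENERATION_SPECIFIED), retries are only activated when
    # the upload is made conditional on generation.
    #
    #     writer = BlobWriter(blob, if_generation_match=0)  # retried on
    #                                                       # transient failures
    #     writer = BlobWriter(blob)  # default policy resolves to no retry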

    def _upload_chunks_from_buffer(self, num_chunks):
        """Upload a specified number of chunks."""

        # Initialize the upload if necessary.
        if not self._upload_and_transport:
            self._initiate_upload()

        upload, transport = self._upload_and_transport

        # Upload chunks. The SlidingBuffer class will manage seek position.
        for _ in range(num_chunks):
            upload.transmit_next_chunk(transport)

        # Wipe the buffer of chunks uploaded, preserving any remaining data.
        self._buffer.flush()

    def tell(self):
        return self._buffer.tell() + len(self._buffer)

    def flush(self):
        # flush() is not fully supported by the remote service, so raise an
        # error here, unless self._ignore_flush is set.
        if not self._ignore_flush:
            raise io.UnsupportedOperation(
                "Cannot flush without finalizing upload. Use close() instead, "
                "or set ignore_flush=True when constructing this class (see "
                "docstring)."
            )

    def close(self):
        if not self._buffer.closed:
            self._upload_chunks_from_buffer(1)
        self._buffer.close()

    @property
    def closed(self):
        return self._buffer.closed

    def readable(self):
        return False

    def writable(self):
        return True

    def seekable(self):
        return False


class SlidingBuffer(object):
    """A non-rewindable buffer that frees memory of chunks already consumed.

    This class is necessary because `google-resumable-media-python` expects
    `tell()` to work relative to the start of the file, not relative to a place
    in an intermediate buffer. Using this class, we present an external
    interface with consistent seek and tell behavior without having to actually
    store bytes already sent.

    Behavior of this class differs from an ordinary BytesIO buffer. `write()`
    will always append to the end of the buffer and will not otherwise change
    the seek position. `flush()` will delete all data already read (data to the
    left of the seek position). `tell()` will report the seek position of the
    buffer including all deleted data. Additionally, the class implements
    __len__(), which will report the size of the actual underlying buffer.

    This class does not attempt to implement the entire Python I/O interface.
    """

    def __init__(self):
        self._buffer = io.BytesIO()
        self._cursor = 0

    def write(self, b):
        """Append to the end of the buffer without changing the position."""
        self._checkClosed()  # Raises ValueError if closed.

        bookmark = self._buffer.tell()
        self._buffer.seek(0, io.SEEK_END)
        pos = self._buffer.write(b)
        self._buffer.seek(bookmark)
        return self._cursor + pos

    def read(self, size=-1):
        """Read and move the cursor."""
        self._checkClosed()  # Raises ValueError if closed.

        data = self._buffer.read(size)
        self._cursor += len(data)
        return data

    def flush(self):
        """Delete already-read data (all data to the left of the position)."""
        self._checkClosed()  # Raises ValueError if closed.

        # BytesIO can't be deleted from the left, so save any leftover, unread
        # data, truncate at 0, then re-add the leftover data.
        leftover = self._buffer.read()
        self._buffer.seek(0)
        self._buffer.truncate(0)
        self._buffer.write(leftover)
        self._buffer.seek(0)

    def tell(self):
        """Report how many bytes have been read from the buffer in total."""
        return self._cursor

    def seek(self, pos):
        """Seek to a position (backwards only) within the internal buffer.

        This implementation of seek() verifies that the seek destination is
        contained in _buffer. It will raise ValueError if the destination byte
        has already been purged from the buffer.

        The "whence" argument is not supported in this implementation.
        """
        self._checkClosed()  # Raises ValueError if closed.

        buffer_initial_pos = self._buffer.tell()
        difference = pos - self._cursor
        buffer_seek_result = self._buffer.seek(difference, io.SEEK_CUR)
        if (
            not buffer_seek_result - buffer_initial_pos == difference
            or pos > self._cursor
        ):
            # The seek did not arrive at the expected byte because the internal
            # buffer does not (or no longer) contain the byte. Reset and raise.
            self._buffer.seek(buffer_initial_pos)
            raise ValueError("Cannot seek() to that value.")

        self._cursor = pos
        return self._cursor
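
    # Example (illustrative): seek() only succeeds for positions still held in
    # the internal buffer; bytes purged by flush() are unreachable.
    #
    #     buf = SlidingBuffer()
    #     buf.write(b"abcdef")
    #     buf.read(6)
    #     buf.seek(2)   # OK: nothing flushed yet, byte 2 is still buffered
    #     buf.flush()   # purges everything left of the current position
    #     buf.seek(1)   # raises ValueError: that byte is gone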

    def __len__(self):
        """Determine the size of the buffer by seeking to the end."""
        bookmark = self._buffer.tell()
        length = self._buffer.seek(0, io.SEEK_END)
        self._buffer.seek(bookmark)
        return length

    def close(self):
        return self._buffer.close()

    def _checkClosed(self):
        return self._buffer._checkClosed()

    @property
    def closed(self):
        return self._buffer.closed