1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Module for file-like access of blobs, usually invoked via Blob.open()."""
16
17import io
18
19from google.api_core.exceptions import RequestRangeNotSatisfiable
20from google.cloud.storage.retry import DEFAULT_RETRY
21from google.cloud.storage.retry import ConditionalRetryPolicy
22
23
24# Resumable uploads require a chunk size of precisely a multiple of 256 KiB.
25CHUNK_SIZE_MULTIPLE = 256 * 1024 # 256 KiB
26DEFAULT_CHUNK_SIZE = 40 * 1024 * 1024 # 40 MiB
27
28# Valid keyword arguments for download methods, and blob.reload() if needed.
29# Note: Changes here need to be reflected in the blob.open() docstring.
30VALID_DOWNLOAD_KWARGS = {
31 "if_generation_match",
32 "if_generation_not_match",
33 "if_metageneration_match",
34 "if_metageneration_not_match",
35 "timeout",
36 "retry",
37 "raw_download",
38 "single_shot_download",
39}
40
41# Valid keyword arguments for upload methods.
42# Note: Changes here need to be reflected in the blob.open() docstring.
43VALID_UPLOAD_KWARGS = {
44 "content_type",
45 "predefined_acl",
46 "if_generation_match",
47 "if_generation_not_match",
48 "if_metageneration_match",
49 "if_metageneration_not_match",
50 "timeout",
51 "checksum",
52 "retry",
53}
54
55
56class BlobReader(io.BufferedIOBase):
57 """A file-like object that reads from a blob.
58
59 :type blob: 'google.cloud.storage.blob.Blob'
60 :param blob:
61 The blob to download.
62
63 :type chunk_size: long
64 :param chunk_size:
65 (Optional) The minimum number of bytes to read at a time. If fewer
66 bytes than the chunk_size are requested, the remainder is buffered.
67 The default is the chunk_size of the blob, or 40MiB.
68
69 :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
70 :param retry:
71 (Optional) How to retry the RPC. A None value will disable
72 retries. A google.api_core.retry.Retry value will enable retries,
73 and the object will define retriable response codes and errors and
74 configure backoff and timeout options.
75
76 A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
77 Retry object and activates it only if certain conditions are met.
78 This class exists to provide safe defaults for RPC calls that are
79 not technically safe to retry normally (due to potential data
80 duplication or other side-effects) but become safe to retry if a
81 condition such as if_metageneration_match is set.
82
83 See the retry.py source code and docstrings in this package
84 (google.cloud.storage.retry) for information on retry types and how
85 to configure them.
86
87 Media operations (downloads and uploads) do not support non-default
88 predicates in a Retry object. The default will always be used. Other
89 configuration changes for Retry objects such as delays and deadlines
90 are respected.
91
92 :type download_kwargs: dict
93 :param download_kwargs:
94 Keyword arguments to pass to the underlying API calls.
95 The following arguments are supported:
96
97 - ``if_generation_match``
98 - ``if_generation_not_match``
99 - ``if_metageneration_match``
100 - ``if_metageneration_not_match``
101 - ``timeout``
102 - ``raw_download``
103 - ``single_shot_download``
104
105 Note that download_kwargs (excluding ``raw_download`` and ``single_shot_download``) are also applied to blob.reload(),
106 if a reload is needed during seek().
107 """
108
109 def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs):
110 for kwarg in download_kwargs:
111 if kwarg not in VALID_DOWNLOAD_KWARGS:
112 raise ValueError(
113 f"BlobReader does not support keyword argument {kwarg}."
114 )
115
116 self._blob = blob
117 self._pos = 0
118 self._buffer = io.BytesIO()
119 self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
120 self._retry = retry
121 self._download_kwargs = download_kwargs
122
123 def read(self, size=-1):
124 self._checkClosed() # Raises ValueError if closed.
125
126 result = self._buffer.read(size)
127 # If the read request demands more bytes than are buffered, fetch more.
128 remaining_size = size - len(result)
129 if remaining_size > 0 or size < 0:
130 self._pos += self._buffer.tell()
131 read_size = len(result)
132
133 self._buffer.seek(0)
134 self._buffer.truncate(0) # Clear the buffer to make way for new data.
135 fetch_start = self._pos
136 if size > 0:
137 # Fetch the larger of self._chunk_size or the remaining_size.
138 fetch_end = fetch_start + max(remaining_size, self._chunk_size)
139 else:
140 fetch_end = None
141
142 # Download the blob. Checksumming must be disabled as we are using
143 # chunked downloads, and the server only knows the checksum of the
144 # entire file.
145 try:
146 result += self._blob.download_as_bytes(
147 start=fetch_start,
148 end=fetch_end,
149 checksum=None,
150 retry=self._retry,
151 **self._download_kwargs,
152 )
153 except RequestRangeNotSatisfiable:
154 # We've reached the end of the file. Python file objects should
155 # return an empty response in this case, not raise an error.
156 pass
157
158 # If more bytes were read than is immediately needed, buffer the
159 # remainder and then trim the result.
160 if size > 0 and len(result) > size:
161 self._buffer.write(result[size:])
162 self._buffer.seek(0)
163 result = result[:size]
164 # Increment relative offset by true amount read.
165 self._pos += len(result) - read_size
166 return result
167
168 def read1(self, size=-1):
169 return self.read(size)
170
171 def seek(self, pos, whence=0):
172 """Seek within the blob.
173
174 This implementation of seek() uses knowledge of the blob size to
175 validate that the reported position does not exceed the blob last byte.
176 If the blob size is not already known it will call blob.reload().
177 """
178 self._checkClosed() # Raises ValueError if closed.
179
180 if self._blob.size is None:
181 reload_kwargs = {
182 k: v
183 for k, v in self._download_kwargs.items()
184 if (k != "raw_download" and k != "single_shot_download")
185 }
186 self._blob.reload(**reload_kwargs)
187
188 initial_offset = self._pos + self._buffer.tell()
189
190 if whence == 0:
191 target_pos = pos
192 elif whence == 1:
193 target_pos = initial_offset + pos
194 elif whence == 2:
195 target_pos = self._blob.size + pos
196 if whence not in {0, 1, 2}:
197 raise ValueError("invalid whence value")
198
199 if target_pos > self._blob.size:
200 target_pos = self._blob.size
201
202 # Seek or invalidate buffer as needed.
203 if target_pos < self._pos:
204 # Target position < relative offset <= true offset.
205 # As data is not in buffer, invalidate buffer.
206 self._buffer.seek(0)
207 self._buffer.truncate(0)
208 new_pos = target_pos
209 self._pos = target_pos
210 else:
211 # relative offset <= target position <= size of file.
212 difference = target_pos - initial_offset
213 new_pos = self._pos + self._buffer.seek(difference, 1)
214 return new_pos
215
216 def close(self):
217 self._buffer.close()
218
219 @property
220 def closed(self):
221 return self._buffer.closed
222
223 def readable(self):
224 return True
225
226 def writable(self):
227 return False
228
229 def seekable(self):
230 return True
231
232
233class BlobWriter(io.BufferedIOBase):
234 """A file-like object that writes to a blob.
235
236 :type blob: 'google.cloud.storage.blob.Blob'
237 :param blob:
238 The blob to which to write.
239
240 :type chunk_size: long
241 :param chunk_size:
242 (Optional) The maximum number of bytes to buffer before sending data
243 to the server, and the size of each request when data is sent.
244 Writes are implemented as a "resumable upload", so chunk_size for
245 writes must be exactly a multiple of 256KiB as with other resumable
246 uploads. The default is the chunk_size of the blob, or 40 MiB.
247
248 :type ignore_flush: bool
249 :param ignore_flush:
250 Makes flush() do nothing instead of raise an error. flush() without
251 closing is not supported by the remote service and therefore calling it
252 on this class normally results in io.UnsupportedOperation. However, that
253 behavior is incompatible with some consumers and wrappers of file
254 objects in Python, such as zipfile.ZipFile or io.TextIOWrapper. Setting
255 ignore_flush will cause flush() to successfully do nothing, for
256 compatibility with those contexts. The correct way to actually flush
257 data to the remote server is to close() (using this object as a context
258 manager is recommended).
259
260 :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
261 :param retry:
262 (Optional) How to retry the RPC. A None value will disable
263 retries. A google.api_core.retry.Retry value will enable retries,
264 and the object will define retriable response codes and errors and
265 configure backoff and timeout options.
266
267 A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
268 Retry object and activates it only if certain conditions are met.
269 This class exists to provide safe defaults for RPC calls that are
270 not technically safe to retry normally (due to potential data
271 duplication or other side-effects) but become safe to retry if a
272 condition such as if_metageneration_match is set.
273
274 See the retry.py source code and docstrings in this package
275 (google.cloud.storage.retry) for information on retry types and how
276 to configure them.
277
278 Media operations (downloads and uploads) do not support non-default
279 predicates in a Retry object. The default will always be used. Other
280 configuration changes for Retry objects such as delays and deadlines
281 are respected.
282
283 :type upload_kwargs: dict
284 :param upload_kwargs:
285 Keyword arguments to pass to the underlying API
286 calls. The following arguments are supported:
287
288 - ``if_generation_match``
289 - ``if_generation_not_match``
290 - ``if_metageneration_match``
291 - ``if_metageneration_not_match``
292 - ``timeout``
293 - ``content_type``
294 - ``predefined_acl``
295 - ``checksum``
296 """
297
298 def __init__(
299 self,
300 blob,
301 chunk_size=None,
302 ignore_flush=False,
303 retry=DEFAULT_RETRY,
304 **upload_kwargs,
305 ):
306 for kwarg in upload_kwargs:
307 if kwarg not in VALID_UPLOAD_KWARGS:
308 raise ValueError(
309 f"BlobWriter does not support keyword argument {kwarg}."
310 )
311 self._blob = blob
312 self._buffer = SlidingBuffer()
313 self._upload_and_transport = None
314 # Resumable uploads require a chunk size of a multiple of 256KiB.
315 # self._chunk_size must not be changed after the upload is initiated.
316 self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
317 self._ignore_flush = ignore_flush
318 self._retry = retry
319 self._upload_kwargs = upload_kwargs
320
321 @property
322 def _chunk_size(self):
323 """Get the blob's default chunk size.
324
325 :rtype: int or ``NoneType``
326 :returns: The current blob's chunk size, if it is set.
327 """
328 return self.__chunk_size
329
330 @_chunk_size.setter
331 def _chunk_size(self, value):
332 """Set the blob's default chunk size.
333
334 :type value: int
335 :param value: (Optional) The current blob's chunk size, if it is set.
336
337 :raises: :class:`ValueError` if ``value`` is not ``None`` and is not a
338 multiple of 256 KiB.
339 """
340 if value is not None and value > 0 and value % CHUNK_SIZE_MULTIPLE != 0:
341 raise ValueError(
342 "Chunk size must be a multiple of %d." % CHUNK_SIZE_MULTIPLE
343 )
344 self.__chunk_size = value
345
346 def write(self, b):
347 self._checkClosed() # Raises ValueError if closed.
348
349 pos = self._buffer.write(b)
350
351 # If there is enough content, upload chunks.
352 num_chunks = len(self._buffer) // self._chunk_size
353 if num_chunks:
354 self._upload_chunks_from_buffer(num_chunks)
355
356 return pos
357
358 def _initiate_upload(self):
359 retry = self._retry
360 content_type = self._upload_kwargs.pop("content_type", None)
361
362 # Handle ConditionalRetryPolicy.
363 if isinstance(retry, ConditionalRetryPolicy):
364 # Conditional retries are designed for non-media calls, which change
365 # arguments into query_params dictionaries. Media operations work
366 # differently, so here we make a "fake" query_params to feed to the
367 # ConditionalRetryPolicy.
368 query_params = {
369 "ifGenerationMatch": self._upload_kwargs.get("if_generation_match"),
370 "ifMetagenerationMatch": self._upload_kwargs.get(
371 "if_metageneration_match"
372 ),
373 }
374 retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)
375
376 self._upload_and_transport = self._blob._initiate_resumable_upload(
377 self._blob.bucket.client,
378 self._buffer,
379 content_type,
380 None,
381 chunk_size=self._chunk_size,
382 retry=retry,
383 **self._upload_kwargs,
384 )
385
386 def _upload_chunks_from_buffer(self, num_chunks):
387 """Upload a specified number of chunks."""
388
389 # Initialize the upload if necessary.
390 if not self._upload_and_transport:
391 self._initiate_upload()
392
393 upload, transport = self._upload_and_transport
394
395 # Attach timeout if specified in the keyword arguments.
396 # Otherwise, the default timeout will be used from the media library.
397 kwargs = {}
398 if "timeout" in self._upload_kwargs:
399 kwargs = {"timeout": self._upload_kwargs.get("timeout")}
400
401 # Upload chunks. The SlidingBuffer class will manage seek position.
402 for _ in range(num_chunks):
403 upload.transmit_next_chunk(transport, **kwargs)
404
405 # Wipe the buffer of chunks uploaded, preserving any remaining data.
406 self._buffer.flush()
407
408 def tell(self):
409 return self._buffer.tell() + len(self._buffer)
410
411 def flush(self):
412 # flush() is not fully supported by the remote service, so raise an
413 # error here, unless self._ignore_flush is set.
414 if not self._ignore_flush:
415 raise io.UnsupportedOperation(
416 "Cannot flush without finalizing upload. Use close() instead, "
417 "or set ignore_flush=True when constructing this class (see "
418 "docstring)."
419 )
420
421 def close(self):
422 if not self._buffer.closed:
423 self._upload_chunks_from_buffer(1)
424 self._buffer.close()
425
426 def terminate(self):
427 """Cancel the ResumableUpload."""
428 if self._upload_and_transport:
429 upload, transport = self._upload_and_transport
430 transport.delete(upload.upload_url)
431 self._buffer.close()
432
433 def __exit__(self, exc_type, exc_val, exc_tb):
434 if exc_type is not None:
435 self.terminate()
436 else:
437 self.close()
438
439 @property
440 def closed(self):
441 return self._buffer.closed
442
443 def readable(self):
444 return False
445
446 def writable(self):
447 return True
448
449 def seekable(self):
450 return False
451
452
453class SlidingBuffer(object):
454 """A non-rewindable buffer that frees memory of chunks already consumed.
455
456 This class is necessary because `google-resumable-media-python` expects
457 `tell()` to work relative to the start of the file, not relative to a place
458 in an intermediate buffer. Using this class, we present an external
459 interface with consistent seek and tell behavior without having to actually
460 store bytes already sent.
461
462 Behavior of this class differs from an ordinary BytesIO buffer. `write()`
463 will always append to the end of the file only and not change the seek
464 position otherwise. `flush()` will delete all data already read (data to the
465 left of the seek position). `tell()` will report the seek position of the
466 buffer including all deleted data. Additionally the class implements
467 __len__() which will report the size of the actual underlying buffer.
468
469 This class does not attempt to implement the entire Python I/O interface.
470 """
471
472 def __init__(self):
473 self._buffer = io.BytesIO()
474 self._cursor = 0
475
476 def write(self, b):
477 """Append to the end of the buffer without changing the position."""
478 self._checkClosed() # Raises ValueError if closed.
479
480 bookmark = self._buffer.tell()
481 self._buffer.seek(0, io.SEEK_END)
482 pos = self._buffer.write(b)
483 self._buffer.seek(bookmark)
484 return pos
485
486 def read(self, size=-1):
487 """Read and move the cursor."""
488 self._checkClosed() # Raises ValueError if closed.
489
490 data = self._buffer.read(size)
491 self._cursor += len(data)
492 return data
493
494 def flush(self):
495 """Delete already-read data (all data to the left of the position)."""
496 self._checkClosed() # Raises ValueError if closed.
497
498 # BytesIO can't be deleted from the left, so save any leftover, unread
499 # data and truncate at 0, then readd leftover data.
500 leftover = self._buffer.read()
501 self._buffer.seek(0)
502 self._buffer.truncate(0)
503 self._buffer.write(leftover)
504 self._buffer.seek(0)
505
506 def tell(self):
507 """Report how many bytes have been read from the buffer in total."""
508 return self._cursor
509
510 def seek(self, pos):
511 """Seek to a position (backwards only) within the internal buffer.
512
513 This implementation of seek() verifies that the seek destination is
514 contained in _buffer. It will raise ValueError if the destination byte
515 has already been purged from the buffer.
516
517 The "whence" argument is not supported in this implementation.
518 """
519 self._checkClosed() # Raises ValueError if closed.
520
521 buffer_initial_pos = self._buffer.tell()
522 difference = pos - self._cursor
523 buffer_seek_result = self._buffer.seek(difference, io.SEEK_CUR)
524 if (
525 not buffer_seek_result - buffer_initial_pos == difference
526 or pos > self._cursor
527 ):
528 # The seek did not arrive at the expected byte because the internal
529 # buffer does not (or no longer) contains the byte. Reset and raise.
530 self._buffer.seek(buffer_initial_pos)
531 raise ValueError("Cannot seek() to that value.")
532
533 self._cursor = pos
534 return self._cursor
535
536 def __len__(self):
537 """Determine the size of the buffer by seeking to the end."""
538 bookmark = self._buffer.tell()
539 length = self._buffer.seek(0, io.SEEK_END)
540 self._buffer.seek(bookmark)
541 return length
542
543 def close(self):
544 return self._buffer.close()
545
546 def _checkClosed(self):
547 return self._buffer._checkClosed()
548
549 @property
550 def closed(self):
551 return self._buffer.closed