Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/storage/fileio.py: 52%

193 statements  

coverage.py v7.3.1, created at 2023-09-25 06:17 +0000

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for file-like access of blobs, usually invoked via Blob.open()."""

import io
import warnings

from google.api_core.exceptions import RequestRangeNotSatisfiable
from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED
from google.cloud.storage.retry import ConditionalRetryPolicy


# Resumable uploads require a chunk size of precisely a multiple of 256 KiB.
CHUNK_SIZE_MULTIPLE = 256 * 1024  # 256 KiB
DEFAULT_CHUNK_SIZE = 40 * 1024 * 1024  # 40 MiB
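# Note (editorial): the 40 MiB default satisfies the 256 KiB requirement above,
# since 40 * 1024 * 1024 == 160 * (256 * 1024).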

# Valid keyword arguments for download methods, and blob.reload() if needed.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_DOWNLOAD_KWARGS = {
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
    "timeout",
    "retry",
    "raw_download",
}

# Valid keyword arguments for upload methods.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_UPLOAD_KWARGS = {
    "content_type",
    "predefined_acl",
    "num_retries",
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
    "timeout",
    "checksum",
    "retry",
}

class BlobReader(io.BufferedIOBase):
    """A file-like object that reads from a blob.

    :type blob: 'google.cloud.storage.blob.Blob'
    :param blob:
        The blob to download.

    :type chunk_size: long
    :param chunk_size:
        (Optional) The minimum number of bytes to read at a time. If fewer
        bytes than the chunk_size are requested, the remainder is buffered.
        The default is the chunk_size of the blob, or 40 MiB.

    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
    :param retry:
        (Optional) How to retry the RPC. A None value will disable
        retries. A google.api_core.retry.Retry value will enable retries,
        and the object will define retriable response codes and errors and
        configure backoff and timeout options.

        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
        Retry object and activates it only if certain conditions are met.
        This class exists to provide safe defaults for RPC calls that are
        not technically safe to retry normally (due to potential data
        duplication or other side-effects) but become safe to retry if a
        condition such as if_metageneration_match is set.

        See the retry.py source code and docstrings in this package
        (google.cloud.storage.retry) for information on retry types and how
        to configure them.

        Media operations (downloads and uploads) do not support non-default
        predicates in a Retry object. The default will always be used. Other
        configuration changes for Retry objects such as delays and deadlines
        are respected.

    :param download_kwargs:
        Keyword arguments to pass to the underlying API calls.
        The following arguments are supported:

        - ``if_generation_match``
        - ``if_generation_not_match``
        - ``if_metageneration_match``
        - ``if_metageneration_not_match``
        - ``timeout``

        Note that download_kwargs are also applied to blob.reload(), if a reload
        is needed during seek().
    """
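    # Illustrative usage sketch (editorial addition, not part of the original
    # module): BlobReader is normally obtained via ``blob.open("rb")`` rather
    # than constructed directly. The bucket and object names are hypothetical.
    #
    #     from google.cloud import storage
    #
    #     client = storage.Client()
    #     blob = client.bucket("example-bucket").blob("example-object.txt")
    #     with blob.open("rb", chunk_size=1024 * 1024) as reader:
    #         header = reader.read(64)   # buffered, chunked download
    #         reader.seek(0)             # may call blob.reload() to learn the size
    #         data = reader.read()       # read the remainder to EOF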

    def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs):
        for kwarg in download_kwargs:
            if kwarg not in VALID_DOWNLOAD_KWARGS:
                raise ValueError(
                    f"BlobReader does not support keyword argument {kwarg}."
                )

        self._blob = blob
        self._pos = 0
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        self._retry = retry
        self._download_kwargs = download_kwargs

    def read(self, size=-1):
        self._checkClosed()  # Raises ValueError if closed.

        result = self._buffer.read(size)
        # If the read request demands more bytes than are buffered, fetch more.
        remaining_size = size - len(result)
        if remaining_size > 0 or size < 0:
            self._pos += self._buffer.tell()
            read_size = len(result)

            self._buffer.seek(0)
            self._buffer.truncate(0)  # Clear the buffer to make way for new data.
            fetch_start = self._pos
            if size > 0:
                # Fetch the larger of self._chunk_size or the remaining_size.
                fetch_end = fetch_start + max(remaining_size, self._chunk_size)
            else:
                fetch_end = None

            # Download the blob. Checksumming must be disabled as we are using
            # chunked downloads, and the server only knows the checksum of the
            # entire file.
            try:
                result += self._blob.download_as_bytes(
                    start=fetch_start,
                    end=fetch_end,
                    checksum=None,
                    retry=self._retry,
                    **self._download_kwargs,
                )
            except RequestRangeNotSatisfiable:
                # We've reached the end of the file. Python file objects should
                # return an empty response in this case, not raise an error.
                pass

            # If more bytes were read than is immediately needed, buffer the
            # remainder and then trim the result.
            if size > 0 and len(result) > size:
                self._buffer.write(result[size:])
                self._buffer.seek(0)
                result = result[:size]
            # Increment relative offset by true amount read.
            self._pos += len(result) - read_size
        return result

    def read1(self, size=-1):
        return self.read(size)

    def seek(self, pos, whence=0):
        """Seek within the blob.

        This implementation of seek() uses knowledge of the blob size to
        validate that the reported position does not exceed the last byte of
        the blob. If the blob size is not already known it will call
        blob.reload().
        """
        self._checkClosed()  # Raises ValueError if closed.

        if self._blob.size is None:
            self._blob.reload(**self._download_kwargs)

        initial_offset = self._pos + self._buffer.tell()

        if whence == 0:
            target_pos = pos
        elif whence == 1:
            target_pos = initial_offset + pos
        elif whence == 2:
            target_pos = self._blob.size + pos
        if whence not in {0, 1, 2}:
            raise ValueError("invalid whence value")

        if target_pos > self._blob.size:
            target_pos = self._blob.size

        # Seek or invalidate buffer as needed.
        if target_pos < self._pos:
            # Target position < relative offset <= true offset.
            # As data is not in buffer, invalidate buffer.
            self._buffer.seek(0)
            self._buffer.truncate(0)
            new_pos = target_pos
            self._pos = target_pos
        else:
            # relative offset <= target position <= size of file.
            difference = target_pos - initial_offset
            new_pos = self._pos + self._buffer.seek(difference, 1)
        return new_pos

    def close(self):
        self._buffer.close()

    @property
    def closed(self):
        return self._buffer.closed

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

class BlobWriter(io.BufferedIOBase):
    """A file-like object that writes to a blob.

    :type blob: 'google.cloud.storage.blob.Blob'
    :param blob:
        The blob to which to write.

    :type chunk_size: long
    :param chunk_size:
        (Optional) The maximum number of bytes to buffer before sending data
        to the server, and the size of each request when data is sent.
        Writes are implemented as a "resumable upload", so chunk_size for
        writes must be exactly a multiple of 256 KiB as with other resumable
        uploads. The default is the chunk_size of the blob, or 40 MiB.

    :type text_mode: bool
    :param text_mode:
        (Deprecated) A synonym for ignore_flush. For backwards-compatibility,
        if True, sets ignore_flush to True. Use ignore_flush instead. This
        parameter will be removed in a future release.

    :type ignore_flush: bool
    :param ignore_flush:
        Makes flush() do nothing instead of raising an error. flush() without
        closing is not supported by the remote service and therefore calling it
        on this class normally results in io.UnsupportedOperation. However, that
        behavior is incompatible with some consumers and wrappers of file
        objects in Python, such as zipfile.ZipFile or io.TextIOWrapper. Setting
        ignore_flush will cause flush() to successfully do nothing, for
        compatibility with those contexts. The correct way to actually flush
        data to the remote server is to close() (using this object as a context
        manager is recommended).

    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
    :param retry:
        (Optional) How to retry the RPC. A None value will disable
        retries. A google.api_core.retry.Retry value will enable retries,
        and the object will define retriable response codes and errors and
        configure backoff and timeout options.

        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
        Retry object and activates it only if certain conditions are met.
        This class exists to provide safe defaults for RPC calls that are
        not technically safe to retry normally (due to potential data
        duplication or other side-effects) but become safe to retry if a
        condition such as if_metageneration_match is set.

        See the retry.py source code and docstrings in this package
        (google.cloud.storage.retry) for information on retry types and how
        to configure them.

        Media operations (downloads and uploads) do not support non-default
        predicates in a Retry object. The default will always be used. Other
        configuration changes for Retry objects such as delays and deadlines
        are respected.

    :param upload_kwargs:
        Keyword arguments to pass to the underlying API
        calls. The following arguments are supported:

        - ``if_generation_match``
        - ``if_generation_not_match``
        - ``if_metageneration_match``
        - ``if_metageneration_not_match``
        - ``timeout``
        - ``content_type``
        - ``num_retries``
        - ``predefined_acl``
        - ``checksum``
    """
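    # Illustrative usage sketch (editorial addition, not part of the original
    # module): BlobWriter is normally obtained via ``blob.open("wb")``. Names
    # are hypothetical; chunk_size must be a multiple of 256 KiB, and the
    # upload is finalized by close(), which the context manager calls.
    #
    #     with blob.open("wb", chunk_size=256 * 1024, if_generation_match=0) as writer:
    #         writer.write(b"hello ")
    #         writer.write(b"world")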

    def __init__(
        self,
        blob,
        chunk_size=None,
        text_mode=False,
        ignore_flush=False,
        retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
        **upload_kwargs,
    ):
        for kwarg in upload_kwargs:
            if kwarg not in VALID_UPLOAD_KWARGS:
                raise ValueError(
                    f"BlobWriter does not support keyword argument {kwarg}."
                )
        self._blob = blob
        self._buffer = SlidingBuffer()
        self._upload_and_transport = None
        # Resumable uploads require a chunk size of a multiple of 256 KiB.
        # self._chunk_size must not be changed after the upload is initiated.
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        # text_mode is a deprecated synonym for ignore_flush
        self._ignore_flush = ignore_flush or text_mode
        self._retry = retry
        self._upload_kwargs = upload_kwargs

    @property
    def _chunk_size(self):
        """Get the blob's default chunk size.

        :rtype: int or ``NoneType``
        :returns: The current blob's chunk size, if it is set.
        """
        return self.__chunk_size

    @_chunk_size.setter
    def _chunk_size(self, value):
        """Set the blob's default chunk size.

        :type value: int
        :param value: (Optional) The current blob's chunk size, if it is set.

        :raises: :class:`ValueError` if ``value`` is not ``None`` and is not a
            multiple of 256 KiB.
        """
        if value is not None and value > 0 and value % CHUNK_SIZE_MULTIPLE != 0:
            raise ValueError(
                "Chunk size must be a multiple of %d." % CHUNK_SIZE_MULTIPLE
            )
        self.__chunk_size = value

    def write(self, b):
        self._checkClosed()  # Raises ValueError if closed.

        pos = self._buffer.write(b)

        # If there is enough content, upload chunks.
        num_chunks = len(self._buffer) // self._chunk_size
        if num_chunks:
            self._upload_chunks_from_buffer(num_chunks)

        return pos

    def _initiate_upload(self):
        # num_retries is only supported for backwards-compatibility reasons.
        num_retries = self._upload_kwargs.pop("num_retries", None)
        retry = self._retry
        content_type = self._upload_kwargs.pop("content_type", None)

        if num_retries is not None:
            warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2)
            # num_retries and retry are mutually exclusive. If num_retries is
            # set and retry is exactly the default, then nullify retry for
            # backwards compatibility.
            if retry is DEFAULT_RETRY_IF_GENERATION_SPECIFIED:
                retry = None

        # Handle ConditionalRetryPolicy.
        if isinstance(retry, ConditionalRetryPolicy):
            # Conditional retries are designed for non-media calls, which change
            # arguments into query_params dictionaries. Media operations work
            # differently, so here we make a "fake" query_params to feed to the
            # ConditionalRetryPolicy.
            query_params = {
                "ifGenerationMatch": self._upload_kwargs.get("if_generation_match"),
                "ifMetagenerationMatch": self._upload_kwargs.get(
                    "if_metageneration_match"
                ),
            }
            retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

        self._upload_and_transport = self._blob._initiate_resumable_upload(
            self._blob.bucket.client,
            self._buffer,
            content_type,
            None,
            num_retries,
            chunk_size=self._chunk_size,
            retry=retry,
            **self._upload_kwargs,
        )

    def _upload_chunks_from_buffer(self, num_chunks):
        """Upload a specified number of chunks."""

        # Initialize the upload if necessary.
        if not self._upload_and_transport:
            self._initiate_upload()

        upload, transport = self._upload_and_transport

        # Upload chunks. The SlidingBuffer class will manage seek position.
        for _ in range(num_chunks):
            upload.transmit_next_chunk(transport)

        # Wipe the buffer of chunks uploaded, preserving any remaining data.
        self._buffer.flush()

    def tell(self):
        return self._buffer.tell() + len(self._buffer)

    def flush(self):
        # flush() is not fully supported by the remote service, so raise an
        # error here, unless self._ignore_flush is set.
        if not self._ignore_flush:
            raise io.UnsupportedOperation(
                "Cannot flush without finalizing upload. Use close() instead, "
                "or set ignore_flush=True when constructing this class (see "
                "docstring)."
            )

    def close(self):
        if not self._buffer.closed:
            self._upload_chunks_from_buffer(1)
        self._buffer.close()

    @property
    def closed(self):
        return self._buffer.closed

    def readable(self):
        return False

    def writable(self):
        return True

    def seekable(self):
        return False

class SlidingBuffer(object):
    """A non-rewindable buffer that frees memory of chunks already consumed.

    This class is necessary because `google-resumable-media-python` expects
    `tell()` to work relative to the start of the file, not relative to a place
    in an intermediate buffer. Using this class, we present an external
    interface with consistent seek and tell behavior without having to actually
    store bytes already sent.

    Behavior of this class differs from an ordinary BytesIO buffer. `write()`
    will always append to the end of the file only and not change the seek
    position otherwise. `flush()` will delete all data already read (data to the
    left of the seek position). `tell()` will report the seek position of the
    buffer including all deleted data. Additionally the class implements
    __len__() which will report the size of the actual underlying buffer.

    This class does not attempt to implement the entire Python I/O interface.
    """
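    # Illustrative behavior sketch (editorial addition, not part of the
    # original module), showing the absolute tell() and the memory that
    # flush() reclaims:
    #
    #     buf = SlidingBuffer()
    #     buf.write(b"abcdef")   # appends; the read position stays at 0
    #     buf.read(4)            # -> b"abcd"; the cursor advances to 4
    #     buf.flush()            # discards the four bytes already read
    #     len(buf)               # -> 2; only b"ef" is still held in memory
    #     buf.tell()             # -> 4; the absolute position is preserved
    #     buf.seek(2)            # raises ValueError; byte 2 was already purged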

    def __init__(self):
        self._buffer = io.BytesIO()
        self._cursor = 0

    def write(self, b):
        """Append to the end of the buffer without changing the position."""
        self._checkClosed()  # Raises ValueError if closed.

        bookmark = self._buffer.tell()
        self._buffer.seek(0, io.SEEK_END)
        pos = self._buffer.write(b)
        self._buffer.seek(bookmark)
        return self._cursor + pos

    def read(self, size=-1):
        """Read and move the cursor."""
        self._checkClosed()  # Raises ValueError if closed.

        data = self._buffer.read(size)
        self._cursor += len(data)
        return data

    def flush(self):
        """Delete already-read data (all data to the left of the position)."""
        self._checkClosed()  # Raises ValueError if closed.

        # BytesIO can't be deleted from the left, so save any leftover, unread
        # data, truncate at 0, then re-add the leftover data.
        leftover = self._buffer.read()
        self._buffer.seek(0)
        self._buffer.truncate(0)
        self._buffer.write(leftover)
        self._buffer.seek(0)

    def tell(self):
        """Report how many bytes have been read from the buffer in total."""
        return self._cursor

    def seek(self, pos):
        """Seek to a position (backwards only) within the internal buffer.

        This implementation of seek() verifies that the seek destination is
        contained in _buffer. It will raise ValueError if the destination byte
        has already been purged from the buffer.

        The "whence" argument is not supported in this implementation.
        """
        self._checkClosed()  # Raises ValueError if closed.

        buffer_initial_pos = self._buffer.tell()
        difference = pos - self._cursor
        buffer_seek_result = self._buffer.seek(difference, io.SEEK_CUR)
        if (
            not buffer_seek_result - buffer_initial_pos == difference
            or pos > self._cursor
        ):
            # The seek did not arrive at the expected byte because the internal
            # buffer does not contain (or no longer contains) the byte.
            # Reset and raise.
            self._buffer.seek(buffer_initial_pos)
            raise ValueError("Cannot seek() to that value.")

        self._cursor = pos
        return self._cursor

    def __len__(self):
        """Determine the size of the buffer by seeking to the end."""
        bookmark = self._buffer.tell()
        length = self._buffer.seek(0, io.SEEK_END)
        self._buffer.seek(bookmark)
        return length

    def close(self):
        return self._buffer.close()

    def _checkClosed(self):
        return self._buffer._checkClosed()

    @property
    def closed(self):
        return self._buffer.closed