Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/storage/fileio.py: 50%

199 statements  

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for file-like access of blobs, usually invoked via Blob.open()."""

import io

from google.api_core.exceptions import RequestRangeNotSatisfiable
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import ConditionalRetryPolicy
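
# Illustrative note (not part of the original module): as the module docstring
# says, these classes are normally obtained via Blob.open() rather than
# constructed directly. A minimal sketch, assuming a configured storage client
# and hypothetical bucket/object names:
#
#     from google.cloud import storage
#
#     blob = storage.Client().bucket("my-bucket").blob("notes.txt")
#     with blob.open("wb") as f:   # yields a BlobWriter
#         f.write(b"hello")
#     with blob.open("rb") as f:   # yields a BlobReader
#         data = f.read()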



# Resumable uploads require a chunk size of precisely a multiple of 256 KiB.
CHUNK_SIZE_MULTIPLE = 256 * 1024  # 256 KiB
DEFAULT_CHUNK_SIZE = 40 * 1024 * 1024  # 40 MiB

# Valid keyword arguments for download methods, and blob.reload() if needed.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_DOWNLOAD_KWARGS = {
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
    "timeout",
    "retry",
    "raw_download",
    "single_shot_download",
}

# Valid keyword arguments for upload methods.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_UPLOAD_KWARGS = {
    "content_type",
    "predefined_acl",
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
    "timeout",
    "checksum",
    "retry",
}



class BlobReader(io.BufferedIOBase):
    """A file-like object that reads from a blob.

    :type blob: 'google.cloud.storage.blob.Blob'
    :param blob:
        The blob to download.

    :type chunk_size: long
    :param chunk_size:
        (Optional) The minimum number of bytes to read at a time. If fewer
        bytes than the chunk_size are requested, the remainder is buffered.
        The default is the chunk_size of the blob, or 40 MiB.

    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
    :param retry:
        (Optional) How to retry the RPC. A None value will disable
        retries. A google.api_core.retry.Retry value will enable retries,
        and the object will define retriable response codes and errors and
        configure backoff and timeout options.

        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
        Retry object and activates it only if certain conditions are met.
        This class exists to provide safe defaults for RPC calls that are
        not technically safe to retry normally (due to potential data
        duplication or other side-effects) but become safe to retry if a
        condition such as if_metageneration_match is set.

        See the retry.py source code and docstrings in this package
        (google.cloud.storage.retry) for information on retry types and how
        to configure them.

        Media operations (downloads and uploads) do not support non-default
        predicates in a Retry object. The default will always be used. Other
        configuration changes for Retry objects such as delays and deadlines
        are respected.

    :type download_kwargs: dict
    :param download_kwargs:
        Keyword arguments to pass to the underlying API calls.
        The following arguments are supported:

        - ``if_generation_match``
        - ``if_generation_not_match``
        - ``if_metageneration_match``
        - ``if_metageneration_not_match``
        - ``timeout``
        - ``raw_download``
        - ``single_shot_download``

        Note that download_kwargs (excluding ``raw_download`` and
        ``single_shot_download``) are also applied to blob.reload(), if a
        reload is needed during seek().
    """


    def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs):
        for kwarg in download_kwargs:
            if kwarg not in VALID_DOWNLOAD_KWARGS:
                raise ValueError(
                    f"BlobReader does not support keyword argument {kwarg}."
                )

        self._blob = blob
        self._pos = 0
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        self._retry = retry
        self._download_kwargs = download_kwargs

    def read(self, size=-1):
        self._checkClosed()  # Raises ValueError if closed.

        result = self._buffer.read(size)
        # If the read request demands more bytes than are buffered, fetch more.
        remaining_size = size - len(result)
        if remaining_size > 0 or size < 0:
            self._pos += self._buffer.tell()
            read_size = len(result)

            self._buffer.seek(0)
            self._buffer.truncate(0)  # Clear the buffer to make way for new data.
            fetch_start = self._pos
            if size > 0:
                # Fetch the larger of self._chunk_size or the remaining_size.
                fetch_end = fetch_start + max(remaining_size, self._chunk_size)
            else:
                fetch_end = None

            # Download the blob. Checksumming must be disabled as we are using
            # chunked downloads, and the server only knows the checksum of the
            # entire file.
            try:
                result += self._blob.download_as_bytes(
                    start=fetch_start,
                    end=fetch_end,
                    checksum=None,
                    retry=self._retry,
                    **self._download_kwargs,
                )
            except RequestRangeNotSatisfiable:
                # We've reached the end of the file. Python file objects should
                # return an empty response in this case, not raise an error.
                pass

            # If more bytes were read than are immediately needed, buffer the
            # remainder and then trim the result.
            if size > 0 and len(result) > size:
                self._buffer.write(result[size:])
                self._buffer.seek(0)
                result = result[:size]
            # Increment relative offset by true amount read.
            self._pos += len(result) - read_size
        return result


    def read1(self, size=-1):
        return self.read(size)

    def seek(self, pos, whence=0):
        """Seek within the blob.

        This implementation of seek() uses knowledge of the blob size to
        validate that the reported position does not exceed the blob's last
        byte. If the blob size is not already known it will call blob.reload().
        """
        self._checkClosed()  # Raises ValueError if closed.

        if self._blob.size is None:
            reload_kwargs = {
                k: v
                for k, v in self._download_kwargs.items()
                if (k != "raw_download" and k != "single_shot_download")
            }
            self._blob.reload(**reload_kwargs)

        initial_offset = self._pos + self._buffer.tell()

        if whence == 0:
            target_pos = pos
        elif whence == 1:
            target_pos = initial_offset + pos
        elif whence == 2:
            target_pos = self._blob.size + pos
        if whence not in {0, 1, 2}:
            raise ValueError("invalid whence value")

        if target_pos > self._blob.size:
            target_pos = self._blob.size

        # Seek or invalidate buffer as needed.
        if target_pos < self._pos:
            # Target position < relative offset <= true offset.
            # As data is not in buffer, invalidate buffer.
            self._buffer.seek(0)
            self._buffer.truncate(0)
            new_pos = target_pos
            self._pos = target_pos
        else:
            # Relative offset <= target position <= size of file.
            difference = target_pos - initial_offset
            new_pos = self._pos + self._buffer.seek(difference, 1)
        return new_pos


    def close(self):
        self._buffer.close()

    @property
    def closed(self):
        return self._buffer.closed

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True
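

# Minimal usage sketch (illustrative addition, not part of the original
# source): how a BlobReader is typically obtained via Blob.open("rb") and how
# read()/seek() interact with the internal buffer. The blob argument is
# assumed to be a google.cloud.storage.blob.Blob; sizes are hypothetical.
def _example_blobreader_usage(blob):
    """Read a blob through the file-like interface (sketch only)."""
    with blob.open("rb", chunk_size=1024 * 1024) as reader:
        header = reader.read(16)    # downloads a full chunk; extra bytes are buffered
        more = reader.read(64)      # usually served from the buffered chunk, no new request
        reader.seek(0)              # rewinding before the buffered region discards the buffer
        everything = reader.read()  # re-downloads from the start through the end
    return header, more, everything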



class BlobWriter(io.BufferedIOBase):
    """A file-like object that writes to a blob.

    :type blob: 'google.cloud.storage.blob.Blob'
    :param blob:
        The blob to which to write.

    :type chunk_size: long
    :param chunk_size:
        (Optional) The maximum number of bytes to buffer before sending data
        to the server, and the size of each request when data is sent.
        Writes are implemented as a "resumable upload", so chunk_size for
        writes must be exactly a multiple of 256 KiB as with other resumable
        uploads. The default is the chunk_size of the blob, or 40 MiB.

    :type ignore_flush: bool
    :param ignore_flush:
        Makes flush() do nothing instead of raise an error. flush() without
        closing is not supported by the remote service and therefore calling it
        on this class normally results in io.UnsupportedOperation. However, that
        behavior is incompatible with some consumers and wrappers of file
        objects in Python, such as zipfile.ZipFile or io.TextIOWrapper. Setting
        ignore_flush will cause flush() to successfully do nothing, for
        compatibility with those contexts. The correct way to actually flush
        data to the remote server is to close() (using this object as a context
        manager is recommended).

    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
    :param retry:
        (Optional) How to retry the RPC. A None value will disable
        retries. A google.api_core.retry.Retry value will enable retries,
        and the object will define retriable response codes and errors and
        configure backoff and timeout options.

        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
        Retry object and activates it only if certain conditions are met.
        This class exists to provide safe defaults for RPC calls that are
        not technically safe to retry normally (due to potential data
        duplication or other side-effects) but become safe to retry if a
        condition such as if_metageneration_match is set.

        See the retry.py source code and docstrings in this package
        (google.cloud.storage.retry) for information on retry types and how
        to configure them.

        Media operations (downloads and uploads) do not support non-default
        predicates in a Retry object. The default will always be used. Other
        configuration changes for Retry objects such as delays and deadlines
        are respected.

    :type upload_kwargs: dict
    :param upload_kwargs:
        Keyword arguments to pass to the underlying API
        calls. The following arguments are supported:

        - ``if_generation_match``
        - ``if_generation_not_match``
        - ``if_metageneration_match``
        - ``if_metageneration_not_match``
        - ``timeout``
        - ``content_type``
        - ``predefined_acl``
        - ``checksum``
    """


    def __init__(
        self,
        blob,
        chunk_size=None,
        ignore_flush=False,
        retry=DEFAULT_RETRY,
        **upload_kwargs,
    ):
        for kwarg in upload_kwargs:
            if kwarg not in VALID_UPLOAD_KWARGS:
                raise ValueError(
                    f"BlobWriter does not support keyword argument {kwarg}."
                )
        self._blob = blob
        self._buffer = SlidingBuffer()
        self._upload_and_transport = None
        # Resumable uploads require a chunk size of a multiple of 256 KiB.
        # self._chunk_size must not be changed after the upload is initiated.
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        self._ignore_flush = ignore_flush
        self._retry = retry
        self._upload_kwargs = upload_kwargs

    @property
    def _chunk_size(self):
        """Get the blob's default chunk size.

        :rtype: int or ``NoneType``
        :returns: The current blob's chunk size, if it is set.
        """
        return self.__chunk_size

    @_chunk_size.setter
    def _chunk_size(self, value):
        """Set the blob's default chunk size.

        :type value: int
        :param value: (Optional) The current blob's chunk size, if it is set.

        :raises: :class:`ValueError` if ``value`` is not ``None`` and is not a
            multiple of 256 KiB.
        """
        if value is not None and value > 0 and value % CHUNK_SIZE_MULTIPLE != 0:
            raise ValueError(
                "Chunk size must be a multiple of %d." % CHUNK_SIZE_MULTIPLE
            )
        self.__chunk_size = value


    def write(self, b):
        self._checkClosed()  # Raises ValueError if closed.

        pos = self._buffer.write(b)

        # If there is enough content, upload chunks.
        num_chunks = len(self._buffer) // self._chunk_size
        if num_chunks:
            self._upload_chunks_from_buffer(num_chunks)

        return pos

    def _initiate_upload(self):
        retry = self._retry
        content_type = self._upload_kwargs.pop("content_type", None)

        # Handle ConditionalRetryPolicy.
        if isinstance(retry, ConditionalRetryPolicy):
            # Conditional retries are designed for non-media calls, which change
            # arguments into query_params dictionaries. Media operations work
            # differently, so here we make a "fake" query_params to feed to the
            # ConditionalRetryPolicy.
            query_params = {
                "ifGenerationMatch": self._upload_kwargs.get("if_generation_match"),
                "ifMetagenerationMatch": self._upload_kwargs.get(
                    "if_metageneration_match"
                ),
            }
            retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

        self._upload_and_transport = self._blob._initiate_resumable_upload(
            self._blob.bucket.client,
            self._buffer,
            content_type,
            None,
            chunk_size=self._chunk_size,
            retry=retry,
            **self._upload_kwargs,
        )


    def _upload_chunks_from_buffer(self, num_chunks):
        """Upload a specified number of chunks."""

        # Initialize the upload if necessary.
        if not self._upload_and_transport:
            self._initiate_upload()

        upload, transport = self._upload_and_transport

        # Attach timeout if specified in the keyword arguments.
        # Otherwise, the default timeout will be used from the media library.
        kwargs = {}
        if "timeout" in self._upload_kwargs:
            kwargs = {"timeout": self._upload_kwargs.get("timeout")}

        # Upload chunks. The SlidingBuffer class will manage seek position.
        for _ in range(num_chunks):
            upload.transmit_next_chunk(transport, **kwargs)

        # Wipe the buffer of chunks uploaded, preserving any remaining data.
        self._buffer.flush()

    def tell(self):
        return self._buffer.tell() + len(self._buffer)

    def flush(self):
        # flush() is not fully supported by the remote service, so raise an
        # error here, unless self._ignore_flush is set.
        if not self._ignore_flush:
            raise io.UnsupportedOperation(
                "Cannot flush without finalizing upload. Use close() instead, "
                "or set ignore_flush=True when constructing this class (see "
                "docstring)."
            )


    def close(self):
        if not self._buffer.closed:
            self._upload_chunks_from_buffer(1)
        self._buffer.close()

    def terminate(self):
        """Cancel the ResumableUpload."""
        if self._upload_and_transport:
            upload, transport = self._upload_and_transport
            transport.delete(upload.upload_url)
        self._buffer.close()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.terminate()
        else:
            self.close()

    @property
    def closed(self):
        return self._buffer.closed

    def readable(self):
        return False

    def writable(self):
        return True

    def seekable(self):
        return False
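

# Minimal usage sketch (illustrative addition, not part of the original
# source): the buffering and resumable-upload behavior described in the
# BlobWriter docstring. The blob argument is assumed to be a
# google.cloud.storage.blob.Blob; payload is any bytes object.
def _example_blobwriter_usage(blob, payload):
    """Write bytes to a blob through the file-like interface (sketch only)."""
    # chunk_size must be a multiple of 256 KiB; a chunk is transmitted each
    # time the local buffer holds at least that many bytes.
    with blob.open("wb", chunk_size=CHUNK_SIZE_MULTIPLE, ignore_flush=True) as writer:
        writer.write(payload)  # may transmit zero or more chunks right away
        writer.flush()         # a no-op here because ignore_flush=True
    # Leaving the "with" block calls close(), which uploads any remaining
    # buffered data and finalizes the object; if an exception escapes the
    # block, __exit__ calls terminate() to cancel the resumable upload.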



class SlidingBuffer(object):
    """A non-rewindable buffer that frees memory of chunks already consumed.

    This class is necessary because `google-resumable-media-python` expects
    `tell()` to work relative to the start of the file, not relative to a place
    in an intermediate buffer. Using this class, we present an external
    interface with consistent seek and tell behavior without having to actually
    store bytes already sent.

    Behavior of this class differs from an ordinary BytesIO buffer. `write()`
    will always append to the end of the file only and not change the seek
    position otherwise. `flush()` will delete all data already read (data to the
    left of the seek position). `tell()` will report the seek position of the
    buffer including all deleted data. Additionally the class implements
    __len__() which will report the size of the actual underlying buffer.

    This class does not attempt to implement the entire Python I/O interface.
    """


    def __init__(self):
        self._buffer = io.BytesIO()
        self._cursor = 0

    def write(self, b):
        """Append to the end of the buffer without changing the position."""
        self._checkClosed()  # Raises ValueError if closed.

        bookmark = self._buffer.tell()
        self._buffer.seek(0, io.SEEK_END)
        pos = self._buffer.write(b)
        self._buffer.seek(bookmark)
        return pos

    def read(self, size=-1):
        """Read and move the cursor."""
        self._checkClosed()  # Raises ValueError if closed.

        data = self._buffer.read(size)
        self._cursor += len(data)
        return data


    def flush(self):
        """Delete already-read data (all data to the left of the position)."""
        self._checkClosed()  # Raises ValueError if closed.

        # BytesIO can't be deleted from the left, so save any leftover, unread
        # data and truncate at 0, then re-add the leftover data.
        leftover = self._buffer.read()
        self._buffer.seek(0)
        self._buffer.truncate(0)
        self._buffer.write(leftover)
        self._buffer.seek(0)

    def tell(self):
        """Report how many bytes have been read from the buffer in total."""
        return self._cursor


    def seek(self, pos):
        """Seek to a position (backwards only) within the internal buffer.

        This implementation of seek() verifies that the seek destination is
        contained in _buffer. It will raise ValueError if the destination byte
        has already been purged from the buffer.

        The "whence" argument is not supported in this implementation.
        """
        self._checkClosed()  # Raises ValueError if closed.

        buffer_initial_pos = self._buffer.tell()
        difference = pos - self._cursor
        buffer_seek_result = self._buffer.seek(difference, io.SEEK_CUR)
        if (
            not buffer_seek_result - buffer_initial_pos == difference
            or pos > self._cursor
        ):
            # The seek did not arrive at the expected byte because the internal
            # buffer does not (or no longer) contain the byte. Reset and raise.
            self._buffer.seek(buffer_initial_pos)
            raise ValueError("Cannot seek() to that value.")

        self._cursor = pos
        return self._cursor

    def __len__(self):
        """Determine the size of the buffer by seeking to the end."""
        bookmark = self._buffer.tell()
        length = self._buffer.seek(0, io.SEEK_END)
        self._buffer.seek(bookmark)
        return length

    def close(self):
        return self._buffer.close()

    def _checkClosed(self):
        return self._buffer._checkClosed()

    @property
    def closed(self):
        return self._buffer.closed
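

# Minimal sketch (illustrative addition, not part of the original source) of
# the SlidingBuffer semantics described in its docstring: write() appends
# without moving the read position, tell() counts every byte ever read,
# flush() frees bytes that were already read, and seeking back into freed
# bytes raises ValueError.
def _example_slidingbuffer_semantics():
    buf = SlidingBuffer()
    buf.write(b"abcdef")          # append; the read position stays at 0
    assert buf.read(4) == b"abcd"
    assert buf.tell() == 4        # total bytes read so far
    assert len(buf) == 6          # the underlying buffer still holds 6 bytes
    buf.flush()                   # drop the 4 bytes that were already read
    assert len(buf) == 2          # only b"ef" remains in memory
    assert buf.tell() == 4        # tell() still counts the freed bytes
    buf.seek(4)                   # seeking to the current position is allowed
    try:
        buf.seek(0)               # bytes 0-3 were purged by flush()
    except ValueError:
        pass                      # expected: the destination is gone
    return buf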