
1# Copyright 2017 Google Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Virtual bases classes for uploading media via Google APIs. 

16 

17Supported here are: 

18 

19* simple (media) uploads 

20* multipart uploads that contain both metadata and a small file as payload 

21* resumable uploads (with metadata as well) 

22""" 

23 

24import http.client 

25import json 

26import os 

27import random 

28import re 

29import sys 

30import urllib.parse 

31 

32from google.cloud.storage._media import _helpers 

33from google.cloud.storage._media import UPLOAD_CHUNK_SIZE 

34from google.cloud.storage.exceptions import InvalidResponse 

35from google.cloud.storage.exceptions import DataCorruption 

36from google.cloud.storage.retry import DEFAULT_RETRY 

37 

38from xml.etree import ElementTree 

39 

40 

41_CONTENT_TYPE_HEADER = "content-type" 

42_CONTENT_RANGE_TEMPLATE = "bytes {:d}-{:d}/{:d}" 

43_RANGE_UNKNOWN_TEMPLATE = "bytes {:d}-{:d}/*" 

44_EMPTY_RANGE_TEMPLATE = "bytes */{:d}" 

45_BOUNDARY_WIDTH = len(str(sys.maxsize - 1)) 

46_BOUNDARY_FORMAT = "==============={{:0{:d}d}}==".format(_BOUNDARY_WIDTH) 

47_MULTIPART_SEP = b"--" 

48_CRLF = b"\r\n" 

49_MULTIPART_BEGIN = b"\r\ncontent-type: application/json; charset=UTF-8\r\n\r\n" 

50_RELATED_HEADER = b'multipart/related; boundary="' 

51_BYTES_RANGE_RE = re.compile(r"bytes=0-(?P<end_byte>\d+)", flags=re.IGNORECASE) 

52_STREAM_ERROR_TEMPLATE = ( 

53 "Bytes stream is in unexpected state. " 

54 "The local stream has had {:d} bytes read from it while " 

55 "{:d} bytes have already been updated (they should match)." 

56) 

57_STREAM_READ_PAST_TEMPLATE = ( 

58 "{:d} bytes have been read from the stream, which exceeds " 

59 "the expected total {:d}." 

60) 

61_DELETE = "DELETE" 

62_POST = "POST" 

63_PUT = "PUT" 

64_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = ( 

65 "The computed ``{}`` checksum, ``{}``, and the checksum reported by the " 

66 "remote host, ``{}``, did not match." 

67) 

68_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = ( 

69 "Response metadata had no ``{}`` value; checksum could not be validated." 

70) 

71_UPLOAD_HEADER_NO_APPROPRIATE_CHECKSUM_MESSAGE = ( 

72 "Response headers had no ``{}`` value; checksum could not be validated." 

73) 

74_MPU_INITIATE_QUERY = "?uploads" 

75_MPU_PART_QUERY_TEMPLATE = "?partNumber={part}&uploadId={upload_id}" 

76_S3_COMPAT_XML_NAMESPACE = "{http://s3.amazonaws.com/doc/2006-03-01/}" 

77_UPLOAD_ID_NODE = "UploadId" 

78_MPU_FINAL_QUERY_TEMPLATE = "?uploadId={upload_id}" 

79 

80 

81class UploadBase(object): 

82 """Base class for upload helpers. 

83 

84 Defines core shared behavior across different upload types. 

85 

86 Args: 

87 upload_url (str): The URL where the content will be uploaded. 

88 headers (Optional[Mapping[str, str]]): Extra headers that should 

89 be sent with the request, e.g. headers for encrypted data. 

90 retry (Optional[google.api_core.retry.Retry]): How to retry the 

91 RPC. A None value will disable retries. A 

92 google.api_core.retry.Retry value will enable retries, and the 

93 object will configure backoff and timeout options. 

94 

95 See the retry.py source code and docstrings in this package 

96 (google.cloud.storage.retry) for information on retry types and how 

97 to configure them. 

98 

99 Attributes: 

100 upload_url (str): The URL where the content will be uploaded. 

101 """ 

102 

103 def __init__(self, upload_url, headers=None, retry=DEFAULT_RETRY): 

104 self.upload_url = upload_url 

105 if headers is None: 

106 headers = {} 

107 self._headers = headers 

108 self._finished = False 

109 self._retry_strategy = retry 

110 

111 @property 

112 def finished(self): 

113 """bool: Flag indicating if the upload has completed.""" 

114 return self._finished 

115 

116 def _process_response(self, response): 

117 """Process the response from an HTTP request. 

118 

119 This is everything that must be done after a request that doesn't 

120 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

121 philosophy. 

122 

123 Args: 

124 response (object): The HTTP response object. 

125 

126 Raises: 

127 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

128 code is not 200. 

129 

130 .. _sans-I/O: https://sans-io.readthedocs.io/ 

131 """ 

132 # Tombstone the current upload so it cannot be used again (in either 

133 # failure or success). 

134 self._finished = True 

135 _helpers.require_status_code(response, (http.client.OK,), self._get_status_code) 

136 

137 @staticmethod 

138 def _get_status_code(response): 

139 """Access the status code from an HTTP response. 

140 

141 Args: 

142 response (object): The HTTP response object. 

143 

144 Raises: 

145 NotImplementedError: Always, since virtual. 

146 """ 

147 raise NotImplementedError("This implementation is virtual.") 

148 

149 @staticmethod 

150 def _get_headers(response): 

151 """Access the headers from an HTTP response. 

152 

153 Args: 

154 response (object): The HTTP response object. 

155 

156 Raises: 

157 NotImplementedError: Always, since virtual. 

158 """ 

159 raise NotImplementedError("This implementation is virtual.") 

160 

161 @staticmethod 

162 def _get_body(response): 

163 """Access the response body from an HTTP response. 

164 

165 Args: 

166 response (object): The HTTP response object. 

167 

168 Raises: 

169 NotImplementedError: Always, since virtual. 

170 """ 

171 raise NotImplementedError("This implementation is virtual.") 

172 

173 

174class SimpleUpload(UploadBase): 

175 """Upload a resource to a Google API. 

176 

177 A **simple** media upload sends no metadata and completes the upload 

178 in a single request. 

179 

180 Args: 

181 upload_url (str): The URL where the content will be uploaded. 

182 headers (Optional[Mapping[str, str]]): Extra headers that should 

183 be sent with the request, e.g. headers for encrypted data. 

184 retry (Optional[google.api_core.retry.Retry]): How to retry the 

185 RPC. A None value will disable retries. A 

186 google.api_core.retry.Retry value will enable retries, and the 

187 object will configure backoff and timeout options. 

188 

189 See the retry.py source code and docstrings in this package 

190 (google.cloud.storage.retry) for information on retry types and how 

191 to configure them. 

192 

193 Attributes: 

194 upload_url (str): The URL where the content will be uploaded. 

195 """ 

196 

197 def _prepare_request(self, data, content_type): 

198 """Prepare the contents of an HTTP request. 

199 

200 This is everything that must be done before a request that doesn't 

201 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

202 philosophy. 

203 

204 .. note:: 

205 

206 This method will be used only once, so ``headers`` will be 

207 mutated by having a new key added to it. 

208 

209 Args: 

210 data (bytes): The resource content to be uploaded. 

211 content_type (str): The content type for the request. 

212 

213 Returns: 

214 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

215 

216 * HTTP verb for the request (always POST) 

217 * the URL for the request 

218 * the body of the request 

219 * headers for the request 

220 

221 Raises: 

222 ValueError: If the current upload has already finished. 

223 TypeError: If ``data`` isn't bytes. 

224 

225 .. _sans-I/O: https://sans-io.readthedocs.io/ 

226 """ 

227 if self.finished: 

228 raise ValueError("An upload can only be used once.") 

229 

230 if not isinstance(data, bytes): 

231 raise TypeError("`data` must be bytes, received", type(data)) 

232 self._headers[_CONTENT_TYPE_HEADER] = content_type 

233 return _POST, self.upload_url, data, self._headers 

234 

235 def transmit(self, transport, data, content_type, timeout=None): 

236 """Transmit the resource to be uploaded. 

237 

238 Args: 

239 transport (object): An object which can make authenticated 

240 requests. 

241 data (bytes): The resource content to be uploaded. 

242 content_type (str): The content type of the resource, e.g. a JPEG 

243 image has content type ``image/jpeg``. 

244 timeout (Optional[Union[float, Tuple[float, float]]]): 

245 The number of seconds to wait for the server response. 

246 Depending on the retry strategy, a request may be repeated 

247 several times using the same timeout each time. 

248 

249 Can also be passed as a tuple (connect_timeout, read_timeout). 

250 See :meth:`requests.Session.request` documentation for details. 

251 

252 Raises: 

253 NotImplementedError: Always, since virtual. 

254 """ 

255 raise NotImplementedError("This implementation is virtual.") 

256 

257 
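# NOTE: ``transmit`` above is virtual. A minimal sketch of how a transport-specific
# subclass might wire the sans-I/O pieces to a ``requests``-style session is shown
# below, commented out. The class name ``_RequestsSimpleUpload`` is hypothetical,
# and the real implementation in this package additionally applies the configured
# retry strategy; this only illustrates the prepare/transmit/process split.
#
# class _RequestsSimpleUpload(SimpleUpload):
#     @staticmethod
#     def _get_status_code(response):
#         return response.status_code
#
#     @staticmethod
#     def _get_headers(response):
#         return response.headers
#
#     @staticmethod
#     def _get_body(response):
#         return response.content
#
#     def transmit(self, transport, data, content_type, timeout=None):
#         method, url, payload, headers = self._prepare_request(data, content_type)
#         response = transport.request(
#             method, url, data=payload, headers=headers, timeout=timeout
#         )
#         self._process_response(response)
#         return response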

258class MultipartUpload(UploadBase): 

259 """Upload a resource with metadata to a Google API. 

260 

261 A **multipart** upload sends both metadata and the resource in a single 

262 (multipart) request. 

263 

264 Args: 

265 upload_url (str): The URL where the content will be uploaded. 

266 headers (Optional[Mapping[str, str]]): Extra headers that should 

267 be sent with the request, e.g. headers for encrypted data. 

268 checksum (Optional[str]): The type of checksum to compute to verify 

269 the integrity of the object. The request metadata will be amended 

270 to include the computed value. Using this option will override a 

271 manually-set checksum value. Supported values are "md5", 

272 "crc32c", "auto", and None. The default is "auto", which will try 

273 to detect if the C extension for crc32c is installed and fall back 

274 to md5 otherwise. 

275 retry (Optional[google.api_core.retry.Retry]): How to retry the 

276 RPC. A None value will disable retries. A 

277 google.api_core.retry.Retry value will enable retries, and the 

278 object will configure backoff and timeout options. 

279 

280 See the retry.py source code and docstrings in this package 

281 (google.cloud.storage.retry) for information on retry types and how 

282 to configure them. 

283 

284 Attributes: 

285 upload_url (str): The URL where the content will be uploaded. 

286 """ 

287 

288 def __init__(self, upload_url, headers=None, checksum="auto", retry=DEFAULT_RETRY): 

289 super(MultipartUpload, self).__init__(upload_url, headers=headers, retry=retry) 

290 self._checksum_type = checksum 

291 if self._checksum_type == "auto": 

292 self._checksum_type = ( 

293 "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5" 

294 ) 

295 

296 def _prepare_request(self, data, metadata, content_type): 

297 """Prepare the contents of an HTTP request. 

298 

299 This is everything that must be done before a request that doesn't 

300 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

301 philosophy. 

302 

303 .. note:: 

304 

305 This method will be used only once, so ``headers`` will be 

306 mutated by having a new key added to it. 

307 

308 Args: 

309 data (bytes): The resource content to be uploaded. 

310 metadata (Mapping[str, str]): The resource metadata, such as an 

311 ACL list. 

312 content_type (str): The content type of the resource, e.g. a JPEG 

313 image has content type ``image/jpeg``. 

314 

315 Returns: 

316 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

317 

318 * HTTP verb for the request (always POST) 

319 * the URL for the request 

320 * the body of the request 

321 * headers for the request 

322 

323 Raises: 

324 ValueError: If the current upload has already finished. 

325 TypeError: If ``data`` isn't bytes. 

326 

327 .. _sans-I/O: https://sans-io.readthedocs.io/ 

328 """ 

329 if self.finished: 

330 raise ValueError("An upload can only be used once.") 

331 

332 if not isinstance(data, bytes): 

333 raise TypeError("`data` must be bytes, received", type(data)) 

334 

335 checksum_object = _helpers._get_checksum_object(self._checksum_type) 

336 if checksum_object is not None: 

337 checksum_object.update(data) 

338 actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest()) 

339 metadata_key = _helpers._get_metadata_key(self._checksum_type) 

340 metadata[metadata_key] = actual_checksum 

341 

342 content, multipart_boundary = construct_multipart_request( 

343 data, metadata, content_type 

344 ) 

345 multipart_content_type = _RELATED_HEADER + multipart_boundary + b'"' 

346 self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type 

347 

348 return _POST, self.upload_url, content, self._headers 

349 
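    # Illustrative sketch (not part of the original module): with ``checksum="md5"``
    # the metadata passed to ``_prepare_request`` gains the computed digest and the
    # headers advertise the multipart boundary. The URL and metadata below are
    # hypothetical.
    #
    #     upload = MultipartUpload("https://example.com/upload", checksum="md5")
    #     metadata = {"name": "obj"}
    #     verb, url, body, headers = upload._prepare_request(
    #         b"hello", metadata, "text/plain"
    #     )
    #     # metadata now also has an "md5Hash" entry, and headers["content-type"]
    #     # begins with b'multipart/related; boundary="'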

350 def transmit(self, transport, data, metadata, content_type, timeout=None): 

351 """Transmit the resource to be uploaded. 

352 

353 Args: 

354 transport (object): An object which can make authenticated 

355 requests. 

356 data (bytes): The resource content to be uploaded. 

357 metadata (Mapping[str, str]): The resource metadata, such as an 

358 ACL list. 

359 content_type (str): The content type of the resource, e.g. a JPEG 

360 image has content type ``image/jpeg``. 

361 timeout (Optional[Union[float, Tuple[float, float]]]): 

362 The number of seconds to wait for the server response. 

363 Depending on the retry strategy, a request may be repeated 

364 several times using the same timeout each time. 

365 

366 Can also be passed as a tuple (connect_timeout, read_timeout). 

367 See :meth:`requests.Session.request` documentation for details. 

368 

369 Raises: 

370 NotImplementedError: Always, since virtual. 

371 """ 

372 raise NotImplementedError("This implementation is virtual.") 

373 

374 

375class ResumableUpload(UploadBase): 

376 """Initiate and fulfill a resumable upload to a Google API. 

377 

378 A **resumable** upload sends an initial request with the resource metadata 

379 and then gets assigned an upload ID / upload URL to send bytes to. 

380 Using the upload URL, the upload is then done in chunks (determined by 

381 the user) until all bytes have been uploaded. 

382 

383 Args: 

384 upload_url (str): The URL where the resumable upload will be initiated. 

385 chunk_size (int): The size of each chunk used to upload the resource. 

386 headers (Optional[Mapping[str, str]]): Extra headers that should 

387 be sent with every request. 

388 checksum (Optional[str]): The type of checksum to compute to verify 

389 the integrity of the object. After the upload is complete, the 

390 server-computed checksum of the resulting object will be checked 

391 and google.cloud.storage.exceptions.DataCorruption will be raised on 

392 a mismatch. The corrupted file will not be deleted from the remote 

393 host automatically. Supported values are "md5", "crc32c", "auto", 

394 and None. The default is "auto", which will try to detect if the C 

395 extension for crc32c is installed and fall back to md5 otherwise. 

396 retry (Optional[google.api_core.retry.Retry]): How to retry the 

397 RPC. A None value will disable retries. A 

398 google.api_core.retry.Retry value will enable retries, and the 

399 object will configure backoff and timeout options. 

400 

401 See the retry.py source code and docstrings in this package 

402 (google.cloud.storage.retry) for information on retry types and how 

403 to configure them. 

404 

405 Attributes: 

406 upload_url (str): The URL where the content will be uploaded. 

407 

408 Raises: 

409 ValueError: If ``chunk_size`` is not a multiple of 

410 :data:`.UPLOAD_CHUNK_SIZE`. 

411 """ 

412 

413 def __init__( 

414 self, upload_url, chunk_size, checksum="auto", headers=None, retry=DEFAULT_RETRY 

415 ): 

416 super(ResumableUpload, self).__init__(upload_url, headers=headers, retry=retry) 

417 if chunk_size % UPLOAD_CHUNK_SIZE != 0: 

418 raise ValueError( 

419 "{} KB must divide chunk size".format(UPLOAD_CHUNK_SIZE / 1024) 

420 ) 

421 self._chunk_size = chunk_size 

422 self._stream = None 

423 self._content_type = None 

424 self._bytes_uploaded = 0 

425 self._bytes_checksummed = 0 

426 self._checksum_type = checksum 

427 if self._checksum_type == "auto": 

428 self._checksum_type = ( 

429 "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5" 

430 ) 

431 self._checksum_object = None 

432 self._total_bytes = None 

433 self._resumable_url = None 

434 self._invalid = False 

435 

436 @property 

437 def invalid(self): 

438 """bool: Indicates if the upload is in an invalid state. 

439 

440 This will occur if a call to :meth:`transmit_next_chunk` fails. 

441 To recover from such a failure, call :meth:`recover`. 

442 """ 

443 return self._invalid 

444 

445 @property 

446 def chunk_size(self): 

447 """int: The size of each chunk used to upload the resource.""" 

448 return self._chunk_size 

449 

450 @property 

451 def resumable_url(self): 

452 """Optional[str]: The URL of the in-progress resumable upload.""" 

453 return self._resumable_url 

454 

455 @property 

456 def bytes_uploaded(self): 

457 """int: Number of bytes that have been uploaded.""" 

458 return self._bytes_uploaded 

459 

460 @property 

461 def total_bytes(self): 

462 """Optional[int]: The total number of bytes to be uploaded. 

463 

464 If this upload is initiated (via :meth:`initiate`) with 

465 ``stream_final=True``, this value will be populated based on the size 

466 of the ``stream`` being uploaded. (By default ``stream_final=True``.) 

467 

468 If this upload is initiated with ``stream_final=False``, 

469 :attr:`total_bytes` will be :data:`None` since it cannot be 

470 determined from the stream. 

471 """ 

472 return self._total_bytes 

473 

474 def _prepare_initiate_request( 

475 self, stream, metadata, content_type, total_bytes=None, stream_final=True 

476 ): 

477 """Prepare the contents of HTTP request to initiate upload. 

478 

479 This is everything that must be done before a request that doesn't 

480 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

481 philosophy. 

482 

483 Args: 

484 stream (IO[bytes]): The stream (i.e. file-like object) that will 

485 be uploaded. The stream **must** be at the beginning (i.e. 

486 ``stream.tell() == 0``). 

487 metadata (Mapping[str, str]): The resource metadata, such as an 

488 ACL list. 

489 content_type (str): The content type of the resource, e.g. a JPEG 

490 image has content type ``image/jpeg``. 

491 total_bytes (Optional[int]): The total number of bytes to be 

492 uploaded. If specified, the upload size **will not** be 

493 determined from the stream (even if ``stream_final=True``). 

494 stream_final (Optional[bool]): Indicates if the ``stream`` is 

495 "final" (i.e. no more bytes will be added to it). In this case 

496 we determine the upload size from the size of the stream. If 

497 ``total_bytes`` is passed, this argument will be ignored. 

498 

499 Returns: 

500 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

501 

502 * HTTP verb for the request (always POST) 

503 * the URL for the request 

504 * the body of the request 

505 * headers for the request 

506 

507 Raises: 

508 ValueError: If the current upload has already been initiated. 

509 ValueError: If ``stream`` is not at the beginning. 

510 

511 .. _sans-I/O: https://sans-io.readthedocs.io/ 

512 """ 

513 if self.resumable_url is not None: 

514 raise ValueError("This upload has already been initiated.") 

515 if stream.tell() != 0: 

516 raise ValueError("Stream must be at beginning.") 

517 

518 self._stream = stream 

519 self._content_type = content_type 

520 

521 # Signed URL requires content type set directly - not through x-upload-content-type 

522 parse_result = urllib.parse.urlparse(self.upload_url) 

523 parsed_query = urllib.parse.parse_qs(parse_result.query) 

524 if "x-goog-signature" in parsed_query or "X-Goog-Signature" in parsed_query: 

525 # Deconstruct **self._headers first so that content type defined here takes priority 

526 headers = {**self._headers, _CONTENT_TYPE_HEADER: content_type} 

527 else: 

528 # Deconstruct **self._headers first so that content type defined here takes priority 

529 headers = { 

530 **self._headers, 

531 _CONTENT_TYPE_HEADER: "application/json; charset=UTF-8", 

532 "x-upload-content-type": content_type, 

533 } 

534 # Set the total bytes if possible. 

535 if total_bytes is not None: 

536 self._total_bytes = total_bytes 

537 elif stream_final: 

538 self._total_bytes = get_total_bytes(stream) 

539 # Add the total bytes to the headers if set. 

540 if self._total_bytes is not None: 

541 content_length = "{:d}".format(self._total_bytes) 

542 headers["x-upload-content-length"] = content_length 

543 

544 payload = json.dumps(metadata).encode("utf-8") 

545 return _POST, self.upload_url, payload, headers 

546 
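    # Illustrative sketch (not part of the original module): for a non-signed upload
    # URL the initiate request is a JSON POST whose headers describe the upcoming
    # media. The URL and metadata below are hypothetical.
    #
    #     import io
    #     upload = ResumableUpload("https://example.com/resumable", UPLOAD_CHUNK_SIZE)
    #     stream = io.BytesIO(b"x" * 1024)
    #     verb, url, body, headers = upload._prepare_initiate_request(
    #         stream, {"name": "obj"}, "application/octet-stream"
    #     )
    #     # verb == "POST", body == b'{"name": "obj"}', and headers include
    #     #   "content-type": "application/json; charset=UTF-8"
    #     #   "x-upload-content-type": "application/octet-stream"
    #     #   "x-upload-content-length": "1024"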

547 def _process_initiate_response(self, response): 

548 """Process the response from an HTTP request that initiated upload. 

549 

550 This is everything that must be done after a request that doesn't 

551 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

552 philosophy. 

553 

554 This method takes the URL from the ``Location`` header and stores it 

555 for future use. Within that URL, we assume the ``upload_id`` query 

556 parameter has been included, but we do not check. 

557 

558 Args: 

559 response (object): The HTTP response object (need headers). 

560 

561 .. _sans-I/O: https://sans-io.readthedocs.io/ 

562 """ 

563 _helpers.require_status_code( 

564 response, 

565 (http.client.OK, http.client.CREATED), 

566 self._get_status_code, 

567 callback=self._make_invalid, 

568 ) 

569 self._resumable_url = _helpers.header_required( 

570 response, "location", self._get_headers 

571 ) 

572 

573 def initiate( 

574 self, 

575 transport, 

576 stream, 

577 metadata, 

578 content_type, 

579 total_bytes=None, 

580 stream_final=True, 

581 timeout=None, 

582 ): 

583 """Initiate a resumable upload. 

584 

585 By default, this method assumes your ``stream`` is in a "final" 

586 state ready to transmit. However, ``stream_final=False`` can be used 

587 to indicate that the size of the resource is not known. This can happen 

588 if bytes are being dynamically fed into ``stream``, e.g. if the stream 

589 is attached to application logs. 

590 

591 If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be 

592 read from the stream every time :meth:`transmit_next_chunk` is called. 

593 If one of those reads produces strictly fewer bytes than the chunk 

594 size, the upload will be concluded. 

595 

596 Args: 

597 transport (object): An object which can make authenticated 

598 requests. 

599 stream (IO[bytes]): The stream (i.e. file-like object) that will 

600 be uploaded. The stream **must** be at the beginning (i.e. 

601 ``stream.tell() == 0``). 

602 metadata (Mapping[str, str]): The resource metadata, such as an 

603 ACL list. 

604 content_type (str): The content type of the resource, e.g. a JPEG 

605 image has content type ``image/jpeg``. 

606 total_bytes (Optional[int]): The total number of bytes to be 

607 uploaded. If specified, the upload size **will not** be 

608 determined from the stream (even if ``stream_final=True``). 

609 stream_final (Optional[bool]): Indicates if the ``stream`` is 

610 "final" (i.e. no more bytes will be added to it). In this case 

611 we determine the upload size from the size of the stream. If 

612 ``total_bytes`` is passed, this argument will be ignored. 

613 timeout (Optional[Union[float, Tuple[float, float]]]): 

614 The number of seconds to wait for the server response. 

615 Depending on the retry strategy, a request may be repeated 

616 several times using the same timeout each time. 

617 

618 Can also be passed as a tuple (connect_timeout, read_timeout). 

619 See :meth:`requests.Session.request` documentation for details. 

620 

621 Raises: 

622 NotImplementedError: Always, since virtual. 

623 """ 

624 raise NotImplementedError("This implementation is virtual.") 

625 

626 def _prepare_request(self): 

627 """Prepare the contents of HTTP request to upload a chunk. 

628 

629 This is everything that must be done before a request that doesn't 

630 require network I/O. This is based on the `sans-I/O`_ philosophy. 

631 

632 For the time being, this **does require** some form of I/O to read 

633 a chunk from ``stream`` (via :func:`get_next_chunk`). However, this 

634 will (almost) certainly not be network I/O. 

635 

636 Returns: 

637 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

638 

639 * HTTP verb for the request (always PUT) 

640 * the URL for the request 

641 * the body of the request 

642 * headers for the request 

643 

644 The headers incorporate the ``_headers`` on the current instance. 

645 

646 Raises: 

647 ValueError: If the current upload has finished. 

648 ValueError: If the current upload is in an invalid state. 

649 ValueError: If the current upload has not been initiated. 

650 ValueError: If the location in the stream (i.e. ``stream.tell()``) 

651 does not agree with ``bytes_uploaded``. 

652 

653 .. _sans-I/O: https://sans-io.readthedocs.io/ 

654 """ 

655 if self.finished: 

656 raise ValueError("Upload has finished.") 

657 if self.invalid: 

658 raise ValueError( 

659 "Upload is in an invalid state. To recover call `recover()`." 

660 ) 

661 if self.resumable_url is None: 

662 raise ValueError( 

663 "This upload has not been initiated. Please call " 

664 "initiate() before beginning to transmit chunks." 

665 ) 

666 

667 start_byte, payload, content_range = get_next_chunk( 

668 self._stream, self._chunk_size, self._total_bytes 

669 ) 

670 if start_byte != self.bytes_uploaded: 

671 msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded) 

672 raise ValueError(msg) 

673 

674 self._update_checksum(start_byte, payload) 

675 

676 headers = { 

677 **self._headers, 

678 _CONTENT_TYPE_HEADER: self._content_type, 

679 _helpers.CONTENT_RANGE_HEADER: content_range, 

680 } 

681 return _PUT, self.resumable_url, payload, headers 

682 

683 def _update_checksum(self, start_byte, payload): 

684 """Update the checksum with the payload if not already updated. 

685 

686 Because error recovery can result in bytes being transmitted more than 

687 once, the checksum tracks the number of bytes checked in 

688 self._bytes_checksummed and skips bytes that have already been summed. 

689 """ 

690 if not self._checksum_type: 

691 return 

692 

693 if not self._checksum_object: 

694 self._checksum_object = _helpers._get_checksum_object(self._checksum_type) 

695 

696 if start_byte < self._bytes_checksummed: 

697 offset = self._bytes_checksummed - start_byte 

698 data = payload[offset:] 

699 else: 

700 data = payload 

701 

702 self._checksum_object.update(data) 

703 self._bytes_checksummed += len(data) 

704 
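    # Illustrative walk-through (not part of the original module), with hypothetical
    # numbers: suppose the first attempt checksummed bytes 0..262143, so
    # ``_bytes_checksummed == 262144``, and after ``recover()`` the same region is
    # re-read starting at byte 0. Then ``_update_checksum(0, payload)`` computes
    # ``offset = 262144 - 0`` and feeds only ``payload[262144:]`` (the genuinely new
    # bytes) to the checksum object, keeping the digest identical to a single
    # uninterrupted pass over the stream.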

705 def _make_invalid(self): 

706 """Simple setter for ``invalid``. 

707 

708 This is intended to be passed along as a callback to helpers that 

709 raise an exception so they can mark this instance as invalid before 

710 raising. 

711 """ 

712 self._invalid = True 

713 

714 def _process_resumable_response(self, response, bytes_sent): 

715 """Process the response from an HTTP request. 

716 

717 This is everything that must be done after a request that doesn't 

718 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

719 philosophy. 

720 

721 Args: 

722 response (object): The HTTP response object. 

723 bytes_sent (int): The number of bytes sent in the request that 

724 ``response`` was returned for. 

725 

726 Raises: 

727 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

728 code is 308 and the ``range`` header is not of the form 

729 ``bytes 0-{end}``. 

730 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

731 code is not 200 or 308. 

732 

733 .. _sans-I/O: https://sans-io.readthedocs.io/ 

734 """ 

735 status_code = _helpers.require_status_code( 

736 response, 

737 (http.client.OK, http.client.PERMANENT_REDIRECT), 

738 self._get_status_code, 

739 callback=self._make_invalid, 

740 ) 

741 if status_code == http.client.OK: 

742 # NOTE: We use the "local" information of ``bytes_sent`` to update 

743 # ``bytes_uploaded``, but do not verify this against other 

744 # state. However, there may be some other information: 

745 # 

746 # * a ``size`` key in JSON response body 

747 # * the ``total_bytes`` attribute (if set) 

748 # * ``stream.tell()`` (relying on fact that ``initiate()`` 

749 # requires stream to be at the beginning) 

750 self._bytes_uploaded = self._bytes_uploaded + bytes_sent 

751 # Tombstone the current upload so it cannot be used again. 

752 self._finished = True 

753 # Validate the checksum. This can raise an exception on failure. 

754 self._validate_checksum(response) 

755 else: 

756 bytes_range = _helpers.header_required( 

757 response, 

758 _helpers.RANGE_HEADER, 

759 self._get_headers, 

760 callback=self._make_invalid, 

761 ) 

762 match = _BYTES_RANGE_RE.match(bytes_range) 

763 if match is None: 

764 self._make_invalid() 

765 raise InvalidResponse( 

766 response, 

767 'Unexpected "range" header', 

768 bytes_range, 

769 'Expected to be of the form "bytes=0-{end}"', 

770 ) 

771 self._bytes_uploaded = int(match.group("end_byte")) + 1 

772 
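    # Illustrative sketch (not part of the original module): on a 308 response the
    # server's ``range`` header reports the last byte it has persisted, so the next
    # chunk starts one past it. For example:
    #
    #     match = _BYTES_RANGE_RE.match("bytes=0-524287")
    #     int(match.group("end_byte")) + 1   # -> 524288, the new ``bytes_uploaded``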

773 def _validate_checksum(self, response): 

774 """Check the computed checksum, if any, against the recieved metadata. 

775 

776 Args: 

777 response (object): The HTTP response object. 

778 

779 Raises: 

780 ~google.cloud.storage.exceptions.DataCorruption: If the checksum 

781 computed locally and the checksum reported by the remote host do 

782 not match. 

783 """ 

784 if self._checksum_type is None: 

785 return 

786 metadata_key = _helpers._get_metadata_key(self._checksum_type) 

787 metadata = response.json() 

788 remote_checksum = metadata.get(metadata_key) 

789 if remote_checksum is None: 

790 raise InvalidResponse( 

791 response, 

792 _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key), 

793 self._get_headers(response), 

794 ) 

795 local_checksum = _helpers.prepare_checksum_digest( 

796 self._checksum_object.digest() 

797 ) 

798 if local_checksum != remote_checksum: 

799 raise DataCorruption( 

800 response, 

801 _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format( 

802 self._checksum_type.upper(), local_checksum, remote_checksum 

803 ), 

804 ) 

805 

806 def transmit_next_chunk(self, transport, timeout=None): 

807 """Transmit the next chunk of the resource to be uploaded. 

808 

809 If the current upload was initiated with ``stream_final=False``, 

810 this method will dynamically determine if the upload has completed. 

811 The upload will be considered complete if the stream produces 

812 fewer than :attr:`chunk_size` bytes when a chunk is read from it. 

813 

814 Args: 

815 transport (object): An object which can make authenticated 

816 requests. 

817 timeout (Optional[Union[float, Tuple[float, float]]]): 

818 The number of seconds to wait for the server response. 

819 Depending on the retry strategy, a request may be repeated 

820 several times using the same timeout each time. 

821 

822 Can also be passed as a tuple (connect_timeout, read_timeout). 

823 See :meth:`requests.Session.request` documentation for details. 

824 

825 Raises: 

826 NotImplementedError: Always, since virtual. 

827 """ 

828 raise NotImplementedError("This implementation is virtual.") 

829 

830 def _prepare_recover_request(self): 

831 """Prepare the contents of HTTP request to recover from failure. 

832 

833 This is everything that must be done before a request that doesn't 

834 require network I/O. This is based on the `sans-I/O`_ philosophy. 

835 

836 We assume that the :attr:`resumable_url` is set (i.e. the only way 

837 the upload can end up :attr:`invalid` is if it has been initiated). 

838 

839 Returns: 

840 Tuple[str, str, NoneType, Mapping[str, str]]: The quadruple 

841 

842 * HTTP verb for the request (always PUT) 

843 * the URL for the request 

844 * the body of the request (always :data:`None`) 

845 * headers for the request 

846 

847 The headers **do not** incorporate the ``_headers`` on the 

848 current instance. 

849 

850 .. _sans-I/O: https://sans-io.readthedocs.io/ 

851 """ 

852 headers = {_helpers.CONTENT_RANGE_HEADER: "bytes */*"} 

853 return _PUT, self.resumable_url, None, headers 

854 

855 def _process_recover_response(self, response): 

856 """Process the response from an HTTP request to recover from failure. 

857 

858 This is everything that must be done after a request that doesn't 

859 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

860 philosophy. 

861 

862 Args: 

863 response (object): The HTTP response object. 

864 

865 Raises: 

866 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

867 code is not 308. 

868 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

869 code is 308 and the ``range`` header is not of the form 

870 ``bytes 0-{end}``. 

871 

872 .. _sans-I/O: https://sans-io.readthedocs.io/ 

873 """ 

874 _helpers.require_status_code( 

875 response, (http.client.PERMANENT_REDIRECT,), self._get_status_code 

876 ) 

877 headers = self._get_headers(response) 

878 if _helpers.RANGE_HEADER in headers: 

879 bytes_range = headers[_helpers.RANGE_HEADER] 

880 match = _BYTES_RANGE_RE.match(bytes_range) 

881 if match is None: 

882 raise InvalidResponse( 

883 response, 

884 'Unexpected "range" header', 

885 bytes_range, 

886 'Expected to be of the form "bytes=0-{end}"', 

887 ) 

888 self._bytes_uploaded = int(match.group("end_byte")) + 1 

889 else: 

890 # In this case, the upload has not "begun". 

891 self._bytes_uploaded = 0 

892 

893 self._stream.seek(self._bytes_uploaded) 

894 self._invalid = False 

895 

896 def recover(self, transport): 

897 """Recover from a failure. 

898 

899 This method should be used when a :class:`ResumableUpload` is in an 

900 :attr:`~ResumableUpload.invalid` state due to a request failure. 

901 

902 This will verify the progress with the server and make sure the 

903 current upload is in a valid state before :meth:`transmit_next_chunk` 

904 can be used again. 

905 

906 Args: 

907 transport (object): An object which can make authenticated 

908 requests. 

909 

910 Raises: 

911 NotImplementedError: Always, since virtual. 

912 """ 

913 raise NotImplementedError("This implementation is virtual.") 

914 

915 

916class XMLMPUContainer(UploadBase): 

917 """Initiate and close an upload using the XML MPU API. 

918 

919 An XML MPU sends an initial request and then receives an upload ID. 

920 Using the upload ID, the upload is then done in numbered parts and the 

921 parts can be uploaded concurrently. 

922 

923 In order to avoid concurrency issues with this container object, the 

924 uploading of individual parts is handled separately, by XMLMPUPart objects 

925 spawned from this container class. The XMLMPUPart objects are not 

926 necessarily in the same process as the container, so they do not update the 

927 container automatically. 

928 

929 MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous 

930 given the JSON multipart upload, so the abbreviation "MPU" will be used 

931 throughout. 

932 

933 See: https://cloud.google.com/storage/docs/multipart-uploads 

934 

935 Args: 

936 upload_url (str): The URL of the object (without query parameters). The 

937 initiate, PUT, and finalization requests will all use this URL, with 

938 varying query parameters. 

939 filename (str): The name (path) of the file to upload. 

940 headers (Optional[Mapping[str, str]]): Extra headers that should 

941 be sent with every request. 

942 retry (Optional[google.api_core.retry.Retry]): How to retry the 

943 RPC. A None value will disable retries. A 

944 google.api_core.retry.Retry value will enable retries, and the 

945 object will configure backoff and timeout options. 

946 

947 See the retry.py source code and docstrings in this package 

948 (google.cloud.storage.retry) for information on retry types and how 

949 to configure them. 

950 

951 Attributes: 

952 upload_url (str): The URL where the content will be uploaded. 

953 upload_id (Optional[str]): The ID of the upload from the initialization 

954 response. 

955 """ 

956 

957 def __init__( 

958 self, upload_url, filename, headers=None, upload_id=None, retry=DEFAULT_RETRY 

959 ): 

960 super().__init__(upload_url, headers=headers, retry=retry) 

961 self._filename = filename 

962 self._upload_id = upload_id 

963 self._parts = {} 

964 

965 @property 

966 def upload_id(self): 

967 return self._upload_id 

968 

969 def register_part(self, part_number, etag): 

970 """Register an uploaded part by part number and corresponding etag. 

971 

972 XMLMPUPart objects represent individual parts, and their part number 

973 and etag can be registered to the container object with this method 

974 and therefore incorporated in the finalize() call to finish the upload. 

975 

976 This method accepts part_number and etag, but not XMLMPUPart objects 

977 themselves, to reduce the complexity involved in running XMLMPUPart 

978 uploads in separate processes. 

979 

980 Args: 

981 part_number (int): The part number. Parts are assembled into the 

982 final uploaded object with finalize() in order of their part 

983 numbers. 

984 etag (str): The etag included in the server response after upload. 

985 """ 

986 self._parts[part_number] = etag 

987 

988 def _prepare_initiate_request(self, content_type): 

989 """Prepare the contents of HTTP request to initiate upload. 

990 

991 This is everything that must be done before a request that doesn't 

992 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

993 philosophy. 

994 

995 Args: 

996 content_type (str): The content type of the resource, e.g. a JPEG 

997 image has content type ``image/jpeg``. 

998 

999 Returns: 

1000 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

1001 

1002 * HTTP verb for the request (always POST) 

1003 * the URL for the request 

1004 * the body of the request 

1005 * headers for the request 

1006 

1007 Raises: 

1008 ValueError: If the current upload has already been initiated. 

1009 

1010 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1011 """ 

1012 if self.upload_id is not None: 

1013 raise ValueError("This upload has already been initiated.") 

1014 

1015 initiate_url = self.upload_url + _MPU_INITIATE_QUERY 

1016 

1017 headers = { 

1018 **self._headers, 

1019 _CONTENT_TYPE_HEADER: content_type, 

1020 } 

1021 return _POST, initiate_url, None, headers 

1022 

1023 def _process_initiate_response(self, response): 

1024 """Process the response from an HTTP request that initiated the upload. 

1025 

1026 This is everything that must be done after a request that doesn't 

1027 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

1028 philosophy. 

1029 

1030 This method parses the upload ID from the ``UploadId`` node of the 

1031 XML response body and stores it for future use, so that part uploads 

1032 and the finalize request can reference it. 

1033 

1034 Args: 

1035 response (object): The HTTP response object. 

1036 

1037 Raises: 

1038 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

1039 code is not 200. 

1040 

1041 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1042 """ 

1043 _helpers.require_status_code(response, (http.client.OK,), self._get_status_code) 

1044 root = ElementTree.fromstring(response.text) 

1045 self._upload_id = root.find(_S3_COMPAT_XML_NAMESPACE + _UPLOAD_ID_NODE).text 

1046 

1047 def initiate( 

1048 self, 

1049 transport, 

1050 content_type, 

1051 timeout=None, 

1052 ): 

1053 """Initiate an MPU and record the upload ID. 

1054 

1055 Args: 

1056 transport (object): An object which can make authenticated 

1057 requests. 

1058 content_type (str): The content type of the resource, e.g. a JPEG 

1059 image has content type ``image/jpeg``. 

1060 timeout (Optional[Union[float, Tuple[float, float]]]): 

1061 The number of seconds to wait for the server response. 

1062 Depending on the retry strategy, a request may be repeated 

1063 several times using the same timeout each time. 

1064 

1065 Can also be passed as a tuple (connect_timeout, read_timeout). 

1066 See :meth:`requests.Session.request` documentation for details. 

1067 

1068 Raises: 

1069 NotImplementedError: Always, since virtual. 

1070 """ 

1071 raise NotImplementedError("This implementation is virtual.") 

1072 

1073 def _prepare_finalize_request(self): 

1074 """Prepare the contents of an HTTP request to finalize the upload. 

1075 

1076 All of the parts must be registered before calling this method. 

1077 

1078 Returns: 

1079 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

1080 

1081 * HTTP verb for the request (always POST) 

1082 * the URL for the request 

1083 * the body of the request 

1084 * headers for the request 

1085 

1086 Raises: 

1087 ValueError: If the upload has not been initiated. 

1088 """ 

1089 if self.upload_id is None: 

1090 raise ValueError("This upload has not yet been initiated.") 

1091 

1092 final_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id) 

1093 finalize_url = self.upload_url + final_query 

1094 final_xml_root = ElementTree.Element("CompleteMultipartUpload") 

1095 for part_number, etag in self._parts.items(): 

1096 part = ElementTree.SubElement(final_xml_root, "Part") 

1097 ElementTree.SubElement(part, "PartNumber").text = str(part_number) 

1098 ElementTree.SubElement(part, "ETag").text = etag 

1099 payload = ElementTree.tostring(final_xml_root) 

1100 return _POST, finalize_url, payload, self._headers 

1101 
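    # Illustrative sketch (not part of the original module): with two registered
    # parts the finalize request built above carries an S3-compatible XML payload.
    # The URL, upload ID, and etags below are hypothetical.
    #
    #     container = XMLMPUContainer(
    #         "https://example.com/obj", "file.bin", upload_id="ABC123"
    #     )
    #     container.register_part(1, '"etag-1"')
    #     container.register_part(2, '"etag-2"')
    #     verb, url, payload, headers = container._prepare_finalize_request()
    #     # url ends with "?uploadId=ABC123" and payload is (shown wrapped):
    #     #   b'<CompleteMultipartUpload>'
    #     #   b'<Part><PartNumber>1</PartNumber><ETag>"etag-1"</ETag></Part>'
    #     #   b'<Part><PartNumber>2</PartNumber><ETag>"etag-2"</ETag></Part>'
    #     #   b'</CompleteMultipartUpload>'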

1102 def _process_finalize_response(self, response): 

1103 """Process the response from an HTTP request that finalized the upload. 

1104 

1105 This is everything that must be done after a request that doesn't 

1106 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

1107 philosophy. 

1108 

1109 Args: 

1110 response (object): The HTTP response object. 

1111 

1112 Raises: 

1113 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

1114 code is not 200. 

1115 

1116 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1117 """ 

1118 

1119 _helpers.require_status_code(response, (http.client.OK,), self._get_status_code) 

1120 self._finished = True 

1121 

1122 def finalize( 

1123 self, 

1124 transport, 

1125 timeout=None, 

1126 ): 

1127 """Finalize an MPU request with all the parts. 

1128 

1129 Args: 

1130 transport (object): An object which can make authenticated 

1131 requests. 

1132 timeout (Optional[Union[float, Tuple[float, float]]]): 

1133 The number of seconds to wait for the server response. 

1134 Depending on the retry strategy, a request may be repeated 

1135 several times using the same timeout each time. 

1136 

1137 Can also be passed as a tuple (connect_timeout, read_timeout). 

1138 See :meth:`requests.Session.request` documentation for details. 

1139 

1140 Raises: 

1141 NotImplementedError: Always, since virtual. 

1142 """ 

1143 raise NotImplementedError("This implementation is virtual.") 

1144 

1145 def _prepare_cancel_request(self): 

1146 """Prepare the contents of an HTTP request to cancel the upload. 

1147 

1148 Returns: 

1149 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

1150 

1151 * HTTP verb for the request (always DELETE) 

1152 * the URL for the request 

1153 * the body of the request 

1154 * headers for the request 

1155 

1156 Raises: 

1157 ValueError: If the upload has not been initiated. 

1158 """ 

1159 if self.upload_id is None: 

1160 raise ValueError("This upload has not yet been initiated.") 

1161 

1162 cancel_query = _MPU_FINAL_QUERY_TEMPLATE.format(upload_id=self._upload_id) 

1163 cancel_url = self.upload_url + cancel_query 

1164 return _DELETE, cancel_url, None, self._headers 

1165 

1166 def _process_cancel_response(self, response): 

1167 """Process the response from an HTTP request that canceled the upload. 

1168 

1169 This is everything that must be done after a request that doesn't 

1170 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

1171 philosophy. 

1172 

1173 Args: 

1174 response (object): The HTTP response object. 

1175 

1176 Raises: 

1177 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

1178 code is not 204. 

1179 

1180 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1181 """ 

1182 

1183 _helpers.require_status_code( 

1184 response, (http.client.NO_CONTENT,), self._get_status_code 

1185 ) 

1186 

1187 def cancel( 

1188 self, 

1189 transport, 

1190 timeout=None, 

1191 ): 

1192 """Cancel an MPU request and permanently delete any uploaded parts. 

1193 

1194 This cannot be undone. 

1195 

1196 Args: 

1197 transport (object): An object which can make authenticated 

1198 requests. 

1199 timeout (Optional[Union[float, Tuple[float, float]]]): 

1200 The number of seconds to wait for the server response. 

1201 Depending on the retry strategy, a request may be repeated 

1202 several times using the same timeout each time. 

1203 

1204 Can also be passed as a tuple (connect_timeout, read_timeout). 

1205 See :meth:`requests.Session.request` documentation for details. 

1206 

1207 Raises: 

1208 NotImplementedError: Always, since virtual. 

1209 """ 

1210 raise NotImplementedError("This implementation is virtual.") 

1211 

1212 

1213class XMLMPUPart(UploadBase): 

1214 """Upload a single part of an existing XML MPU container. 

1215 

1216 An XML MPU sends an initial request and then receives an upload ID. 

1217 Using the upload ID, the upload is then done in numbered parts and the 

1218 parts can be uploaded concurrently. 

1219 

1220 In order to avoid concurrency issues with the container object, the 

1221 uploading of individual parts is handled separately by multiple objects 

1222 of this class. Once a part is uploaded, it can be registered with the 

1223 container with `container.register_part(part.part_number, part.etag)`. 

1224 

1225 MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous 

1226 given the JSON multipart upload, so the abbreviation "MPU" will be used 

1227 throughout. 

1228 

1229 See: https://cloud.google.com/storage/docs/multipart-uploads 

1230 

1231 Args: 

1232 upload_url (str): The URL of the object (without query parameters). 

1233 upload_id (str): The ID of the upload from the initialization response. 

1234 filename (str): The name (path) of the file to upload. 

1235 start (int): The byte index of the beginning of the part. 

1236 end (int): The byte index of the end of the part. 

1237 part_number (int): The part number. Part numbers will be assembled in 

1238 sequential order when the container is finalized. 

1239 headers (Optional[Mapping[str, str]]): Extra headers that should 

1240 be sent with every request. 

1241 checksum (Optional[str]): The type of checksum to compute to verify 

1242 the integrity of the object. The request headers will be amended 

1243 to include the computed value. Supported values are "md5", "crc32c", 

1244 "auto" and None. The default is "auto", which will try to detect if 

1245 the C extension for crc32c is installed and fall back to md5 

1246 otherwise. 

1247 retry (Optional[google.api_core.retry.Retry]): How to retry the 

1248 RPC. A None value will disable retries. A 

1249 google.api_core.retry.Retry value will enable retries, and the 

1250 object will configure backoff and timeout options. 

1251 

1252 See the retry.py source code and docstrings in this package 

1253 (google.cloud.storage.retry) for information on retry types and how 

1254 to configure them. 

1255 

1256 Attributes: 

1257 upload_url (str): The URL of the object (without query parameters). 

1258 upload_id (str): The ID of the upload from the initialization response. 

1259 filename (str): The name (path) of the file to upload. 

1260 start (int): The byte index of the beginning of the part. 

1261 end (int): The byte index of the end of the part. 

1262 part_number (int): The part number. Part numbers will be assembled in 

1263 sequential order when the container is finalized. 

1264 etag (Optional[str]): The etag returned by the service after upload. 

1265 """ 

1266 

1267 def __init__( 

1268 self, 

1269 upload_url, 

1270 upload_id, 

1271 filename, 

1272 start, 

1273 end, 

1274 part_number, 

1275 headers=None, 

1276 checksum="auto", 

1277 retry=DEFAULT_RETRY, 

1278 ): 

1279 super().__init__(upload_url, headers=headers, retry=retry) 

1280 self._filename = filename 

1281 self._start = start 

1282 self._end = end 

1283 self._upload_id = upload_id 

1284 self._part_number = part_number 

1285 self._etag = None 

1286 self._checksum_type = checksum 

1287 if self._checksum_type == "auto": 

1288 self._checksum_type = ( 

1289 "crc32c" if _helpers._is_crc32c_available_and_fast() else "md5" 

1290 ) 

1291 self._checksum_object = None 

1292 

1293 @property 

1294 def part_number(self): 

1295 return self._part_number 

1296 

1297 @property 

1298 def upload_id(self): 

1299 return self._upload_id 

1300 

1301 @property 

1302 def filename(self): 

1303 return self._filename 

1304 

1305 @property 

1306 def etag(self): 

1307 return self._etag 

1308 

1309 @property 

1310 def start(self): 

1311 return self._start 

1312 

1313 @property 

1314 def end(self): 

1315 return self._end 

1316 

1317 def _prepare_upload_request(self): 

1318 """Prepare the contents of HTTP request to upload a part. 

1319 

1320 This is everything that must be done before a request that doesn't 

1321 require network I/O. This is based on the `sans-I/O`_ philosophy. 

1322 

1323 For the time being, this **does require** some form of I/O to read 

1324 a part from ``stream`` (via :func:`get_part_payload`). However, this 

1325 will (almost) certainly not be network I/O. 

1326 

1327 Returns: 

1328 Tuple[str, str, bytes, Mapping[str, str]]: The quadruple 

1329 

1330 * HTTP verb for the request (always PUT) 

1331 * the URL for the request 

1332 * the body of the request 

1333 * headers for the request 

1334 

1335 The headers incorporate the ``_headers`` on the current instance. 

1336 

1337 Raises: 

1338 ValueError: If the current upload has finished. 

1339 

1340 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1341 """ 

1342 if self.finished: 

1343 raise ValueError("This part has already been uploaded.") 

1344 

1345 with open(self._filename, "rb") as f: 

1346 f.seek(self._start) 

1347 payload = f.read(self._end - self._start) 

1348 

1349 self._checksum_object = _helpers._get_checksum_object(self._checksum_type) 

1350 if self._checksum_object is not None: 

1351 self._checksum_object.update(payload) 

1352 

1353 part_query = _MPU_PART_QUERY_TEMPLATE.format( 

1354 part=self._part_number, upload_id=self._upload_id 

1355 ) 

1356 upload_url = self.upload_url + part_query 

1357 return _PUT, upload_url, payload, self._headers 

1358 
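    # Illustrative sketch (not part of the original module): each part PUTs the byte
    # slice ``[start, end)`` of the file to the object URL with part-specific query
    # parameters. The URL, upload ID, and filename below are hypothetical.
    #
    #     part = XMLMPUPart(
    #         "https://example.com/obj", "ABC123", "file.bin",
    #         start=0, end=8388608, part_number=1,
    #     )
    #     # part._prepare_upload_request() reads bytes 0..8388607 of file.bin and
    #     # returns a PUT to "https://example.com/obj?partNumber=1&uploadId=ABC123"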

1359 def _process_upload_response(self, response): 

1360 """Process the response from an HTTP request. 

1361 

1362 This is everything that must be done after a request that doesn't 

1363 require network I/O (or other I/O). This is based on the `sans-I/O`_ 

1364 philosophy. 

1365 

1366 Args: 

1367 response (object): The HTTP response object. 

1368 

1369 Raises: 

1370 ~google.cloud.storage.exceptions.InvalidResponse: If the status 

1371 code is not 200 or the response is missing data. 

1372 

1373 .. _sans-I/O: https://sans-io.readthedocs.io/ 

1374 """ 

1375 _helpers.require_status_code( 

1376 response, 

1377 (http.client.OK,), 

1378 self._get_status_code, 

1379 ) 

1380 

1381 self._validate_checksum(response) 

1382 

1383 etag = _helpers.header_required(response, "etag", self._get_headers) 

1384 self._etag = etag 

1385 self._finished = True 

1386 

1387 def upload( 

1388 self, 

1389 transport, 

1390 timeout=None, 

1391 ): 

1392 """Upload the part. 

1393 

1394 Args: 

1395 transport (object): An object which can make authenticated 

1396 requests. 

1397 timeout (Optional[Union[float, Tuple[float, float]]]): 

1398 The number of seconds to wait for the server response. 

1399 Depending on the retry strategy, a request may be repeated 

1400 several times using the same timeout each time. 

1401 

1402 Can also be passed as a tuple (connect_timeout, read_timeout). 

1403 See :meth:`requests.Session.request` documentation for details. 

1404 

1405 Raises: 

1406 NotImplementedError: Always, since virtual. 

1407 """ 

1408 raise NotImplementedError("This implementation is virtual.") 

1409 

1410 def _validate_checksum(self, response): 

1411 """Check the computed checksum, if any, against the response headers. 

1412 

1413 Args: 

1414 response (object): The HTTP response object. 

1415 

1416 Raises: 

1417 ~google.cloud.storage.exceptions.DataCorruption: If the checksum 

1418 computed locally and the checksum reported by the remote host do 

1419 not match. 

1420 """ 

1421 if self._checksum_type is None: 

1422 return 

1423 

1424 remote_checksum = _helpers._get_uploaded_checksum_from_headers( 

1425 response, self._get_headers, self._checksum_type 

1426 ) 

1427 

1428 if remote_checksum is None: 

1429 metadata_key = _helpers._get_metadata_key(self._checksum_type) 

1430 raise InvalidResponse( 

1431 response, 

1432 _UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key), 

1433 self._get_headers(response), 

1434 ) 

1435 local_checksum = _helpers.prepare_checksum_digest( 

1436 self._checksum_object.digest() 

1437 ) 

1438 if local_checksum != remote_checksum: 

1439 raise DataCorruption( 

1440 response, 

1441 _UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format( 

1442 self._checksum_type.upper(), local_checksum, remote_checksum 

1443 ), 

1444 ) 

1445 

1446 

1447def get_boundary(): 

1448 """Get a random boundary for a multipart request. 

1449 

1450 Returns: 

1451 bytes: The boundary used to separate parts of a multipart request. 

1452 """ 

1453 random_int = random.randrange(sys.maxsize) 

1454 boundary = _BOUNDARY_FORMAT.format(random_int) 

1455 # NOTE: ``bytes`` objects do not support ``.format()``, so the boundary is 

1456 # built from a str template and then encoded. 

1457 return boundary.encode("utf-8") 

1458 

1459 

1460def construct_multipart_request(data, metadata, content_type): 

1461 """Construct a multipart request body. 

1462 

1463 Args: 

1464 data (bytes): The resource content (UTF-8 encoded as bytes) 

1465 to be uploaded. 

1466 metadata (Mapping[str, str]): The resource metadata, such as an 

1467 ACL list. 

1468 content_type (str): The content type of the resource, e.g. a JPEG 

1469 image has content type ``image/jpeg``. 

1470 

1471 Returns: 

1472 Tuple[bytes, bytes]: The multipart request body and the boundary used 

1473 between each part. 

1474 """ 

1475 multipart_boundary = get_boundary() 

1476 json_bytes = json.dumps(metadata).encode("utf-8") 

1477 content_type = content_type.encode("utf-8") 

1478 # Combine the two parts into a multipart payload. 

1479 # NOTE: A bytes template would be tidier, but ``bytes`` has no ``.format()``, so the parts are concatenated. 

1480 boundary_sep = _MULTIPART_SEP + multipart_boundary 

1481 content = ( 

1482 boundary_sep 

1483 + _MULTIPART_BEGIN 

1484 + json_bytes 

1485 + _CRLF 

1486 + boundary_sep 

1487 + _CRLF 

1488 + b"content-type: " 

1489 + content_type 

1490 + _CRLF 

1491 + _CRLF 

1492 + data # Empty line between headers and body. 

1493 + _CRLF 

1494 + boundary_sep 

1495 + _MULTIPART_SEP 

1496 ) 

1497 

1498 return content, multipart_boundary 

1499 
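# Illustrative sketch (not part of the original module): for a small payload the
# body built by ``construct_multipart_request`` has the following shape, with CRLF
# line endings and a freshly generated boundary (digits vary per call):
#
#     --===============1234567890123456789==
#     content-type: application/json; charset=UTF-8
#
#     {"name": "obj"}
#     --===============1234567890123456789==
#     content-type: text/plain
#
#     hello
#     --===============1234567890123456789==--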

1500 

1501def get_total_bytes(stream): 

1502 """Determine the total number of bytes in a stream. 

1503 

1504 Args: 

1505 stream (IO[bytes]): The stream (i.e. file-like object). 

1506 

1507 Returns: 

1508 int: The number of bytes. 

1509 """ 

1510 current_position = stream.tell() 

1511 # NOTE: ``.seek()`` is not guaranteed to return the new position for every 

1512 # file-like object, so ``.tell()`` is called explicitly. 

1513 stream.seek(0, os.SEEK_END) 

1514 end_position = stream.tell() 

1515 # Go back to the initial position. 

1516 stream.seek(current_position) 

1517 

1518 return end_position 

1519 
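# Illustrative example (not part of the original module):
#
#     import io
#     stream = io.BytesIO(b"abcde")
#     stream.read(2)              # position is now 2
#     get_total_bytes(stream)     # -> 5
#     stream.tell()               # -> 2, the position is restored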

1520 

1521def get_next_chunk(stream, chunk_size, total_bytes): 

1522 """Get a chunk from an I/O stream. 

1523 

1524 The ``stream`` may have fewer bytes remaining than ``chunk_size`` 

1525 so it may not always be the case that 

1526 ``end_byte == start_byte + chunk_size - 1``. 

1527 

1528 Args: 

1529 stream (IO[bytes]): The stream (i.e. file-like object). 

1530 chunk_size (int): The size of the chunk to be read from the ``stream``. 

1531 total_bytes (Optional[int]): The (expected) total number of bytes 

1532 in the ``stream``. 

1533 

1534 Returns: 

1535 Tuple[int, bytes, str]: Triple of: 

1536 

1537 * the start byte index 

1538 * the content in between the start and end bytes (inclusive) 

1539 * content range header for the chunk (slice) that has been read 

1540 

1541 Raises: 

1542 ValueError: If ``total_bytes == 0`` but ``stream.read()`` yields 

1543 non-empty content. 

1544 ValueError: If there is no data left to consume. This corresponds 

1545 exactly to the case ``end_byte < start_byte``, which can only 

1546 occur if ``end_byte == start_byte - 1``. 

1547 """ 

1548 start_byte = stream.tell() 

1549 if total_bytes is not None and start_byte + chunk_size >= total_bytes > 0: 

1550 payload = stream.read(total_bytes - start_byte) 

1551 else: 

1552 payload = stream.read(chunk_size) 

1553 end_byte = stream.tell() - 1 

1554 

1555 num_bytes_read = len(payload) 

1556 if total_bytes is None: 

1557 if num_bytes_read < chunk_size: 

1558 # We now **KNOW** the total number of bytes. 

1559 total_bytes = end_byte + 1 

1560 elif total_bytes == 0: 

1561 # NOTE: We also expect ``start_byte == 0`` here but don't check 

1562 # because ``_prepare_initiate_request()`` requires the 

1563 # stream to be at the beginning. 

1564 if num_bytes_read != 0: 

1565 raise ValueError( 

1566 "Stream specified as empty, but produced non-empty content." 

1567 ) 

1568 else: 

1569 if num_bytes_read == 0: 

1570 raise ValueError( 

1571 "Stream is already exhausted. There is no content remaining." 

1572 ) 

1573 

1574 content_range = get_content_range(start_byte, end_byte, total_bytes) 

1575 return start_byte, payload, content_range 

1576 
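# Illustrative example (not part of the original module):
#
#     import io
#     stream = io.BytesIO(b"0123456789")
#     get_next_chunk(stream, 4, 10)   # -> (0, b"0123", "bytes 0-3/10")
#     get_next_chunk(stream, 4, 10)   # -> (4, b"4567", "bytes 4-7/10")
#     get_next_chunk(stream, 4, 10)   # -> (8, b"89", "bytes 8-9/10")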

1577 

1578def get_content_range(start_byte, end_byte, total_bytes): 

1579 """Convert start, end and total into content range header. 

1580 

1581 If ``total_bytes`` is not known, uses "bytes {start}-{end}/*". 

1582 If we are dealing with an empty range (i.e. ``end_byte < start_byte``) 

1583 then "bytes */{total}" is used. 

1584 

1585 This function **ASSUMES** that if the size is not known, the caller will 

1586 not also pass an empty range. 

1587 

1588 Args: 

1589 start_byte (int): The start (inclusive) of the byte range. 

1590 end_byte (int): The end (inclusive) of the byte range. 

1591 total_bytes (Optional[int]): The number of bytes in the byte 

1592 range (if known). 

1593 

1594 Returns: 

1595 str: The content range header. 

1596 """ 

1597 if total_bytes is None: 

1598 return _RANGE_UNKNOWN_TEMPLATE.format(start_byte, end_byte) 

1599 elif end_byte < start_byte: 

1600 return _EMPTY_RANGE_TEMPLATE.format(total_bytes) 

1601 else: 

1602 return _CONTENT_RANGE_TEMPLATE.format(start_byte, end_byte, total_bytes)
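

# Illustrative examples (not part of the original module):
#
#     get_content_range(0, 999, 5000)      # -> "bytes 0-999/5000"
#     get_content_range(1000, 1999, None)  # -> "bytes 1000-1999/*"
#     get_content_range(0, -1, 0)          # -> "bytes */0" (empty upload)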