Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/s3transfer/upload.py: 30%

263 statements  

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import math
from io import BytesIO

from s3transfer.compat import readable, seekable
from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS
from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
from s3transfer.tasks import (
    CompleteMultipartUploadTask,
    CreateMultipartUploadTask,
    SubmissionTask,
    Task,
)
from s3transfer.utils import (
    ChunksizeAdjuster,
    DeferredOpenFile,
    get_callbacks,
    get_filtered_dict,
)

class AggregatedProgressCallback:
    def __init__(self, callbacks, threshold=1024 * 256):
        """Aggregates progress updates for every provided progress callback

        :type callbacks: list
        :param callbacks: The callbacks to invoke when the threshold is
            reached. Each callback accepts ``bytes_transferred`` as a single
            keyword argument.

        :type threshold: int
        :param threshold: The number of aggregated bytes at which to invoke
            the callbacks with the aggregated progress total
        """
        self._callbacks = callbacks
        self._threshold = threshold
        self._bytes_seen = 0

    def __call__(self, bytes_transferred):
        self._bytes_seen += bytes_transferred
        if self._bytes_seen >= self._threshold:
            self._trigger_callbacks()

    def flush(self):
        """Flushes out any progress that has not been sent to its callbacks"""
        if self._bytes_seen > 0:
            self._trigger_callbacks()

    def _trigger_callbacks(self):
        for callback in self._callbacks:
            callback(bytes_transferred=self._bytes_seen)
        self._bytes_seen = 0
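
# Illustrative sketch (not part of the library): how the aggregation above
# behaves with the default 256 KiB threshold. Callbacks fire only once the
# threshold is crossed, and flush() delivers whatever remainder is still
# buffered. The function name and values below are placeholders.
def _example_aggregated_progress():
    seen = []
    aggregated = AggregatedProgressCallback(
        [lambda bytes_transferred: seen.append(bytes_transferred)]
    )
    aggregated(1024 * 200)  # below the threshold, nothing delivered yet
    aggregated(1024 * 100)  # crosses 256 KiB, callback fires with 300 KiB
    aggregated(1024 * 10)   # buffered again
    aggregated.flush()      # delivers the trailing 10 KiB
    assert seen == [1024 * 300, 1024 * 10]
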

class InterruptReader:
    """Wrapper that can interrupt reading by raising an error

    On every read it checks the transfer coordinator for an exception; if
    one has been set (for example, because the transfer was cancelled), that
    exception is raised instead of returning more data from the file.

    :type fileobj: file-like obj
    :param fileobj: The file-like object to read from

    :type transfer_coordinator: s3transfer.futures.TransferCoordinator
    :param transfer_coordinator: The transfer coordinator to use if the
        reader needs to be interrupted.
    """

    def __init__(self, fileobj, transfer_coordinator):
        self._fileobj = fileobj
        self._transfer_coordinator = transfer_coordinator

    def read(self, amount=None):
        # If there is an exception, then raise the exception.
        # We raise an error instead of returning no bytes because for
        # requests where the content length and md5 were sent, returning
        # no bytes would cause md5 mismatches and retries, as there would
        # be no indication that the stream being read from encountered
        # any issues.
        if self._transfer_coordinator.exception:
            raise self._transfer_coordinator.exception
        return self._fileobj.read(amount)

    def seek(self, where, whence=0):
        self._fileobj.seek(where, whence)

    def tell(self):
        return self._fileobj.tell()

    def close(self):
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
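
# Illustrative sketch (not part of the library): once the coordinator
# carries an exception, the very next read() re-raises it, so a cancelled
# upload aborts without waiting for the socket to drain the whole body.
# The _FakeCoordinator class below is a stand-in for a real
# TransferCoordinator, used for illustration only.
def _example_interrupt_reader():
    class _FakeCoordinator:
        exception = None

    coordinator = _FakeCoordinator()
    reader = InterruptReader(BytesIO(b'hello world'), coordinator)
    assert reader.read(5) == b'hello'  # normal reads pass through
    coordinator.exception = RuntimeError('transfer cancelled')
    try:
        reader.read(5)
    except RuntimeError:
        pass  # the stored error is re-raised on the next read
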

class UploadInputManager:
    """Base manager class for handling various types of files for uploads

    This class is typically used for the UploadSubmissionTask class to help
    determine the following:

        * How to determine the size of the file
        * How to determine if a multipart upload is required
        * How to retrieve the body for a PutObject
        * How to retrieve the bodies for a set of UploadParts

    The answers/implementations differ for the various types of file inputs
    that may be accepted. All implementations must subclass and override
    public methods from this class.
    """

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._bandwidth_limiter = bandwidth_limiter

    @classmethod
    def is_compatible(cls, upload_source):
        """Determines if the upload source is compatible with this manager

        :param upload_source: The source from which the upload will pull
            data.

        :returns: True if the manager can handle the type of source specified
            otherwise returns False.
        """
        raise NotImplementedError('must implement is_compatible()')

    def stores_body_in_memory(self, operation_name):
        """Whether the bodies it provides are stored in memory

        :type operation_name: str
        :param operation_name: The name of the client operation that the body
            is being used for. Valid operation_names are ``put_object`` and
            ``upload_part``.

        :rtype: boolean
        :returns: True if the body returned by the manager will be stored in
            memory. False if the manager will not directly store the body in
            memory.
        """
        raise NotImplementedError('must implement stores_body_in_memory()')

    def provide_transfer_size(self, transfer_future):
        """Provides the transfer size of an upload

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request
        """
        raise NotImplementedError('must implement provide_transfer_size()')

    def requires_multipart_upload(self, transfer_future, config):
        """Determines whether a multipart upload is required

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :type config: s3transfer.manager.TransferConfig
        :param config: The config associated to the transfer manager

        :rtype: boolean
        :returns: True if the upload should be multipart based on
            configuration and size. False otherwise.
        """
        raise NotImplementedError('must implement requires_multipart_upload()')

    def get_put_object_body(self, transfer_future):
        """Returns the body to use for PutObject

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :rtype: s3transfer.utils.ReadFileChunk
        :returns: A ReadFileChunk including all progress callbacks
            associated with the transfer future.
        """
        raise NotImplementedError('must implement get_put_object_body()')

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        """Yields the part number and body to use for each UploadPart

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :type chunksize: int
        :param chunksize: The chunksize to use for this upload.

        :rtype: int, s3transfer.utils.ReadFileChunk
        :returns: Yields the part number and the ReadFileChunk including all
            progress callbacks associated with the transfer future for that
            specific yielded part.
        """
        raise NotImplementedError('must implement yield_upload_part_bodies()')

    def _wrap_fileobj(self, fileobj):
        fileobj = InterruptReader(fileobj, self._transfer_coordinator)
        if self._bandwidth_limiter:
            fileobj = self._bandwidth_limiter.get_bandwith_limited_stream(
                fileobj, self._transfer_coordinator, enabled=False
            )
        return fileobj

    def _get_progress_callbacks(self, transfer_future):
        callbacks = get_callbacks(transfer_future, 'progress')
        # We only want to be wrapping the callbacks if there are callbacks to
        # invoke because we do not want to be doing any unnecessary work if
        # there are no callbacks to invoke.
        if callbacks:
            return [AggregatedProgressCallback(callbacks)]
        return []

    def _get_close_callbacks(self, aggregated_progress_callbacks):
        return [callback.flush for callback in aggregated_progress_callbacks]
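
# Illustrative sketch (not part of the library): the shape of a minimal,
# hypothetical manager for raw bytes inputs. A real implementation (see the
# subclasses below) must return ReadFileChunk-style bodies through the
# osutil helpers; the BytesIO bodies here are placeholders only.
class _ExampleLiteralInputManager(UploadInputManager):
    @classmethod
    def is_compatible(cls, upload_source):
        return isinstance(upload_source, bytes)

    def stores_body_in_memory(self, operation_name):
        return True

    def provide_transfer_size(self, transfer_future):
        transfer_future.meta.provide_transfer_size(
            len(transfer_future.meta.call_args.fileobj)
        )

    def requires_multipart_upload(self, transfer_future, config):
        return transfer_future.meta.size >= config.multipart_threshold

    def get_put_object_body(self, transfer_future):
        return BytesIO(transfer_future.meta.call_args.fileobj)

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        data = transfer_future.meta.call_args.fileobj
        for part_number, start in enumerate(range(0, len(data), chunksize), 1):
            yield part_number, BytesIO(data[start:start + chunksize])
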

class UploadFilenameInputManager(UploadInputManager):
    """Upload utility for filenames"""

    @classmethod
    def is_compatible(cls, upload_source):
        return isinstance(upload_source, str)

    def stores_body_in_memory(self, operation_name):
        return False

    def provide_transfer_size(self, transfer_future):
        transfer_future.meta.provide_transfer_size(
            self._osutil.get_file_size(transfer_future.meta.call_args.fileobj)
        )

    def requires_multipart_upload(self, transfer_future, config):
        return transfer_future.meta.size >= config.multipart_threshold

    def get_put_object_body(self, transfer_future):
        # Get a file-like object for the given input
        fileobj, full_size = self._get_put_object_fileobj_with_full_size(
            transfer_future
        )

        # Wrap fileobj with interrupt reader that will quickly cancel
        # uploads if needed instead of having to wait for the socket
        # to completely read all of the data.
        fileobj = self._wrap_fileobj(fileobj)

        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        size = transfer_future.meta.size
        # Return the file-like object wrapped into a ReadFileChunk to get
        # progress.
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=size,
            full_file_size=full_size,
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        full_file_size = transfer_future.meta.size
        num_parts = self._get_num_parts(transfer_future, chunksize)
        for part_number in range(1, num_parts + 1):
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            start_byte = chunksize * (part_number - 1)
            # Get a file-like object for that part and the full size of the
            # file as seen by that part's file-like object.
            fileobj, full_size = self._get_upload_part_fileobj_with_full_size(
                transfer_future.meta.call_args.fileobj,
                start_byte=start_byte,
                part_size=chunksize,
                full_file_size=full_file_size,
            )

            # Wrap fileobj with interrupt reader that will quickly cancel
            # uploads if needed instead of having to wait for the socket
            # to completely read all of the data.
            fileobj = self._wrap_fileobj(fileobj)

            # Wrap the file-like object into a ReadFileChunk to get progress.
            read_file_chunk = self._osutil.open_file_chunk_reader_from_fileobj(
                fileobj=fileobj,
                chunk_size=chunksize,
                full_file_size=full_size,
                callbacks=callbacks,
                close_callbacks=close_callbacks,
            )
            yield part_number, read_file_chunk

    def _get_deferred_open_file(self, fileobj, start_byte):
        fileobj = DeferredOpenFile(
            fileobj, start_byte, open_function=self._osutil.open
        )
        return fileobj

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        size = transfer_future.meta.size
        return self._get_deferred_open_file(fileobj, 0), size

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        start_byte = kwargs['start_byte']
        full_size = kwargs['full_file_size']
        return self._get_deferred_open_file(fileobj, start_byte), full_size

    def _get_num_parts(self, transfer_future, part_size):
        return int(math.ceil(transfer_future.meta.size / float(part_size)))
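
# Illustrative sketch (not part of the library): the part arithmetic used by
# yield_upload_part_bodies() above. A 10 MiB file with an 8 MiB chunksize
# needs ceil(10/8) = 2 parts; part 1 starts at byte 0 and part 2 at 8 MiB,
# each reading at most `chunksize` bytes. The function name and sizes are
# placeholders.
def _example_part_math(file_size=10 * 1024 * 1024, chunksize=8 * 1024 * 1024):
    num_parts = int(math.ceil(file_size / float(chunksize)))
    offsets = [
        chunksize * (part_number - 1) for part_number in range(1, num_parts + 1)
    ]
    assert num_parts == 2
    assert offsets == [0, 8 * 1024 * 1024]
    return num_parts, offsets
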

class UploadSeekableInputManager(UploadFilenameInputManager):
    """Upload utility for an open file object"""

    @classmethod
    def is_compatible(cls, upload_source):
        return readable(upload_source) and seekable(upload_source)

    def stores_body_in_memory(self, operation_name):
        if operation_name == 'put_object':
            return False
        else:
            return True

    def provide_transfer_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        # To determine size, first determine the starting position.
        # Seek to the end and then find the difference in the length
        # between the end and start positions.
        start_position = fileobj.tell()
        fileobj.seek(0, 2)
        end_position = fileobj.tell()
        fileobj.seek(start_position)
        transfer_future.meta.provide_transfer_size(
            end_position - start_position
        )

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        # Note: It is unfortunate that in order to do a multithreaded
        # multipart upload we cannot simply copy the filelike object
        # since there is not really a mechanism in python (i.e. os.dup
        # points to the same OS filehandle which causes concurrency
        # issues). So instead we need to read from the fileobj and
        # chunk the data out to separate file-like objects in memory.
        data = fileobj.read(kwargs['part_size'])
        # We return the length of the data instead of the full_file_size
        # because we partitioned the data into separate BytesIO objects,
        # meaning the BytesIO object has no knowledge of its start position
        # relative to the input source nor access to the rest of the input
        # source. So we must treat it as its own standalone file.
        return BytesIO(data), len(data)

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        # The current position needs to be taken into account when retrieving
        # the full size of the file.
        size = fileobj.tell() + transfer_future.meta.size
        return fileobj, size
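
# Illustrative sketch (not part of the library): the seek/tell size probe in
# provide_transfer_size() above only counts bytes from the stream's current
# position, so a pre-seeked file object uploads just its remaining tail.
def _example_seekable_size_probe():
    stream = BytesIO(b'0123456789')
    stream.seek(4)          # caller already consumed 4 bytes
    start = stream.tell()
    stream.seek(0, 2)       # seek to the end
    remaining = stream.tell() - start
    stream.seek(start)      # restore the original position
    assert remaining == 6
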

class UploadNonSeekableInputManager(UploadInputManager):
    """Upload utility for a file-like object that cannot seek."""

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        super().__init__(osutil, transfer_coordinator, bandwidth_limiter)
        self._initial_data = b''

    @classmethod
    def is_compatible(cls, upload_source):
        return readable(upload_source)

    def stores_body_in_memory(self, operation_name):
        return True

    def provide_transfer_size(self, transfer_future):
        # No-op because there is no way to do this short of reading the entire
        # body into memory.
        return

    def requires_multipart_upload(self, transfer_future, config):
        # If the user has set the size, we can use that.
        if transfer_future.meta.size is not None:
            return transfer_future.meta.size >= config.multipart_threshold

        # This is tricky to determine in this case because we can't know how
        # large the input is. So to figure it out, we read data into memory
        # up until the threshold and compare how much data was actually read
        # against the threshold.
        fileobj = transfer_future.meta.call_args.fileobj
        threshold = config.multipart_threshold
        self._initial_data = self._read(fileobj, threshold, False)
        if len(self._initial_data) < threshold:
            return False
        else:
            return True

    def get_put_object_body(self, transfer_future):
        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        fileobj = transfer_future.meta.call_args.fileobj

        body = self._wrap_data(
            self._initial_data + fileobj.read(), callbacks, close_callbacks
        )

        # Zero out the stored data so we don't have additional copies
        # hanging around in memory.
        self._initial_data = None
        return body

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        file_object = transfer_future.meta.call_args.fileobj
        part_number = 0

        # Continue reading parts from the file-like object until it is empty.
        while True:
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            part_number += 1
            part_content = self._read(file_object, chunksize)
            if not part_content:
                break
            part_object = self._wrap_data(
                part_content, callbacks, close_callbacks
            )

            # Zero out part_content to avoid hanging on to additional data.
            part_content = None
            yield part_number, part_object

    def _read(self, fileobj, amount, truncate=True):
        """
        Reads a specific amount of data from a stream and returns it. If there
        is any data in initial_data, that will be popped out first.

        :type fileobj: A file-like object that implements read
        :param fileobj: The stream to read from.

        :type amount: int
        :param amount: The number of bytes to read from the stream.

        :type truncate: bool
        :param truncate: Whether or not to truncate initial_data after
            reading from it.

        :rtype: bytes
        :return: The requested data, drawn first from ``initial_data`` and
            then from ``fileobj``.
        """
        # If the initial data is empty, we simply read from the fileobj.
        if len(self._initial_data) == 0:
            return fileobj.read(amount)

        # If the requested number of bytes is less than the amount of
        # initial data, pull entirely from initial data.
        if amount <= len(self._initial_data):
            data = self._initial_data[:amount]
            # Truncate initial data so we don't hang onto the data longer
            # than we need.
            if truncate:
                self._initial_data = self._initial_data[amount:]
            return data

        # At this point there is some initial data left, but not enough to
        # satisfy the number of bytes requested. Pull out the remaining
        # initial data and read the rest from the fileobj.
        amount_to_read = amount - len(self._initial_data)
        data = self._initial_data + fileobj.read(amount_to_read)

        # Zero out initial data so we don't hang onto the data any more.
        if truncate:
            self._initial_data = b''
        return data

    def _wrap_data(self, data, callbacks, close_callbacks):
        """
        Wraps data with the interrupt reader and the file chunk reader.

        :type data: bytes
        :param data: The data to wrap.

        :type callbacks: list
        :param callbacks: The callbacks associated with the transfer future.

        :type close_callbacks: list
        :param close_callbacks: The callbacks to be called when closing the
            wrapper for the data.

        :return: Fully wrapped data.
        """
        fileobj = self._wrap_fileobj(BytesIO(data))
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=len(data),
            full_file_size=len(data),
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )
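
# Illustrative sketch (not part of the library): the probe-and-replay
# behavior of _read() above. The manager reads up to the multipart threshold
# once to decide between PutObject and a multipart upload, stashes those
# bytes in _initial_data, and hands them back before reading any more from
# the stream, so nothing is lost from the non-seekable input. The None
# arguments and byte values are placeholders for illustration only.
def _example_nonseekable_probe():
    manager = UploadNonSeekableInputManager(
        osutil=None, transfer_coordinator=None
    )
    stream = BytesIO(b'abcdefghij')  # stands in for a pipe or socket
    # Probe 4 bytes without truncating, as requires_multipart_upload() does.
    manager._initial_data = manager._read(stream, 4, truncate=False)
    assert manager._initial_data == b'abcd'
    # Subsequent reads drain the stash first, then continue on the stream.
    assert manager._read(stream, 6) == b'abcdef'
    assert manager._read(stream, 6) == b'ghij'
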

class UploadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute an upload"""

    PUT_OBJECT_BLOCKLIST = ["ChecksumType", "MpuObjectSize"]

    CREATE_MULTIPART_BLOCKLIST = FULL_OBJECT_CHECKSUM_ARGS + ["MpuObjectSize"]

    UPLOAD_PART_ARGS = [
        'ChecksumAlgorithm',
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    COMPLETE_MULTIPART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
        'ChecksumType',
        'MpuObjectSize',
    ] + FULL_OBJECT_CHECKSUM_ARGS

    def _get_upload_input_manager_cls(self, transfer_future):
        """Retrieves a class for managing input for an upload based on file type

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :rtype: class of UploadInputManager
        :returns: The appropriate class to use for managing a specific type of
            input for uploads.
        """
        upload_manager_resolver_chain = [
            UploadFilenameInputManager,
            UploadSeekableInputManager,
            UploadNonSeekableInputManager,
        ]

        fileobj = transfer_future.meta.call_args.fileobj
        for upload_manager_cls in upload_manager_resolver_chain:
            if upload_manager_cls.is_compatible(fileobj):
                return upload_manager_cls
        raise RuntimeError(
            f'Input {fileobj} of type: {type(fileobj)} is not supported.'
        )

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtils
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :param bandwidth_limiter: The bandwidth limiter to use when uploading
            streams, if any.
        """
        upload_input_manager = self._get_upload_input_manager_cls(
            transfer_future
        )(osutil, self._transfer_coordinator, bandwidth_limiter)

        # Determine the size if it was not provided
        if transfer_future.meta.size is None:
            upload_input_manager.provide_transfer_size(transfer_future)

        # Do a multipart upload if needed, otherwise do a regular put object.
        if not upload_input_manager.requires_multipart_upload(
            transfer_future, config
        ):
            self._submit_upload_request(
                client,
                config,
                osutil,
                request_executor,
                transfer_future,
                upload_input_manager,
            )
        else:
            self._submit_multipart_request(
                client,
                config,
                osutil,
                request_executor,
                transfer_future,
                upload_input_manager,
            )

    def _submit_upload_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        call_args = transfer_future.meta.call_args

        put_object_extra_args = self._extra_put_object_args(
            call_args.extra_args
        )

        # Get any tags that need to be associated to the put object task
        put_object_tag = self._get_upload_task_tag(
            upload_input_manager, 'put_object'
        )

        # Submit the request of a single upload.
        self._transfer_coordinator.submit(
            request_executor,
            PutObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'fileobj': upload_input_manager.get_put_object_body(
                        transfer_future
                    ),
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': put_object_extra_args,
                },
                is_final=True,
            ),
            tag=put_object_tag,
        )

    def _submit_multipart_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        call_args = transfer_future.meta.call_args

        # When a user-provided checksum is passed, set "ChecksumType" to
        # "FULL_OBJECT" and "ChecksumAlgorithm" to the related algorithm.
        # For example, passing "ChecksumCRC32" results in
        # ChecksumType="FULL_OBJECT" and ChecksumAlgorithm="CRC32".
        for checksum in FULL_OBJECT_CHECKSUM_ARGS:
            if checksum in call_args.extra_args:
                call_args.extra_args["ChecksumType"] = "FULL_OBJECT"
                call_args.extra_args["ChecksumAlgorithm"] = checksum.replace(
                    "Checksum", ""
                )

        create_multipart_extra_args = self._extra_create_multipart_args(
            call_args.extra_args
        )

        # Submit the request to create a multipart upload.
        create_multipart_future = self._transfer_coordinator.submit(
            request_executor,
            CreateMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': create_multipart_extra_args,
                },
            ),
        )

        # Submit requests to upload the parts of the file.
        part_futures = []
        extra_part_args = self._extra_upload_part_args(call_args.extra_args)

        # Get any tags that need to be associated to the submitted task
        # for uploading the data
        upload_part_tag = self._get_upload_task_tag(
            upload_input_manager, 'upload_part'
        )

        size = transfer_future.meta.size
        adjuster = ChunksizeAdjuster()
        chunksize = adjuster.adjust_chunksize(config.multipart_chunksize, size)
        part_iterator = upload_input_manager.yield_upload_part_bodies(
            transfer_future, chunksize
        )

        for part_number, fileobj in part_iterator:
            part_futures.append(
                self._transfer_coordinator.submit(
                    request_executor,
                    UploadPartTask(
                        transfer_coordinator=self._transfer_coordinator,
                        main_kwargs={
                            'client': client,
                            'fileobj': fileobj,
                            'bucket': call_args.bucket,
                            'key': call_args.key,
                            'part_number': part_number,
                            'extra_args': extra_part_args,
                        },
                        pending_main_kwargs={
                            'upload_id': create_multipart_future
                        },
                    ),
                    tag=upload_part_tag,
                )
            )

        complete_multipart_extra_args = self._extra_complete_multipart_args(
            call_args.extra_args
        )
        # Submit the request to complete the multipart upload.
        self._transfer_coordinator.submit(
            request_executor,
            CompleteMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': complete_multipart_extra_args,
                },
                pending_main_kwargs={
                    'upload_id': create_multipart_future,
                    'parts': part_futures,
                },
                is_final=True,
            ),
        )

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        return get_filtered_dict(extra_args, self.UPLOAD_PART_ARGS)

    def _extra_complete_multipart_args(self, extra_args):
        return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

    def _extra_create_multipart_args(self, extra_args):
        return get_filtered_dict(
            extra_args, blocklisted_keys=self.CREATE_MULTIPART_BLOCKLIST
        )

    def _extra_put_object_args(self, extra_args):
        return get_filtered_dict(
            extra_args, blocklisted_keys=self.PUT_OBJECT_BLOCKLIST
        )

    def _get_upload_task_tag(self, upload_input_manager, operation_name):
        tag = None
        if upload_input_manager.stores_body_in_memory(operation_name):
            tag = IN_MEMORY_UPLOAD_TAG
        return tag
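
# Illustrative sketch (not part of the library): how the resolver chain in
# _get_upload_input_manager_cls() picks a manager. The first compatible
# class wins: strings are treated as filenames, seekable streams get the
# seekable manager, and anything that is merely readable falls back to the
# fully in-memory manager. The resolve() helper and _PipeLike class below
# are stand-ins for illustration only.
def _example_resolver_chain():
    class _PipeLike:
        """Readable but not seekable, like a pipe or socket stream."""

        def read(self, amount=None):
            return b''

    chain = [
        UploadFilenameInputManager,
        UploadSeekableInputManager,
        UploadNonSeekableInputManager,
    ]

    def resolve(fileobj):
        for manager_cls in chain:
            if manager_cls.is_compatible(fileobj):
                return manager_cls
        raise RuntimeError(f'{type(fileobj)} is not supported.')

    assert resolve('/tmp/example.bin') is UploadFilenameInputManager
    assert resolve(BytesIO(b'data')) is UploadSeekableInputManager
    assert resolve(_PipeLike()) is UploadNonSeekableInputManager
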

class PutObjectTask(Task):
    """Task to do a nonmultipart upload"""

    def _main(self, client, fileobj, bucket, key, extra_args):
        """
        :param client: The client to use when calling PutObject
        :param fileobj: The file to upload.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.
        """
        with fileobj as body:
            client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)

class UploadPartTask(Task):
    """Task to upload a part in a multipart upload"""

    def _main(
        self, client, fileobj, bucket, key, upload_id, part_number, extra_args
    ):
        """
        :param client: The client to use when calling UploadPart
        :param fileobj: The file to upload.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param part_number: The number representing the part of the multipart
            upload
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.

        :rtype: dict
        :returns: A dictionary representing a part::

            {'ETag': etag_value, 'PartNumber': part_number}

            This value can be appended to a list to be used to complete
            the multipart upload.
        """
        with fileobj as body:
            response = client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
        etag = response['ETag']
        part_metadata = {'ETag': etag, 'PartNumber': part_number}
        if 'ChecksumAlgorithm' in extra_args:
            algorithm_name = extra_args['ChecksumAlgorithm'].upper()
            checksum_member = f'Checksum{algorithm_name}'
            if checksum_member in response:
                part_metadata[checksum_member] = response[checksum_member]
        return part_metadata
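
# Illustrative sketch (not part of the library): the shape of the part
# metadata returned above. When a ChecksumAlgorithm such as 'crc32' is in
# the extra args, the matching response member (e.g. 'ChecksumCRC32') is
# copied into the part record so it can be echoed back when completing the
# multipart upload. The response values below are fabricated placeholders.
def _example_part_metadata():
    extra_args = {'ChecksumAlgorithm': 'crc32'}
    fake_response = {'ETag': '"abc123"', 'ChecksumCRC32': 'placeholder=='}

    part_metadata = {'ETag': fake_response['ETag'], 'PartNumber': 1}
    checksum_member = f"Checksum{extra_args['ChecksumAlgorithm'].upper()}"
    if checksum_member in fake_response:
        part_metadata[checksum_member] = fake_response[checksum_member]

    assert part_metadata == {
        'ETag': '"abc123"',
        'PartNumber': 1,
        'ChecksumCRC32': 'placeholder==',
    }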