Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/s3transfer/upload.py: 30%

248 statements  

coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import math
from io import BytesIO

from s3transfer.compat import readable, seekable
from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
from s3transfer.tasks import (
    CompleteMultipartUploadTask,
    CreateMultipartUploadTask,
    SubmissionTask,
    Task,
)
from s3transfer.utils import (
    ChunksizeAdjuster,
    DeferredOpenFile,
    get_callbacks,
    get_filtered_dict,
)

class AggregatedProgressCallback:
    def __init__(self, callbacks, threshold=1024 * 256):
        """Aggregates progress updates for every provided progress callback

        :type callbacks: list
        :param callbacks: The callbacks to invoke when the threshold is
            reached. Each callback must accept ``bytes_transferred`` as a
            single argument.

        :type threshold: int
        :param threshold: The progress threshold at which to take the
            aggregated progress and invoke the progress callbacks with that
            aggregated progress total
        """
        self._callbacks = callbacks
        self._threshold = threshold
        self._bytes_seen = 0

    def __call__(self, bytes_transferred):
        self._bytes_seen += bytes_transferred
        if self._bytes_seen >= self._threshold:
            self._trigger_callbacks()

    def flush(self):
        """Flushes out any progress that has not been sent to its callbacks"""
        if self._bytes_seen > 0:
            self._trigger_callbacks()

    def _trigger_callbacks(self):
        for callback in self._callbacks:
            callback(bytes_transferred=self._bytes_seen)
        self._bytes_seen = 0
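
# Illustrative sketch (not part of the original s3transfer module): shows how
# AggregatedProgressCallback batches byte counts and only invokes its
# callbacks once the threshold is crossed. The threshold and callback below
# are made up purely for demonstration.
def _demo_aggregated_progress():
    seen = []
    aggregator = AggregatedProgressCallback(
        callbacks=[lambda bytes_transferred: seen.append(bytes_transferred)],
        threshold=10,
    )
    aggregator(4)       # 4 bytes seen, below the threshold: nothing emitted
    aggregator(7)       # 11 bytes seen, threshold reached: callback gets 11
    aggregator.flush()  # the counter was reset, so there is nothing to flush
    return seen         # [11]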

class InterruptReader:
    """Wrapper that can interrupt reading using an error

    It uses a transfer coordinator to propagate an error: if the coordinator
    has recorded an exception by the time a read is attempted, that exception
    is raised instead of returning data from the file.

    :type fileobj: file-like obj
    :param fileobj: The file-like object to read from

    :type transfer_coordinator: s3transfer.futures.TransferCoordinator
    :param transfer_coordinator: The transfer coordinator to use if the
        reader needs to be interrupted.
    """

    def __init__(self, fileobj, transfer_coordinator):
        self._fileobj = fileobj
        self._transfer_coordinator = transfer_coordinator

    def read(self, amount=None):
        # If there is an exception, then raise the exception.
        # We raise an error instead of returning no bytes because for
        # requests where the content length and md5 were sent, returning
        # no bytes would cause md5 mismatches and retries, as there would
        # be no indication that the stream being read from encountered
        # any issues.
        if self._transfer_coordinator.exception:
            raise self._transfer_coordinator.exception
        return self._fileobj.read(amount)

    def seek(self, where, whence=0):
        self._fileobj.seek(where, whence)

    def tell(self):
        return self._fileobj.tell()

    def close(self):
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
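
# Illustrative sketch (not part of the original s3transfer module):
# demonstrates how InterruptReader aborts a read once its transfer
# coordinator records an exception. _FakeCoordinator is a hypothetical
# stand-in for s3transfer.futures.TransferCoordinator that only exposes the
# ``exception`` attribute InterruptReader inspects.
def _demo_interrupt_reader():
    class _FakeCoordinator:
        exception = None

    coordinator = _FakeCoordinator()
    reader = InterruptReader(BytesIO(b'hello world'), coordinator)
    first = reader.read(5)  # b'hello' -- no error recorded yet
    coordinator.exception = RuntimeError('transfer cancelled')
    try:
        reader.read(5)      # raises instead of returning more bytes
    except RuntimeError as error:
        return first, error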

class UploadInputManager:
    """Base manager class for handling various types of files for uploads

    This class is typically used by the UploadSubmissionTask class to help
    determine the following:

    * How to determine the size of the file
    * How to determine if a multipart upload is required
    * How to retrieve the body for a PutObject
    * How to retrieve the bodies for a set of UploadParts

    The answers/implementations differ for the various types of file inputs
    that may be accepted. All implementations must subclass and override
    public methods from this class.
    """

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._bandwidth_limiter = bandwidth_limiter

    @classmethod
    def is_compatible(cls, upload_source):
        """Determines if the source for the upload is compatible with this manager

        :param upload_source: The source from which the upload will pull
            data.

        :returns: True if the manager can handle the type of source
            specified, otherwise returns False.
        """
        raise NotImplementedError('must implement is_compatible()')

    def stores_body_in_memory(self, operation_name):
        """Whether the bodies it provides are stored in memory

        :type operation_name: str
        :param operation_name: The name of the client operation that the body
            is being used for. Valid operation_names are ``put_object`` and
            ``upload_part``.

        :rtype: boolean
        :returns: True if the body returned by the manager will be stored in
            memory. False if the manager will not directly store the body in
            memory.
        """
        raise NotImplementedError('must implement stores_body_in_memory()')

    def provide_transfer_size(self, transfer_future):
        """Provides the transfer size of an upload

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the upload request
        """
        raise NotImplementedError('must implement provide_transfer_size()')

    def requires_multipart_upload(self, transfer_future, config):
        """Determines whether a multipart upload is required

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the upload request

        :type config: s3transfer.manager.TransferConfig
        :param config: The config associated with the transfer manager

        :rtype: boolean
        :returns: True if the upload should be multipart based on
            configuration and size, False otherwise.
        """
        raise NotImplementedError('must implement requires_multipart_upload()')

    def get_put_object_body(self, transfer_future):
        """Returns the body to use for PutObject

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the upload request

        :rtype: s3transfer.utils.ReadFileChunk
        :returns: A ReadFileChunk including all progress callbacks
            associated with the transfer future.
        """
        raise NotImplementedError('must implement get_put_object_body()')

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        """Yields the part number and body to use for each UploadPart

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the upload request

        :type chunksize: int
        :param chunksize: The chunksize to use for this upload.

        :rtype: int, s3transfer.utils.ReadFileChunk
        :returns: Yields the part number and the ReadFileChunk including all
            progress callbacks associated with the transfer future for that
            specific yielded part.
        """
        raise NotImplementedError('must implement yield_upload_part_bodies()')

    def _wrap_fileobj(self, fileobj):
        fileobj = InterruptReader(fileobj, self._transfer_coordinator)
        if self._bandwidth_limiter:
            fileobj = self._bandwidth_limiter.get_bandwith_limited_stream(
                fileobj, self._transfer_coordinator, enabled=False
            )
        return fileobj

    def _get_progress_callbacks(self, transfer_future):
        callbacks = get_callbacks(transfer_future, 'progress')
        # Only wrap the callbacks if there are callbacks to invoke, so we
        # avoid unnecessary work when there are none.
        if callbacks:
            return [AggregatedProgressCallback(callbacks)]
        return []

    def _get_close_callbacks(self, aggregated_progress_callbacks):
        return [callback.flush for callback in aggregated_progress_callbacks]
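
# Illustrative sketch (not part of the original s3transfer module): shows why
# the base class pairs each AggregatedProgressCallback with its flush()
# method as a close callback -- progress buffered below the threshold is
# still emitted when the body is closed. The aggregated callbacks are built
# directly here instead of being pulled from a TransferFuture.
def _demo_close_callbacks():
    recorded = []
    manager = UploadInputManager(osutil=None, transfer_coordinator=None)
    progress_callbacks = [
        AggregatedProgressCallback(
            [lambda bytes_transferred: recorded.append(bytes_transferred)]
        )
    ]
    close_callbacks = manager._get_close_callbacks(progress_callbacks)
    progress_callbacks[0](123)  # below the default 256 KiB threshold: buffered
    for close_callback in close_callbacks:
        close_callback()        # flush() emits the buffered 123 bytes
    return recorded             # [123]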

class UploadFilenameInputManager(UploadInputManager):
    """Upload utility for filenames"""

    @classmethod
    def is_compatible(cls, upload_source):
        return isinstance(upload_source, str)

    def stores_body_in_memory(self, operation_name):
        return False

    def provide_transfer_size(self, transfer_future):
        transfer_future.meta.provide_transfer_size(
            self._osutil.get_file_size(transfer_future.meta.call_args.fileobj)
        )

    def requires_multipart_upload(self, transfer_future, config):
        return transfer_future.meta.size >= config.multipart_threshold

    def get_put_object_body(self, transfer_future):
        # Get a file-like object for the given input
        fileobj, full_size = self._get_put_object_fileobj_with_full_size(
            transfer_future
        )

        # Wrap fileobj with interrupt reader that will quickly cancel
        # uploads if needed instead of having to wait for the socket
        # to completely read all of the data.
        fileobj = self._wrap_fileobj(fileobj)

        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        size = transfer_future.meta.size
        # Return the file-like object wrapped into a ReadFileChunk to get
        # progress.
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=size,
            full_file_size=full_size,
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        full_file_size = transfer_future.meta.size
        num_parts = self._get_num_parts(transfer_future, chunksize)
        for part_number in range(1, num_parts + 1):
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            start_byte = chunksize * (part_number - 1)
            # Get a file-like object for that part and the full size of the
            # file associated with that file-like object.
            fileobj, full_size = self._get_upload_part_fileobj_with_full_size(
                transfer_future.meta.call_args.fileobj,
                start_byte=start_byte,
                part_size=chunksize,
                full_file_size=full_file_size,
            )

            # Wrap fileobj with interrupt reader that will quickly cancel
            # uploads if needed instead of having to wait for the socket
            # to completely read all of the data.
            fileobj = self._wrap_fileobj(fileobj)

            # Wrap the file-like object into a ReadFileChunk to get progress.
            read_file_chunk = self._osutil.open_file_chunk_reader_from_fileobj(
                fileobj=fileobj,
                chunk_size=chunksize,
                full_file_size=full_size,
                callbacks=callbacks,
                close_callbacks=close_callbacks,
            )
            yield part_number, read_file_chunk

    def _get_deferred_open_file(self, fileobj, start_byte):
        fileobj = DeferredOpenFile(
            fileobj, start_byte, open_function=self._osutil.open
        )
        return fileobj

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        size = transfer_future.meta.size
        return self._get_deferred_open_file(fileobj, 0), size

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        start_byte = kwargs['start_byte']
        full_size = kwargs['full_file_size']
        return self._get_deferred_open_file(fileobj, start_byte), full_size

    def _get_num_parts(self, transfer_future, part_size):
        return int(math.ceil(transfer_future.meta.size / float(part_size)))
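
# Illustrative sketch (not part of the original s3transfer module): the part
# count used by UploadFilenameInputManager.yield_upload_part_bodies is simply
# the transfer size divided by the chunksize, rounded up. The sizes below are
# arbitrary example values.
def _demo_num_parts():
    size = 100 * 1024 * 1024     # a 100 MiB transfer
    chunksize = 8 * 1024 * 1024  # 8 MiB parts
    return int(math.ceil(size / float(chunksize)))  # 13 parts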

class UploadSeekableInputManager(UploadFilenameInputManager):
    """Upload utility for an open file object"""

    @classmethod
    def is_compatible(cls, upload_source):
        return readable(upload_source) and seekable(upload_source)

    def stores_body_in_memory(self, operation_name):
        if operation_name == 'put_object':
            return False
        else:
            return True

    def provide_transfer_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        # To determine the size, record the starting position, seek to the
        # end, and take the difference between the end and start positions.
        start_position = fileobj.tell()
        fileobj.seek(0, 2)
        end_position = fileobj.tell()
        fileobj.seek(start_position)
        transfer_future.meta.provide_transfer_size(
            end_position - start_position
        )

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        # Note: It is unfortunate that in order to do a multithreaded
        # multipart upload we cannot simply copy the filelike object
        # since there is not really a mechanism in python (i.e. os.dup
        # points to the same OS filehandle which causes concurrency
        # issues). So instead we need to read from the fileobj and
        # chunk the data out to separate file-like objects in memory.
        data = fileobj.read(kwargs['part_size'])
        # We return the length of the data instead of the full_file_size
        # because we partitioned the data into separate BytesIO objects,
        # meaning the BytesIO object has no knowledge of its start position
        # relative to the input source nor access to the rest of the input
        # source. So we must treat it as its own standalone file.
        return BytesIO(data), len(data)

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        # The current position needs to be taken into account when retrieving
        # the full size of the file.
        size = fileobj.tell() + transfer_future.meta.size
        return fileobj, size
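
# Illustrative sketch (not part of the original s3transfer module): shows how
# UploadSeekableInputManager.provide_transfer_size derives the size from the
# file object's current position. types.SimpleNamespace is used as a
# hypothetical stand-in for the TransferFuture/CallArgs objects from
# s3transfer.futures.
def _demo_seekable_transfer_size():
    import types

    fileobj = BytesIO(b'0123456789')
    fileobj.seek(4)  # the upload starts from the current position, byte 4

    sizes = []
    future = types.SimpleNamespace(
        meta=types.SimpleNamespace(
            call_args=types.SimpleNamespace(fileobj=fileobj),
            provide_transfer_size=sizes.append,
        )
    )
    manager = UploadSeekableInputManager(
        osutil=None, transfer_coordinator=None
    )
    manager.provide_transfer_size(future)
    return sizes[0]  # 6 bytes: byte 4 through the end of the stream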

class UploadNonSeekableInputManager(UploadInputManager):
    """Upload utility for a file-like object that cannot seek."""

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        super().__init__(osutil, transfer_coordinator, bandwidth_limiter)
        self._initial_data = b''

    @classmethod
    def is_compatible(cls, upload_source):
        return readable(upload_source)

    def stores_body_in_memory(self, operation_name):
        return True

    def provide_transfer_size(self, transfer_future):
        # No-op because there is no way to do this short of reading the entire
        # body into memory.
        return

    def requires_multipart_upload(self, transfer_future, config):
        # If the user has set the size, we can use that.
        if transfer_future.meta.size is not None:
            return transfer_future.meta.size >= config.multipart_threshold

        # This is tricky to determine in this case because we can't know how
        # large the input is. So to figure it out, we read data into memory
        # up until the threshold and compare how much data was actually read
        # against the threshold.
        fileobj = transfer_future.meta.call_args.fileobj
        threshold = config.multipart_threshold
        self._initial_data = self._read(fileobj, threshold, False)
        if len(self._initial_data) < threshold:
            return False
        else:
            return True

    def get_put_object_body(self, transfer_future):
        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        fileobj = transfer_future.meta.call_args.fileobj

        body = self._wrap_data(
            self._initial_data + fileobj.read(), callbacks, close_callbacks
        )

        # Zero out the stored data so we don't have additional copies
        # hanging around in memory.
        self._initial_data = None
        return body

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        file_object = transfer_future.meta.call_args.fileobj
        part_number = 0

        # Continue reading parts from the file-like object until it is empty.
        while True:
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            part_number += 1
            part_content = self._read(file_object, chunksize)
            if not part_content:
                break
            part_object = self._wrap_data(
                part_content, callbacks, close_callbacks
            )

            # Zero out part_content to avoid hanging on to additional data.
            part_content = None
            yield part_number, part_object

    def _read(self, fileobj, amount, truncate=True):
        """
        Reads a specific amount of data from a stream and returns it. If there
        is any data in initial_data, that will be popped out first.

        :type fileobj: A file-like object that implements read
        :param fileobj: The stream to read from.

        :type amount: int
        :param amount: The number of bytes to read from the stream.

        :type truncate: bool
        :param truncate: Whether or not to truncate initial_data after
            reading from it.

        :return: The bytes read, drawn first from initial_data and then from
            the stream.
        """
        # If the initial data is empty, we simply read from the fileobj.
        if len(self._initial_data) == 0:
            return fileobj.read(amount)

        # If the requested number of bytes is less than the amount of
        # initial data, pull entirely from initial data.
        if amount <= len(self._initial_data):
            data = self._initial_data[:amount]
            # Truncate initial data so we don't hang onto the data longer
            # than we need.
            if truncate:
                self._initial_data = self._initial_data[amount:]
            return data

        # At this point there is some initial data left, but not enough to
        # satisfy the number of bytes requested. Pull out the remaining
        # initial data and read the rest from the fileobj.
        amount_to_read = amount - len(self._initial_data)
        data = self._initial_data + fileobj.read(amount_to_read)

        # Zero out initial data so we don't hang onto the data any more.
        if truncate:
            self._initial_data = b''
        return data

    def _wrap_data(self, data, callbacks, close_callbacks):
        """
        Wraps data with the interrupt reader and the file chunk reader.

        :type data: bytes
        :param data: The data to wrap.

        :type callbacks: list
        :param callbacks: The callbacks associated with the transfer future.

        :type close_callbacks: list
        :param close_callbacks: The callbacks to be called when closing the
            wrapper for the data.

        :return: Fully wrapped data.
        """
        fileobj = self._wrap_fileobj(BytesIO(data))
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=len(data),
            full_file_size=len(data),
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )
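
# Illustrative sketch (not part of the original s3transfer module): shows how
# UploadNonSeekableInputManager decides between a single PutObject and a
# multipart upload when the stream size is unknown. types.SimpleNamespace
# stands in for the real TransferFuture and TransferConfig objects, and the
# 64-byte threshold is artificially small for demonstration.
def _demo_non_seekable_multipart_decision():
    import types

    stream = BytesIO(b'x' * 100)  # pretend this is a pipe or socket
    future = types.SimpleNamespace(
        meta=types.SimpleNamespace(
            size=None,
            call_args=types.SimpleNamespace(fileobj=stream),
        )
    )
    config = types.SimpleNamespace(multipart_threshold=64)

    manager = UploadNonSeekableInputManager(
        osutil=None, transfer_coordinator=None
    )
    # Buffers up to 64 bytes into manager._initial_data; a full 64 bytes
    # could be read, so a multipart upload is required.
    return manager.requires_multipart_upload(future, config)  # True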

class UploadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute an upload"""

    UPLOAD_PART_ARGS = [
        'ChecksumAlgorithm',
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    COMPLETE_MULTIPART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    def _get_upload_input_manager_cls(self, transfer_future):
        """Retrieves a class for managing input for an upload based on file type

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :rtype: class of UploadInputManager
        :returns: The appropriate class to use for managing a specific type of
            input for uploads.
        """
        upload_manager_resolver_chain = [
            UploadFilenameInputManager,
            UploadSeekableInputManager,
            UploadNonSeekableInputManager,
        ]

        fileobj = transfer_future.meta.call_args.fileobj
        for upload_manager_cls in upload_manager_resolver_chain:
            if upload_manager_cls.is_compatible(fileobj):
                return upload_manager_cls
        raise RuntimeError(
            'Input {} of type: {} is not supported.'.format(
                fileobj, type(fileobj)
            )
        )

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated with the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for
        """
        upload_input_manager = self._get_upload_input_manager_cls(
            transfer_future
        )(osutil, self._transfer_coordinator, bandwidth_limiter)

        # Determine the size if it was not provided
        if transfer_future.meta.size is None:
            upload_input_manager.provide_transfer_size(transfer_future)

        # Do a multipart upload if needed, otherwise do a regular put object.
        if not upload_input_manager.requires_multipart_upload(
            transfer_future, config
        ):
            self._submit_upload_request(
                client,
                config,
                osutil,
                request_executor,
                transfer_future,
                upload_input_manager,
            )
        else:
            self._submit_multipart_request(
                client,
                config,
                osutil,
                request_executor,
                transfer_future,
                upload_input_manager,
            )

    def _submit_upload_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        call_args = transfer_future.meta.call_args

        # Get any tags that need to be associated with the put object task
        put_object_tag = self._get_upload_task_tag(
            upload_input_manager, 'put_object'
        )

        # Submit the request for a single upload.
        self._transfer_coordinator.submit(
            request_executor,
            PutObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'fileobj': upload_input_manager.get_put_object_body(
                        transfer_future
                    ),
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': call_args.extra_args,
                },
                is_final=True,
            ),
            tag=put_object_tag,
        )

    def _submit_multipart_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        call_args = transfer_future.meta.call_args

        # Submit the request to create a multipart upload.
        create_multipart_future = self._transfer_coordinator.submit(
            request_executor,
            CreateMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': call_args.extra_args,
                },
            ),
        )

        # Submit requests to upload the parts of the file.
        part_futures = []
        extra_part_args = self._extra_upload_part_args(call_args.extra_args)

        # Get any tags that need to be associated with the submitted task
        # for uploading the data
        upload_part_tag = self._get_upload_task_tag(
            upload_input_manager, 'upload_part'
        )

        size = transfer_future.meta.size
        adjuster = ChunksizeAdjuster()
        chunksize = adjuster.adjust_chunksize(config.multipart_chunksize, size)
        part_iterator = upload_input_manager.yield_upload_part_bodies(
            transfer_future, chunksize
        )

        for part_number, fileobj in part_iterator:
            part_futures.append(
                self._transfer_coordinator.submit(
                    request_executor,
                    UploadPartTask(
                        transfer_coordinator=self._transfer_coordinator,
                        main_kwargs={
                            'client': client,
                            'fileobj': fileobj,
                            'bucket': call_args.bucket,
                            'key': call_args.key,
                            'part_number': part_number,
                            'extra_args': extra_part_args,
                        },
                        pending_main_kwargs={
                            'upload_id': create_multipart_future
                        },
                    ),
                    tag=upload_part_tag,
                )
            )

        complete_multipart_extra_args = self._extra_complete_multipart_args(
            call_args.extra_args
        )
        # Submit the request to complete the multipart upload.
        self._transfer_coordinator.submit(
            request_executor,
            CompleteMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': complete_multipart_extra_args,
                },
                pending_main_kwargs={
                    'upload_id': create_multipart_future,
                    'parts': part_futures,
                },
                is_final=True,
            ),
        )

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        return get_filtered_dict(extra_args, self.UPLOAD_PART_ARGS)

    def _extra_complete_multipart_args(self, extra_args):
        return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

    def _get_upload_task_tag(self, upload_input_manager, operation_name):
        tag = None
        if upload_input_manager.stores_body_in_memory(operation_name):
            tag = IN_MEMORY_UPLOAD_TAG
        return tag
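
# Illustrative sketch (not part of the original s3transfer module): shows
# which input manager the resolver chain in
# UploadSubmissionTask._get_upload_input_manager_cls would pick for common
# inputs. It assumes readable()/seekable() from s3transfer.compat check for
# read/seek/tell methods, which is why a filename string only matches the
# filename manager while an open BytesIO matches the seekable manager first.
def _demo_input_manager_resolution():
    open_file = BytesIO(b'data')  # readable and seekable
    assert UploadFilenameInputManager.is_compatible('/tmp/example.txt')
    assert not UploadSeekableInputManager.is_compatible('/tmp/example.txt')
    assert UploadSeekableInputManager.is_compatible(open_file)
    assert UploadNonSeekableInputManager.is_compatible(open_file)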

class PutObjectTask(Task):
    """Task to do a non-multipart upload"""

    def _main(self, client, fileobj, bucket, key, extra_args):
        """
        :param client: The client to use when calling PutObject
        :param fileobj: The file to upload.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.
        """
        with fileobj as body:
            client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)


class UploadPartTask(Task):
    """Task to upload a part in a multipart upload"""

    def _main(
        self, client, fileobj, bucket, key, upload_id, part_number, extra_args
    ):
        """
        :param client: The client to use when calling UploadPart
        :param fileobj: The file to upload.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param part_number: The number representing the part of the multipart
            upload
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.

        :rtype: dict
        :returns: A dictionary representing a part::

            {'ETag': etag_value, 'PartNumber': part_number}

            This value can be appended to a list to be used to complete
            the multipart upload.
        """
        with fileobj as body:
            response = client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
        etag = response['ETag']
        part_metadata = {'ETag': etag, 'PartNumber': part_number}
        if 'ChecksumAlgorithm' in extra_args:
            algorithm_name = extra_args['ChecksumAlgorithm'].upper()
            checksum_member = f'Checksum{algorithm_name}'
            if checksum_member in response:
                part_metadata[checksum_member] = response[checksum_member]
        return part_metadata
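
# Illustrative sketch (not part of the original s3transfer module): shows how
# UploadPartTask builds the checksum member name that is copied from the
# UploadPart response into the part metadata. The algorithm value is an
# example; any algorithm name follows the same uppercase pattern.
def _demo_checksum_member_name():
    extra_args = {'ChecksumAlgorithm': 'sha256'}
    algorithm_name = extra_args['ChecksumAlgorithm'].upper()
    return f'Checksum{algorithm_name}'  # 'ChecksumSHA256'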