# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import math
from io import BytesIO

from s3transfer.compat import readable, seekable
from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
from s3transfer.tasks import (
    CompleteMultipartUploadTask,
    CreateMultipartUploadTask,
    SubmissionTask,
    Task,
)
from s3transfer.utils import (
    ChunksizeAdjuster,
    DeferredOpenFile,
    get_callbacks,
    get_filtered_dict,
)


class AggregatedProgressCallback:
    def __init__(self, callbacks, threshold=1024 * 256):
        """Aggregates progress updates for every provided progress callback

        :type callbacks: A list of functions that accept bytes_transferred
            as a single argument
        :param callbacks: The callbacks to invoke when the threshold is
            reached

        :type threshold: int
        :param threshold: The progress threshold at which to take the
            aggregated progress and invoke the progress callbacks with that
            aggregated progress total
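
        Example (an illustrative sketch; ``on_progress`` is a hypothetical
        callback): with a 1 KiB threshold, the callbacks only fire once at
        least 1 KiB of progress has accumulated::

            progress = AggregatedProgressCallback([on_progress], threshold=1024)
            progress(600)     # 600 bytes seen; below threshold, nothing invoked
            progress(600)     # 1200 >= 1024, so on_progress(bytes_transferred=1200)
            progress.flush()  # report any remaining, unreported progress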
        """
        self._callbacks = callbacks
        self._threshold = threshold
        self._bytes_seen = 0

    def __call__(self, bytes_transferred):
        self._bytes_seen += bytes_transferred
        if self._bytes_seen >= self._threshold:
            self._trigger_callbacks()

    def flush(self):
        """Flushes out any progress that has not been sent to its callbacks"""
        if self._bytes_seen > 0:
            self._trigger_callbacks()

    def _trigger_callbacks(self):
        for callback in self._callbacks:
            callback(bytes_transferred=self._bytes_seen)
        self._bytes_seen = 0


class InterruptReader:
    """Wrapper that can interrupt reading using an error

    It uses a transfer coordinator to propagate an error: if the transfer
    coordinator has an exception set while the file is being read from, the
    read raises that exception instead of returning more data.

    :type fileobj: file-like obj
    :param fileobj: The file-like object to read from

    :type transfer_coordinator: s3transfer.futures.TransferCoordinator
    :param transfer_coordinator: The transfer coordinator to use if the
        reader needs to be interrupted.
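
    Example (an illustrative sketch; ``coordinator`` is an
    ``s3transfer.futures.TransferCoordinator`` created elsewhere and
    ``data.bin`` is a placeholder path)::

        reader = InterruptReader(open('data.bin', 'rb'), coordinator)
        reader.read(1024)  # delegates to the wrapped file object
        # Once an exception is registered on the coordinator (e.g. the
        # transfer was cancelled), subsequent reads raise that exception.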
    """

    def __init__(self, fileobj, transfer_coordinator):
        self._fileobj = fileobj
        self._transfer_coordinator = transfer_coordinator

    def read(self, amount=None):
        # If there is an exception, then raise the exception.
        # We raise an error instead of returning no bytes because, for
        # requests where the content length and md5 were sent, returning
        # no bytes would cause md5 mismatches and retries since there would
        # be no indication that the stream being read from encountered any
        # issues.
        if self._transfer_coordinator.exception:
            raise self._transfer_coordinator.exception
        return self._fileobj.read(amount)

    def seek(self, where, whence=0):
        self._fileobj.seek(where, whence)

    def tell(self):
        return self._fileobj.tell()

    def close(self):
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()


class UploadInputManager:
    """Base manager class for handling various types of files for uploads

    This class is typically used by the UploadSubmissionTask class to help
    determine the following:

    * How to determine the size of the file
    * How to determine if a multipart upload is required
    * How to retrieve the body for a PutObject
    * How to retrieve the bodies for a set of UploadParts

    The answers/implementations differ for the various types of file inputs
    that may be accepted. All implementations must subclass and override
    public methods from this class.
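
    The concrete implementations provided in this module are
    ``UploadFilenameInputManager``, ``UploadSeekableInputManager``, and
    ``UploadNonSeekableInputManager``.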
    """

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._bandwidth_limiter = bandwidth_limiter

    @classmethod
    def is_compatible(cls, upload_source):
        """Determines if the source for the upload is compatible with manager

        :param upload_source: The source from which the upload will pull
            data.

        :returns: True if the manager can handle the type of source specified
            otherwise returns False.
        """
        raise NotImplementedError('must implement is_compatible()')

    def stores_body_in_memory(self, operation_name):
        """Whether the bodies it provides are stored in memory

        :type operation_name: str
        :param operation_name: The name of the client operation that the body
            is being used for. Valid operation_names are ``put_object`` and
            ``upload_part``.

        :rtype: boolean
        :returns: True if the body returned by the manager will be stored in
            memory. False if the manager will not directly store the body in
            memory.
        """
        raise NotImplementedError('must implement stores_body_in_memory()')

    def provide_transfer_size(self, transfer_future):
        """Provides the transfer size of an upload

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the upload request
        """
        raise NotImplementedError('must implement provide_transfer_size()')

    def requires_multipart_upload(self, transfer_future, config):
        """Determines whether a multipart upload is required

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the upload request

        :type config: s3transfer.manager.TransferConfig
        :param config: The config associated with the transfer manager

        :rtype: boolean
        :returns: True, if the upload should be multipart based on
            configuration and size. False, otherwise.
        """
        raise NotImplementedError('must implement requires_multipart_upload()')

    def get_put_object_body(self, transfer_future):
        """Returns the body to use for PutObject

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the upload request

        :rtype: s3transfer.utils.ReadFileChunk
        :returns: A ReadFileChunk including all progress callbacks
            associated with the transfer future.
        """
        raise NotImplementedError('must implement get_put_object_body()')

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        """Yields the part number and body to use for each UploadPart

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with the upload request

        :type chunksize: int
        :param chunksize: The chunksize to use for this upload.

        :rtype: int, s3transfer.utils.ReadFileChunk
        :returns: Yields the part number and the ReadFileChunk including all
            progress callbacks associated with the transfer future for that
            specific yielded part.
        """
        raise NotImplementedError('must implement yield_upload_part_bodies()')

    def _wrap_fileobj(self, fileobj):
        fileobj = InterruptReader(fileobj, self._transfer_coordinator)
        if self._bandwidth_limiter:
            fileobj = self._bandwidth_limiter.get_bandwith_limited_stream(
                fileobj, self._transfer_coordinator, enabled=False
            )
        return fileobj

    def _get_progress_callbacks(self, transfer_future):
        callbacks = get_callbacks(transfer_future, 'progress')
        # Only wrap the callbacks if there are callbacks to invoke; otherwise
        # we would be doing unnecessary work on every progress update.
        if callbacks:
            return [AggregatedProgressCallback(callbacks)]
        return []

    def _get_close_callbacks(self, aggregated_progress_callbacks):
        return [callback.flush for callback in aggregated_progress_callbacks]


class UploadFilenameInputManager(UploadInputManager):
    """Upload utility for filenames"""

    @classmethod
    def is_compatible(cls, upload_source):
        return isinstance(upload_source, str)

    def stores_body_in_memory(self, operation_name):
        return False

    def provide_transfer_size(self, transfer_future):
        transfer_future.meta.provide_transfer_size(
            self._osutil.get_file_size(transfer_future.meta.call_args.fileobj)
        )

    def requires_multipart_upload(self, transfer_future, config):
        return transfer_future.meta.size >= config.multipart_threshold

    def get_put_object_body(self, transfer_future):
        # Get a file-like object for the given input
        fileobj, full_size = self._get_put_object_fileobj_with_full_size(
            transfer_future
        )

        # Wrap fileobj with interrupt reader that will quickly cancel
        # uploads if needed instead of having to wait for the socket
        # to completely read all of the data.
        fileobj = self._wrap_fileobj(fileobj)

        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        size = transfer_future.meta.size
        # Return the file-like object wrapped into a ReadFileChunk to get
        # progress.
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=size,
            full_file_size=full_size,
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        full_file_size = transfer_future.meta.size
        num_parts = self._get_num_parts(transfer_future, chunksize)
        for part_number in range(1, num_parts + 1):
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            start_byte = chunksize * (part_number - 1)
            # Get a file-like object for that part and the full size of the
            # file backing that file-like object.
            fileobj, full_size = self._get_upload_part_fileobj_with_full_size(
                transfer_future.meta.call_args.fileobj,
                start_byte=start_byte,
                part_size=chunksize,
                full_file_size=full_file_size,
            )

            # Wrap fileobj with interrupt reader that will quickly cancel
            # uploads if needed instead of having to wait for the socket
            # to completely read all of the data.
            fileobj = self._wrap_fileobj(fileobj)

            # Wrap the file-like object into a ReadFileChunk to get progress.
            read_file_chunk = self._osutil.open_file_chunk_reader_from_fileobj(
                fileobj=fileobj,
                chunk_size=chunksize,
                full_file_size=full_size,
                callbacks=callbacks,
                close_callbacks=close_callbacks,
            )
            yield part_number, read_file_chunk

    def _get_deferred_open_file(self, fileobj, start_byte):
        fileobj = DeferredOpenFile(
            fileobj, start_byte, open_function=self._osutil.open
        )
        return fileobj

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        size = transfer_future.meta.size
        return self._get_deferred_open_file(fileobj, 0), size

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        start_byte = kwargs['start_byte']
        full_size = kwargs['full_file_size']
        return self._get_deferred_open_file(fileobj, start_byte), full_size

    def _get_num_parts(self, transfer_future, part_size):
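        # For example (illustrative), a 10 MiB file uploaded with a 4 MiB
        # part size needs ceil(10 / 4) == 3 parts.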
        return int(math.ceil(transfer_future.meta.size / float(part_size)))


class UploadSeekableInputManager(UploadFilenameInputManager):
    """Upload utility for an open file object"""

    @classmethod
    def is_compatible(cls, upload_source):
        return readable(upload_source) and seekable(upload_source)

    def stores_body_in_memory(self, operation_name):
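        # Bodies for upload_part are buffered into in-memory BytesIO objects
        # (see _get_upload_part_fileobj_with_full_size below), while
        # put_object streams directly from the caller's file object.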
        if operation_name == 'put_object':
            return False
        else:
            return True

    def provide_transfer_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        # To determine the size, record the starting position, seek to the
        # end, and take the difference between the end and start positions.
        start_position = fileobj.tell()
        fileobj.seek(0, 2)
        end_position = fileobj.tell()
        fileobj.seek(start_position)
        transfer_future.meta.provide_transfer_size(
            end_position - start_position
        )

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        # Note: It is unfortunate that in order to do a multithreaded
        # multipart upload we cannot simply copy the filelike object
        # since there is not really a mechanism in python (e.g. os.dup
        # points to the same OS filehandle, which causes concurrency
        # issues). So instead we need to read from the fileobj and
        # chunk the data out to separate file-like objects in memory.
        data = fileobj.read(kwargs['part_size'])
        # We return the length of the data instead of the full_file_size
        # because we partitioned the data into separate BytesIO objects,
        # meaning the BytesIO object has no knowledge of its start position
        # relative to the input source nor access to the rest of the input
        # source. So we must treat it as its own standalone file.
        return BytesIO(data), len(data)

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        # The current position needs to be taken into account when retrieving
        # the full size of the file.
        size = fileobj.tell() + transfer_future.meta.size
        return fileobj, size


class UploadNonSeekableInputManager(UploadInputManager):
    """Upload utility for a file-like object that cannot seek."""

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        super().__init__(osutil, transfer_coordinator, bandwidth_limiter)
        self._initial_data = b''

    @classmethod
    def is_compatible(cls, upload_source):
        return readable(upload_source)

    def stores_body_in_memory(self, operation_name):
        return True

    def provide_transfer_size(self, transfer_future):
        # No-op because there is no way to do this short of reading the entire
        # body into memory.
        return

    def requires_multipart_upload(self, transfer_future, config):
        # If the user has set the size, we can use that.
        if transfer_future.meta.size is not None:
            return transfer_future.meta.size >= config.multipart_threshold

        # This is tricky to determine in this case because we can't know how
        # large the input is. So to figure it out, we read data into memory
        # up until the threshold and compare how much data was actually read
        # against the threshold.
        fileobj = transfer_future.meta.call_args.fileobj
        threshold = config.multipart_threshold
        self._initial_data = self._read(fileobj, threshold, False)
        if len(self._initial_data) < threshold:
            return False
        else:
            return True

    def get_put_object_body(self, transfer_future):
        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        fileobj = transfer_future.meta.call_args.fileobj

        body = self._wrap_data(
            self._initial_data + fileobj.read(), callbacks, close_callbacks
        )

        # Zero out the stored data so we don't have additional copies
        # hanging around in memory.
        self._initial_data = None
        return body

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        file_object = transfer_future.meta.call_args.fileobj
        part_number = 0

        # Continue reading parts from the file-like object until it is empty.
        while True:
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            part_number += 1
            part_content = self._read(file_object, chunksize)
            if not part_content:
                break
            part_object = self._wrap_data(
                part_content, callbacks, close_callbacks
            )

            # Zero out part_content to avoid hanging on to additional data.
            part_content = None
            yield part_number, part_object

    def _read(self, fileobj, amount, truncate=True):
        """
        Reads a specific amount of data from a stream and returns it. If there
        is any data in initial_data, that will be popped out first.

        :type fileobj: A file-like object that implements read
        :param fileobj: The stream to read from.

        :type amount: int
        :param amount: The number of bytes to read from the stream.

        :type truncate: bool
        :param truncate: Whether or not to truncate initial_data after
            reading from it.

        :return: The data read from the stream, prefixed with any buffered
            initial data.
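
        Example (an illustrative sketch): if ``self._initial_data`` is
        ``b'abcd'``, then ``self._read(stream, 2)`` returns ``b'ab'`` and
        leaves ``b'cd'`` buffered, while ``self._read(stream, 6)`` returns
        ``b'abcd'`` plus up to two more bytes read from ``stream``.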
        """
        # If the initial data is empty, we simply read from the fileobj.
        if len(self._initial_data) == 0:
            return fileobj.read(amount)

        # If the requested number of bytes is no more than the amount of
        # initial data, pull entirely from initial data.
        if amount <= len(self._initial_data):
            data = self._initial_data[:amount]
            # Truncate initial data so we don't hang onto the data longer
            # than we need.
            if truncate:
                self._initial_data = self._initial_data[amount:]
            return data

        # At this point there is some initial data left, but not enough to
        # satisfy the number of bytes requested. Pull out the remaining
        # initial data and read the rest from the fileobj.
        amount_to_read = amount - len(self._initial_data)
        data = self._initial_data + fileobj.read(amount_to_read)

        # Zero out initial data so we don't hang onto the data any more.
        if truncate:
            self._initial_data = b''
        return data

    def _wrap_data(self, data, callbacks, close_callbacks):
        """
        Wraps data with the interrupt reader and the file chunk reader.

        :type data: bytes
        :param data: The data to wrap.

        :type callbacks: list
        :param callbacks: The callbacks associated with the transfer future.

        :type close_callbacks: list
        :param close_callbacks: The callbacks to be called when closing the
            wrapper for the data.

        :return: Fully wrapped data.
        """
        fileobj = self._wrap_fileobj(BytesIO(data))
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=len(data),
            full_file_size=len(data),
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )


class UploadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute an upload"""

    UPLOAD_PART_ARGS = [
        'ChecksumAlgorithm',
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    COMPLETE_MULTIPART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    def _get_upload_input_manager_cls(self, transfer_future):
        """Retrieves a class for managing input for an upload based on file type

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :rtype: class of UploadInputManager
        :returns: The appropriate class to use for managing a specific type of
            input for uploads.
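
        For example (illustrative only): a ``str`` filename resolves to
        ``UploadFilenameInputManager``, an open seekable file object to
        ``UploadSeekableInputManager``, and a non-seekable stream such as a
        pipe to ``UploadNonSeekableInputManager``.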
        """
        upload_manager_resolver_chain = [
            UploadFilenameInputManager,
            UploadSeekableInputManager,
            UploadNonSeekableInputManager,
        ]

        fileobj = transfer_future.meta.call_args.fileobj
        for upload_manager_cls in upload_manager_resolver_chain:
            if upload_manager_cls.is_compatible(fileobj):
                return upload_manager_cls
        raise RuntimeError(
            'Input {} of type: {} is not supported.'.format(
                fileobj, type(fileobj)
            )
        )

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated with the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for

        :type bandwidth_limiter: s3transfer.bandwidth.BandwidthLimiter
        :param bandwidth_limiter: The bandwidth limiter to use when uploading
            streams
        """
        upload_input_manager = self._get_upload_input_manager_cls(
            transfer_future
        )(osutil, self._transfer_coordinator, bandwidth_limiter)

        # Determine the size if it was not provided
        if transfer_future.meta.size is None:
            upload_input_manager.provide_transfer_size(transfer_future)

        # Do a multipart upload if needed, otherwise do a regular put object.
        if not upload_input_manager.requires_multipart_upload(
            transfer_future, config
        ):
            self._submit_upload_request(
                client,
                config,
                osutil,
                request_executor,
                transfer_future,
                upload_input_manager,
            )
        else:
            self._submit_multipart_request(
                client,
                config,
                osutil,
                request_executor,
                transfer_future,
                upload_input_manager,
            )

    def _submit_upload_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        call_args = transfer_future.meta.call_args

        # Get any tags that need to be associated with the put object task
        put_object_tag = self._get_upload_task_tag(
            upload_input_manager, 'put_object'
        )

        # Submit the request for a single upload.
        self._transfer_coordinator.submit(
            request_executor,
            PutObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'fileobj': upload_input_manager.get_put_object_body(
                        transfer_future
                    ),
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': call_args.extra_args,
                },
                is_final=True,
            ),
            tag=put_object_tag,
        )

    def _submit_multipart_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        call_args = transfer_future.meta.call_args

        # Submit the request to create a multipart upload.
        create_multipart_future = self._transfer_coordinator.submit(
            request_executor,
            CreateMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': call_args.extra_args,
                },
            ),
        )

        # Submit requests to upload the parts of the file.
        part_futures = []
        extra_part_args = self._extra_upload_part_args(call_args.extra_args)

        # Get any tags that need to be associated with the submitted tasks
        # for uploading the data
        upload_part_tag = self._get_upload_task_tag(
            upload_input_manager, 'upload_part'
        )

        size = transfer_future.meta.size
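        # The configured chunksize may be adjusted (e.g. grown) so that the
        # resulting part count and part sizes stay within S3's multipart
        # upload limits.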
        adjuster = ChunksizeAdjuster()
        chunksize = adjuster.adjust_chunksize(config.multipart_chunksize, size)
        part_iterator = upload_input_manager.yield_upload_part_bodies(
            transfer_future, chunksize
        )

        for part_number, fileobj in part_iterator:
            part_futures.append(
                self._transfer_coordinator.submit(
                    request_executor,
                    UploadPartTask(
                        transfer_coordinator=self._transfer_coordinator,
                        main_kwargs={
                            'client': client,
                            'fileobj': fileobj,
                            'bucket': call_args.bucket,
                            'key': call_args.key,
                            'part_number': part_number,
                            'extra_args': extra_part_args,
                        },
                        pending_main_kwargs={
                            'upload_id': create_multipart_future
                        },
                    ),
                    tag=upload_part_tag,
                )
            )

        complete_multipart_extra_args = self._extra_complete_multipart_args(
            call_args.extra_args
        )
        # Submit the request to complete the multipart upload.
        self._transfer_coordinator.submit(
            request_executor,
            CompleteMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': complete_multipart_extra_args,
                },
                pending_main_kwargs={
                    'upload_id': create_multipart_future,
                    'parts': part_futures,
                },
                is_final=True,
            ),
        )

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
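        # For example (illustrative), extra_args of
        # {'ACL': 'private', 'RequestPayer': 'requester'} is filtered down to
        # {'RequestPayer': 'requester'} for the upload_part calls.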
        return get_filtered_dict(extra_args, self.UPLOAD_PART_ARGS)

    def _extra_complete_multipart_args(self, extra_args):
        return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

    def _get_upload_task_tag(self, upload_input_manager, operation_name):
        tag = None
        if upload_input_manager.stores_body_in_memory(operation_name):
            tag = IN_MEMORY_UPLOAD_TAG
        return tag


class PutObjectTask(Task):
    """Task to do a nonmultipart upload"""

    def _main(self, client, fileobj, bucket, key, extra_args):
        """
        :param client: The client to use when calling PutObject
        :param fileobj: The file to upload.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.
        """
        with fileobj as body:
            client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)


class UploadPartTask(Task):
    """Task to upload a part in a multipart upload"""

    def _main(
        self, client, fileobj, bucket, key, upload_id, part_number, extra_args
    ):
        """
        :param client: The client to use when calling UploadPart
        :param fileobj: The file to upload.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param part_number: The number representing the part of the multipart
            upload
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.

        :rtype: dict
        :returns: A dictionary representing a part::

            {'ETag': etag_value, 'PartNumber': part_number}

        This value can be appended to a list to be used to complete
        the multipart upload.
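
        For example (illustrative), the accumulated parts list eventually
        passed to CompleteMultipartUpload looks like::

            [{'ETag': '"etag-1"', 'PartNumber': 1},
             {'ETag': '"etag-2"', 'PartNumber': 2}]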
        """
        with fileobj as body:
            response = client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
        etag = response['ETag']
        part_metadata = {'ETag': etag, 'PartNumber': part_number}
        if 'ChecksumAlgorithm' in extra_args:
            algorithm_name = extra_args['ChecksumAlgorithm'].upper()
            checksum_member = f'Checksum{algorithm_name}'
            if checksum_member in response:
                part_metadata[checksum_member] = response[checksum_member]
        return part_metadata