Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/payload_streamer.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

33 statements  

1""" 

2Payload implementation for coroutines as data provider. 

3 

4As a simple case, you can upload data from file:: 

5 

6 @aiohttp.streamer 

7 async def file_sender(writer, file_name=None): 

8 with open(file_name, 'rb') as f: 

9 chunk = f.read(2**16) 

10 while chunk: 

11 await writer.write(chunk) 

12 

13 chunk = f.read(2**16) 

14 

15Then you can use `file_sender` like this: 

16 

17 async with session.post('http://httpbin.org/post', 

18 data=file_sender(file_name='huge_file')) as resp: 

19 print(await resp.text()) 

20 

21.. note:: Coroutine must accept `writer` as first argument 

22 

23""" 

24 

25import types 

26import warnings 

27from collections.abc import Awaitable, Callable 

28from typing import Any 

29 

30from .abc import AbstractStreamWriter 

31from .payload import Payload, payload_type 

32 

33__all__ = ("streamer",) 

34 

35 

class _stream_wrapper:
    """Bind a coroutine function to the arguments it will be called with.

    Awaiting an instance with a writer runs
    ``coro(writer, *args, **kwargs)``.
    """

    def __init__(
        self,
        coro: Callable[..., Awaitable[None]],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
    ) -> None:
        """Store the callable (passed through ``types.coroutine``) and its arguments."""
        self.args, self.kwargs = args, kwargs
        self.coro = types.coroutine(coro)

    async def __call__(self, writer: AbstractStreamWriter) -> None:
        """Invoke the stored callable with *writer* first and await the result."""
        pending = self.coro(writer, *self.args, **self.kwargs)
        await pending

49 

50 

class streamer:
    """Deprecated decorator that turns a coroutine function into a payload factory.

    Constructing an instance emits a ``DeprecationWarning``; calling the
    instance returns a ``_stream_wrapper`` holding the coroutine together
    with the call's positional and keyword arguments.
    """

    def __init__(self, coro: Callable[..., Awaitable[None]]) -> None:
        """Warn about the deprecation, then remember *coro*."""
        message = "@streamer is deprecated, use async generators instead"
        # stacklevel=2 attributes the warning to the code that applied
        # the decorator rather than to this constructor.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        self.coro = coro

    def __call__(self, *args: Any, **kwargs: Any) -> _stream_wrapper:
        """Capture the call arguments in a ``_stream_wrapper`` around the coroutine."""
        bound = _stream_wrapper(self.coro, args, kwargs)
        return bound

62 

63 

@payload_type(_stream_wrapper)
class StreamWrapperPayload(Payload):
    """Payload registered (via ``payload_type``) for ``_stream_wrapper`` values."""

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Await the wrapped value, handing it *writer* as its only argument."""
        # NOTE(review): ``_value`` is presumably set by ``Payload.__init__``;
        # it is not assigned anywhere in this class.
        produce = self._value
        await produce(writer)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Always raise ``TypeError``; this payload cannot be decoded to text."""
        raise TypeError("Unable to decode.")

71 

72 

@payload_type(streamer)
class StreamPayload(StreamWrapperPayload):
    """Payload registered (via ``payload_type``) for ``streamer`` instances."""

    def __init__(self, value: Any, *args: Any, **kwargs: Any) -> None:
        """Call *value* with no arguments and pass the result to the parent."""
        wrapped = value()
        super().__init__(wrapped, *args, **kwargs)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Await the stored value, handing it *writer* as its only argument."""
        produce = self._value
        await produce(writer)