Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/typing.py: 86%

37 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:16 +0000

1# from __future__ import annotations 

2 

3from datetime import datetime, timedelta 

4from typing import TYPE_CHECKING, Any, Awaitable, Iterable, TypeVar, Union 

5 

6from redis.compat import Protocol 

7 

8if TYPE_CHECKING: 

9 from redis.asyncio.connection import ConnectionPool as AsyncConnectionPool 

10 from redis.asyncio.connection import Encoder as AsyncEncoder 

11 from redis.connection import ConnectionPool, Encoder 

12 

13 

14Number = Union[int, float] 

15EncodedT = Union[bytes, memoryview] 

16DecodedT = Union[str, int, float] 

17EncodableT = Union[EncodedT, DecodedT] 

18AbsExpiryT = Union[int, datetime] 

19ExpiryT = Union[int, timedelta] 

20ZScoreBoundT = Union[float, str] # str allows for the [ or ( prefix 

21BitfieldOffsetT = Union[int, str] # str allows for #x syntax 

22_StringLikeT = Union[bytes, str, memoryview] 

23KeyT = _StringLikeT # Main redis key space 

24PatternT = _StringLikeT # Patterns matched against keys, fields etc 

25FieldT = EncodableT # Fields within hash tables, streams and geo commands 

26KeysT = Union[KeyT, Iterable[KeyT]] 

27ChannelT = _StringLikeT 

28GroupT = _StringLikeT # Consumer group 

29ConsumerT = _StringLikeT # Consumer name 

30StreamIdT = Union[int, _StringLikeT] 

31ScriptTextT = _StringLikeT 

32TimeoutSecT = Union[int, float, _StringLikeT] 

33# Mapping is not covariant in the key type, which prevents 

34# Mapping[_StringLikeT, X] from accepting arguments of type Dict[str, X]. Using 

35# a TypeVar instead of a Union allows mappings with any of the permitted types 

36# to be passed. Care is needed if there is more than one such mapping in a 

37# type signature because they will all be required to be the same key type. 

38AnyKeyT = TypeVar("AnyKeyT", bytes, str, memoryview) 

39AnyFieldT = TypeVar("AnyFieldT", bytes, str, memoryview) 

40AnyChannelT = TypeVar("AnyChannelT", bytes, str, memoryview) 

41 

42 

43class CommandsProtocol(Protocol): 

44 connection_pool: Union["AsyncConnectionPool", "ConnectionPool"] 

45 

46 def execute_command(self, *args, **options): 

47 ... 

48 

49 

50class ClusterCommandsProtocol(CommandsProtocol, Protocol): 

51 encoder: Union["AsyncEncoder", "Encoder"] 

52 

53 def execute_command(self, *args, **options) -> Union[Any, Awaitable]: 

54 ...