Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/typing.py: 92%
36 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
1# from __future__ import annotations
3from datetime import datetime, timedelta
4from typing import (
5 TYPE_CHECKING,
6 Any,
7 Awaitable,
8 Iterable,
9 Mapping,
10 Protocol,
11 Type,
12 TypeVar,
13 Union,
14)
16if TYPE_CHECKING:
17 from redis._parsers import Encoder
18 from redis.asyncio.connection import ConnectionPool as AsyncConnectionPool
19 from redis.connection import ConnectionPool
# --- Scalar building blocks -------------------------------------------------
Number = Union[int, float]
EncodedT = Union[bytes, memoryview]
DecodedT = Union[str, Number]
EncodableT = Union[EncodedT, DecodedT]

# --- Expiry, score and offset arguments -------------------------------------
AbsExpiryT = Union[int, datetime]
ExpiryT = Union[int, timedelta]
# A str is accepted so that a score bound can carry the "[" or "(" prefix.
ZScoreBoundT = Union[float, str]
# A str is accepted so that an offset can use the "#x" syntax.
BitfieldOffsetT = Union[int, str]

# --- String-like values and the aliases that share that shape ---------------
_StringLikeT = Union[bytes, str, memoryview]
KeyT = _StringLikeT  # Main redis key space
PatternT = _StringLikeT  # Patterns matched against keys, fields etc
ChannelT = _StringLikeT
GroupT = _StringLikeT  # Consumer group
ConsumerT = _StringLikeT  # Consumer name
ScriptTextT = _StringLikeT
KeysT = Union[KeyT, Iterable[KeyT]]
StreamIdT = Union[int, _StringLikeT]
TimeoutSecT = Union[Number, _StringLikeT]

# Fields within hash tables, streams and geo commands
FieldT = EncodableT

ResponseT = Union[Awaitable, Any]

# Mapping is invariant in its key type, so a parameter annotated as
# Mapping[_StringLikeT, X] rejects an argument of type Dict[str, X]. A
# constrained TypeVar lets a mapping keyed by any one of the permitted types
# be passed instead. Take care when a single signature has several such
# mappings: reusing one TypeVar forces them all to share the same key type.
AnyKeyT = TypeVar("AnyKeyT", bytes, str, memoryview)
AnyFieldT = TypeVar("AnyFieldT", bytes, str, memoryview)
AnyChannelT = TypeVar("AnyChannelT", bytes, str, memoryview)

# Maps a str key either straight to an exception class or to a nested
# str -> exception class mapping.
ExceptionMappingT = Mapping[
    str,
    Union[Type[Exception], Mapping[str, Type[Exception]]],
]
class CommandsProtocol(Protocol):
    """Structural type for objects that commands can be executed through.

    A conforming object exposes a ``connection_pool`` attribute, annotated
    as either the asyncio or the synchronous ``ConnectionPool``, and an
    ``execute_command`` method that takes arbitrary positional arguments
    and keyword options.
    """

    connection_pool: Union["AsyncConnectionPool", "ConnectionPool"]

    def execute_command(self, *args, **options):
        """Signature-only stub; this body does nothing and returns None."""
class ClusterCommandsProtocol(CommandsProtocol, Protocol):
    """Extension of ``CommandsProtocol`` that also declares an ``encoder``.

    It adds an ``encoder`` attribute annotated as ``Encoder`` and annotates
    ``execute_command`` as returning either a plain value or an awaitable.
    """

    encoder: "Encoder"

    def execute_command(self, *args, **options) -> Union[Any, Awaitable]:
        """Signature-only stub; this body does nothing and returns None."""