Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/_py_processors.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

67 statements  

1# engine/_py_processors.py 

2# Copyright (C) 2010-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# Copyright (C) 2010 Gaetan de Menten gdementen@gmail.com 

5# 

6# This module is part of SQLAlchemy and is released under 

7# the MIT License: https://www.opensource.org/licenses/mit-license.php 

8 

9"""defines generic type conversion functions, as used in bind and result 

10processors. 

11 

12They all share one common characteristic: None is passed through unchanged. 

13 

14""" 

15 

16from __future__ import annotations 

17 

18import datetime 

19from datetime import date as date_cls 

20from datetime import datetime as datetime_cls 

21from datetime import time as time_cls 

22from decimal import Decimal 

23import typing 

24from typing import Any 

25from typing import Callable 

26from typing import Optional 

27from typing import Type 

28from typing import TypeVar 

29from typing import Union 

30 

# Type variable for the date/time objects built by the regex-based processor
# factory below.
_DT = TypeVar(
    "_DT", bound=Union[datetime.datetime, datetime.time, datetime.date]
)


def str_to_datetime_processor_factory(
    regexp: typing.Pattern[str], type_: Callable[..., _DT]
) -> Callable[[Optional[str]], Optional[_DT]]:
    """Build a processor that parses strings into date/time objects.

    :param regexp: compiled pattern applied with ``match()`` to each value.
     If the pattern defines named groups, the captured values are passed
     to ``type_`` as keyword arguments; otherwise they are passed
     positionally in group order.
    :param type_: callable invoked with the captured groups converted to
     ``int``; its ``__name__`` is used in error messages.
    :return: a ``process(value)`` callable.  ``None`` is passed through
     unchanged.
    :raises ValueError: (from the returned callable) when the value is not
     a string or does not match ``regexp``.
    """
    rmatch = regexp.match
    # Regex matching is used instead of datetime.strptime: per the original
    # author's note, strptime was both slower than this code and (as of
    # Python 2.6) did not support microseconds.
    has_named_groups = bool(regexp.groupindex)

    def process(value: Optional[str]) -> Optional[_DT]:
        if value is None:
            return None

        try:
            m = rmatch(value)
        except TypeError as err:
            # match() rejects non-string input with TypeError; surface it
            # as a ValueError while keeping the original as the cause.
            raise ValueError(
                "Couldn't parse %s string '%r' "
                "- value is not a string." % (type_.__name__, value)
            ) from err

        if m is None:
            raise ValueError(
                "Couldn't parse %s string: "
                "'%s'" % (type_.__name__, value)
            )

        # Groups that did not participate in the match default to 0.
        if has_named_groups:
            return type_(
                **{name: int(text) for name, text in m.groupdict(0).items()}
            )
        return type_(*map(int, m.groups(0)))

    return process

77 

78 

def to_decimal_processor_factory(
    target_class: Type[Decimal], scale: int
) -> Callable[[Optional[float]], Optional[Decimal]]:
    """Build a processor that converts numbers to ``target_class``.

    :param target_class: callable (typically ``Decimal``) invoked with the
     fixed-point string rendering of the value.
    :param scale: number of digits after the decimal point used when
     rendering the value with ``"%.<scale>f"``.
    :return: a ``process(value)`` callable.  ``None`` is passed through
     unchanged.
    """
    # Build the format once, e.g. scale=2 -> "%.2f".
    fstring = "%%.%df" % scale

    def process(value: Optional[float]) -> Optional[Decimal]:
        if value is None:
            return None
        return target_class(fstring % value)

    return process

91 

92 

def to_float(value: Optional[Union[int, float]]) -> Optional[float]:
    """Return ``float(value)``; ``None`` is passed through unchanged."""
    if value is None:
        return None
    return float(value)

98 

99 

def to_str(value: Optional[Any]) -> Optional[str]:
    """Return ``str(value)``; ``None`` is passed through unchanged."""
    if value is None:
        return None
    return str(value)

105 

106 

def int_to_boolean(value: Optional[int]) -> Optional[bool]:
    """Return ``bool(value)``; ``None`` is passed through unchanged."""
    if value is None:
        return None
    return bool(value)

112 

113 

def str_to_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
    """Parse an ISO-format string with ``datetime.fromisoformat``.

    ``None`` is passed through unchanged.
    """
    if value is None:
        return None
    return datetime_cls.fromisoformat(value)

120 

121 

def str_to_time(value: Optional[str]) -> Optional[datetime.time]:
    """Parse an ISO-format string with ``time.fromisoformat``.

    ``None`` is passed through unchanged.
    """
    if value is None:
        return None
    return time_cls.fromisoformat(value)

128 

129 

def str_to_date(value: Optional[str]) -> Optional[datetime.date]:
    """Parse an ISO-format string with ``date.fromisoformat``.

    ``None`` is passed through unchanged.
    """
    if value is None:
        return None
    return date_cls.fromisoformat(value)