1# engine/_py_processors.py
2# Copyright (C) 2010-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4# Copyright (C) 2010 Gaetan de Menten gdementen@gmail.com
5#
6# This module is part of SQLAlchemy and is released under
7# the MIT License: https://www.opensource.org/licenses/mit-license.php
8
9"""defines generic type conversion functions, as used in bind and result
10processors.
11
12They all share one common characteristic: None is passed through unchanged.
13
14"""
15
16from __future__ import annotations
17
18import datetime
19from datetime import date as date_cls
20from datetime import datetime as datetime_cls
21from datetime import time as time_cls
22from decimal import Decimal
23import typing
24from typing import Any
25from typing import Callable
26from typing import Optional
27from typing import Type
28from typing import TypeVar
29from typing import Union
30
31
32_DT = TypeVar(
33 "_DT", bound=Union[datetime.datetime, datetime.time, datetime.date]
34)
35
36
37def str_to_datetime_processor_factory(
38 regexp: typing.Pattern[str], type_: Callable[..., _DT]
39) -> Callable[[Optional[str]], Optional[_DT]]:
40 rmatch = regexp.match
41 # Even on python2.6 datetime.strptime is both slower than this code
42 # and it does not support microseconds.
43 has_named_groups = bool(regexp.groupindex)
44
45 def process(value: Optional[str]) -> Optional[_DT]:
46 if value is None:
47 return None
48 else:
49 try:
50 m = rmatch(value)
51 except TypeError as err:
52 raise ValueError(
53 "Couldn't parse %s string '%r' "
54 "- value is not a string." % (type_.__name__, value)
55 ) from err
56
57 if m is None:
58 raise ValueError(
59 "Couldn't parse %s string: "
60 "'%s'" % (type_.__name__, value)
61 )
62 if has_named_groups:
63 groups = m.groupdict(0)
64 return type_(
65 **dict(
66 list(
67 zip(
68 iter(groups.keys()),
69 list(map(int, iter(groups.values()))),
70 )
71 )
72 )
73 )
74 else:
75 return type_(*list(map(int, m.groups(0))))
76
77 return process
78
79
80def to_decimal_processor_factory(
81 target_class: Type[Decimal], scale: int
82) -> Callable[[Optional[float]], Optional[Decimal]]:
83 fstring = "%%.%df" % scale
84
85 def process(value: Optional[float]) -> Optional[Decimal]:
86 if value is None:
87 return None
88 else:
89 return target_class(fstring % value)
90
91 return process
92
93
94def to_float(value: Optional[Union[int, float]]) -> Optional[float]:
95 if value is None:
96 return None
97 else:
98 return float(value)
99
100
101def to_str(value: Optional[Any]) -> Optional[str]:
102 if value is None:
103 return None
104 else:
105 return str(value)
106
107
108def int_to_boolean(value: Optional[int]) -> Optional[bool]:
109 if value is None:
110 return None
111 else:
112 return bool(value)
113
114
115def str_to_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
116 if value is not None:
117 dt_value = datetime_cls.fromisoformat(value)
118 else:
119 dt_value = None
120 return dt_value
121
122
123def str_to_time(value: Optional[str]) -> Optional[datetime.time]:
124 if value is not None:
125 dt_value = time_cls.fromisoformat(value)
126 else:
127 dt_value = None
128 return dt_value
129
130
131def str_to_date(value: Optional[str]) -> Optional[datetime.date]:
132 if value is not None:
133 dt_value = date_cls.fromisoformat(value)
134 else:
135 dt_value = None
136 return dt_value