1# engine/_py_processors.py
2# Copyright (C) 2010-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4# Copyright (C) 2010 Gaetan de Menten gdementen@gmail.com
5#
6# This module is part of SQLAlchemy and is released under
7# the MIT License: https://www.opensource.org/licenses/mit-license.php
8
9"""defines generic type conversion functions, as used in bind and result
10processors.
11
12They all share one common characteristic: None is passed through unchanged.
13
14"""
15
16from __future__ import annotations
17
18import datetime
19from datetime import date as date_cls
20from datetime import datetime as datetime_cls
21from datetime import time as time_cls
22from decimal import Decimal
23import typing
24from typing import Any
25from typing import Callable
26from typing import Optional
27from typing import Type
28from typing import TypeVar
29from typing import Union
30
# Type variable bounded to the stdlib ``datetime.datetime``,
# ``datetime.time`` and ``datetime.date`` classes.  It is used in the
# annotations of str_to_datetime_processor_factory() so that the type
# produced by the ``type_`` callable is also the type returned by the
# generated processor.
_DT = TypeVar(
    "_DT", bound=Union[datetime.datetime, datetime.time, datetime.date]
)
34
35
def str_to_datetime_processor_factory(
    regexp: typing.Pattern[str], type_: Callable[..., _DT]
) -> Callable[[Optional[str]], Optional[_DT]]:
    """Build a processor that parses a string into a ``type_`` object.

    The returned callable matches its argument against ``regexp`` (using
    ``regexp.match``, i.e. anchored at the start of the string), converts
    every captured group to ``int`` and passes the results to ``type_``.

    * If ``regexp`` defines named groups, the values are passed to
      ``type_`` as keyword arguments keyed by group name.
    * Otherwise the values are passed positionally, in group order.

    Groups that did not participate in the match are passed as ``0``.

    :param regexp: compiled pattern whose groups capture the integer
     components of the value.
    :param type_: callable invoked with the integer components; its
     ``__name__`` is used in error messages.
    :return: a function accepting an optional string.  ``None`` is passed
     through unchanged.
    :raises ValueError: raised by the returned function when the value is
     not a string or does not match ``regexp``.

    """
    rmatch = regexp.match
    # Historical note carried over from the original implementation: a
    # precompiled regexp was chosen over datetime.strptime because strptime
    # was found to be slower (the original note also cites missing
    # microsecond support on old Python versions).
    has_named_groups = bool(regexp.groupindex)

    def process(value: Optional[str]) -> Optional[_DT]:
        if value is None:
            return None

        try:
            m = rmatch(value)
        except TypeError as err:
            # re raises TypeError for non-string input; report it as a
            # parse failure while keeping the original cause chained.
            raise ValueError(
                "Couldn't parse %s string '%r' "
                "- value is not a string." % (type_.__name__, value)
            ) from err

        if m is None:
            raise ValueError(
                "Couldn't parse %s string: "
                "'%s'" % (type_.__name__, value)
            )

        if has_named_groups:
            # groupdict(0) substitutes 0 for groups that did not match
            return type_(
                **{
                    name: int(text)
                    for name, text in m.groupdict(0).items()
                }
            )

        # groups(0) substitutes 0 for groups that did not match
        return type_(*map(int, m.groups(0)))

    return process
77
78
def to_decimal_processor_factory(
    target_class: Type[Decimal], scale: int
) -> Callable[[Optional[float]], Optional[Decimal]]:
    """Build a processor converting numbers to ``target_class``.

    The returned callable renders its argument as fixed-point text with
    ``scale`` digits after the decimal point and hands that text to
    ``target_class``.  ``None`` is passed through unchanged.

    """
    # Template of the form "%.<scale>f", built once per factory call.
    template = "%%.%df" % scale

    def process(value: Optional[float]) -> Optional[Decimal]:
        return None if value is None else target_class(template % value)

    return process
91
92
def to_float(value: Optional[Union[int, float]]) -> Optional[float]:
    """Convert ``value`` with ``float()``; ``None`` is returned as is."""
    return None if value is None else float(value)
98
99
def to_str(value: Optional[Any]) -> Optional[str]:
    """Convert ``value`` with ``str()``; ``None`` is returned as is."""
    return None if value is None else str(value)
105
106
def int_to_boolean(value: Optional[int]) -> Optional[bool]:
    """Convert ``value`` with ``bool()``; ``None`` is returned as is."""
    return None if value is None else bool(value)
112
113
def str_to_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
    """Parse ``value`` using ``datetime.fromisoformat()``.

    ``None`` is passed through unchanged.

    """
    if value is None:
        return None
    return datetime_cls.fromisoformat(value)
120
121
def str_to_time(value: Optional[str]) -> Optional[datetime.time]:
    """Parse ``value`` using ``time.fromisoformat()``.

    ``None`` is passed through unchanged.

    """
    if value is None:
        return None
    return time_cls.fromisoformat(value)
128
129
def str_to_date(value: Optional[str]) -> Optional[datetime.date]:
    """Parse ``value`` using ``date.fromisoformat()``.

    ``None`` is passed through unchanged.

    """
    if value is None:
        return None
    return date_cls.fromisoformat(value)