1# engine/processors.py
2# Copyright (C) 2010-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4# Copyright (C) 2010 Gaetan de Menten gdementen@gmail.com
5#
6# This module is part of SQLAlchemy and is released under
7# the MIT License: https://www.opensource.org/licenses/mit-license.php
8
9"""defines generic type conversion functions, as used in bind and result
10processors.
11
12They all share one common characteristic: None is passed through unchanged.
13
14"""
15from __future__ import annotations
16
17import datetime
18from typing import Callable
19from typing import Optional
20from typing import Pattern
21from typing import TypeVar
22from typing import Union
23
24from ._processors_cy import int_to_boolean as int_to_boolean # noqa: F401
25from ._processors_cy import str_to_date as str_to_date # noqa: F401
26from ._processors_cy import str_to_datetime as str_to_datetime # noqa: F401
27from ._processors_cy import str_to_time as str_to_time # noqa: F401
28from ._processors_cy import to_float as to_float # noqa: F401
29from ._processors_cy import to_str as to_str # noqa: F401
30
31if True:
32 from ._processors_cy import ( # noqa: F401
33 to_decimal_processor_factory as to_decimal_processor_factory,
34 )
35
36
37_DT = TypeVar(
38 "_DT", bound=Union[datetime.datetime, datetime.time, datetime.date]
39)
40
41
42def str_to_datetime_processor_factory(
43 regexp: Pattern[str], type_: Callable[..., _DT]
44) -> Callable[[Optional[str]], Optional[_DT]]:
45 rmatch = regexp.match
46 # Even on python2.6 datetime.strptime is both slower than this code
47 # and it does not support microseconds.
48 has_named_groups = bool(regexp.groupindex)
49
50 def process(value: Optional[str]) -> Optional[_DT]:
51 if value is None:
52 return None
53 else:
54 try:
55 m = rmatch(value)
56 except TypeError as err:
57 raise ValueError(
58 "Couldn't parse %s string '%r' "
59 "- value is not a string." % (type_.__name__, value)
60 ) from err
61
62 if m is None:
63 raise ValueError(
64 "Couldn't parse %s string: "
65 "'%s'" % (type_.__name__, value)
66 )
67 if has_named_groups:
68 groups = m.groupdict(0)
69 return type_(
70 **dict(
71 list(
72 zip(
73 iter(groups.keys()),
74 list(map(int, iter(groups.values()))),
75 )
76 )
77 )
78 )
79 else:
80 return type_(*list(map(int, m.groups(0))))
81
82 return process