Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pymysql/converters.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import datetime
2from decimal import Decimal
3import re
4import time
6from .err import ProgrammingError
7from .constants import FIELD_TYPE
def escape_item(val, charset, mapping=None):
    """Render *val* as an SQL literal using the encoder registered for its type.

    The encoder is looked up in *mapping* by the exact ``type(val)``; when
    *mapping* is ``None`` the module-level ``encoders`` table is used.  If no
    encoder is registered for that type, the encoder registered for ``str``
    is used instead.

    Raises:
        TypeError: if no encoder matches and *mapping* has no ``str`` entry.
    """
    if mapping is None:
        mapping = encoders
    encoder = mapping.get(type(val))

    # Fallback to default when no encoder found
    if not encoder:
        try:
            encoder = mapping[str]
        except KeyError:
            raise TypeError("no default type converter defined")

    # Container encoders are declared as (val, charset, mapping); every other
    # encoder is declared as (value, mapping).  escape_set belongs to the
    # first group as well, otherwise it would receive ``mapping`` as its
    # ``charset`` argument and silently drop the custom mapping.
    if encoder in (escape_dict, escape_sequence, escape_set):
        return encoder(val, charset, mapping)
    return encoder(val, mapping)
def escape_dict(val, charset, mapping=None):
    """Reject dict parameters: always raises ``TypeError``."""
    message = "dict can not be used as parameter"
    raise TypeError(message)
def escape_sequence(val, charset, mapping=None):
    """Escape every item of *val* and return them as ``(a,b,...)``.

    Each item is rendered with ``escape_item`` using the same *charset* and
    *mapping*; the results are comma-joined and wrapped in parentheses.
    """
    rendered = ",".join(escape_item(element, charset, mapping) for element in val)
    return "(" + rendered + ")"
def escape_set(val, charset, mapping=None):
    """Escape every item of *val* and return them comma-joined, without parentheses."""
    pieces = []
    for member in val:
        pieces.append(escape_item(member, charset, mapping))
    return ",".join(pieces)
def escape_bool(value, mapping=None):
    """Return *value* converted to an integer and rendered as a string."""
    as_int = int(value)
    return str(as_int)
def escape_int(value, mapping=None):
    """Return the ``str()`` rendering of *value*; *mapping* is unused."""
    literal = str(value)
    return literal
def escape_float(value, mapping=None):
    """Return ``repr(value)`` in exponent notation.

    When the repr contains no ``e``, ``e0`` is appended so the literal is
    always written with an exponent.

    Raises:
        ProgrammingError: if the repr is ``inf``, ``-inf`` or ``nan``.
    """
    text = repr(value)
    if text in ("inf", "-inf", "nan"):
        raise ProgrammingError("%s can not be used with MySQL" % text)
    return text if "e" in text else text + "e0"
62_escape_table = [chr(x) for x in range(128)]
63_escape_table[0] = "\\0"
64_escape_table[ord("\\")] = "\\\\"
65_escape_table[ord("\n")] = "\\n"
66_escape_table[ord("\r")] = "\\r"
67_escape_table[ord("\032")] = "\\Z"
68_escape_table[ord('"')] = '\\"'
69_escape_table[ord("'")] = "\\'"
def escape_string(value, mapping=None):
    """Escape *value* without adding surrounding quotes.

    *value* should be a ``str``; it is passed through ``_escape_table``.
    """
    escaped = value.translate(_escape_table)
    return escaped
def escape_bytes_prefixed(value, mapping=None):
    """Return *value* as an escaped, quoted literal with a ``_binary`` prefix.

    The bytes are decoded as ASCII with the ``surrogateescape`` error handler
    and then passed through ``_escape_table``.
    """
    as_text = value.decode("ascii", "surrogateescape")
    escaped = as_text.translate(_escape_table)
    return "_binary'%s'" % escaped
def escape_bytes(value, mapping=None):
    """Return *value* as an escaped, single-quoted literal.

    The bytes are decoded as ASCII with the ``surrogateescape`` error handler
    and then passed through ``_escape_table``.
    """
    as_text = value.decode("ascii", "surrogateescape")
    escaped = as_text.translate(_escape_table)
    return "'%s'" % escaped
def escape_str(value, mapping=None):
    """Return ``str(value)`` escaped via ``escape_string`` and single-quoted."""
    escaped = escape_string(str(value), mapping)
    return "'%s'" % escaped
def escape_None(value, mapping=None):
    """Return the literal ``NULL``; both arguments are ignored."""
    null_literal = "NULL"
    return null_literal
def escape_timedelta(obj, mapping=None):
    """Render a ``timedelta`` as a quoted ``[-]HH:MM:SS[.ffffff]`` literal.

    Hours are not wrapped at 24: whole days are folded into the hour field.
    The fractional part is emitted only when ``obj.microseconds`` is non-zero.

    A negative timedelta is stored normalised as negative ``days`` plus
    non-negative ``seconds``/``microseconds``, so formatting those fields
    independently gives a wrong value (``timedelta(minutes=-30)`` would be
    rendered as ``-1:30:00``).  The magnitude is formatted instead and the
    sign is written once in front.
    """
    negative = obj.days < 0
    if negative:
        obj = -obj

    whole_seconds = obj.days * 86400 + obj.seconds
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    sign = "-" if negative else ""

    if obj.microseconds:
        fmt = "'{0}{1:02d}:{2:02d}:{3:02d}.{4:06d}'"
    else:
        fmt = "'{0}{1:02d}:{2:02d}:{3:02d}'"
    return fmt.format(sign, hours, minutes, seconds, obj.microseconds)
def escape_time(obj, mapping=None):
    """Render a ``time`` as a quoted ``HH:MM:SS[.ffffff]`` literal.

    The fractional part is emitted only when ``obj.microsecond`` is non-zero.
    """
    if not obj.microsecond:
        return "'{0.hour:02}:{0.minute:02}:{0.second:02}'".format(obj)
    return "'{0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'".format(obj)
def escape_datetime(obj, mapping=None):
    """Render a ``datetime`` as a quoted ``YYYY-MM-DD HH:MM:SS[.ffffff]`` literal.

    The fractional part is emitted only when ``obj.microsecond`` is non-zero.
    """
    if not obj.microsecond:
        template = "'{0.year:04}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}:{0.second:02}'"
        return template.format(obj)

    template = (
        "'{0.year:04}-{0.month:02}-{0.day:02}"
        " {0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'"
    )
    return template.format(obj)
def escape_date(obj, mapping=None):
    """Render a ``date`` as a quoted ``YYYY-MM-DD`` literal."""
    return "'{0.year:04}-{0.month:02}-{0.day:02}'".format(obj)
def escape_struct_time(obj, mapping=None):
    """Render a ``time.struct_time`` via ``escape_datetime``.

    Only the first six fields of *obj* are used to build the ``datetime``.
    """
    moment = datetime.datetime(*obj[:6])
    return escape_datetime(moment)
def Decimal2Literal(o, d):
    """Return *o* formatted with the ``"f"`` (fixed-point) format spec.

    *d* is the encoder mapping passed by the caller and is unused.
    """
    fixed_point = format(o, "f")
    return fixed_point
141def _convert_second_fraction(s):
142 if not s:
143 return 0
144 # Pad zeros to ensure the fraction length in microseconds
145 s = s.ljust(6, "0")
146 return int(s[:6])
# Matches "YYYY-MM-DD HH:MM:SS[.ffffff]" with either a space or "T" between
# the date and the time.  The fraction separator is an escaped, literal dot:
# an unescaped "." would accept any character in front of the fraction digits.
DATETIME_RE = re.compile(
    r"(\d{1,4})-(\d{1,2})-(\d{1,2})[T ](\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?"
)
def convert_datetime(obj):
    """Returns a DATETIME or TIMESTAMP column value as a datetime object:

      >>> convert_datetime('2007-02-25 23:06:20')
      datetime.datetime(2007, 2, 25, 23, 6, 20)
      >>> convert_datetime('2007-02-25T23:06:20')
      datetime.datetime(2007, 2, 25, 23, 6, 20)

    Illegal values are returned as str:

      >>> convert_datetime('2007-02-31T23:06:20')
      '2007-02-31T23:06:20'
      >>> convert_datetime('0000-00-00 00:00:00')
      '0000-00-00 00:00:00'
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    matched = DATETIME_RE.match(obj)
    if matched is None:
        # Not a date-time string; hand it to the DATE parser instead.
        return convert_date(obj)

    *calendar_fields, fraction = matched.groups()
    try:
        parts = [int(field) for field in calendar_fields]
        parts.append(_convert_second_fraction(fraction))
        return datetime.datetime(*parts)
    except ValueError:
        # Out-of-range components (e.g. day 31 in February) fall back to the
        # DATE parser as well.
        return convert_date(obj)
# Matches "[-]HHH:MM:SS[.ffffff]".  The fraction separator is an escaped,
# literal dot: an unescaped "." would accept any character there.
TIMEDELTA_RE = re.compile(r"(-)?(\d{1,3}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?")
def convert_timedelta(obj):
    """Returns a TIME column as a timedelta object:

      >>> convert_timedelta('25:06:17')
      datetime.timedelta(days=1, seconds=3977)
      >>> convert_timedelta('-25:06:17')
      datetime.timedelta(days=-2, seconds=82423)

    Illegal values are returned as string:

      >>> convert_timedelta('random crap')
      'random crap'

    Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
    can accept values as (+|-)DD HH:MM:SS. The latter format will not
    be parsed correctly by this function.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    matched = TIMEDELTA_RE.match(obj)
    if matched is None:
        return obj

    sign, hours, minutes, seconds, fraction = matched.groups()
    try:
        magnitude = datetime.timedelta(
            hours=int(hours),
            minutes=int(minutes),
            seconds=int(seconds),
            microseconds=_convert_second_fraction(fraction),
        )
    except ValueError:
        return obj
    # A leading "-" negates the whole duration.
    return -magnitude if sign else magnitude
# Matches "HH:MM:SS[.ffffff]".  The fraction separator is an escaped, literal
# dot: an unescaped "." would accept any character there.
TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?")
def convert_time(obj):
    """Returns a TIME column as a time object:

      >>> convert_time('15:06:17')
      datetime.time(15, 6, 17)

    Illegal values are returned as str:

      >>> convert_time('-25:06:17')
      '-25:06:17'
      >>> convert_time('random crap')
      'random crap'

    Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
    can accept values as (+|-)DD HH:MM:SS. The latter format will not
    be parsed correctly by this function.

    Also note that MySQL's TIME column corresponds more closely to
    Python's timedelta and not time. However if you want TIME columns
    to be treated as time-of-day and not a time offset, then you can
    set this function as the converter for FIELD_TYPE.TIME.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    matched = TIME_RE.match(obj)
    if matched is None:
        return obj

    hours, minutes, seconds, fraction = matched.groups()
    try:
        return datetime.time(
            hour=int(hours),
            minute=int(minutes),
            second=int(seconds),
            microsecond=_convert_second_fraction(fraction),
        )
    except ValueError:
        # Out-of-range components (e.g. hour 25) are handed back unchanged.
        return obj
def convert_date(obj):
    """Returns a DATE column as a date object:

      >>> convert_date('2007-02-26')
      datetime.date(2007, 2, 26)

    Illegal values are returned as str:

      >>> convert_date('2007-02-31')
      '2007-02-31'
      >>> convert_date('0000-00-00')
      '0000-00-00'
      >>> convert_date('2007-02')
      '2007-02'
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")
    try:
        return datetime.date(*[int(x) for x in obj.split("-", 2)])
    except (ValueError, TypeError):
        # ValueError: a field is not an integer or is out of range.
        # TypeError: fewer than three "-"-separated fields were present, so
        # datetime.date() was called with missing arguments.
        return obj
def through(x):
    """Identity converter: return *x* unchanged."""
    return x
# BIT values are deliberately passed through unconverted.  The snippet below
# would decode them correctly, but MySQLdb doesn't process bits, so we
# shouldn't either:
#
#     def convert_bit(b):
#         b = "\x00" * (8 - len(b)) + b  # pad w/ zeroes
#         return struct.unpack(">Q", b)[0]
convert_bit = through
# Maps a Python type to the function that renders a value of that type as an
# SQL literal.
encoders = {
    # Scalars.
    bool: escape_bool,
    int: escape_int,
    float: escape_float,
    str: escape_str,
    bytes: escape_bytes,
    # Containers are all rendered as a parenthesised, comma-separated list.
    **dict.fromkeys((tuple, list, set, frozenset), escape_sequence),
    dict: escape_dict,
    type(None): escape_None,
    # Date and time values.
    datetime.date: escape_date,
    datetime.datetime: escape_datetime,
    datetime.timedelta: escape_timedelta,
    datetime.time: escape_time,
    time.struct_time: escape_struct_time,
    Decimal: Decimal2Literal,
}
# Maps a FIELD_TYPE column type code to the callable that converts the raw
# column value into a Python object.
decoders = {
    FIELD_TYPE.BIT: convert_bit,
    # Numeric columns.
    FIELD_TYPE.TINY: int,
    FIELD_TYPE.SHORT: int,
    FIELD_TYPE.LONG: int,
    FIELD_TYPE.FLOAT: float,
    FIELD_TYPE.DOUBLE: float,
    FIELD_TYPE.LONGLONG: int,
    FIELD_TYPE.INT24: int,
    FIELD_TYPE.YEAR: int,
    # Temporal columns.
    FIELD_TYPE.TIMESTAMP: convert_datetime,
    FIELD_TYPE.DATETIME: convert_datetime,
    FIELD_TYPE.TIME: convert_timedelta,
    FIELD_TYPE.DATE: convert_date,
    # Blob and string columns are returned as received.
    **dict.fromkeys(
        (
            FIELD_TYPE.BLOB,
            FIELD_TYPE.TINY_BLOB,
            FIELD_TYPE.MEDIUM_BLOB,
            FIELD_TYPE.LONG_BLOB,
            FIELD_TYPE.STRING,
            FIELD_TYPE.VAR_STRING,
            FIELD_TYPE.VARCHAR,
        ),
        through,
    ),
    # Exact-precision columns.
    **dict.fromkeys((FIELD_TYPE.DECIMAL, FIELD_TYPE.NEWDECIMAL), Decimal),
}
# for MySQLdb compatibility: a single table holding both the encoders (keyed
# by Python type) and the decoders (keyed by field type code).
conversions = {**encoders, **decoders}
Thing2Literal = escape_str

# Run doctests with `pytest --doctest-modules pymysql/converters.py`