Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pymysql/converters.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import datetime
2from decimal import Decimal
3import re
4import time
6from .err import ProgrammingError
7from .constants import FIELD_TYPE
def escape_item(val, charset, mapping=None):
    """Render *val* as a SQL literal using the encoder registered for its type.

    The encoder is looked up in *mapping* (the module-level ``encoders``
    table when *mapping* is None) by the exact type of *val*.  When no
    encoder is registered for that type, the one registered for ``str`` is
    used instead.

    Raises:
        TypeError: if neither a matching encoder nor a ``str`` encoder exists.
    """
    table = encoders if mapping is None else mapping
    encoder = table.get(type(val))

    if not encoder:
        # Fall back to the default (str) encoder when no encoder was found.
        try:
            encoder = table[str]
        except KeyError:
            raise TypeError("no default type converter defined")

    # The container encoders take the charset as an extra argument.
    if encoder in (escape_dict, escape_sequence):
        return encoder(val, charset, table)
    return encoder(val, table)
def escape_dict(val, charset, mapping=None):
    """Reject dict parameters; always raises TypeError."""
    raise TypeError("dict can not be used as parameter")
def escape_sequence(val, charset, mapping=None):
    """Escape every item of *val* and return them as ``(a,b,c)``."""
    quoted_items = [escape_item(item, charset, mapping) for item in val]
    return f"({','.join(quoted_items)})"
def escape_set(val, charset, mapping=None):
    """Escape every item of *val* and join them with commas (no parentheses)."""
    return ",".join(escape_item(member, charset, mapping) for member in val)
def escape_bool(value, mapping=None):
    """Return *value* converted to an integer and rendered as text."""
    as_int = int(value)
    return str(as_int)
def escape_int(value, mapping=None):
    """Return the ``str()`` form of *value*; *mapping* is unused."""
    return str(value)
def escape_float(value, mapping=None):
    """Return the ``repr()`` of *value*, always carrying an exponent.

    A value whose repr has no ``e`` gets ``e0`` appended.

    Raises:
        ProgrammingError: if the repr is ``inf``, ``-inf`` or ``nan``.
    """
    text = repr(value)
    if text in ("inf", "-inf", "nan"):
        raise ProgrammingError("%s can not be used with MySQL" % text)
    return text if "e" in text else text + "e0"
# Translation table for str.translate(): one entry per ASCII code point,
# identity by default, with the characters below replaced by their
# backslash escapes.
_escape_table = [chr(x) for x in range(128)]
for _char, _replacement in (
    ("\0", "\\0"),
    ("\\", "\\\\"),
    ("\n", "\\n"),
    ("\r", "\\r"),
    ("\032", "\\Z"),
    ('"', '\\"'),
    ("'", "\\'"),
):
    _escape_table[ord(_char)] = _replacement
del _char, _replacement


def escape_string(value, mapping=None):
    """Return *value* with special characters backslash-escaped, unquoted.

    *value* should be a ``str``.  No surrounding quotes are added; code
    points beyond the 128-entry table are left unchanged.
    """
    return value.translate(_escape_table)
def escape_bytes_prefixed(value, mapping=None):
    """Return *value* as an escaped, quoted literal prefixed with ``_binary``.

    The bytes are decoded as ASCII with ``surrogateescape`` before the
    escape table is applied.
    """
    text = value.decode("ascii", "surrogateescape")
    return "_binary'%s'" % text.translate(_escape_table)
def escape_bytes(value, mapping=None):
    """Return *value* as an escaped, single-quoted literal.

    The bytes are decoded as ASCII with ``surrogateescape`` before the
    escape table is applied.
    """
    text = value.decode("ascii", "surrogateescape")
    return "'%s'" % text.translate(_escape_table)
def escape_str(value, mapping=None):
    """Return ``str(value)`` escaped and wrapped in single quotes."""
    escaped = escape_string(str(value), mapping)
    return "'%s'" % escaped
def escape_None(value, mapping=None):
    """Return the SQL ``NULL`` keyword; both arguments are ignored."""
    return "NULL"
def escape_timedelta(obj, mapping=None):
    """Return a timedelta as a quoted ``[-]HH:MM:SS[.ffffff]`` literal.

    Days are folded into the hours field.  The fractional part is emitted
    only when the microsecond component is non-zero.

    Python normalises a negative timedelta to negative ``days`` plus
    non-negative ``seconds``/``microseconds``.  Formatting those fields
    directly turns e.g. ``timedelta(minutes=-30)`` into ``'-1:30:00'``, so
    the absolute value is formatted and the sign is written separately.
    """
    # Signed integer microsecond count; integer math avoids any overflow
    # that negating an extreme timedelta could cause.
    total_us = (obj.days * 86400 + obj.seconds) * 1000000 + obj.microseconds
    sign = "-" if total_us < 0 else ""

    total_seconds, microseconds = divmod(abs(total_us), 1000000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)

    if microseconds:
        fmt = "'{0}{1:02d}:{2:02d}:{3:02d}.{4:06d}'"
    else:
        fmt = "'{0}{1:02d}:{2:02d}:{3:02d}'"
    return fmt.format(sign, hours, minutes, seconds, microseconds)
def escape_time(obj, mapping=None):
    """Return a time-of-day object as a quoted ``HH:MM:SS[.ffffff]`` literal.

    The fractional part is emitted only when ``obj.microsecond`` is non-zero.
    """
    pattern = "'{0.hour:02}:{0.minute:02}:{0.second:02}"
    if obj.microsecond:
        pattern += ".{0.microsecond:06}"
    pattern += "'"
    return pattern.format(obj)
def escape_datetime(obj, mapping=None):
    """Return a datetime as a quoted ``YYYY-MM-DD HH:MM:SS[.ffffff]`` literal.

    The fractional part is emitted only when ``obj.microsecond`` is non-zero.
    """
    pattern = (
        "'{0.year:04}-{0.month:02}-{0.day:02}"
        " {0.hour:02}:{0.minute:02}:{0.second:02}"
    )
    if obj.microsecond:
        pattern += ".{0.microsecond:06}"
    pattern += "'"
    return pattern.format(obj)
def escape_date(obj, mapping=None):
    """Return a date as a quoted ``YYYY-MM-DD`` literal."""
    return "'{0.year:04}-{0.month:02}-{0.day:02}'".format(obj)
def escape_struct_time(obj, mapping=None):
    """Escape a ``time.struct_time`` via its first six fields as a datetime."""
    as_datetime = datetime.datetime(*obj[:6])
    return escape_datetime(as_datetime)
def Decimal2Literal(o, d):
    """Return Decimal *o* in fixed-point notation; *d* is unused.

    Raises:
        ProgrammingError: if *o* is not finite (infinity or NaN).
    """
    if o.is_finite():
        return format(o, "f")
    raise ProgrammingError("%s can not be used with MySQL" % str(o).lower())
143def _convert_second_fraction(s):
144 if not s:
145 return 0
146 # Pad zeros to ensure the fraction length in microseconds
147 s = s.ljust(6, "0")
148 return int(s[:6])
# Date, "T" or space, time, and an optional fraction of up to six digits.
# The fraction separator is an escaped literal dot, so that an arbitrary
# character followed by digits is not mistaken for fractional seconds.
DATETIME_RE = re.compile(
    r"(\d{1,4})-(\d{1,2})-(\d{1,2})[T ](\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?"
)
def convert_datetime(obj):
    """Convert a DATETIME or TIMESTAMP column value to a datetime object:

      >>> convert_datetime('2007-02-25 23:06:20')
      datetime.datetime(2007, 2, 25, 23, 6, 20)
      >>> convert_datetime('2007-02-25T23:06:20')
      datetime.datetime(2007, 2, 25, 23, 6, 20)

    Illegal values are returned as str:

      >>> convert_datetime('2007-02-31T23:06:20')
      '2007-02-31T23:06:20'
      >>> convert_datetime('0000-00-00 00:00:00')
      '0000-00-00 00:00:00'

    Bytes input is decoded as ASCII first.  Anything that does not parse as
    a full datetime is handed to ``convert_date``.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    match = DATETIME_RE.match(obj)
    if match is None:
        return convert_date(obj)

    try:
        *fields, fraction = match.groups()
        return datetime.datetime(
            *map(int, fields), _convert_second_fraction(fraction)
        )
    except ValueError:
        # Well-formed but out-of-range (e.g. Feb 31): let convert_date decide.
        return convert_date(obj)
# Optional sign, hours (up to three digits), minutes, seconds and an
# optional fraction.  The fraction separator is an escaped literal dot, so
# that an arbitrary character followed by digits is not read as a fraction.
TIMEDELTA_RE = re.compile(r"(-)?(\d{1,3}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?")
def convert_timedelta(obj):
    """Convert a TIME column value to a timedelta object:

      >>> convert_timedelta('25:06:17')
      datetime.timedelta(days=1, seconds=3977)
      >>> convert_timedelta('-25:06:17')
      datetime.timedelta(days=-2, seconds=82423)

    Illegal values are returned as string:

      >>> convert_timedelta('random crap')
      'random crap'

    Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
    can accept values as (+|-)DD HH:MM:SS. The latter format will not
    be parsed correctly by this function.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    match = TIMEDELTA_RE.match(obj)
    if match is None:
        return obj

    try:
        sign, hours, minutes, seconds, fraction = match.groups()
        magnitude = datetime.timedelta(
            hours=int(hours),
            minutes=int(minutes),
            seconds=int(seconds),
            microseconds=_convert_second_fraction(fraction),
        )
        # A leading "-" negates the whole duration.
        return magnitude * (-1 if sign else 1)
    except ValueError:
        return obj
# Unsigned hours, minutes, seconds and an optional fraction.  The fraction
# separator is an escaped literal dot, so that an arbitrary character
# followed by digits is not read as a fraction.
TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?")
def convert_time(obj):
    """Convert a TIME column value to a time object:

      >>> convert_time('15:06:17')
      datetime.time(15, 6, 17)

    Illegal values are returned as str:

      >>> convert_time('-25:06:17')
      '-25:06:17'
      >>> convert_time('random crap')
      'random crap'

    Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
    can accept values as (+|-)DD HH:MM:SS. The latter format will not
    be parsed correctly by this function.

    Also note that MySQL's TIME column corresponds more closely to
    Python's timedelta and not time. However, if you want TIME columns
    to be treated as time-of-day and not a time offset, you can set
    this function as the converter for FIELD_TYPE.TIME.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    match = TIME_RE.match(obj)
    if match is None:
        return obj

    try:
        hours, minutes, seconds, fraction = match.groups()
        return datetime.time(
            hour=int(hours),
            minute=int(minutes),
            second=int(seconds),
            microsecond=_convert_second_fraction(fraction),
        )
    except ValueError:
        # Out-of-range fields (e.g. hour 25) are returned unparsed.
        return obj
def convert_date(obj):
    """Convert a DATE column value to a date object:

      >>> convert_date('2007-02-26')
      datetime.date(2007, 2, 26)

    Illegal values are returned as str:

      >>> convert_date('2007-02-31')
      '2007-02-31'
      >>> convert_date('0000-00-00')
      '0000-00-00'
      >>> convert_date('2007')
      '2007'

    Bytes input is decoded as ASCII first, so an illegal bytes value comes
    back as the decoded str.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    parts = obj.split("-", 2)
    # With fewer than three fields datetime.date() would raise TypeError
    # (missing arguments) instead of ValueError; treat that as illegal too.
    if len(parts) != 3:
        return obj
    try:
        return datetime.date(*[int(part) for part in parts])
    except ValueError:
        return obj
def through(x):
    """Identity converter: return *x* unchanged."""
    return x
# BIT values are passed through unchanged.  Unpacking them like this
#
#     def convert_bit(b):
#         b = "\x00" * (8 - len(b)) + b  # pad w/ zeroes
#         return struct.unpack(">Q", b)[0]
#
# would be right, but MySQLdb doesn't process bits, so we shouldn't either.
convert_bit = through
# Python type -> function that renders a value of that type as a SQL literal.
# escape_item() looks values up here by their exact type.
encoders = {
    # scalars
    bool: escape_bool,
    int: escape_int,
    float: escape_float,
    str: escape_str,
    bytes: escape_bytes,
    # containers
    tuple: escape_sequence,
    list: escape_sequence,
    set: escape_sequence,
    frozenset: escape_sequence,
    dict: escape_dict,
    type(None): escape_None,
    # dates and times
    datetime.date: escape_date,
    datetime.datetime: escape_datetime,
    datetime.timedelta: escape_timedelta,
    datetime.time: escape_time,
    time.struct_time: escape_struct_time,
    Decimal: Decimal2Literal,
}
# FIELD_TYPE code -> callable applied to a column value of that type.
decoders = {
    FIELD_TYPE.BIT: convert_bit,
    # numeric columns
    FIELD_TYPE.TINY: int,
    FIELD_TYPE.SHORT: int,
    FIELD_TYPE.LONG: int,
    FIELD_TYPE.FLOAT: float,
    FIELD_TYPE.DOUBLE: float,
    FIELD_TYPE.LONGLONG: int,
    FIELD_TYPE.INT24: int,
    FIELD_TYPE.YEAR: int,
    # temporal columns
    FIELD_TYPE.TIMESTAMP: convert_datetime,
    FIELD_TYPE.DATETIME: convert_datetime,
    FIELD_TYPE.TIME: convert_timedelta,
    FIELD_TYPE.DATE: convert_date,
    # blob and string columns are returned unchanged
    FIELD_TYPE.BLOB: through,
    FIELD_TYPE.TINY_BLOB: through,
    FIELD_TYPE.MEDIUM_BLOB: through,
    FIELD_TYPE.LONG_BLOB: through,
    FIELD_TYPE.STRING: through,
    FIELD_TYPE.VAR_STRING: through,
    FIELD_TYPE.VARCHAR: through,
    # exact numerics
    FIELD_TYPE.DECIMAL: Decimal,
    FIELD_TYPE.NEWDECIMAL: Decimal,
}
# For MySQLdb compatibility: one combined table (encoders first, then
# decoders) and the legacy name for the string escaper.
conversions = {**encoders, **decoders}
Thing2Literal = escape_str
365# Run doctests with `pytest --doctest-modules pymysql/converters.py`