Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pymysql/converters.py: 31%
150 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:28 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:28 +0000
1import datetime
2from decimal import Decimal
3import re
4import time
6from .err import ProgrammingError
7from .constants import FIELD_TYPE
10def escape_item(val, charset, mapping=None):
11 if mapping is None:
12 mapping = encoders
13 encoder = mapping.get(type(val))
15 # Fallback to default when no encoder found
16 if not encoder:
17 try:
18 encoder = mapping[str]
19 except KeyError:
20 raise TypeError("no default type converter defined")
22 if encoder in (escape_dict, escape_sequence):
23 val = encoder(val, charset, mapping)
24 else:
25 val = encoder(val, mapping)
26 return val
29def escape_dict(val, charset, mapping=None):
30 n = {}
31 for k, v in val.items():
32 quoted = escape_item(v, charset, mapping)
33 n[k] = quoted
34 return n
37def escape_sequence(val, charset, mapping=None):
38 n = []
39 for item in val:
40 quoted = escape_item(item, charset, mapping)
41 n.append(quoted)
42 return "(" + ",".join(n) + ")"
45def escape_set(val, charset, mapping=None):
46 return ",".join([escape_item(x, charset, mapping) for x in val])
49def escape_bool(value, mapping=None):
50 return str(int(value))
53def escape_int(value, mapping=None):
54 return str(value)
57def escape_float(value, mapping=None):
58 s = repr(value)
59 if s in ("inf", "-inf", "nan"):
60 raise ProgrammingError("%s can not be used with MySQL" % s)
61 if "e" not in s:
62 s += "e0"
63 return s
66_escape_table = [chr(x) for x in range(128)]
67_escape_table[0] = "\\0"
68_escape_table[ord("\\")] = "\\\\"
69_escape_table[ord("\n")] = "\\n"
70_escape_table[ord("\r")] = "\\r"
71_escape_table[ord("\032")] = "\\Z"
72_escape_table[ord('"')] = '\\"'
73_escape_table[ord("'")] = "\\'"
76def escape_string(value, mapping=None):
77 """escapes *value* without adding quote.
79 Value should be unicode
80 """
81 return value.translate(_escape_table)
84def escape_bytes_prefixed(value, mapping=None):
85 return "_binary'%s'" % value.decode("ascii", "surrogateescape").translate(
86 _escape_table
87 )
90def escape_bytes(value, mapping=None):
91 return "'%s'" % value.decode("ascii", "surrogateescape").translate(_escape_table)
94def escape_str(value, mapping=None):
95 return "'%s'" % escape_string(str(value), mapping)
98def escape_None(value, mapping=None):
99 return "NULL"
102def escape_timedelta(obj, mapping=None):
103 seconds = int(obj.seconds) % 60
104 minutes = int(obj.seconds // 60) % 60
105 hours = int(obj.seconds // 3600) % 24 + int(obj.days) * 24
106 if obj.microseconds:
107 fmt = "'{0:02d}:{1:02d}:{2:02d}.{3:06d}'"
108 else:
109 fmt = "'{0:02d}:{1:02d}:{2:02d}'"
110 return fmt.format(hours, minutes, seconds, obj.microseconds)
113def escape_time(obj, mapping=None):
114 if obj.microsecond:
115 fmt = "'{0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'"
116 else:
117 fmt = "'{0.hour:02}:{0.minute:02}:{0.second:02}'"
118 return fmt.format(obj)
121def escape_datetime(obj, mapping=None):
122 if obj.microsecond:
123 fmt = (
124 "'{0.year:04}-{0.month:02}-{0.day:02}"
125 + " {0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'"
126 )
127 else:
128 fmt = "'{0.year:04}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}:{0.second:02}'"
129 return fmt.format(obj)
132def escape_date(obj, mapping=None):
133 fmt = "'{0.year:04}-{0.month:02}-{0.day:02}'"
134 return fmt.format(obj)
137def escape_struct_time(obj, mapping=None):
138 return escape_datetime(datetime.datetime(*obj[:6]))
141def Decimal2Literal(o, d):
142 return format(o, "f")
145def _convert_second_fraction(s):
146 if not s:
147 return 0
148 # Pad zeros to ensure the fraction length in microseconds
149 s = s.ljust(6, "0")
150 return int(s[:6])
153DATETIME_RE = re.compile(
154 r"(\d{1,4})-(\d{1,2})-(\d{1,2})[T ](\d{1,2}):(\d{1,2}):(\d{1,2})(?:.(\d{1,6}))?"
155)
158def convert_datetime(obj):
159 """Returns a DATETIME or TIMESTAMP column value as a datetime object:
161 >>> convert_datetime('2007-02-25 23:06:20')
162 datetime.datetime(2007, 2, 25, 23, 6, 20)
163 >>> convert_datetime('2007-02-25T23:06:20')
164 datetime.datetime(2007, 2, 25, 23, 6, 20)
166 Illegal values are returned as str:
168 >>> convert_datetime('2007-02-31T23:06:20')
169 '2007-02-31T23:06:20'
170 >>> convert_datetime('0000-00-00 00:00:00')
171 '0000-00-00 00:00:00'
172 """
173 if isinstance(obj, (bytes, bytearray)):
174 obj = obj.decode("ascii")
176 m = DATETIME_RE.match(obj)
177 if not m:
178 return convert_date(obj)
180 try:
181 groups = list(m.groups())
182 groups[-1] = _convert_second_fraction(groups[-1])
183 return datetime.datetime(*[int(x) for x in groups])
184 except ValueError:
185 return convert_date(obj)
188TIMEDELTA_RE = re.compile(r"(-)?(\d{1,3}):(\d{1,2}):(\d{1,2})(?:.(\d{1,6}))?")
191def convert_timedelta(obj):
192 """Returns a TIME column as a timedelta object:
194 >>> convert_timedelta('25:06:17')
195 datetime.timedelta(days=1, seconds=3977)
196 >>> convert_timedelta('-25:06:17')
197 datetime.timedelta(days=-2, seconds=82423)
199 Illegal values are returned as string:
201 >>> convert_timedelta('random crap')
202 'random crap'
204 Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
205 can accept values as (+|-)DD HH:MM:SS. The latter format will not
206 be parsed correctly by this function.
207 """
208 if isinstance(obj, (bytes, bytearray)):
209 obj = obj.decode("ascii")
211 m = TIMEDELTA_RE.match(obj)
212 if not m:
213 return obj
215 try:
216 groups = list(m.groups())
217 groups[-1] = _convert_second_fraction(groups[-1])
218 negate = -1 if groups[0] else 1
219 hours, minutes, seconds, microseconds = groups[1:]
221 tdelta = (
222 datetime.timedelta(
223 hours=int(hours),
224 minutes=int(minutes),
225 seconds=int(seconds),
226 microseconds=int(microseconds),
227 )
228 * negate
229 )
230 return tdelta
231 except ValueError:
232 return obj
235TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})(?:.(\d{1,6}))?")
238def convert_time(obj):
239 """Returns a TIME column as a time object:
241 >>> convert_time('15:06:17')
242 datetime.time(15, 6, 17)
244 Illegal values are returned as str:
246 >>> convert_time('-25:06:17')
247 '-25:06:17'
248 >>> convert_time('random crap')
249 'random crap'
251 Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
252 can accept values as (+|-)DD HH:MM:SS. The latter format will not
253 be parsed correctly by this function.
255 Also note that MySQL's TIME column corresponds more closely to
256 Python's timedelta and not time. However if you want TIME columns
257 to be treated as time-of-day and not a time offset, then you can
258 use set this function as the converter for FIELD_TYPE.TIME.
259 """
260 if isinstance(obj, (bytes, bytearray)):
261 obj = obj.decode("ascii")
263 m = TIME_RE.match(obj)
264 if not m:
265 return obj
267 try:
268 groups = list(m.groups())
269 groups[-1] = _convert_second_fraction(groups[-1])
270 hours, minutes, seconds, microseconds = groups
271 return datetime.time(
272 hour=int(hours),
273 minute=int(minutes),
274 second=int(seconds),
275 microsecond=int(microseconds),
276 )
277 except ValueError:
278 return obj
281def convert_date(obj):
282 """Returns a DATE column as a date object:
284 >>> convert_date('2007-02-26')
285 datetime.date(2007, 2, 26)
287 Illegal values are returned as str:
289 >>> convert_date('2007-02-31')
290 '2007-02-31'
291 >>> convert_date('0000-00-00')
292 '0000-00-00'
293 """
294 if isinstance(obj, (bytes, bytearray)):
295 obj = obj.decode("ascii")
296 try:
297 return datetime.date(*[int(x) for x in obj.split("-", 2)])
298 except ValueError:
299 return obj
302def through(x):
303 return x
306# def convert_bit(b):
307# b = "\x00" * (8 - len(b)) + b # pad w/ zeroes
308# return struct.unpack(">Q", b)[0]
309#
310# the snippet above is right, but MySQLdb doesn't process bits,
311# so we shouldn't either
312convert_bit = through
315encoders = {
316 bool: escape_bool,
317 int: escape_int,
318 float: escape_float,
319 str: escape_str,
320 bytes: escape_bytes,
321 tuple: escape_sequence,
322 list: escape_sequence,
323 set: escape_sequence,
324 frozenset: escape_sequence,
325 dict: escape_dict,
326 type(None): escape_None,
327 datetime.date: escape_date,
328 datetime.datetime: escape_datetime,
329 datetime.timedelta: escape_timedelta,
330 datetime.time: escape_time,
331 time.struct_time: escape_struct_time,
332 Decimal: Decimal2Literal,
333}
336decoders = {
337 FIELD_TYPE.BIT: convert_bit,
338 FIELD_TYPE.TINY: int,
339 FIELD_TYPE.SHORT: int,
340 FIELD_TYPE.LONG: int,
341 FIELD_TYPE.FLOAT: float,
342 FIELD_TYPE.DOUBLE: float,
343 FIELD_TYPE.LONGLONG: int,
344 FIELD_TYPE.INT24: int,
345 FIELD_TYPE.YEAR: int,
346 FIELD_TYPE.TIMESTAMP: convert_datetime,
347 FIELD_TYPE.DATETIME: convert_datetime,
348 FIELD_TYPE.TIME: convert_timedelta,
349 FIELD_TYPE.DATE: convert_date,
350 FIELD_TYPE.BLOB: through,
351 FIELD_TYPE.TINY_BLOB: through,
352 FIELD_TYPE.MEDIUM_BLOB: through,
353 FIELD_TYPE.LONG_BLOB: through,
354 FIELD_TYPE.STRING: through,
355 FIELD_TYPE.VAR_STRING: through,
356 FIELD_TYPE.VARCHAR: through,
357 FIELD_TYPE.DECIMAL: Decimal,
358 FIELD_TYPE.NEWDECIMAL: Decimal,
359}
362# for MySQLdb compatibility
363conversions = encoders.copy()
364conversions.update(decoders)
365Thing2Literal = escape_str
367# Run doctests with `pytest --doctest-modules pymysql/converters.py`