1import datetime
2from decimal import Decimal
3import re
4import time
5
6from .err import ProgrammingError
7from .constants import FIELD_TYPE
8
9
def escape_item(val, charset, mapping=None):
    """Render *val* as an SQL literal using the encoder registered for its type.

    The encoder is looked up by the exact ``type(val)`` in *mapping* (the
    module-level ``encoders`` table when *mapping* is None). When no encoder is
    registered for that type, the ``str`` encoder is used instead.

    *charset* is only forwarded to the dict and sequence encoders; every
    other encoder is called as ``encoder(val, mapping)``.

    Raises TypeError when neither a matching encoder nor a ``str`` encoder
    exists in the table.
    """
    table = encoders if mapping is None else mapping
    converter = table.get(type(val))

    if not converter:
        # No encoder for this exact type: fall back to the default one.
        try:
            converter = table[str]
        except KeyError:
            raise TypeError("no default type converter defined")

    if converter in (escape_dict, escape_sequence):
        return converter(val, charset, table)
    return converter(val, table)
27
28
def escape_dict(val, charset, mapping=None):
    """Reject dict parameters: always raises TypeError."""
    message = "dict can not be used as parameter"
    raise TypeError(message)
31
32
def escape_sequence(val, charset, mapping=None):
    """Escape each item of *val* and return them as ``(item1,item2,...)``.

    Every element is rendered with escape_item(); the results are joined with
    commas and wrapped in parentheses.
    """
    rendered = ",".join(escape_item(element, charset, mapping) for element in val)
    return "(" + rendered + ")"
39
40
def escape_set(val, charset, mapping=None):
    """Escape each item of *val* and join them with commas (no parentheses)."""
    pieces = []
    for member in val:
        pieces.append(escape_item(member, charset, mapping))
    return ",".join(pieces)
43
44
def escape_bool(value, mapping=None):
    """Render a boolean as its integer form (``True`` -> ``"1"``)."""
    as_number = int(value)
    return str(as_number)
47
48
def escape_int(value, mapping=None):
    """Render an integer as its ``str()`` representation."""
    text = str(value)
    return text
51
52
def escape_float(value, mapping=None):
    """Render a float as a literal that always carries an exponent.

    The ``repr()`` of *value* is used; when it has no exponent part, ``e0`` is
    appended. Raises ProgrammingError for infinities and NaN.
    """
    text = repr(value)
    if text in ("inf", "-inf", "nan"):
        raise ProgrammingError("%s can not be used with MySQL" % text)
    return text if "e" in text else text + "e0"
60
61
# Translation table for str.translate(): entry i is the replacement text for
# the character with code point i. Characters outside the table (code point
# >= 128) are left untouched by translate().
_escape_table = list(map(chr, range(128)))
_escape_table[ord("'")] = "\\'"
_escape_table[ord('"')] = '\\"'
_escape_table[ord("\032")] = "\\Z"
_escape_table[ord("\r")] = "\\r"
_escape_table[ord("\n")] = "\\n"
_escape_table[ord("\\")] = "\\\\"
_escape_table[0] = "\\0"


def escape_string(value, mapping=None):
    """Return *value* with special characters backslash-escaped.

    No surrounding quotes are added. *value* should be a ``str``.
    """
    escaped = value.translate(_escape_table)
    return escaped
78
79
def escape_bytes_prefixed(value, mapping=None):
    """Render bytes as an escaped, quoted literal with a ``_binary`` prefix.

    The bytes are decoded as ASCII with ``surrogateescape`` so that bytes
    outside the ASCII range survive the round trip, then escaped with the
    module's escape table.
    """
    as_text = value.decode("ascii", "surrogateescape")
    escaped = as_text.translate(_escape_table)
    return "_binary'%s'" % escaped
84
85
def escape_bytes(value, mapping=None):
    """Render bytes as an escaped, single-quoted literal (no prefix).

    The bytes are decoded as ASCII with ``surrogateescape`` and then escaped
    with the module's escape table.
    """
    as_text = value.decode("ascii", "surrogateescape")
    escaped = as_text.translate(_escape_table)
    return "'%s'" % escaped
88
89
def escape_str(value, mapping=None):
    """Convert *value* to ``str``, escape it and wrap it in single quotes."""
    escaped = escape_string(str(value), mapping)
    return "'%s'" % escaped
92
93
def escape_None(value, mapping=None):
    """Return the SQL ``NULL`` keyword; *value* is ignored."""
    null_literal = "NULL"
    return null_literal
96
97
def escape_timedelta(obj, mapping=None):
    """Render a ``datetime.timedelta`` as a quoted ``[-]HH:MM:SS[.ffffff]`` literal.

    Hours are not wrapped at 24, so durations of a day or more come out as
    e.g. ``'25:06:17'``. The fractional part is emitted only when the
    microsecond component is non-zero.

    Negative durations are rendered as a leading ``-`` followed by the
    magnitude. A timedelta stores negative values in normalised form
    (``timedelta(minutes=-30)`` is ``days=-1, seconds=84600``), so formatting
    the ``days``/``seconds`` fields directly would give a wrong value such as
    ``'-1:30:00'``.
    """
    # Work on the exact total in microseconds so that sign and magnitude can
    # be separated without relying on the normalised field values.
    total_us = (obj.days * 86400 + obj.seconds) * 1000000 + obj.microseconds
    sign = "-" if total_us < 0 else ""

    total_seconds, microseconds = divmod(abs(total_us), 1000000)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    if microseconds:
        fmt = "'{0}{1:02d}:{2:02d}:{3:02d}.{4:06d}'"
    else:
        fmt = "'{0}{1:02d}:{2:02d}:{3:02d}'"
    return fmt.format(sign, hours, minutes, seconds, microseconds)
107
108
def escape_time(obj, mapping=None):
    """Render a ``datetime.time`` as a quoted ``HH:MM:SS[.ffffff]`` literal.

    The fractional part is emitted only when ``obj.microsecond`` is non-zero.
    """
    if not obj.microsecond:
        return "'{0.hour:02}:{0.minute:02}:{0.second:02}'".format(obj)
    return "'{0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'".format(obj)
115
116
def escape_datetime(obj, mapping=None):
    """Render a ``datetime.datetime`` as a quoted ``YYYY-MM-DD HH:MM:SS[.ffffff]``.

    The fractional part is emitted only when ``obj.microsecond`` is non-zero.
    """
    if not obj.microsecond:
        plain = "'{0.year:04}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}:{0.second:02}'"
        return plain.format(obj)

    with_fraction = (
        "'{0.year:04}-{0.month:02}-{0.day:02}"
        + " {0.hour:02}:{0.minute:02}:{0.second:02}.{0.microsecond:06}'"
    )
    return with_fraction.format(obj)
126
127
def escape_date(obj, mapping=None):
    """Render a date as a quoted, zero-padded ``YYYY-MM-DD`` literal."""
    return "'{0.year:04}-{0.month:02}-{0.day:02}'".format(obj)
131
132
def escape_struct_time(obj, mapping=None):
    """Render a ``time.struct_time`` as a quoted datetime literal.

    Only the first six fields of *obj* are used to build the datetime that is
    then passed to escape_datetime().
    """
    stamp = datetime.datetime(*obj[:6])
    return escape_datetime(stamp)
135
136
def Decimal2Literal(o, d):
    """Render *o* in fixed-point notation (format spec ``"f"``).

    *d* is accepted for signature compatibility with the other encoders and
    is not used.
    """
    literal = format(o, "f")
    return literal
139
140
def _convert_second_fraction(s):
    """Convert the digits of a fractional-second string to microseconds.

    Returns 0 for a falsy *s* (None or empty). Otherwise the digits are
    right-padded with zeros to six places and truncated to six places, so
    ``"5"`` becomes 500000.
    """
    if s:
        # Pad with zeros so the value is always expressed in microseconds.
        return int(s.ljust(6, "0")[:6])
    return 0
147
148
# The fractional-seconds separator is an escaped, literal dot. An unescaped
# "." would match any character and let arbitrary text followed by digits be
# read as a fraction.
DATETIME_RE = re.compile(
    r"(\d{1,4})-(\d{1,2})-(\d{1,2})[T ](\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?"
)


def convert_datetime(obj):
    """Returns a DATETIME or TIMESTAMP column value as a datetime object:

      >>> convert_datetime('2007-02-25 23:06:20')
      datetime.datetime(2007, 2, 25, 23, 6, 20)
      >>> convert_datetime('2007-02-25T23:06:20')
      datetime.datetime(2007, 2, 25, 23, 6, 20)

    Illegal values are returned as str:

      >>> convert_datetime('2007-02-31T23:06:20')
      '2007-02-31T23:06:20'
      >>> convert_datetime('0000-00-00 00:00:00')
      '0000-00-00 00:00:00'

    bytes and bytearray input is decoded as ASCII first. Input that does not
    match the datetime pattern, or whose fields are out of range, is handed to
    convert_date() and that function's result is returned.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    m = DATETIME_RE.match(obj)
    if not m:
        return convert_date(obj)

    try:
        groups = list(m.groups())
        # The last group is the optional fraction; normalise it to microseconds.
        groups[-1] = _convert_second_fraction(groups[-1])
        return datetime.datetime(*[int(x) for x in groups])
    except ValueError:
        # Fields matched the pattern but are not a valid calendar date/time.
        return convert_date(obj)
182
183
# The fractional-seconds separator is an escaped, literal dot. An unescaped
# "." would match any character and let arbitrary text followed by digits be
# read as a fraction.
TIMEDELTA_RE = re.compile(r"(-)?(\d{1,3}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?")


def convert_timedelta(obj):
    """Returns a TIME column as a timedelta object:

      >>> convert_timedelta('25:06:17')
      datetime.timedelta(days=1, seconds=3977)
      >>> convert_timedelta('-25:06:17')
      datetime.timedelta(days=-2, seconds=82423)

    Illegal values are returned as string:

      >>> convert_timedelta('random crap')
      'random crap'

    Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
    can accept values as (+|-)DD HH:MM:SS. The latter format will not
    be parsed correctly by this function.

    bytes and bytearray input is decoded as ASCII first.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    m = TIMEDELTA_RE.match(obj)
    if not m:
        return obj

    try:
        sign, hours, minutes, seconds, fraction = m.groups()
        tdelta = datetime.timedelta(
            hours=int(hours),
            minutes=int(minutes),
            seconds=int(seconds),
            microseconds=_convert_second_fraction(fraction),
        )
        # A leading "-" negates the whole duration, not just the hours.
        negate = -1 if sign else 1
        return tdelta * negate
    except ValueError:
        return obj
229
230
# The fractional-seconds separator is an escaped, literal dot. An unescaped
# "." would match any character and let arbitrary text followed by digits be
# read as a fraction.
TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?")


def convert_time(obj):
    """Returns a TIME column as a time object:

      >>> convert_time('15:06:17')
      datetime.time(15, 6, 17)

    Illegal values are returned as str:

      >>> convert_time('-25:06:17')
      '-25:06:17'
      >>> convert_time('random crap')
      'random crap'

    Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
    can accept values as (+|-)DD HH:MM:SS. The latter format will not
    be parsed correctly by this function.

    Also note that MySQL's TIME column corresponds more closely to
    Python's timedelta and not time. However if you want TIME columns
    to be treated as time-of-day and not a time offset, then you can
    set this function as the converter for FIELD_TYPE.TIME.

    bytes and bytearray input is decoded as ASCII first.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    m = TIME_RE.match(obj)
    if not m:
        return obj

    try:
        hours, minutes, seconds, fraction = m.groups()
        return datetime.time(
            hour=int(hours),
            minute=int(minutes),
            second=int(seconds),
            microsecond=_convert_second_fraction(fraction),
        )
    except ValueError:
        # Matched the pattern but is out of range for a time of day (e.g. hour 25).
        return obj
275
276
def convert_date(obj):
    """Returns a DATE column as a date object:

      >>> convert_date('2007-02-26')
      datetime.date(2007, 2, 26)

    Illegal values are returned as str:

      >>> convert_date('2007-02-31')
      '2007-02-31'
      >>> convert_date('0000-00-00')
      '0000-00-00'
      >>> convert_date('2007-02')
      '2007-02'

    bytes and bytearray input is decoded as ASCII first, so an illegal value
    given as bytes is returned as the decoded str.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")
    try:
        return datetime.date(*[int(x) for x in obj.split("-", 2)])
    except (ValueError, TypeError):
        # ValueError: non-numeric or out-of-range field.
        # TypeError: fewer than three fields, so datetime.date() is missing
        # a required argument.
        return obj
296
297
def through(x):
    """Identity converter: return *x* unchanged."""
    return x
300
301
# def convert_bit(b):
#     b = "\x00" * (8 - len(b)) + b # pad w/ zeroes
#     return struct.unpack(">Q", b)[0]
#
# The snippet above is correct, but MySQLdb does not decode BIT values,
# so for compatibility BIT values are passed through unchanged here as well.
convert_bit = through
309
310
# Maps a Python type to the function that renders a value of that type as
# SQL literal text. escape_item() looks a value up by its exact type
# (``type(val)``), so a type that has no entry here falls back to the
# ``str`` entry.
encoders = {
    bool: escape_bool,
    int: escape_int,
    float: escape_float,
    str: escape_str,
    bytes: escape_bytes,
    # All sequence-like containers share one encoder: "(item1,item2,...)".
    tuple: escape_sequence,
    list: escape_sequence,
    set: escape_sequence,
    frozenset: escape_sequence,
    # escape_dict always raises TypeError: dicts are not valid parameters.
    dict: escape_dict,
    type(None): escape_None,
    datetime.date: escape_date,
    datetime.datetime: escape_datetime,
    datetime.timedelta: escape_timedelta,
    datetime.time: escape_time,
    time.struct_time: escape_struct_time,
    Decimal: Decimal2Literal,
}
330
331
# Maps a FIELD_TYPE column type code to the callable that converts a column
# value of that type into a Python object. Column types mapped to ``through``
# (and BIT, via ``convert_bit``) are returned unchanged.
decoders = {
    FIELD_TYPE.BIT: convert_bit,
    FIELD_TYPE.TINY: int,
    FIELD_TYPE.SHORT: int,
    FIELD_TYPE.LONG: int,
    FIELD_TYPE.FLOAT: float,
    FIELD_TYPE.DOUBLE: float,
    FIELD_TYPE.LONGLONG: int,
    FIELD_TYPE.INT24: int,
    FIELD_TYPE.YEAR: int,
    FIELD_TYPE.TIMESTAMP: convert_datetime,
    FIELD_TYPE.DATETIME: convert_datetime,
    # TIME is decoded as a timedelta; convert_time is the time-of-day
    # alternative defined in this module.
    FIELD_TYPE.TIME: convert_timedelta,
    FIELD_TYPE.DATE: convert_date,
    FIELD_TYPE.BLOB: through,
    FIELD_TYPE.TINY_BLOB: through,
    FIELD_TYPE.MEDIUM_BLOB: through,
    FIELD_TYPE.LONG_BLOB: through,
    FIELD_TYPE.STRING: through,
    FIELD_TYPE.VAR_STRING: through,
    FIELD_TYPE.VARCHAR: through,
    FIELD_TYPE.DECIMAL: Decimal,
    FIELD_TYPE.NEWDECIMAL: Decimal,
}
356
357
# for MySQLdb compatibility
# ``conversions`` is a single dict holding both tables: a copy of the encoder
# entries (keyed by Python type) updated with the decoder entries (keyed by
# FIELD_TYPE code). ``Thing2Literal`` is an alias for escape_str.
conversions = encoders.copy()
conversions.update(decoders)
Thing2Literal = escape_str
362
363# Run doctests with `pytest --doctest-modules pymysql/converters.py`