Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pymysql/converters.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

146 statements  

1import datetime 

2from decimal import Decimal 

3import re 

4import time 

5 

6from .err import ProgrammingError 

7from .constants import FIELD_TYPE 

8 

9 

def escape_item(val, charset, mapping=None):
    """Render *val* as a SQL literal using the encoder registered for its type.

    The encoder is looked up in *mapping* by the exact ``type(val)``; the
    module-level ``encoders`` table is used when *mapping* is None. When no
    encoder is registered for that type, the encoder registered for ``str``
    is used instead.

    Container encoders (``escape_dict``, ``escape_sequence`` and
    ``escape_set``) are called as ``encoder(val, charset, mapping)``; every
    other encoder is called as ``encoder(val, mapping)``.

    Raises:
        TypeError: if no encoder matches and *mapping* has no ``str`` entry.
    """
    if mapping is None:
        mapping = encoders
    encoder = mapping.get(type(val))

    # Fallback to default when no encoder found
    if not encoder:
        try:
            encoder = mapping[str]
        except KeyError:
            raise TypeError("no default type converter defined")

    # escape_set has the same (val, charset, mapping) signature as the other
    # container encoders, so it must be dispatched the same way; otherwise
    # *mapping* would be passed in the *charset* position.
    if encoder in (escape_dict, escape_sequence, escape_set):
        return encoder(val, charset, mapping)
    return encoder(val, mapping)

27 

28 

def escape_dict(val, charset, mapping=None):
    """Reject dict parameters: always raises ``TypeError``.

    All arguments are accepted only for signature compatibility with the
    other container encoders; none of them is used.
    """
    message = "dict can not be used as parameter"
    raise TypeError(message)

31 

32 

def escape_sequence(val, charset, mapping=None):
    """Escape every item of *val* and return them as ``"(a,b,c)"``.

    Each item is rendered with ``escape_item`` using the same *charset* and
    *mapping*; the results are comma-joined and wrapped in parentheses.
    """
    rendered = ",".join(escape_item(element, charset, mapping) for element in val)
    return "(" + rendered + ")"

39 

40 

def escape_set(val, charset, mapping=None):
    """Escape every item of *val* and return them comma-joined.

    Unlike ``escape_sequence`` the result is not wrapped in parentheses.
    """
    pieces = []
    for member in val:
        pieces.append(escape_item(member, charset, mapping))
    return ",".join(pieces)

43 

44 

def escape_bool(value, mapping=None):
    """Return *value* converted to an integer and rendered as a string.

    *mapping* is accepted for encoder-signature compatibility and is unused.
    """
    as_integer = int(value)
    return str(as_integer)

47 

48 

def escape_int(value, mapping=None):
    """Return ``str(value)``.

    *mapping* is accepted for encoder-signature compatibility and is unused.
    """
    literal = str(value)
    return literal

51 

52 

def escape_float(value, mapping=None):
    """Return the ``repr`` of *value*, always in exponent notation.

    When the ``repr`` has no exponent, ``"e0"`` is appended.

    Raises:
        ProgrammingError: if the ``repr`` is ``inf``, ``-inf`` or ``nan``.
    """
    text = repr(value)
    if text in ("inf", "-inf", "nan"):
        raise ProgrammingError("%s can not be used with MySQL" % text)
    return text if "e" in text else text + "e0"

60 

61 

# Translation table for str.translate(): position i holds the replacement
# text for code point i (0-127). Every entry maps to itself except the
# characters listed below, which map to their backslash-escaped form.
_escape_table = [chr(code) for code in range(128)]
for _raw, _escaped in (
    ("\0", "\\0"),
    ("\\", "\\\\"),
    ("\n", "\\n"),
    ("\r", "\\r"),
    ("\032", "\\Z"),
    ('"', '\\"'),
    ("'", "\\'"),
):
    _escape_table[ord(_raw)] = _escaped
# Do not leave the loop variables behind in the module namespace.
del _raw, _escaped

70 

71 

def escape_string(value, mapping=None):
    """Backslash-escape *value* without adding surrounding quotes.

    *value* should be a ``str``; it is passed through ``str.translate`` with
    the module's ``_escape_table``. *mapping* is unused.
    """
    table = _escape_table
    return value.translate(table)

78 

79 

def escape_bytes_prefixed(value, mapping=None):
    """Return *value* as an escaped, quoted literal with a ``_binary`` prefix.

    The bytes are decoded as ASCII with the ``surrogateescape`` error
    handler and then escaped through ``_escape_table``. *mapping* is unused.
    """
    text = value.decode("ascii", "surrogateescape")
    escaped = text.translate(_escape_table)
    return "_binary'%s'" % escaped

84 

85 

def escape_bytes(value, mapping=None):
    """Return *value* as an escaped, single-quoted literal.

    The bytes are decoded as ASCII with the ``surrogateescape`` error
    handler and then escaped through ``_escape_table``. *mapping* is unused.
    """
    text = value.decode("ascii", "surrogateescape")
    escaped = text.translate(_escape_table)
    return "'%s'" % escaped

88 

89 

def escape_str(value, mapping=None):
    """Return ``str(value)`` escaped via ``escape_string`` and single-quoted."""
    escaped = escape_string(str(value), mapping)
    return "'%s'" % escaped

92 

93 

def escape_None(value, mapping=None):
    """Return the SQL ``NULL`` keyword; both arguments are ignored."""
    null_literal = "NULL"
    return null_literal

96 

97 

def escape_timedelta(obj, mapping=None):
    """Return the timedelta *obj* as a quoted ``'[-]HH:MM:SS[.ffffff]'`` literal.

    Whole days are folded into the hour field, so the hours may exceed 23.
    The fractional part is emitted only when ``obj.microseconds`` is
    non-zero. *mapping* is accepted for encoder-signature compatibility and
    is unused.

    Negative durations are rendered as a leading ``-`` followed by the
    absolute value. Python normalizes a negative timedelta to negative
    ``days`` plus non-negative ``seconds``/``microseconds``
    (``timedelta(minutes=-30)`` is ``days=-1, seconds=84600``), so formatting
    the raw fields would yield ``'-1:30:00'`` instead of ``'-00:30:00'``.
    """
    sign = ""
    if obj.days < 0:
        sign = "-"
        obj = -obj

    whole_seconds = obj.days * 86400 + obj.seconds
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    if obj.microseconds:
        fmt = "'{0}{1:02d}:{2:02d}:{3:02d}.{4:06d}'"
    else:
        fmt = "'{0}{1:02d}:{2:02d}:{3:02d}'"
    return fmt.format(sign, hours, minutes, seconds, obj.microseconds)

107 

108 

def escape_time(obj, mapping=None):
    """Return *obj* as a quoted ``'HH:MM:SS[.ffffff]'`` literal.

    The fractional part is emitted only when ``obj.microsecond`` is
    non-zero. *mapping* is unused.
    """
    clock = "{0.hour:02}:{0.minute:02}:{0.second:02}".format(obj)
    if obj.microsecond:
        clock += ".{0.microsecond:06}".format(obj)
    return "'" + clock + "'"

115 

116 

def escape_datetime(obj, mapping=None):
    """Return *obj* as a quoted ``'YYYY-MM-DD HH:MM:SS[.ffffff]'`` literal.

    The fractional part is emitted only when ``obj.microsecond`` is
    non-zero. *mapping* is unused.
    """
    template = [
        "'{0.year:04}-{0.month:02}-{0.day:02}",
        " {0.hour:02}:{0.minute:02}:{0.second:02}",
    ]
    if obj.microsecond:
        template.append(".{0.microsecond:06}")
    template.append("'")
    return "".join(template).format(obj)

126 

127 

def escape_date(obj, mapping=None):
    """Return *obj* as a quoted ``'YYYY-MM-DD'`` literal. *mapping* is unused."""
    return f"'{obj.year:04}-{obj.month:02}-{obj.day:02}'"

131 

132 

def escape_struct_time(obj, mapping=None):
    """Return *obj* (a ``time.struct_time``-like sequence) as a datetime literal.

    The first six fields of *obj* are used to build a ``datetime.datetime``,
    which is then rendered by ``escape_datetime``. *mapping* is unused.
    """
    moment = datetime.datetime(*obj[:6])
    return escape_datetime(moment)

135 

136 

def Decimal2Literal(o, d):
    """Return *o* formatted with the fixed-point ``"f"`` format spec.

    *d* takes the place of the encoder *mapping* argument and is unused.
    """
    return "{0:f}".format(o)

139 

140 

def _convert_second_fraction(s):
    """Convert the fractional-seconds digit string *s* to microseconds.

    A falsy *s* (``None`` or ``""``) yields 0. Otherwise *s* is right-padded
    with zeros to six digits and truncated to six digits, so ``"5"`` becomes
    500000.
    """
    # Pad zeros to ensure the fraction length in microseconds
    return int(s.ljust(6, "0")[:6]) if s else 0

147 

148 

# Matches "YYYY-MM-DD HH:MM:SS[.ffffff]" with the date and time separated by
# "T" or a space. Groups: year, month, day, hour, minute, second, fraction
# (None when absent). The fraction separator is an escaped, literal "." so
# that an arbitrary character cannot introduce a fractional part.
DATETIME_RE = re.compile(
    r"(\d{1,4})-(\d{1,2})-(\d{1,2})[T ](\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?"
)

152 

153 

def convert_datetime(obj):
    """Returns a DATETIME or TIMESTAMP column value as a datetime object:

      >>> convert_datetime('2007-02-25 23:06:20')
      datetime.datetime(2007, 2, 25, 23, 6, 20)
      >>> convert_datetime('2007-02-25T23:06:20')
      datetime.datetime(2007, 2, 25, 23, 6, 20)

    Illegal values are returned as str:

      >>> convert_datetime('2007-02-31T23:06:20')
      '2007-02-31T23:06:20'
      >>> convert_datetime('0000-00-00 00:00:00')
      '0000-00-00 00:00:00'

    ``bytes``/``bytearray`` input is decoded as ASCII first. Values that do
    not match ``DATETIME_RE``, or whose fields do not form a valid datetime,
    are handed to ``convert_date``.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    match = DATETIME_RE.match(obj)
    if match is None:
        return convert_date(obj)

    *calendar_and_clock, fraction = match.groups()
    try:
        return datetime.datetime(
            *map(int, calendar_and_clock),
            _convert_second_fraction(fraction),
        )
    except ValueError:
        # Out-of-range fields (e.g. day 31 in February, or all zeros).
        return convert_date(obj)

182 

183 

# Matches "[-]HHH:MM:SS[.ffffff]". Groups: sign, hours, minutes, seconds,
# fraction (sign and fraction are None when absent). The fraction separator
# is an escaped, literal "." so that an arbitrary character cannot introduce
# a fractional part.
TIMEDELTA_RE = re.compile(r"(-)?(\d{1,3}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?")

185 

186 

def convert_timedelta(obj):
    """Returns a TIME column as a timedelta object:

      >>> convert_timedelta('25:06:17')
      datetime.timedelta(days=1, seconds=3977)
      >>> convert_timedelta('-25:06:17')
      datetime.timedelta(days=-2, seconds=82423)

    Illegal values are returned as string:

      >>> convert_timedelta('random crap')
      'random crap'

    Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
    can accept values as (+|-)DD HH:MM:SS. The latter format will not
    be parsed correctly by this function.

    ``bytes``/``bytearray`` input is decoded as ASCII first.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    match = TIMEDELTA_RE.match(obj)
    if match is None:
        return obj

    sign, hours, minutes, seconds, fraction = match.groups()
    try:
        magnitude = datetime.timedelta(
            hours=int(hours),
            minutes=int(minutes),
            seconds=int(seconds),
            microseconds=_convert_second_fraction(fraction),
        )
        # A captured "-" means the whole duration is negative.
        return magnitude * (-1 if sign else 1)
    except ValueError:
        return obj

229 

230 

# Matches "HH:MM:SS[.ffffff]". Groups: hours, minutes, seconds, fraction
# (None when absent). The fraction separator is an escaped, literal "." so
# that an arbitrary character cannot introduce a fractional part.
TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?")

232 

233 

def convert_time(obj):
    """Returns a TIME column as a time object:

      >>> convert_time('15:06:17')
      datetime.time(15, 6, 17)

    Illegal values are returned as str:

      >>> convert_time('-25:06:17')
      '-25:06:17'
      >>> convert_time('random crap')
      'random crap'

    Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
    can accept values as (+|-)DD HH:MM:SS. The latter format will not
    be parsed correctly by this function.

    Also note that MySQL's TIME column corresponds more closely to
    Python's timedelta and not time. However if you want TIME columns
    to be treated as time-of-day and not a time offset, then you can
    set this function as the converter for FIELD_TYPE.TIME.

    ``bytes``/``bytearray`` input is decoded as ASCII first.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    match = TIME_RE.match(obj)
    if match is None:
        return obj

    hours, minutes, seconds, fraction = match.groups()
    try:
        return datetime.time(
            hour=int(hours),
            minute=int(minutes),
            second=int(seconds),
            microsecond=_convert_second_fraction(fraction),
        )
    except ValueError:
        # Out-of-range fields (e.g. hour 25) are not a valid time of day.
        return obj

275 

276 

def convert_date(obj):
    """Returns a DATE column as a date object:

      >>> convert_date('2007-02-26')
      datetime.date(2007, 2, 26)

    Illegal values are returned as str:

      >>> convert_date('2007-02-31')
      '2007-02-31'
      >>> convert_date('0000-00-00')
      '0000-00-00'
      >>> convert_date('2007')
      '2007'

    ``bytes``/``bytearray`` input is decoded as ASCII first, so an illegal
    bytes value comes back as the decoded str.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    parts = obj.split("-", 2)
    # Fewer than three components cannot form a date; without this guard
    # datetime.date() would raise TypeError for the missing arguments
    # instead of the value being returned unchanged.
    if len(parts) != 3:
        return obj

    try:
        return datetime.date(*[int(part) for part in parts])
    except ValueError:
        # Non-numeric components or an out-of-range year/month/day.
        return obj

296 

297 

def through(x):
    """Identity converter: return *x* exactly as received."""
    unchanged = x
    return unchanged

300 

301 

# BIT values are handed back unmodified. Unpacking them into an integer
# would look like this:
#
#     def convert_bit(b):
#         b = "\x00" * (8 - len(b)) + b  # pad w/ zeroes
#         return struct.unpack(">Q", b)[0]
#
# That snippet is right, but MySQLdb doesn't process bits, so we shouldn't
# either.
convert_bit = through

309 

310 

# Maps a Python type to the function that renders a value of exactly that
# type as a SQL literal (see escape_item for how the table is consulted).
encoders = {
    # Scalars
    bool: escape_bool,
    int: escape_int,
    float: escape_float,
    str: escape_str,
    bytes: escape_bytes,
    # Sequence and set containers all share one encoder.
    **dict.fromkeys((tuple, list, set, frozenset), escape_sequence),
    dict: escape_dict,
    type(None): escape_None,
    # Date and time values
    datetime.date: escape_date,
    datetime.datetime: escape_datetime,
    datetime.timedelta: escape_timedelta,
    datetime.time: escape_time,
    time.struct_time: escape_struct_time,
    Decimal: Decimal2Literal,
}

330 

331 

# Maps a FIELD_TYPE column type code to the callable that converts a value
# of that column type into a Python object.
decoders = {
    FIELD_TYPE.BIT: convert_bit,
    # Numeric columns
    FIELD_TYPE.TINY: int,
    FIELD_TYPE.SHORT: int,
    FIELD_TYPE.LONG: int,
    FIELD_TYPE.FLOAT: float,
    FIELD_TYPE.DOUBLE: float,
    FIELD_TYPE.LONGLONG: int,
    FIELD_TYPE.INT24: int,
    FIELD_TYPE.YEAR: int,
    # Temporal columns
    **dict.fromkeys((FIELD_TYPE.TIMESTAMP, FIELD_TYPE.DATETIME), convert_datetime),
    FIELD_TYPE.TIME: convert_timedelta,
    FIELD_TYPE.DATE: convert_date,
    # Blob and string columns are passed through unchanged.
    **dict.fromkeys(
        (
            FIELD_TYPE.BLOB,
            FIELD_TYPE.TINY_BLOB,
            FIELD_TYPE.MEDIUM_BLOB,
            FIELD_TYPE.LONG_BLOB,
            FIELD_TYPE.STRING,
            FIELD_TYPE.VAR_STRING,
            FIELD_TYPE.VARCHAR,
        ),
        through,
    ),
    # Exact-precision columns
    **dict.fromkeys((FIELD_TYPE.DECIMAL, FIELD_TYPE.NEWDECIMAL), Decimal),
}

356 

357 

# for MySQLdb compatibility: one table holding both the encoders and the
# decoders, plus the legacy name for the string encoder.
conversions = {**encoders, **decoders}
Thing2Literal = escape_str

362 

363# Run doctests with `pytest --doctest-modules pymysql/converters.py`