Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pymysql/converters.py: 31%

150 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:28 +0000

1import datetime 

2from decimal import Decimal 

3import re 

4import time 

5 

6from .err import ProgrammingError 

7from .constants import FIELD_TYPE 

8 

9 

def escape_item(val, charset, mapping=None):
    """Escape a single value using the encoder registered for its exact type.

    The encoder is looked up by ``type(val)`` in *mapping* (the module-level
    ``encoders`` table when *mapping* is None).  When no encoder is registered
    for that type, the encoder registered for ``str`` is used instead.

    Raises TypeError if neither a type-specific encoder nor a ``str`` encoder
    is present in the mapping.
    """
    table = encoders if mapping is None else mapping
    handler = table.get(type(val))

    if not handler:
        # No encoder for this exact type: fall back to the str encoder.
        try:
            handler = table[str]
        except KeyError:
            raise TypeError("no default type converter defined")

    # Container encoders recurse into escape_item and therefore need charset.
    if handler in (escape_dict, escape_sequence):
        return handler(val, charset, table)
    return handler(val, table)

27 

28 

def escape_dict(val, charset, mapping=None):
    """Return a new dict with the same keys and every value escaped.

    Each value is passed through ``escape_item``; keys are left untouched.
    """
    return {key: escape_item(item, charset, mapping) for key, item in val.items()}

35 

36 

def escape_sequence(val, charset, mapping=None):
    """Escape every element of *val* and join them as ``(a,b,c)``."""
    parts = [escape_item(element, charset, mapping) for element in val]
    return "(" + ",".join(parts) + ")"

43 

44 

def escape_set(val, charset, mapping=None):
    """Escape every element of *val* and join them with commas (no parentheses)."""
    escaped = []
    for member in val:
        escaped.append(escape_item(member, charset, mapping))
    return ",".join(escaped)

47 

48 

def escape_bool(value, mapping=None):
    """Render a boolean as the string ``"1"`` or ``"0"``."""
    as_int = int(value)
    return str(as_int)

51 

52 

def escape_int(value, mapping=None):
    """Render an integer as its ``str()`` representation."""
    literal = str(value)
    return literal

55 

56 

def escape_float(value, mapping=None):
    """Render a float as a literal that always carries an exponent.

    The ``repr`` of the value is used; ``e0`` is appended when the repr has no
    exponent part.  Raises ProgrammingError for infinities and NaN.
    """
    literal = repr(value)
    if literal in ("inf", "-inf", "nan"):
        raise ProgrammingError("%s can not be used with MySQL" % literal)
    return literal if "e" in literal else literal + "e0"

64 

65 

# Translation table indexed by code point, for use with str.translate().
# Only code points 0-127 are covered; the entries below are replaced by
# backslash escape sequences, every other entry maps to itself.
_escape_table = [chr(code) for code in range(128)]
for _raw, _escaped in (
    ("\0", "\\0"),
    ("\\", "\\\\"),
    ("\n", "\\n"),
    ("\r", "\\r"),
    ("\032", "\\Z"),
    ('"', '\\"'),
    ("'", "\\'"),
):
    _escape_table[ord(_raw)] = _escaped
del _raw, _escaped


def escape_string(value, mapping=None):
    """Escape special characters in *value* without adding surrounding quotes.

    *value* should be a ``str``.  Characters outside the 128-entry table are
    left unchanged by ``str.translate``.
    """
    escaped = value.translate(_escape_table)
    return escaped

82 

83 

def escape_bytes_prefixed(value, mapping=None):
    """Render bytes as a quoted, escaped literal carrying the ``_binary`` prefix.

    The bytes are decoded as ASCII with the ``surrogateescape`` error handler
    and then passed through the module's escape table.
    """
    text = value.decode("ascii", "surrogateescape")
    return "_binary'%s'" % text.translate(_escape_table)

88 

89 

def escape_bytes(value, mapping=None):
    """Render bytes as a quoted, escaped literal (no ``_binary`` prefix).

    The bytes are decoded as ASCII with the ``surrogateescape`` error handler
    and then passed through the module's escape table.
    """
    text = value.decode("ascii", "surrogateescape")
    return "'%s'" % text.translate(_escape_table)

92 

93 

def escape_str(value, mapping=None):
    """Convert *value* to ``str``, escape it and wrap it in single quotes."""
    escaped = escape_string(str(value), mapping)
    return "'%s'" % escaped

96 

97 

def escape_None(value, mapping=None):
    """Return the SQL ``NULL`` keyword; both arguments are ignored."""
    null_literal = "NULL"
    return null_literal

100 

101 

def escape_timedelta(obj, mapping=None):
    """Render a ``datetime.timedelta`` as a quoted ``[-]HH:MM:SS[.ffffff]`` literal.

    Hours are not wrapped at 24: whole days are folded into the hour field.
    The fractional part is emitted only when the microsecond component is
    non-zero.

    Negative durations are formatted from their magnitude with a leading
    ``-``.  A negative timedelta is stored normalized (negative ``days``,
    non-negative ``seconds``/``microseconds``), so formatting those fields
    directly would produce a wrong value such as ``'-26:53:43'`` for
    -25:06:17.
    """
    sign = "-" if obj.days < 0 else ""
    magnitude = abs(obj)

    total_minutes, seconds = divmod(magnitude.seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    hours += magnitude.days * 24

    text = "{0}{1:02d}:{2:02d}:{3:02d}".format(sign, hours, minutes, seconds)
    if magnitude.microseconds:
        text += ".{0:06d}".format(magnitude.microseconds)
    return "'" + text + "'"

111 

112 

def escape_time(obj, mapping=None):
    """Render a time-of-day object as a quoted ``HH:MM:SS[.ffffff]`` literal.

    The fractional part is emitted only when ``obj.microsecond`` is non-zero.
    """
    pattern = "'{0.hour:02}:{0.minute:02}:{0.second:02}"
    if obj.microsecond:
        pattern += ".{0.microsecond:06}"
    pattern += "'"
    return pattern.format(obj)

119 

120 

def escape_datetime(obj, mapping=None):
    """Render a datetime as a quoted ``YYYY-MM-DD HH:MM:SS[.ffffff]`` literal.

    The fractional part is emitted only when ``obj.microsecond`` is non-zero.
    """
    date_part = "{0.year:04}-{0.month:02}-{0.day:02}"
    time_part = "{0.hour:02}:{0.minute:02}:{0.second:02}"
    if obj.microsecond:
        time_part += ".{0.microsecond:06}"
    pattern = "'" + date_part + " " + time_part + "'"
    return pattern.format(obj)

130 

131 

def escape_date(obj, mapping=None):
    """Render a date as a quoted, zero-padded ``YYYY-MM-DD`` literal."""
    return "'{0.year:04}-{0.month:02}-{0.day:02}'".format(obj)

135 

136 

def escape_struct_time(obj, mapping=None):
    """Render a ``time.struct_time`` via ``escape_datetime``.

    Only the first six fields of *obj* are used to build the datetime.
    """
    stamp = datetime.datetime(*obj[:6])
    return escape_datetime(stamp)

139 

140 

def Decimal2Literal(o, d):
    """Format *o* in fixed-point notation (format spec ``f``); *d* is unused."""
    return "{0:f}".format(o)

143 

144 

def _convert_second_fraction(s):
    """Convert the fractional-seconds digits *s* to an int of microseconds.

    An empty string or None yields 0.  Shorter fractions are right-padded
    with zeros and longer ones are truncated to six digits.
    """
    if not s:
        return 0
    # Pad zeros to ensure the fraction length in microseconds
    s = s.ljust(6, "0")
    return int(s[:6])


# The fraction separator is an escaped, literal dot: an unescaped "." would
# accept any character (e.g. "20x5") as the start of a fraction.
DATETIME_RE = re.compile(
    r"(\d{1,4})-(\d{1,2})-(\d{1,2})[T ](\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?"
)


def convert_datetime(obj):
    """Returns a DATETIME or TIMESTAMP column value as a datetime object:

      >>> convert_datetime('2007-02-25 23:06:20')
      datetime.datetime(2007, 2, 25, 23, 6, 20)
      >>> convert_datetime('2007-02-25T23:06:20')
      datetime.datetime(2007, 2, 25, 23, 6, 20)

    Illegal values are returned as str:

      >>> convert_datetime('2007-02-31T23:06:20')
      '2007-02-31T23:06:20'
      >>> convert_datetime('0000-00-00 00:00:00')
      '0000-00-00 00:00:00'
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    m = DATETIME_RE.match(obj)
    if not m:
        # Not a date-time string: let convert_date try to parse it.
        return convert_date(obj)

    try:
        groups = list(m.groups())
        groups[-1] = _convert_second_fraction(groups[-1])
        return datetime.datetime(*[int(x) for x in groups])
    except ValueError:
        # Matched the pattern but holds out-of-range fields.
        return convert_date(obj)

186 

187 

# The fraction separator is an escaped, literal dot: an unescaped "." would
# accept any character (e.g. "17x5") as the start of a fraction.
TIMEDELTA_RE = re.compile(r"(-)?(\d{1,3}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?")


def convert_timedelta(obj):
    """Returns a TIME column as a timedelta object:

      >>> convert_timedelta('25:06:17')
      datetime.timedelta(days=1, seconds=3977)
      >>> convert_timedelta('-25:06:17')
      datetime.timedelta(days=-2, seconds=82423)

    Illegal values are returned as string:

      >>> convert_timedelta('random crap')
      'random crap'

    Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
    can accept values as (+|-)DD HH:MM:SS. The latter format will not
    be parsed correctly by this function.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    m = TIMEDELTA_RE.match(obj)
    if not m:
        return obj

    try:
        groups = list(m.groups())
        groups[-1] = _convert_second_fraction(groups[-1])
        # The first group captures the optional leading minus sign.
        negate = -1 if groups[0] else 1
        hours, minutes, seconds, microseconds = groups[1:]

        tdelta = (
            datetime.timedelta(
                hours=int(hours),
                minutes=int(minutes),
                seconds=int(seconds),
                microseconds=int(microseconds),
            )
            * negate
        )
        return tdelta
    except ValueError:
        return obj

233 

234 

# The fraction separator is an escaped, literal dot: an unescaped "." would
# accept any character (e.g. "17x5") as the start of a fraction.
TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?")


def convert_time(obj):
    """Returns a TIME column as a time object:

      >>> convert_time('15:06:17')
      datetime.time(15, 6, 17)

    Illegal values are returned as str:

      >>> convert_time('-25:06:17')
      '-25:06:17'
      >>> convert_time('random crap')
      'random crap'

    Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
    can accept values as (+|-)DD HH:MM:SS. The latter format will not
    be parsed correctly by this function.

    Also note that MySQL's TIME column corresponds more closely to
    Python's timedelta and not time. However if you want TIME columns
    to be treated as time-of-day and not a time offset, then you can
    set this function as the converter for FIELD_TYPE.TIME.
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")

    m = TIME_RE.match(obj)
    if not m:
        return obj

    try:
        groups = list(m.groups())
        groups[-1] = _convert_second_fraction(groups[-1])
        hours, minutes, seconds, microseconds = groups
        return datetime.time(
            hour=int(hours),
            minute=int(minutes),
            second=int(seconds),
            microsecond=int(microseconds),
        )
    except ValueError:
        # Matched the pattern but holds out-of-range fields (e.g. hour 25).
        return obj

279 

280 

def convert_date(obj):
    """Returns a DATE column as a date object:

      >>> convert_date('2007-02-26')
      datetime.date(2007, 2, 26)

    Illegal values are returned as str:

      >>> convert_date('2007-02-31')
      '2007-02-31'
      >>> convert_date('0000-00-00')
      '0000-00-00'
      >>> convert_date('2007-02')
      '2007-02'
    """
    if isinstance(obj, (bytes, bytearray)):
        obj = obj.decode("ascii")
    try:
        return datetime.date(*[int(x) for x in obj.split("-", 2)])
    except (ValueError, TypeError):
        # ValueError: non-numeric or out-of-range fields.
        # TypeError: fewer than three fields, so datetime.date() is called
        # with missing arguments.
        return obj

300 

301 

def through(x):
    """Identity converter: return *x* unchanged."""
    result = x
    return result


# BIT values are deliberately passed through unconverted.  The snippet below
# would decode them correctly, but MySQLdb doesn't process bits, so this
# module doesn't either:
#
#   def convert_bit(b):
#       b = "\x00" * (8 - len(b)) + b  # pad w/ zeroes
#       return struct.unpack(">Q", b)[0]
convert_bit = through

313 

314 

# Maps a Python type to the function that renders a value of that type as a
# SQL literal.  escape_item() looks values up here by their exact type.
encoders = {
    # Scalars
    bool: escape_bool,
    int: escape_int,
    float: escape_float,
    str: escape_str,
    bytes: escape_bytes,
    # Containers: all four sequence-like types share one encoder.
    **dict.fromkeys((tuple, list, set, frozenset), escape_sequence),
    dict: escape_dict,
    type(None): escape_None,
    # Date and time values
    datetime.date: escape_date,
    datetime.datetime: escape_datetime,
    datetime.timedelta: escape_timedelta,
    datetime.time: escape_time,
    time.struct_time: escape_struct_time,
    Decimal: Decimal2Literal,
}

334 

335 

# Maps a FIELD_TYPE code to the callable that converts a column value of that
# type into a Python object.  Insertion order matches the original table.
decoders = {
    FIELD_TYPE.BIT: convert_bit,
    **dict.fromkeys((FIELD_TYPE.TINY, FIELD_TYPE.SHORT, FIELD_TYPE.LONG), int),
    **dict.fromkeys((FIELD_TYPE.FLOAT, FIELD_TYPE.DOUBLE), float),
    **dict.fromkeys((FIELD_TYPE.LONGLONG, FIELD_TYPE.INT24, FIELD_TYPE.YEAR), int),
    **dict.fromkeys((FIELD_TYPE.TIMESTAMP, FIELD_TYPE.DATETIME), convert_datetime),
    FIELD_TYPE.TIME: convert_timedelta,
    FIELD_TYPE.DATE: convert_date,
    # Blob and string columns are returned as received.
    **dict.fromkeys(
        (
            FIELD_TYPE.BLOB,
            FIELD_TYPE.TINY_BLOB,
            FIELD_TYPE.MEDIUM_BLOB,
            FIELD_TYPE.LONG_BLOB,
            FIELD_TYPE.STRING,
            FIELD_TYPE.VAR_STRING,
            FIELD_TYPE.VARCHAR,
        ),
        through,
    ),
    **dict.fromkeys((FIELD_TYPE.DECIMAL, FIELD_TYPE.NEWDECIMAL), Decimal),
}

360 

361 

# for MySQLdb compatibility: a single table holding both the encoders and the
# decoders (decoder entries win on a key collision), plus the legacy alias
# for the str encoder.
conversions = {**encoders, **decoders}
Thing2Literal = escape_str

366 

367# Run doctests with `pytest --doctest-modules pymysql/converters.py`