Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pymysql/cursors.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

272 statements  

1import re 

2from . import err 

3 

4 

5#: Regular expression for :meth:`Cursor.executemany`. 

6#: executemany only supports simple bulk insert. 

7#: You can use it to load large dataset. 

8RE_INSERT_VALUES = re.compile( 

9 r"\s*((?:INSERT|REPLACE)\b.+\bVALUES?\s*)" 

10 + r"(\(\s*(?:%s|%\([^)]+\)s)\s*(?:,\s*(?:%s|%\([^)]+\)s)\s*)*\))" 

11 + r'(\s*(?:AS\s+(?:`[^`]+`|"[^"]+"|[0-9A-Za-z_$]+)\s*' 

12 + r'(?:\(\s*(?:`[^`]+`|"[^"]+"|[0-9A-Za-z_$]+)\s*' 

13 + r'(?:,\s*(?:`[^`]+`|"[^"]+"|[0-9A-Za-z_$]+)\s*)*\))?\s*)?' 

14 + r"(?:ON DUPLICATE.*)?);?\s*\Z", 

15 re.IGNORECASE | re.DOTALL, 

16) 

17 

18 

19def _backquote_escape(s): 

20 return s.replace("`", "``") 

21 

22 

23class Cursor: 

24 """ 

25 This is the object used to interact with the database. 

26 

27 Do not create an instance of a Cursor yourself. Call 

28 connections.Connection.cursor(). 

29 

30 See `Cursor <https://www.python.org/dev/peps/pep-0249/#cursor-objects>`_ in 

31 the specification. 

32 """ 

33 

34 #: Max statement size which :meth:`executemany` generates. 

35 #: 

36 #: Max size of allowed statement is max_allowed_packet - packet_header_size. 

37 #: Default value of max_allowed_packet is 1048576. 

38 max_stmt_length = 1024000 

39 

40 def __init__(self, connection): 

41 self.connection = connection 

42 self.warning_count = 0 

43 self.description = None 

44 self.rownumber = 0 

45 self.rowcount = -1 

46 self.arraysize = 1 

47 self._executed = None 

48 self._result = None 

49 self._rows = None 

50 

51 def close(self): 

52 """ 

53 Closing a cursor just exhausts all remaining data. 

54 """ 

55 conn = self.connection 

56 if conn is None: 

57 return 

58 try: 

59 while self.nextset(): 

60 pass 

61 finally: 

62 self.connection = None 

63 

64 def __enter__(self): 

65 return self 

66 

67 def __exit__(self, *exc_info): 

68 del exc_info 

69 self.close() 

70 

71 def _get_db(self): 

72 if not self.connection: 

73 raise err.ProgrammingError("Cursor closed") 

74 return self.connection 

75 

76 def _check_executed(self): 

77 if not self._executed: 

78 raise err.ProgrammingError("execute() first") 

79 

80 def _conv_row(self, row): 

81 return row 

82 

83 def setinputsizes(self, *args): 

84 """Does nothing, required by DB API.""" 

85 

86 def setoutputsizes(self, *args): 

87 """Does nothing, required by DB API.""" 

88 

89 def _nextset(self, unbuffered=False): 

90 """Get the next query set.""" 

91 conn = self._get_db() 

92 current_result = self._result 

93 if current_result is None or current_result is not conn._result: 

94 return None 

95 if not current_result.has_next: 

96 return None 

97 self._result = None 

98 self._clear_result() 

99 conn.next_result(unbuffered=unbuffered) 

100 self._do_get_result() 

101 return True 

102 

103 def nextset(self): 

104 return self._nextset(False) 

105 

106 def _escape_args(self, args, conn): 

107 if isinstance(args, (tuple, list)): 

108 return tuple(conn.literal(arg) for arg in args) 

109 elif isinstance(args, dict): 

110 return {key: conn.literal(val) for (key, val) in args.items()} 

111 else: 

112 # If it's not a dictionary let's try escaping it anyways. 

113 # Worst case it will throw a Value error 

114 return conn.escape(args) 

115 

116 def mogrify(self, query, args=None): 

117 """ 

118 Returns the exact string that would be sent to the database by calling the 

119 execute() method. 

120 

121 :param query: Query to mogrify. 

122 :type query: str 

123 

124 :param args: Parameters used with query. (optional) 

125 :type args: tuple, list or dict 

126 

127 :return: The query with argument binding applied. 

128 :rtype: str 

129 

130 This method follows the extension to the DB API 2.0 followed by Psycopg. 

131 """ 

132 conn = self._get_db() 

133 

134 if args is not None: 

135 query = query % self._escape_args(args, conn) 

136 

137 return query 

138 

139 def execute(self, query, args=None): 

140 """Execute a query. 

141 

142 :param query: Query to execute. 

143 :type query: str 

144 

145 :param args: Parameters used with query. (optional) 

146 :type args: tuple, list or dict 

147 

148 :return: Number of affected rows. 

149 :rtype: int 

150 

151 If args is a list or tuple, %s can be used as a placeholder in the query. 

152 If args is a dict, %(name)s can be used as a placeholder in the query. 

153 """ 

154 while self.nextset(): 

155 pass 

156 

157 query = self.mogrify(query, args) 

158 

159 result = self._query(query) 

160 self._executed = query 

161 return result 

162 

163 def executemany(self, query, args): 

164 """Run several data against one query. 

165 

166 :param query: Query to execute. 

167 :type query: str 

168 

169 :param args: Sequence of sequences or mappings. It is used as parameter. 

170 :type args: tuple or list 

171 

172 :return: Number of rows affected, if any. 

173 :rtype: int or None 

174 

175 This method improves performance on multiple-row INSERT and 

176 REPLACE. Otherwise it is equivalent to looping over args with 

177 execute(). 

178 """ 

179 if not args: 

180 return 

181 

182 m = RE_INSERT_VALUES.match(query) 

183 if m: 

184 q_prefix = m.group(1) % () 

185 q_values = m.group(2).rstrip() 

186 q_postfix = m.group(3) or "" 

187 assert q_values[0] == "(" and q_values[-1] == ")" 

188 return self._do_execute_many( 

189 q_prefix, 

190 q_values, 

191 q_postfix, 

192 args, 

193 self.max_stmt_length, 

194 self._get_db().encoding, 

195 ) 

196 

197 self.rowcount = sum(self.execute(query, arg) for arg in args) 

198 return self.rowcount 

199 

200 def _do_execute_many( 

201 self, prefix, values, postfix, args, max_stmt_length, encoding 

202 ): 

203 conn = self._get_db() 

204 escape = self._escape_args 

205 if isinstance(prefix, str): 

206 prefix = prefix.encode(encoding) 

207 if isinstance(postfix, str): 

208 postfix = postfix.encode(encoding) 

209 sql = bytearray(prefix) 

210 args = iter(args) 

211 v = values % escape(next(args), conn) 

212 if isinstance(v, str): 

213 v = v.encode(encoding, "surrogateescape") 

214 sql += v 

215 rows = 0 

216 for arg in args: 

217 v = values % escape(arg, conn) 

218 if isinstance(v, str): 

219 v = v.encode(encoding, "surrogateescape") 

220 if len(sql) + len(v) + len(postfix) + 1 > max_stmt_length: 

221 rows += self.execute(sql + postfix) 

222 sql = bytearray(prefix) 

223 else: 

224 sql += b"," 

225 sql += v 

226 rows += self.execute(sql + postfix) 

227 self.rowcount = rows 

228 return rows 

229 

230 def callproc(self, procname, args=()): 

231 """Execute stored procedure procname with args. 

232 

233 :param procname: Name of procedure to execute on server. 

234 :type procname: str 

235 

236 :param args: Sequence of parameters to use with procedure. 

237 :type args: tuple or list 

238 

239 Returns the original args. 

240 

241 Compatibility warning: PEP-249 specifies that any modified 

242 parameters must be returned. This is currently impossible 

243 as they are only available by storing them in a server 

244 variable and then retrieved by a query. Since stored 

245 procedures return zero or more result sets, there is no 

246 reliable way to get at OUT or INOUT parameters via callproc. 

247 The server variables are named @_procname_n, where procname 

248 is the parameter above and n is the position of the parameter 

249 (from zero). Once all result sets generated by the procedure 

250 have been fetched, you can issue a SELECT @_procname_0, ... 

251 query using .execute() to get any OUT or INOUT values. 

252 

253 Compatibility warning: The act of calling a stored procedure 

254 itself creates an empty result set. This appears after any 

255 result sets generated by the procedure. This is non-standard 

256 behavior with respect to the DB-API. Be sure to use nextset() 

257 to advance through all result sets; otherwise you may get 

258 disconnected. 

259 """ 

260 procname_escaped = _backquote_escape(procname) 

261 conn = self._get_db() 

262 

263 if args: 

264 fmt = f"@`_{procname_escaped}_%d`=%s" 

265 self._query( 

266 "SET %s" 

267 % ",".join( 

268 fmt % (index, conn.escape(arg)) for index, arg in enumerate(args) 

269 ) 

270 ) 

271 self.nextset() 

272 

273 q = "CALL `{}`({})".format( 

274 procname_escaped, 

275 ",".join([f"@`_{procname_escaped}_{i}`" for i in range(len(args))]), 

276 ) 

277 self._query(q) 

278 self._executed = q 

279 return args 

280 

281 def fetchone(self): 

282 """Fetch the next row.""" 

283 self._check_executed() 

284 if self._rows is None or self.rownumber >= len(self._rows): 

285 return None 

286 result = self._rows[self.rownumber] 

287 self.rownumber += 1 

288 return result 

289 

290 def fetchmany(self, size=None): 

291 """Fetch several rows.""" 

292 self._check_executed() 

293 if self._rows is None: 

294 # Django expects () for EOF. 

295 # https://github.com/django/django/blob/0c1518ee429b01c145cf5b34eab01b0b92f8c246/django/db/backends/mysql/features.py#L8 

296 return () 

297 end = self.rownumber + (size or self.arraysize) 

298 result = self._rows[self.rownumber : end] 

299 self.rownumber = min(end, len(self._rows)) 

300 return result 

301 

302 def fetchall(self): 

303 """Fetch all the rows.""" 

304 self._check_executed() 

305 if self._rows is None: 

306 return [] 

307 if self.rownumber: 

308 result = self._rows[self.rownumber :] 

309 else: 

310 result = self._rows 

311 self.rownumber = len(self._rows) 

312 return result 

313 

314 def scroll(self, value, mode="relative"): 

315 self._check_executed() 

316 if mode == "relative": 

317 r = self.rownumber + value 

318 elif mode == "absolute": 

319 r = value 

320 else: 

321 raise err.ProgrammingError("unknown scroll mode %s" % mode) 

322 

323 if not (0 <= r < len(self._rows)): 

324 raise IndexError("out of range") 

325 self.rownumber = r 

326 

327 def _query(self, q): 

328 conn = self._get_db() 

329 self._clear_result() 

330 conn.query(q) 

331 self._do_get_result() 

332 return self.rowcount 

333 

334 def _clear_result(self): 

335 self.rownumber = 0 

336 self._result = None 

337 

338 self.rowcount = 0 

339 self.warning_count = 0 

340 self.description = None 

341 self.lastrowid = None 

342 self._rows = None 

343 

344 def _do_get_result(self): 

345 conn = self._get_db() 

346 

347 self._result = result = conn._result 

348 

349 self.rowcount = result.affected_rows 

350 self.warning_count = result.warning_count 

351 self.description = result.description 

352 self.lastrowid = result.insert_id 

353 self._rows = result.rows 

354 

355 def __iter__(self): 

356 return self 

357 

358 def __next__(self): 

359 row = self.fetchone() 

360 if row is None: 

361 raise StopIteration 

362 return row 

363 

364 

365class DictCursorMixin: 

366 # You can override this to use OrderedDict or other dict-like types. 

367 dict_type = dict 

368 

369 def _do_get_result(self): 

370 super()._do_get_result() 

371 fields = [] 

372 if self.description: 

373 for f in self._result.fields: 

374 name = f.name 

375 if name in fields: 

376 name = f.table_name + "." + name 

377 fields.append(name) 

378 self._fields = fields 

379 

380 if fields and self._rows: 

381 self._rows = [self._conv_row(r) for r in self._rows] 

382 

383 def _conv_row(self, row): 

384 if row is None: 

385 return None 

386 return self.dict_type(zip(self._fields, row)) 

387 

388 

389class DictCursor(DictCursorMixin, Cursor): 

390 """A cursor which returns results as a dictionary""" 

391 

392 

393class SSCursor(Cursor): 

394 """ 

395 Unbuffered Cursor, mainly useful for queries that return a lot of data, 

396 or for connections to remote servers over a slow network. 

397 

398 Instead of copying every row of data into a buffer, this will fetch 

399 rows as needed. The upside of this is the client uses much less memory, 

400 and rows are returned much faster when traveling over a slow network 

401 or if the result set is very big. 

402 

403 There are limitations, though. The MySQL protocol doesn't support 

404 returning the total number of rows, so the only way to tell how many rows 

405 there are is to iterate over every row returned. Also, it currently isn't 

406 possible to scroll backwards, as only the current row is held in memory. 

407 """ 

408 

409 def _conv_row(self, row): 

410 return row 

411 

412 def close(self): 

413 conn = self.connection 

414 if conn is None: 

415 return 

416 

417 if self._result is not None and self._result is conn._result: 

418 self._result._finish_unbuffered_query() 

419 

420 try: 

421 while self.nextset(): 

422 pass 

423 finally: 

424 self.connection = None 

425 

426 __del__ = close 

427 

428 def _query(self, q): 

429 conn = self._get_db() 

430 self._clear_result() 

431 conn.query(q, unbuffered=True) 

432 self._do_get_result() 

433 return self.rowcount 

434 

435 def nextset(self): 

436 return self._nextset(unbuffered=True) 

437 

438 def read_next(self): 

439 """Read next row.""" 

440 return self._conv_row(self._result._read_rowdata_packet_unbuffered()) 

441 

442 def fetchone(self): 

443 """Fetch next row.""" 

444 self._check_executed() 

445 row = self.read_next() 

446 if row is None: 

447 self.warning_count = self._result.warning_count 

448 return None 

449 self.rownumber += 1 

450 return row 

451 

452 def fetchall(self): 

453 """ 

454 Fetch all, as per MySQLdb. Pretty useless for large queries, as 

455 it is buffered. See fetchall_unbuffered(), if you want an unbuffered 

456 generator version of this method. 

457 """ 

458 return list(self.fetchall_unbuffered()) 

459 

460 def fetchall_unbuffered(self): 

461 """ 

462 Fetch all, implemented as a generator, which isn't to standard, 

463 however, it doesn't make sense to return everything in a list, as that 

464 would use ridiculous memory for large result sets. 

465 """ 

466 return iter(self.fetchone, None) 

467 

468 def fetchmany(self, size=None): 

469 """Fetch many.""" 

470 self._check_executed() 

471 if size is None: 

472 size = self.arraysize 

473 

474 rows = [] 

475 for i in range(size): 

476 row = self.read_next() 

477 if row is None: 

478 self.warning_count = self._result.warning_count 

479 break 

480 rows.append(row) 

481 self.rownumber += 1 

482 if not rows: 

483 # Django expects () for EOF. 

484 # https://github.com/django/django/blob/0c1518ee429b01c145cf5b34eab01b0b92f8c246/django/db/backends/mysql/features.py#L8 

485 return () 

486 return rows 

487 

488 def scroll(self, value, mode="relative"): 

489 self._check_executed() 

490 

491 if mode == "relative": 

492 if value < 0: 

493 raise err.NotSupportedError( 

494 "Backwards scrolling not supported by this cursor" 

495 ) 

496 

497 for _ in range(value): 

498 self.read_next() 

499 self.rownumber += value 

500 elif mode == "absolute": 

501 if value < self.rownumber: 

502 raise err.NotSupportedError( 

503 "Backwards scrolling not supported by this cursor" 

504 ) 

505 

506 end = value - self.rownumber 

507 for _ in range(end): 

508 self.read_next() 

509 self.rownumber = value 

510 else: 

511 raise err.ProgrammingError("unknown scroll mode %s" % mode) 

512 

513 

514class SSDictCursor(DictCursorMixin, SSCursor): 

515 """An unbuffered cursor, which returns results as a dictionary"""