Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/SQLAlchemy-1.3.25.dev0-py3.11-linux-x86_64.egg/sqlalchemy/pool/impl.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

204 statements  

1# sqlalchemy/pool/impl.py 

2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: http://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Pool implementation classes. 

10 

11""" 

12 

13import traceback 

14import weakref 

15 

16from .base import _ConnectionFairy 

17from .base import _ConnectionRecord 

18from .base import Pool 

19from .. import exc 

20from .. import util 

21from ..util import chop_traceback 

22from ..util import queue as sqla_queue 

23from ..util import threading 

24 

25 

class QueuePool(Pool):

    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects, unless the SQLite dialect is in use.

    """

    def __init__(
        self,
        creator,
        pool_size=5,
        max_overflow=10,
        timeout=30,
        use_lifo=False,
        **kw
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use. When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

        """
        Pool.__init__(self, creator, **kw)
        self._pool = sqla_queue.Queue(pool_size, use_lifo=use_lifo)
        # The counter starts at -pool_size and is incremented before each
        # connection is created in _do_get(), so it only turns positive
        # once more than pool_size connections have been created.
        self._overflow = 0 - pool_size
        self._max_overflow = max_overflow
        self._timeout = timeout
        # guards the check-and-increment in _inc_overflow()
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, conn):
        """Put ``conn`` back into the queue, or close it if the queue is
        full.

        """
        try:
            # non-blocking put; a full queue raises instead of waiting
            self._pool.put(conn, False)
        except sqla_queue.Full:
            try:
                conn.close()
            finally:
                # give the overflow slot back even if close() raised
                self._dec_overflow()

    def _do_get(self):
        """Return a connection from the queue, or a newly created one.

        :raises exc.TimeoutError: when the overflow limit is reached and
          no connection became available from the queue while waiting.

        """
        # a max_overflow of -1 means "no overflow limit"
        use_overflow = self._max_overflow > -1

        try:
            # only block on the queue when the overflow limit is already
            # reached, i.e. when creating a new connection is not allowed
            wait = use_overflow and self._overflow >= self._max_overflow
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                # the limit was reached after the non-blocking get above
                # was attempted; start over
                return self._do_get()
            else:
                # %0.2f rather than %d, so that a fractional timeout is
                # not truncated in the message (0.5 would print as 0)
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except BaseException:
                # catch everything, KeyboardInterrupt included, so that
                # the slot reserved above is released; safe_reraise()
                # then re-raises the original exception
                with util.safe_reraise():
                    self._dec_overflow()
        else:
            # no slot could be reserved; start over
            return self._do_get()

    def _inc_overflow(self):
        """Reserve one connection slot.

        :return: True if the counter was incremented, False if the
          overflow limit has already been reached.

        """
        if self._max_overflow == -1:
            # no limit to check against; the lock is not taken here
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self):
        """Release one connection slot; always returns True."""
        if self._max_overflow == -1:
            # mirrors _inc_overflow(): the lock is not taken here
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self):
        """Return a new pool of the same class built from this pool's
        settings.

        """
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self):
        """Close every connection currently held in the queue and reset
        the overflow counter to its initial value.

        """
        while True:
            try:
                # non-blocking get; Empty ends the drain loop
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self):
        """Return a one-line summary of the pool's counters."""
        return (
            "Pool size: %d  Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self):
        """Return the maximum size of the underlying queue."""
        return self._pool.maxsize

    def timeout(self):
        """Return the configured timeout."""
        return self._timeout

    def checkedin(self):
        """Return the number of connections currently in the queue."""
        return self._pool.qsize()

    def overflow(self):
        """Return the current value of the overflow counter."""
        return self._overflow

    def checkedout(self):
        """Return the queue's maximum size, minus the number of queued
        connections, plus the overflow counter.

        """
        return self._pool.maxsize - self._pool.qsize() + self._overflow

219 

220 

class NullPool(Pool):

    """A Pool that performs no pooling at all.

    Each connection open/close on the pool literally opens and closes the
    underlying DB-API connection.

    Because no connections are held persistently, reconnect-related
    functions such as ``recycle`` and connection invalidation are not
    supported by this Pool implementation.

    """

    def dispose(self):
        """Do nothing; this pool does not hold on to any connection."""

    def recreate(self):
        """Return a new pool of the same class built from this pool's
        settings.

        """
        self.logger.info("Pool recreating")

        creator = self._creator
        settings = dict(
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )
        return self.__class__(creator, **settings)

    def status(self):
        """Return the fixed status string for this pool."""
        return "NullPool"

    def _do_get(self):
        """Create and return a new connection on every call."""
        fresh = self._create_connection()
        return fresh

    def _do_return_conn(self, conn):
        """Close ``conn`` instead of keeping it."""
        conn.close()

260 

261 

class SingletonThreadPool(Pool):

    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used.   This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current status it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    def __init__(self, creator, pool_size=5, **kw):
        Pool.__init__(self, creator, **kw)
        # per-thread slots: ``current`` on each holds a weak reference
        # (see _do_get for ``_conn``)
        self._conn = threading.local()
        self._fairy = threading.local()
        # every connection handed out by _do_get and not yet cleaned up
        self._all_conns = set()
        # note: a plain attribute here, not a method
        self.size = pool_size

    def recreate(self):
        """Return a new pool of the same class built from this pool's
        settings.

        """
        self.logger.info("Pool recreating")

        creator = self._creator
        settings = dict(
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )
        return self.__class__(creator, **settings)

    def dispose(self):
        """Dispose of this pool."""

        for tracked in self._all_conns:
            try:
                tracked.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self):
        """Close arbitrary tracked connections until fewer than ``size``
        remain.

        """
        while len(self._all_conns) >= self.size:
            self._all_conns.pop().close()

    def status(self):
        """Return this pool's id and the number of tracked connections."""
        details = (id(self), len(self._all_conns))
        return "SingletonThreadPool id:%d size: %d" % details

    def _do_return_conn(self, conn):
        """Do nothing; the connection stays assigned to its thread."""

    def _do_get(self):
        """Return the calling thread's connection, creating it if needed."""
        try:
            existing = self._conn.current()
            if existing:
                return existing
        except AttributeError:
            # nothing has been stored for this thread yet
            pass

        created = self._create_connection()
        # only a weak reference is kept in the thread-local slot
        self._conn.current = weakref.ref(created)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(created)
        return created

    def connect(self):
        """Return the fairy already checked out in this thread, or check
        out a new one.

        """
        # vendored from Pool to include use_threadlocal behavior
        try:
            fairy = self._fairy.current()
        except AttributeError:
            fairy = None

        if fairy is not None:
            return fairy._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

    def _return_conn(self, record):
        """Clear this thread's fairy slot, then delegate to
        ``_do_return_conn``.

        """
        try:
            del self._fairy.current
        except AttributeError:
            # no fairy was stored for this thread
            pass
        self._do_return_conn(record)

374 

375 

class StaticPool(Pool):

    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are not
    currently supported by this Pool implementation but may be implemented
    in a future release.

    """

    @util.memoized_property
    def _conn(self):
        """The single connection, obtained from the creator callable on
        first access.

        """
        return self._creator()

    @util.memoized_property
    def connection(self):
        """The single connection record, returned by every ``_do_get``."""
        return _ConnectionRecord(self)

    def status(self):
        """Return the fixed status string for this pool."""
        return "StaticPool"

    def dispose(self):
        """Close the connection, if one has been created.

        Safe to call more than once: after the first call the stored value
        is ``None`` and later calls do nothing.  Previously a second call
        attempted ``None.close()`` and raised ``AttributeError``.

        """
        # Look in __dict__ directly, as the original membership test did,
        # so that disposing a pool that never connected does not invoke
        # the creator through the memoized property.
        conn = self.__dict__.get("_conn")
        if conn is not None:
            conn.close()
            self._conn = None

    def recreate(self):
        """Return a new pool of the same class built from this pool's
        settings.

        """
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _create_connection(self):
        """Return the single shared connection instead of a new one."""
        return self._conn

    def _do_return_conn(self, conn):
        """Do nothing; the single connection is kept."""
        pass

    def _do_get(self):
        """Return the single shared connection record."""
        return self.connection

425 

426 

class AssertionPool(Pool):

    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    """

    def __init__(self, *args, **kw):
        """Construct an AssertionPool.

        :param store_traceback: keyword-only; when true (the default), the
          call stack of each checkout is recorded and included in the error
          raised by a conflicting checkout.

        Remaining arguments are passed to the :class:`_pool.Pool`
        constructor.

        """
        self._conn = None
        self._checked_out = False
        # removed from kw before it is forwarded to Pool.__init__
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self):
        """Return the fixed status string for this pool."""
        return "AssertionPool"

    def _do_return_conn(self, conn):
        """Mark the single connection as returned.

        :raises AssertionError: if no connection is currently checked out.

        """
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        # plain assert: this check is skipped when Python runs with -O
        assert conn is self._conn

    def dispose(self):
        """Clear the checked-out flag and close the connection, if any."""
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self):
        """Return a new pool of the same class built from this pool's
        settings, including ``store_traceback``.

        """
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
            # without this, a pool built with store_traceback=False would
            # silently revert to the default of True when recreated
            store_traceback=self._store_traceback,
        )

    def _do_get(self):
        """Check out the single connection.

        :raises AssertionError: if the connection is already checked out;
          the message includes the stored checkout traceback when one was
          recorded.

        """
        if self._checked_out:
            if self._checkout_traceback:
                suffix = " at:\n%s" % "".join(
                    chop_traceback(self._checkout_traceback)
                )
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        # the connection is created on the first checkout only
        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            # remember where this checkout happened, for the error above
            self._checkout_traceback = traceback.format_stack()
        return self._conn