Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/dbapi_proxy.py: 32%

53 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# sqlalchemy/pool/dbapi_proxy.py 

2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""DBAPI proxy utility. 

10 

11Provides transparent connection pooling on top of a Python DBAPI. 

12 

13This is legacy SQLAlchemy functionality that is not typically used 

14today. 

15 

16""" 

17 

18from .impl import QueuePool 

19from .. import util 

20from ..util import threading 

21 

22proxies = {} 

23 

24 

@util.deprecated(
    "1.3",
    "The :func:`.pool.manage` function is deprecated, and will be "
    "removed in a future release.",
)
def manage(module, **params):
    r"""Return the pooling proxy registered for a DB-API module.

    One proxy is kept per module in the module-level ``proxies``
    registry.  If ``module`` already has a proxy, that proxy is returned
    unchanged and ``params`` are not consulted; otherwise a new
    ``_DBProxy`` is built from ``module`` and ``params``, stored in the
    registry, and returned.

    :param module: a DB-API 2.0 database module

    :param poolclass: the class used by the pool module to provide
      pooling.  Defaults to :class:`.QueuePool`.

    :param \**params: will be passed through to *poolclass*

    """
    existing = proxies.get(module)
    if existing is not None:
        return existing
    # setdefault() keeps whichever proxy was stored first, so every caller
    # ends up with the same proxy object for a given module.
    return proxies.setdefault(module, _DBProxy(module, **params))

51 

52 

def clear_managers():
    """Close every registered DB-API 2.0 manager and empty the registry.

    ``close()`` is called on each proxy stored in ``proxies``; once all
    of them have been closed the registry itself is cleared.
    """
    for registered_proxy in proxies.values():
        registered_proxy.close()

    proxies.clear()

62 

63 

class _DBProxy(object):
    """Layers connection pooling behavior on top of a standard DB-API module.

    Proxies a DB-API 2.0 connect() call to a connection pool keyed to the
    specific connect parameters. Other functions and attributes are delegated
    to the underlying DB-API module.
    """

    def __init__(self, module, poolclass=QueuePool, **kw):
        """Initializes a new proxy.

        module
          a DB-API 2.0 module

        poolclass
          a Pool class, defaulting to QueuePool

        Other parameters are sent to the Pool object's constructor.

        """

        self.module = module
        self.kw = kw
        self.poolclass = poolclass
        # Maps a serialized connect-argument key (see _serialize) to the
        # pool created for those arguments.
        self.pools = {}
        # Serializes pool creation so that concurrent get_pool() calls with
        # the same key build only one pool.
        self._create_pool_mutex = threading.Lock()

    def close(self):
        """Remove every pool held by this proxy.

        The pools are only dropped from the internal mapping; nothing is
        called on the pool objects themselves.
        """
        # Read the mapping from the instance dict rather than via attribute
        # access: close() is also reached from __del__, which can run on an
        # instance whose __init__ never completed.  In that state a normal
        # ``self.pools`` lookup would fall through to __getattr__.
        pools = self.__dict__.get("pools")
        if pools:
            pools.clear()

    def __del__(self):
        """Release the held pools when the proxy is garbage collected."""
        self.close()

    def __getattr__(self, key):
        """Delegate unknown attribute lookups to the wrapped DB-API module.

        Raises ``AttributeError`` if the proxy has no wrapped module yet
        or if the module does not provide ``key``.
        """
        # __getattr__ only runs when normal lookup fails.  Fetching
        # ``self.module`` through normal attribute access here would re-enter
        # __getattr__ endlessly when ``module`` itself is missing (instance
        # created without __init__, e.g. by copy/pickle machinery or a failed
        # constructor call), so go to the instance dict directly.
        try:
            module = self.__dict__["module"]
        except KeyError:
            raise AttributeError(key)
        return getattr(module, key)

    def get_pool(self, *args, **kw):
        """Return the pool for the given connect arguments, creating it
        on first use.

        The arguments are reduced to a key by ``_serialize``; an optional
        ``sa_pool_key`` keyword supplies that key explicitly and is not
        forwarded to the module's ``connect()``.
        """
        key = self._serialize(*args, **kw)
        try:
            return self.pools[key]
        except KeyError:
            with self._create_pool_mutex:
                # Re-check under the lock: another thread may have created
                # the pool between the failed lookup and lock acquisition.
                if key in self.pools:
                    return self.pools[key]

                # sa_pool_key only identifies the pool; it must not reach
                # the DB-API connect() call.
                kw.pop("sa_pool_key", None)
                pool = self.poolclass(
                    lambda: self.module.connect(*args, **kw), **self.kw
                )
                self.pools[key] = pool
                return pool

    def connect(self, *args, **kw):
        """Activate a connection to the database.

        Looks up (or creates) the pool matching the given connect
        arguments via ``get_pool`` and returns the result of that pool's
        ``connect()`` method.  How the pool hands out or creates
        connections is determined by the configured pool class.

        """

        return self.get_pool(*args, **kw).connect()

    def dispose(self, *args, **kw):
        """Dispose the pool referenced by the given connect arguments.

        The pool is removed from this proxy's mapping; it is a no-op if no
        pool exists for the arguments.
        """

        self.pools.pop(self._serialize(*args, **kw), None)

    def _serialize(self, *args, **kw):
        """Build the dictionary key identifying a set of connect arguments.

        Returns ``kw["sa_pool_key"]`` when that keyword is present;
        otherwise a tuple of the positional arguments followed by the
        ``(name, value)`` keyword pairs in sorted name order.
        """
        if "sa_pool_key" in kw:
            return kw["sa_pool_key"]

        return tuple(args) + tuple((k, kw[k]) for k in sorted(kw))