Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy_utils/types/password.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

106 statements  

1import weakref 

2 

3from sqlalchemy import types 

4from sqlalchemy.dialects import oracle, postgresql, sqlite 

5from sqlalchemy.ext.mutable import Mutable 

6 

7from ..exceptions import ImproperlyConfigured 

8from .scalar_coercible import ScalarCoercible 

9 

10passlib = None 

11try: 

12 import passlib 

13 from passlib.context import LazyCryptContext 

14except ImportError: 

15 pass 

16 

17 

18class Password(Mutable): 

19 @classmethod 

20 def coerce(cls, key, value): 

21 if isinstance(value, Password): 

22 return value 

23 

24 if isinstance(value, (str, bytes)): 

25 return cls(value, secret=True) 

26 

27 super().coerce(key, value) 

28 

29 def __init__(self, value, context=None, secret=False): 

30 # Store the hash (if it is one). 

31 self.hash = value if not secret else None 

32 

33 # Store the secret if we have one. 

34 self.secret = value if secret else None 

35 

36 # The hash should be bytes. 

37 if isinstance(self.hash, str): 

38 self.hash = self.hash.encode('utf8') 

39 

40 # Save weakref of the password context (if we have one) 

41 self.context = weakref.proxy(context) if context is not None else None 

42 

43 def __eq__(self, value): 

44 if self.hash is None or value is None: 

45 # Ensure that we don't continue comparison if one of us is None. 

46 return self.hash is value 

47 

48 if isinstance(value, Password): 

49 # Comparing 2 hashes isn't very useful; but this equality 

50 # method breaks otherwise. 

51 return value.hash == self.hash 

52 

53 if self.context is None: 

54 # Compare 2 hashes again as we don't know how to validate. 

55 return value == self 

56 

57 if isinstance(value, (str, bytes)): 

58 valid, new = self.context.verify_and_update(value, self.hash) 

59 if valid and new: 

60 # New hash was calculated due to various reasons; stored one 

61 # wasn't optimal, etc. 

62 self.hash = new 

63 

64 # The hash should be bytes. 

65 if isinstance(self.hash, str): 

66 self.hash = self.hash.encode('utf8') 

67 self.changed() 

68 

69 return valid 

70 

71 return False 

72 

73 def __ne__(self, value): 

74 return not (self == value) 

75 

76 

77class PasswordType(ScalarCoercible, types.TypeDecorator): 

78 """ 

79 PasswordType hashes passwords as they come into the database and allows 

80 verifying them using a Pythonic interface. This Pythonic interface 

81 relies on setting up automatic data type coercion using the 

82 :func:`~sqlalchemy_utils.listeners.force_auto_coercion` function. 

83 

84 All keyword arguments (aside from max_length) are forwarded to the 

85 construction of a `passlib.context.LazyCryptContext` object, which 

86 also supports deferred configuration via the `onload` callback. 

87 

88 The following usage will create a password column that will 

89 automatically hash new passwords as `pbkdf2_sha512` but still compare 

90 passwords against pre-existing `md5_crypt` hashes. As passwords are 

91 compared; the password hash in the database will be updated to 

92 be `pbkdf2_sha512`. 

93 

94 :: 

95 

96 

97 class Model(Base): 

98 password = sa.Column(PasswordType( 

99 schemes=[ 

100 'pbkdf2_sha512', 

101 'md5_crypt' 

102 ], 

103 

104 deprecated=['md5_crypt'] 

105 )) 

106 

107 

108 Verifying password is as easy as: 

109 

110 :: 

111 

112 target = Model() 

113 target.password = 'b' 

114 # '$5$rounds=80000$H.............' 

115 

116 target.password == 'b' 

117 # True 

118 

119 

120 Lazy configuration of the type with Flask config: 

121 

122 :: 

123 

124 

125 import flask 

126 from sqlalchemy_utils import PasswordType, force_auto_coercion 

127 

128 force_auto_coercion() 

129 

130 class User(db.Model): 

131 __tablename__ = 'user' 

132 

133 password = db.Column( 

134 PasswordType( 

135 # The returned dictionary is forwarded to the CryptContext 

136 onload=lambda **kwargs: dict( 

137 schemes=flask.current_app.config['PASSWORD_SCHEMES'], 

138 **kwargs 

139 ), 

140 ), 

141 unique=False, 

142 nullable=False, 

143 ) 

144 

145 """ 

146 

147 impl = types.VARBINARY(1024) 

148 cache_ok = True 

149 

150 def __init__(self, max_length=None, **kwargs): 

151 # Fail if passlib is not found. 

152 if passlib is None: 

153 raise ImproperlyConfigured("'passlib' is required to use 'PasswordType'") 

154 

155 # Construct the passlib crypt context. 

156 self.context = LazyCryptContext(**kwargs) 

157 self._max_length = max_length 

158 

159 @property 

160 def hashing_method(self): 

161 return 'hash' if hasattr(self.context, 'hash') else 'encrypt' 

162 

163 @property 

164 def length(self): 

165 """Get column length.""" 

166 if self._max_length is None: 

167 self._max_length = self.calculate_max_length() 

168 

169 return self._max_length 

170 

171 def calculate_max_length(self): 

172 # Calculate the largest possible encoded password. 

173 # name + rounds + salt + hash + ($ * 4) of largest hash 

174 max_lengths = [1024] 

175 for name in self.context.schemes(): 

176 scheme = getattr(__import__('passlib.hash').hash, name) 

177 length = 4 + len(scheme.name) 

178 length += len(str(getattr(scheme, 'max_rounds', ''))) 

179 length += getattr(scheme, 'max_salt_size', 0) or 0 

180 length += getattr(scheme, 'encoded_checksum_size', scheme.checksum_size) 

181 max_lengths.append(length) 

182 

183 # Return the maximum calculated max length. 

184 return max(max_lengths) 

185 

186 def load_dialect_impl(self, dialect): 

187 if dialect.name == 'postgresql': 

188 # Use a BYTEA type for postgresql. 

189 impl = postgresql.BYTEA(self.length) 

190 elif dialect.name == 'oracle': 

191 # Use a RAW type for oracle. 

192 impl = oracle.RAW(self.length) 

193 elif dialect.name == 'sqlite': 

194 # Use a BLOB type for sqlite 

195 impl = sqlite.BLOB(self.length) 

196 else: 

197 # Use a VARBINARY for all other dialects. 

198 impl = types.VARBINARY(self.length) 

199 return dialect.type_descriptor(impl) 

200 

201 def process_bind_param(self, value, dialect): 

202 if isinstance(value, Password): 

203 # If were given a password secret; hash it. 

204 if value.secret is not None: 

205 return self._hash(value.secret).encode('utf8') 

206 

207 # Value has already been hashed. 

208 return value.hash 

209 

210 if isinstance(value, str): 

211 # Assume value has not been hashed. 

212 return self._hash(value).encode('utf8') 

213 

214 def process_result_value(self, value, dialect): 

215 if value is not None: 

216 return Password(value, self.context) 

217 

218 def _hash(self, value): 

219 return getattr(self.context, self.hashing_method)(value) 

220 

221 def _coerce(self, value): 

222 if value is None: 

223 return 

224 

225 if not isinstance(value, Password): 

226 # Hash the password using the default scheme. 

227 value = self._hash(value).encode('utf8') 

228 return Password(value, context=self.context) 

229 

230 else: 

231 # If were given a password object; ensure the context is right. 

232 value.context = weakref.proxy(self.context) 

233 

234 # If were given a password secret; hash it. 

235 if value.secret is not None: 

236 value.hash = self._hash(value.secret).encode('utf8') 

237 value.secret = None 

238 

239 return value 

240 

241 @property 

242 def python_type(self): 

243 return self.impl.type.python_type 

244 

245 

246Password.associate_with(PasswordType)