1import weakref
2
3from sqlalchemy import types
4from sqlalchemy.dialects import oracle, postgresql, sqlite
5from sqlalchemy.ext.mutable import Mutable
6
7from ..exceptions import ImproperlyConfigured
8from .scalar_coercible import ScalarCoercible
9
10passlib = None
11try:
12 import passlib
13 from passlib.context import LazyCryptContext
14except ImportError:
15 pass
16
17
class Password(Mutable):
    """Mutable wrapper around a password hash and/or a not-yet-hashed secret.

    An instance holds either ``hash`` (the stored hash, kept as bytes when it
    was supplied as ``str``) or ``secret`` (a raw password that still has to
    be hashed), plus an optional weak proxy to the crypt context used to
    verify candidates.
    """

    @classmethod
    def coerce(cls, key, value):
        """Convert a value assigned to a tracked attribute into a Password.

        :param key: name of the attribute being assigned.
        :param value: the assigned value.
        :returns: ``value`` itself when it already is a ``Password``; a new
            secret-carrying instance for ``str``/``bytes``; otherwise whatever
            the base ``coerce`` returns.
        """
        if isinstance(value, Password):
            return value

        if isinstance(value, (str, bytes)):
            # Raw strings are treated as secrets that still need hashing.
            return cls(value, secret=True)

        # Propagate the base-class result instead of silently dropping it.
        return super().coerce(key, value)

    def __init__(self, value, context=None, secret=False):
        """Create a password holder.

        :param value: the hash, or the raw secret when ``secret`` is true.
        :param context: optional crypt context; only a weak proxy is kept.
        :param secret: whether ``value`` is a raw secret rather than a hash.
        """
        # Store the hash (if it is one).
        self.hash = value if not secret else None

        # Store the secret if we have one.
        self.secret = value if secret else None

        # The hash should be bytes.
        if isinstance(self.hash, str):
            self.hash = self.hash.encode('utf8')

        # Save weakref of the password context (if we have one)
        self.context = weakref.proxy(context) if context is not None else None

    def __eq__(self, value):
        """Compare against ``None``, another ``Password`` or a candidate secret.

        With a context, a ``str``/``bytes`` candidate is verified through
        ``context.verify_and_update``; when that call also returns a new hash,
        the stored hash is replaced and the change is flagged via
        ``changed()``. Without a context the candidate can only be compared
        to the stored hash directly.
        """
        if self.hash is None or value is None:
            # Ensure that we don't continue comparison if one of us is None.
            return self.hash is value

        if isinstance(value, Password):
            # Comparing 2 hashes isn't very useful; but this equality
            # method breaks otherwise.
            return value.hash == self.hash

        if self.context is None:
            # Without a context the candidate cannot be verified, so it is
            # compared to the stored hash directly. Delegating to
            # ``value == self`` would bounce back into this method through
            # the reflected comparison and recurse without end.
            if isinstance(value, str):
                value = value.encode('utf8')
            return self.hash == value

        if isinstance(value, (str, bytes)):
            valid, new = self.context.verify_and_update(value, self.hash)
            if valid and new:
                # New hash was calculated due to various reasons; stored one
                # wasn't optimal, etc.
                self.hash = new

                # The hash should be bytes.
                if isinstance(self.hash, str):
                    self.hash = self.hash.encode('utf8')
                self.changed()

            return valid

        return False

    def __ne__(self, value):
        """Return the negation of ``__eq__``."""
        return not (self == value)
75
76
class PasswordType(ScalarCoercible, types.TypeDecorator):
    """
    PasswordType hashes passwords as they come into the database and allows
    verifying them using a Pythonic interface. This Pythonic interface
    relies on setting up automatic data type coercion using the
    :func:`~sqlalchemy_utils.listeners.force_auto_coercion` function.

    All keyword arguments (aside from max_length) are forwarded to the
    construction of a `passlib.context.LazyCryptContext` object, which
    also supports deferred configuration via the `onload` callback.

    The following usage will create a password column that will
    automatically hash new passwords as `pbkdf2_sha512` but still compare
    passwords against pre-existing `md5_crypt` hashes. As passwords are
    compared; the password hash in the database will be updated to
    be `pbkdf2_sha512`.

    ::


        class Model(Base):
            password = sa.Column(PasswordType(
                schemes=[
                    'pbkdf2_sha512',
                    'md5_crypt'
                ],

                deprecated=['md5_crypt']
            ))


    Verifying password is as easy as:

    ::

        target = Model()
        target.password = 'b'
        # '$5$rounds=80000$H.............'

        target.password == 'b'
        # True


    Lazy configuration of the type with Flask config:

    ::


        import flask
        from sqlalchemy_utils import PasswordType, force_auto_coercion

        force_auto_coercion()

        class User(db.Model):
            __tablename__ = 'user'

            password = db.Column(
                PasswordType(
                    # The returned dictionary is forwarded to the CryptContext
                    onload=lambda **kwargs: dict(
                        schemes=flask.current_app.config['PASSWORD_SCHEMES'],
                        **kwargs
                    ),
                ),
                unique=False,
                nullable=False,
            )

    """

    impl = types.VARBINARY(1024)
    cache_ok = True

    def __init__(self, max_length=None, **kwargs):
        """Build the crypt context.

        :param max_length: explicit column length; computed lazily from the
            configured schemes when left as ``None``.
        :param kwargs: forwarded to ``LazyCryptContext``.
        :raises ImproperlyConfigured: if passlib could not be imported.
        """
        # Fail if passlib is not found.
        if passlib is None:
            raise ImproperlyConfigured("'passlib' is required to use 'PasswordType'")

        # Construct the passlib crypt context.
        self.context = LazyCryptContext(**kwargs)
        self._max_length = max_length

    @property
    def hashing_method(self):
        """Name of the context method used to hash: 'hash' if present, else 'encrypt'."""
        return 'hash' if hasattr(self.context, 'hash') else 'encrypt'

    @property
    def length(self):
        """Get column length."""
        if self._max_length is None:
            # Computed on first access and cached on the instance.
            self._max_length = self.calculate_max_length()

        return self._max_length

    def calculate_max_length(self):
        """Return the largest encoded-hash length over the configured schemes.

        The result is never smaller than 1024.
        """
        # Calculate the largest possible encoded password.
        # name + rounds + salt + hash + ($ * 4) of largest hash
        max_lengths = [1024]
        for name in self.context.schemes():
            scheme = getattr(__import__('passlib.hash').hash, name)
            length = 4 + len(scheme.name)
            length += len(str(getattr(scheme, 'max_rounds', '')))
            length += getattr(scheme, 'max_salt_size', 0) or 0

            # Prefer the encoded size and fall back to the raw checksum size.
            # A scheme that exposes neither, or exposes None, contributes 0
            # instead of raising.
            checksum_size = getattr(scheme, 'encoded_checksum_size', None)
            if checksum_size is None:
                checksum_size = getattr(scheme, 'checksum_size', None)
            length += checksum_size or 0

            max_lengths.append(length)

        # Return the maximum calculated max length.
        return max(max_lengths)

    def load_dialect_impl(self, dialect):
        """Pick the binary column type matching ``dialect.name``."""
        if dialect.name == 'postgresql':
            # Use a BYTEA type for postgresql.
            impl = postgresql.BYTEA(self.length)
        elif dialect.name == 'oracle':
            # Use a RAW type for oracle.
            impl = oracle.RAW(self.length)
        elif dialect.name == 'sqlite':
            # Use a BLOB type for sqlite
            impl = sqlite.BLOB(self.length)
        else:
            # Use a VARBINARY for all other dialects.
            impl = types.VARBINARY(self.length)
        return dialect.type_descriptor(impl)

    def process_bind_param(self, value, dialect):
        """Turn a Python-side value into the bytes bound to the statement.

        :returns: the UTF-8 encoded hash of a secret, the stored hash of an
            already-hashed ``Password``, or ``None`` for any other value.
        """
        if isinstance(value, Password):
            # If were given a password secret; hash it.
            if value.secret is not None:
                return self._hash(value.secret).encode('utf8')

            # Value has already been hashed.
            return value.hash

        if isinstance(value, (str, bytes)):
            # Assume value has not been hashed. Bytes are handled like str,
            # matching Password.coerce and _coerce, so a bytes secret is no
            # longer bound as NULL.
            return self._hash(value).encode('utf8')

        return None

    def process_result_value(self, value, dialect):
        """Wrap a non-NULL database value in a ``Password`` bound to this context."""
        if value is not None:
            return Password(value, self.context)

        return None

    def _hash(self, value):
        """Hash ``value`` with whichever context method ``hashing_method`` names."""
        return getattr(self.context, self.hashing_method)(value)

    def _coerce(self, value):
        """Coerce an assigned value into a hashed ``Password`` (``None`` passes through)."""
        if value is None:
            return

        if not isinstance(value, Password):
            # Hash the password using the default scheme.
            value = self._hash(value).encode('utf8')
            return Password(value, context=self.context)

        else:
            # If were given a password object; ensure the context is right.
            value.context = weakref.proxy(self.context)

            # If were given a password secret; hash it.
            if value.secret is not None:
                value.hash = self._hash(value.secret).encode('utf8')
                value.secret = None

            return value

    @property
    def python_type(self):
        """Delegate to ``self.impl.type.python_type``."""
        return self.impl.type.python_type
244
245
# Associate the Password mutable with PasswordType at import time.
# NOTE(review): presumably this makes PasswordType columns pick up the
# changed() notification Password.__eq__ sends after a rehash -- confirm
# against the SQLAlchemy Mutable.associate_with documentation.
Password.associate_with(PasswordType)