# engine/url.py
# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8"""Provides the :class:`~sqlalchemy.engine.url.URL` class which encapsulates
9information about a database connection specification.
10
11The URL object is created automatically when
12:func:`~sqlalchemy.engine.create_engine` is called with a string
13argument; alternatively, the URL is a public-facing construct which can
14be used directly and is also accepted directly by ``create_engine()``.
15"""
16
17import re
18
19from .interfaces import Dialect
20from .. import exc
21from .. import util
22from ..dialects import plugins
23from ..dialects import registry
24
25
class URL(object):
    """
    Represent the components of a URL used to connect to a database.

    This object is suitable to be passed directly to a
    :func:`~sqlalchemy.create_engine` call.  The fields of the URL are parsed
    from a string by the :func:`.make_url` function.  The string
    format of the URL is an RFC-1738-style string.

    All initialization parameters are available as public attributes.

    :param drivername: the name of the database backend.
      This name will correspond to a module in sqlalchemy/databases
      or a third party plug-in.

    :param username: The user name.

    :param password: database password.

    :param host: The name of the host.

    :param port: The port number.

    :param database: The database name.

    :param query: A dictionary of options to be passed to the
      dialect and/or the DBAPI upon connect.

    """

    def __init__(
        self,
        drivername,
        username=None,
        password=None,
        host=None,
        port=None,
        database=None,
        query=None,
    ):
        self.drivername = drivername
        self.username = username
        # Stored exactly as given; the ``password`` property converts it
        # to text on every read.
        self.password_original = password
        self.host = host
        # A port may be given as text (e.g. parsed out of a URL string);
        # it is always stored as an int, or None when absent.
        self.port = int(port) if port is not None else None
        self.database = database
        self.query = query or {}

    def __to_string__(self, hide_password=True):
        """Render this URL as a string.

        :param hide_password: when true (the default), a password that is
          present is rendered as ``***`` instead of its quoted value.

        :return: ``drivername://`` followed by whichever of the
          credentials, host, port, database and query string are set.
          Query keys are emitted in sorted order; a key holding several
          values is repeated once per value.
        """
        parts = [self.drivername, "://"]

        # The password is only rendered when a username is present.
        if self.username is not None:
            parts.append(_rfc_1738_quote(self.username))
            if self.password is not None:
                parts.append(
                    ":"
                    + (
                        "***"
                        if hide_password
                        else _rfc_1738_quote(self.password)
                    )
                )
            parts.append("@")

        if self.host is not None:
            # A host containing ":" is bracketed so that its colons cannot
            # be confused with the port separator.
            if ":" in self.host:
                parts.append("[%s]" % self.host)
            else:
                parts.append(self.host)

        if self.port is not None:
            parts.append(":" + str(self.port))

        if self.database is not None:
            parts.append("/" + self.database)

        if self.query:
            parts.append(
                "?"
                + "&".join(
                    "%s=%s" % (util.quote_plus(k), util.quote_plus(element))
                    for k in sorted(self.query)
                    for element in util.to_list(self.query[k])
                )
            )

        return "".join(parts)

    def __str__(self):
        """Return the full URL string, including the real password."""
        return self.__to_string__(hide_password=False)

    def __repr__(self):
        """Return the URL string with the password masked as ``***``."""
        return self.__to_string__()

    def __hash__(self):
        # Hash on the full string form (password included).
        return hash(str(self))

    def __eq__(self, other):
        """Two URLs are equal when ``other`` is a URL and every component
        (driver name, credentials, host, port, database, query) matches.
        """
        return (
            isinstance(other, URL)
            and self.drivername == other.drivername
            and self.username == other.username
            and self.password == other.password
            and self.host == other.host
            and self.database == other.database
            and self.query == other.query
            and self.port == other.port
        )

    def __ne__(self, other):
        return not self == other

    @property
    def password(self):
        """The password converted to text, or ``None`` when unset.

        The value handed to the constructor or setter is kept unchanged in
        ``password_original``; conversion happens on each read.
        """
        if self.password_original is None:
            return None
        else:
            return util.text_type(self.password_original)

    @password.setter
    def password(self, password):
        self.password_original = password

    def get_backend_name(self):
        """Return the part of ``drivername`` before the first ``+``
        (the whole name when there is no ``+``).
        """
        # split() returns the whole string as its only element when no
        # "+" is present, so one expression covers both cases.
        return self.drivername.split("+")[0]

    def get_driver_name(self):
        """Return the driver portion of ``drivername``.

        When ``drivername`` has no ``+`` the name is taken from the
        ``driver`` attribute of the class returned by :meth:`get_dialect`;
        otherwise it is the text following the first ``+``.
        """
        if "+" not in self.drivername:
            return self.get_dialect().driver
        else:
            return self.drivername.split("+")[1]

    def _instantiate_plugins(self, kwargs):
        """Load and instantiate every requested plugin.

        Plugin names come from the ``plugin`` entry of the URL query
        followed by the ``plugins`` entry of ``kwargs``.  Each loaded
        plugin is called with ``(self, kwargs)``.

        :param kwargs: dictionary of engine arguments; only read here, but
          passed on to each plugin.
        :return: list of instantiated plugins, in request order.
        """
        # Copy first: ``util.to_list`` may hand back the list object that
        # is stored in ``self.query`` itself, in which case extending it in
        # place would silently add the kwargs-level plugin names to this
        # URL's query.
        plugin_names = list(util.to_list(self.query.get("plugin", ())))
        plugin_names += kwargs.get("plugins", [])

        return [
            plugins.load(plugin_name)(self, kwargs)
            for plugin_name in plugin_names
        ]

    def _get_entrypoint(self):
        """Return the "entry point" dialect class.

        This is normally the dialect itself except in the case when the
        returned class implements the get_dialect_cls() method.

        """
        # The registry is keyed as "backend.driver"; replace() is a no-op
        # when the driver name carries no "+".
        name = self.drivername.replace("+", ".")
        cls = registry.load(name)
        # check for legacy dialects that
        # would return a module with 'dialect' as the
        # actual class
        if (
            hasattr(cls, "dialect")
            and isinstance(cls.dialect, type)
            and issubclass(cls.dialect, Dialect)
        ):
            return cls.dialect
        else:
            return cls

    def get_dialect(self):
        """Return the SQLAlchemy database dialect class corresponding
        to this URL's driver name.
        """
        entrypoint = self._get_entrypoint()
        dialect_cls = entrypoint.get_dialect_cls(self)
        return dialect_cls

    def translate_connect_args(self, names=None, **kw):
        r"""Translate url attributes into a dictionary of connection arguments.

        Returns attributes of this url (`host`, `database`, `username`,
        `password`, `port`) as a plain dictionary.  The attribute names are
        used as the keys by default.  Unset or false attributes are omitted
        from the final dictionary.

        :param \**kw: Optional, alternate key names for url attributes.
          Mapping an attribute to ``None`` omits it from the result.

        :param names: Deprecated.  Same purpose as the keyword-based alternate
          names, but correlates the name to the original positionally.
          The sequence passed in is not modified.
        """

        # Work on a private copy: the sequence is consumed positionally,
        # and neither the caller's object nor a shared default may be
        # mutated by that.
        positional = list(names) if names else []

        translated = {}
        attribute_names = ["host", "database", "username", "password", "port"]
        for index, sname in enumerate(attribute_names):
            if index < len(positional):
                name = positional[index]
            elif sname in kw:
                name = kw[sname]
            else:
                name = sname
            if name is None:
                continue
            value = getattr(self, sname, False)
            if value:
                translated[name] = value
        return translated
219
220
def make_url(name_or_url):
    """Coerce a string (or unicode) URL into a :class:`URL` instance.

    A string argument is parsed according to the RFC 1738 spec and a new
    URL is returned.  Anything that is not a string, such as an existing
    URL object, is handed back unchanged.
    """
    if not isinstance(name_or_url, util.string_types):
        return name_or_url
    return _parse_rfc1738_args(name_or_url)
232
233
# Compiled once at import time rather than on every parse.
_RFC1738_URL_PATTERN = re.compile(
    r"""
            (?P<name>[\w\+]+)://
            (?:
                (?P<username>[^:/]*)
                (?::(?P<password>.*))?
            @)?
            (?:
                (?:
                    \[(?P<ipv6host>[^/]+)\] |
                    (?P<ipv4host>[^/:]+)
                )?
                (?::(?P<port>[^/]*))?
            )?
            (?:/(?P<database>.*))?
            """,
    re.X,
)


def _parse_rfc1738_args(name):
    """Parse an RFC-1738-style URL string into a :class:`URL`.

    The string is matched against ``_RFC1738_URL_PATTERN``.  The database
    portion is split at its first ``?``; everything after it is parsed as
    a query string.  A key that occurs more than once is collected into a
    list of values.  Username and password are percent-decoded, and a
    bracketed host is stored without its brackets.

    :param name: the URL string to parse.
    :return: a new :class:`URL`.
    :raises ArgumentError: if ``name`` does not match the URL pattern.
    """
    m = _RFC1738_URL_PATTERN.match(name)
    if m is None:
        raise exc.ArgumentError(
            "Could not parse rfc1738 URL from string '%s'" % name
        )

    components = m.groupdict()

    query = None
    if components["database"] is not None:
        # Split on the FIRST "?" only.  Splitting further would drop
        # whatever follows a second "?", although "?" is a legal character
        # inside a query string.
        database, sep, query_string = components["database"].partition("?")
        components["database"] = database

        if sep:
            query = {}
            for key, value in util.parse_qsl(query_string):
                if util.py2k:
                    key = key.encode("ascii")
                if key in query:
                    # Repeated key: promote the stored value to a list
                    # (if it is not one already) and append.
                    query[key] = util.to_list(query[key])
                    query[key].append(value)
                else:
                    query[key] = value
    components["query"] = query

    if components["username"] is not None:
        components["username"] = _rfc_1738_unquote(components["username"])

    if components["password"] is not None:
        components["password"] = _rfc_1738_unquote(components["password"])

    # The pattern captures the host under one of two group names; URL
    # takes a single ``host`` argument.
    ipv4host = components.pop("ipv4host")
    ipv6host = components.pop("ipv6host")
    components["host"] = ipv4host or ipv6host

    drivername = components.pop("name")
    return URL(drivername, **components)
293
294
295def _rfc_1738_quote(text):
296 return re.sub(r"[:@/]", lambda m: "%%%X" % ord(m.group(0)), text)
297
298
def _rfc_1738_unquote(text):
    """Return ``text`` decoded by ``util.unquote``.

    Used on the username and password captured from a URL string.
    """
    decoded = util.unquote(text)
    return decoded
301
302
303def _parse_keyvalue_args(name):
304 m = re.match(r"(\w+)://(.*)", name)
305 if m is not None:
306 (name, args) = m.group(1, 2)
307 opts = dict(util.parse_qsl(args))
308 return URL(name, *opts)
309 else:
310 return None