1# engine/url.py 
    2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: http://www.opensource.org/licenses/mit-license.php 
    7 
    8"""Provides the :class:`~sqlalchemy.engine.url.URL` class which encapsulates 
    9information about a database connection specification. 
    10 
    11The URL object is created automatically when 
    12:func:`~sqlalchemy.engine.create_engine` is called with a string 
    13argument; alternatively, the URL is a public-facing construct which can 
    14be used directly and is also accepted directly by ``create_engine()``. 
    15""" 
    16 
    17import re 
    18 
    19from .interfaces import Dialect 
    20from .. import exc 
    21from .. import util 
    22from ..dialects import plugins 
    23from ..dialects import registry 
    24 
    25 
    26class URL(object): 
    27    """ 
    28    Represent the components of a URL used to connect to a database. 
    29 
    30    This object is suitable to be passed directly to a 
    31    :func:`~sqlalchemy.create_engine` call.  The fields of the URL are parsed 
    32    from a string by the :func:`.make_url` function.  The string 
    33    format of the URL is an RFC-1738-style string. 
    34 
    35    All initialization parameters are available as public attributes. 
    36 
    37    :param drivername: the name of the database backend. 
    38      This name will correspond to a module in sqlalchemy/databases 
    39      or a third party plug-in. 
    40 
    41    :param username: The user name. 
    42 
    43    :param password: database password. 
    44 
    45    :param host: The name of the host. 
    46 
    47    :param port: The port number. 
    48 
    49    :param database: The database name. 
    50 
    51    :param query: A dictionary of options to be passed to the 
    52      dialect and/or the DBAPI upon connect. 
    53 
    54    """ 
    55 
    56    def __init__( 
    57        self, 
    58        drivername, 
    59        username=None, 
    60        password=None, 
    61        host=None, 
    62        port=None, 
    63        database=None, 
    64        query=None, 
    65    ): 
    66        self.drivername = drivername 
    67        self.username = username 
    68        self.password_original = password 
    69        self.host = host 
    70        if port is not None: 
    71            self.port = int(port) 
    72        else: 
    73            self.port = None 
    74        self.database = database 
    75        self.query = query or {} 
    76 
    77    def __to_string__(self, hide_password=True): 
    78        s = self.drivername + "://" 
    79        if self.username is not None: 
    80            s += _rfc_1738_quote(self.username) 
    81            if self.password is not None: 
    82                s += ":" + ( 
    83                    "***" if hide_password else _rfc_1738_quote(self.password) 
    84                ) 
    85            s += "@" 
    86        if self.host is not None: 
    87            if ":" in self.host: 
    88                s += "[%s]" % self.host 
    89            else: 
    90                s += self.host 
    91        if self.port is not None: 
    92            s += ":" + str(self.port) 
    93        if self.database is not None: 
    94            s += "/" + self.database 
    95        if self.query: 
    96            keys = list(self.query) 
    97            keys.sort() 
    98            s += "?" + "&".join( 
    99                "%s=%s" % (util.quote_plus(k), util.quote_plus(element)) 
    100                for k in keys 
    101                for element in util.to_list(self.query[k]) 
    102            ) 
    103        return s 
    104 
    105    def __str__(self): 
    106        return self.__to_string__(hide_password=False) 
    107 
    108    def __repr__(self): 
    109        return self.__to_string__() 
    110 
    111    def __hash__(self): 
    112        return hash(str(self)) 
    113 
    114    def __eq__(self, other): 
    115        return ( 
    116            isinstance(other, URL) 
    117            and self.drivername == other.drivername 
    118            and self.username == other.username 
    119            and self.password == other.password 
    120            and self.host == other.host 
    121            and self.database == other.database 
    122            and self.query == other.query 
    123            and self.port == other.port 
    124        ) 
    125 
    126    def __ne__(self, other): 
    127        return not self == other 
    128 
    129    @property 
    130    def password(self): 
    131        if self.password_original is None: 
    132            return None 
    133        else: 
    134            return util.text_type(self.password_original) 
    135 
    136    @password.setter 
    137    def password(self, password): 
    138        self.password_original = password 
    139 
    140    def get_backend_name(self): 
    141        if "+" not in self.drivername: 
    142            return self.drivername 
    143        else: 
    144            return self.drivername.split("+")[0] 
    145 
    146    def get_driver_name(self): 
    147        if "+" not in self.drivername: 
    148            return self.get_dialect().driver 
    149        else: 
    150            return self.drivername.split("+")[1] 
    151 
    152    def _instantiate_plugins(self, kwargs): 
    153        plugin_names = util.to_list(self.query.get("plugin", ())) 
    154        plugin_names += kwargs.get("plugins", []) 
    155 
    156        return [ 
    157            plugins.load(plugin_name)(self, kwargs) 
    158            for plugin_name in plugin_names 
    159        ] 
    160 
    161    def _get_entrypoint(self): 
    162        """Return the "entry point" dialect class. 
    163 
    164        This is normally the dialect itself except in the case when the 
    165        returned class implements the get_dialect_cls() method. 
    166 
    167        """ 
    168        if "+" not in self.drivername: 
    169            name = self.drivername 
    170        else: 
    171            name = self.drivername.replace("+", ".") 
    172        cls = registry.load(name) 
    173        # check for legacy dialects that 
    174        # would return a module with 'dialect' as the 
    175        # actual class 
    176        if ( 
    177            hasattr(cls, "dialect") 
    178            and isinstance(cls.dialect, type) 
    179            and issubclass(cls.dialect, Dialect) 
    180        ): 
    181            return cls.dialect 
    182        else: 
    183            return cls 
    184 
    185    def get_dialect(self): 
    186        """Return the SQLAlchemy database dialect class corresponding 
    187        to this URL's driver name. 
    188        """ 
    189        entrypoint = self._get_entrypoint() 
    190        dialect_cls = entrypoint.get_dialect_cls(self) 
    191        return dialect_cls 
    192 
    193    def translate_connect_args(self, names=[], **kw): 
    194        r"""Translate url attributes into a dictionary of connection arguments. 
    195 
    196        Returns attributes of this url (`host`, `database`, `username`, 
    197        `password`, `port`) as a plain dictionary.  The attribute names are 
    198        used as the keys by default.  Unset or false attributes are omitted 
    199        from the final dictionary. 
    200 
    201        :param \**kw: Optional, alternate key names for url attributes. 
    202 
    203        :param names: Deprecated.  Same purpose as the keyword-based alternate 
    204            names, but correlates the name to the original positionally. 
    205        """ 
    206 
    207        translated = {} 
    208        attribute_names = ["host", "database", "username", "password", "port"] 
    209        for sname in attribute_names: 
    210            if names: 
    211                name = names.pop(0) 
    212            elif sname in kw: 
    213                name = kw[sname] 
    214            else: 
    215                name = sname 
    216            if name is not None and getattr(self, sname, False): 
    217                translated[name] = getattr(self, sname) 
    218        return translated 
    219 
    220 
    221def make_url(name_or_url): 
    222    """Given a string or unicode instance, produce a new URL instance. 
    223 
    224    The given string is parsed according to the RFC 1738 spec.  If an 
    225    existing URL object is passed, just returns the object. 
    226    """ 
    227 
    228    if isinstance(name_or_url, util.string_types): 
    229        return _parse_rfc1738_args(name_or_url) 
    230    else: 
    231        return name_or_url 
    232 
    233 
    234def _parse_rfc1738_args(name): 
    235    pattern = re.compile( 
    236        r""" 
    237            (?P<name>[\w\+]+):// 
    238            (?: 
    239                (?P<username>[^:/]*) 
    240                (?::(?P<password>.*))? 
    241            @)? 
    242            (?: 
    243                (?: 
    244                    \[(?P<ipv6host>[^/]+)\] | 
    245                    (?P<ipv4host>[^/:]+) 
    246                )? 
    247                (?::(?P<port>[^/]*))? 
    248            )? 
    249            (?:/(?P<database>.*))? 
    250            """, 
    251        re.X, 
    252    ) 
    253 
    254    m = pattern.match(name) 
    255    if m is not None: 
    256        components = m.groupdict() 
    257        if components["database"] is not None: 
    258            tokens = components["database"].split("?", 2) 
    259            components["database"] = tokens[0] 
    260 
    261            if len(tokens) > 1: 
    262                query = {} 
    263 
    264                for key, value in util.parse_qsl(tokens[1]): 
    265                    if util.py2k: 
    266                        key = key.encode("ascii") 
    267                    if key in query: 
    268                        query[key] = util.to_list(query[key]) 
    269                        query[key].append(value) 
    270                    else: 
    271                        query[key] = value 
    272            else: 
    273                query = None 
    274        else: 
    275            query = None 
    276        components["query"] = query 
    277 
    278        if components["username"] is not None: 
    279            components["username"] = _rfc_1738_unquote(components["username"]) 
    280 
    281        if components["password"] is not None: 
    282            components["password"] = _rfc_1738_unquote(components["password"]) 
    283 
    284        ipv4host = components.pop("ipv4host") 
    285        ipv6host = components.pop("ipv6host") 
    286        components["host"] = ipv4host or ipv6host 
    287        name = components.pop("name") 
    288        return URL(name, **components) 
    289    else: 
    290        raise exc.ArgumentError( 
    291            "Could not parse rfc1738 URL from string '%s'" % name 
    292        ) 
    293 
    294 
    295def _rfc_1738_quote(text): 
    296    return re.sub(r"[:@/]", lambda m: "%%%X" % ord(m.group(0)), text) 
    297 
    298 
    299def _rfc_1738_unquote(text): 
    300    return util.unquote(text) 
    301 
    302 
    303def _parse_keyvalue_args(name): 
    304    m = re.match(r"(\w+)://(.*)", name) 
    305    if m is not None: 
    306        (name, args) = m.group(1, 2) 
    307        opts = dict(util.parse_qsl(args)) 
    308        return URL(name, *opts) 
    309    else: 
    310        return None