1# ------------------------------------------------------------------------- 
    2# Copyright (c) Microsoft Corporation. All rights reserved. 
    3# Licensed under the MIT License. See LICENSE.txt in the project root for 
    4# license information. 
    5# ------------------------------------------------------------------------- 
    6from typing import Any, NamedTuple, Optional, TypedDict, Union, ContextManager 
    7from typing_extensions import Protocol, runtime_checkable 
    8 
    9 
    10class AccessToken(NamedTuple): 
    11    """Represents an OAuth access token.""" 
    12 
    13    token: str 
    14    """The token string.""" 
    15    expires_on: int 
    16    """The token's expiration time in Unix time.""" 
    17 
    18 
    19class AccessTokenInfo: 
    20    """Information about an OAuth access token. 
    21 
    22    This class is an alternative to `AccessToken` which provides additional information about the token. 
    23 
    24    :param str token: The token string. 
    25    :param int expires_on: The token's expiration time in Unix time. 
    26    :keyword str token_type: The type of access token. Defaults to 'Bearer'. 
    27    :keyword int refresh_on: Specifies the time, in Unix time, when the cached token should be proactively 
    28        refreshed. Optional. 
    29    """ 
    30 
    31    token: str 
    32    """The token string.""" 
    33    expires_on: int 
    34    """The token's expiration time in Unix time.""" 
    35    token_type: str 
    36    """The type of access token.""" 
    37    refresh_on: Optional[int] 
    38    """Specifies the time, in Unix time, when the cached token should be proactively refreshed. Optional.""" 
    39 
    40    def __init__( 
    41        self, 
    42        token: str, 
    43        expires_on: int, 
    44        *, 
    45        token_type: str = "Bearer", 
    46        refresh_on: Optional[int] = None, 
    47    ) -> None: 
    48        self.token = token 
    49        self.expires_on = expires_on 
    50        self.token_type = token_type 
    51        self.refresh_on = refresh_on 
    52 
    53    def __repr__(self) -> str: 
    54        return "AccessTokenInfo(token='{}', expires_on={}, token_type='{}', refresh_on={})".format( 
    55            self.token, self.expires_on, self.token_type, self.refresh_on 
    56        ) 
    57 
    58 
    59class TokenRequestOptions(TypedDict, total=False): 
    60    """Options to use for access token requests. All parameters are optional.""" 
    61 
    62    claims: str 
    63    """Additional claims required in the token, such as those returned in a resource provider's claims 
    64    challenge following an authorization failure.""" 
    65    tenant_id: str 
    66    """The tenant ID to include in the token request.""" 
    67    enable_cae: bool 
    68    """Indicates whether to enable Continuous Access Evaluation (CAE) for the requested token.""" 
    69 
    70 
    71@runtime_checkable 
    72class TokenCredential(Protocol): 
    73    """Protocol for classes able to provide OAuth tokens.""" 
    74 
    75    def get_token( 
    76        self, 
    77        *scopes: str, 
    78        claims: Optional[str] = None, 
    79        tenant_id: Optional[str] = None, 
    80        enable_cae: bool = False, 
    81        **kwargs: Any, 
    82    ) -> AccessToken: 
    83        """Request an access token for `scopes`. 
    84 
    85        :param str scopes: The type of access needed. 
    86 
    87        :keyword str claims: Additional claims required in the token, such as those returned in a resource 
    88            provider's claims challenge following an authorization failure. 
    89        :keyword str tenant_id: Optional tenant to include in the token request. 
    90        :keyword bool enable_cae: Indicates whether to enable Continuous Access Evaluation (CAE) for the requested 
    91            token. Defaults to False. 
    92 
    93        :rtype: AccessToken 
    94        :return: An AccessToken instance containing the token string and its expiration time in Unix time. 
    95        """ 
    96        ... 
    97 
    98 
    99@runtime_checkable 
    100class SupportsTokenInfo(Protocol, ContextManager["SupportsTokenInfo"]): 
    101    """Protocol for classes able to provide OAuth access tokens with additional properties.""" 
    102 
    103    def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] = None) -> AccessTokenInfo: 
    104        """Request an access token for `scopes`. 
    105 
    106        This is an alternative to `get_token` to enable certain scenarios that require additional properties 
    107        on the token. 
    108 
    109        :param str scopes: The type of access needed. 
    110        :keyword options: A dictionary of options for the token request. Unknown options will be ignored. Optional. 
    111        :paramtype options: TokenRequestOptions 
    112 
    113        :rtype: AccessTokenInfo 
    114        :return: An AccessTokenInfo instance containing information about the token. 
    115        """ 
    116        ... 
    117 
    118    def close(self) -> None: 
    119        """Close the credential, releasing any resources it holds. 
    120 
    121        :return: None 
    122        :rtype: None 
    123        """ 
    124 
    125 
# Type alias accepting either token protocol declared in this module: the `get_token`-based
# `TokenCredential` or the `get_token_info`-based `SupportsTokenInfo`.
TokenProvider = Union[TokenCredential, SupportsTokenInfo]
    127 
    128 
    129class AzureNamedKey(NamedTuple): 
    130    """Represents a name and key pair.""" 
    131 
    132    name: str 
    133    key: str 
    134 
    135 
# Names exported by a star-import of this module. The credential classes listed here are defined
# further down in the file. `AzureNamedKey` is defined in this module but is not listed.
__all__ = [
    "AzureKeyCredential",
    "AzureSasCredential",
    "AccessToken",
    "AccessTokenInfo",
    "SupportsTokenInfo",
    "AzureNamedKeyCredential",
    "TokenCredential",
    "TokenRequestOptions",
    "TokenProvider",
]
    147 
    148 
    149class AzureKeyCredential: 
    150    """Credential type used for authenticating to an Azure service. 
    151    It provides the ability to update the key without creating a new client. 
    152 
    153    :param str key: The key used to authenticate to an Azure service 
    154    :raises TypeError: If the key is not a string. 
    155    """ 
    156 
    157    def __init__(self, key: str) -> None: 
    158        if not isinstance(key, str): 
    159            raise TypeError("key must be a string.") 
    160        self._key = key 
    161 
    162    @property 
    163    def key(self) -> str: 
    164        """The value of the configured key. 
    165 
    166        :rtype: str 
    167        :return: The value of the configured key. 
    168        """ 
    169        return self._key 
    170 
    171    def update(self, key: str) -> None: 
    172        """Update the key. 
    173 
    174        This can be used when you've regenerated your service key and want 
    175        to update long-lived clients. 
    176 
    177        :param str key: The key used to authenticate to an Azure service 
    178        :raises ValueError or TypeError: If the key is None, empty, or not a string. 
    179        """ 
    180        if not key: 
    181            raise ValueError("The key used for updating can not be None or empty") 
    182        if not isinstance(key, str): 
    183            raise TypeError("The key used for updating must be a string.") 
    184        self._key = key 
    185 
    186 
    187class AzureSasCredential: 
    188    """Credential type used for authenticating to an Azure service. 
    189    It provides the ability to update the shared access signature without creating a new client. 
    190 
    191    :param str signature: The shared access signature used to authenticate to an Azure service 
    192    :raises TypeError: If the signature is not a string. 
    193    """ 
    194 
    195    def __init__(self, signature: str) -> None: 
    196        if not isinstance(signature, str): 
    197            raise TypeError("signature must be a string.") 
    198        self._signature = signature 
    199 
    200    @property 
    201    def signature(self) -> str: 
    202        """The value of the configured shared access signature. 
    203 
    204        :rtype: str 
    205        :return: The value of the configured shared access signature. 
    206        """ 
    207        return self._signature 
    208 
    209    def update(self, signature: str) -> None: 
    210        """Update the shared access signature. 
    211 
    212        This can be used when you've regenerated your shared access signature and want 
    213        to update long-lived clients. 
    214 
    215        :param str signature: The shared access signature used to authenticate to an Azure service 
    216        :raises ValueError: If the signature is None or empty. 
    217        :raises TypeError: If the signature is not a string. 
    218        """ 
    219        if not signature: 
    220            raise ValueError("The signature used for updating can not be None or empty") 
    221        if not isinstance(signature, str): 
    222            raise TypeError("The signature used for updating must be a string.") 
    223        self._signature = signature 
    224 
    225 
    226class AzureNamedKeyCredential: 
    227    """Credential type used for working with any service needing a named key that follows patterns 
    228    established by the other credential types. 
    229 
    230    :param str name: The name of the credential used to authenticate to an Azure service. 
    231    :param str key: The key used to authenticate to an Azure service. 
    232    :raises TypeError: If the name or key is not a string. 
    233    """ 
    234 
    235    def __init__(self, name: str, key: str) -> None: 
    236        if not isinstance(name, str) or not isinstance(key, str): 
    237            raise TypeError("Both name and key must be strings.") 
    238        self._credential = AzureNamedKey(name, key) 
    239 
    240    @property 
    241    def named_key(self) -> AzureNamedKey: 
    242        """The value of the configured name. 
    243 
    244        :rtype: AzureNamedKey 
    245        :return: The value of the configured name. 
    246        """ 
    247        return self._credential 
    248 
    249    def update(self, name: str, key: str) -> None: 
    250        """Update the named key credential. 
    251 
    252        Both name and key must be provided in order to update the named key credential. 
    253        Individual attributes cannot be updated. 
    254 
    255        :param str name: The name of the credential used to authenticate to an Azure service. 
    256        :param str key: The key used to authenticate to an Azure service. 
    257        """ 
    258        if not isinstance(name, str) or not isinstance(key, str): 
    259            raise TypeError("Both name and key must be strings.") 
    260        self._credential = AzureNamedKey(name, key)