1""" 
    2oauthlib.oauth2.rfc8628 
    3~~~~~~~~~~~~~~~~~~~~~~~ 
    4 
    5This module is an implementation of various logic needed 
    6for consuming and providing OAuth 2.0 RFC8628. 
    7""" 
    8 
    9import logging 
    10from typing import Callable 
    11 
    12from oauthlib.common import Request, generate_token 
    13from oauthlib.oauth2.rfc6749 import errors 
    14from oauthlib.oauth2.rfc6749.endpoints.base import ( 
    15    BaseEndpoint, 
    16    catch_errors_and_unavailability, 
    17) 
    18 
    19log = logging.getLogger(__name__) 
    20 
    21 
    22class DeviceAuthorizationEndpoint(BaseEndpoint): 
    23    """DeviceAuthorization endpoint - used by the client to initiate 
    24    the authorization flow by requesting a set of verification codes 
    25    from the authorization server by making an HTTP "POST" request to 
    26    the device authorization endpoint. 
    27 
    28    The client authentication requirements of Section 3.2.1 of [RFC6749] 
    29    apply to requests on this endpoint, which means that confidential 
    30    clients (those that have established client credentials) authenticate 
    31    in the same manner as when making requests to the token endpoint, and 
    32    public clients provide the "client_id" parameter to identify 
    33    themselves. 
    34    """ 
    35 
    36    def __init__( 
    37        self, 
    38        request_validator, 
    39        verification_uri, 
    40        expires_in=1800, 
    41        interval=None, 
    42        verification_uri_complete=None, 
    43        user_code_generator: Callable[[None], str] = None, 
    44    ): 
    45        """ 
    46        :param request_validator: An instance of RequestValidator. 
    47        :type request_validator: oauthlib.oauth2.rfc6749.RequestValidator. 
    48        :param verification_uri: a string containing the URL that can be polled by the client application 
    49        :param expires_in: a number that represents the lifetime of the `user_code` and `device_code` 
    50        :param interval: an option number that represents the number of seconds between each poll requests 
    51        :param verification_uri_complete: a string of a function that can be called with `user_data` as parameter 
    52        :param user_code_generator: a callable that returns a configurable user code 
    53        """ 
    54        self.request_validator = request_validator 
    55        self._expires_in = expires_in 
    56        self._interval = interval 
    57        self._verification_uri = verification_uri 
    58        self._verification_uri_complete = verification_uri_complete 
    59        self.user_code_generator = user_code_generator 
    60 
    61        BaseEndpoint.__init__(self) 
    62 
    63    @property 
    64    def interval(self): 
    65        """The minimum amount of time in seconds that the client 
    66        SHOULD wait between polling requests to the token endpoint.  If no 
    67        value is provided, clients MUST use 5 as the default. 
    68        """ 
    69        return self._interval 
    70 
    71    @property 
    72    def expires_in(self): 
    73        """The lifetime in seconds of the "device_code" and "user_code".""" 
    74        return self._expires_in 
    75 
    76    @property 
    77    def verification_uri(self): 
    78        """The end-user verification URI on the authorization 
    79        server.  The URI should be short and easy to remember as end users 
    80        will be asked to manually type it into their user agent. 
    81        """ 
    82        return self._verification_uri 
    83 
    84    def verification_uri_complete(self, user_code): 
    85        if not self._verification_uri_complete: 
    86            return None 
    87        if isinstance(self._verification_uri_complete, str): 
    88            return self._verification_uri_complete.format(user_code=user_code) 
    89        if callable(self._verification_uri_complete): 
    90            return self._verification_uri_complete(user_code) 
    91        return None 
    92 
    93    @catch_errors_and_unavailability 
    94    def validate_device_authorization_request(self, request): 
    95        """Validate the device authorization request. 
    96 
    97        The client_id is required if the client is not authenticating with the 
    98        authorization server as described in `Section 3.2.1. of [RFC6749]`_. 
    99        The client identifier as described in `Section 2.2 of [RFC6749]`_. 
    100 
    101        .. _`Section 3.2.1. of [RFC6749]`: https://www.rfc-editor.org/rfc/rfc6749#section-3.2.1 
    102        .. _`Section 2.2 of [RFC6749]`: https://www.rfc-editor.org/rfc/rfc6749#section-2.2 
    103        """ 
    104 
    105        # First check duplicate parameters 
    106        for param in ("client_id", "scope"): 
    107            try: 
    108                duplicate_params = request.duplicate_params 
    109            except ValueError: 
    110                raise errors.InvalidRequestFatalError( 
    111                    description="Unable to parse query string", request=request 
    112                ) 
    113            if param in duplicate_params: 
    114                raise errors.InvalidRequestFatalError( 
    115                    description="Duplicate %s parameter." % param, request=request 
    116                ) 
    117 
    118        # the "application/x-www-form-urlencoded" format, per Appendix B of [RFC6749] 
    119        # https://www.rfc-editor.org/rfc/rfc6749#appendix-B 
    120        if request.headers["Content-Type"] != "application/x-www-form-urlencoded": 
    121            raise errors.InvalidRequestError( 
    122                "Content-Type must be application/x-www-form-urlencoded", 
    123                request=request, 
    124            ) 
    125 
    126        # REQUIRED. The client identifier as described in Section 2.2. 
    127        # https://tools.ietf.org/html/rfc6749#section-2.2 
    128        # TODO: extract client_id an helper validation function. 
    129        if not request.client_id: 
    130            raise errors.MissingClientIdError(request=request) 
    131 
    132        if not self.request_validator.validate_client_id(request.client_id, request): 
    133            raise errors.InvalidClientIdError(request=request) 
    134 
    135        # The client authentication requirements of Section 3.2.1 of [RFC6749] 
    136        # apply to requests on this endpoint, which means that confidential 
    137        # clients (those that have established client credentials) authenticate 
    138        # in the same manner as when making requests to the token endpoint, and 
    139        # public clients provide the "client_id" parameter to identify 
    140        # themselves. 
    141        self._raise_on_invalid_client(request) 
    142 
    143    @catch_errors_and_unavailability 
    144    def create_device_authorization_response( 
    145        self, uri, http_method="POST", body=None, headers=None 
    146    ): 
    147        """ 
    148           Generate a unique device verification code and an end-user code that are valid for a limited time. 
    149           Include them in the HTTP response body using the "application/json" format [RFC8259] with a 
    150           200 (OK) status code, as described in `Section-3.2`_. 
    151 
    152           :param uri: The full URI of the token request. 
    153           :type uri: str 
    154           :param request: OAuthlib request. 
    155           :type request: oauthlib.common.Request 
    156           :param user_code_generator: 
    157               A callable that returns a string for the user code. 
    158               This allows the caller to decide how the `user_code` should be formatted. 
    159           :type user_code_generator: Callable[[], str] 
    160           :return: A tuple of three elements: 
    161                    1. A dict of headers to set on the response. 
    162                    2. The response body as a string. 
    163                    3. The response status code as an integer. 
    164           :rtype: tuple 
    165 
    166           The response contains the following parameters: 
    167 
    168           device_code 
    169              **REQUIRED.** The device verification code. 
    170 
    171           user_code 
    172              **REQUIRED.** The end-user verification code. 
    173 
    174           verification_uri 
    175              **REQUIRED.** The end-user verification URI on the authorization server. 
    176              The URI should be short and easy to remember as end users will be asked 
    177              to manually type it into their user agent. 
    178 
    179           verification_uri_complete 
    180              **OPTIONAL.** A verification URI that includes the `user_code` (or 
    181              other information with the same function as the `user_code`), which is 
    182              designed for non-textual transmission. 
    183 
    184           expires_in 
    185              **REQUIRED.** The lifetime in seconds of the `device_code` and `user_code`. 
    186 
    187           interval 
    188              **OPTIONAL.** The minimum amount of time in seconds that the client 
    189              SHOULD wait between polling requests to the token endpoint. If no 
    190              value is provided, clients MUST use 5 as the default. 
    191 
    192           **For example:** 
    193 
    194              .. code-block:: http 
    195 
    196                 HTTP/1.1 200 OK 
    197                 Content-Type: application/json 
    198                 Cache-Control: no-store 
    199 
    200                 { 
    201                   "device_code": "GmRhmhcxhwAzkoEqiMEg_DnyEysNkuNhszIySk9eS", 
    202                   "user_code": "WDJB-MJHT", 
    203                   "verification_uri": "https://example.com/device", 
    204                   "verification_uri_complete": 
    205                       "https://example.com/device?user_code=WDJB-MJHT", 
    206                   "expires_in": 1800, 
    207                   "interval": 5 
    208                 } 
    209 
    210           .. _`Section-3.2`: https://www.rfc-editor.org/rfc/rfc8628#section-3.2 
    211           """ 
    212        request = Request(uri, http_method, body, headers) 
    213        self.validate_device_authorization_request(request) 
    214        log.debug("Pre resource owner authorization validation ok for %r.", request) 
    215 
    216        headers = {} 
    217        user_code = self.user_code_generator() if self.user_code_generator else generate_token() 
    218        data = { 
    219            "verification_uri": self.verification_uri, 
    220            "expires_in": self.expires_in, 
    221            "user_code": user_code, 
    222            "device_code": generate_token(), 
    223        } 
    224        if self.interval is not None: 
    225            data["interval"] = self.interval 
    226 
    227 
    228        verification_uri_complete = self.verification_uri_complete(user_code) 
    229        if verification_uri_complete: 
    230            data["verification_uri_complete"] = verification_uri_complete 
    231 
    232        return headers, data, 200