1from __future__ import annotations 
    2import json 
    3 
    4from typing import Callable 
    5 
    6from oauthlib import common # noqa: TC001 
    7 
    8from oauthlib.oauth2.rfc6749 import errors as rfc6749_errors 
    9from oauthlib.oauth2.rfc6749.grant_types.base import GrantTypeBase 
    10 
    11 
    12class DeviceCodeGrant(GrantTypeBase): 
    13    def create_authorization_response( 
    14        self, request: common.Request, token_handler: Callable 
    15    ) -> tuple[dict, str, int]: 
    16        """ 
    17        Validate the device flow request -> create the access token 
    18        -> persist the token -> return the token. 
    19        """ 
    20        headers = self._get_default_headers() 
    21        try: 
    22            self.validate_token_request(request) 
    23        except rfc6749_errors.OAuth2Error as e: 
    24            headers.update(e.headers) 
    25            return headers, e.json, e.status_code 
    26 
    27        token = token_handler.create_token(request, refresh_token=False) 
    28 
    29        for modifier in self._token_modifiers: 
    30            token = modifier(token) 
    31 
    32        self.request_validator.save_token(token, request) 
    33 
    34        return self.create_token_response(request, token_handler) 
    35 
    36    def validate_token_request(self, request: common.Request) -> None: 
    37        """ 
    38        Performs the necessary check against the request to ensure 
    39        it's allowed to retrieve a token. 
    40        """ 
    41        for validator in self.custom_validators.pre_token: 
    42            validator(request) 
    43 
    44        if not getattr(request, "grant_type", None): 
    45            raise rfc6749_errors.InvalidRequestError( 
    46                "Request is missing grant type.", request=request 
    47            ) 
    48 
    49        if request.grant_type != "urn:ietf:params:oauth:grant-type:device_code": 
    50            raise rfc6749_errors.UnsupportedGrantTypeError(request=request) 
    51 
    52        for param in ("grant_type", "scope"): 
    53            if param in request.duplicate_params: 
    54                raise rfc6749_errors.InvalidRequestError( 
    55                    description=f"Duplicate {param} parameter.", request=request 
    56                ) 
    57 
    58        if not self.request_validator.authenticate_client(request): 
    59            raise rfc6749_errors.InvalidClientError(request=request) 
    60        elif not hasattr(request.client, "client_id"): 
    61            raise NotImplementedError( 
    62                "Authenticate client must set the " 
    63                "request.client.client_id attribute " 
    64                "in authenticate_client." 
    65            ) 
    66 
    67        # Ensure client is authorized use of this grant type 
    68        self.validate_grant_type(request) 
    69 
    70        request.client_id = request.client_id or request.client.client_id 
    71        self.validate_scopes(request) 
    72 
    73        for validator in self.custom_validators.post_token: 
    74            validator(request) 
    75 
    76    def create_token_response( 
    77        self, request: common.Request, token_handler: Callable 
    78    ) -> tuple[dict, str, int]: 
    79        """Return token or error in json format. 
    80 
    81        :param request: OAuthlib request. 
    82        :type request: oauthlib.common.Request 
    83        :param token_handler: A token handler instance, for example of type 
    84                              oauthlib.oauth2.BearerToken. 
    85 
    86        If the access token request is valid and authorized, the 
    87        authorization server issues an access token and optional refresh 
    88        token as described in `Section 5.1`_.  If the request failed client 
    89        authentication or is invalid, the authorization server returns an 
    90        error response as described in `Section 5.2`_. 
    91        .. _`Section 5.1`: https://tools.ietf.org/html/rfc6749#section-5.1 
    92        .. _`Section 5.2`: https://tools.ietf.org/html/rfc6749#section-5.2 
    93        """ 
    94        headers = self._get_default_headers() 
    95        try: 
    96            if self.request_validator.client_authentication_required( 
    97                request 
    98            ) and not self.request_validator.authenticate_client(request): 
    99                raise rfc6749_errors.InvalidClientError(request=request) 
    100 
    101            self.validate_token_request(request) 
    102 
    103        except rfc6749_errors.OAuth2Error as e: 
    104            headers.update(e.headers) 
    105            return headers, e.json, e.status_code 
    106 
    107        token = token_handler.create_token(request, self.refresh_token) 
    108 
    109        self.request_validator.save_token(token, request) 
    110 
    111        return headers, json.dumps(token), 200