1from __future__ import annotations
2import json
3
4from typing import Callable
5
6from oauthlib import common # noqa: TC001
7
8from oauthlib.oauth2.rfc6749 import errors as rfc6749_errors
9from oauthlib.oauth2.rfc6749.grant_types.base import GrantTypeBase
10
11
class DeviceCodeGrant(GrantTypeBase):
    """Grant handler for the device-code grant type.

    Serves requests whose ``grant_type`` is
    ``urn:ietf:params:oauth:grant-type:device_code``: the request is
    validated, a token is created through the supplied token handler,
    persisted through ``self.request_validator.save_token`` and returned
    to the caller as a JSON document.
    """

    def create_authorization_response(
        self, request: common.Request, token_handler: Callable
    ) -> tuple[dict, str, int]:
        """
        Validate the device flow request -> create the access token
        -> persist the token -> return the token.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :param token_handler: Object exposing
                              ``create_token(request, refresh_token)``
                              (despite the ``Callable`` annotation it is
                              not called directly).
        :returns: A ``(headers, body, status)`` tuple. On success ``body``
                  is the JSON-encoded token and ``status`` is 200; when
                  validation raises an ``OAuth2Error`` the error's own
                  headers, JSON body and status code are returned instead.
        """
        headers = self._get_default_headers()
        try:
            self.validate_token_request(request)
        except rfc6749_errors.OAuth2Error as e:
            headers.update(e.headers)
            return headers, e.json, e.status_code

        # Issue exactly one token per request. The token that goes through
        # the modifiers and is persisted must be the very token serialized
        # into the response; creating one token here and then delegating to
        # create_token_response() would validate twice, persist an orphan
        # token that is never delivered, and hand the client a token that
        # skipped the modifiers.
        token = token_handler.create_token(request, self.refresh_token)

        for modifier in self._token_modifiers:
            token = modifier(token)

        self.request_validator.save_token(token, request)

        return headers, json.dumps(token), 200

    def validate_token_request(self, request: common.Request) -> None:
        """
        Performs the necessary check against the request to ensure
        it's allowed to retrieve a token.

        Checks run in this order: custom ``pre_token`` validators, presence
        and value of ``grant_type``, duplicate ``grant_type``/``scope``
        parameters, client authentication, grant-type authorization, scope
        validation, custom ``post_token`` validators.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :raises InvalidRequestError: ``grant_type`` is missing/empty, or
            ``grant_type`` or ``scope`` appears in
            ``request.duplicate_params``.
        :raises UnsupportedGrantTypeError: ``grant_type`` is not the
            device-code URN.
        :raises InvalidClientError: ``authenticate_client`` returned a
            falsy value.
        :raises NotImplementedError: ``authenticate_client`` succeeded but
            did not leave a ``client_id`` attribute on ``request.client``.
        """
        # NOTE(review): nothing in this method inspects the ``device_code``
        # parameter itself; presumably that check is left to the custom
        # validators or the request validator -- confirm.
        for validator in self.custom_validators.pre_token:
            validator(request)

        if not getattr(request, "grant_type", None):
            raise rfc6749_errors.InvalidRequestError(
                "Request is missing grant type.", request=request
            )

        if request.grant_type != "urn:ietf:params:oauth:grant-type:device_code":
            raise rfc6749_errors.UnsupportedGrantTypeError(request=request)

        for param in ("grant_type", "scope"):
            if param in request.duplicate_params:
                raise rfc6749_errors.InvalidRequestError(
                    description=f"Duplicate {param} parameter.", request=request
                )

        if not self.request_validator.authenticate_client(request):
            raise rfc6749_errors.InvalidClientError(request=request)
        # A successful authenticate_client() is expected to have populated
        # request.client; a missing client_id is an integration error in
        # the request validator, not a client error.
        if not hasattr(request.client, "client_id"):
            raise NotImplementedError(
                "Authenticate client must set the "
                "request.client.client_id attribute "
                "in authenticate_client."
            )

        # Ensure the client is authorized to use this grant type.
        self.validate_grant_type(request)

        # Fall back to the authenticated client's id when the request did
        # not carry one.
        request.client_id = request.client_id or request.client.client_id
        self.validate_scopes(request)

        for validator in self.custom_validators.post_token:
            validator(request)

    def create_token_response(
        self, request: common.Request, token_handler: Callable
    ) -> tuple[dict, str, int]:
        """Return token or error in json format.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :param token_handler: A token handler instance, for example of type
                              oauthlib.oauth2.BearerToken.
        :returns: A ``(headers, body, status)`` tuple. On success ``body``
                  is the JSON-encoded token and ``status`` is 200; on an
                  ``OAuth2Error`` the error's headers, JSON body and status
                  code are returned.

        If the access token request is valid and authorized, the
        authorization server issues an access token and optional refresh
        token as described in `Section 5.1`_. If the request failed client
        authentication or is invalid, the authorization server returns an
        error response as described in `Section 5.2`_.

        Token modifiers (``self._token_modifiers``) are not applied by this
        method.

        .. _`Section 5.1`: https://tools.ietf.org/html/rfc6749#section-5.1
        .. _`Section 5.2`: https://tools.ietf.org/html/rfc6749#section-5.2
        """
        headers = self._get_default_headers()
        try:
            # Reject early when the validator says authentication is
            # mandatory and it fails; validate_token_request() below
            # authenticates the client again as part of its own checks.
            if self.request_validator.client_authentication_required(
                request
            ) and not self.request_validator.authenticate_client(request):
                raise rfc6749_errors.InvalidClientError(request=request)

            self.validate_token_request(request)

        except rfc6749_errors.OAuth2Error as e:
            headers.update(e.headers)
            return headers, e.json, e.status_code

        token = token_handler.create_token(request, self.refresh_token)

        self.request_validator.save_token(token, request)

        return headers, json.dumps(token), 200