Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/oauthlib/oauth2/rfc8628/grant_types/device_code.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

50 statements  

1from __future__ import annotations 

2import json 

3 

4from typing import Callable 

5 

6from oauthlib import common # noqa: TC001 

7 

8from oauthlib.oauth2.rfc6749 import errors as rfc6749_errors 

9from oauthlib.oauth2.rfc6749.grant_types.base import GrantTypeBase 

10 

11 

class DeviceCodeGrant(GrantTypeBase):
    """Token-request handler for the device code grant type.

    Handles requests whose ``grant_type`` is
    ``urn:ietf:params:oauth:grant-type:device_code``: the request is
    validated, a token is obtained from the supplied token handler,
    persisted through ``request_validator.save_token`` and returned as a
    JSON body.
    """

    def create_authorization_response(
        self, request: common.Request, token_handler: Callable
    ) -> tuple[dict, str, int]:
        """
        Validate the device flow request -> create the access token
        -> persist the token -> return the token.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :param token_handler: Object exposing ``create_token``; it is
                              called once per successful request.
        :returns: ``(headers, body, status)``. On success ``body`` is the
                  JSON-encoded token and ``status`` is 200; when
                  validation raises an ``OAuth2Error`` the error's own
                  headers, JSON body and status code are returned.
        """
        headers = self._get_default_headers()
        try:
            self.validate_token_request(request)
        except rfc6749_errors.OAuth2Error as e:
            headers.update(e.headers)
            return headers, e.json, e.status_code

        # Exactly one token is created and persisted per request. Creating
        # and saving a token here *and* delegating to create_token_response
        # would store two tokens and hand back only the second one.
        # NOTE(review): self._token_modifiers are not applied to the issued
        # token (they never affected the returned token); confirm whether
        # they should be.
        return self._create_and_save_token(request, token_handler, headers)

    def validate_token_request(self, request: common.Request) -> None:
        """
        Performs the necessary check against the request to ensure
        it's allowed to retrieve a token.

        Checks run in this order: custom ``pre_token`` validators, presence
        and value of ``grant_type``, duplicate ``grant_type``/``scope``
        parameters, client authentication, grant-type authorization for the
        client, scopes, and finally custom ``post_token`` validators.

        :param request: OAuthlib request. ``request.client_id`` is filled in
                        from ``request.client.client_id`` when it is empty.
        :type request: oauthlib.common.Request
        :raises InvalidRequestError: ``grant_type`` is missing, or
            ``grant_type``/``scope`` was supplied more than once.
        :raises UnsupportedGrantTypeError: ``grant_type`` is not the device
            code grant URN.
        :raises InvalidClientError: ``authenticate_client`` returned a falsy
            value.
        :raises NotImplementedError: ``authenticate_client`` succeeded but
            left ``request.client`` without a ``client_id`` attribute.
        """
        for validator in self.custom_validators.pre_token:
            validator(request)

        if not getattr(request, "grant_type", None):
            raise rfc6749_errors.InvalidRequestError(
                "Request is missing grant type.", request=request
            )

        if request.grant_type != "urn:ietf:params:oauth:grant-type:device_code":
            raise rfc6749_errors.UnsupportedGrantTypeError(request=request)

        for param in ("grant_type", "scope"):
            if param in request.duplicate_params:
                raise rfc6749_errors.InvalidRequestError(
                    description=f"Duplicate {param} parameter.", request=request
                )

        if not self.request_validator.authenticate_client(request):
            raise rfc6749_errors.InvalidClientError(request=request)
        # A validator that reports success must also have populated
        # request.client.client_id; anything else is an integration error,
        # not a client error, hence NotImplementedError.
        if not hasattr(request.client, "client_id"):
            raise NotImplementedError(
                "Authenticate client must set the "
                "request.client.client_id attribute "
                "in authenticate_client."
            )

        # Ensure client is authorized use of this grant type
        self.validate_grant_type(request)

        request.client_id = request.client_id or request.client.client_id
        self.validate_scopes(request)

        for validator in self.custom_validators.post_token:
            validator(request)

    def create_token_response(
        self, request: common.Request, token_handler: Callable
    ) -> tuple[dict, str, int]:
        """Return token or error in json format.

        :param request: OAuthlib request.
        :type request: oauthlib.common.Request
        :param token_handler: A token handler instance, for example of type
                              oauthlib.oauth2.BearerToken.
        :returns: ``(headers, body, status)``; ``body`` is the JSON-encoded
                  token with status 200, or the error's JSON and status
                  code when an ``OAuth2Error`` is raised during validation.

        If the access token request is valid and authorized, the
        authorization server issues an access token and optional refresh
        token as described in `Section 5.1`_. If the request failed client
        authentication or is invalid, the authorization server returns an
        error response as described in `Section 5.2`_.

        .. _`Section 5.1`: https://tools.ietf.org/html/rfc6749#section-5.1
        .. _`Section 5.2`: https://tools.ietf.org/html/rfc6749#section-5.2
        """
        headers = self._get_default_headers()
        try:
            # When the validator says authentication is mandatory, a failed
            # authentication is reported before any other request problem.
            if self.request_validator.client_authentication_required(
                request
            ) and not self.request_validator.authenticate_client(request):
                raise rfc6749_errors.InvalidClientError(request=request)

            self.validate_token_request(request)

        except rfc6749_errors.OAuth2Error as e:
            headers.update(e.headers)
            return headers, e.json, e.status_code

        return self._create_and_save_token(request, token_handler, headers)

    def _create_and_save_token(
        self, request: common.Request, token_handler: Callable, headers: dict
    ) -> tuple[dict, str, int]:
        """Create one token for a validated request, persist it, return it.

        ``self.refresh_token`` is passed as the second argument of
        ``token_handler.create_token``. The caller must already have
        validated ``request``.
        """
        token = token_handler.create_token(request, self.refresh_token)

        self.request_validator.save_token(token, request)

        return headers, json.dumps(token), 200

105 return headers, e.json, e.status_code 

106 

107 token = token_handler.create_token(request, self.refresh_token) 

108 

109 self.request_validator.save_token(token, request) 

110 

111 return headers, json.dumps(token), 200