Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_server/auth/decorator.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

52 statements  

1"""Decorator for layering authorization into JupyterHandlers.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5import asyncio 

6from functools import wraps 

7from typing import Any, Callable, Optional, TypeVar, Union, cast 

8 

9from jupyter_core.utils import ensure_async 

10from tornado.log import app_log 

11from tornado.web import HTTPError 

12 

13from .utils import HTTP_METHOD_TO_AUTH_ACTION 

14 

15FuncT = TypeVar("FuncT", bound=Callable[..., Any]) 

16 

17 

18def authorized( 

19 action: Optional[Union[str, FuncT]] = None, 

20 resource: Optional[str] = None, 

21 message: Optional[str] = None, 

22) -> FuncT: 

23 """A decorator for tornado.web.RequestHandler methods 

24 that verifies whether the current user is authorized 

25 to make the following request. 

26 

27 Helpful for adding an 'authorization' layer to 

28 a REST API. 

29 

30 .. versionadded:: 2.0 

31 

32 Parameters 

33 ---------- 

34 action : str 

35 the type of permission or action to check. 

36 

37 resource: str or None 

38 the name of the resource the action is being authorized 

39 to access. 

40 

41 message : str or none 

42 a message for the unauthorized action. 

43 """ 

44 

45 def wrapper(method): 

46 @wraps(method) 

47 async def inner(self, *args, **kwargs): 

48 # default values for action, resource 

49 nonlocal action 

50 nonlocal resource 

51 nonlocal message 

52 if action is None: 

53 http_method = self.request.method.upper() 

54 action = HTTP_METHOD_TO_AUTH_ACTION[http_method] 

55 if resource is None: 

56 resource = self.auth_resource 

57 if message is None: 

58 message = f"User is not authorized to {action} on resource: {resource}." 

59 

60 user = self.current_user 

61 if not user: 

62 app_log.warning("Attempting to authorize request without authentication!") 

63 raise HTTPError(status_code=403, log_message=message) 

64 # If the user is allowed to do this action, 

65 # call the method. 

66 authorized = await ensure_async( 

67 self.authorizer.is_authorized(self, user, action, resource) 

68 ) 

69 if authorized: 

70 out = method(self, *args, **kwargs) 

71 # If the method is a coroutine, await it 

72 if asyncio.iscoroutine(out): 

73 return await out 

74 return out 

75 # else raise an exception. 

76 else: 

77 raise HTTPError(status_code=403, log_message=message) 

78 

79 return inner 

80 

81 if callable(action): 

82 method = action 

83 action = None 

84 # no-arguments `@authorized` decorator called 

85 return cast(FuncT, wrapper(method)) 

86 

87 return cast(FuncT, wrapper) 

88 

89 

90def allow_unauthenticated(method: FuncT) -> FuncT: 

91 """A decorator for tornado.web.RequestHandler methods 

92 that allows any user to make the following request. 

93 

94 Selectively disables the 'authentication' layer of REST API which 

95 is active when `ServerApp.allow_unauthenticated_access = False`. 

96 

97 To be used exclusively on endpoints which may be considered public, 

98 for example the login page handler. 

99 

100 .. versionadded:: 2.13 

101 

102 Parameters 

103 ---------- 

104 method : bound callable 

105 the endpoint method to remove authentication from. 

106 """ 

107 

108 @wraps(method) 

109 def wrapper(self, *args, **kwargs): 

110 return method(self, *args, **kwargs) 

111 

112 setattr(wrapper, "__allow_unauthenticated", True) 

113 

114 return cast(FuncT, wrapper) 

115 

116 

117def ws_authenticated(method: FuncT) -> FuncT: 

118 """A decorator for websockets derived from `WebSocketHandler` 

119 that authenticates user before allowing to proceed. 

120 

121 Differently from tornado.web.authenticated, does not redirect 

122 to the login page, which would be meaningless for websockets. 

123 

124 .. versionadded:: 2.13 

125 

126 Parameters 

127 ---------- 

128 method : bound callable 

129 the endpoint method to add authentication for. 

130 """ 

131 

132 @wraps(method) 

133 def wrapper(self, *args, **kwargs): 

134 user = self.current_user 

135 if user is None: 

136 self.log.warning("Couldn't authenticate WebSocket connection") 

137 raise HTTPError(403) 

138 return method(self, *args, **kwargs) 

139 

140 setattr(wrapper, "__allow_unauthenticated", False) 

141 

142 return cast(FuncT, wrapper)