Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_jwt.py: 55%


65 statements  

1###### Coverage stub 

2import atexit 

3import coverage 

# Collect coverage for the whole run; cover_pylib=True asks for library
# modules to be measured as well.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops collection and saves the coverage data.
@atexit.register
def exit_handler():
    """Stop coverage collection and write the data file at interpreter exit."""
    cov.stop()
    cov.save()

11####### End of coverage stub 

12#!/usr/bin/python3 

13# Copyright 2023 Google LLC 

14# 

15# Licensed under the Apache License, Version 2.0 (the "License"); 

16# you may not use this file except in compliance with the License. 

17# You may obtain a copy of the License at 

18# 

19# http://www.apache.org/licenses/LICENSE-2.0 

20# 

21# Unless required by applicable law or agreed to in writing, software 

22# distributed under the License is distributed on an "AS IS" BASIS, 

23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

24# See the License for the specific language governing permissions and 

25# limitations under the License. 

26import sys 

27import jwt 

28import atheris 

29 

30from flask import Flask 

31from flask import jsonify 

32 

33from flask_jwt_extended import create_access_token 

34from flask_jwt_extended import decode_token 

35from flask_jwt_extended import jwt_required 

36from flask_jwt_extended import JWTManager 

37from flask_jwt_extended import verify_jwt_in_request 

38 

39 

def get_app(key):
    """Build a throwaway Flask app wired up with flask_jwt_extended.

    Args:
        key: Secret used to sign and verify tokens. An empty string is
            replaced by a fixed placeholder.

    Returns:
        A Flask app exposing one JWT-protected GET route, ``/protected``.
    """
    # The hard-coded fallback secret is acceptable only because this is a
    # fuzz harness; it must never be reused as a real signing key.
    secret = "randomfuzzkey" if key == "" else key

    app = Flask(__name__)
    # Tokens are accepted from the query string only. Outside a harness that
    # placement puts tokens into URLs, and therefore into logs and history.
    app.config.update(
        JWT_SECRET_KEY=secret,
        JWT_TOKEN_LOCATION=["query_string"],
    )
    JWTManager(app)

    @app.route("/protected", methods=["GET"])
    @jwt_required()
    def access_protected():
        return jsonify(foo="bar")

    return app

53 

54 

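def test_encodings(data):
    """Round-trip a fuzzed identity through token creation and decoding.

    Args:
        data: Raw fuzzer input. It supplies the signing key (up to 64
            characters) and then the identity string.
    """
    fdp = atheris.FuzzedDataProvider(data)
    app = get_app(fdp.ConsumeUnicodeNoSurrogates(64))
    with app.test_request_context():
        try:
            token = create_access_token(fdp.ConsumeUnicodeNoSurrogates(sys.maxsize))
        except jwt.exceptions.InvalidKeyError:
            # Same handling as test_get: a fuzzed secret that the JWT library
            # rejects as a key is expected input, not a finding to report.
            return
        # The decoded claims are not inspected; only raised exceptions matter.
        decode_token(token)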

61 

62 

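def test_get(data):
    """Send a GET request carrying a fuzzed token to a JWT-verifying route.

    Registers ``/custom`` on a fresh app, creates an access token for a
    fuzzed identity and requests the route through the Flask test client.

    Args:
        data: Raw fuzzer input. It supplies the signing key, the
            ``verify_jwt_in_request`` flags, the response payload and the
            token identity.
    """
    fdp = atheris.FuzzedDataProvider(data)
    app = get_app(fdp.ConsumeUnicodeNoSurrogates(64))

    @app.route("/custom", methods=["GET"])
    def custom():
        # The result is deliberately not unpacked: with optional=True and no
        # token in the request, verify_jwt_in_request returns None, and
        # unpacking that would raise a TypeError inside the harness itself.
        verify_jwt_in_request(optional=fdp.ConsumeBool(),
                              fresh=fdp.ConsumeBool(),
                              refresh=fdp.ConsumeBool())
        if fdp.ConsumeBool():
            return jsonify(foo=fdp.ConsumeUnicodeNoSurrogates(256))
        return {
            fdp.ConsumeUnicodeNoSurrogates(124):
                fdp.ConsumeUnicodeNoSurrogates(124)
        }

    url = "/custom"
    test_client = app.test_client()
    with app.test_request_context():
        try:
            token = create_access_token(fdp.ConsumeUnicodeNoSurrogates(256))
        except jwt.exceptions.InvalidKeyError:
            return

        headers = {"Authorization": "Bearer {}".format(token)}
        # get_app() configures "query_string" as the only token location, so
        # a token sent solely in the Authorization header is never read and
        # the verification path stays unexercised. Send it where the app
        # looks; the header is kept so the request is otherwise unchanged.
        param_name = app.config.get("JWT_QUERY_STRING_NAME", "jwt")
        response = test_client.get(url,
                                   headers=headers,
                                   query_string={param_name: token})

        # Get the json return from /custom
        response.get_json()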

93 

94 

def TestOneInput(data):
    """Fuzzer entry point: route the input to one of the two harness targets.

    A boolean drawn from the input selects ``test_get`` or
    ``test_encodings``; the chosen target receives the complete input.
    """
    selector = atheris.FuzzedDataProvider(data)
    target = test_get if selector.ConsumeBool() else test_encodings
    target(data)

101 

102 

def main():
    """Run atheris instrumentation, register TestOneInput and start fuzzing."""
    argv = sys.argv
    atheris.instrument_all()
    atheris.Setup(argv, TestOneInput)
    atheris.Fuzz()


# Start fuzzing only when the file is executed as a script.
if __name__ == "__main__":
    main()