Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_auth.py: 62%

63 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:20 +0000

1###### Coverage stub 

2import atexit 

3import coverage 

# Begin measuring as soon as this stub runs so that everything executed
# afterwards is recorded (cover_pylib=True is passed through to coverage).
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops measurement and saves the coverage data.
# atexit.register returns the function it is given, so it works as a decorator.
@atexit.register
def exit_handler():
    """Stop coverage measurement and write the collected data to disk."""
    cov.stop()
    cov.save()

11####### End of coverage stub 

12#!/usr/bin/python3 

13# Copyright 2022 Google LLC 

14# 

15# Licensed under the Apache License, Version 2.0 (the "License"); 

16# you may not use this file except in compliance with the License. 

17# You may obtain a copy of the License at 

18# 

19# http://www.apache.org/licenses/LICENSE-2.0 

20# 

21# Unless required by applicable law or agreed to in writing, software 

22# distributed under the License is distributed on an "AS IS" BASIS, 

23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

24# See the License for the specific language governing permissions and 

25# limitations under the License. 

26import atheris 

27import sys 

28import requests 

29with atheris.instrument_imports(): 

30 from msal import PublicClientApplication 

31 from msal.application import extract_certs 

32 from msal.authority import AuthorityBuilder 

33 

34 

35# FuzzHttpClient inspired by MinimalHttpClient from msal unit tests 

class FuzzHttpClient:
    """HTTP client that answers every request with fuzzer-seeded data.

    No request is ever sent: both ``get`` and ``post`` ignore their arguments
    and hand back a ``FuzzResponse`` built from the shared data provider.
    """

    def __init__(self, fdp, verify=True, proxies=None, timeout=None):
        """Store the fuzz data provider and mirror the unit-test client's state.

        Args:
            fdp: Data provider that every returned ``FuzzResponse`` draws from.
            verify: Stored on the session as ``verify``.
            proxies: Stored on the session as ``proxies``.
            timeout: Stored on this client as ``timeout``.
        """
        self.fdp = fdp
        self.timeout = timeout

        # The session and its settings are kept from the unit-test
        # implementation in case some of the MSAL code uses them.
        session = requests.Session()
        session.verify = verify
        session.proxies = proxies
        self.session = session

    def post(self, url, params=None, data=None, headers=None, **kwargs):
        """Return a fuzzed response; all request arguments are ignored."""
        return FuzzResponse(self.fdp)

    def get(self, url, params=None, headers=None, **kwargs):
        """Return a fuzzed response; all request arguments are ignored."""
        return FuzzResponse(self.fdp)

    def close(self):
        """Close the underlying ``requests`` session."""
        self.session.close()

55 

56 

class FuzzResponse:
    """Response object whose status code and text come from the fuzzer."""

    def __init__(self, fdp, requests_resp=None, status_code=None, text=None):
        """Build a response from fuzz data.

        Args:
            fdp: Data provider the status code and text are consumed from.
            requests_resp: Accepted but not used by this implementation.
            status_code: Accepted but not used by this implementation.
            text: Accepted but not used by this implementation.
        """
        self.fdp = fdp

        # Over-approximate real responses: any status in [100, 599] paired
        # with arbitrary text. The status is consumed before the text, and
        # that order determines which input bytes feed which field.
        raw = requests.Response()
        raw.status_code = fdp.ConsumeIntInRange(100, 599)
        self._raw_resp = raw
        self.text = fdp.ConsumeString(500)
        self.status_code = raw.status_code

    def raise_for_status(self):
        """Delegate to the wrapped ``requests.Response``, if there is one."""
        if self._raw_resp is None:
            return
        self._raw_resp.raise_for_status()

69 

def is_expected(error_list, error_msg):
    """Report whether an error message matches any known, expected error.

    Args:
        error_list: Iterable of substrings identifying expected errors.
        error_msg: The error message to inspect.

    Returns:
        True if at least one entry of ``error_list`` occurs in ``error_msg``,
        otherwise False (including when ``error_list`` is empty).
    """
    # any() stops at the first match, like the explicit loop it replaces.
    return any(error in error_msg for error in error_list)

75 

def TestInput(input_bytes):
    """Fuzz entry point: drive MSAL client setup and certificate extraction.

    Inputs shorter than 32 bytes are skipped. Otherwise the bytes seed a
    ``FuzzedDataProvider`` that supplies the authority arguments, the client
    id, the fake HTTP responses and the certificate body.

    Args:
        input_bytes: Raw fuzzer input.

    Raises:
        ValueError, KeyError: Re-raised when raised by the client setup or
            ``get_accounts`` with a message that matches none of the
            expected fragments.
    """
    if len(input_bytes) < 32:
        return

    fdp = atheris.FuzzedDataProvider(input_bytes)

    # Consume the two authority arguments in the same left-to-right order
    # as passing them inline would.
    instance = fdp.ConsumeString(50)
    tenant = fdp.ConsumeString(50)
    authority = AuthorityBuilder(instance, tenant)

    try:
        client_id = fdp.ConsumeString(32)
        http_client = FuzzHttpClient(fdp)  # Use fake Fuzz HTTP client
        app = PublicClientApplication(
            client_id=client_id,
            authority=authority,
            http_client=http_client,
        )
        app.get_accounts()
    except (ValueError, KeyError) as e:
        # Message fragments of errors that are treated as expected for
        # fuzzed input; anything else is surfaced to the fuzzer.
        expected_fragments = [
            "tenant_discovery_endpoint",
            "Invalid IPv6 URL",
            "should consist of an https url with a minimum of one segment in a path",
            "netloc",
        ]
        if not is_expected(expected_fragments, str(e)):
            raise e

    # Wrap fuzzed text in certificate markers and run it through extract_certs.
    cert_template = "-----BEGIN CERTIFICATE-----%s-----END CERTIFICATE-----"
    cert = cert_template % fdp.ConsumeString(200)
    extract_certs(cert)

100 

def main():
    """Configure Atheris with ``TestInput`` and start the fuzzing loop."""
    argv = sys.argv
    atheris.Setup(argv, TestInput, enable_python_coverage=True)
    atheris.instrument_all()
    atheris.Fuzz()


# Only start fuzzing when executed as a script, not when imported.
if __name__ == "__main__":
    main()