Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_auth.py: 62%
63 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:20 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:20 +0000
1###### Coverage stub
2import atexit
3import coverage
# Start coverage measurement as soon as this stub runs. Data is stored in the
# '.coverage' file; cover_pylib=True asks coverage.py to also measure the
# Python standard library.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops measurement and saves the coverage data.
# atexit.register returns the function it is given, so it works as a decorator
# and `exit_handler` stays bound to the function itself.
@atexit.register
def exit_handler():
    """Stop coverage measurement and save the collected data."""
    cov.stop()
    cov.save()
11####### End of coverage stub
12#!/usr/bin/python3
13# Copyright 2022 Google LLC
14#
15# Licensed under the Apache License, Version 2.0 (the "License");
16# you may not use this file except in compliance with the License.
17# You may obtain a copy of the License at
18#
19# http://www.apache.org/licenses/LICENSE-2.0
20#
21# Unless required by applicable law or agreed to in writing, software
22# distributed under the License is distributed on an "AS IS" BASIS,
23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24# See the License for the specific language governing permissions and
25# limitations under the License.
26import atheris
27import sys
28import requests
29with atheris.instrument_imports():
30 from msal import PublicClientApplication
31 from msal.application import extract_certs
32 from msal.authority import AuthorityBuilder
35# FuzzHttpClient inspired by MinimalHttpClient from msal unit tests
class FuzzHttpClient:
    """Stand-in HTTP client whose responses are built from fuzzer data.

    No request is sent by ``post`` or ``get``: both ignore their arguments
    and return a new ``FuzzResponse`` fed by the shared data provider.
    """

    def __init__(self, fdp, verify=True, proxies=None, timeout=None):
        """Store the data provider and mirror the unit-test client's state.

        Args:
            fdp: Fuzzed data provider passed to every ``FuzzResponse``.
            verify: Assigned to ``session.verify``.
            proxies: Assigned to ``session.proxies``.
            timeout: Stored on the instance as ``timeout``.
        """
        self.fdp = fdp
        self.timeout = timeout
        # These attributes are kept from the unit-test implementation in
        # case some of the MSAL code reads them; this class itself never
        # sends anything through the session.
        session = requests.Session()
        session.verify = verify
        session.proxies = proxies
        self.session = session

    def _fuzzed_response(self):
        """Return a new response seeded by the fuzzer's data provider."""
        return FuzzResponse(fdp=self.fdp)

    def post(self, url, params=None, data=None, headers=None, **kwargs):
        """Ignore all arguments and return a fuzzer-seeded response."""
        return self._fuzzed_response()

    def get(self, url, params=None, headers=None, **kwargs):
        """Ignore all arguments and return a fuzzer-seeded response."""
        return self._fuzzed_response()

    def close(self):
        """Close the underlying ``requests`` session."""
        self.session.close()
class FuzzResponse:
    """Response object whose status code and text come from fuzzer data."""

    def __init__(self, fdp, requests_resp=None, status_code=None, text=None):
        """Build a response from the data provider.

        ``requests_resp``, ``status_code`` and ``text`` are accepted but not
        used: both the status code and the text are always taken from ``fdp``.

        Args:
            fdp: Fuzzed data provider the status code and text are drawn from.
        """
        self.fdp = fdp
        # Over-approximate responses by creating a random Response object.
        # The status code is consumed from the provider before the text.
        raw = requests.Response()
        raw.status_code = fdp.ConsumeIntInRange(100, 599)
        self._raw_resp = raw
        self.status_code = raw.status_code
        self.text = fdp.ConsumeString(500)

    def raise_for_status(self):
        """Delegate to the wrapped ``requests.Response`` when one is set."""
        if self._raw_resp is None:
            return
        self._raw_resp.raise_for_status()
def is_expected(error_list, error_msg):
    """Return True if any entry of ``error_list`` occurs in ``error_msg``.

    Args:
        error_list: Iterable of substrings that mark an anticipated error.
        error_msg: Text to search, e.g. ``str(exception)``.

    Returns:
        True when at least one substring is found in ``error_msg``, otherwise
        False (including when ``error_list`` is empty).
    """
    # any() short-circuits on the first match, like the explicit loop it
    # replaces.
    return any(error in error_msg for error in error_list)
def TestInput(input_bytes):
    """Fuzz entry point: build an MSAL public client and parse a certificate.

    The input is split by a ``FuzzedDataProvider`` into an authority, a
    client id, fake HTTP responses, and a certificate body.

    Args:
        input_bytes: Raw bytes supplied by the fuzzer. Inputs shorter than
            32 bytes are skipped.

    Raises:
        ValueError, KeyError: Re-raised from application construction or
            ``get_accounts()`` when the message matches none of the expected
            substrings listed below.
    """
    if len(input_bytes) < 32:
        return

    fdp = atheris.FuzzedDataProvider(input_bytes)
    authority = AuthorityBuilder(fdp.ConsumeString(50), fdp.ConsumeString(50))
    try:
        app = PublicClientApplication(
            client_id=fdp.ConsumeString(32),
            authority=authority,
            http_client=FuzzHttpClient(fdp),  # Use fake Fuzz HTTP client
        )
        app.get_accounts()
    except (ValueError, KeyError) as e:
        # Errors whose message contains one of these substrings are treated
        # as expected and swallowed; anything else propagates to the fuzzer.
        error_list = [
            "tenant_discovery_endpoint",
            "Invalid IPv6 URL",
            "should consist of an https url with a minimum of one segment in a path",
            "netloc"
        ]
        if not is_expected(error_list, str(e)):
            # Bare raise re-raises the active exception with its original
            # traceback.
            raise

    # This runs even when an expected error was swallowed above.
    cert = "-----BEGIN CERTIFICATE-----%s-----END CERTIFICATE-----" % fdp.ConsumeString(200)
    extract_certs(cert)
def main():
    """Set up Atheris with ``TestInput``, instrument code, and start fuzzing."""
    # Setup receives the command-line arguments and the per-input callback.
    atheris.Setup(sys.argv, TestInput, enable_python_coverage=True)
    atheris.instrument_all()
    atheris.Fuzz()


if __name__ == "__main__":
    main()