Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_tokencache.py: 51%
47 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:13 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:13 +0000
1###### Coverage stub
2import atexit
3import coverage
# Start measuring coverage as early as possible so later imports are included.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops measurement and saves the coverage data.
@atexit.register
def exit_handler():
    """Stop coverage measurement and write the collected data to disk."""
    cov.stop()
    cov.save()
11####### End of coverage stub
12#!/usr/bin/python3
13# Copyright 2022 Google LLC
14#
15# Licensed under the Apache License, Version 2.0 (the "License");
16# you may not use this file except in compliance with the License.
17# You may obtain a copy of the License at
18#
19# http://www.apache.org/licenses/LICENSE-2.0
20#
21# Unless required by applicable law or agreed to in writing, software
22# distributed under the License is distributed on an "AS IS" BASIS,
23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24# See the License for the specific language governing permissions and
25# limitations under the License.
import base64
import json
import sys
import time

import atheris
30with atheris.instrument_imports():
31 from msal.token_cache import *
33#Create dummy token
def build_token(issuer="issuer", subject="subject", id="id", **claims):
    """Build a dummy token string of the form ``header.<payload>.signature``.

    The payload is the base64 encoding of a JSON object holding the
    ``iss``, ``sub``, ``aud``, ``exp`` and ``iat`` claims, merged with any
    extra ``claims``.  Extra claims win when they reuse one of those keys.

    Args:
        issuer: Value stored under ``iss``.
        subject: Value stored under ``sub``.
        id: Value stored under ``aud``.  The name shadows the builtin but is
            kept because callers pass it by keyword.
        **claims: Additional claims copied into the payload as-is.

    Returns:
        The token as a ``str``.
    """
    # Sample the clock once so that exp is exactly 100 seconds after iat.
    now = time.time()
    payload = {
        "iss": issuer, "sub": subject, "aud": id,
        "exp": now + 100, "iat": now,
    }
    payload.update(claims)
    encoded = base64.b64encode(json.dumps(payload).encode()).decode('utf-8')
    return "header.%s.signature" % encoded
40#Create dummy response
41def build_response(uid,utid,access_token,expires_in,token_type,**kwargs):
42 response = {}
43 if uid and utid:
44 response["client_info"] = base64.b64encode(json.dumps({
45 "uid": uid, "utid": utid,
46 }).encode()).decode('utf-8')
47 if access_token:
48 response.update({
49 "access_token": access_token,
50 "expires_in": expires_in,
51 "token_type": token_type,
52 })
53 response.update(kwargs) # Pass-through key-value pairs as top-level fields
54 return response
def is_expected(error_list, error_msg):
    """Return True if any entry of ``error_list`` is a substring of ``error_msg``.

    Args:
        error_list: Iterable of substrings that mark an expected error.
        error_msg: The error message to inspect.

    Returns:
        ``True`` when at least one entry occurs in ``error_msg``; ``False``
        otherwise, including when ``error_list`` is empty.
    """
    return any(error in error_msg for error in error_list)
def TestInput(input_bytes):
    """Fuzz target: add one fuzzer-derived token response to a ``TokenCache``.

    Inputs shorter than 32 bytes are ignored.  ``ValueError``s whose message
    contains one of the known substrings below are treated as expected and
    swallowed; any other ``ValueError`` is re-raised so the fuzzer reports it.

    Args:
        input_bytes: Raw bytes supplied by the fuzzer.
    """
    if len(input_bytes) < 32:
        return

    fdp = atheris.FuzzedDataProvider(input_bytes)
    cache = TokenCache()
    client_id = fdp.ConsumeString(32)
    try:
        # Keep the ConsumeString calls in this order: reordering them would
        # change how a given input is split across the fields below.
        token = build_token(
            oid=fdp.ConsumeString(10),
            preferred_username=fdp.ConsumeString(10),
            id=client_id,
        )
        cache.add({
            "client_id": client_id,
            "scope": ["s2", "s1", "s3"],
            "token_endpoint": "https://%s" % fdp.ConsumeString(20),
            "response": build_response(
                token_type=fdp.ConsumeString(5),
                uid=fdp.ConsumeString(5),
                utid=fdp.ConsumeString(5),
                expires_in=3600,
                access_token=fdp.ConsumeString(10),
                id_token=token,
                refresh_token=fdp.ConsumeString(10),
            ),
        }, now=1000)
    except ValueError as e:
        # Presumably raised while handling the fuzzed token endpoint URL;
        # these messages are considered benign rather than findings.
        expected_errors = [
            "netloc",
            "Invalid IPv6 URL",
            "should consist of an https url with a minimum of one segment in a path",
        ]
        if not is_expected(expected_errors, str(e)):
            # Bare raise re-raises the active exception with its traceback intact.
            raise
def main():
    """Configure atheris with ``TestInput`` as the target and start fuzzing."""
    argv = sys.argv
    atheris.Setup(argv, TestInput, enable_python_coverage=True)
    atheris.instrument_all()
    atheris.Fuzz()


if __name__ == "__main__":
    main()