Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_actions_validate.py: 66%
91 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
###### Coverage stub
import atexit
import coverage

# Begin collecting coverage as soon as this module is loaded. Data goes to
# the '.coverage' file; cover_pylib=True is passed so library code is
# measured as well.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


def exit_handler():
    """Stop coverage collection and save the collected data."""
    cov.stop()
    cov.save()


# Register an exit handler so the coverage data is saved when the
# interpreter shuts down.
atexit.register(exit_handler)
####### End of coverage stub
12#!/usr/bin/python3
13# Copyright 2023 Google LLC
14#
15# Licensed under the Apache License, Version 2.0 (the "License");
16# you may not use this file except in compliance with the License.
17# You may obtain a copy of the License at
18#
19# http://www.apache.org/licenses/LICENSE-2.0
20#
21# Unless required by applicable law or agreed to in writing, software
22# distributed under the License is distributed on an "AS IS" BASIS,
23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24# See the License for the specific language governing permissions and
25# limitations under the License.
27import os
28import sys
29import atheris
30from botocore.exceptions import ProfileNotFound
32with atheris.instrument_imports():
33 from c7n import policy as c7n_policy
34 from c7n import exceptions, manager, data
36 from c7n.filters import FilterRegistry
37 from c7n.actions import ActionRegistry
39 from c7n.actions import autotag, network, notify
41 from c7n.resources import aws, rdsparamgroup, elasticache, ec2
42 from c7n.resources import emr, account, apigw, elb, s3, glue
43 from c7n.resources import appelb
def TestOneInput(data):
    """Fuzz ``validate()`` on a c7n action built from fuzzer-provided data.

    One of three action classes (``AutoTagUser``,
    ``ModifyVpcSecurityGroupsAction`` or ``Notify``) is constructed from a
    generated dict and a ``ResourceManager`` whose action/filter registries
    are named after a picked resource type; the action's ``validate()``
    method is then called.

    Args:
        data: Raw input bytes supplied by the fuzzing engine.

    ``PolicyValidationError``, ``ProfileNotFound``, ``KeyError`` and
    ``TypeError`` raised while building or validating the action are treated
    as expected outcomes and swallowed; anything else propagates to the
    fuzzer.
    """
    registry_types = [
        'c7n.data', 'rds-param-group', 'elasticache', 'ec2', 'emr',
        'aws.account', 'rest-account', 'elb', 's3', 'iac', 'rds',
        'glue-catalog', 'app-elb-target-group',
    ]
    provider = 'aws'
    action = None

    # NOTE: the order of the fdp.Consume*/Pick* calls below decides how the
    # input bytes map onto values, so it must stay stable.
    fdp = atheris.FuzzedDataProvider(data)
    choice = fdp.ConsumeIntInRange(1, 3)

    option = FuzzOption(fdp)
    action_data = _generate_random_dict(fdp)
    manager_data = _generate_random_dict(fdp)
    resource_type = fdp.PickValueInList(registry_types)

    action_registry = ActionRegistry("%s.actions" % resource_type)
    filter_registry = FilterRegistry("%s.filters" % resource_type)
    resource_manager = manager.ResourceManager(
        FuzzContext(provider, option), manager_data)
    resource_manager.action_registry = action_registry
    resource_manager.filter_registry = filter_registry
    resource_manager.type = resource_type

    try:
        if choice == 1:
            action = autotag.AutoTagUser(action_data, resource_manager)
        elif choice == 2:
            action = network.ModifyVpcSecurityGroupsAction(
                action_data, resource_manager)
            # The action's type is fuzzed too, after construction.
            action.type = fdp.ConsumeUnicodeNoSurrogates(1024)
        elif choice == 3:
            action = notify.Notify(action_data, resource_manager)

        if action:
            action.validate()
    except (exceptions.PolicyValidationError, ProfileNotFound,
            KeyError, TypeError):
        # Expected rejections of malformed input; not fuzzing findings.
        pass
87def _generate_random_dict(fdp):
88 map = dict()
90 for count in range(fdp.ConsumeIntInRange(1, 100)):
91 map[fdp.ConsumeUnicodeNoSurrogates(1024)] = fdp.ConsumeUnicodeNoSurrogates(1024)
93 map["name"] = fdp.ConsumeUnicodeNoSurrogates(1024)
95 return map
def initializeProviders():
    """Instantiate the c7n AWS provider class; the instance is discarded."""
    aws.AWS()
def main():
    """Initialize the providers, then run the atheris fuzzing loop.

    ``TestOneInput`` is registered as the fuzz target, with the process
    command line forwarded to atheris and Python coverage enabled.
    """
    initializeProviders()

    atheris.Setup(sys.argv, TestOneInput, enable_python_coverage=True)
    atheris.Fuzz()
class FuzzContext:
    """Lightweight context object handed to ``manager.ResourceManager``.

    Only the attributes assigned in ``__init__`` are provided.
    """

    def __init__(self, name, option):
        """Build the context.

        Args:
            name: Provider name; forwarded to the session factory and to the
                stub policy.
            option: Options object forwarded to
                ``c7n_policy.get_session_factory``.
        """
        # No options object is exposed on the context itself.
        self.options = None
        self.session_factory = c7n_policy.get_session_factory(name, option)
        self.policy = FuzzPolicy(name)
        self.tracer = FuzzTracer()
        # Fixed placeholder values.
        self.execution_id = "id"
        self.start_time = "1234567890"
class FuzzPolicy:
    """Stub policy carrying a provider name and a fixed policy name."""

    def __init__(self, provider_name):
        """Store ``provider_name``; the policy name is always "FuzzName"."""
        self.name = "FuzzName"
        self.provider_name = provider_name
class FuzzTracer:
    """Stub tracer whose ``subsegment`` does nothing and returns ``True``."""

    def subsegment(self, name=None):
        """Accept an optional subsegment name and return ``True``.

        The original signature lacked ``self``, so calling
        ``tracer.subsegment("x")`` on an instance raised ``TypeError``.
        ``name`` is optional so the previously working zero-argument call
        still behaves the same.

        Args:
            name: Ignored.

        Returns:
            Always ``True``.
        """
        return True
class FuzzOption:
    """Options object whose fields are all fuzzer-generated strings."""

    def __init__(self, fdp):
        """Fill the fields from ``fdp``.

        Args:
            fdp: Data provider exposing ``ConsumeUnicodeNoSurrogates``.
        """
        consume_text = fdp.ConsumeUnicodeNoSurrogates
        # Fields are consumed in this fixed order: region, profile,
        # assume_role, external_id.
        self.region = consume_text(1024)
        self.profile = consume_text(1024)
        self.assume_role = consume_text(1024)
        self.external_id = consume_text(1024)
class FuzzConfig:
    """Config object holding a fuzzer-generated account id and region."""

    def __init__(self, fdp):
        """Fill the fields from ``fdp``.

        Args:
            fdp: Data provider exposing ``ConsumeUnicodeNoSurrogates``.
        """
        consume_text = fdp.ConsumeUnicodeNoSurrogates
        # account_id is consumed first, then region.
        self.account_id = consume_text(1024)
        self.region = consume_text(1024)
if __name__ == "__main__":
    # Start fuzzing only when this file is executed as a script.
    main()