Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_filters_process.py: 74%
124 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1###### Coverage stub
2import atexit
3import coverage
4cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
5cov.start()
6# Register an exist handler that will print coverage
7def exit_handler():
8 cov.stop()
9 cov.save()
10atexit.register(exit_handler)
11####### End of coverage stub
12#!/usr/bin/python3
13# Copyright 2023 Google LLC
14#
15# Licensed under the Apache License, Version 2.0 (the "License");
16# you may not use this file except in compliance with the License.
17# You may obtain a copy of the License at
18#
19# http://www.apache.org/licenses/LICENSE-2.0
20#
21# Unless required by applicable law or agreed to in writing, software
22# distributed under the License is distributed on an "AS IS" BASIS,
23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24# See the License for the specific language governing permissions and
25# limitations under the License.
27import os
28import sys
29import atheris
30from botocore.exceptions import ProfileNotFound
32with atheris.instrument_imports():
33 from c7n import policy as c7n_policy
34 from c7n import exceptions, manager, data
36 from c7n.filters import FilterRegistry
37 from c7n.actions import ActionRegistry
39 from c7n.filters import multiattr, missing, metrics
40 from c7n.filters import offhours, vpc, revisions
41 from c7n.filters import backup, health, iamaccess
42 from c7n.filters import policystatement, iamanalyzer
44 from c7n.resources import aws, rdsparamgroup, elasticache, ec2
45 from c7n.resources import emr, account, apigw, elb, s3, glue
46 from c7n.resources import appelb
48def TestOneInput(data):
49 """Fuzz encode and decode"""
50 registry_type = [
51 'c7n.data', 'rds-param-group', 'elasticache', 'ec2', 'emr',
52 'aws.account', 'rest-account', 'elb', 's3', 'iac', 'rds',
53 'glue-catalog', 'app-elb-target-group'
54 ]
55 provider = 'aws'
56 object = None
58 fdp = atheris.FuzzedDataProvider(data)
59 choice = fdp.ConsumeIntInRange(1, 12)
61 option = FuzzOption(fdp)
62 data = _generate_random_dict(fdp)
63 event = _generate_random_dict(fdp)
64 manager_data = _generate_random_dict(fdp)
66 type = fdp.PickValueInList(registry_type)
67 action_registry = ActionRegistry("%s.actions" % type)
68 filter_registry = FilterRegistry("%s.filters" % type)
70 resource_manager = manager.ResourceManager(FuzzContext(provider, option), manager_data)
71 resource_manager.action_registry = action_registry
72 resource_manager.filter_registry = filter_registry
73 resource_manager.type = type
74 resources = manager.resources
76 try:
77 if choice == 1:
78 object = multiattr.MultiAttrFilter(data, resource_manager)
79 object.validate()
80 elif choice == 2:
81 data['policy'] = resource_manager.ctx.policy
82 object = missing.Missing(data, resource_manager)
83 object.validate()
84 elif choice == 3:
85 object = offhours.OnHour(data, resource_manager)
86 object.validate()
87 elif choice == 4:
88 object = offhours.OffHour(data, resource_manager)
89 object.validate()
90 elif choice == 5:
91 object = vpc.SubnetFilter(data, resource_manager)
92 object.validate()
93 elif choice == 6:
94 object = vpc.NetworkLocation(data, resource_manager)
95 object.validate()
96 elif choice == 7:
97 object = revisions.Diff(data, resource_manager)
98 object.validate()
99 elif choice == 8:
100 object = backup.ConsecutiveAwsBackupsFilter(data, resource_manager)
101 elif choice == 9:
102 object = health.HealthEventFilter(data, resource_manager)
103 elif choice == 10:
104 object = policystatement.HasStatementFilter(data, resource_manager)
105 elif choice == 11:
106 resource_manager.config = FuzzConfig(fdp)
107 object = iamaccess.CrossAccountAccessFilter(data, resource_manager)
108 elif choice == 12:
109 object = iamanalyzer.AccessAnalyzer(data, resource_manager)
111 if object:
112 object.process(resources, event)
113 except (exceptions.PolicyValidationError, ProfileNotFound):
114 pass
115 except ValueError as e:
116 if "Filter requires resource expression" not in str(e):
117 raise e
118 except (KeyError, TypeError):
119 pass
121def _generate_random_dict(fdp):
122 map = dict()
124 for count in range(fdp.ConsumeIntInRange(1, 100)):
125 map[fdp.ConsumeUnicodeNoSurrogates(1024)] = fdp.ConsumeUnicodeNoSurrogates(1024)
127 map["name"] = fdp.ConsumeUnicodeNoSurrogates(1024)
129 return map
132def initializeProviders():
133 aws.AWS()
136def main():
137 initializeProviders()
139 atheris.Setup(sys.argv, TestOneInput, enable_python_coverage=True)
140 atheris.Fuzz()
143class FuzzContext:
144 def __init__(self, name, option):
145 self.options = None
146 self.session_factory = c7n_policy.get_session_factory(name, option)
147 self.policy = FuzzPolicy(name)
148 self.tracer = FuzzTracer()
149 self.execution_id = "id"
150 self.start_time = "1234567890"
153class FuzzPolicy:
154 def __init__(self, provider_name):
155 self.provider_name = provider_name
156 self.name = "FuzzName"
159class FuzzTracer:
160 def subsegment(type):
161 return True
164class FuzzOption:
165 def __init__(self, fdp):
166 self.region = fdp.ConsumeUnicodeNoSurrogates(1024)
167 self.profile = fdp.ConsumeUnicodeNoSurrogates(1024)
168 self.assume_role = fdp.ConsumeUnicodeNoSurrogates(1024)
169 self.external_id = fdp.ConsumeUnicodeNoSurrogates(1024)
172class FuzzConfig:
173 def __init__(self, fdp):
174 self.account_id = fdp.ConsumeUnicodeNoSurrogates(1024)
175 self.region = fdp.ConsumeUnicodeNoSurrogates(1024)
178if __name__ == "__main__":
179 main()