Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_gcp_actions_validate_process.py: 61%
106 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
###### Coverage stub
import atexit
import coverage

# Start measuring as early as possible and persist results to '.coverage'.
# NOTE(review): cover_pylib=True is expected to include the standard library
# in the measurement -- confirm against the coverage.py API documentation.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


def exit_handler():
    """Stop coverage measurement and save the collected data."""
    cov.stop()
    cov.save()


# Register an exit handler so the coverage data is saved at interpreter exit.
atexit.register(exit_handler)
####### End of coverage stub
12#!/usr/bin/python3
13# Copyright 2023 Google LLC
14#
15# Licensed under the Apache License, Version 2.0 (the "License");
16# you may not use this file except in compliance with the License.
17# You may obtain a copy of the License at
18#
19# http://www.apache.org/licenses/LICENSE-2.0
20#
21# Unless required by applicable law or agreed to in writing, software
22# distributed under the License is distributed on an "AS IS" BASIS,
23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24# See the License for the specific language governing permissions and
25# limitations under the License.
27import os
28import sys
29import atheris
30from google.auth.exceptions import DefaultCredentialsError
32with atheris.instrument_imports():
33 from c7n import policy as c7n_policy
34 from c7n import exceptions, manager, data
36 from c7n.filters import FilterRegistry, FilterValidationError
37 from c7n.actions import ActionRegistry
39 from c7n_gcp import provider
40 from c7n_gcp.actions import labels, cscc, notify
42 from c7n_gcp.resources import iam, dns, armor, build, secret
43 from c7n_gcp.resources import pubsub, logging, network, mlengine
44 from c7n_gcp.resources import bigquery, cloudrun, appengine
45 from c7n_gcp.resources import loadbalancer, sql, service, gke
46 from c7n_gcp.resources import bigtable, memstore, kms, datafusion
47 from c7n_gcp.resources import cloudbilling, function, spanner
48 from c7n_gcp.resources import resourcemanager, source, osconfig
49 from c7n_gcp.resources import dataproc, notebook, artifactregistry
50 from c7n_gcp.resources import compute, storage, deploymentmanager
51 from c7n_gcp.resources import dataflow
def TestOneInput(data):
    """Fuzz validate/process on a selection of c7n_gcp actions.

    One of four actions is picked from the fuzz input and constructed from a
    fuzzer-generated config dict and a ``manager.ResourceManager`` whose
    registries and type are also fuzzer-derived:

    1. ``labels.SetLabelsAction``    -> ``validate()``
    2. ``labels.LabelDelayedAction`` -> ``validate()``
    3. ``cscc.PostFinding``          -> ``validate()``, ``process(resources, event)``
    4. ``notify.Notify``             -> ``process(resources)``

    Args:
        data: Raw input handed to this callback by atheris.

    Raises:
        ValueError: Re-raised unless its message contains one of the two
            known, expected messages checked below.
    """
    fdp = atheris.FuzzedDataProvider(data)
    choice = fdp.ConsumeIntInRange(1, 4)

    # The order of these draws decides how a given input is interpreted;
    # keep it stable so existing corpus entries keep their meaning.
    option = FuzzOption(fdp)
    action_data = _generate_random_dict(fdp)
    manager_data = _generate_random_dict(fdp)
    resource_type = fdp.ConsumeUnicodeNoSurrogates(1024)

    action_registry = ActionRegistry(f"{resource_type}.actions")
    filter_registry = FilterRegistry(f"{resource_type}.filters")

    resource_manager = manager.ResourceManager(
        FuzzContext("gcp", option), manager_data
    )
    resource_manager.action_registry = action_registry
    resource_manager.filter_registry = filter_registry
    resource_manager.type = resource_type

    # Whatever c7n.manager exposes as ``resources`` is passed straight
    # through to process() as the resource collection.
    resources = manager.resources

    try:
        if choice == 1:
            action = labels.SetLabelsAction(action_data, resource_manager)
            action.validate()
        elif choice == 2:
            action = labels.LabelDelayedAction(action_data, resource_manager)
            action.validate()
        elif choice == 3:
            # The event dict is drawn before the action is constructed.
            event = _generate_random_dict(fdp)
            action = cscc.PostFinding(action_data, resource_manager)
            action.validate()
            action.process(resources, event)
        elif choice == 4:
            action = notify.Notify(action_data, resource_manager)
            action.process(resources)
    except (
        exceptions.PolicyValidationError,
        DefaultCredentialsError,
        FilterValidationError,
    ):
        # Expected rejections of malformed input or missing credentials.
        pass
    except ValueError as err:
        message = str(err)
        # Only these two messages are treated as expected; anything else is
        # surfaced to the fuzzer as a finding.
        if (
            "Filter requires resource expression" not in message
            and "No GCP Project ID" not in message
        ):
            raise
    except (KeyError, TypeError):
        # Deliberately ignored by this harness: random dicts routinely lack
        # required keys or carry values of the wrong type.
        pass
def _generate_random_dict(fdp):
    """Build a dict from fuzzer-provided keys and values.

    An entry count in the range 1..100 is drawn first, then one value/key
    pair per entry. Finally the ``"name"`` key is set from one more draw, so
    it is always present and replaces any generated ``"name"`` entry.

    Args:
        fdp: Data provider exposing ``ConsumeIntInRange`` and
            ``ConsumeUnicodeNoSurrogates`` (an ``atheris.FuzzedDataProvider``
            at the call sites in this file).

    Returns:
        The populated dict. Duplicate generated keys collapse, so it may hold
        fewer entries than the drawn count.
    """
    generated = {}

    for _ in range(fdp.ConsumeIntInRange(1, 100)):
        # The value is drawn before the key: Python evaluates the right-hand
        # side of a subscript assignment first. Keeping that order means
        # existing corpus inputs decode exactly as before.
        value = fdp.ConsumeUnicodeNoSurrogates(1024)
        key = fdp.ConsumeUnicodeNoSurrogates(1024)
        generated[key] = value

    generated["name"] = fdp.ConsumeUnicodeNoSurrogates(1024)

    return generated
def initializeProviders():
    """Instantiate the c7n_gcp ``GoogleCloud`` provider once before fuzzing.

    The instance is discarded.
    NOTE(review): presumably construction is wanted only for its
    initialization side effects -- confirm against c7n_gcp.provider.
    """
    provider.GoogleCloud()
def main():
    """Initialize the provider, then hand control to the atheris fuzz loop."""
    initializeProviders()
    # Command-line arguments are forwarded so the fuzzing engine can consume
    # its own flags; Python coverage collection is explicitly enabled.
    atheris.Setup(sys.argv, TestOneInput, enable_python_coverage=True)
    atheris.Fuzz()
class FuzzContext:
    """Minimal context object handed to ``manager.ResourceManager``.

    It carries only the attributes this harness sets; all values except the
    session factory are fixed placeholders.
    """

    def __init__(self, name, option):
        """Populate the context.

        Args:
            name: Provider name; forwarded to the session factory lookup and
                to the stub policy.
            option: Options object forwarded to
                ``c7n_policy.get_session_factory``.
        """
        self.options = None
        self.session_factory = c7n_policy.get_session_factory(name, option)
        self.policy = FuzzPolicy(name)
        self.tracer = FuzzTracer()
        # Fixed placeholder identifiers; not derived from the fuzz input.
        self.execution_id = "id"
        self.start_time = "1234567890"
class FuzzPolicy:
    """Stub policy exposing only a provider name and a fixed policy name."""

    def __init__(self, provider_name):
        """Store ``provider_name``; the policy name is always "FuzzName"."""
        self.provider_name = provider_name
        self.name = "FuzzName"
class FuzzTracer:
    """Stub tracer whose ``subsegment`` does nothing and returns ``True``."""

    @staticmethod
    def subsegment(type):
        """Accept a subsegment label and return ``True`` without tracing.

        Declared static because the method takes no ``self``: as a plain
        method, calling it on an instance (``FuzzTracer().subsegment("x")``)
        raised ``TypeError`` due to the extra implicit argument. Calls on the
        class keep working as before.

        Args:
            type: Subsegment label; ignored. The name shadows the builtin but
                is kept so keyword callers are unaffected.

        Returns:
            Always ``True``.
            NOTE(review): if callers use the result in a ``with`` statement
            this would need to return a context manager instead -- confirm.
        """
        return True
class FuzzOption:
    """Options object whose fields are all drawn from the fuzz input."""

    def __init__(self, fdp):
        """Draw five strings from ``fdp``, one per attribute.

        The draw order (region, profile, assume_role, external_id,
        account_id) determines how an input is interpreted and must stay
        stable.

        Args:
            fdp: Data provider exposing ``ConsumeUnicodeNoSurrogates``.
        """
        self.region = fdp.ConsumeUnicodeNoSurrogates(1024)
        self.profile = fdp.ConsumeUnicodeNoSurrogates(1024)
        self.assume_role = fdp.ConsumeUnicodeNoSurrogates(1024)
        self.external_id = fdp.ConsumeUnicodeNoSurrogates(1024)
        self.account_id = fdp.ConsumeUnicodeNoSurrogates(1024)
class FuzzConfig:
    """Config object with a fuzzer-derived account id and region."""

    def __init__(self, fdp):
        """Draw ``account_id`` first, then ``region``, from ``fdp``.

        Args:
            fdp: Data provider exposing ``ConsumeUnicodeNoSurrogates``.
        """
        self.account_id = fdp.ConsumeUnicodeNoSurrogates(1024)
        self.region = fdp.ConsumeUnicodeNoSurrogates(1024)
# Run the fuzzer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()