Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_gcp_actions_validate_process.py: 61%

106 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1###### Coverage stub 

2import atexit 

3import coverage 

4cov = coverage.coverage(data_file='.coverage', cover_pylib=True) 

5cov.start() 

6# Register an exist handler that will print coverage 

7def exit_handler(): 

8 cov.stop() 

9 cov.save() 

10atexit.register(exit_handler) 

11####### End of coverage stub 

12#!/usr/bin/python3 

13# Copyright 2023 Google LLC 

14# 

15# Licensed under the Apache License, Version 2.0 (the "License"); 

16# you may not use this file except in compliance with the License. 

17# You may obtain a copy of the License at 

18# 

19# http://www.apache.org/licenses/LICENSE-2.0 

20# 

21# Unless required by applicable law or agreed to in writing, software 

22# distributed under the License is distributed on an "AS IS" BASIS, 

23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

24# See the License for the specific language governing permissions and 

25# limitations under the License. 

26 

27import os 

28import sys 

29import atheris 

30from google.auth.exceptions import DefaultCredentialsError 

31 

32with atheris.instrument_imports(): 

33 from c7n import policy as c7n_policy 

34 from c7n import exceptions, manager, data 

35 

36 from c7n.filters import FilterRegistry, FilterValidationError 

37 from c7n.actions import ActionRegistry 

38 

39 from c7n_gcp import provider 

40 from c7n_gcp.actions import labels, cscc, notify 

41 

42 from c7n_gcp.resources import iam, dns, armor, build, secret 

43 from c7n_gcp.resources import pubsub, logging, network, mlengine 

44 from c7n_gcp.resources import bigquery, cloudrun, appengine 

45 from c7n_gcp.resources import loadbalancer, sql, service, gke 

46 from c7n_gcp.resources import bigtable, memstore, kms, datafusion 

47 from c7n_gcp.resources import cloudbilling, function, spanner 

48 from c7n_gcp.resources import resourcemanager, source, osconfig 

49 from c7n_gcp.resources import dataproc, notebook, artifactregistry 

50 from c7n_gcp.resources import compute, storage, deploymentmanager 

51 from c7n_gcp.resources import dataflow 

52 

53 

54def TestOneInput(data): 

55 """Fuzz tools/c7n_gcp""" 

56 object = None 

57 

58 fdp = atheris.FuzzedDataProvider(data) 

59 choice = fdp.ConsumeIntInRange(1, 4) 

60 

61 option = FuzzOption(fdp) 

62 data = _generate_random_dict(fdp) 

63 manager_data = _generate_random_dict(fdp) 

64 

65 type = fdp.ConsumeUnicodeNoSurrogates(1024) 

66 action_registry = ActionRegistry("%s.actions" % type) 

67 filter_registry = FilterRegistry("%s.filters" % type) 

68 

69 resource_manager = manager.ResourceManager(FuzzContext("gcp", option), manager_data) 

70 resource_manager.action_registry = action_registry 

71 resource_manager.filter_registry = filter_registry 

72 resource_manager.type = type 

73 resources = manager.resources 

74 

75 try: 

76 if choice == 1: 

77 object = labels.SetLabelsAction(data, resource_manager) 

78 object.validate() 

79 elif choice == 2: 

80 object = labels.LabelDelayedAction(data, resource_manager) 

81 object.validate() 

82 elif choice == 3: 

83 event = _generate_random_dict(fdp) 

84 object = cscc.PostFinding(data, resource_manager) 

85 object.validate() 

86 object.process(resources, event) 

87 elif choice == 4: 

88 object = notify.Notify(data, resource_manager) 

89 object.process(resources) 

90 

91 except ( 

92 exceptions.PolicyValidationError, 

93 DefaultCredentialsError, 

94 FilterValidationError): 

95 pass 

96 except ValueError as e: 

97 if "Filter requires resource expression" not in str(e) and "No GCP Project ID" not in str(e): 

98 raise e 

99 except (KeyError, TypeError): 

100 pass 

101 

102def _generate_random_dict(fdp): 

103 map = dict() 

104 

105 for count in range(fdp.ConsumeIntInRange(1, 100)): 

106 map[fdp.ConsumeUnicodeNoSurrogates(1024)] = fdp.ConsumeUnicodeNoSurrogates(1024) 

107 

108 map["name"] = fdp.ConsumeUnicodeNoSurrogates(1024) 

109 

110 return map 

111 

112 

113def initializeProviders(): 

114 provider.GoogleCloud() 

115 

116 

117def main(): 

118 initializeProviders() 

119 atheris.Setup(sys.argv, TestOneInput, enable_python_coverage=True) 

120 atheris.Fuzz() 

121 

122 

123class FuzzContext: 

124 def __init__(self, name, option): 

125 self.options = None 

126 self.session_factory = c7n_policy.get_session_factory(name, option) 

127 self.policy = FuzzPolicy(name) 

128 self.tracer = FuzzTracer() 

129 self.execution_id = "id" 

130 self.start_time = "1234567890" 

131 

132 

133class FuzzPolicy: 

134 def __init__(self, provider_name): 

135 self.provider_name = provider_name 

136 self.name = "FuzzName" 

137 

138 

139class FuzzTracer: 

140 def subsegment(type): 

141 return True 

142 

143 

144class FuzzOption: 

145 def __init__(self, fdp): 

146 self.region = fdp.ConsumeUnicodeNoSurrogates(1024) 

147 self.profile = fdp.ConsumeUnicodeNoSurrogates(1024) 

148 self.assume_role = fdp.ConsumeUnicodeNoSurrogates(1024) 

149 self.external_id = fdp.ConsumeUnicodeNoSurrogates(1024) 

150 self.account_id = fdp.ConsumeUnicodeNoSurrogates(1024) 

151 

152 

153class FuzzConfig: 

154 def __init__(self, fdp): 

155 self.account_id = fdp.ConsumeUnicodeNoSurrogates(1024) 

156 self.region = fdp.ConsumeUnicodeNoSurrogates(1024) 

157 

158 

159if __name__ == "__main__": 

160 main()