Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_filters_process.py: 74%

124 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

###### Coverage stub
import atexit
import coverage

# Create the collector and start measuring before the rest of the module runs.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops measurement and saves the coverage data.
# atexit.register returns the function it is given, so the decorator form
# leaves exit_handler bound to the same callable.
@atexit.register
def exit_handler():
    cov.stop()
    cov.save()
####### End of coverage stub

12#!/usr/bin/python3 

13# Copyright 2023 Google LLC 

14# 

15# Licensed under the Apache License, Version 2.0 (the "License"); 

16# you may not use this file except in compliance with the License. 

17# You may obtain a copy of the License at 

18# 

19# http://www.apache.org/licenses/LICENSE-2.0 

20# 

21# Unless required by applicable law or agreed to in writing, software 

22# distributed under the License is distributed on an "AS IS" BASIS, 

23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

24# See the License for the specific language governing permissions and 

25# limitations under the License. 

26 

27import os 

28import sys 

29import atheris 

30from botocore.exceptions import ProfileNotFound 

31 

32with atheris.instrument_imports(): 

33 from c7n import policy as c7n_policy 

34 from c7n import exceptions, manager, data 

35 

36 from c7n.filters import FilterRegistry 

37 from c7n.actions import ActionRegistry 

38 

39 from c7n.filters import multiattr, missing, metrics 

40 from c7n.filters import offhours, vpc, revisions 

41 from c7n.filters import backup, health, iamaccess 

42 from c7n.filters import policystatement, iamanalyzer 

43 

44 from c7n.resources import aws, rdsparamgroup, elasticache, ec2 

45 from c7n.resources import emr, account, apigw, elb, s3, glue 

46 from c7n.resources import appelb 

47 

def TestOneInput(data):
    """Fuzz construction, validation and processing of one c7n filter.

    The fuzzer input picks one of twelve filter classes, builds it with a
    fuzzer-generated configuration dict and a stub resource manager, runs
    ``validate()`` for the filters listed as such in the table below, and
    finally calls ``process()``.

    Args:
        data: Raw fuzzer input, handed to ``atheris.FuzzedDataProvider``.

    Raises:
        ValueError: Re-raised unless its message contains
            "Filter requires resource expression".
        Exception: Anything other than PolicyValidationError,
            ProfileNotFound, KeyError and TypeError propagates to the fuzzer.
    """
    registry_types = [
        'c7n.data', 'rds-param-group', 'elasticache', 'ec2', 'emr',
        'aws.account', 'rest-account', 'elb', 's3', 'iac', 'rds',
        'glue-catalog', 'app-elb-target-group'
    ]
    provider = 'aws'

    fdp = atheris.FuzzedDataProvider(data)
    choice = fdp.ConsumeIntInRange(1, 12)

    # The order of these draws defines how an input is decoded; keep it
    # stable so existing corpus entries keep their meaning.
    option = FuzzOption(fdp)
    filter_data = _generate_random_dict(fdp)
    event = _generate_random_dict(fdp)
    manager_data = _generate_random_dict(fdp)

    resource_type = fdp.PickValueInList(registry_types)
    action_registry = ActionRegistry("%s.actions" % resource_type)
    filter_registry = FilterRegistry("%s.filters" % resource_type)

    resource_manager = manager.ResourceManager(
        FuzzContext(provider, option), manager_data)
    resource_manager.action_registry = action_registry
    resource_manager.filter_registry = filter_registry
    resource_manager.type = resource_type

    # NOTE(review): this is the ``resources`` attribute of the imported
    # ``manager`` module, not something fetched through resource_manager.
    # Confirm that passing it to process() is intended.
    resources = manager.resources

    # choice -> (filter class, whether validate() runs before process()).
    filter_table = {
        1: (multiattr.MultiAttrFilter, True),
        2: (missing.Missing, True),
        3: (offhours.OnHour, True),
        4: (offhours.OffHour, True),
        5: (vpc.SubnetFilter, True),
        6: (vpc.NetworkLocation, True),
        7: (revisions.Diff, True),
        8: (backup.ConsecutiveAwsBackupsFilter, False),
        9: (health.HealthEventFilter, False),
        10: (policystatement.HasStatementFilter, False),
        11: (iamaccess.CrossAccountAccessFilter, False),
        12: (iamanalyzer.AccessAnalyzer, False),
    }

    try:
        filter_cls, needs_validation = filter_table[choice]

        # Per-filter setup that must happen before construction.
        if choice == 2:
            filter_data['policy'] = resource_manager.ctx.policy
        elif choice == 11:
            # Drawn here, after all other inputs, and only for this choice.
            resource_manager.config = FuzzConfig(fdp)

        fuzz_filter = filter_cls(filter_data, resource_manager)
        if needs_validation:
            fuzz_filter.validate()

        if fuzz_filter:
            fuzz_filter.process(resources, event)
    except (exceptions.PolicyValidationError, ProfileNotFound):
        pass
    except ValueError as e:
        # Only this specific, expected validation failure is ignored.
        if "Filter requires resource expression" not in str(e):
            raise
    except (KeyError, TypeError):
        # Random configuration dicts routinely miss keys or carry wrong types.
        pass

120 

def _generate_random_dict(fdp):
    """Build a dict of fuzzer-derived string pairs plus a "name" entry.

    Args:
        fdp: Data provider exposing ``ConsumeIntInRange`` and
            ``ConsumeUnicodeNoSurrogates``.

    Returns:
        A dict with up to 100 fuzzer-chosen string keys (duplicate keys
        collapse) and a "name" key that is always present.
    """
    result = {}

    for _ in range(fdp.ConsumeIntInRange(1, 100)):
        # Draw the value before the key. A one-line subscript assignment
        # evaluates its right-hand side first, and keeping that consumption
        # order means existing corpus inputs still decode to the same dict.
        value = fdp.ConsumeUnicodeNoSurrogates(1024)
        key = fdp.ConsumeUnicodeNoSurrogates(1024)
        result[key] = value

    # Written last so it overrides any fuzzer-generated "name" key.
    result["name"] = fdp.ConsumeUnicodeNoSurrogates(1024)

    return result

130 

131 

def initializeProviders():
    """Instantiate the AWS provider class once; the instance is discarded.

    NOTE(review): presumably called for the side effects of constructing
    the provider - confirm against ``c7n.resources.aws``.
    """
    provider_factory = aws.AWS
    provider_factory()

134 

135 

def main():
    """Initialize the provider, then configure and start the atheris fuzzer."""
    initializeProviders()

    # Command-line arguments are passed through to atheris unchanged.
    argv = sys.argv
    atheris.Setup(argv, TestOneInput, enable_python_coverage=True)
    atheris.Fuzz()

141 

142 

class FuzzContext:
    """Minimal execution-context stand-in built from a provider name and options."""

    def __init__(self, name, option):
        # The option object is only forwarded to the session factory; the
        # context's own ``options`` attribute is left as None.
        factory = c7n_policy.get_session_factory(name, option)

        self.options = None
        self.session_factory = factory

        # Stub collaborators and fixed placeholder values.
        self.policy = FuzzPolicy(name)
        self.tracer = FuzzTracer()
        self.execution_id = "id"
        self.start_time = "1234567890"

151 

152 

class FuzzPolicy:
    """Bare policy stand-in exposing only ``provider_name`` and ``name``."""

    def __init__(self, provider_name):
        # ``name`` is a fixed placeholder; only the provider name varies.
        self.provider_name, self.name = provider_name, "FuzzName"

157 

158 

class FuzzTracer:
    """Tracer stand-in whose ``subsegment`` accepts any name and returns True."""

    def subsegment(self, type=None):
        """Return True regardless of the subsegment name.

        ``self`` is declared explicitly so that ``tracer.subsegment("name")``
        works on an instance; without it the extra positional argument
        raised TypeError. ``type`` defaults to None so a zero-argument
        instance call keeps working as well.

        NOTE(review): the result is a plain bool, not a context manager -
        confirm no caller uses it in a ``with`` statement.
        """
        return True

162 

163 

class FuzzOption:
    """Options object whose four fields are fuzzer-provided strings."""

    def __init__(self, fdp):
        # One draw per attribute, in exactly this order.
        for field in ("region", "profile", "assume_role", "external_id"):
            setattr(self, field, fdp.ConsumeUnicodeNoSurrogates(1024))

170 

171 

class FuzzConfig:
    """Config stand-in with a fuzzer-provided ``account_id`` and ``region``."""

    def __init__(self, fdp):
        draw = fdp.ConsumeUnicodeNoSurrogates
        # account_id is drawn first, then region.
        self.account_id = draw(1024)
        self.region = draw(1024)

176 

177 

# Start the fuzzer only when this file is executed as a script.
if __name__ == "__main__":
    main()