Coverage for /pythoncovmergedfiles/medio/medio/src/fuzz_actions_process.py: 68%

102 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

###### Coverage stub
import atexit
import coverage

# Start measuring right away so that everything executed below is recorded.
# cover_pylib=True is passed through to coverage.py (see its API docs for the
# exact effect on standard-library modules).
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops measurement and saves the coverage data.
@atexit.register
def exit_handler():
    """Stop the module-level coverage object and save what it collected."""
    cov.stop()
    cov.save()
####### End of coverage stub

12#!/usr/bin/python3 

13# Copyright 2023 Google LLC 

14# 

15# Licensed under the Apache License, Version 2.0 (the "License"); 

16# you may not use this file except in compliance with the License. 

17# You may obtain a copy of the License at 

18# 

19# http://www.apache.org/licenses/LICENSE-2.0 

20# 

21# Unless required by applicable law or agreed to in writing, software 

22# distributed under the License is distributed on an "AS IS" BASIS, 

23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

24# See the License for the specific language governing permissions and 

25# limitations under the License. 

26 

27import os 

28import sys 

29import atheris 

30from botocore.exceptions import ProfileNotFound 

31 

32with atheris.instrument_imports(): 

33 from c7n import policy as c7n_policy 

34 from c7n import exceptions, manager, data 

35 

36 from c7n.filters import FilterRegistry 

37 from c7n.actions import ActionRegistry 

38 

39 from c7n.actions import autotag, network, notify 

40 from c7n.actions import invoke, webhook, autoscaling 

41 from c7n.actions import metric, policy 

42 

43 from c7n.resources import aws, rdsparamgroup, elasticache, ec2 

44 from c7n.resources import emr, account, apigw, elb, s3, glue 

45 from c7n.resources import appelb 

46 

def TestOneInput(data):
    """Fuzz construction and ``process`` of one c7n action class.

    The fuzz input selects one of six action classes, fills the option,
    action-data, event and manager-data inputs with fuzzer-derived strings,
    and picks a registry name. A ``ResourceManager`` is wired up with fresh
    action/filter registries, the chosen action is constructed (and validated
    for the first two choices), and its ``process`` method is called.

    Args:
        data: Raw bytes supplied by atheris for this iteration.
    """
    registry_names = [
        'c7n.data', 'rds-param-group', 'elasticache', 'ec2', 'emr',
        'aws.account', 'rest-account', 'elb', 's3', 'iac', 'rds',
        'glue-catalog', 'app-elb-target-group'
    ]
    provider_name = 'aws'

    fdp = atheris.FuzzedDataProvider(data)
    selector = fdp.ConsumeIntInRange(1, 6)
    action = None

    # The order of these draws decides how fuzz bytes map onto inputs, so it
    # must stay stable for existing corpus entries to keep their meaning.
    options = FuzzOption(fdp)
    action_data = _generate_random_dict(fdp)
    event = _generate_random_dict(fdp)
    manager_data = _generate_random_dict(fdp)
    resource_type = fdp.PickValueInList(registry_names)
    action_registry = ActionRegistry("%s.actions" % resource_type)
    filter_registry = FilterRegistry("%s.filters" % resource_type)

    context = FuzzContext(provider_name, options)
    resource_manager = manager.ResourceManager(context, manager_data)
    resource_manager.action_registry = action_registry
    resource_manager.filter_registry = filter_registry
    resource_manager.type = resource_type
    # NOTE(review): this is the ``resources`` attribute of the imported
    # ``c7n.manager`` module, handed to ``process`` as its resources argument;
    # confirm that is the intended input rather than a list of resources.
    resources = manager.resources

    try:
        if selector == 1:
            action = autotag.AutoTagUser(action_data, resource_manager)
            action.validate()
        elif selector == 2:
            action = notify.Notify(action_data, resource_manager)
            action.validate()
        elif selector == 3:
            action = invoke.LambdaInvoke(action_data, resource_manager)
        elif selector == 4:
            # Only the webhook branch gets a fuzzed config on the manager.
            resource_manager.config = FuzzConfig(fdp)
            action = webhook.Webhook(action_data, resource_manager)
        elif selector == 5:
            action = autoscaling.AutoscalingBase(action_data, resource_manager)
        elif selector == 6:
            action = metric.PutMetric(action_data, resource_manager)

        if action:
            action.process(resources, event)
    except (exceptions.PolicyValidationError, ProfileNotFound,
            KeyError, TypeError):
        # These are treated as expected rejections of malformed input, not
        # as fuzzing findings.
        pass

97 

98 

def _generate_random_dict(fdp):
    """Build a dict of fuzzer-derived string keys and values.

    Draws a pair count between 1 and 100 from *fdp*, inserts that many
    key/value pairs (a repeated key overwrites the earlier entry), and then
    always sets the ``"name"`` key to one more fuzzer-derived string.

    Args:
        fdp: Data provider exposing ``ConsumeIntInRange`` and
            ``ConsumeUnicodeNoSurrogates``.

    Returns:
        The populated dictionary.
    """
    result = {}
    pair_count = fdp.ConsumeIntInRange(1, 100)

    for _ in range(pair_count):
        # The value is drawn before the key: that is the evaluation order of
        # a subscript assignment ``d[key()] = value()``, so the mapping from
        # fuzz bytes to entries stays the same.
        value = fdp.ConsumeUnicodeNoSurrogates(1024)
        key = fdp.ConsumeUnicodeNoSurrogates(1024)
        result[key] = value

    result["name"] = fdp.ConsumeUnicodeNoSurrogates(1024)
    return result

108 

109 

def initializeProviders():
    """Instantiate the c7n AWS provider once before fuzzing starts.

    The instance itself is discarded; presumably the constructor is called
    for its side effects. NOTE(review): confirm against ``c7n.resources.aws``.
    """
    aws.AWS()

112 

113 

def main():
    """Initialise the provider, then set up and start the atheris fuzzer."""
    initializeProviders()

    # Hand atheris the command line and the per-input callback, then enter
    # the fuzzing loop.
    atheris.Setup(sys.argv, TestOneInput, enable_python_coverage=True)
    atheris.Fuzz()

119 

120 

class FuzzContext:
    """Stand-in execution context handed to ``manager.ResourceManager``."""

    def __init__(self, name, option):
        """Populate the context attributes.

        Args:
            name: Provider name; used for the session factory and the stub
                policy.
            option: Options object forwarded to
                ``c7n_policy.get_session_factory``.
        """
        # Fixed placeholder values.
        self.options = None
        self.execution_id = "id"
        self.start_time = "1234567890"

        # Lightweight stubs defined in this file.
        self.policy = FuzzPolicy(name)
        self.tracer = FuzzTracer()

        # Session factory comes from c7n itself.
        self.session_factory = c7n_policy.get_session_factory(name, option)

129 

130 

class FuzzPolicy:
    """Stub policy carrying a provider name and a fixed policy name."""

    def __init__(self, provider_name):
        """Store *provider_name* and set ``name`` to ``"FuzzName"``."""
        self.name = "FuzzName"
        self.provider_name = provider_name

135 

136 

class FuzzTracer:
    """Stub tracer stored on ``FuzzContext.tracer``."""

    def subsegment(self, type=None):
        """Return ``True`` regardless of the subsegment name.

        The method previously lacked ``self``, so calling it on an instance
        with a name argument raised ``TypeError``. ``type`` defaults to
        ``None`` so that every call form that worked before still works.

        Args:
            type: Subsegment name; ignored.

        Returns:
            Always ``True``.
        """
        # NOTE(review): if callers use the result in a ``with`` statement,
        # ``True`` is not a context manager -- confirm how c7n consumes this.
        return True

140 

141 

class FuzzOption:
    """Options object whose fields are filled with fuzzer-derived strings."""

    def __init__(self, fdp):
        """Draw one string per option field from *fdp*.

        Fields are drawn in the order region, profile, assume_role,
        external_id, each via ``ConsumeUnicodeNoSurrogates(1024)``.
        """
        for field in ("region", "profile", "assume_role", "external_id"):
            setattr(self, field, fdp.ConsumeUnicodeNoSurrogates(1024))

148 

149 

class FuzzConfig:
    """Config object with a fuzzer-derived account id and region."""

    def __init__(self, fdp):
        """Draw ``account_id`` and then ``region`` from *fdp*."""
        draw = fdp.ConsumeUnicodeNoSurrogates
        self.account_id = draw(1024)
        self.region = draw(1024)

154 

155 

# Start the fuzzer only when this file is run as a script.
if __name__ == "__main__":
    main()