Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/actions/webhook.py: 45%

66 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4try: 

5 import certifi 

6except ImportError: 

7 certifi = None 

8 

9import urllib3 

10from urllib import parse 

11 

12from c7n import utils 

13from .core import EventAction 

14 

15 

class Webhook(EventAction):
    """Calls a webhook with optional parameters and body
    populated from JMESPath queries.

    .. code-block:: yaml

      policies:
        - name: call-webhook
          resource: ec2
          description: |
            Call webhook with list of resource groups
          actions:
           - type: webhook
             url: http://foo.com
             query-params:
                resource_name: resource.name
                policy_name: policy.name
    """

    schema_alias = True
    schema = utils.type_schema(
        'webhook',
        required=['url'],
        **{
            'url': {'type': 'string'},
            'body': {'type': 'string'},
            'batch': {'type': 'boolean'},
            'batch-size': {'type': 'number'},
            'method': {'type': 'string', 'enum': ['PUT', 'POST', 'GET', 'PATCH', 'DELETE']},
            'query-params': {
                "type": "object",
                "additionalProperties": {
                    "type": "string",
                    "description": "query string values"
                }
            },
            'headers': {
                "type": "object",
                "additionalProperties": {
                    "type": "string",
                    "description": "header values"
                }
            }
        }
    )

    def __init__(self, data=None, manager=None, log_dir=None):
        """Read the action configuration from ``data`` and apply defaults.

        Defaults applied here: ``batch`` False, ``batch-size`` 500, empty
        ``query-params`` and ``headers``, and ``method`` ``'POST'``.
        """
        super().__init__(data, manager, log_dir)
        # The HTTP manager and the lookup context are created lazily in
        # process(), once per action run.
        self.http = None
        self.url = self.data.get('url')
        self.body = self.data.get('body')
        self.batch = self.data.get('batch', False)
        self.batch_size = self.data.get('batch-size', 500)
        self.query_params = self.data.get('query-params', {})
        self.headers = self.data.get('headers', {})
        self.method = self.data.get('method', 'POST')
        self.lookup_data = None

    def process(self, resources, event=None):
        """Call the webhook for the given resources.

        In batch mode one call is made per chunk yielded by
        ``utils.chunks(resources, self.batch_size)``, with the chunk exposed
        to the queries under the ``resources`` key. Otherwise one call is made
        per resource, exposed under the ``resource`` key.

        :param resources: iterable of resources matched by the policy.
        :param event: optional event that triggered the policy; exposed to the
            queries under the ``event`` key.
        """
        self.lookup_data = {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'execution_id': self.manager.ctx.execution_id,
            'execution_start': self.manager.ctx.start_time,
            'policy': self.manager.data,
            'event': event
        }

        self.http = self._build_http_manager()

        # Build a fresh dict per call instead of mutating self.lookup_data, so
        # the shared context never carries a stale 'resource'/'resources' key.
        if self.batch:
            for chunk in utils.chunks(resources, self.batch_size):
                self._process_call(dict(self.lookup_data, resources=chunk))
        else:
            for r in resources:
                self._process_call(dict(self.lookup_data, resource=r))

    def _process_call(self, resource):
        """Send a single webhook request built from the lookup data.

        :param resource: lookup dictionary the URL, header and body queries
            are evaluated against.

        urllib3 ``HTTPError`` failures are logged rather than raised, so one
        failed call does not stop the remaining calls.
        """
        prepared_url = self._build_url(resource)
        prepared_body = self._build_body(resource)
        prepared_headers = self._build_headers(resource)

        if prepared_body:
            prepared_headers['Content-Type'] = 'application/json'

        try:
            res = self.http.request(
                method=self.method,
                url=prepared_url,
                body=prepared_body,
                headers=prepared_headers)
        except urllib3.exceptions.HTTPError as e:
            # Only some HTTPError subclasses (e.g. MaxRetryError) define
            # ``reason``; fall back to the exception itself so the handler
            # cannot fail with AttributeError.
            reason = getattr(e, 'reason', None) or e
            self.log.error("Error calling %s. Code: %s", prepared_url, reason)
        else:
            self.log.info(
                "%s got response %s with URL %s",
                self.method, res.status, prepared_url)

    def _build_http_manager(self):
        """Return a urllib3 manager that requires certificate verification.

        The certifi CA bundle is used when certifi is importable; otherwise
        ``ca_certs`` is left as None. A ``ProxyManager`` is returned when
        ``utils.get_proxy_url`` yields a proxy for the target URL, else a
        plain ``PoolManager``.
        """
        pool_kwargs = {
            'cert_reqs': 'CERT_REQUIRED',
            'ca_certs': (certifi.where() or None) if certifi else None
        }

        proxy_url = utils.get_proxy_url(self.url)
        if proxy_url:
            return urllib3.ProxyManager(proxy_url, **pool_kwargs)
        return urllib3.PoolManager(**pool_kwargs)

    def _build_headers(self, resource):
        """Evaluate each configured header query against ``resource``."""
        return {k: utils.jmespath_search(v, resource) for k, v in self.headers.items()}

    def _build_url(self, resource):
        """
        Compose URL with query string parameters.

        Will not lose existing static parameters in the URL string
        but does not support 'duplicate' parameter entries
        """
        if not self.query_params:
            return self.url

        evaluated_params = {
            k: utils.jmespath_search(v, resource) for k, v in self.query_params.items()
        }

        # Index 4 of the urlparse result is the query component.
        url_parts = list(parse.urlparse(self.url))
        query = dict(parse.parse_qsl(url_parts[4]))
        # Evaluated parameters override same-named static ones from the URL.
        query.update(evaluated_params)
        url_parts[4] = parse.urlencode(query)

        return parse.urlunparse(url_parts)

    def _build_body(self, resource):
        """Create a JSON body and dump it to encoded bytes.

        Returns None when no ``body`` query is configured.
        """
        if not self.body:
            return None

        return utils.dumps(utils.jmespath_search(self.body, resource)).encode('utf-8')