1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3
4try:
5 import certifi
6except ImportError:
7 certifi = None
8
9import urllib3
10from urllib import parse
11
12from c7n import utils
13from .core import EventAction
14
15
class Webhook(EventAction):
    """Calls a webhook with optional parameters and body
    populated from JMESPath queries.

    The ``body``, ``query-params`` values and ``headers`` values are
    expressions evaluated against a context dictionary with the keys
    ``account_id``, ``region``, ``execution_id``, ``execution_start``,
    ``policy`` and ``event``, plus ``resource`` (one call per resource)
    or ``resources`` (one call per chunk when ``batch`` is true).

    Defaults: ``method`` is ``POST``, ``batch`` is ``false`` and
    ``batch-size`` is ``500``.

    .. code-block:: yaml

        policies:
          - name: call-webhook
            resource: ec2
            description: |
              Call webhook with list of resource groups
            actions:
              - type: webhook
                url: http://foo.com
                query-params:
                  resource_name: resource.name
                  policy_name: policy.name
    """

    schema_alias = True
    schema = utils.type_schema(
        'webhook',
        required=['url'],
        **{
            'url': {'type': 'string'},
            'body': {'type': 'string'},
            'batch': {'type': 'boolean'},
            'batch-size': {'type': 'number'},
            'method': {'type': 'string', 'enum': ['PUT', 'POST', 'GET', 'PATCH', 'DELETE']},
            'query-params': {
                "type": "object",
                "additionalProperties": {
                    "type": "string",
                    "description": "query string values"
                }
            },
            'headers': {
                "type": "object",
                "additionalProperties": {
                    "type": "string",
                    "description": "header values"
                }
            }
        }
    )

    def __init__(self, data=None, manager=None, log_dir=None):
        """Read the action configuration from ``self.data``.

        The HTTP manager and the lookup context are created lazily in
        :meth:`process`, so constructing the action performs no I/O.
        """
        super(Webhook, self).__init__(data, manager, log_dir)
        self.http = None
        self.url = self.data.get('url')
        self.body = self.data.get('body')
        self.batch = self.data.get('batch', False)
        self.batch_size = self.data.get('batch-size', 500)
        self.query_params = self.data.get('query-params', {})
        self.headers = self.data.get('headers', {})
        self.method = self.data.get('method', 'POST')
        self.lookup_data = None

    def process(self, resources, event=None):
        """Call the webhook for ``resources``.

        In batch mode one request is sent per chunk of at most
        ``batch-size`` resources (exposed to expressions as
        ``resources``); otherwise one request is sent per resource
        (exposed as ``resource``).

        :param resources: iterable of resources matched by the policy.
        :param event: optional triggering event, exposed as ``event``.
        """
        self.lookup_data = {
            'account_id': self.manager.config.account_id,
            'region': self.manager.config.region,
            'execution_id': self.manager.ctx.execution_id,
            'execution_start': self.manager.ctx.start_time,
            'policy': self.manager.data,
            'event': event
        }

        self.http = self._build_http_manager()

        # Build a fresh context per call instead of mutating the shared
        # lookup dictionary, so calls never see each other's state.
        if self.batch:
            for chunk in utils.chunks(resources, self.batch_size):
                self._process_call(dict(self.lookup_data, resources=chunk))
        else:
            for r in resources:
                self._process_call(dict(self.lookup_data, resource=r))

    def _process_call(self, resource):
        """Send a single request built from the context ``resource``.

        Transport errors raised by urllib3 are logged and swallowed so
        that one failed call does not stop the remaining ones.
        """
        prepared_url = self._build_url(resource)
        prepared_body = self._build_body(resource)
        prepared_headers = self._build_headers(resource)

        # The body, when present, is always JSON produced by _build_body.
        if prepared_body:
            prepared_headers['Content-Type'] = 'application/json'

        try:
            res = self.http.request(
                method=self.method,
                url=prepared_url,
                body=prepared_body,
                headers=prepared_headers)
        except urllib3.exceptions.HTTPError as e:
            # Only some HTTPError subclasses (e.g. MaxRetryError) carry a
            # ``reason``; fall back to the exception itself so the handler
            # cannot fail with AttributeError.
            reason = getattr(e, 'reason', None) or e
            self.log.error("Error calling %s. Code: %s", prepared_url, reason)
        else:
            self.log.info(
                "%s got response %s with URL %s",
                self.method, res.status, prepared_url)

    def _build_http_manager(self):
        """Return the urllib3 manager used to send requests.

        Certificates are always verified; the certifi CA bundle is used
        when certifi is importable. A ``ProxyManager`` is returned when
        ``utils.get_proxy_url`` yields a proxy for the target URL,
        otherwise a plain ``PoolManager``.
        """
        pool_kwargs = {
            'cert_reqs': 'CERT_REQUIRED',
            'ca_certs': certifi.where() if certifi else None
        }

        proxy_url = utils.get_proxy_url(self.url)
        if proxy_url:
            return urllib3.ProxyManager(proxy_url, **pool_kwargs)
        return urllib3.PoolManager(**pool_kwargs)

    def _build_headers(self, resource):
        """Evaluate every configured header expression against ``resource``."""
        return {k: utils.jmespath_search(v, resource) for k, v in self.headers.items()}

    def _build_url(self, resource):
        """
        Compose URL with query string parameters.

        Will not lose existing static parameters in the URL string
        (including ones with a blank value) but does not support
        'duplicate' parameter entries. Evaluated ``query-params``
        override static parameters of the same name.
        """

        if not self.query_params:
            return self.url

        evaluated_params = {
            k: utils.jmespath_search(v, resource) for k, v in self.query_params.items()
        }

        url_parts = parse.urlparse(self.url)
        # keep_blank_values preserves static parameters such as "?flag=",
        # which parse_qsl drops by default.
        query = dict(parse.parse_qsl(url_parts.query, keep_blank_values=True))
        query.update(evaluated_params)

        return parse.urlunparse(url_parts._replace(query=parse.urlencode(query)))

    def _build_body(self, resource):
        """Create a JSON body and dump it to encoded bytes.

        Returns ``None`` when no ``body`` expression is configured.
        """

        if not self.body:
            return None

        return utils.dumps(utils.jmespath_search(self.body, resource)).encode('utf-8')