Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/actions/webhook.py: 45%
66 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4try:
5 import certifi
6except ImportError:
7 certifi = None
9import urllib3
10from urllib import parse
12from c7n import utils
13from .core import EventAction
16class Webhook(EventAction):
17 """Calls a webhook with optional parameters and body
18 populated from JMESPath queries.
20 .. code-block:: yaml
22 policies:
23 - name: call-webhook
24 resource: ec2
25 description: |
26 Call webhook with list of resource groups
27 actions:
28 - type: webhook
29 url: http://foo.com
30 query-params:
31 resource_name: resource.name
32 policy_name: policy.name
33 """
35 schema_alias = True
36 schema = utils.type_schema(
37 'webhook',
38 required=['url'],
39 **{
40 'url': {'type': 'string'},
41 'body': {'type': 'string'},
42 'batch': {'type': 'boolean'},
43 'batch-size': {'type': 'number'},
44 'method': {'type': 'string', 'enum': ['PUT', 'POST', 'GET', 'PATCH', 'DELETE']},
45 'query-params': {
46 "type": "object",
47 "additionalProperties": {
48 "type": "string",
49 "description": "query string values"
50 }
51 },
52 'headers': {
53 "type": "object",
54 "additionalProperties": {
55 "type": "string",
56 "description": "header values"
57 }
58 }
59 }
60 )
62 def __init__(self, data=None, manager=None, log_dir=None):
63 super(Webhook, self).__init__(data, manager, log_dir)
64 self.http = None
65 self.url = self.data.get('url')
66 self.body = self.data.get('body')
67 self.batch = self.data.get('batch', False)
68 self.batch_size = self.data.get('batch-size', 500)
69 self.query_params = self.data.get('query-params', {})
70 self.headers = self.data.get('headers', {})
71 self.method = self.data.get('method', 'POST')
72 self.lookup_data = None
74 def process(self, resources, event=None):
75 self.lookup_data = {
76 'account_id': self.manager.config.account_id,
77 'region': self.manager.config.region,
78 'execution_id': self.manager.ctx.execution_id,
79 'execution_start': self.manager.ctx.start_time,
80 'policy': self.manager.data,
81 'event': event
82 }
84 self.http = self._build_http_manager()
86 if self.batch:
87 for chunk in utils.chunks(resources, self.batch_size):
88 resource_data = self.lookup_data
89 resource_data['resources'] = chunk
90 self._process_call(resource_data)
91 else:
92 for r in resources:
93 resource_data = self.lookup_data
94 resource_data['resource'] = r
95 self._process_call(resource_data)
97 def _process_call(self, resource):
98 prepared_url = self._build_url(resource)
99 prepared_body = self._build_body(resource)
100 prepared_headers = self._build_headers(resource)
102 if prepared_body:
103 prepared_headers['Content-Type'] = 'application/json'
105 try:
106 res = self.http.request(
107 method=self.method,
108 url=prepared_url,
109 body=prepared_body,
110 headers=prepared_headers)
112 self.log.info("%s got response %s with URL %s" %
113 (self.method, res.status, prepared_url))
114 except urllib3.exceptions.HTTPError as e:
115 self.log.error("Error calling %s. Code: %s" % (prepared_url, e.reason))
117 def _build_http_manager(self):
118 pool_kwargs = {
119 'cert_reqs': 'CERT_REQUIRED',
120 'ca_certs': certifi and certifi.where() or None
121 }
123 proxy_url = utils.get_proxy_url(self.url)
124 if proxy_url:
125 return urllib3.ProxyManager(proxy_url, **pool_kwargs)
126 else:
127 return urllib3.PoolManager(**pool_kwargs)
129 def _build_headers(self, resource):
130 return {k: utils.jmespath_search(v, resource) for k, v in self.headers.items()}
132 def _build_url(self, resource):
133 """
134 Compose URL with query string parameters.
136 Will not lose existing static parameters in the URL string
137 but does not support 'duplicate' parameter entries
138 """
140 if not self.query_params:
141 return self.url
143 evaluated_params = {
144 k: utils.jmespath_search(v, resource) for k, v in self.query_params.items()
145 }
147 url_parts = list(parse.urlparse(self.url))
148 query = dict(parse.parse_qsl(url_parts[4]))
149 query.update(evaluated_params)
150 url_parts[4] = parse.urlencode(query)
152 return parse.urlunparse(url_parts)
154 def _build_body(self, resource):
155 """Create a JSON body and dump it to encoded bytes."""
157 if not self.body:
158 return None
160 return utils.dumps(utils.jmespath_search(self.body, resource)).encode('utf-8')