Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/code.py: 64%
225 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from botocore.exceptions import ClientError
5from c7n.actions import BaseAction
6from c7n.filters.vpc import SubnetFilter, SecurityGroupFilter, VpcFilter
7from c7n.manager import resources
8from c7n.query import (
9 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, ChildResourceManager)
10from c7n.tags import universal_augment
11from c7n.utils import local_session, type_schema, jmespath_search
12from c7n import query
14from .securityhub import OtherResourcePostFinding
class DescribeRepo(DescribeSource):
    """Describe-mode source used by the ``codecommit`` resource manager."""

    def augment(self, resources):
        """Run the inherited augmentation, then ``universal_augment``.

        :param resources: repository records handed over by the query layer.
        :return: whatever ``universal_augment`` returns for this manager and
            the parent-augmented records (presumably the same records with
            tags attached -- confirm against ``c7n.tags``).
        """
        described = super().augment(resources)
        return universal_augment(self.manager, described)
@resources.register('codecommit')
class CodeRepository(QueryResourceManager):
    """Resource manager registered under the ``codecommit`` resource name."""

    class resource_type(TypeInfo):
        # Static metadata describing how repositories are listed and detailed.
        service = 'codecommit'
        enum_spec = ('list_repositories', 'repositories', None)
        batch_detail_spec = (
            'batch_get_repositories',
            'repositoryNames',
            'repositoryName',
            'repositories',
            None,
        )
        # The repository name serves as both display name and identifier.
        name = 'repositoryName'
        id = name
        arn = "Arn"
        date = 'creationDate'
        cfn_type = 'AWS::CodeCommit::Repository'
        universal_taggable = object()

    source_mapping = {'describe': DescribeRepo, 'config': ConfigSource}

    def get_resources(self, ids, cache=True, augment=True):
        """Build repository records for the given repository names.

        Each name is wrapped in a minimal ``{'repositoryName': ...}`` record,
        passed through ``self.augment`` and then through ``universal_augment``.

        :param ids: iterable of repository names.
        :param cache: accepted for signature compatibility; not read here.
        :param augment: accepted for signature compatibility; not read here.
        :return: the result of ``universal_augment`` over the augmented records.
        """
        stubs = [{'repositoryName': repo_name} for repo_name in ids]
        detailed = self.augment(stubs)
        return universal_augment(self, detailed)
@CodeRepository.action_registry.register('delete')
class DeleteRepository(BaseAction):
    """Delete the matched CodeCommit repositories.

    Pair this action with a filter so that only the intended
    repositories are removed.

    :example:

    .. code-block:: yaml

            policies:
              - name: codecommit-delete
                resource: codecommit
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ("codecommit:DeleteRepository",)

    def process(self, repositories):
        """Delete every repository in ``repositories`` using one shared client."""
        session = local_session(self.manager.session_factory)
        client = session.client('codecommit')
        for repo in repositories:
            self.process_repository(client, repo)

    def process_repository(self, client, repository):
        """Delete a single repository; API failures are logged, not raised.

        :param client: CodeCommit client used for the delete call.
        :param repository: record providing the ``repositoryName`` key.
        """
        repo_name = repository['repositoryName']
        try:
            client.delete_repository(repositoryName=repo_name)
        except ClientError as e:
            # Best effort: record the failure and let the caller move on.
            self.log.exception("Exception deleting repo:\n %s" % e)
class DescribeBuild(DescribeSource):
    """Describe-mode source used by the ``codebuild`` resource manager."""

    def augment(self, resources):
        """Run the inherited augmentation, then ``universal_augment``.

        :param resources: project records handed over by the query layer.
        :return: the result of ``universal_augment`` for this manager and the
            parent-augmented records.
        """
        described = super().augment(resources)
        return universal_augment(self.manager, described)
class ConfigBuild(ConfigSource):
    """Config-mode source used by the ``codebuild`` resource manager."""

    def load_resource(self, item):
        """Reshape an AWS Config item into a CodeBuild project record.

        The item's ``configuration`` dict is modified in place and returned:

        * ``tags`` (lower-case ``key``/``value`` pairs) is copied into a
          ``Tags`` list of ``Key``/``Value`` pairs; a missing or null ``tags``
          entry yields an empty list.
        * ``queuedtimeoutInMinutes`` is renamed to ``queuedTimeoutInMinutes``
          and converted to ``int``.
        * The first element of ``artifacts`` / ``source`` becomes the primary
          value and any remaining elements move to ``secondaryArtifacts`` /
          ``secondarySources``.
        * ``vpcConfig.subnets`` is flattened to the ``subnet`` value of each
          entry.
        * ``arn`` is built from the manager's region, account id and the
          project name.

        :param item: AWS Config item carrying a ``configuration`` dict.
        :return: the reshaped ``configuration`` dict.
        :raises KeyError: if ``artifacts``, ``source`` or ``name`` is missing.
        :raises IndexError: if ``artifacts`` or ``source`` is an empty list.
        """
        item_config = item['configuration']
        # ``.get('tags')`` is None when Config omits the key (or reports
        # null); fall back to an empty sequence instead of iterating None.
        item_config['Tags'] = [
            {'Key': t['key'], 'Value': t['value']}
            for t in item_config.get('tags') or ()]

        # AWS Config garbage mangle undo.
        if 'queuedtimeoutInMinutes' in item_config:
            item_config['queuedTimeoutInMinutes'] = int(
                item_config.pop('queuedtimeoutInMinutes'))

        # Config reports primary and secondary artifacts in a single list.
        artifacts = item_config.pop('artifacts')
        item_config['artifacts'] = artifacts.pop(0)
        if artifacts:
            item_config['secondaryArtifacts'] = artifacts

        # Same layout for sources: first entry is primary, rest are secondary.
        sources = item_config['source']
        item_config['source'] = sources.pop(0)
        if sources:
            item_config['secondarySources'] = sources

        if 'vpcConfig' in item_config and 'subnets' in item_config['vpcConfig']:
            item_config['vpcConfig']['subnets'] = [
                s['subnet'] for s in item_config['vpcConfig']['subnets']]

        item_config['arn'] = 'arn:aws:codebuild:{}:{}:project/{}'.format(
            self.manager.config.region, self.manager.config.account_id, item_config['name'])
        return item_config
@resources.register('codebuild')
class CodeBuildProject(QueryResourceManager):
    """Resource manager registered under the ``codebuild`` resource name."""

    class resource_type(TypeInfo):
        # Static metadata describing how projects are listed and detailed.
        service = 'codebuild'
        enum_spec = ('list_projects', 'projects', None)
        batch_detail_spec = (
            'batch_get_projects',
            'names',
            None,
            'projects',
            None,
        )
        # The project name serves as both display name and identifier.
        name = 'name'
        id = name
        arn = 'arn'
        date = 'created'
        dimension = 'ProjectName'
        # The CloudFormation and AWS Config type names are the same string.
        cfn_type = "AWS::CodeBuild::Project"
        config_type = cfn_type
        arn_type = 'project'
        universal_taggable = object()

    source_mapping = {'describe': DescribeBuild, 'config': ConfigBuild}
@CodeBuildProject.filter_registry.register('subnet')
class BuildSubnetFilter(SubnetFilter):
    """``subnet`` filter for CodeBuild projects."""

    # JMESPath expression yielding the subnet ids of a project record.
    RelatedIdsExpression = 'vpcConfig.subnets[]'
@CodeBuildProject.filter_registry.register('security-group')
class BuildSecurityGroupFilter(SecurityGroupFilter):
    """``security-group`` filter for CodeBuild projects."""

    # JMESPath expression yielding the security group ids of a project record.
    RelatedIdsExpression = 'vpcConfig.securityGroupIds[]'
@CodeBuildProject.filter_registry.register('vpc')
class BuildVpcFilter(VpcFilter):
    """``vpc`` filter for CodeBuild projects."""

    # JMESPath expression yielding the VPC id of a project record.
    RelatedIdsExpression = 'vpcConfig.vpcId'
@CodeBuildProject.action_registry.register('post-finding')
class BuildPostFinding(OtherResourcePostFinding):
    """``post-finding`` action for CodeBuild projects."""

    resource_type = 'AwsCodeBuildProject'

    def format_resource(self, r):
        """Fill the finding payload for project ``r`` and return the envelope.

        ``name``, ``encryptionKey``, ``environment.type`` and ``serviceRole``
        are read with plain subscripts, so a record lacking any of them raises
        ``KeyError``. All other fields are optional lookups, and every section
        is passed through ``self.filter_empty`` before being stored.

        :param r: CodeBuild project record.
        :return: the envelope produced by ``self.format_envelope``.
        """
        envelope, payload = self.format_envelope(r)

        def lookup(expression):
            # Optional nested value addressed by a JMESPath expression.
            return jmespath_search(expression, r)

        # Sections are built in the same order as the final mapping so that
        # the required-key lookups happen in the original sequence.
        project_name = r['name']
        encryption_key = r['encryptionKey']

        env = r['environment']
        environment = self.filter_empty({
            'Type': env['type'],
            'Certificate': env.get('certificate'),
            'RegistryCredential': self.filter_empty({
                'Credential': lookup('environment.registryCredential.credential'),
                'CredentialProvider': lookup(
                    'environment.registryCredential.credentialProvider'),
            }),
            'ImagePullCredentialsType': env.get('imagePullCredentialsType'),
        })

        service_role = r['serviceRole']

        vpc_config = self.filter_empty({
            'VpcId': lookup('vpcConfig.vpcId'),
            'Subnets': lookup('vpcConfig.subnets'),
            'SecurityGroupIds': lookup('vpcConfig.securityGroupIds'),
        })

        source = self.filter_empty({
            'Type': lookup('source.type'),
            'Location': lookup('source.location'),
            'GitCloneDepth': lookup('source.gitCloneDepth'),
        })

        details = self.filter_empty({
            'Name': project_name,
            'EncryptionKey': encryption_key,
            'Environment': environment,
            'ServiceRole': service_role,
            'VpcConfig': vpc_config,
            'Source': source,
        })
        payload.update(details)
        return envelope
@CodeBuildProject.action_registry.register('delete')
class DeleteProject(BaseAction):
    """Delete the matched CodeBuild projects.

    Pair this action with a filter so that only the intended
    projects are removed.

    :example:

    .. code-block:: yaml

            policies:
              - name: codebuild-delete
                resource: codebuild
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ("codebuild:DeleteProject",)

    def process(self, projects):
        """Delete every project in ``projects`` using one shared client."""
        session = local_session(self.manager.session_factory)
        client = session.client('codebuild')
        for proj in projects:
            self.process_project(client, proj)

    def process_project(self, client, project):
        """Delete a single project; API failures are logged, not raised.

        :param client: CodeBuild client used for the delete call.
        :param project: record providing the ``name`` key.
        """
        project_name = project['name']
        try:
            client.delete_project(name=project_name)
        except ClientError as e:
            # Best effort: record the failure and let the caller move on.
            self.log.exception("Exception deleting project:\n %s" % e)
class ConfigPipeline(ConfigSource):
    """Config-mode source used by the ``codepipeline`` resource manager."""

    def load_resource(self, item):
        """Build a pipeline record from an AWS Config item.

        The ``pipeline`` entry of the loaded configuration becomes the record,
        the ``metadata`` entry is merged into it, and tags are loaded through
        ``self._load_resource_tags``.

        :param item: AWS Config item for a pipeline.
        :return: the merged pipeline record.
        """
        config = self._load_item_config(item)
        pipeline = config.pop('pipeline')
        metadata = config['metadata']
        pipeline.update(metadata)
        self._load_resource_tags(pipeline, item)
        return pipeline
class DescribePipeline(DescribeSource):
    """Describe-mode source used by the ``codepipeline`` resource manager."""

    def augment(self, resources):
        """Run the inherited augmentation, then ``universal_augment``.

        :param resources: pipeline records handed over by the query layer.
        :return: the result of ``universal_augment`` for this manager and the
            parent-augmented records.
        """
        return universal_augment(self.manager, super().augment(resources))
@resources.register('codepipeline')
class CodeDeployPipeline(QueryResourceManager):
    """Resource manager registered under the ``codepipeline`` resource name."""

    class resource_type(TypeInfo):
        # Static metadata describing how pipelines are listed and detailed.
        service = 'codepipeline'
        enum_spec = ('list_pipelines', 'pipelines', None)
        detail_spec = ('get_pipeline', 'name', 'name', 'pipeline')
        # The pipeline name serves as both display name and identifier.
        name = 'name'
        id = name
        date = 'created'
        # Note this is purposeful: codepipeline doesn't have a separate
        # type specifier.
        arn_type = ""
        # The CloudFormation and AWS Config type names are the same string.
        cfn_type = "AWS::CodePipeline::Pipeline"
        config_type = cfn_type
        universal_taggable = object()

    source_mapping = {'describe': DescribePipeline, 'config': ConfigPipeline}
@CodeDeployPipeline.action_registry.register('delete')
class DeletePipeline(BaseAction):
    """``delete`` action for CodePipeline pipelines."""

    schema = type_schema('delete')
    permissions = ('codepipeline:DeletePipeline',)

    def process(self, resources):
        """Delete each pipeline by name through the manager's retry wrapper.

        A ``PipelineNotFoundException`` for a pipeline is ignored and the
        loop moves on to the next record.

        :param resources: pipeline records providing the ``name`` key.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('codepipeline')
        for pipeline in resources:
            try:
                self.manager.retry(client.delete_pipeline, name=pipeline['name'])
            except client.exceptions.PipelineNotFoundException:
                # Nothing left to delete for this record.
                pass
class DescribeApplication(DescribeSource):
    """Describe-mode source used by the ``codedeploy-app`` resource manager."""

    def augment(self, resources):
        """Attach a ``Tags`` list to every application record.

        After the inherited augmentation, one ``list_tags_for_resource`` call
        is issued per application, using the ARNs from the manager's
        ``get_arns``. A response without a ``Tags`` key yields an empty list.

        :param resources: application records handed over by the query layer.
        :return: the augmented records, each carrying a ``Tags`` entry.
        """
        resources = super().augment(resources)
        client = local_session(self.manager.session_factory).client('codedeploy')
        for r, arn in zip(resources, self.manager.get_arns(resources)):
            # Route the call through the manager's retry wrapper, matching
            # how the other CodeDeploy calls in this module are issued
            # (presumably so throttled requests are retried -- confirm
            # against the manager's ``retry`` definition).
            r['Tags'] = self.manager.retry(
                client.list_tags_for_resource, ResourceArn=arn).get('Tags', [])
        return resources
@resources.register('codedeploy-app')
class CodeDeployApplication(QueryResourceManager):
    """Resource manager registered under the ``codedeploy-app`` resource name."""

    class resource_type(TypeInfo):
        # Static metadata describing how applications are listed and detailed.
        service = 'codedeploy'
        enum_spec = ('list_applications', 'applications', None)
        batch_detail_spec = (
            'batch_get_applications',
            'applicationNames',
            None,
            'applicationsInfo',
            None,
        )
        # The application name serves as both identifier and display name.
        id = 'applicationName'
        name = id
        date = 'createTime'
        arn_type = "application"
        arn_separator = ":"
        # The AWS Config and CloudFormation type names are the same string.
        config_type = "AWS::CodeDeploy::Application"
        cfn_type = config_type
        universal_taggable = True

    source_mapping = {'describe': DescribeApplication, 'config': ConfigSource}

    def get_arns(self, resources):
        """Return one generated ARN per record, in input order.

        :param resources: application records providing ``applicationName``.
        :return: list of ARNs produced by ``self.generate_arn``.
        """
        arns = []
        for app in resources:
            arns.append(self.generate_arn(app['applicationName']))
        return arns
@CodeDeployApplication.action_registry.register('delete')
class DeleteApplication(BaseAction):
    """``delete`` action for CodeDeploy applications."""

    schema = type_schema('delete')
    permissions = ('codedeploy:DeleteApplication',)

    def process(self, resources):
        """Delete each application by name through the manager's retry wrapper.

        ``InvalidApplicationNameException`` and
        ``ApplicationDoesNotExistException`` are ignored and the loop moves on
        to the next record.

        :param resources: application records providing ``applicationName``.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('codedeploy')
        for app in resources:
            try:
                self.manager.retry(
                    client.delete_application,
                    applicationName=app['applicationName'])
            except (client.exceptions.InvalidApplicationNameException,
                    client.exceptions.ApplicationDoesNotExistException):
                # Nothing to delete for this record.
                pass
@resources.register('codedeploy-deployment')
class CodeDeployDeployment(QueryResourceManager):
    """Resource manager registered under the ``codedeploy-deployment`` name."""

    class resource_type(TypeInfo):
        # Static metadata describing how deployments are listed and detailed.
        service = 'codedeploy'
        # The listing is restricted to the statuses named below.
        enum_spec = (
            'list_deployments',
            'deployments',
            {'includeOnlyStatuses': ['Created', 'Queued', 'InProgress', 'Baking', 'Ready']},
        )
        batch_detail_spec = (
            'batch_get_deployments',
            'deploymentIds',
            None,
            'deploymentsInfo',
            None,
        )
        # The deployment id serves as both display name and identifier.
        name = 'deploymentId'
        id = name
        # couldn't find a real cloudformation type
        cfn_type = None
        arn_type = "deploymentgroup"
        date = 'createTime'
class DescribeDeploymentGroup(query.ChildDescribeSource):
    """Describe source used by the ``codedeploy-group`` resource manager."""

    def get_query(self):
        """Return the inherited query with ``capture_parent_id`` switched on.

        ``augment`` below unpacks each incoming item as a
        ``(parent_id, group_name)`` pair, which relies on this flag.
        """
        # Named ``child_query`` so the module-level ``query`` import is not
        # shadowed inside this method.
        child_query = super().get_query()
        child_query.capture_parent_id = True
        return child_query

    def augment(self, resources):
        """Expand ``(application, group name)`` pairs into tagged group records.

        All group details are fetched first with ``get_deployment_group``;
        then one ``list_tags_for_resource`` call per group fills ``Tags``.
        Both calls go through the manager's retry wrapper. A tag response
        without a ``Tags`` key yields an empty list.

        :param resources: iterable of ``(parent_id, group_name)`` pairs.
        :return: list of ``deploymentGroupInfo`` records, each with ``Tags``.
        """
        client = local_session(self.manager.session_factory).client('codedeploy')
        results = []
        for parent_id, group_name in resources:
            dg = self.manager.retry(
                client.get_deployment_group, applicationName=parent_id,
                deploymentGroupName=group_name).get('deploymentGroupInfo')
            results.append(dg)
        for r in results:
            rarn = self.manager.generate_arn(r['applicationName'] + '/' + r['deploymentGroupName'])
            # Default to an empty list so ``Tags`` is never None, matching
            # the application describe source in this module.
            r['Tags'] = self.manager.retry(
                client.list_tags_for_resource, ResourceArn=rarn).get('Tags', [])
        return results
@resources.register('codedeploy-group')
class CodeDeployDeploymentGroup(ChildResourceManager):
    """Resource manager registered under the ``codedeploy-group`` resource name."""

    class resource_type(TypeInfo):
        # Static metadata; groups are enumerated per parent application.
        service = 'codedeploy'
        parent_spec = ('codedeploy-app', 'applicationName', None)
        enum_spec = ('list_deployment_groups', 'deploymentGroups', None)
        id = 'deploymentGroupId'
        name = 'deploymentGroupName'
        arn_type = "deploymentgroup"
        # The AWS Config and CloudFormation type names are the same string.
        config_type = 'AWS::CodeDeploy::DeploymentGroup'
        cfn_type = config_type
        arn_separator = ':'
        permission_prefix = 'codedeploy'
        universal_taggable = True

    source_mapping = {'describe-child': DescribeDeploymentGroup}

    def get_arns(self, resources):
        """Return one generated ARN per record, in input order.

        The ARN resource part is ``<applicationName>/<deploymentGroupName>``.

        :param resources: deployment group records providing both keys.
        :return: list of ARNs produced by ``self.generate_arn``.
        """
        return [
            self.generate_arn(group['applicationName'] + '/' + group['deploymentGroupName'])
            for group in resources
        ]
@CodeDeployDeploymentGroup.action_registry.register('delete')
class DeleteDeploymentGroup(BaseAction):
    """Delete a deployment group tied to an application.
    """

    schema = type_schema('delete')
    permissions = ('codedeploy:DeleteDeploymentGroup',)

    def process(self, resources):
        """Delete each deployment group through the manager's retry wrapper.

        An ``InvalidDeploymentGroupNameException`` for a group is ignored and
        the loop moves on to the next record.

        :param resources: records providing ``applicationName`` and
            ``deploymentGroupName``.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('codedeploy')
        for group in resources:
            try:
                self.manager.retry(
                    client.delete_deployment_group,
                    applicationName=group['applicationName'],
                    deploymentGroupName=group['deploymentGroupName'])
            except client.exceptions.InvalidDeploymentGroupNameException:
                # Nothing to delete for this record.
                pass