Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/code.py: 64%

225 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3from botocore.exceptions import ClientError 

4 

5from c7n.actions import BaseAction 

6from c7n.filters.vpc import SubnetFilter, SecurityGroupFilter, VpcFilter 

7from c7n.manager import resources 

8from c7n.query import ( 

9 QueryResourceManager, DescribeSource, ConfigSource, TypeInfo, ChildResourceManager) 

10from c7n.tags import universal_augment 

11from c7n.utils import local_session, type_schema, jmespath_search 

12from c7n import query 

13 

14from .securityhub import OtherResourcePostFinding 

15 

16 

class DescribeRepo(DescribeSource):
    # Describe source for CodeCommit repositories; the base augmentation is
    # followed by a pass through ``universal_augment``.

    def augment(self, resources):
        """Return ``resources`` after base augmentation and ``universal_augment``."""
        described = super().augment(resources)
        return universal_augment(self.manager, described)

24 

25 

@resources.register('codecommit')
class CodeRepository(QueryResourceManager):
    # Resource manager registered under the ``codecommit`` resource name.

    class resource_type(TypeInfo):
        service = 'codecommit'
        enum_spec = ('list_repositories', 'repositories', None)
        batch_detail_spec = (
            'batch_get_repositories',
            'repositoryNames',
            'repositoryName',
            'repositories',
            None,
        )
        # The repository name serves as both display name and identifier.
        name = 'repositoryName'
        id = name
        arn = "Arn"
        date = 'creationDate'
        cfn_type = 'AWS::CodeCommit::Repository'
        universal_taggable = object()

    source_mapping = {'describe': DescribeRepo, 'config': ConfigSource}

    def get_resources(self, ids, cache=True, augment=True):
        """Build resources for the given repository names.

        Each id is wrapped as ``{'repositoryName': id}``, run through
        ``self.augment`` and then through ``universal_augment``.  The
        ``cache`` and ``augment`` arguments are accepted but not consulted.
        """
        stubs = [{'repositoryName': repo_name} for repo_name in ids]
        return universal_augment(self, self.augment(stubs))

48 

49 

@CodeRepository.action_registry.register('delete')
class DeleteRepository(BaseAction):
    """Action to delete CodeCommit repositories

    Pair this action with a filter so that only the intended
    repositories are removed.

    :example:

    .. code-block:: yaml

            policies:
              - name: codecommit-delete
                resource: codecommit
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ("codecommit:DeleteRepository",)

    def process(self, repositories):
        """Delete every repository in ``repositories`` using one shared client."""
        session = local_session(self.manager.session_factory)
        client = session.client('codecommit')
        for repo in repositories:
            self.process_repository(client, repo)

    def process_repository(self, client, repository):
        """Delete a single repository; a ``ClientError`` is logged, not raised."""
        repo_name = repository['repositoryName']
        try:
            client.delete_repository(repositoryName=repo_name)
        except ClientError as e:
            message = "Exception deleting repo:\n %s" % e
            self.log.exception(message)

82 

83 

class DescribeBuild(DescribeSource):
    # Describe source for CodeBuild projects; the base augmentation is
    # followed by a pass through ``universal_augment``.

    def augment(self, resources):
        """Return ``resources`` after base augmentation and ``universal_augment``."""
        described = super().augment(resources)
        return universal_augment(self.manager, described)

90 

91 

class ConfigBuild(ConfigSource):
    # Config source for CodeBuild projects: reshapes the ``configuration``
    # of an AWS Config item (``Tags``, ``artifacts``, ``source``,
    # ``vpcConfig.subnets``, ``arn``) before returning it as the resource.

    def load_resource(self, item):
        """Convert one AWS Config item into a CodeBuild project resource.

        The ``configuration`` dict of ``item`` is modified in place and
        returned.

        :param item: Config item carrying a ``configuration`` dict with at
            least ``name``, ``artifacts`` and ``source`` entries.
        :returns: the reshaped ``configuration`` dict.
        :raises KeyError: if ``artifacts``, ``source`` or ``name`` is missing.
        :raises IndexError: if ``artifacts`` or ``source`` is an empty list.
        """
        item_config = item['configuration']
        # ``tags`` can be absent or null; treat both as "no tags" rather than
        # failing with a TypeError while iterating None.
        item_config['Tags'] = [
            {'Key': t['key'], 'Value': t['value']}
            for t in item_config.get('tags') or ()]

        # Undo the key mangling AWS Config applies to this field: restore the
        # camelCase spelling and convert the value to an int.
        if 'queuedtimeoutInMinutes' in item_config:
            item_config['queuedTimeoutInMinutes'] = int(
                item_config.pop('queuedtimeoutInMinutes'))

        # Artifacts arrive as one list: the first entry becomes ``artifacts``
        # and any remaining entries become ``secondaryArtifacts``.
        artifacts = item_config.pop('artifacts')
        item_config['artifacts'] = artifacts.pop(0)
        if artifacts:
            item_config['secondaryArtifacts'] = artifacts

        # Same split for sources: first entry is ``source``, rest are secondary.
        sources = item_config['source']
        item_config['source'] = sources.pop(0)
        if sources:
            item_config['secondarySources'] = sources

        # Flatten subnet records down to the value of their ``subnet`` key.
        if 'vpcConfig' in item_config and 'subnets' in item_config['vpcConfig']:
            item_config['vpcConfig']['subnets'] = [
                s['subnet'] for s in item_config['vpcConfig']['subnets']]

        # Build the project ARN from the manager's region/account and the name.
        item_config['arn'] = 'arn:aws:codebuild:{}:{}:project/{}'.format(
            self.manager.config.region, self.manager.config.account_id, item_config['name'])
        return item_config

120 

121 

@resources.register('codebuild')
class CodeBuildProject(QueryResourceManager):
    # Resource manager registered under the ``codebuild`` resource name.

    class resource_type(TypeInfo):
        service = 'codebuild'
        enum_spec = ('list_projects', 'projects', None)
        batch_detail_spec = ('batch_get_projects', 'names', None, 'projects', None)
        # The project name serves as both display name and identifier.
        name = 'name'
        id = name
        arn = 'arn'
        date = 'created'
        dimension = 'ProjectName'
        # One type string is shared by the CloudFormation and Config fields.
        cfn_type = "AWS::CodeBuild::Project"
        config_type = cfn_type
        arn_type = 'project'
        universal_taggable = object()

    source_mapping = {'describe': DescribeBuild, 'config': ConfigBuild}

142 

143 

@CodeBuildProject.filter_registry.register('subnet')
class BuildSubnetFilter(SubnetFilter):
    # ``subnet`` filter for CodeBuild projects; related ids are read from the
    # project's ``vpcConfig.subnets`` list.

    RelatedIdsExpression = 'vpcConfig.subnets[]'

148 

149 

@CodeBuildProject.filter_registry.register('security-group')
class BuildSecurityGroupFilter(SecurityGroupFilter):
    # ``security-group`` filter for CodeBuild projects; related ids are read
    # from the project's ``vpcConfig.securityGroupIds`` list.

    RelatedIdsExpression = 'vpcConfig.securityGroupIds[]'

154 

155 

@CodeBuildProject.filter_registry.register('vpc')
class BuildVpcFilter(VpcFilter):
    # ``vpc`` filter for CodeBuild projects; the related id is read from the
    # project's ``vpcConfig.vpcId`` value.

    RelatedIdsExpression = 'vpcConfig.vpcId'

160 

161 

@CodeBuildProject.action_registry.register('post-finding')
class BuildPostFinding(OtherResourcePostFinding):
    # ``post-finding`` action for CodeBuild projects; formats a project as an
    # ``AwsCodeBuildProject`` finding resource.

    resource_type = 'AwsCodeBuildProject'

    def format_resource(self, r):
        """Fill the finding payload for project ``r`` and return its envelope.

        Every nested mapping, and the top-level mapping, is passed through
        ``self.filter_empty`` before being added to the payload.
        """
        envelope, payload = self.format_envelope(r)

        # Keys are added in the same order the finding fields are evaluated.
        details = {
            'Name': r['name'],
            'EncryptionKey': r['encryptionKey'],
        }

        env = r['environment']
        environment = {
            'Type': env['type'],
            'Certificate': env.get('certificate'),
        }
        environment['RegistryCredential'] = self.filter_empty({
            'Credential': jmespath_search(
                'environment.registryCredential.credential', r),
            'CredentialProvider': jmespath_search(
                'environment.registryCredential.credentialProvider', r),
        })
        environment['ImagePullCredentialsType'] = env.get('imagePullCredentialsType')
        details['Environment'] = self.filter_empty(environment)

        details['ServiceRole'] = r['serviceRole']

        details['VpcConfig'] = self.filter_empty({
            'VpcId': jmespath_search('vpcConfig.vpcId', r),
            'Subnets': jmespath_search('vpcConfig.subnets', r),
            'SecurityGroupIds': jmespath_search('vpcConfig.securityGroupIds', r),
        })

        details['Source'] = self.filter_empty({
            'Type': jmespath_search('source.type', r),
            'Location': jmespath_search('source.location', r),
            'GitCloneDepth': jmespath_search('source.gitCloneDepth', r),
        })

        payload.update(self.filter_empty(details))
        return envelope

197 

198 

@CodeBuildProject.action_registry.register('delete')
class DeleteProject(BaseAction):
    """Action to delete CodeBuild projects

    Pair this action with a filter so that only the intended
    projects are removed.

    :example:

    .. code-block:: yaml

            policies:
              - name: codebuild-delete
                resource: codebuild
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ("codebuild:DeleteProject",)

    def process(self, projects):
        """Delete every project in ``projects`` using one shared client."""
        session = local_session(self.manager.session_factory)
        client = session.client('codebuild')
        for project in projects:
            self.process_project(client, project)

    def process_project(self, client, project):
        """Delete a single project; a ``ClientError`` is logged, not raised."""
        project_name = project['name']
        try:
            client.delete_project(name=project_name)
        except ClientError as e:
            message = "Exception deleting project:\n %s" % e
            self.log.exception(message)

231 

232 

class ConfigPipeline(ConfigSource):
    # Config source for CodePipeline pipelines.

    def load_resource(self, item):
        """Build a pipeline resource from a Config item.

        The ``pipeline`` entry of the loaded item configuration becomes the
        resource, the ``metadata`` entry is merged into it, and tags are
        loaded through ``_load_resource_tags``.
        """
        config = self._load_item_config(item)
        pipeline = config.pop('pipeline')
        metadata = config['metadata']
        pipeline.update(metadata)
        self._load_resource_tags(pipeline, item)
        return pipeline

241 

242 

class DescribePipeline(DescribeSource):
    # Describe source for CodePipeline pipelines; the base augmentation is
    # followed by a pass through ``universal_augment``.

    def augment(self, resources):
        """Return ``resources`` after base augmentation and ``universal_augment``."""
        return universal_augment(self.manager, super().augment(resources))

248 

249 

@resources.register('codepipeline')
class CodeDeployPipeline(QueryResourceManager):
    # Resource manager registered under the ``codepipeline`` resource name.

    class resource_type(TypeInfo):
        service = 'codepipeline'
        enum_spec = ('list_pipelines', 'pipelines', None)
        detail_spec = ('get_pipeline', 'name', 'name', 'pipeline')
        # The pipeline name serves as both display name and identifier.
        name = 'name'
        id = name
        date = 'created'
        # Deliberately empty: codepipeline has no separate type specifier.
        arn_type = ""
        # One type string is shared by the CloudFormation and Config fields.
        cfn_type = "AWS::CodePipeline::Pipeline"
        config_type = cfn_type
        universal_taggable = object()

    source_mapping = {'describe': DescribePipeline, 'config': ConfigPipeline}

268 

269 

@CodeDeployPipeline.action_registry.register('delete')
class DeletePipeline(BaseAction):
    # ``delete`` action for codepipeline resources.

    schema = type_schema('delete')
    permissions = ('codepipeline:DeletePipeline',)

    def process(self, resources):
        """Delete each pipeline by name via the manager's retry wrapper.

        A ``PipelineNotFoundException`` for a pipeline is ignored and the
        loop moves on to the next one.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('codepipeline')
        for pipeline in resources:
            try:
                self.manager.retry(client.delete_pipeline, name=pipeline['name'])
            except client.exceptions.PipelineNotFoundException:
                # Nothing left to delete for this pipeline.
                pass

283 

284 

class DescribeApplication(DescribeSource):
    # Describe source for CodeDeploy applications; tags are fetched one
    # application at a time with ``list_tags_for_resource``.

    def augment(self, resources):
        """Augment applications and set ``Tags`` on each from its ARN.

        ARNs come from ``self.manager.get_arns``; a response without a
        ``Tags`` key yields an empty list.
        """
        apps = super().augment(resources)
        session = local_session(self.manager.session_factory)
        client = session.client('codedeploy')
        arns = self.manager.get_arns(apps)
        for app, arn in zip(apps, arns):
            tag_response = client.list_tags_for_resource(ResourceArn=arn)
            app['Tags'] = tag_response.get('Tags', [])
        return apps

294 

295 

@resources.register('codedeploy-app')
class CodeDeployApplication(QueryResourceManager):
    # Resource manager registered under the ``codedeploy-app`` resource name.

    class resource_type(TypeInfo):
        service = 'codedeploy'
        enum_spec = ('list_applications', 'applications', None)
        batch_detail_spec = (
            'batch_get_applications',
            'applicationNames',
            None,
            'applicationsInfo',
            None,
        )
        # The application name serves as both identifier and display name.
        id = 'applicationName'
        name = id
        date = 'createTime'
        arn_type = "application"
        arn_separator = ":"
        # One type string is shared by the Config and CloudFormation fields.
        config_type = "AWS::CodeDeploy::Application"
        cfn_type = config_type
        universal_taggable = True

    source_mapping = {'describe': DescribeApplication, 'config': ConfigSource}

    def get_arns(self, resources):
        """Return one generated ARN per resource, keyed on ``applicationName``."""
        return [self.generate_arn(app['applicationName']) for app in resources]

319 

320 

@CodeDeployApplication.action_registry.register('delete')
class DeleteApplication(BaseAction):
    # ``delete`` action for codedeploy-app resources.

    schema = type_schema('delete')
    permissions = ('codedeploy:DeleteApplication',)

    def process(self, resources):
        """Delete each application by name via the manager's retry wrapper.

        ``InvalidApplicationNameException`` and
        ``ApplicationDoesNotExistException`` are ignored for an application
        and the loop moves on to the next one.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('codedeploy')
        for app in resources:
            try:
                self.manager.retry(
                    client.delete_application,
                    applicationName=app['applicationName'])
            except (client.exceptions.InvalidApplicationNameException,
                    client.exceptions.ApplicationDoesNotExistException):
                # Nothing to delete under this name.
                pass

335 

336 

@resources.register('codedeploy-deployment')
class CodeDeployDeployment(QueryResourceManager):
    # Resource manager registered under the ``codedeploy-deployment`` name.

    class resource_type(TypeInfo):
        service = 'codedeploy'
        # NOTE(review): the status list is presumably forwarded to
        # ``list_deployments`` as extra arguments -- confirm against the
        # query machinery.
        enum_spec = (
            'list_deployments',
            'deployments',
            {'includeOnlyStatuses': ['Created', 'Queued', 'InProgress', 'Baking', 'Ready']},
        )
        batch_detail_spec = (
            'batch_get_deployments',
            'deploymentIds',
            None,
            'deploymentsInfo',
            None,
        )
        # The deployment id serves as both display name and identifier.
        name = 'deploymentId'
        id = name
        # No real CloudFormation type was found for deployments.
        cfn_type = None
        arn_type = "deploymentgroup"
        date = 'createTime'

352 

353 

class DescribeDeploymentGroup(query.ChildDescribeSource):
    # Describe source for CodeDeploy deployment groups, which are enumerated
    # per parent application.

    def get_query(self):
        """Return the base query with ``capture_parent_id`` switched on."""
        # Named ``child_query`` so the imported ``query`` module is not shadowed.
        child_query = super().get_query()
        child_query.capture_parent_id = True
        return child_query

    def augment(self, resources):
        """Expand ``(application name, group name)`` pairs into group details.

        :param resources: iterable of ``(parent_id, group_name)`` pairs.
        :returns: list of ``deploymentGroupInfo`` dicts, each given a ``Tags``
            list (empty when the tag response carries no ``Tags`` key).
        """
        client = local_session(self.manager.session_factory).client('codedeploy')
        results = []
        for parent_id, group_name in resources:
            dg = self.manager.retry(
                client.get_deployment_group, applicationName=parent_id,
                deploymentGroupName=group_name).get('deploymentGroupInfo')
            results.append(dg)
        for r in results:
            rarn = self.manager.generate_arn(r['applicationName'] + '/' + r['deploymentGroupName'])
            # Default to an empty list, as the application describe source
            # does, so ``Tags`` is never left as None.
            r['Tags'] = self.manager.retry(
                client.list_tags_for_resource, ResourceArn=rarn).get('Tags', [])
        return results

374 

375 

@resources.register('codedeploy-group')
class CodeDeployDeploymentGroup(ChildResourceManager):
    # Child resource manager registered under the ``codedeploy-group`` name;
    # its parent resource is ``codedeploy-app``.

    class resource_type(TypeInfo):
        service = 'codedeploy'
        parent_spec = ('codedeploy-app', 'applicationName', None)
        enum_spec = ('list_deployment_groups', 'deploymentGroups', None)
        id = 'deploymentGroupId'
        name = 'deploymentGroupName'
        arn_type = "deploymentgroup"
        # One type string is shared by the Config and CloudFormation fields.
        config_type = 'AWS::CodeDeploy::DeploymentGroup'
        cfn_type = config_type
        arn_separator = ':'
        permission_prefix = 'codedeploy'
        universal_taggable = True

    source_mapping = {'describe-child': DescribeDeploymentGroup}

    def get_arns(self, resources):
        """Return one ARN per group, generated from ``<application>/<group>``."""
        return [
            self.generate_arn(group['applicationName'] + '/' + group['deploymentGroupName'])
            for group in resources
        ]

400 

401 

@CodeDeployDeploymentGroup.action_registry.register('delete')
class DeleteDeploymentGroup(BaseAction):
    """Delete a deployment group belonging to an application.
    """

    schema = type_schema('delete')
    permissions = ('codedeploy:DeleteDeploymentGroup',)

    def process(self, resources):
        """Delete each deployment group via the manager's retry wrapper.

        An ``InvalidDeploymentGroupNameException`` for a group is ignored and
        the loop moves on to the next one.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('codedeploy')
        for group in resources:
            try:
                self.manager.retry(
                    client.delete_deployment_group,
                    applicationName=group['applicationName'],
                    deploymentGroupName=group['deploymentGroupName'])
            except client.exceptions.InvalidDeploymentGroupNameException:
                # Nothing to delete under this group name.
                pass