Firehose Delivery Stream
Provides a Kinesis Firehose Delivery Stream resource. Amazon Kinesis Firehose is a fully managed, elastic service to easily deliver real-time data streams to destinations such as Amazon S3 and Amazon Redshift. For more details, see the Amazon Kinesis Firehose Documentation.
Example Usage
Extended S3 Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.BucketV2;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.lambda.Function;
import com.pulumi.aws.lambda.FunctionArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.s3.BucketAclV2;
import com.pulumi.aws.s3.BucketAclV2Args;
import com.pulumi.asset.FileArchive;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
/**
 * Program entry point: passes the {@code stack} method to the Pulumi runtime.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    Pulumi.run(context -> stack(context));
}
/**
 * Declares the "Extended S3 Destination" example: an S3 bucket, an IAM role that
 * Firehose may assume, a Lambda function with its own IAM role, and a delivery
 * stream with destination {@code extended_s3} that uses the Lambda as a processor.
 *
 * @param ctx the Pulumi program context passed in by {@code Pulumi.run}
 */
public static void stack(Context ctx) {
    var bucket = new BucketV2("bucket");

    // Trust policy: allows firehose.amazonaws.com to call sts:AssumeRole.
    final var firehoseAssumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
        .statements(GetPolicyDocumentStatementArgs.builder()
            .effect("Allow")
            .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                .type("Service")
                .identifiers("firehose.amazonaws.com")
                .build())
            .actions("sts:AssumeRole")
            .build())
        .build());

    var firehoseRole = new Role("firehoseRole", RoleArgs.builder()
        .assumeRolePolicy(firehoseAssumeRole.applyValue(getPolicyDocumentResult -> getPolicyDocumentResult.json()))
        .build());

    // Trust policy: allows lambda.amazonaws.com to call sts:AssumeRole.
    final var lambdaAssumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
        .statements(GetPolicyDocumentStatementArgs.builder()
            .effect("Allow")
            .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                .type("Service")
                .identifiers("lambda.amazonaws.com")
                .build())
            .actions("sts:AssumeRole")
            .build())
        .build());

    var lambdaIam = new Role("lambdaIam", RoleArgs.builder()
        .assumeRolePolicy(lambdaAssumeRole.applyValue(getPolicyDocumentResult -> getPolicyDocumentResult.json()))
        .build());

    var lambdaProcessor = new Function("lambdaProcessor", FunctionArgs.builder()
        .code(new FileArchive("lambda.zip"))
        .role(lambdaIam.arn())
        .handler("exports.handler")
        .runtime("nodejs16.x")
        .build());

    var extendedS3Stream = new FirehoseDeliveryStream("extendedS3Stream", FirehoseDeliveryStreamArgs.builder()
        .destination("extended_s3")
        .extendedS3Configuration(FirehoseDeliveryStreamExtendedS3ConfigurationArgs.builder()
            .roleArn(firehoseRole.arn())
            .bucketArn(bucket.arn())
            .processingConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs.builder()
                // Boolean flag; the String "true" does not match the builder's Boolean setter.
                .enabled(true)
                .processors(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                    .type("Lambda")
                    .parameters(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                        .parameterName("LambdaArn")
                        // The ARN is an Output, so the ":$LATEST" suffix is appended inside applyValue.
                        .parameterValue(lambdaProcessor.arn().applyValue(arn -> String.format("%s:$LATEST", arn)))
                        .build())
                    .build())
                .build())
            .build())
        .build());

    var bucketAcl = new BucketAclV2("bucketAcl", BucketAclV2Args.builder()
        .bucket(bucket.id())
        .acl("private")
        .build());
}
}
Extended S3 Destination with dynamic partitioning
These examples use built-in Firehose functionality, rather than requiring a lambda.
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
/**
 * Program entry point: passes the {@code stack} method to the Pulumi runtime.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    Pulumi.run(context -> stack(context));
}
/**
 * Declares the "Extended S3 Destination with dynamic partitioning" example: a
 * delivery stream with destination {@code extended_s3}, dynamic partitioning
 * turned on, and three processors (RecordDeAggregation, AppendDelimiterToRecord,
 * MetadataExtraction).
 *
 * <p>NOTE(review): {@code aws_iam_role} and {@code aws_s3_bucket} are not declared
 * in this example; they presumably stand for an IAM role and a bucket defined
 * elsewhere in the program. Confirm before reuse.
 *
 * @param ctx the Pulumi program context passed in by {@code Pulumi.run}
 */
public static void stack(Context ctx) {
    var extendedS3Stream = new FirehoseDeliveryStream("extendedS3Stream", FirehoseDeliveryStreamArgs.builder()
        .destination("extended_s3")
        .extendedS3Configuration(FirehoseDeliveryStreamExtendedS3ConfigurationArgs.builder()
            .roleArn(aws_iam_role.firehose_role().arn())
            .bucketArn(aws_s3_bucket.bucket().arn())
            .bufferSize(64)
            .dynamicPartitioningConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs.builder()
                // Boolean flag; the String "true" does not match the builder's Boolean setter.
                .enabled(true)
                .build())
            // The customer_id partition key in the prefix is produced by the MetadataExtraction query below.
            .prefix("data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/")
            .errorOutputPrefix("errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/")
            .processingConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs.builder()
                .enabled(true)
                .processors(
                    FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                        .type("RecordDeAggregation")
                        .parameters(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                            .parameterName("SubRecordType")
                            .parameterValue("JSON")
                            .build())
                        .build(),
                    FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                        .type("AppendDelimiterToRecord")
                        .build(),
                    FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                        .type("MetadataExtraction")
                        .parameters(
                            FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                                .parameterName("JsonParsingEngine")
                                .parameterValue("JQ-1.6")
                                .build(),
                            FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                                .parameterName("MetadataExtractionQuery")
                                .parameterValue("{customer_id:.customer_id}")
                                .build())
                        .build())
                .build())
            .build())
        .build());
}
}
S3 Destination (deprecated)
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.BucketV2;
import com.pulumi.aws.s3.BucketAclV2;
import com.pulumi.aws.s3.BucketAclV2Args;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
/**
 * Program entry point: passes the {@code stack} method to the Pulumi runtime.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    Pulumi.run(context -> stack(context));
}
/**
 * Declares the deprecated plain-S3 example: a bucket with a "private" ACL, an IAM
 * role that Firehose may assume, and a delivery stream with destination {@code s3}.
 *
 * @param ctx the Pulumi program context passed in by {@code Pulumi.run}
 */
public static void stack(Context ctx) {
    var bucket = new BucketV2("bucket");

    // ACL arguments are assembled first, then attached to the bucket.
    var privateAcl = BucketAclV2Args.builder()
        .acl("private")
        .bucket(bucket.id())
        .build();
    var bucketAcl = new BucketAclV2("bucketAcl", privateAcl);

    // Trust policy: allows firehose.amazonaws.com to call sts:AssumeRole.
    var firehoseService = GetPolicyDocumentStatementPrincipalArgs.builder()
        .identifiers("firehose.amazonaws.com")
        .type("Service")
        .build();
    var allowAssume = GetPolicyDocumentStatementArgs.builder()
        .actions("sts:AssumeRole")
        .effect("Allow")
        .principals(firehoseService)
        .build();
    final var assumeRole = IamFunctions.getPolicyDocument(
        GetPolicyDocumentArgs.builder().statements(allowAssume).build());

    var roleArgs = RoleArgs.builder()
        .assumeRolePolicy(assumeRole.applyValue(document -> document.json()))
        .build();
    var firehoseRole = new Role("firehoseRole", roleArgs);

    var s3Target = FirehoseDeliveryStreamS3ConfigurationArgs.builder()
        .bucketArn(bucket.arn())
        .roleArn(firehoseRole.arn())
        .build();
    var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
        .s3Configuration(s3Target)
        .destination("s3")
        .build());
}
}
Redshift Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.redshift.Cluster;
import com.pulumi.aws.redshift.ClusterArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
/**
 * Program entry point: passes the {@code stack} method to the Pulumi runtime.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    Pulumi.run(context -> stack(context));
}
/**
 * Declares the Redshift example: a single-node Redshift cluster and a delivery
 * stream with destination {@code redshift}, configured with an S3 configuration
 * block and an S3 backup block.
 *
 * <p>NOTE(review): {@code aws_iam_role} and {@code aws_s3_bucket} are not declared
 * in this example; they presumably refer to resources defined elsewhere. Confirm.
 * The user name and password are inline sample literals.
 *
 * @param ctx the Pulumi program context passed in by {@code Pulumi.run}
 */
public static void stack(Context ctx) {
    var clusterArgs = ClusterArgs.builder()
        .clusterIdentifier("tf-redshift-cluster")
        .databaseName("test")
        .masterUsername("testuser")
        .masterPassword("T3stPass")
        .nodeType("dc1.large")
        .clusterType("single-node")
        .build();
    var testCluster = new Cluster("testCluster", clusterArgs);

    var s3Settings = FirehoseDeliveryStreamS3ConfigurationArgs.builder()
        .roleArn(aws_iam_role.firehose_role().arn())
        .bucketArn(aws_s3_bucket.bucket().arn())
        .bufferSize(10)
        .bufferInterval(400)
        .compressionFormat("GZIP")
        .build();

    // JDBC URL is derived from the cluster's endpoint and database name outputs.
    var jdbcUrl = Output.tuple(testCluster.endpoint(), testCluster.databaseName())
        .applyValue(pair -> String.format("jdbc:redshift://%s/%s", pair.t1, pair.t2));

    var backupSettings = FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs.builder()
        .roleArn(aws_iam_role.firehose_role().arn())
        .bucketArn(aws_s3_bucket.bucket().arn())
        .bufferSize(15)
        .bufferInterval(300)
        .compressionFormat("GZIP")
        .build();

    var redshiftSettings = FirehoseDeliveryStreamRedshiftConfigurationArgs.builder()
        .roleArn(aws_iam_role.firehose_role().arn())
        .clusterJdbcurl(jdbcUrl)
        .username("testuser")
        .password("T3stPass")
        .dataTableName("test-table")
        .copyOptions("delimiter '|'")
        .dataTableColumns("test-col")
        .s3BackupMode("Enabled")
        .s3BackupConfiguration(backupSettings)
        .build();

    var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
        .destination("redshift")
        .s3Configuration(s3Settings)
        .redshiftConfiguration(redshiftSettings)
        .build());
}
}
Elasticsearch Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.elasticsearch.Domain;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamElasticsearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamElasticsearchConfigurationProcessingConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
/**
 * Program entry point: passes the {@code stack} method to the Pulumi runtime.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    Pulumi.run(context -> stack(context));
}
/**
 * Declares the Elasticsearch example: an Elasticsearch domain and a delivery
 * stream with destination {@code elasticsearch} that uses a Lambda processor.
 *
 * <p>NOTE(review): {@code aws_iam_role}, {@code aws_s3_bucket} and
 * {@code aws_lambda_function} are not declared in this example; they presumably
 * refer to resources defined elsewhere. Confirm before reuse.
 *
 * @param ctx the Pulumi program context passed in by {@code Pulumi.run}
 */
public static void stack(Context ctx) {
    var testCluster = new Domain("testCluster");

    var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
        .destination("elasticsearch")
        .s3Configuration(FirehoseDeliveryStreamS3ConfigurationArgs.builder()
            .roleArn(aws_iam_role.firehose_role().arn())
            .bucketArn(aws_s3_bucket.bucket().arn())
            .bufferSize(10)
            .bufferInterval(400)
            .compressionFormat("GZIP")
            .build())
        .elasticsearchConfiguration(FirehoseDeliveryStreamElasticsearchConfigurationArgs.builder()
            .domainArn(testCluster.arn())
            .roleArn(aws_iam_role.firehose_role().arn())
            .indexName("test")
            .typeName("test")
            .processingConfiguration(FirehoseDeliveryStreamElasticsearchConfigurationProcessingConfigurationArgs.builder()
                // Boolean flag; the String "true" does not match the builder's Boolean setter.
                .enabled(true)
                .processors(FirehoseDeliveryStreamElasticsearchConfigurationProcessingConfigurationProcessorArgs.builder()
                    .type("Lambda")
                    .parameters(FirehoseDeliveryStreamElasticsearchConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                        .parameterName("LambdaArn")
                        // NOTE(review): assumes lambda_processor().arn() is an Output<String> like the
                        // other resource ARNs on this page; formatting the Output object directly would
                        // embed its toString() rather than the ARN. Confirm.
                        .parameterValue(aws_lambda_function.lambda_processor().arn()
                            .applyValue(arn -> String.format("%s:$LATEST", arn)))
                        .build())
                    .build())
                .build())
            .build())
        .build());
}
}
Elasticsearch Destination With VPC
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.elasticsearch.Domain;
import com.pulumi.aws.elasticsearch.DomainArgs;
import com.pulumi.aws.elasticsearch.inputs.DomainClusterConfigArgs;
import com.pulumi.aws.elasticsearch.inputs.DomainEbsOptionsArgs;
import com.pulumi.aws.elasticsearch.inputs.DomainVpcOptionsArgs;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamElasticsearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamElasticsearchConfigurationVpcConfigArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
/**
 * Program entry point: passes the {@code stack} method to the Pulumi runtime.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    Pulumi.run(context -> stack(context));
}
public static void stack(Context ctx) {
var testCluster = new Domain("testCluster", DomainArgs.builder()
.clusterConfig(DomainClusterConfigArgs.builder()
.instanceCount(2)
.zoneAwarenessEnabled(true)
.instanceType("t2.small.elasticsearch")
.build())
.ebsOptions(DomainEbsOptionsArgs.builder()
.ebsEnabled(true)
.volumeSize(10)
.build())
.vpcOptions(DomainVpcOptionsArgs.builder()
.securityGroupIds(aws_security_group.first().id())
.subnetIds(
aws_subnet.first().id(),
aws_subnet.second().id())
.build())
.build());
final var firehose-elasticsearchPolicyDocument = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(
GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.actions("es:*")
.resources(
testCluster.arn(),
testCluster.arn().applyValue(arn -> String.format("%s/*", arn)))
.build(),
GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.actions(
"ec2:DescribeVpcs",
"ec2:DescribeVpcAttribute",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
"ec2:DescribeNetworkInterfaces",
"ec2:CreateNetworkInterface",
"ec2:CreateNetworkInterfacePermission",
"ec2:DeleteNetworkInterface")
.resources("*")
.build())
.build());
var firehose_elasticsearchRolePolicy = new RolePolicy("firehose-elasticsearchRolePolicy", RolePolicyArgs.builder()
.role(aws_iam_role.firehose().id())
.policy(firehose_elasticsearchPolicyDocument.applyValue(firehose_elasticsearchPolicyDocument -> firehose_elasticsearchPolicyDocument.json()))
.build());
var test = new FirehoseDeliveryStream("test", FirehoseDeliveryStreamArgs.builder()
.destination("elasticsearch")
.s3Configuration(FirehoseDeliveryStreamS3ConfigurationArgs.builder()
.roleArn(aws_iam_role.firehose().arn())
.bucketArn(aws_s3_bucket.bucket().arn())
.build())
.elasticsearchConfiguration(FirehoseDeliveryStreamElasticsearchConfigurationArgs.builder()
.domainArn(testCluster.arn())
.roleArn(aws_iam_role.firehose().arn())
.indexName("test")
.typeName("test")
.vpcConfig(FirehoseDeliveryStreamElasticsearchConfigurationVpcConfigArgs.builder()
.subnetIds(
aws_subnet.first().id(),
aws_subnet.second().id())
.securityGroupIds(aws_security_group.first().id())
.roleArn(aws_iam_role.firehose().arn())
.build())
.build())
.build(), CustomResourceOptions.builder()
.dependsOn(firehose_elasticsearchRolePolicy)
.build());
}
}
Opensearch Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.opensearch.Domain;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
/**
 * Program entry point: passes the {@code stack} method to the Pulumi runtime.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    Pulumi.run(context -> stack(context));
}
/**
 * Declares the Opensearch example: an OpenSearch domain and a delivery stream
 * with destination {@code opensearch} that uses a Lambda processor.
 *
 * <p>NOTE(review): {@code aws_iam_role}, {@code aws_s3_bucket} and
 * {@code aws_lambda_function} are not declared in this example; they presumably
 * refer to resources defined elsewhere. Confirm before reuse.
 *
 * @param ctx the Pulumi program context passed in by {@code Pulumi.run}
 */
public static void stack(Context ctx) {
    var testCluster = new Domain("testCluster");

    var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
        .destination("opensearch")
        .s3Configuration(FirehoseDeliveryStreamS3ConfigurationArgs.builder()
            .roleArn(aws_iam_role.firehose_role().arn())
            .bucketArn(aws_s3_bucket.bucket().arn())
            .bufferSize(10)
            .bufferInterval(400)
            .compressionFormat("GZIP")
            .build())
        .opensearchConfiguration(FirehoseDeliveryStreamOpensearchConfigurationArgs.builder()
            .domainArn(testCluster.arn())
            .roleArn(aws_iam_role.firehose_role().arn())
            .indexName("test")
            .processingConfiguration(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs.builder()
                // Boolean flag; the String "true" does not match the builder's Boolean setter.
                .enabled(true)
                .processors(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs.builder()
                    .type("Lambda")
                    .parameters(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                        .parameterName("LambdaArn")
                        // NOTE(review): assumes lambda_processor().arn() is an Output<String> like the
                        // other resource ARNs on this page; formatting the Output object directly would
                        // embed its toString() rather than the ARN. Confirm.
                        .parameterValue(aws_lambda_function.lambda_processor().arn()
                            .applyValue(arn -> String.format("%s:$LATEST", arn)))
                        .build())
                    .build())
                .build())
            .build())
        .build());
}
}
Opensearch Destination With VPC
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.opensearch.Domain;
import com.pulumi.aws.opensearch.DomainArgs;
import com.pulumi.aws.opensearch.inputs.DomainClusterConfigArgs;
import com.pulumi.aws.opensearch.inputs.DomainEbsOptionsArgs;
import com.pulumi.aws.opensearch.inputs.DomainVpcOptionsArgs;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationVpcConfigArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
/**
 * Program entry point: passes the {@code stack} method to the Pulumi runtime.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    Pulumi.run(context -> stack(context));
}
public static void stack(Context ctx) {
var testCluster = new Domain("testCluster", DomainArgs.builder()
.clusterConfig(DomainClusterConfigArgs.builder()
.instanceCount(2)
.zoneAwarenessEnabled(true)
.instanceType("m4.large.search")
.build())
.ebsOptions(DomainEbsOptionsArgs.builder()
.ebsEnabled(true)
.volumeSize(10)
.build())
.vpcOptions(DomainVpcOptionsArgs.builder()
.securityGroupIds(aws_security_group.first().id())
.subnetIds(
aws_subnet.first().id(),
aws_subnet.second().id())
.build())
.build());
var firehose_opensearch = new RolePolicy("firehose-opensearch", RolePolicyArgs.builder()
.role(aws_iam_role.firehose().id())
.policy(Output.tuple(testCluster.arn(), testCluster.arn()).applyValue(values -> {
var testClusterArn = values.t1;
var testClusterArn1 = values.t2;
return """
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"es:*"
],
"Resource": [
"%s",
"%s/*"
]
},
{
"Effect": "Allow",
"Action": [
"ec2:DescribeVpcs",
"ec2:DescribeVpcAttribute",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
"ec2:DescribeNetworkInterfaces",
"ec2:CreateNetworkInterface",
"ec2:CreateNetworkInterfacePermission",
"ec2:DeleteNetworkInterface"
],
"Resource": [
"*"
]
}
]
}
", testClusterArn,testClusterArn1);
}))
.build());
var test = new FirehoseDeliveryStream("test", FirehoseDeliveryStreamArgs.builder()
.destination("opensearch")
.s3Configuration(FirehoseDeliveryStreamS3ConfigurationArgs.builder()
.roleArn(aws_iam_role.firehose().arn())
.bucketArn(aws_s3_bucket.bucket().arn())
.build())
.opensearchConfiguration(FirehoseDeliveryStreamOpensearchConfigurationArgs.builder()
.domainArn(testCluster.arn())
.roleArn(aws_iam_role.firehose().arn())
.indexName("test")
.vpcConfig(FirehoseDeliveryStreamOpensearchConfigurationVpcConfigArgs.builder()
.subnetIds(
aws_subnet.first().id(),
aws_subnet.second().id())
.securityGroupIds(aws_security_group.first().id())
.roleArn(aws_iam_role.firehose().arn())
.build())
.build())
.build(), CustomResourceOptions.builder()
.dependsOn(firehose_opensearch)
.build());
}
}
Splunk Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamSplunkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
/**
 * Program entry point: passes the {@code stack} method to the Pulumi runtime.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    Pulumi.run(context -> stack(context));
}
/**
 * Declares the Splunk example: a delivery stream with destination {@code splunk},
 * an S3 configuration block, and a Splunk HEC endpoint configuration.
 *
 * <p>NOTE(review): {@code aws_iam_role} and {@code aws_s3_bucket} are not declared
 * in this example; they presumably refer to resources defined elsewhere. Confirm.
 * The HEC token is an inline sample literal.
 *
 * @param ctx the Pulumi program context passed in by {@code Pulumi.run}
 */
public static void stack(Context ctx) {
    var s3Settings = FirehoseDeliveryStreamS3ConfigurationArgs.builder()
        .roleArn(aws_iam_role.firehose().arn())
        .bucketArn(aws_s3_bucket.bucket().arn())
        .bufferSize(10)
        .bufferInterval(400)
        .compressionFormat("GZIP")
        .build();

    var splunkSettings = FirehoseDeliveryStreamSplunkConfigurationArgs.builder()
        .hecEndpoint("https://http-inputs-mydomain.splunkcloud.com:443")
        .hecToken("51D4DA16-C61B-4F5F-8EC7-ED4301342A4A")
        .hecAcknowledgmentTimeout(600)
        .hecEndpointType("Event")
        .s3BackupMode("FailedEventsOnly")
        .build();

    var streamArgs = FirehoseDeliveryStreamArgs.builder()
        .destination("splunk")
        .s3Configuration(s3Settings)
        .splunkConfiguration(splunkSettings)
        .build();
    var testStream = new FirehoseDeliveryStream("testStream", streamArgs);
}
}
HTTP Endpoint (e.g., New Relic) Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
/**
 * Program entry point: passes the {@code stack} method to the Pulumi runtime.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    Pulumi.run(context -> stack(context));
}
/**
 * Declares the HTTP endpoint example: a delivery stream with destination
 * {@code http_endpoint}, an S3 configuration block, and an endpoint configuration
 * whose request settings carry GZIP content encoding and two common attributes.
 *
 * <p>NOTE(review): {@code aws_iam_role} and {@code aws_s3_bucket} are not declared
 * in this example; they presumably refer to resources defined elsewhere. Confirm.
 *
 * @param ctx the Pulumi program context passed in by {@code Pulumi.run}
 */
public static void stack(Context ctx) {
    var s3Settings = FirehoseDeliveryStreamS3ConfigurationArgs.builder()
        .roleArn(aws_iam_role.firehose().arn())
        .bucketArn(aws_s3_bucket.bucket().arn())
        .bufferSize(10)
        .bufferInterval(400)
        .compressionFormat("GZIP")
        .build();

    // Attributes are listed in the same order as they are passed to commonAttributes below.
    var firstAttribute = FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs.builder()
        .name("testname")
        .value("testvalue")
        .build();
    var secondAttribute = FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs.builder()
        .name("testname2")
        .value("testvalue2")
        .build();

    var requestSettings = FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs.builder()
        .contentEncoding("GZIP")
        .commonAttributes(firstAttribute, secondAttribute)
        .build();

    var endpointSettings = FirehoseDeliveryStreamHttpEndpointConfigurationArgs.builder()
        .url("https://aws-api.newrelic.com/firehose/v1")
        .name("New Relic")
        .accessKey("my-key")
        .bufferingSize(15)
        .bufferingInterval(600)
        .roleArn(aws_iam_role.firehose().arn())
        .s3BackupMode("FailedDataOnly")
        .requestConfiguration(requestSettings)
        .build();

    var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
        .destination("http_endpoint")
        .s3Configuration(s3Settings)
        .httpEndpointConfiguration(endpointSettings)
        .build());
}
}
Import
Kinesis Firehose Delivery streams can be imported using the stream ARN, e.g.,
$ pulumi import aws:kinesis/firehoseDeliveryStream:FirehoseDeliveryStream foo arn:aws:firehose:us-east-1:XXX:deliverystream/example
Note: Import does not work for stream destination s3. Consider using extended_s3 since s3 destination is deprecated.
Properties
The destination to which the data is delivered. The only options are s3 (Deprecated, use extended_s3 instead), extended_s3, redshift, elasticsearch, splunk, http_endpoint and opensearch.
Configuration options if elasticsearch is the destination. More details are given below.
Enhanced configuration options for the s3 destination. More details are given below.
Configuration options if http_endpoint is the destination. Using http_endpoint_configuration requires the user to also specify an s3_configuration block. More details are given below.
Allows the ability to specify the kinesis stream that is used as the source of the firehose delivery stream.
A name to identify the stream. This is unique to the AWS account and region the Stream is created in. When using for WAF logging, name must be prefixed with aws-waf-logs-. See AWS Documentation for more details.
Configuration options if opensearch is the destination. More details are given below.
Configuration options if redshift is the destination. Using redshift_configuration requires the user to also specify an s3_configuration block. More details are given below.
Required for non-S3 destinations. For S3 destination, use extended_s3_configuration instead. Configuration options for the s3 destination (or the intermediate bucket if the destination is redshift). More details are given below.
Encrypt at rest options. Server-side encryption should not be enabled when a kinesis stream is configured as the source of the firehose delivery stream.
Configuration options if splunk is the destination. More details are given below.