Firehose Delivery Stream Args
Provides a Kinesis Firehose Delivery Stream resource. Amazon Kinesis Firehose is a fully managed, elastic service to easily deliver real-time data streams to destinations such as Amazon S3 and Amazon Redshift. For more details, see the Amazon Kinesis Firehose Documentation.
Example Usage
Extended S3 Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.BucketV2;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.lambda.Function;
import com.pulumi.aws.lambda.FunctionArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.s3.BucketAclV2;
import com.pulumi.aws.s3.BucketAclV2Args;
import com.pulumi.asset.FileArchive;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Example program: a Kinesis Firehose delivery stream with an {@code extended_s3}
 * destination whose records pass through a Lambda processor.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the destination bucket, the Firehose and Lambda IAM roles, the Lambda
     * processor, the delivery stream and the bucket ACL.
     *
     * @param ctx Pulumi context supplied by {@code Pulumi.run}; not used by this example
     */
    public static void stack(Context ctx) {
        // NOTE(review): GetPolicyDocumentStatementArgs, GetPolicyDocumentStatementPrincipalArgs and the
        // ...ProcessorArgs / ...ProcessorParameterArgs types are not in this example's import list — confirm.
        var bucket = new BucketV2("bucket");

        // Trust policy that lets the Firehose service assume the delivery role.
        final var firehoseAssumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(GetPolicyDocumentStatementArgs.builder()
                .effect("Allow")
                .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                    .type("Service")
                    .identifiers("firehose.amazonaws.com")
                    .build())
                .actions("sts:AssumeRole")
                .build())
            .build());

        var firehoseRole = new Role("firehoseRole", RoleArgs.builder()
            .assumeRolePolicy(firehoseAssumeRole.applyValue(getPolicyDocumentResult -> getPolicyDocumentResult.json()))
            .build());

        // Trust policy that lets the Lambda service assume the processor's role.
        final var lambdaAssumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(GetPolicyDocumentStatementArgs.builder()
                .effect("Allow")
                .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                    .type("Service")
                    .identifiers("lambda.amazonaws.com")
                    .build())
                .actions("sts:AssumeRole")
                .build())
            .build());

        var lambdaIam = new Role("lambdaIam", RoleArgs.builder()
            .assumeRolePolicy(lambdaAssumeRole.applyValue(getPolicyDocumentResult -> getPolicyDocumentResult.json()))
            .build());

        var lambdaProcessor = new Function("lambdaProcessor", FunctionArgs.builder()
            .code(new FileArchive("lambda.zip"))
            .role(lambdaIam.arn())
            .handler("exports.handler")
            .runtime("nodejs16.x")
            .build());

        var extendedS3Stream = new FirehoseDeliveryStream("extendedS3Stream", FirehoseDeliveryStreamArgs.builder()
            .destination("extended_s3")
            .extendedS3Configuration(FirehoseDeliveryStreamExtendedS3ConfigurationArgs.builder()
                .roleArn(firehoseRole.arn())
                .bucketArn(bucket.arn())
                .processingConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs.builder()
                    // Boolean flag: pass the literal true, not the string "true".
                    .enabled(true)
                    .processors(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                        .type("Lambda")
                        .parameters(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                            .parameterName("LambdaArn")
                            // The ARN is an Output, so the ":$LATEST" suffix is appended inside applyValue.
                            .parameterValue(lambdaProcessor.arn().applyValue(arn -> String.format("%s:$LATEST", arn)))
                            .build())
                        .build())
                    .build())
                .build())
            .build());

        var bucketAcl = new BucketAclV2("bucketAcl", BucketAclV2Args.builder()
            .bucket(bucket.id())
            .acl("private")
            .build());
    }
}
Extended S3 Destination with dynamic partitioning
These examples use built-in Firehose functionality, rather than requiring a lambda.
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Example program: an {@code extended_s3} delivery stream that uses dynamic partitioning
 * with Firehose's built-in processors.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares a single delivery stream with dynamic partitioning enabled and three
     * processors: RecordDeAggregation, AppendDelimiterToRecord and MetadataExtraction.
     *
     * @param ctx Pulumi context supplied by {@code Pulumi.run}; not used by this example
     */
    public static void stack(Context ctx) {
        // NOTE(review): aws_iam_role and aws_s3_bucket are not declared in this example; presumably they
        // stand for a role and bucket defined elsewhere — confirm before running.
        var extendedS3Stream = new FirehoseDeliveryStream("extendedS3Stream", FirehoseDeliveryStreamArgs.builder()
            .destination("extended_s3")
            .extendedS3Configuration(FirehoseDeliveryStreamExtendedS3ConfigurationArgs.builder()
                .roleArn(aws_iam_role.firehose_role().arn())
                .bucketArn(aws_s3_bucket.bucket().arn())
                .bufferSize(64)
                .dynamicPartitioningConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs.builder()
                    // Boolean flag: pass the literal true, not the string "true".
                    .enabled(true)
                    .build())
                // The customer_id partition key in the prefix is produced by the MetadataExtraction query below.
                .prefix("data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/")
                .errorOutputPrefix("errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/")
                .processingConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs.builder()
                    .enabled(true)
                    .processors(
                        FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                            .type("RecordDeAggregation")
                            .parameters(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                                .parameterName("SubRecordType")
                                .parameterValue("JSON")
                                .build())
                            .build(),
                        FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                            .type("AppendDelimiterToRecord")
                            .build(),
                        FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                            .type("MetadataExtraction")
                            .parameters(
                                FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                                    .parameterName("JsonParsingEngine")
                                    .parameterValue("JQ-1.6")
                                    .build(),
                                FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                                    .parameterName("MetadataExtractionQuery")
                                    .parameterValue("{customer_id:.customer_id}")
                                    .build())
                            .build())
                    .build())
                .build())
            .build());
    }
}
S3 Destination (deprecated)
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.BucketV2;
import com.pulumi.aws.s3.BucketAclV2;
import com.pulumi.aws.s3.BucketAclV2Args;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Example program: a delivery stream using the deprecated plain {@code s3} destination.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares a bucket with a private ACL, an IAM role assumable by Firehose, and a
     * delivery stream that writes to the bucket using that role.
     *
     * @param ctx Pulumi context supplied by {@code Pulumi.run}; not used by this example
     */
    public static void stack(Context ctx) {
        var bucket = new BucketV2("bucket");

        var privateAclArgs = BucketAclV2Args.builder()
            .bucket(bucket.id())
            .acl("private")
            .build();
        var bucketAcl = new BucketAclV2("bucketAcl", privateAclArgs);

        // Trust statement: the Firehose service principal may call sts:AssumeRole.
        var firehosePrincipal = GetPolicyDocumentStatementPrincipalArgs.builder()
            .type("Service")
            .identifiers("firehose.amazonaws.com")
            .build();
        var trustStatement = GetPolicyDocumentStatementArgs.builder()
            .effect("Allow")
            .principals(firehosePrincipal)
            .actions("sts:AssumeRole")
            .build();
        final var assumeRole = IamFunctions.getPolicyDocument(
            GetPolicyDocumentArgs.builder().statements(trustStatement).build());

        var roleArgs = RoleArgs.builder()
            .assumeRolePolicy(assumeRole.applyValue(policyDocument -> policyDocument.json()))
            .build();
        var firehoseRole = new Role("firehoseRole", roleArgs);

        var s3Destination = FirehoseDeliveryStreamS3ConfigurationArgs.builder()
            .roleArn(firehoseRole.arn())
            .bucketArn(bucket.arn())
            .build();
        var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
            .destination("s3")
            .s3Configuration(s3Destination)
            .build());
    }
}
Redshift Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.redshift.Cluster;
import com.pulumi.aws.redshift.ClusterArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Example program: a delivery stream with a {@code redshift} destination, staged through
 * an intermediate S3 bucket and with S3 backup enabled.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares a single-node Redshift cluster and a delivery stream that copies into it.
     *
     * @param ctx Pulumi context supplied by {@code Pulumi.run}; not used by this example
     */
    public static void stack(Context ctx) {
        // NOTE(review): aws_iam_role and aws_s3_bucket are not declared in this example; presumably they
        // stand for a role and bucket defined elsewhere — confirm before running.
        // NOTE(review): credentials below are literal demo values; use a secret source for real stacks.
        var clusterArgs = ClusterArgs.builder()
            .clusterIdentifier("tf-redshift-cluster")
            .databaseName("test")
            .masterUsername("testuser")
            .masterPassword("T3stPass")
            .nodeType("dc1.large")
            .clusterType("single-node")
            .build();
        var testCluster = new Cluster("testCluster", clusterArgs);

        // Intermediate bucket settings for the stream.
        var intermediateS3 = FirehoseDeliveryStreamS3ConfigurationArgs.builder()
            .roleArn(aws_iam_role.firehose_role().arn())
            .bucketArn(aws_s3_bucket.bucket().arn())
            .bufferSize(10)
            .bufferInterval(400)
            .compressionFormat("GZIP")
            .build();

        // The JDBC URL is assembled from two cluster outputs once both are known.
        var jdbcUrl = Output.tuple(testCluster.endpoint(), testCluster.databaseName())
            .applyValue(pair -> String.format("jdbc:redshift://%s/%s", pair.t1, pair.t2));

        var redshiftDestination = FirehoseDeliveryStreamRedshiftConfigurationArgs.builder()
            .roleArn(aws_iam_role.firehose_role().arn())
            .clusterJdbcurl(jdbcUrl)
            .username("testuser")
            .password("T3stPass")
            .dataTableName("test-table")
            .copyOptions("delimiter '|'")
            .dataTableColumns("test-col")
            .s3BackupMode("Enabled")
            .s3BackupConfiguration(FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs.builder()
                .roleArn(aws_iam_role.firehose_role().arn())
                .bucketArn(aws_s3_bucket.bucket().arn())
                .bufferSize(15)
                .bufferInterval(300)
                .compressionFormat("GZIP")
                .build())
            .build();

        var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
            .destination("redshift")
            .s3Configuration(intermediateS3)
            .redshiftConfiguration(redshiftDestination)
            .build());
    }
}
Elasticsearch Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.elasticsearch.Domain;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamElasticsearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamElasticsearchConfigurationProcessingConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Example program: a delivery stream with an {@code elasticsearch} destination and a
 * Lambda processor.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares an Elasticsearch domain and a delivery stream that indexes into it.
     *
     * @param ctx Pulumi context supplied by {@code Pulumi.run}; not used by this example
     */
    public static void stack(Context ctx) {
        // NOTE(review): aws_iam_role, aws_s3_bucket and aws_lambda_function are not declared in this
        // example; presumably they stand for resources defined elsewhere — confirm before running.
        var testCluster = new Domain("testCluster");

        var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
            .destination("elasticsearch")
            .s3Configuration(FirehoseDeliveryStreamS3ConfigurationArgs.builder()
                .roleArn(aws_iam_role.firehose_role().arn())
                .bucketArn(aws_s3_bucket.bucket().arn())
                .bufferSize(10)
                .bufferInterval(400)
                .compressionFormat("GZIP")
                .build())
            .elasticsearchConfiguration(FirehoseDeliveryStreamElasticsearchConfigurationArgs.builder()
                .domainArn(testCluster.arn())
                .roleArn(aws_iam_role.firehose_role().arn())
                .indexName("test")
                .typeName("test")
                .processingConfiguration(FirehoseDeliveryStreamElasticsearchConfigurationProcessingConfigurationArgs.builder()
                    // Boolean flag: pass the literal true, not the string "true".
                    .enabled(true)
                    .processors(FirehoseDeliveryStreamElasticsearchConfigurationProcessingConfigurationProcessorArgs.builder()
                        .type("Lambda")
                        .parameters(FirehoseDeliveryStreamElasticsearchConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                            .parameterName("LambdaArn")
                            // Formatting the Output directly would embed the Output object's string form,
                            // not the ARN; append the suffix inside applyValue instead.
                            // Assumes arn() returns an Output<String> like the other resources here — TODO confirm.
                            .parameterValue(aws_lambda_function.lambda_processor().arn()
                                .applyValue(arn -> String.format("%s:$LATEST", arn)))
                            .build())
                        .build())
                    .build())
                .build())
            .build());
    }
}
Elasticsearch Destination With VPC
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.elasticsearch.Domain;
import com.pulumi.aws.elasticsearch.DomainArgs;
import com.pulumi.aws.elasticsearch.inputs.DomainClusterConfigArgs;
import com.pulumi.aws.elasticsearch.inputs.DomainEbsOptionsArgs;
import com.pulumi.aws.elasticsearch.inputs.DomainVpcOptionsArgs;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamElasticsearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamElasticsearchConfigurationVpcConfigArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var testCluster = new Domain("testCluster", DomainArgs.builder()
.clusterConfig(DomainClusterConfigArgs.builder()
.instanceCount(2)
.zoneAwarenessEnabled(true)
.instanceType("t2.small.elasticsearch")
.build())
.ebsOptions(DomainEbsOptionsArgs.builder()
.ebsEnabled(true)
.volumeSize(10)
.build())
.vpcOptions(DomainVpcOptionsArgs.builder()
.securityGroupIds(aws_security_group.first().id())
.subnetIds(
aws_subnet.first().id(),
aws_subnet.second().id())
.build())
.build());
final var firehose-elasticsearchPolicyDocument = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(
GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.actions("es:*")
.resources(
testCluster.arn(),
testCluster.arn().applyValue(arn -> String.format("%s/*", arn)))
.build(),
GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.actions(
"ec2:DescribeVpcs",
"ec2:DescribeVpcAttribute",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
"ec2:DescribeNetworkInterfaces",
"ec2:CreateNetworkInterface",
"ec2:CreateNetworkInterfacePermission",
"ec2:DeleteNetworkInterface")
.resources("*")
.build())
.build());
var firehose_elasticsearchRolePolicy = new RolePolicy("firehose-elasticsearchRolePolicy", RolePolicyArgs.builder()
.role(aws_iam_role.firehose().id())
.policy(firehose_elasticsearchPolicyDocument.applyValue(firehose_elasticsearchPolicyDocument -> firehose_elasticsearchPolicyDocument.json()))
.build());
var test = new FirehoseDeliveryStream("test", FirehoseDeliveryStreamArgs.builder()
.destination("elasticsearch")
.s3Configuration(FirehoseDeliveryStreamS3ConfigurationArgs.builder()
.roleArn(aws_iam_role.firehose().arn())
.bucketArn(aws_s3_bucket.bucket().arn())
.build())
.elasticsearchConfiguration(FirehoseDeliveryStreamElasticsearchConfigurationArgs.builder()
.domainArn(testCluster.arn())
.roleArn(aws_iam_role.firehose().arn())
.indexName("test")
.typeName("test")
.vpcConfig(FirehoseDeliveryStreamElasticsearchConfigurationVpcConfigArgs.builder()
.subnetIds(
aws_subnet.first().id(),
aws_subnet.second().id())
.securityGroupIds(aws_security_group.first().id())
.roleArn(aws_iam_role.firehose().arn())
.build())
.build())
.build(), CustomResourceOptions.builder()
.dependsOn(firehose_elasticsearchRolePolicy)
.build());
}
}
Opensearch Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.opensearch.Domain;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Example program: a delivery stream with an {@code opensearch} destination and a
 * Lambda processor.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares an OpenSearch domain and a delivery stream that indexes into it.
     *
     * @param ctx Pulumi context supplied by {@code Pulumi.run}; not used by this example
     */
    public static void stack(Context ctx) {
        // NOTE(review): aws_iam_role, aws_s3_bucket and aws_lambda_function are not declared in this
        // example; presumably they stand for resources defined elsewhere — confirm before running.
        var testCluster = new Domain("testCluster");

        var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
            .destination("opensearch")
            .s3Configuration(FirehoseDeliveryStreamS3ConfigurationArgs.builder()
                .roleArn(aws_iam_role.firehose_role().arn())
                .bucketArn(aws_s3_bucket.bucket().arn())
                .bufferSize(10)
                .bufferInterval(400)
                .compressionFormat("GZIP")
                .build())
            .opensearchConfiguration(FirehoseDeliveryStreamOpensearchConfigurationArgs.builder()
                .domainArn(testCluster.arn())
                .roleArn(aws_iam_role.firehose_role().arn())
                .indexName("test")
                .processingConfiguration(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs.builder()
                    // Boolean flag: pass the literal true, not the string "true".
                    .enabled(true)
                    .processors(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs.builder()
                        .type("Lambda")
                        .parameters(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                            .parameterName("LambdaArn")
                            // Formatting the Output directly would embed the Output object's string form,
                            // not the ARN; append the suffix inside applyValue instead.
                            // Assumes arn() returns an Output<String> like the other resources here — TODO confirm.
                            .parameterValue(aws_lambda_function.lambda_processor().arn()
                                .applyValue(arn -> String.format("%s:$LATEST", arn)))
                            .build())
                        .build())
                    .build())
                .build())
            .build());
    }
}
Opensearch Destination With VPC
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.opensearch.Domain;
import com.pulumi.aws.opensearch.DomainArgs;
import com.pulumi.aws.opensearch.inputs.DomainClusterConfigArgs;
import com.pulumi.aws.opensearch.inputs.DomainEbsOptionsArgs;
import com.pulumi.aws.opensearch.inputs.DomainVpcOptionsArgs;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationVpcConfigArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var testCluster = new Domain("testCluster", DomainArgs.builder()
.clusterConfig(DomainClusterConfigArgs.builder()
.instanceCount(2)
.zoneAwarenessEnabled(true)
.instanceType("m4.large.search")
.build())
.ebsOptions(DomainEbsOptionsArgs.builder()
.ebsEnabled(true)
.volumeSize(10)
.build())
.vpcOptions(DomainVpcOptionsArgs.builder()
.securityGroupIds(aws_security_group.first().id())
.subnetIds(
aws_subnet.first().id(),
aws_subnet.second().id())
.build())
.build());
var firehose_opensearch = new RolePolicy("firehose-opensearch", RolePolicyArgs.builder()
.role(aws_iam_role.firehose().id())
.policy(Output.tuple(testCluster.arn(), testCluster.arn()).applyValue(values -> {
var testClusterArn = values.t1;
var testClusterArn1 = values.t2;
return """
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"es:*"
],
"Resource": [
"%s",
"%s/*"
]
},
{
"Effect": "Allow",
"Action": [
"ec2:DescribeVpcs",
"ec2:DescribeVpcAttribute",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
"ec2:DescribeNetworkInterfaces",
"ec2:CreateNetworkInterface",
"ec2:CreateNetworkInterfacePermission",
"ec2:DeleteNetworkInterface"
],
"Resource": [
"*"
]
}
]
}
", testClusterArn,testClusterArn1);
}))
.build());
var test = new FirehoseDeliveryStream("test", FirehoseDeliveryStreamArgs.builder()
.destination("opensearch")
.s3Configuration(FirehoseDeliveryStreamS3ConfigurationArgs.builder()
.roleArn(aws_iam_role.firehose().arn())
.bucketArn(aws_s3_bucket.bucket().arn())
.build())
.opensearchConfiguration(FirehoseDeliveryStreamOpensearchConfigurationArgs.builder()
.domainArn(testCluster.arn())
.roleArn(aws_iam_role.firehose().arn())
.indexName("test")
.vpcConfig(FirehoseDeliveryStreamOpensearchConfigurationVpcConfigArgs.builder()
.subnetIds(
aws_subnet.first().id(),
aws_subnet.second().id())
.securityGroupIds(aws_security_group.first().id())
.roleArn(aws_iam_role.firehose().arn())
.build())
.build())
.build(), CustomResourceOptions.builder()
.dependsOn(firehose_opensearch)
.build());
}
}
Splunk Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamSplunkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Example program: a delivery stream with a {@code splunk} destination.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares a delivery stream that sends to a Splunk HEC endpoint, with an S3
     * configuration alongside it.
     *
     * @param ctx Pulumi context supplied by {@code Pulumi.run}; not used by this example
     */
    public static void stack(Context ctx) {
        // NOTE(review): aws_iam_role and aws_s3_bucket are not declared in this example; presumably they
        // stand for a role and bucket defined elsewhere — confirm before running.
        var s3Settings = FirehoseDeliveryStreamS3ConfigurationArgs.builder()
            .roleArn(aws_iam_role.firehose().arn())
            .bucketArn(aws_s3_bucket.bucket().arn())
            .bufferSize(10)
            .bufferInterval(400)
            .compressionFormat("GZIP")
            .build();

        var splunkSettings = FirehoseDeliveryStreamSplunkConfigurationArgs.builder()
            .hecEndpoint("https://http-inputs-mydomain.splunkcloud.com:443")
            .hecToken("51D4DA16-C61B-4F5F-8EC7-ED4301342A4A")
            .hecAcknowledgmentTimeout(600)
            .hecEndpointType("Event")
            .s3BackupMode("FailedEventsOnly")
            .build();

        var streamArgs = FirehoseDeliveryStreamArgs.builder()
            .destination("splunk")
            .s3Configuration(s3Settings)
            .splunkConfiguration(splunkSettings)
            .build();

        var testStream = new FirehoseDeliveryStream("testStream", streamArgs);
    }
}
HTTP Endpoint (e.g., New Relic) Destination
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Example program: a delivery stream with an {@code http_endpoint} destination
 * (a New Relic URL in this example).
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares a delivery stream that posts GZIP-encoded requests carrying two common
     * attributes to the HTTP endpoint, with an S3 configuration alongside it.
     *
     * @param ctx Pulumi context supplied by {@code Pulumi.run}; not used by this example
     */
    public static void stack(Context ctx) {
        // NOTE(review): aws_iam_role and aws_s3_bucket are not declared in this example; presumably they
        // stand for a role and bucket defined elsewhere — confirm before running.
        var s3Settings = FirehoseDeliveryStreamS3ConfigurationArgs.builder()
            .roleArn(aws_iam_role.firehose().arn())
            .bucketArn(aws_s3_bucket.bucket().arn())
            .bufferSize(10)
            .bufferInterval(400)
            .compressionFormat("GZIP")
            .build();

        var firstAttribute = FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs.builder()
            .name("testname")
            .value("testvalue")
            .build();
        var secondAttribute = FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs.builder()
            .name("testname2")
            .value("testvalue2")
            .build();
        var requestSettings = FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs.builder()
            .contentEncoding("GZIP")
            .commonAttributes(firstAttribute, secondAttribute)
            .build();

        var endpointSettings = FirehoseDeliveryStreamHttpEndpointConfigurationArgs.builder()
            .url("https://aws-api.newrelic.com/firehose/v1")
            .name("New Relic")
            .accessKey("my-key")
            .bufferingSize(15)
            .bufferingInterval(600)
            .roleArn(aws_iam_role.firehose().arn())
            .s3BackupMode("FailedDataOnly")
            .requestConfiguration(requestSettings)
            .build();

        var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
            .destination("http_endpoint")
            .s3Configuration(s3Settings)
            .httpEndpointConfiguration(endpointSettings)
            .build());
    }
}
Import
Kinesis Firehose Delivery streams can be imported using the stream ARN, e.g.,
$ pulumi import aws:kinesis/firehoseDeliveryStream:FirehoseDeliveryStream foo arn:aws:firehose:us-east-1:XXX:deliverystream/example
Note: Import does not work for stream destination `s3`. Consider using `extended_s3` since the `s3` destination is deprecated.
Constructors
Properties
This is the destination to which the data is delivered. The only options are `s3` (deprecated, use `extended_s3` instead), `extended_s3`, `redshift`, `elasticsearch`, `splunk`, `http_endpoint` and `opensearch`.
Configuration options if elasticsearch is the destination. More details are given below.
Enhanced configuration options for the s3 destination. More details are given below.
Configuration options if `http_endpoint` is the destination. Using this configuration requires the user to also specify an `s3_configuration` block. More details are given below.
Allows the ability to specify the kinesis stream that is used as the source of the firehose delivery stream.
A name to identify the stream. This is unique to the AWS account and region the stream is created in. When using for WAF logging, the name must be prefixed with `aws-waf-logs-`. See AWS Documentation for more details.
Configuration options if opensearch is the destination. More details are given below.
Configuration options if `redshift` is the destination. Using `redshift_configuration` requires the user to also specify an `s3_configuration` block. More details are given below.
Required for non-S3 destinations. For the S3 destination, use `extended_s3_configuration` instead. Configuration options for the `s3` destination (or the intermediate bucket if the destination is `redshift`). More details are given below.
Encrypt at rest options. Server-side encryption should not be enabled when a kinesis stream is configured as the source of the firehose delivery stream.
Configuration options if splunk is the destination. More details are given below.