Cluster
Manages an Amazon MSK cluster.
Note: This resource manages provisioned clusters. To manage a serverless Amazon MSK cluster, use the
aws.msk.ServerlessCluster
resource.
Example Usage
Basic
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.ec2.Vpc;
import com.pulumi.aws.ec2.VpcArgs;
import com.pulumi.aws.AwsFunctions;
import com.pulumi.aws.inputs.GetAvailabilityZonesArgs;
import com.pulumi.aws.ec2.Subnet;
import com.pulumi.aws.ec2.SubnetArgs;
import com.pulumi.aws.ec2.SecurityGroup;
import com.pulumi.aws.ec2.SecurityGroupArgs;
import com.pulumi.aws.kms.Key;
import com.pulumi.aws.kms.KeyArgs;
import com.pulumi.aws.cloudwatch.LogGroup;
import com.pulumi.aws.s3.BucketV2;
import com.pulumi.aws.s3.BucketAclV2;
import com.pulumi.aws.s3.BucketAclV2Args;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementPrincipalArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamS3ConfigurationArgs;
import com.pulumi.aws.msk.Cluster;
import com.pulumi.aws.msk.ClusterArgs;
import com.pulumi.aws.msk.inputs.ClusterBrokerNodeGroupInfoArgs;
import com.pulumi.aws.msk.inputs.ClusterBrokerNodeGroupInfoStorageInfoArgs;
import com.pulumi.aws.msk.inputs.ClusterBrokerNodeGroupInfoStorageInfoEbsStorageInfoArgs;
import com.pulumi.aws.msk.inputs.ClusterEncryptionInfoArgs;
import com.pulumi.aws.msk.inputs.ClusterOpenMonitoringArgs;
import com.pulumi.aws.msk.inputs.ClusterOpenMonitoringPrometheusArgs;
import com.pulumi.aws.msk.inputs.ClusterOpenMonitoringPrometheusJmxExporterArgs;
import com.pulumi.aws.msk.inputs.ClusterOpenMonitoringPrometheusNodeExporterArgs;
import com.pulumi.aws.msk.inputs.ClusterLoggingInfoArgs;
import com.pulumi.aws.msk.inputs.ClusterLoggingInfoBrokerLogsArgs;
import com.pulumi.aws.msk.inputs.ClusterLoggingInfoBrokerLogsCloudwatchLogsArgs;
import com.pulumi.aws.msk.inputs.ClusterLoggingInfoBrokerLogsFirehoseArgs;
import com.pulumi.aws.msk.inputs.ClusterLoggingInfoBrokerLogsS3Args;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Basic MSK cluster example: provisions a VPC with three subnets (one per AZ),
 * a KMS key for at-rest encryption, CloudWatch/Firehose/S3 broker-log sinks,
 * and a 3-broker provisioned MSK cluster with open (Prometheus) monitoring.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // Network: a /22 VPC split into three /24 subnets across distinct AZs,
        // matching the cluster's numberOfBrokerNodes (one broker per subnet).
        var vpc = new Vpc("vpc", VpcArgs.builder()
            .cidrBlock("192.168.0.0/22")
            .build());

        final var azs = AwsFunctions.getAvailabilityZones(GetAvailabilityZonesArgs.builder()
            .state("available")
            .build());

        // names() returns a List<String>, so elements are accessed with get(i),
        // not array indexing.
        var subnetAz1 = new Subnet("subnetAz1", SubnetArgs.builder()
            .availabilityZone(azs.applyValue(getAvailabilityZonesResult -> getAvailabilityZonesResult.names().get(0)))
            .cidrBlock("192.168.0.0/24")
            .vpcId(vpc.id())
            .build());

        var subnetAz2 = new Subnet("subnetAz2", SubnetArgs.builder()
            .availabilityZone(azs.applyValue(getAvailabilityZonesResult -> getAvailabilityZonesResult.names().get(1)))
            .cidrBlock("192.168.1.0/24")
            .vpcId(vpc.id())
            .build());

        var subnetAz3 = new Subnet("subnetAz3", SubnetArgs.builder()
            .availabilityZone(azs.applyValue(getAvailabilityZonesResult -> getAvailabilityZonesResult.names().get(2)))
            .cidrBlock("192.168.2.0/24")
            .vpcId(vpc.id())
            .build());

        var sg = new SecurityGroup("sg", SecurityGroupArgs.builder()
            .vpcId(vpc.id())
            .build());

        // KMS key used for MSK encryption at rest (see encryptionInfo below).
        var kms = new Key("kms", KeyArgs.builder()
            .description("example")
            .build());

        // Broker-log destinations: CloudWatch log group, S3 bucket, and a
        // Firehose delivery stream writing into the same bucket.
        var test = new LogGroup("test");
        var bucket = new BucketV2("bucket");
        var bucketAcl = new BucketAclV2("bucketAcl", BucketAclV2Args.builder()
            .bucket(bucket.id())
            .acl("private")
            .build());

        // Trust policy allowing the Firehose service to assume the role.
        final var assumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(GetPolicyDocumentStatementArgs.builder()
                .effect("Allow")
                .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                    .type("Service")
                    .identifiers("firehose.amazonaws.com")
                    .build())
                .actions("sts:AssumeRole")
                .build())
            .build());

        var firehoseRole = new Role("firehoseRole", RoleArgs.builder()
            .assumeRolePolicy(assumeRole.applyValue(getPolicyDocumentResult -> getPolicyDocumentResult.json()))
            .build());

        var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
            .destination("s3")
            .s3Configuration(FirehoseDeliveryStreamS3ConfigurationArgs.builder()
                .roleArn(firehoseRole.arn())
                .bucketArn(bucket.arn())
                .build())
            .tags(Map.of("LogDeliveryEnabled", "placeholder"))
            .build());

        // The MSK cluster itself: 3 brokers (one per client subnet), 1 TB EBS
        // volumes, KMS at-rest encryption, JMX/Node Prometheus exporters, and
        // broker logs fanned out to CloudWatch, Firehose, and S3.
        var example = new Cluster("example", ClusterArgs.builder()
            .kafkaVersion("3.2.0")
            .numberOfBrokerNodes(3)
            .brokerNodeGroupInfo(ClusterBrokerNodeGroupInfoArgs.builder()
                .instanceType("kafka.m5.large")
                .clientSubnets(
                    subnetAz1.id(),
                    subnetAz2.id(),
                    subnetAz3.id())
                .storageInfo(ClusterBrokerNodeGroupInfoStorageInfoArgs.builder()
                    .ebsStorageInfo(ClusterBrokerNodeGroupInfoStorageInfoEbsStorageInfoArgs.builder()
                        .volumeSize(1000)
                        .build())
                    .build())
                .securityGroups(sg.id())
                .build())
            .encryptionInfo(ClusterEncryptionInfoArgs.builder()
                .encryptionAtRestKmsKeyArn(kms.arn())
                .build())
            .openMonitoring(ClusterOpenMonitoringArgs.builder()
                .prometheus(ClusterOpenMonitoringPrometheusArgs.builder()
                    .jmxExporter(ClusterOpenMonitoringPrometheusJmxExporterArgs.builder()
                        .enabledInBroker(true)
                        .build())
                    .nodeExporter(ClusterOpenMonitoringPrometheusNodeExporterArgs.builder()
                        .enabledInBroker(true)
                        .build())
                    .build())
                .build())
            .loggingInfo(ClusterLoggingInfoArgs.builder()
                .brokerLogs(ClusterLoggingInfoBrokerLogsArgs.builder()
                    .cloudwatchLogs(ClusterLoggingInfoBrokerLogsCloudwatchLogsArgs.builder()
                        .enabled(true)
                        .logGroup(test.name())
                        .build())
                    .firehose(ClusterLoggingInfoBrokerLogsFirehoseArgs.builder()
                        .enabled(true)
                        .deliveryStream(testStream.name())
                        .build())
                    .s3(ClusterLoggingInfoBrokerLogsS3Args.builder()
                        .enabled(true)
                        .bucket(bucket.id())
                        .prefix("logs/msk-")
                        .build())
                    .build())
                .build())
            .tags(Map.of("foo", "bar"))
            .build());

        // Stack outputs: connection strings for Zookeeper and TLS bootstrap brokers.
        ctx.export("zookeeperConnectString", example.zookeeperConnectString());
        ctx.export("bootstrapBrokersTls", example.bootstrapBrokersTls());
    }
}
With volume_throughput argument
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.msk.Cluster;
import com.pulumi.aws.msk.ClusterArgs;
import com.pulumi.aws.msk.inputs.ClusterBrokerNodeGroupInfoArgs;
import com.pulumi.aws.msk.inputs.ClusterBrokerNodeGroupInfoStorageInfoArgs;
import com.pulumi.aws.msk.inputs.ClusterBrokerNodeGroupInfoStorageInfoEbsStorageInfoArgs;
import com.pulumi.aws.msk.inputs.ClusterBrokerNodeGroupInfoStorageInfoEbsStorageInfoProvisionedThroughputArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
// Example: MSK cluster with provisioned EBS volume throughput
// (provisionedThroughput requires a supported instance size, e.g. m5.4xlarge).
public class App {
// Entry point: hands control to the stack definition below.
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Cluster("example", ClusterArgs.builder()
.kafkaVersion("2.7.1")
.numberOfBrokerNodes(3)
.brokerNodeGroupInfo(ClusterBrokerNodeGroupInfoArgs.builder()
.instanceType("kafka.m5.4xlarge")
// NOTE(review): the aws_subnet.* / aws_security_group.* calls below look like
// unconverted Terraform interpolation references and are not valid Java —
// replace them with references to actual Subnet/SecurityGroup resources
// (e.g. subnetAz1.id()) as in the Basic example above. TODO confirm intended
// resources before use.
.clientSubnets(
aws_subnet.subnet_az1().id(),
aws_subnet.subnet_az2().id(),
aws_subnet.subnet_az3().id())
.storageInfo(ClusterBrokerNodeGroupInfoStorageInfoArgs.builder()
.ebsStorageInfo(ClusterBrokerNodeGroupInfoStorageInfoEbsStorageInfoArgs.builder()
// Provisioned throughput: 250 MiB/s per broker on the 1000 GiB EBS volumes.
.provisionedThroughput(ClusterBrokerNodeGroupInfoStorageInfoEbsStorageInfoProvisionedThroughputArgs.builder()
.enabled(true)
.volumeThroughput(250)
.build())
.volumeSize(1000)
.build())
.build())
// NOTE(review): same unconverted reference issue as clientSubnets above.
.securityGroups(aws_security_group.sg().id())
.build())
.build());
}
}
Import
MSK clusters can be imported using the cluster ARN
, e.g.,
$ pulumi import aws:msk/cluster:Cluster example arn:aws:kafka:us-west-2:123456789012:cluster/example/279c0212-d057-4dba-9aa9-1c4e5a25bfc7-3
Properties
Comma separated list of one or more hostname:port pairs of kafka brokers suitable to bootstrap connectivity to the kafka cluster. Contains a value if encryption_info.0.encryption_in_transit.0.client_broker
is set to PLAINTEXT
or TLS_PLAINTEXT
. The resource sorts values alphabetically. AWS may not always return all endpoints so this value is not guaranteed to be stable across applies.
One or more DNS names (or IP addresses) and SASL IAM port pairs. For example, b-1-public.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9198,b-2-public.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9198,b-3-public.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9198
. This attribute will have a value if encryption_info.0.encryption_in_transit.0.client_broker
is set to TLS_PLAINTEXT
or TLS
and client_authentication.0.sasl.0.iam
is set to true
and broker_node_group_info.0.connectivity_info.0.public_access.0.type
is set to SERVICE_PROVIDED_EIPS
and the cluster fulfills all other requirements for public access. The resource sorts the list alphabetically. AWS may not always return all endpoints so the values may not be stable across applies.
One or more DNS names (or IP addresses) and SASL SCRAM port pairs. For example, b-1-public.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9196,b-2-public.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9196,b-3-public.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9196
. This attribute will have a value if encryption_info.0.encryption_in_transit.0.client_broker
is set to TLS_PLAINTEXT
or TLS
and client_authentication.0.sasl.0.scram
is set to true
and broker_node_group_info.0.connectivity_info.0.public_access.0.type
is set to SERVICE_PROVIDED_EIPS
and the cluster fulfills all other requirements for public access. The resource sorts the list alphabetically. AWS may not always return all endpoints so the values may not be stable across applies.
One or more DNS names (or IP addresses) and TLS port pairs. For example, b-1-public.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9194,b-2-public.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9194,b-3-public.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9194
. This attribute will have a value if encryption_info.0.encryption_in_transit.0.client_broker
is set to TLS_PLAINTEXT
or TLS
and broker_node_group_info.0.connectivity_info.0.public_access.0.type
is set to SERVICE_PROVIDED_EIPS
and the cluster fulfills all other requirements for public access. The resource sorts the list alphabetically. AWS may not always return all endpoints so the values may not be stable across applies.
One or more DNS names (or IP addresses) and SASL IAM port pairs. For example, b-1.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9098,b-2.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9098,b-3.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9098
. This attribute will have a value if encryption_info.0.encryption_in_transit.0.client_broker
is set to TLS_PLAINTEXT
or TLS
and client_authentication.0.sasl.0.iam
is set to true
. The resource sorts the list alphabetically. AWS may not always return all endpoints so the values may not be stable across applies.
One or more DNS names (or IP addresses) and SASL SCRAM port pairs. For example, b-1.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9096,b-2.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9096,b-3.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9096
. This attribute will have a value if encryption_info.0.encryption_in_transit.0.client_broker
is set to TLS_PLAINTEXT
or TLS
and client_authentication.0.sasl.0.scram
is set to true
. The resource sorts the list alphabetically. AWS may not always return all endpoints so the values may not be stable across applies.
One or more DNS names (or IP addresses) and TLS port pairs. For example, b-1.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9094,b-2.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9094,b-3.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9094
. This attribute will have a value if encryption_info.0.encryption_in_transit.0.client_broker
is set to TLS_PLAINTEXT
or TLS
. The resource sorts the list alphabetically. AWS may not always return all endpoints so the values may not be stable across applies.
Configuration block for the broker nodes of the Kafka cluster.
Configuration block for specifying a client authentication. See below.
Name of the MSK cluster.
Configuration block for specifying a MSK Configuration to attach to Kafka brokers. See below.
Current version of the MSK Cluster used for updates, e.g., K13V1IB3VIYZZH
Configuration block for specifying encryption. See below.
Specify the desired enhanced MSK CloudWatch monitoring level. See Monitoring Amazon MSK with Amazon CloudWatch
Specify the desired Kafka software version.
Configuration block for streaming broker logs to Cloudwatch/S3/Kinesis Firehose. See below.
The desired total number of broker nodes in the kafka cluster. It must be a multiple of the number of specified client subnets.
Configuration block for JMX and Node monitoring for the MSK cluster. See below.
Controls storage mode for supported storage tiers. Valid values are: LOCAL
or TIERED
.
A comma separated list of one or more hostname:port pairs to use to connect to the Apache Zookeeper cluster. The returned values are sorted alphabetically. The AWS API may not return all endpoints, so this value is not guaranteed to be stable across applies.
A comma separated list of one or more hostname:port pairs to use to connect to the Apache Zookeeper cluster via TLS. The returned values are sorted alphabetically. The AWS API may not return all endpoints, so this value is not guaranteed to be stable across applies.