Architecture Fundamentals
Flink's runtime consists of a JobManager and one or more TaskManagers. The JobManager coordinates distributed execution while TaskManagers execute the actual operator code in slots.
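By default, all operators of a job share one slot sharing group. Flink places one parallel instance of each operator into a shared slot, so a job needs as many task slots as its most parallel operator. The plain-Java sketch below encodes that sizing arithmetic. It is not a Flink API. The operator parallelisms and the 4 slots per TaskManager are hypothetical values chosen for illustration.

```java
// Back-of-the-envelope slot sizing, assuming every operator stays in the
// default slot sharing group. Not part of Flink; illustration only.
final class SlotSizing {

    /** Slots a job needs under default slot sharing: the highest operator parallelism. */
    static int requiredSlots(int... operatorParallelism) {
        int max = 0;
        for (int p : operatorParallelism) {
            if (p <= 0) throw new IllegalArgumentException("parallelism must be positive");
            max = Math.max(max, p);
        }
        return max;
    }

    /** TaskManagers needed when each offers slotsPerTaskManager slots (ceiling division). */
    static int requiredTaskManagers(int slots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        return (slots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // Hypothetical job: source (8) -> map (16) -> sink (4), 4 slots per TaskManager.
        int slots = requiredSlots(8, 16, 4);
        System.out.println(slots + " slots, " + requiredTaskManagers(slots, 4) + " TaskManagers");
        // prints "16 slots, 4 TaskManagers"
    }
}
```

Operators moved into separate slot sharing groups need their own slots, so this count is a lower bound in that case.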
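Operator chaining is enabled by default; call disableChaining() only when profiling confirms a bottleneck.
Set taskmanager.network.memory.fraction between 0.15 and 0.25 (the key is named taskmanager.memory.network.fraction since Flink 1.10).
State Management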
Choose your state backend based on state size and latency requirements. RocksDB is recommended for production workloads with large state.
```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Configure the RocksDB state backend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// true = incremental checkpoints (ALWAYS enable)
EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend(true);
rocksDB.setNumberOfTransferThreads(4); // threads that upload/download state files
env.setStateBackend(rocksDB);

// Set checkpoint storage to a distributed file system
env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/my-job");
```
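```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;

// HashMapStateBackend: in-memory (JVM heap), fast, small state only.
// env is the StreamExecutionEnvironment from the previous snippet.
env.setStateBackend(new HashMapStateBackend());

// Increase heap accordingly in flink-conf.yaml; this backend does not use managed memory:
//   taskmanager.memory.managed.fraction: 0.0
//   taskmanager.memory.task.heap.size: 4gb

// Inside a rich function on a keyed stream (e.g. in open()):
ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("counter", Long.class);
ValueState<Long> count = getRuntimeContext().getState(desc);
```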
- Use TTL on all ValueState/MapState to prevent unbounded growth
- Prefer ListState over ValueState<List> for append-heavy workloads (with RocksDB, ListState.add() appends without reading and rewriting the whole list)
- Enable compression on RocksDB SST files (ZSTD recommended)
- Avoid large objects in state; prefer IDs with external lookups
- Never store non-serializable objects; prefer POJO or Avro types that Flink serializes natively, and explicitly register any type that must fall back to Kryo
- Do not use broadcast state for frequently mutating reference data
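The TTL rule at the top of this list is what bounds state growth. In Flink itself, TTL is configured with StateTtlConfig and attached to a descriptor via enableTimeToLive. The plain-Java class below is only a conceptual model of the semantics, not Flink's implementation. Each value remembers its last write time, a read hides it once lastWrite + ttl <= now, and a cleanup pass physically removes expired entries. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model of keyed-state TTL (not Flink code): writes refresh the
// timestamp, reads never return expired values, cleanup drops them.
final class TtlStateModel<K, V> {
    private static final class Entry<T> {
        final T value;
        final long lastWriteMillis;

        Entry(T value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<K, Entry<V>> store = new HashMap<>();

    TtlStateModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    private boolean expired(Entry<V> e, long nowMillis) {
        return e.lastWriteMillis + ttlMillis <= nowMillis;
    }

    /** Every write refreshes the timestamp (compare UpdateType.OnCreateAndWrite). */
    void update(K key, V value, long nowMillis) {
        store.put(key, new Entry<>(value, nowMillis));
    }

    /** Expired values are hidden even before cleanup (compare StateVisibility.NeverReturnExpired). */
    V value(K key, long nowMillis) {
        Entry<V> e = store.get(key);
        return (e == null || expired(e, nowMillis)) ? null : e.value;
    }

    /** Physically removes expired entries and returns how many were dropped. */
    int cleanup(long nowMillis) {
        int before = store.size();
        store.values().removeIf(e -> expired(e, nowMillis));
        return before - store.size();
    }

    int size() {
        return store.size();
    }
}
```

The model shows why TTL alone is not enough: until cleanup runs, expired entries still occupy space. In Flink, expired entries are removed lazily on access and by background cleanup strategies.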
Checkpointing
Checkpoints are Flink's mechanism for exactly-once fault tolerance. Configure them defensively: every failed checkpoint widens the window of data that must be replayed on recovery, and if that window outgrows the source's retention, data is lost.
```yaml
# Checkpoint interval: balance recovery time vs. overhead
execution.checkpointing.interval: 60s
# Minimum pause between the end of one checkpoint and the start of the next
execution.checkpointing.min-pause: 30s
# Timeout: increase for large RocksDB state
execution.checkpointing.timeout: 10min
# Enable unaligned checkpoints for backpressured pipelines
execution.checkpointing.unaligned: true
# Keep 3 retained checkpoints
state.checkpoints.num-retained: 3
# Keep externalized checkpoints when the job is cancelled
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
```
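Interval and min-pause interact in a way that is easiest to see with numbers. The sketch uses a simplified model that assumes at most one checkpoint in flight. That is Flink's default, and Flink's documentation notes that a min-pause implies it anyway. Under this model, the next checkpoint triggers at whichever comes later: the previous trigger plus the interval, or the previous completion plus the min-pause. The class name is hypothetical.

```java
// Simplified model of checkpoint triggering with one concurrent checkpoint.
// Not Flink code; it only illustrates how interval and min-pause combine.
final class CheckpointCadence {

    /** Earliest time the next checkpoint can trigger, in milliseconds. */
    static long nextTriggerMillis(long prevTriggerMillis, long prevCompletionMillis,
                                  long intervalMillis, long minPauseMillis) {
        return Math.max(prevTriggerMillis + intervalMillis, prevCompletionMillis + minPauseMillis);
    }

    public static void main(String[] args) {
        long interval = 60_000, minPause = 30_000; // the values configured above
        // A 10 s checkpoint: the interval dominates, so the next one triggers at 60 s.
        System.out.println(nextTriggerMillis(0, 10_000, interval, minPause));
        // A 45 s checkpoint: the min-pause dominates, so the next one triggers at 75 s.
        System.out.println(nextTriggerMillis(0, 45_000, interval, minPause));
    }
}
```

With these settings, any checkpoint that takes longer than 30 s pushes the next one past the 60 s interval. That matches the lastCheckpointDuration alert threshold of 50% of the interval used in Monitoring & Observability.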
Unaligned checkpoints persist in-flight data as part of the checkpoint; track numberOfBufferedBytes in the in-flight data metrics.
Performance Tuning
Performance tuning in Flink involves JVM heap, managed memory, network buffers, and operator-level optimizations. Profile before tuning.
| Parameter | Default | Recommended | Impact |
|---|---|---|---|
| taskmanager.numberOfTaskSlots | 1 | 4–8 | High |
| pipeline.object-reuse | false | true | High |
| env.java.opts.taskmanager | (none) | -XX:+UseG1GC | Medium |
| io.tmp.dirs | /tmp | NVMe path | Medium |
| akka.ask.timeout (pekko.ask.timeout in Flink 1.18+) | 10s | 30s | Stability |
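The pipeline.object-reuse row carries the most risk in the table. The plain-Java sketch below uses no Flink classes, and its Event type is hypothetical. It mimics a producer that reuses one mutable record for every emission, as Flink may do when object reuse is enabled. It then feeds two downstream consumers: one that keeps references across calls and one that copies the field it needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustration of the object-reuse hazard; Event and emitReusing are hypothetical.
final class ObjectReuseHazard {
    static final class Event {             // mutable record type
        long id;
    }

    /** Emits n records through a single reused Event instance. */
    static void emitReusing(int n, Consumer<Event> downstream) {
        Event reused = new Event();
        for (int i = 0; i < n; i++) {
            reused.id = i;                  // overwrite the same object each time
            downstream.accept(reused);
        }
    }

    public static void main(String[] args) {
        List<Event> buffered = new ArrayList<>();
        emitReusing(3, buffered::add);      // unsafe: holds references across calls

        List<Long> copied = new ArrayList<>();
        emitReusing(3, e -> copied.add(e.id)); // safe: copies the value it needs

        buffered.forEach(e -> System.out.print(e.id + " ")); // prints "2 2 2 "
        System.out.println();
        System.out.println(copied);                          // prints "[0, 1, 2]"
    }
}
```

The buffering consumer ends up with three references to the same object, all showing the last value. Operators that buffer records, such as collecting them into a list, must copy before storing when object reuse is on.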
Enable pipeline.object-reuse for stateless map/filter chains. It eliminates the defensive per-record copy between chained operators, but operators must not hold references to input or output records across calls.
Connectors
Use the Flink Connector ecosystem for certified, maintained integrations. Avoid custom source/sink implementations unless absolutely required.
| Connector | Delivery | Scalability | Notes |
|---|---|---|---|
| Kafka | Exactly-once | High | Prefer KafkaSource API over legacy FlinkKafkaConsumer |
| Kinesis | Exactly-once | Medium | Shard-limited; use enhanced fan-out for low latency |
| JDBC | At-least-once | Low | Batch writes; avoid as hot-path sink |
| Iceberg | Exactly-once | High | Recommended for lake ingestion with ACID semantics |
| Elasticsearch | At-least-once | Medium | Use bulkFlushMaxActions ≥ 500 for throughput |
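```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("orders")
        .setGroupId("flink-consumer")
        // Resume from committed offsets; fall back to earliest if none exist
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> orders = env.fromSource(
        source,
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // parseTs is a user-defined helper returning the event time in epoch ms
                .withTimestampAssigner((event, recordTs) -> parseTs(event)),
        "Kafka Orders Source");
```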
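The forBoundedOutOfOrderness(Duration.ofSeconds(5)) strategy tracks the highest event timestamp seen so far and emits a watermark that trails it by the bound. The plain-Java model below reproduces that arithmetic. It includes the extra millisecond Flink subtracts, so an event exactly 5 s behind the maximum is still on time. It is an illustration with a hypothetical class name, not Flink's WatermarkGenerator interface.

```java
// Model of bounded-out-of-orderness watermarking: watermark = maxTimestamp - bound - 1.
final class BoundedOutOfOrdernessModel {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Out-of-order events never move the maximum backwards. */
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    /** An event is late if its timestamp is at or behind the current watermark. */
    boolean isLate(long eventTimestampMillis) {
        return eventTimestampMillis <= currentWatermark();
    }
}
```

A subtask's watermark is the minimum over its inputs, so one idle Kafka partition can hold it back. WatermarkStrategy.withIdleness(Duration) marks such inputs idle. This is the stall the currentInputWatermark alert in Monitoring & Observability is meant to catch.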
Fault Tolerance
Design for failure. Configure restart strategies that match your SLA and understand the trade-offs between recovery speed and checkpoint frequency.
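The plain-Java model below makes the exponential-delay schedule concrete. It computes restart delays for an initial delay of 1 s, a cap of 5 min, and a jitter factor of 0.1, the values this section recommends for production. The backoff multiplier of 2.0 is an assumption, since the text does not give one. The class models the schedule only; it is not Flink's restart-strategy implementation.

```java
import java.util.Random;

// Model of an exponential-delay restart schedule with symmetric jitter.
final class RestartBackoff {
    static final long INITIAL_MILLIS = 1_000;   // initial delay 1 s
    static final long MAX_MILLIS = 300_000;     // max delay 5 min
    static final double MULTIPLIER = 2.0;       // assumption: not stated in the text
    static final double JITTER = 0.1;           // jitter factor

    /** Delay before restart number `attempt` (0-based), before jitter, capped at the max. */
    static long baseDelayMillis(int attempt) {
        double d = INITIAL_MILLIS * Math.pow(MULTIPLIER, attempt);
        return (long) Math.min(d, MAX_MILLIS);
    }

    /** Adds a uniform offset in [-JITTER * base, +JITTER * base] to spread out restarts. */
    static long jitteredDelayMillis(int attempt, Random rng) {
        long base = baseDelayMillis(attempt);
        double offset = (rng.nextDouble() * 2 - 1) * JITTER * base;
        return Math.round(base + offset);
    }

    public static void main(String[] args) {
        Random rng = new Random(42);
        for (int attempt = 0; attempt < 10; attempt++) {
            System.out.printf("restart %d: base %d ms, jittered %d ms%n",
                    attempt + 1, baseDelayMillis(attempt), jitteredDelayMillis(attempt, rng));
        }
    }
}
```

With a multiplier of 2.0, the cap is reached on the tenth restart (1 s doubled nine times is 512 s, capped to 300 s). The jitter keeps many jobs that failed together from restarting in lockstep.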
Use ExponentialDelayRestartStrategyConfiguration in production, with an initial delay of 1s, a max delay of 5min, and a jitter factor of 0.1.
Fail fast with NonRecoverableException when errors indicate data corruption; don't retry infinitely on schema errors.
For exactly-once output from custom sinks, build on TwoPhaseCommitSinkFunction.
Monitoring & Observability
Export Flink metrics to Prometheus. Alert on the following critical signals:
- lastCheckpointDuration — Alert if >50% of checkpoint interval. Indicates state growth or I/O bottleneck.
- numberOfFailedCheckpoints — Any non-zero value in production is a critical event.
- currentInputWatermark — Monitor lag vs. wall-clock to detect watermark stalls from idle partitions.
- backPressuredTimeMsPerSecond — Alert at >500ms/s. Indicates downstream saturation; scale or tune the slow operator.
- numRecordsOutPerSecond — Establish baseline and alert on >40% drop within a 5-minute window.
- JVM GC pause time — P99 GC pause >200ms causes checkpoint timeouts. Switch to ZGC on JDK 17+.
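The thresholds listed above can be encoded as a plain-Java rule check, for example to unit-test alert logic before porting it to Prometheus alerting rules. The MetricsSnapshot and AlertRules names are hypothetical. The caller is assumed to supply the baseline throughput. The watermark-lag signal is left out because the list gives no numeric threshold for it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical snapshot of the signals listed above (not Flink metric classes).
record MetricsSnapshot(
        long checkpointIntervalMs,
        long lastCheckpointDurationMs,
        long numberOfFailedCheckpoints,
        double backPressuredTimeMsPerSecond,
        double baselineRecordsOutPerSecond,   // supplied by the caller
        double numRecordsOutPerSecond,        // current rate over the last 5 minutes
        double p99GcPauseMs) {}

final class AlertRules {
    /** Returns one message per violated threshold; empty means healthy. */
    static List<String> evaluate(MetricsSnapshot m) {
        List<String> alerts = new ArrayList<>();
        if (m.lastCheckpointDurationMs() > 0.5 * m.checkpointIntervalMs())
            alerts.add("checkpoint duration above 50% of interval");
        if (m.numberOfFailedCheckpoints() > 0)
            alerts.add("failed checkpoints");
        if (m.backPressuredTimeMsPerSecond() > 500)
            alerts.add("backpressure above 500 ms/s");
        if (m.numRecordsOutPerSecond() < 0.6 * m.baselineRecordsOutPerSecond())
            alerts.add("throughput dropped more than 40%");
        if (m.p99GcPauseMs() > 200)
            alerts.add("P99 GC pause above 200 ms");
        return alerts;
    }
}
```

Keeping the thresholds in one place like this makes it easy to check that a dashboard, an alert rule, and this guide agree.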
```yaml
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
# Has no effect on the pull-based PrometheusReporter; Prometheus's scrape interval applies
metrics.reporter.prom.interval: 15 SECONDS
# Include job-level labels for Grafana dashboards
metrics.scope.jm.job: <host>.jobmanager.<job_name>
metrics.scope.tm.task: <host>.taskmanager.<tm_id>.<job_name>.<task_name>
```
Security
Flink clusters process sensitive data and must be hardened in production environments. At minimum, enforce the following controls.
- Set security.ssl.internal.enabled: true for all internal cluster communication, using mutual TLS between JobManager and TaskManagers.
- Keep credentials out of plain-text configuration: use a ConfigurationDecryptor SPI or inject secrets via Kubernetes Secrets mounted as env vars.
- Set securityContext.runAsNonRoot: true and securityContext.readOnlyRootFilesystem: true in pod specs, and mount tmp dirs as emptyDir volumes.
Deployment Checklist
Before promoting any Flink job to production, verify all items:
- State backend configured to RocksDB with incremental checkpointing
- Checkpoint storage points to durable object store (S3/GCS/ADLS)
- Restart strategy set (not default fixed-delay)
- Prometheus metrics reporter configured and scraped
- Watermark strategy set with appropriate out-of-orderness bound
- All state descriptors use registered serializers (no Kryo fallback)
- State TTL configured on all keyed state
- SSL enabled for internal cluster communication
- Web UI / REST API not exposed to public internet
- Load tested at 2× expected peak throughput in staging