Production Guidelines · v1.20

Apache Flink

Distributed stream and batch processing — engineering standards for building reliable, high-throughput data pipelines at scale.

9 Sections
1M+ Events/sec
<10ms Latency target
Replication
01

Architecture Fundamentals

Flink's runtime consists of a JobManager and one or more TaskManagers. The JobManager coordinates distributed execution while TaskManagers execute the actual operator code in slots.

source (Kafka) → transform (Map/Filter) → window (Aggregate) → join (Stream Join) → sink (Iceberg)
🧩
Operator Chaining
Operator chaining is on by default and avoids serialization between co-located operators. Disable it selectively via disableChaining() only when profiling confirms that a chain is the bottleneck.
⚖️
Parallelism Strategy
Set job-level parallelism equal to the total number of TaskManager slots in the cluster. Override it per operator only for sources and sinks with partitioned I/O, such as a Kafka topic with a fixed partition count.
🔀
Network Buffers
Allocate ≥2 buffers per channel. For high-throughput jobs, tune taskmanager.memory.network.fraction between 0.15 and 0.25.
📦
Slot Sharing
Keep default slot sharing enabled unless operators have wildly different resource profiles. Isolate heavy aggregators into dedicated slot groups.
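As a rough illustration of the parallelism and network-buffer guidance above, the sketch below derives job parallelism from the cluster's slots and estimates the memory taken by exclusive input buffers for one all-to-all exchange. The class and method names are hypothetical and not part of Flink. The 32 KB figure is Flink's default memory segment size, and the estimate assumes one downstream subtask per slot and ignores floating and output buffers, so it is a lower bound rather than a sizing formula.

```java
/** Hypothetical sizing helper; not part of Flink. */
final class SlotSizing {
    /** Flink's default network memory segment size (taskmanager.memory.segment-size). */
    static final long SEGMENT_BYTES = 32 * 1024;

    /** Job-level parallelism equal to the total number of slots in the cluster. */
    static int jobParallelism(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    /**
     * Bytes of exclusive input buffers one TaskManager needs for a single
     * all-to-all (keyBy/rebalance) exchange: each local subtask has one input
     * channel per upstream subtask, and each channel holds buffersPerChannel segments.
     */
    static long exclusiveInputBufferBytes(int slotsPerTaskManager,
                                          int upstreamParallelism,
                                          int buffersPerChannel) {
        long channels = (long) slotsPerTaskManager * upstreamParallelism;
        return channels * buffersPerChannel * SEGMENT_BYTES;
    }
}
```

With 8 TaskManagers of 8 slots each, parallelism comes out at 64, and one such exchange needs about 32 MiB of exclusive input buffers per TaskManager at 2 buffers per channel.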
02

State Management

Choose your state backend based on state size and latency requirements. RocksDB is recommended for production workloads with large state.

Recommended for production. RocksDB stores state on disk, enabling jobs with terabytes of state that would not fit in JVM heap.
Java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Configure RocksDB state backend
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Passing 'true' enables incremental checkpoints (ALWAYS enable)
EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend(true);
rocksDB.setNumberOfTransferThreads(4);
env.setStateBackend(rocksDB);

// Set checkpoint storage to distributed FS
env.getCheckpointConfig()
   .setCheckpointStorage("s3://flink-checkpoints/my-job");
Use only for small state. HashMapStateBackend keeps all state in JVM heap. Suitable for <1GB total state with strict low-latency requirements.
Java
// HashMapStateBackend — in-memory, fast, small state only
env.setStateBackend(new HashMapStateBackend());

// Increase heap accordingly in flink-conf.yaml
// taskmanager.memory.managed.fraction: 0.0
// taskmanager.memory.task.heap.size: 4gb

// Inside a RichFunction's open() on a keyed stream: register the state
ValueStateDescriptor<Long> desc =
    new ValueStateDescriptor<>("counter", Long.class);
ValueState<Long> count =
    getRuntimeContext().getState(desc);
  • Use TTL on all ValueState/MapState to prevent unbounded growth
  • Prefer ListState over ValueState<List> for append-heavy workloads
  • Enable compression on RocksDB SST files (ZSTD recommended)
  • Avoid large objects in state; prefer IDs with external lookups
  • Never store objects that lack a registered serializer; register custom types explicitly instead of relying on the generic Kryo fallback
  • Do not use broadcast state for frequently mutating reference data
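To make the ListState recommendation in the list above concrete, here is a simplified cost model rather than Flink code. It assumes a RocksDB-style backend in which every ValueState update re-serializes the whole value, while a ListState append serializes only the new element. The class and method names are hypothetical.

```java
/** Simplified cost model (not Flink code) for append-heavy keyed state. */
final class AppendCostModel {
    /** Elements serialized when n items are appended one by one to a ValueState holding a list. */
    static long valueStateListWrites(int n) {
        long written = 0;
        for (int size = 1; size <= n; size++) {
            written += size; // each update() rewrites the full list at its new size
        }
        return written;
    }

    /** Elements serialized when n items are appended through ListState.add(). */
    static long listStateWrites(int n) {
        return n; // each add() serializes only the appended element
    }
}
```

Under these assumptions, 1,000 appends cost 500,500 element writes with a list held in ValueState and 1,000 with ListState, which is why the gap widens quickly for long lists.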
03

Checkpointing

Checkpoints are Flink's mechanism for exactly-once fault tolerance. Configure them defensively — a failed checkpoint is a potential data loss event.

flink-conf.yaml
# Checkpoint interval — balance recovery time vs overhead
execution.checkpointing.interval: 60s

# Minimum pause between checkpoints
execution.checkpointing.min-pause: 30s

# Timeout — increase for large RocksDB state
execution.checkpointing.timeout: 10min

# Enable unaligned checkpoints for backpressured pipelines
execution.checkpointing.unaligned: true

# Keep 3 retained checkpoints
state.checkpoints.num-retained: 3

# Externalize on cancel
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
Unaligned checkpoints caveat: They prevent checkpoint timeouts under backpressure, but they increase checkpoint size because in-flight buffers are persisted with the checkpoint. Monitor numberOfBufferedBytes in the in-flight data metrics.
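The interaction between the interval, min-pause, and timeout settings above can be sketched with a little arithmetic. This is a simplified model with hypothetical names, not Flink's scheduler: it assumes the next checkpoint starts at whichever is later, the previous start plus the interval or the previous end plus the minimum pause.

```java
/** Simplified model of checkpoint timing; all values are milliseconds. */
final class CheckpointTiming {
    /** Earliest start of the next checkpoint given the previous one. */
    static long nextTriggerMillis(long prevStartMillis, long prevDurationMillis,
                                  long intervalMillis, long minPauseMillis) {
        long byInterval = prevStartMillis + intervalMillis;
        long byMinPause = prevStartMillis + prevDurationMillis + minPauseMillis;
        return Math.max(byInterval, byMinPause);
    }

    /** A checkpoint that runs longer than the timeout is aborted. */
    static boolean timesOut(long durationMillis, long timeoutMillis) {
        return durationMillis > timeoutMillis;
    }
}
```

With the 60s interval and 30s min-pause shown above, a 10s checkpoint leaves the schedule at 60s, while a 45s checkpoint pushes the next start to 75s. Slow checkpoints therefore stretch the effective interval instead of stacking up.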
💾
Savepoints vs Checkpoints
Use savepoints for planned upgrades and schema migrations. Never rely on savepoints for automatic failure recovery — that is what checkpoints are for.
📍
Changelog State Backend
Enable the Changelog state backend in Flink 1.20+ for near-continuous checkpointing with minimal impact on job latency.
04

Performance Tuning

Performance tuning in Flink involves JVM heap, managed memory, network buffers, and operator-level optimizations. Profile before tuning.

Task Heap Memory: 4–8 GB
Managed Memory (RocksDB): 40%
Network Memory Fraction: 15–25%
JVM Metaspace: 256–512 MB
Parameter | Default | Recommended | Impact
taskmanager.numberOfTaskSlots | 1 | 4–8 | High
pipeline.object-reuse | false | true | High
env.java.opts.taskmanager | (none) | -XX:+UseG1GC | Medium
io.tmp.dirs | /tmp | NVMe path | Medium
akka.ask.timeout | 10s | 30s | Stability
Object reuse tip: Enable pipeline.object-reuse for stateless map/filter chains. It eliminates per-record allocation but requires operators to not hold references across calls.
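The constraint in the object reuse tip can be shown without any Flink dependency. The plain-Java sketch below uses hypothetical names: a single mutable instance stands in for the record the runtime reuses between calls, and the first method shows what goes wrong when an operator keeps that reference.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (no Flink dependency) of the object-reuse hazard. */
final class ObjectReuseHazard {
    /** Mutable record standing in for an event type. */
    static final class Event {
        long value;
    }

    /** Unsafe under object reuse: keeps the caller's instance across calls. */
    static List<Long> bufferReferences(long[] inputs) {
        Event reused = new Event();           // one instance handed to every call
        List<Event> buffer = new ArrayList<>();
        for (long in : inputs) {
            reused.value = in;                // the next record overwrites the same object
            buffer.add(reused);               // BUG: stores a reference, not a copy
        }
        return values(buffer);
    }

    /** Safe: copies the fields it needs before the next call. */
    static List<Long> bufferCopies(long[] inputs) {
        Event reused = new Event();
        List<Event> buffer = new ArrayList<>();
        for (long in : inputs) {
            reused.value = in;
            Event copy = new Event();
            copy.value = reused.value;
            buffer.add(copy);
        }
        return values(buffer);
    }

    private static List<Long> values(List<Event> events) {
        List<Long> out = new ArrayList<>();
        for (Event e : events) {
            out.add(e.value);
        }
        return out;
    }
}
```

For inputs 1, 2, 3 the first method yields 3, 3, 3 because every buffered entry points at the same object, while the second yields 1, 2, 3. Operators that buffer or cache records need the copying form before object reuse is switched on.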
05

Connectors

Use the Flink Connector ecosystem for certified, maintained integrations. Avoid custom source/sink implementations unless absolutely required.

Connector | Delivery | Scalability | Notes
Kafka | Exactly-once | High | Prefer KafkaSource API over legacy FlinkKafkaConsumer
Kinesis | Exactly-once | Medium | Shard-limited; use enhanced fan-out for low latency
JDBC | At-least-once | Low | Batch writes; avoid as hot-path sink
Iceberg | Exactly-once | High | Recommended for lake ingestion with ACID semantics
Elasticsearch | At-least-once | Medium | Use bulkFlushMaxActions ≥ 500 for throughput
Java — Kafka Source
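import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Resume from committed offsets; fall back to EARLIEST when none exist
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("broker:9092")
    .setTopics("orders")
    .setGroupId("flink-consumer")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

// parseTs is a user-supplied helper that extracts the event time in epoch millis
DataStream<String> orders = env.fromSource(source,
    WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((e, t) -> parseTs(e)),
    "Kafka Orders Source");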
06

Fault Tolerance

Design for failure. Configure restart strategies that match your SLA and understand the trade-offs between recovery speed and checkpoint frequency.

🔁
Exponential Backoff Restart
Use ExponentialDelayRestartStrategyConfiguration in production. Initial delay 1s, max delay 5min, jitter factor 0.1.
🛡️
Exception Classification
Classify exceptions as NonRecoverableException when they indicate data corruption. Don't retry infinitely on schema errors.
🔬
Chaos Testing
Simulate TaskManager failures monthly in staging. Verify checkpoint recovery under load. Use Flink's REST API to inject failures programmatically.
📡
HA Configuration
Always run ZooKeeper or Kubernetes HA for JobManager. Single JobManager is a single point of failure — unacceptable for production.
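The exponential backoff settings recommended above (initial delay 1s, max delay 5min, jitter factor 0.1) translate into a delay schedule like the one sketched here. The helper is hypothetical and is not Flink's restart strategy implementation. The multiplier of 2.0 used in the example is an assumption, since the guideline does not state one.

```java
/** Hypothetical helper that reproduces an exponential backoff schedule. */
final class BackoffSchedule {
    /** Delay before restart number 'attempt' (0-based), capped at maxMillis, before jitter. */
    static long baseDelayMillis(int attempt, long initialMillis, long maxMillis, double multiplier) {
        double delay = initialMillis * Math.pow(multiplier, attempt);
        return (long) Math.min(delay, (double) maxMillis);
    }

    /** Lower and upper bound of the delay once +/- jitterFactor is applied. */
    static long[] jitterRangeMillis(long baseMillis, double jitterFactor) {
        long spread = Math.round(baseMillis * jitterFactor);
        return new long[] {baseMillis - spread, baseMillis + spread};
    }
}
```

Under these assumptions the fourth restart waits about 8s (between 7.2s and 8.8s with jitter), and from the tenth restart onward the delay stays at the 5-minute cap.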
Exactly-once sinks: End-to-end exactly-once semantics require a replayable source (e.g., Kafka) and a sink that is either transactional or idempotent. A transactional sink must follow the two-phase commit protocol so that a crash between pre-commit and commit can be resolved on recovery. Verify that your sink implements two-phase commit, for example by extending TwoPhaseCommitSinkFunction.
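The two-phase commit idea in the note above can be sketched in memory. This is not Flink's TwoPhaseCommitSinkFunction API; all names are hypothetical, and a list stands in for the external system. Writes stay invisible until commit, pre-commit happens at the checkpoint, and commit is idempotent so it can safely be repeated after a restart.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** In-memory sketch of a two-phase-commit sink; not Flink's API. */
final class TwoPhaseCommitSketch {
    /** Records visible to downstream readers (stand-in for the external system). */
    final List<String> committed = new ArrayList<>();
    /** Ids of transactions already committed; this makes commit idempotent. */
    private final Set<Long> committedTxnIds = new HashSet<>();

    private long currentTxnId = 0;
    private List<String> currentTxn = new ArrayList<>();

    /** A pre-committed transaction; this is what would be stored in checkpoint state. */
    static final class Pending {
        final long txnId;
        final List<String> records;

        Pending(long txnId, List<String> records) {
            this.txnId = txnId;
            this.records = records;
        }
    }

    /** Writes go into the open transaction and stay invisible until commit. */
    void write(String record) {
        currentTxn.add(record);
    }

    /** Phase 1, at the checkpoint: close the open transaction and start a new one. */
    Pending preCommit() {
        Pending pending = new Pending(currentTxnId, currentTxn);
        currentTxnId++;
        currentTxn = new ArrayList<>();
        return pending;
    }

    /** Phase 2, once the checkpoint completes or on recovery: safe to call twice. */
    void commit(Pending pending) {
        if (committedTxnIds.add(pending.txnId)) {
            committed.addAll(pending.records);
        }
    }

    /** On failure before the checkpoint completes: drop uncommitted writes. */
    void abortOpenTransaction() {
        currentTxn = new ArrayList<>();
    }
}
```

A crash between pre-commit and commit is handled by replaying commit with the Pending object restored from the checkpoint. The transaction id check ensures the records become visible exactly once.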
07

Monitoring & Observability

Export Flink metrics to Prometheus. Alert on the following critical signals:

  • lastCheckpointDuration — Alert if >50% of checkpoint interval. Indicates state growth or I/O bottleneck.
  • numberOfFailedCheckpoints — Any non-zero value in production is a critical event.
  • currentInputWatermark — Monitor lag vs. wall-clock to detect watermark stalls from idle partitions.
  • backPressuredTimeMsPerSecond — Alert at >500ms/s. Indicates downstream saturation; scale or tune the slow operator.
  • numRecordsOutPerSecond — Establish baseline and alert on >40% drop within a 5-minute window.
  • JVM GC pause time — P99 GC pause >200ms causes checkpoint timeouts. Switch to ZGC on JDK 17+.
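The thresholds in the list above can be written down as simple predicates, which is a convenient way to unit-test alert logic before encoding it in a monitoring system. The class below is a hypothetical sketch. Only the threshold values come from the list; the method names are illustrative and each input is assumed to be the latest sample of the named metric.

```java
/** Hypothetical alert predicates using the thresholds listed above. */
final class AlertRules {
    /** Checkpoint duration above 50% of the checkpoint interval. */
    static boolean checkpointTooSlow(long lastCheckpointDurationMs, long checkpointIntervalMs) {
        return lastCheckpointDurationMs * 2 > checkpointIntervalMs;
    }

    /** Any failed checkpoint is treated as critical. */
    static boolean checkpointsFailing(long numberOfFailedCheckpoints) {
        return numberOfFailedCheckpoints > 0;
    }

    /** Backpressured for more than 500 ms out of every second. */
    static boolean backPressured(long backPressuredTimeMsPerSecond) {
        return backPressuredTimeMsPerSecond > 500;
    }

    /** Output rate more than 40% below the established baseline. */
    static boolean throughputDropped(double baselineRecordsPerSec, double currentRecordsPerSec) {
        if (baselineRecordsPerSec <= 0) {
            return false; // no baseline yet, nothing to compare against
        }
        double drop = (baselineRecordsPerSec - currentRecordsPerSec) / baselineRecordsPerSec;
        return drop > 0.40;
    }
}
```

The 5-minute window from the throughput rule is left to the caller, which would pass in a rate averaged over that window.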
flink-conf.yaml — Prometheus
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
metrics.reporter.prom.interval: 15 SECONDS

# Include job-level labels for Grafana dashboards
metrics.scope.jm.job: <host>.jobmanager.<job_name>
metrics.scope.tm.task: <host>.taskmanager.<tm_id>.<job_name>.<task_name>
08

Security

Flink clusters process sensitive data and must be hardened in production environments. At minimum, enforce the following controls.

🔐
SSL/TLS Everywhere
Enable security.ssl.internal.enabled: true for all internal cluster communication. Use mutual TLS between JobManager and TaskManagers.
🎫
Kerberos Auth
For Hadoop/HDFS/Kafka in secured clusters, configure Kerberos keytabs. Rotate keytabs before expiry using credential refresh callbacks.
🔒
REST API Security
Never expose the Flink Web UI / REST API publicly. Place behind VPN or use Kubernetes NetworkPolicy to restrict access to internal subnets only.
📋
Secret Management
Never hard-code credentials in job code or config files. Use Flink's ConfigurationDecryptor SPI or inject secrets via Kubernetes Secrets mounted as env vars.
Kubernetes deployment: Run TaskManagers as non-root. Set securityContext.runAsNonRoot: true and securityContext.readOnlyRootFilesystem: true in pod specs. Mount tmp dirs as emptyDir volumes.
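For the secret-management guidance above, one small pattern is to read credentials from the environment and fail fast when one is missing, instead of falling back to a default in code. The helper and the variable name KAFKA_SASL_PASSWORD are hypothetical. The environment is passed in as a map so the lookup can be tested.

```java
import java.util.Map;

/** Hypothetical helper for reading injected secrets. */
final class Secrets {
    /** Returns the named secret or fails fast; never falls back to a hard-coded default. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required secret: " + name);
        }
        return value;
    }
}
```

In job code this would be called as Secrets.require(System.getenv(), "KAFKA_SASL_PASSWORD") during job setup, so a misconfigured pod fails at startup instead of at the first connection attempt.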
09

Deployment Checklist

Before promoting any Flink job to production, verify all items:

  • State backend configured to RocksDB with incremental checkpointing
  • Checkpoint storage points to durable object store (S3/GCS/ADLS)
  • Restart strategy set (not default fixed-delay)
  • Prometheus metrics reporter configured and scraped
  • Watermark strategy set with appropriate out-of-orderness bound
  • All state descriptors use registered serializers (no Kryo fallback)
  • State TTL configured on all keyed state
  • SSL enabled for internal cluster communication
  • Web UI / REST API not exposed to public internet
  • Load tested at 2× expected peak throughput in staging
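To help choose the out-of-orderness bound mentioned in the checklist, the plain-Java sketch below reproduces the rule behind a bounded-out-of-orderness watermark: the watermark trails the highest timestamp seen by the bound plus one millisecond. The class is a hypothetical stand-in with no Flink dependency, and timestamps are assumed to be epoch milliseconds.

```java
/** Plain-Java sketch of the bounded-out-of-orderness watermark rule. */
final class BoundedOutOfOrderness {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedOutOfOrderness(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start low enough that the first watermark does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Record an event timestamp; older events never move the maximum back. */
    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    /** Watermark: no event with a timestamp at or below this value is still expected. */
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    /** An event is late if its timestamp is at or below the current watermark. */
    boolean isLate(long timestampMillis) {
        return timestampMillis <= currentWatermark();
    }
}
```

With a 5-second bound, an event at t=20000 puts the watermark at 14999, so an event at t=15000 is still on time and one at t=14999 is late. A larger bound tolerates more disorder at the cost of later window results.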