A comprehensive deep-dive into caching strategies, data partitioning, consistency models, and resilience patterns for building systems that scale to millions.
The immovable laws and hard constraints every architect must internalize.
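Under the CAP theorem, a distributed system can guarantee at most two of three properties simultaneously: consistency (every read sees the latest write), availability (every request receives a non-error response), and partition tolerance (the system keeps working when the network drops messages between nodes).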
Since network partitions are inevitable in real distributed systems, you must choose between CP (HBase, Zookeeper, etcd) and AP (Cassandra, DynamoDB, CouchDB) systems. CA systems exist only in single-node or perfect-network scenarios.
Modern view: PACELC extends CAP — even when the network is stable, there's a trade-off between latency and consistency. A lower-latency system often returns slightly stale data.
The eight fallacies of distributed computing, assumptions that kill systems in production (Deutsch, 1994):
The network is reliable
Latency is zero
Bandwidth is infinite
The network is secure
Topology doesn't change
There is one administrator
Transport cost is zero
The network is homogeneous
Amdahl's Law — Parallel speedup ceiling:
$$\text{Speedup}(N) = \frac{1}{S + \frac{1 - S}{N}}$$
where S = serial fraction, N = number of processors. As N grows, the speedup approaches 1/S.
Little's Law — Queue depth:
$$L = \lambda W$$
where L = average items in the system, λ = arrival rate, W = average time an item spends in the system
Availability Composition:
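$$A_{\text{total}} = A_1 \times A_2 \times \cdots \times A_n$$
Each serial dependency multiplies in its own downtime risk: three dependencies at 99.9% each yield roughly 99.7% overall.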
Redundant Availability:
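$$A_{\text{total}} = 1 - (1 - A)^n$$
Redundancy dramatically raises overall availability: two independent replicas at 99% each give 99.99%, assuming their failures are uncorrelated.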
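To make the four laws above concrete, here is a minimal Python sketch that evaluates each one. The function names and sample numbers are illustrative assumptions, and the availability helpers assume that failures are independent, which real systems only approximate.

```python
from math import prod
from typing import Sequence


def amdahl_speedup(serial_fraction: float, processors: int) -> float:
    """Best-case speedup when `serial_fraction` of the work cannot be parallelized."""
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / processors)


def littles_law_items(arrival_rate: float, avg_time_in_system: float) -> float:
    """Average number of items in the system: L = lambda * W."""
    return arrival_rate * avg_time_in_system


def serial_availability(availabilities: Sequence[float]) -> float:
    """Availability of a request path that needs every dependency to be up."""
    return prod(availabilities)


def redundant_availability(availability: float, replicas: int) -> float:
    """Availability when any one of `replicas` independent copies is enough."""
    return 1.0 - (1.0 - availability) ** replicas


if __name__ == "__main__":
    print(f"Amdahl, 5% serial on 64 cores: {amdahl_speedup(0.05, 64):.1f}x")
    print(f"Little, 200 req/s at 50 ms:    {littles_law_items(200, 0.05):.0f} in flight")
    print(f"Three 99.9% deps in series:    {serial_availability([0.999] * 3):.4%}")
    print(f"Two 99% replicas in parallel:  {redundant_availability(0.99, 2):.4%}")
```

Little's Law is handy for sizing pools: the in-flight count it returns is roughly the number of worker threads or connections needed to sustain that arrival rate at that latency.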
Caching is often the single highest-leverage latency reduction technique in distributed systems.
Cache-Aside (Lazy Loading): Application code manages the cache manually. On a miss, it loads from the DB and populates the cache. This is the most common pattern.
Pros: Only requested data is cached; resilient to cache failure.
Cons: Cold start penalty; potential for stale data.
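The cache-aside flow (check the cache, load from the database on a miss, then populate) can be sketched as follows. This is a minimal in-process illustration: `CacheAside`, `load_from_db`, and `write_to_db` are hypothetical names, and a plain dict stands in for a real cache such as Redis or Memcached.

```python
from typing import Any, Callable, Dict


class CacheAside:
    """Application-managed cache in front of a slower data source."""

    def __init__(self, load_from_db: Callable[[str], Any]) -> None:
        self._cache: Dict[str, Any] = {}
        self._load_from_db = load_from_db

    def get(self, key: str) -> Any:
        if key in self._cache:               # hit: serve from the cache
            return self._cache[key]
        value = self._load_from_db(key)      # miss: go to the database
        self._cache[key] = value             # populate for later readers
        return value

    def update(self, key: str, value: Any,
               write_to_db: Callable[[str, Any], None]) -> None:
        write_to_db(key, value)              # the database is the source of truth
        self._cache.pop(key, None)           # invalidate; next read reloads
```

Invalidating on write, instead of updating the cached value, is one common choice: it keeps the write path simple at the cost of one extra miss after each update.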
Write-Through: Every write goes to the cache and the DB synchronously, so the cache stays in sync with the database.
Pros: No stale reads after write.
Cons: Write latency doubled; unused data clutters cache.
Write-Behind (Write-Back): Write to the cache immediately and flush to the DB asynchronously. This dramatically reduces write latency for bursty workloads.
Risk: Data loss if cache crashes before flush. Requires durable queues (WAL / Kafka) as buffer.
Read-Through: The cache sits in front of the DB; on a miss, the cache (not app code) fetches the data. Clean separation of concerns.
Popular in CDN and HTTP proxy setups. The cache is the single interface — no dual-path logic in application code.
How entries are removed when the cache is full:
Least Recently Used — evict coldest entry
Least Frequently Used — evict rarest access
Adaptive Replacement Cache — self-tuning
Time-To-Live — expiry-based eviction
Segmented LRU — hot/warm/cold tiers
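As an illustration of the first policy in the list, here is a minimal LRU cache sketch built on `collections.OrderedDict`. The class name and interface are assumptions made for this example; production caches usually combine LRU with TTLs and are thread-safe, which this sketch is not.

```python
from collections import OrderedDict
from typing import Any, Hashable


class LRUCache:
    """Fixed-capacity cache that evicts the least recently used entry."""

    def __init__(self, capacity: int) -> None:
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.capacity = capacity
        self._data: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable, default: Any = None) -> Any:
        if key not in self._data:
            return default
        self._data.move_to_end(key)         # mark as most recently used
        return self._data[key]

    def put(self, key: Hashable, value: Any) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict the coldest entry

    def __contains__(self, key: Hashable) -> bool:
        return key in self._data
```

Both operations run in constant time because the ordered dict keeps entries in recency order, so the coldest entry is always at the front.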
Cache Stampede: Many requests miss on the same key simultaneously and overwhelm the DB. Fix: probabilistic early expiry or mutex locks.
Cache Penetration: Requests for non-existent keys bypass the cache every time. Fix: Bloom filters or null-value caching.
Cache Avalanche: Many keys expire at once, causing a DB spike. Fix: jittered TTLs and staggered seeding.
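```python
import math
import random
import time

# `cache` is assumed to be a client object with get(key) and
# set(key, value, ex=seconds) that can store Python tuples.

def get_with_xlru(key, ttl, recompute_fn):
    # XFetch / probabilistic early recomputation
    entry = cache.get(key)
    if entry is not None:
        value, expiry, delta = entry
        # The chance of an early recompute rises as expiry approaches and
        # scales with delta, the cost of the last recompute.
        # 1.0 - random.random() lies in (0, 1], so log() never receives 0.
        now = time.time()
        if now - delta * math.log(1.0 - random.random()) < expiry:
            return value
        # otherwise fall through and recompute early
    start = time.time()
    value = recompute_fn()  # expensive DB call
    delta = time.time() - start
    cache.set(key, (value, time.time() + ttl, delta), ex=ttl)
    return value
```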
| Strategy | Read Latency | Write Latency | Consistency | Complexity | Best For |
|---|---|---|---|---|---|
| Cache-Aside | Low (hit) | Normal | Eventual | Medium | Read-heavy workloads |
| Write-Through | Low | 2× DB | Strong | Medium | Read-after-write critical |
| Write-Behind | Low | Very Low | Weak | High | Write-heavy, tolerate risk |
| Read-Through | Low (hit) | Normal | Eventual | Low | CDN, proxy layers |
Split data across nodes to overcome single-machine limits on storage, memory, and throughput.
Range Partitioning: Keys are divided into ordered ranges. Each shard owns a contiguous segment.
Example: HBase, MongoDB range sharding, PostgreSQL declarative partitioning.
Hotspot risk: Sequential inserts (timestamps) overwhelm the latest shard.
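A range lookup reduces to a binary search over the split points. The sketch below is a minimal illustration; `RangeShardMap`, the split points, and the shard names are all hypothetical.

```python
from bisect import bisect_right
from typing import List


class RangeShardMap:
    """Maps a key to the shard that owns its contiguous key range."""

    def __init__(self, split_points: List[str], shards: List[str]) -> None:
        if len(shards) != len(split_points) + 1:
            raise ValueError("need exactly one more shard than split points")
        if split_points != sorted(split_points):
            raise ValueError("split points must be sorted")
        self._split_points = split_points
        self._shards = shards

    def shard_for(self, key: str) -> str:
        # Keys below the first split point go to shard 0; a key equal to
        # a split point belongs to the shard on its right.
        return self._shards[bisect_right(self._split_points, key)]
```

With date-prefixed keys, every new key sorts after the last split point, so all inserts land on the final shard. That is the hotspot described above.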
Hash Partitioning: A hash function distributes keys uniformly across shards, eliminating hotspots for uniform access patterns.
Trade-off: Range queries become scatter-gather across all shards. Good for KV stores like Cassandra.
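A minimal sketch of hash partitioning with a modulo over the shard count follows. It also measures how many keys change shards when the shard count grows, which is the cost that ring-based schemes are designed to reduce. The function names and the choice of SHA-256 are assumptions for illustration only.

```python
import hashlib
from typing import Iterable


def shard_for(key: str, num_shards: int) -> int:
    """Stable shard index for `key`.

    Python's built-in hash() is randomized per process for strings,
    so a cryptographic digest is used here to keep the result stable.
    """
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def fraction_moved(keys: Iterable[str], old_shards: int, new_shards: int) -> float:
    """Share of keys whose shard changes when the shard count changes."""
    key_list = list(keys)
    moved = sum(
        1 for k in key_list
        if shard_for(k, old_shards) != shard_for(k, new_shards)
    )
    return moved / len(key_list)
```

Going from 4 to 5 shards with this scheme relocates most keys, because a key keeps its shard only when its hash has the same remainder under both moduli.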
Consistent Hashing: Map both nodes and keys onto a ring. On average only about K/N keys (K keys, N nodes) are remapped when a node joins or leaves, rather than nearly all of them.
Used by Cassandra, DynamoDB, Riak, and Memcached. Virtual nodes (vnodes) further smooth the load distribution.
Directory-Based Partitioning: A lookup service (shard map) tracks which shard owns each key. This gives maximum flexibility: resharding is just a metadata update.
The directory becomes a critical dependency. Must be highly available, fast, and durable (often stored in Zookeeper or etcd).
Key Salting: Append a random suffix (e.g., user:7742:3) to spread writes across shards, then scatter-gather on reads.
Celebrity (hot-key) problem: partition a single viral entity across N shards using a fan-out writer.
Time Bucketing: Combine a time component with a hash in the partition key so that writes do not all land on today's shard (e.g., event:2024-W21:hash).
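The suffix technique can be sketched with a hot counter: writes pick one of a fixed number of suffixed keys at random, and reads sum across all of them. `NUM_SALTS` and the helper names are hypothetical, and a `defaultdict` stands in for the sharded store.

```python
import random
from collections import defaultdict
from typing import DefaultDict, List

NUM_SALTS = 8  # hypothetical fan-out; more salts spread writes but slow reads


def salted_key(base_key: str, rng: random.Random) -> str:
    """Pick one of NUM_SALTS sub-keys at random for a write."""
    return f"{base_key}:{rng.randrange(NUM_SALTS)}"


def all_salted_keys(base_key: str) -> List[str]:
    """Every sub-key a reader must visit (the scatter-gather set)."""
    return [f"{base_key}:{salt}" for salt in range(NUM_SALTS)]


def increment(store: DefaultDict[str, int], base_key: str, rng: random.Random) -> None:
    store[salted_key(base_key, rng)] += 1


def read_total(store: DefaultDict[str, int], base_key: str) -> int:
    return sum(store.get(k, 0) for k in all_salted_keys(base_key))


if __name__ == "__main__":
    rng = random.Random(7)
    store: DefaultDict[str, int] = defaultdict(int)
    for _ in range(1000):
        increment(store, "user:7742", rng)
    print(read_total(store, "user:7742"), "writes across", len(store), "sub-keys")
```

The trade is explicit: write load on the hot key drops by roughly the number of salts, while every read now costs that many lookups.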
Cross-Shard Queries: Queries touching multiple shards fan out, then the coordinator merges results. This increases tail latency significantly, because the slowest shard determines the response time.
Cross-shard joins are typically avoided. Co-location (keeping related data on the same shard) is the standard answer.
Resharding: Double-write to both old and new shards, verify consistency, then cut over. Minimize impact with gradual traffic migration.
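```go
package ring

import (
	"fmt"
	"hash/crc32"
	"sort"
)

type Ring struct {
	replicas int            // virtual nodes per physical node
	sorted   []int          // sorted hash ring positions
	hashMap  map[int]string // position -> node address
}

// hash maps a string to a ring position (CRC32 here; any uniform hash works).
func hash(s string) int { return int(crc32.ChecksumIEEE([]byte(s))) }

func NewRing(replicas int) *Ring {
	return &Ring{replicas: replicas, hashMap: make(map[int]string)}
}

func (r *Ring) AddNode(node string) {
	for i := 0; i < r.replicas; i++ {
		key := hash(fmt.Sprintf("%s#%d", node, i))
		r.sorted = append(r.sorted, key)
		r.hashMap[key] = node
	}
	sort.Ints(r.sorted)
}

func (r *Ring) Lookup(k string) string {
	if len(r.sorted) == 0 {
		return "" // empty ring: no node owns the key
	}
	h := hash(k)
	// First position >= h, wrapping to 0: the next node clockwise.
	idx := sort.SearchInts(r.sorted, h)
	return r.hashMap[r.sorted[idx%len(r.sorted)]]
}
```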
Understanding what "correct" means for distributed state is the foundation of every database design decision.
| Model | Guarantee | Latency Impact | Examples | Use When |
|---|---|---|---|---|
| Linearizability | Every operation appears atomic and ordered globally; reads always see latest write | High | etcd, Zookeeper, Spanner | Locks, leader election, financial balances |
| Sequential Consistency | All operations appear in some global order; each process's ops are in program order | Medium | Early CPUs, some DBs | When global wall-clock ordering isn't required |
| Causal Consistency | Causally related ops appear in order. Concurrent ops may be seen differently per node | Low-Medium | MongoDB sessions, COPS | Social feeds, collaborative editing |
| Read-Your-Writes | You always see your own writes; others may not immediately | Low | Cassandra LOCAL_QUORUM, DynamoDB | User profile updates, preferences |
| Eventual Consistency | If no new writes occur, all replicas converge to the same value eventually | Lowest | DNS, Cassandra at consistency level ONE, S3 before December 2020 | View counts, analytics, carts |
| Monotonic Read | Once you read a value, subsequent reads never return older values | Low | Sticky sessions, client routing | Time-series, log tailing |
Raft (used by etcd, CockroachDB, TiKV) — Designed for understandability:
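Leader Election: Randomized election timeouts; the first candidate to win a majority of votes becomes leader.
Log Replication: The leader appends entries and commits each one once a majority acknowledges it.
Safety: A node cannot win an election unless its log contains all committed entries, so committed data is never lost.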
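The majority-acknowledgement rule can be sketched as a small calculation: the highest log index stored on a majority of nodes is the one the leader may commit. This is a simplified illustration with hypothetical names. Real Raft additionally requires that the entry at that index belongs to the leader's current term, which is omitted here.

```python
from typing import List


def majority(cluster_size: int) -> int:
    """Smallest number of nodes that forms a majority."""
    return cluster_size // 2 + 1


def commit_index(match_index: List[int]) -> int:
    """Highest log index replicated on a majority of nodes.

    `match_index` holds, for every node including the leader, the last
    log index known to be stored on that node.
    """
    ranked = sorted(match_index, reverse=True)
    # The entry at position majority-1 is held by at least `majority` nodes.
    return ranked[majority(len(ranked)) - 1]
```

For example, with five nodes whose logs end at indices 10, 9, 7, 3, and 3, three nodes hold index 7 or later, so 7 is the commit index.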
Paxos — Theoretical foundation but notoriously hard to implement correctly. Multi-Paxos, Fast Paxos, and Flexible Paxos trade latency for quorum flexibility.
CRDTs (Conflict-free Replicated Data Types): Data structures that can be concurrently updated on multiple replicas and automatically merged without conflicts.
Used in: Riak, Redis CRDT module, collaborative editors (for example Figma, whose multiplayer engine is CRDT-inspired; Google Docs uses operational transformation instead), shopping carts.
CRDTs guarantee strong eventual consistency — all correct replicas that have received all updates are in the same state.
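One of the simplest CRDTs is the grow-only counter (G-Counter), sketched below. Each replica increments only its own slot, and merging takes the per-replica maximum, so merges are commutative, associative, and idempotent. The class and replica names are illustrative assumptions.

```python
from typing import Dict


class GCounter:
    """Grow-only counter: one monotonically increasing slot per replica."""

    def __init__(self, replica_id: str) -> None:
        self.replica_id = replica_id
        self.counts: Dict[str, int] = {}

    def increment(self, amount: int = 1) -> None:
        if amount < 0:
            raise ValueError("a G-Counter can only grow")
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + amount

    def value(self) -> int:
        return sum(self.counts.values())

    def merge(self, other: "GCounter") -> None:
        # Element-wise max: applying the same merge twice changes nothing.
        for replica, count in other.counts.items():
            self.counts[replica] = max(self.counts.get(replica, 0), count)


if __name__ == "__main__":
    a, b = GCounter("a"), GCounter("b")
    a.increment(2)
    b.increment(3)
    a.merge(b)
    b.merge(a)
    print(a.value(), b.value())
```

After the two replicas exchange state in either order, both report the same total, which is the convergence property described above.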
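In a system with N replicas, choose W (write quorum) and R (read quorum) such that:
$$W + R > N$$
so that every read quorum overlaps every write quorum in at least one replica.
Example: N=3, W=2, R=2 is the classic quorum. It tolerates one replica failure for both reads and writes.
Setting W=1, R=N gives high write availability at the cost of read availability; W=N, R=1 gives fast reads and full replication, but a single replica failure blocks writes.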
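The trade-offs between N, W, and R can be checked mechanically. The helper below is a minimal sketch with a hypothetical name and return shape; it tests the usual overlap condition W + R > N and reports how many replica failures each path survives.

```python
from typing import Dict, Union


def quorum_properties(n: int, w: int, r: int) -> Dict[str, Union[bool, int]]:
    """Summarize what an (N, W, R) configuration guarantees."""
    if not (1 <= w <= n and 1 <= r <= n):
        raise ValueError("W and R must each be between 1 and N")
    return {
        # Read and write quorums share at least one replica.
        "overlapping": w + r > n,
        # Replicas that may be down while writes still reach W nodes.
        "write_failures_tolerated": n - w,
        # Replicas that may be down while reads still reach R nodes.
        "read_failures_tolerated": n - r,
    }
```

For N=3 this makes the extremes visible: W=1, R=3 tolerates two failed replicas on writes but none on reads, and W=3, R=1 is the mirror image.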
```python
import asyncio

async def two_phase_commit(coordinator, participants, txn):
    # Phase 1: Prepare. return_exceptions=True turns a crashed or
    # unreachable participant into a non-"YES" vote instead of raising.
    votes = await asyncio.gather(
        *[p.prepare(txn) for p in participants],
        return_exceptions=True,
    )

    if all(v == "YES" for v in votes):
        # Phase 2: Commit
        coordinator.log_decision("COMMIT")  # durable WAL write
        await asyncio.gather(*[p.commit(txn) for p in participants])
    else:
        coordinator.log_decision("ABORT")
        await asyncio.gather(*[p.rollback(txn) for p in participants])

# Warning: 2PC is blocking. If the coordinator crashes after logging COMMIT
# but before sending it, participants are stuck.
# 3PC and Paxos Commit address this but add round-trips.
```
Systems fail. Design for failure as a first-class concern, not an afterthought.
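Circuit Breaker: Cycles through three states, Closed (normal), Open (failing, fast-fail), and Half-Open (probing for recovery).
Prevents cascading failures by stopping calls to a degraded service. After a cool-down (often lengthened with exponential backoff) the breaker moves to Half-Open, and a successful probe call returns it to Closed.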
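The three-state machine can be sketched in a few lines. This is a single-threaded illustration under assumed names (`CircuitBreaker`, `CircuitOpenError`, `failure_threshold`, `reset_timeout`); it uses a fixed cool-down, not a backed-off one, and does not limit the number of concurrent probes in the half-open state.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means closed

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half-open"
        return "open"

    def call(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise CircuitOpenError("circuit open; failing fast")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed half-open probe, or too many failures while closed,
            # (re)opens the circuit and restarts the cool-down.
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        self._failures = 0       # success closes the circuit
        self._opened_at = None
        return result
```

Injecting the clock keeps the state machine testable without sleeping; in production the default `time.monotonic` is used.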
Retry with Backoff: Automatically retry transient failures. Exponential backoff spaces out attempts so a recovering service is not hammered, and jitter (±random %) prevents synchronized retry storms.
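A minimal retry helper along these lines is sketched below. It uses the "full jitter" variant, where each delay is drawn uniformly between zero and the exponential ceiling, which is one of several jitter schemes and differs from a ±percentage jitter. All names and default values are assumptions for illustration.

```python
import random
import time
from typing import Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float, cap: float, rng: random.Random) -> float:
    """Full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    return rng.uniform(0.0, min(cap, base * (2 ** attempt)))


def retry(fn: Callable[[], T], *, attempts: int = 5, base: float = 0.1,
          cap: float = 5.0,
          retry_on: Tuple[Type[BaseException], ...] = (Exception,),
          sleep: Callable[[float], None] = time.sleep,
          rng: Optional[random.Random] = None) -> T:
    """Call `fn` until it succeeds or `attempts` is exhausted."""
    rng = rng or random.Random()
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(backoff_delay(attempt, base, cap, rng))
    raise ValueError("attempts must be at least 1")
```

Only retry operations that are idempotent or carry an idempotency key, and restrict `retry_on` to errors that are plausibly transient; retrying a validation error just adds load.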
Bulkhead: Isolate resources (thread pools, connection pools, semaphores) per consumer/service, so that one slow dependency cannot exhaust shared resources and bring down everything else.
Named after watertight compartments in a ship — a breach in one bulkhead doesn't sink the vessel.
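A semaphore-based bulkhead can be sketched as follows: each dependency gets its own fixed number of slots, and callers are rejected immediately when the compartment is full. `Bulkhead` and `BulkheadFullError` are hypothetical names for this example.

```python
import threading
from typing import Any, Callable


class BulkheadFullError(RuntimeError):
    """Raised when every slot in the compartment is already in use."""


class Bulkhead:
    """Caps the number of concurrent calls allowed into one dependency."""

    def __init__(self, max_concurrent: int) -> None:
        self._slots = threading.BoundedSemaphore(max_concurrent)

    def run(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        # Non-blocking acquire: reject at once; do not queue behind a slow call.
        if not self._slots.acquire(blocking=False):
            raise BulkheadFullError("no free slot for this dependency")
        try:
            return fn(*args, **kwargs)
        finally:
            self._slots.release()
```

Creating one `Bulkhead` per downstream service means a stalled service can occupy only its own slots, leaving the others free.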
Timeouts and Deadline Propagation: Every network call must have a timeout. Critically, propagate a context deadline down the entire call chain so downstream services abort work that the client will never use.
A timeout without deadline propagation causes "orphan work" — resources consumed by abandoned requests.
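Deadline propagation can be sketched by passing one deadline object down the call chain and deriving every per-call timeout from the time that remains. `Deadline`, `DeadlineExceeded`, and `call_downstream` are hypothetical names; frameworks such as gRPC provide their own equivalents.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class DeadlineExceeded(TimeoutError):
    """Raised when the caller's deadline has already passed."""


class Deadline:
    """An absolute point in time shared by every hop of one request."""

    def __init__(self, timeout_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._expires_at = clock() + timeout_s

    def remaining(self) -> float:
        return max(0.0, self._expires_at - self._clock())

    def expired(self) -> bool:
        return self.remaining() <= 0.0


def call_downstream(deadline: Deadline, work: Callable[[float], T],
                    per_call_timeout: float) -> T:
    """Run `work(timeout)` with a timeout no longer than the deadline allows."""
    if deadline.expired():
        # The client has given up; doing the work would be orphan work.
        raise DeadlineExceeded("deadline already passed; skipping call")
    return work(min(per_call_timeout, deadline.remaining()))
```

Across process boundaries the remaining budget is typically sent as a header or request field so the next service can rebuild its own deadline from it.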
Rate Limiting: Protect services from overload. Common algorithms include token bucket, leaky bucket, fixed window, and sliding window.
Load shedding actively drops low-priority work under pressure to preserve capacity for high-priority requests.
Fallback and Graceful Degradation: Return a cached response, default value, or reduced-feature response when the primary path fails. Always prefer partial availability over hard failure.
Design user experiences around degraded states: "Recommendations unavailable, showing popular items instead."
```lua
-- KEYS[1] = bucket key
-- ARGV[1] = capacity, ARGV[2] = refill/sec, ARGV[3] = requested tokens
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])

-- Redis server time as fractional seconds
local now = redis.call('TIME')
local t = tonumber(now[1]) + tonumber(now[2]) / 1e6

local last = tonumber(redis.call('HGET', key, 'last')) or t
local tokens = tonumber(redis.call('HGET', key, 'tokens')) or capacity

-- Refill proportional to elapsed time, capped at capacity
tokens = math.min(capacity, tokens + (t - last) * refill)

local allowed = 0
if tokens >= requested then
  tokens = tokens - requested
  allowed = 1
end

redis.call('HMSET', key, 'tokens', tokens, 'last', t)
redis.call('EXPIRE', key, 3600) -- refresh the TTL on both paths so idle buckets expire
return allowed -- 1 = allowed, 0 = throttled
```
High-level blueprints for structuring distributed systems with clear trade-offs.
Saga Pattern: Manage long-running transactions across microservices without distributed locks. Each step publishes an event; failures trigger compensating transactions.
Choreography: Services react to events with no central coordinator. Simple but hard to trace/debug at scale.
Orchestration: A saga orchestrator drives the workflow, calling each service in turn. Easier to observe, but the orchestrator is a potential single point of failure.
Sagas guarantee eventual consistency, not ACID atomicity. Design compensating transactions to be idempotent.
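The orchestrated variant can be sketched as a loop over (action, compensation) pairs that undoes completed steps in reverse order when a later step fails. This in-process sketch uses hypothetical names; a real orchestrator would also persist its progress so it can resume after a crash.

```python
from typing import Callable, List, Tuple

# Each step pairs a forward action with the compensation that undoes it.
Step = Tuple[Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse.

    Returns True if every step succeeded, False if the saga was rolled back.
    """
    completed: List[Callable[[], None]] = []
    for action, compensate in steps:
        try:
            action()
        except Exception:
            for undo in reversed(completed):
                undo()  # compensations should be idempotent
            return False
        completed.append(compensate)
    return True


if __name__ == "__main__":
    log: List[str] = []

    def decline() -> None:
        raise RuntimeError("payment declined")

    ok = run_saga([
        (lambda: log.append("reserve stock"), lambda: log.append("release stock")),
        (decline, lambda: log.append("refund")),
    ])
    print(ok, log)
```

The failed step itself is not compensated, only the steps that completed before it, which is why each action should either fully apply or leave nothing behind.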
Event Sourcing: Never mutate state — store an immutable sequence of facts. Rebuild current state by replaying the event log.
CQRS: Separate read and write models. Write side handles commands and emits events; read side maintains denormalized projections optimized for queries.
The event log is your audit trail, time-travel debugger, and the source of truth. Read models are derived and discardable.
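Rebuilding state by replay amounts to a fold over the event log. The sketch below uses a hypothetical bank-account example; the event names and the `replay` helper are assumptions made for illustration.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Iterable


@dataclass(frozen=True)
class Event:
    """An immutable fact. `kind` is "Deposited" or "Withdrawn" (hypothetical)."""
    kind: str
    amount: int


def apply_event(balance: int, event: Event) -> int:
    """Pure transition function: current state + one event -> next state."""
    if event.kind == "Deposited":
        return balance + event.amount
    if event.kind == "Withdrawn":
        return balance - event.amount
    raise ValueError(f"unknown event kind: {event.kind}")


def replay(events: Iterable[Event], initial: int = 0) -> int:
    """Rebuild the current state by folding over the event log."""
    return reduce(apply_event, events, initial)
```

Replaying only a prefix of the log yields the state as of that point, which is the time-travel property mentioned above; a read model is the same fold with a different target shape.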
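Transactional Outbox: Atomically write a DB record and an outbox event in the same local transaction. A relay then publishes the events to the message broker, either by polling the outbox table or by using a CDC tool such as Debezium that tails the transaction log. This eliminates the dual-write problem.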
```sql
-- Atomic: order write + outbox in same txn
BEGIN;
INSERT INTO orders (id, ...) VALUES (...);
INSERT INTO outbox (event_type, payload)
VALUES ('OrderPlaced', '{"id":42,...}');
COMMIT;
```
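A polling relay for this pattern can be sketched with Python's built-in `sqlite3` module. The table layout, the `published` flag, and the function names are assumptions made for this example, and the `publish` callback stands in for a real broker client.

```python
import json
import sqlite3
from typing import Any, Callable, Dict


def create_schema(conn: sqlite3.Connection) -> None:
    with conn:
        conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, total INTEGER NOT NULL)")
        conn.execute(
            "CREATE TABLE outbox ("
            " id INTEGER PRIMARY KEY AUTOINCREMENT,"
            " event_type TEXT NOT NULL,"
            " payload TEXT NOT NULL,"
            " published INTEGER NOT NULL DEFAULT 0)"
        )


def place_order(conn: sqlite3.Connection, order_id: int, total: int) -> None:
    # `with conn` commits both inserts together or rolls both back.
    with conn:
        conn.execute("INSERT INTO orders (id, total) VALUES (?, ?)", (order_id, total))
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("OrderPlaced", json.dumps({"id": order_id, "total": total})),
        )


def relay_once(conn: sqlite3.Connection,
               publish: Callable[[str, Dict[str, Any]], None]) -> int:
    """Publish pending outbox rows in order; return how many were sent."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, event_type, payload in rows:
        publish(event_type, json.loads(payload))
        # Mark only after a successful publish: a crash in between causes a
        # re-send, so delivery is at-least-once and consumers must deduplicate.
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)
```

Because the order row and the outbox row share one transaction, there is never an order without its event or an event without its order.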
For mutual exclusion across services. Options ranked by safety:
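Redlock (Redis): Acquire the lock on a majority of N Redis masters. Watch for clock skew; a GC pause can leave a client acting on a lock that has already expired.
ZooKeeper / etcd: Strongly consistent; use ephemeral nodes + watches (ZooKeeper) or leases (etcd) for lease-based locks. Safer than Redis.
Database Lock: SELECT ... FOR UPDATE on a row. Simple, safe, but limited to a single DB instance.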
You cannot run distributed systems without deep observability. Instrument everything.
Metrics: Aggregated numerical data over time, such as CPU, request rate, error rate, and latency percentiles (p50/p95/p99). Prometheus + Grafana.
Logs: Structured (JSON) timestamped events. Correlation IDs link log lines across services. ELK / Loki stack.
Traces: The end-to-end journey of a request across services, recorded as spans. Identify bottlenecks in complex call graphs. Jaeger / Zipkin / Tempo.
SLI — The actual metric (e.g., request success rate, p99 latency).
SLO — Target for the SLI (e.g., "99.9% requests <200ms over 30 days").
Error Budget = 1 − SLO. The amount of unreliability you're allowed to spend on risky deployments. Once exhausted, freeze releases.
Error budgets align engineering incentives — reliability work gets prioritized automatically when budgets burn down.
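The error-budget arithmetic is simple enough to sketch directly. The helpers below are illustrative assumptions: one converts a time-based SLO into allowed downtime, the other reports how much of a request-based budget is left.

```python
def error_budget_minutes(slo: float, window_days: int = 30) -> float:
    """Minutes of allowed unavailability for a time-based SLO."""
    return (1.0 - slo) * window_days * 24 * 60


def budget_remaining(slo: float, total_requests: int, failed_requests: int) -> float:
    """Fraction of a request-based error budget still unspent.

    1.0 means untouched, 0.0 means exhausted, negative means overspent.
    """
    allowed_failures = (1.0 - slo) * total_requests
    if allowed_failures <= 0:
        raise ValueError("an SLO of 100% leaves no error budget")
    return 1.0 - failed_requests / allowed_failures


if __name__ == "__main__":
    print(f"99.9% over 30 days allows {error_budget_minutes(0.999):.1f} minutes of downtime")
    print(f"Budget left after 250 failures in 1M requests: "
          f"{budget_remaining(0.999, 1_000_000, 250):.0%}")
```

A negative result from `budget_remaining` is the signal that the release freeze described above should kick in.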
USE (Brendan Gregg, for resources): Utilization, Saturation, Errors.
RED (Tom Wilkie, for services): Rate, Errors, Duration.
RED aligns directly with the user experience. Always instrument your services with RED metrics at every layer.
| Failure Type | Detection Signal | Mitigation | Recovery |
|---|---|---|---|
| Node Crash | Missing heartbeat, health check failure | Replication, redundancy | Automatic failover via leader election |
| Network Partition | Increased timeouts, split-brain detection | Fencing tokens, majority quorum | Manual resolution or automatic merge (CRDT) |
| Slow Service (Grey Failure) | p99 latency spike, queue depth rising | Timeout, circuit breaker, bulkhead | Auto-scaling, load shedding |
| Cascade Failure | Error rate propagating upstream | Circuit breaker, fallback | Gradual traffic ramp-up |
| Data Corruption | Checksum mismatch, unexpected values | Checksums, replication | Restore from last good snapshot + WAL replay |
Chaos Engineering — Deliberately inject failures in production (or staging) to find weaknesses before users do. Netflix Chaos Monkey pioneered this. Modern tools: Gremlin, Chaos Mesh, Litmus. The goal is building confidence in the system's resiliency through empirical validation — not hoping it works.