Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "io"
9 : "sync"
10 : "sync/atomic"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/record"
16 : "github.com/cockroachdb/pebble/vfs"
17 : "github.com/prometheus/client_golang/prometheus"
18 : )
19 :
20 : // recordQueueEntry is an entry in recordQueue.
21 : type recordQueueEntry struct {
22 : p []byte
23 : opts SyncOptions
24 : refCount RefCount
25 : writeStartUnixNanos int64
26 : }
27 :
28 : type poppedEntry struct {
29 : opts SyncOptions
30 : refCount RefCount
31 : writeStartUnixNanos int64
32 : }
33 :
34 : const initialBufferLen = 8192
35 :
36 : // recordQueue is a variable-size single-producer multiple-consumer queue. It
37 : // is not lock-free, but most operations only need mu.RLock. It needs a mutex
38 : // to grow the size, since there is no upper bound on the number of queued
39 : // records (which are all the records that are not synced, and will need to be
40 : // written again in case of failover). Additionally, it needs a mutex to
41 : // atomically grab a snapshot of the queued records and provide them to a new
42 : // LogWriter that is being switched to.
43 : type recordQueue struct {
44 : // Only held for reading for all pop operations and most push operations.
45 : // Held for writing when the buffer needs to be grown or when switching to a
46 : // new writer.
47 : mu sync.RWMutex
48 :
49 : // queue is [tail, head). tail is the oldest entry and head is the index for
50 : // the next entry.
51 : //
52 : // Consumers: atomically read and write tail in pop. This is not the usual
53 : // kind of queue consumer since they already know the index that they are
54 : // popping exists, hence don't need to look at head.
55 : //
56 : // Producer: atomically reads tail in push. Writes to head.
57 : //
58 : // Based on the above we only need tail to be atomic. However, the producer
59 : // also populates entries in buffer, whose values need to be seen by the
60 : // consumers when doing a pop, which means they need to synchronize using a
61 : // release and acquire memory barrier pair, where the push does the release
62 : // and the pop does the acquire. For this reason we make head also atomic
63 : // and merge head and tail into a single atomic, so that the store of head
64 : // in push and the load of tail in pop accomplishes this release-acquire
65 : // pair.
66 : //
67 : // We initially implemented competition between multiple consumers solely
68 : // via atomic read-write of the tail using compare-and-swap (CAS). Since the
69 : // atomic read-write to tail in pop releases those buffer entries for reuse
70 : // to the producer, the consumer needs to grab the contents of
71 : // recordQueueEntry that it needs to do callbacks etc. (specifically the
72 : // contents corresponding to poppedEntry), *before* it succeeds with the
73 : // atomic read-write. This introduced a false data race in the Golang data
74 : // race detector
75 : // https://github.com/cockroachdb/pebble/issues/3380#issuecomment-1981188174.
76 : // Consider the case where the queue is [10,20), and consumer C1 is trying
77 : // to pop [10,12) and consumer C2 is trying to pop [10,14). The following
78 : // interleaving can happen:
79 : //
80 : // [C1] reads head=20, tail=10
81 : // [C2] reads head=20, tail=10
82 : // [C1] reads buffer contents [10,12) and makes local copy
83 : // [C1] CAS to make the queue [12,20)
84 : // [C2] reads buffer contents [10,14) and makes local copy, concurrently
85 : // with producer writing to 10, 11. *
86 : // [C2] CAS fails for popping [10,14), so revises to [12,14), and succeeds
87 : // in CAS. C2 only uses the contents it read into the local copy for
88 : // [12,14).
89 : //
90 : // * is a false data race since C2 is later deciding what contents it should
91 : // use from among the contents it read, based on what indices it
92 : // successfully popped. Unfortunately, we don't have a way to annotate the
93 : // code to tell the data race detector to ignore this false positive. So we
94 : // need to strengthen the synchronization to prevent such false positives.
95 : // We observe that usually a consumer will be popping a batch of entries
96 : // (based on a single successful fsync), and the number of consumers will be
97 : // small (usually 1). In comparison, producers can be highly concurrent (due
98 : // to workload concurrency). We don't want consumers to compete for a mutex
99 : // with producers, but we can afford to have multiple consumers compete for
100 : // a mutex. So we fix this false data race by using consumerMu to force
101 : // single-threaded popping.
102 : //
103 : // An alternative would be to pass the information contained in poppedEntry
104 : // to the LogWriter, so that it can pass it back when popping (so we don't
105 : // have to retrieve it from the recordQueue.buffer). We would still need
106 : // recordQueue.buffer, since writer switching needs those entries to be
107 : // replayed. We don't consider this solution for the same reason we replaced
108 : // record.pendingSyncsWithSyncQueue with
109 : // record.pendingSyncsWithHighestSyncIndex for the failoverWriter code path
110 : // -- we cannot bound the queue in the LogWriter by record.SyncConcurrency:
111 : // say SyncConcurrency was 4096, and LogWriter1's queue was full, and we
112 : // switched to LogWriter2, to which we replayed the same records and filled
113 : // up the queue. Then if LogWriter1 unblocks and pops all the 4096 entries,
114 : // the commit pipeline can send another 4096 entries, while LogWriter2 is
115 : // still blocked on trying to write and sync the previous 4096 entries. This
116 : // will overflow the queue in LogWriter2.
117 : //
118 : // All updates to headTail hold mu at least for reading. So when mu is held
119 : // for writing, there is a guarantee that headTail is not being updated.
120 : //
121 : // head is most-significant 32 bits and tail is least-significant 32 bits.
122 : headTail atomic.Uint64
123 :
124 : consumerMu sync.Mutex
125 :
126 : // Access to buffer requires at least RLock.
127 : buffer []recordQueueEntry
128 :
129 : lastTailObservedByProducer uint32
130 :
131 : // Read requires RLock.
132 : writer *record.LogWriter
133 :
134 : // When writer != nil, this is the return value of the last call to
135 : // SyncRecordGeneralized. It is updated (a) when WriteRecord calls push, using
136 : // only RLock (since WriteRecord is externally synchronized), and (b) in
137 : // snapshotAndSwitchWriter, using Lock. (b) excludes (a).
138 : lastLogSize int64
139 :
140 : failoverWriteAndSyncLatency prometheus.Histogram
141 : }
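// Illustrative lifecycle sketch, assuming the call sites later in this file:
// the single producer (WriteRecord) pushes, a LogWriter sync callback pops
// everything up to the synced index, and a writer switch snapshots whatever
// is still queued so it can be replayed to the new LogWriter.
//
//	idx, w, lastSize := q.push(p, opts, ref, nowNanos, prevSize, prevWriter)
//	_ = lastSize // WriteRecord uses this to maintain the logical offset
//	if w != nil {
//		// WriteRecord hands p to w via SyncRecordGeneralized.
//	}
//	q.pop(idx, nil) // from the LogWriter's external sync queue callback
//	q.snapshotAndSwitchWriter(newLogWriter, replay) // on failover
//
// nowNanos, prevSize, prevWriter, newLogWriter, and replay are placeholders
// for the values those call sites actually supply.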
142 :
143 2 : func (q *recordQueue) init(failoverWriteAndSyncLatency prometheus.Histogram) {
144 2 : *q = recordQueue{
145 2 : buffer: make([]recordQueueEntry, initialBufferLen),
146 2 : failoverWriteAndSyncLatency: failoverWriteAndSyncLatency,
147 2 : }
148 2 : }
149 :
150 : // NB: externally synchronized, i.e., no concurrent push calls.
151 : func (q *recordQueue) push(
152 : p []byte,
153 : opts SyncOptions,
154 : refCount RefCount,
155 : writeStartUnixNanos int64,
156 : latestLogSizeInWriteRecord int64,
157 : latestWriterInWriteRecord *record.LogWriter,
158 2 : ) (index uint32, writer *record.LogWriter, lastLogSize int64) {
159 2 : ht := q.headTail.Load()
160 2 : h, t := unpackHeadTail(ht)
161 2 : n := int(h - t)
162 2 : m := len(q.buffer)
163 2 : if m == n {
164 1 : // Full
165 1 : m = 2 * n
166 1 : newBuffer := make([]recordQueueEntry, m)
167 1 : for i := int(t); i < int(h); i++ {
168 1 : newBuffer[i%m] = q.buffer[i%n]
169 1 : }
170 1 : q.mu.Lock()
171 1 : q.buffer = newBuffer
172 1 : q.mu.Unlock()
173 : }
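// Worked example for the growth above: with n=8, t=6, h=14 the buffer is
// full; after doubling to m=16 the entry for index 6 stays in slot 6%16=6,
// while the entry for index 13 moves from slot 13%8=5 to slot 13%16=13, so
// every queued index i remains addressable as i%m.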
174 2 : q.mu.RLock()
175 2 : q.buffer[int(h)%m] = recordQueueEntry{
176 2 : p: p,
177 2 : opts: opts,
178 2 : refCount: refCount,
179 2 : writeStartUnixNanos: writeStartUnixNanos,
180 2 : }
181 2 : // Reclaim memory for consumed entries. We couldn't do that in pop since
182 2 : // advancing tail there immediately transfers ownership of those slots back
183 2 : // to the producer.
184 2 : for i := q.lastTailObservedByProducer; i < t; i++ {
185 2 : q.buffer[int(i)%m] = recordQueueEntry{}
186 2 : }
187 2 : q.lastTailObservedByProducer = t
188 2 : q.headTail.Add(1 << headTailBits)
189 2 : writer = q.writer
190 2 : if writer == latestWriterInWriteRecord {
191 2 : // WriteRecord has written to this writer since the switch.
192 2 : q.lastLogSize = latestLogSizeInWriteRecord
193 2 : }
194 : // Else writer is a new writer that was switched to, so ignore the
195 : // latestLogSizeInWriteRecord.
196 :
197 2 : lastLogSize = q.lastLogSize
198 2 : q.mu.RUnlock()
199 2 : return h, writer, lastLogSize
200 : }
201 :
202 1 : func (q *recordQueue) length() int {
203 1 : ht := q.headTail.Load()
204 1 : h, t := unpackHeadTail(ht)
205 1 : return int(h - t)
206 1 : }
207 :
208 : // Pops all entries. Must be called only after the last push returns.
209 2 : func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
210 2 : ht := q.headTail.Load()
211 2 : h, t := unpackHeadTail(ht)
212 2 : n := int(h - t)
213 2 : if n == 0 {
214 2 : return 0, 0
215 2 : }
216 1 : return n, q.pop(h-1, err)
217 : }
218 :
219 : // Pops all entries up to and including index. The remaining queue is
220 : // [index+1, head).
221 : //
222 : // NB: we could slightly simplify to only have the latest writer be able to
223 : // pop. That would remove the need for consumerMu below, but it seems better
224 : // to reduce the amount of queued work regardless of who has successfully written it.
225 2 : func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
226 2 : nowUnixNanos := time.Now().UnixNano()
227 2 : var buf [512]poppedEntry
228 2 : tailEntriesToPop := func() (t uint32, numEntriesToPop int) {
229 2 : ht := q.headTail.Load()
230 2 : _, t = unpackHeadTail(ht)
231 2 : tail := int(t)
232 2 : numEntriesToPop = int(index) - tail + 1
233 2 : return t, numEntriesToPop
234 2 : }
235 2 : q.consumerMu.Lock()
236 2 : // numEntriesToPop is a function of index and tail. The value of tail cannot
237 2 : // change since consumerMu is held.
238 2 : tail, numEntriesToPop := tailEntriesToPop()
239 2 : if numEntriesToPop <= 0 {
240 2 : q.consumerMu.Unlock()
241 2 : return 0
242 2 : }
243 2 : var b []poppedEntry
244 2 : if numEntriesToPop <= len(buf) {
245 2 : b = buf[:numEntriesToPop]
246 2 : } else {
247 1 : // Do allocation before acquiring the mutex.
248 1 : b = make([]poppedEntry, numEntriesToPop)
249 1 : }
250 2 : q.mu.RLock()
251 2 : n := len(q.buffer)
252 2 : for i := 0; i < numEntriesToPop; i++ {
253 2 : // Grab the popped entries before incrementing tail, since that will
254 2 : // release those buffer slots to the producer.
255 2 : idx := (i + int(tail)) % n
256 2 : b[i] = poppedEntry{
257 2 : opts: q.buffer[idx].opts,
258 2 : refCount: q.buffer[idx].refCount,
259 2 : writeStartUnixNanos: q.buffer[idx].writeStartUnixNanos,
260 2 : }
261 2 : }
262 : // Since tail cannot change, we don't need to do a compare-and-swap.
263 2 : q.headTail.Add(uint64(numEntriesToPop))
264 2 : q.mu.RUnlock()
265 2 : q.consumerMu.Unlock()
266 2 : addLatencySample := false
267 2 : var maxLatencyNanos int64
268 2 : for i := 0; i < numEntriesToPop; i++ {
269 2 : // Now that we've synced the entry, we can unref it to signal that we
270 2 : // will not read the written byte slice again.
271 2 : if b[i].refCount != nil {
272 2 : b[i].refCount.Unref()
273 2 : }
274 2 : if b[i].opts.Done != nil {
275 2 : numSyncsPopped++
276 2 : if err != nil {
277 1 : *b[i].opts.Err = err
278 1 : }
279 2 : b[i].opts.Done.Done()
280 2 : latency := nowUnixNanos - b[i].writeStartUnixNanos
281 2 : if !addLatencySample {
282 2 : addLatencySample = true
283 2 : maxLatencyNanos = latency
284 2 : } else if maxLatencyNanos < latency {
285 0 : maxLatencyNanos = latency
286 0 : }
287 : }
288 : }
289 2 : if addLatencySample {
290 2 : if maxLatencyNanos < 0 {
291 0 : maxLatencyNanos = 0
292 0 : }
293 2 : q.failoverWriteAndSyncLatency.Observe(float64(maxLatencyNanos))
294 : }
295 2 : return numSyncsPopped
296 : }
297 :
298 : func (q *recordQueue) snapshotAndSwitchWriter(
299 : writer *record.LogWriter,
300 : snapshotFunc func(firstIndex uint32, entries []recordQueueEntry) (logSize int64),
301 2 : ) {
302 2 : q.mu.Lock()
303 2 : defer q.mu.Unlock()
304 2 : q.writer = writer
305 2 : h, t := unpackHeadTail(q.headTail.Load())
306 2 : n := h - t
307 2 : if n > 0 {
308 2 : m := uint32(len(q.buffer))
309 2 : b := make([]recordQueueEntry, n)
310 2 : for i := t; i < h; i++ {
311 2 : b[i-t] = q.buffer[i%m]
312 2 : }
313 2 : q.lastLogSize = snapshotFunc(t, b)
314 : }
315 : }
316 :
317 : // getLastIndex is used by failoverWriter.Close.
318 2 : func (q *recordQueue) getLastIndex() (lastIndex int64) {
319 2 : h, _ := unpackHeadTail(q.headTail.Load())
320 2 : return int64(h) - 1
321 2 : }
322 :
323 : const headTailBits = 32
324 :
325 2 : func unpackHeadTail(ht uint64) (head, tail uint32) {
326 2 : const mask = 1<<headTailBits - 1
327 2 : head = uint32((ht >> headTailBits) & mask)
328 2 : tail = uint32(ht & mask)
329 2 : return head, tail
330 2 : }
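// packHeadTail is an illustrative inverse of unpackHeadTail; the code above
// never needs it since push and pop only add deltas to the combined word. It
// also makes the release-acquire pairing described in the recordQueue comment
// concrete: the Add in push that advances head publishes the buffer entry,
// and the Load of the same word in pop observes it.
//
//	func packHeadTail(head, tail uint32) uint64 {
//		return uint64(head)<<headTailBits | uint64(tail)
//	}
//
// For example, packHeadTail(20, 10) encodes the queue [10, 20); adding
// 1<<headTailBits yields packHeadTail(21, 10) (one push), while adding 4 to
// the original word yields packHeadTail(20, 14) (four entries popped).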
331 :
332 : // Maximum number of physical log files when writing a virtual WAL. Arbitrarily
333 : // chosen value. Setting this to 2 will not simplify the code. We make this a
334 : // constant since we want a fixed-size array for failoverWriter.writers.
335 : const maxPhysicalLogs = 10
336 :
337 : // failoverWriter is the implementation of Writer in failover mode. No Writer
338 : // method blocks for IO, except for Close.
339 : //
340 : // Loosely speaking, Close blocks until all records are successfully written
341 : // and synced to some log writer. Monitoring of log writer latency and errors
342 : // continues after Close is called, which means failoverWriter can be switched
343 : // to a new log writer after Close is called, to unblock Close.
344 : //
345 : // More precisely, Close does not block if there is an error in creating or
346 : // closing the latest LogWriter when Close was called. This is because errors
347 : // are considered indicative of misconfiguration, and the user of
348 : // failoverWriter can dampen switching when observing errors (e.g. see
349 : // failoverMonitor), so Close does not assume any liveness of calls to
350 : // switchToNewDir when such errors occur. Since the caller (see db.go) treats
351 : // an error on Writer.Close as fatal, this does mean that failoverWriter has
352 : // limited ability to mask errors (its primary task is to mask high latency).
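//
// Illustrative usage sketch, assuming SyncOptions carries a *sync.WaitGroup
// Done and an *error Err (which is how pop treats it above): a caller that
// needs durability waits on Done after WriteRecord returns.
//
//	var done sync.WaitGroup
//	var syncErr error
//	done.Add(1)
//	_, err := ww.WriteRecord(payload, SyncOptions{Done: &done, Err: &syncErr}, nil)
//	if err == nil {
//		done.Wait() // returns once some LogWriter has durably synced the record
//		err = syncErr
//	}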
353 : type failoverWriter struct {
354 : opts failoverWriterOpts
355 : q recordQueue
356 : mu struct {
357 : sync.Mutex
358 : // writers is protected by mu, except for updates to the
359 : // latencyAndErrorRecorder field. WriteRecord does not acquire mu, so the
360 : // protection by mu is for handling concurrent calls to switchToNewDir,
361 : // Close, and getLog.
362 : writers [maxPhysicalLogs]logWriterAndRecorder
363 :
364 : // cond is signaled when the latest LogWriter is set in writers (or there
365 : // is a creation error), or when the latest LogWriter is successfully
366 : // closed. It is waited on in Close. We don't use channels and select
367 : // since what Close is waiting on is dynamic based on the local state in
368 : // Close, so using Cond is simpler.
369 : cond *sync.Cond
370 : // nextWriterIndex is advanced before creating the *LogWriter. That is, a
371 : // slot is reserved by taking the current value of nextWriterIndex and
372 : // incrementing it, and then the *LogWriter for that slot is created. When
373 : // newFailoverWriter returns, nextWriterIndex = 1.
374 : //
375 : // The latest *LogWriter is (will be) at nextWriterIndex-1.
376 : //
377 : // INVARIANT: nextWriterIndex <= len(writers)
378 : nextWriterIndex LogNameIndex
379 : closed bool
380 : // metrics is initialized in Close. Currently we just use the metrics from
381 : // the latest writer after it is closed, since in the common case with
382 : // only one writer, that writer's flush loop will have finished and the
383 : // metrics will be current. With multiple writers, these metrics can be
384 : // quite inaccurate. The WriteThroughput metric includes an IdleDuration,
385 : // which can be high for a writer that was switched away from, and
386 : // therefore not indicative of overall work being done by the
387 : // failoverWriter. The PendingBufferLen and SyncQueueLen are similarly
388 : // inaccurate once there is no more work being given to a writer. We could
389 : // add a method to LogWriter to stop sampling metrics when it is not the
390 : // latest writer. Then we could aggregate all these metrics across all
391 : // writers.
392 : //
393 : // Note that CockroachDB does not use these metrics in any meaningful way.
394 : //
395 : // TODO(sumeer): do the improved solution outlined above.
396 : metrics record.LogWriterMetrics
397 : }
398 : // State for computing logical offset. The cumulative offset state is in
399 : // offset. Each time we call SyncRecordGeneralized from WriteRecord, we
400 : // compute the delta from the size returned by this LogWriter now, and the
401 : // size returned by this LogWriter in the previous call to
402 : // SyncRecordGeneralized. That previous call to SyncRecordGeneralized may
403 : // have happened from WriteRecord, or asynchronously during a switch. So
404 : // that previous call state requires synchronization and is maintained in
405 : // recordQueue. The offset is incremented by this delta without any
406 : // synchronization, since we rely on external synchronization (like the
407 : // standaloneWriter).
408 : logicalOffset struct {
409 : latestWriterInWriteRecord *record.LogWriter
410 : latestLogSizeInWriteRecord int64
411 : offset int64
412 : // Transitions once from false => true when there is a non-nil writer.
413 : notEstimatedOffset bool
414 : }
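// Worked example for the offset bookkeeping described above: if the previous
// SyncRecordGeneralized call on the current LogWriter returned 100 (recorded
// as lastLogSize in recordQueue) and the call made from WriteRecord returns
// 130, WriteRecord adds the delta to offset:
//
//	delta := latestLogSizeInWriteRecord - lastLogSize // 130 - 100 = 30
//	offset += delta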
415 : psiForWriteRecordBacking record.PendingSyncIndex
416 : psiForSwitchBacking record.PendingSyncIndex
417 : }
418 :
419 : type logWriterAndRecorder struct {
420 : // This may never become non-nil if, when the LogWriter was finally created,
421 : // it was no longer the latest writer. Additionally, if there was an error
422 : // in creating the writer, w will remain nil and createError will be set.
423 : w *record.LogWriter
424 : // createError is set if there is an error creating the writer. This is
425 : // useful in Close since we need to know when the work for creating the
426 : // latest writer is done, whether it resulted in success or not.
427 : createError error
428 : r latencyAndErrorRecorder
429 :
430 : // dir, approxFileSize, synchronouslyClosed are kept for initializing
431 : // segmentWithSizeEtc. The approxFileSize is initially set to whatever is
432 : // returned by logCreator. When failoverWriter.Close is called,
433 : // approxFileSize and synchronouslyClosed may be updated.
434 : dir Dir
435 : approxFileSize uint64
436 : synchronouslyClosed bool
437 : }
438 :
439 : var _ Writer = &failoverWriter{}
440 :
441 : var _ switchableWriter = &failoverWriter{}
442 :
443 : type failoverWriterOpts struct {
444 : wn NumWAL
445 : logger base.Logger
446 : timeSource
447 : jobID int
448 : logCreator
449 :
450 : // Options that feed into SyncingFileOptions.
451 : noSyncOnClose bool
452 : bytesPerSync int
453 : preallocateSize func() int
454 :
455 : // Options for record.LogWriter.
456 : minSyncInterval func() time.Duration
457 : fsyncLatency prometheus.Histogram
458 : queueSemChan chan struct{}
459 : stopper *stopper
460 :
461 : failoverWriteAndSyncLatency prometheus.Histogram
462 : writerClosed func(logicalLogWithSizesEtc)
463 :
464 : writerCreatedForTest chan<- struct{}
465 : }
466 :
467 : func simpleLogCreator(
468 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
469 1 : ) (f vfs.File, initialFileSize uint64, err error) {
470 1 : filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
471 1 : // Create file.
472 1 : r.writeStart()
473 1 : f, err = dir.FS.Create(filename, "pebble-wal")
474 1 : r.writeEnd(err)
475 1 : return f, 0, err
476 1 : }
477 :
478 : type logCreator func(
479 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
480 : ) (f vfs.File, initialFileSize uint64, err error)
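// Illustrative sketch: logCreator is a seam that lets tests and other callers
// customize how the physical file is created. A hypothetical wrapper that logs
// each creation while delegating to simpleLogCreator could look like the
// following (tracingLogCreator is not part of this package):
//
//	func tracingLogCreator(logger base.Logger) logCreator {
//		return func(
//			dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
//		) (vfs.File, uint64, error) {
//			f, size, err := simpleLogCreator(dir, wn, li, r, jobID)
//			logger.Infof("job %d: created %s: %v", jobID, makeLogFilename(wn, li), err)
//			return f, size, err
//		}
//	}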
481 :
482 : func newFailoverWriter(
483 : opts failoverWriterOpts, initialDir dirAndFileHandle,
484 2 : ) (*failoverWriter, error) {
485 2 : ww := &failoverWriter{
486 2 : opts: opts,
487 2 : }
488 2 : ww.q.init(opts.failoverWriteAndSyncLatency)
489 2 : ww.mu.cond = sync.NewCond(&ww.mu)
490 2 : // The initial record.LogWriter creation also happens via a call to
491 2 : // switchToNewDir, since we don't want it to block newFailoverWriter.
492 2 : err := ww.switchToNewDir(initialDir)
493 2 : if err != nil {
494 0 : // Switching limit cannot be exceeded when creating.
495 0 : panic(err)
496 : }
497 2 : return ww, nil
498 : }
499 :
500 : // WriteRecord implements Writer.
501 : func (ww *failoverWriter) WriteRecord(
502 : p []byte, opts SyncOptions, ref RefCount,
503 2 : ) (logicalOffset int64, err error) {
504 2 : if ref != nil {
505 2 : ref.Ref()
506 2 : }
507 2 : var writeStartUnixNanos int64
508 2 : if opts.Done != nil {
509 2 : writeStartUnixNanos = time.Now().UnixNano()
510 2 : }
511 2 : recordIndex, writer, lastLogSize := ww.q.push(
512 2 : p,
513 2 : opts,
514 2 : ref,
515 2 : writeStartUnixNanos,
516 2 : ww.logicalOffset.latestLogSizeInWriteRecord,
517 2 : ww.logicalOffset.latestWriterInWriteRecord,
518 2 : )
519 2 : if writer == nil {
520 2 : // Don't have a record.LogWriter yet, so use an estimate. This estimate
521 2 : // will get overwritten.
522 2 : ww.logicalOffset.offset += int64(len(p))
523 2 : return ww.logicalOffset.offset, nil
524 2 : }
525 : // INVARIANT: writer != nil.
526 2 : notEstimatedOffset := ww.logicalOffset.notEstimatedOffset
527 2 : if !notEstimatedOffset {
528 2 : ww.logicalOffset.notEstimatedOffset = true
529 2 : }
530 2 : ww.psiForWriteRecordBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
531 2 : if opts.Done != nil {
532 2 : ww.psiForWriteRecordBacking.Index = int64(recordIndex)
533 2 : }
534 2 : ww.logicalOffset.latestLogSizeInWriteRecord, err = writer.SyncRecordGeneralized(p, &ww.psiForWriteRecordBacking)
535 2 : ww.logicalOffset.latestWriterInWriteRecord = writer
536 2 : if notEstimatedOffset {
537 2 : delta := ww.logicalOffset.latestLogSizeInWriteRecord - lastLogSize
538 2 : ww.logicalOffset.offset += delta
539 2 : } else {
540 2 : // Overwrite the estimate. This is a best-effort improvement in that it is
541 2 : // accurate for the common case where writer is the first LogWriter.
542 2 : // Consider a failover scenario where there was no LogWriter for the first
543 2 : // 10 records, so they are all accumulated as an estimate. Then the first
544 2 : // LogWriter successfully writes and syncs the first 5 records and gets
545 2 : // stuck. A switch happens to a second LogWriter that is handed the
546 2 : // remaining 5 records, and the 11th record arrives via a WriteRecord.
547 2 : // The transition from !notEstimatedOffset to notEstimatedOffset will
548 2 : // happen on this 11th record, and the logic here will use the length of
549 2 : // the second LogWriter, that does not reflect the full length.
550 2 : //
551 2 : // TODO(sumeer): try to make this more correct, without adding much more
552 2 : // complexity, and without adding synchronization.
553 2 : ww.logicalOffset.offset = ww.logicalOffset.latestLogSizeInWriteRecord
554 2 : }
555 2 : return ww.logicalOffset.offset, err
556 : }
557 :
558 : // switchToNewDir starts switching to dir. It implements switchableWriter. All
559 : // work is async, and a non-nil error is returned only if the switching limit
560 : // is exceeded.
561 2 : func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
562 2 : ww.mu.Lock()
563 2 : // Can have a late switchToNewDir call if the failoverMonitor has not yet
564 2 : // been told that the writer is closed. Ignore.
565 2 : if ww.mu.closed {
566 1 : ww.mu.Unlock()
567 1 : if ww.opts.writerCreatedForTest != nil {
568 1 : ww.opts.writerCreatedForTest <- struct{}{}
569 1 : }
570 1 : return nil
571 : }
572 : // writerIndex is the slot for this writer.
573 2 : writerIndex := ww.mu.nextWriterIndex
574 2 : if int(writerIndex) == len(ww.mu.writers) {
575 1 : ww.mu.Unlock()
576 1 : return errors.Errorf("exceeded switching limit")
577 1 : }
578 2 : ww.mu.writers[writerIndex].dir = dir.Dir
579 2 : ww.mu.nextWriterIndex++
580 2 : ww.mu.Unlock()
581 2 :
582 2 : // Creation is async.
583 2 : ww.opts.stopper.runAsync(func() {
584 2 : recorderAndWriter := &ww.mu.writers[writerIndex].r
585 2 : recorderAndWriter.ts = ww.opts.timeSource
586 2 : file, initialFileSize, err := ww.opts.logCreator(
587 2 : dir.Dir, ww.opts.wn, writerIndex, recorderAndWriter, ww.opts.jobID)
588 2 : ww.mu.writers[writerIndex].approxFileSize = initialFileSize
589 2 : // handleErrFunc is called when err != nil. It handles the multiple IO error
590 2 : // cases below.
591 2 : handleErrFunc := func(err error) {
592 1 : if file != nil {
593 1 : file.Close()
594 1 : }
595 1 : ww.mu.Lock()
596 1 : defer ww.mu.Unlock()
597 1 : ww.mu.writers[writerIndex].createError = err
598 1 : ww.mu.cond.Signal()
599 1 : if ww.opts.writerCreatedForTest != nil {
600 1 : ww.opts.writerCreatedForTest <- struct{}{}
601 1 : }
602 : }
603 2 : if err != nil {
604 1 : handleErrFunc(err)
605 1 : return
606 1 : }
607 : // Sync dir.
608 2 : recorderAndWriter.writeStart()
609 2 : err = dir.Sync()
610 2 : recorderAndWriter.writeEnd(err)
611 2 : if err != nil {
612 1 : handleErrFunc(err)
613 1 : return
614 1 : }
615 : // Wrap in a syncingFile.
616 2 : syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
617 2 : NoSyncOnClose: ww.opts.noSyncOnClose,
618 2 : BytesPerSync: ww.opts.bytesPerSync,
619 2 : PreallocateSize: ww.opts.preallocateSize(),
620 2 : })
621 2 : // Wrap in the latencyAndErrorRecorder.
622 2 : recorderAndWriter.setWriter(syncingFile)
623 2 :
624 2 : // Using NumWAL as the DiskFileNum is fine since it is used only as
625 2 : // EOF trailer for safe log recycling. Even though many log files can
626 2 : // map to a single NumWAL, a file used for NumWAL n at index m will
627 2 : // never get recycled for NumWAL n at a later index (since recycling
628 2 : // happens when n as a whole is obsolete).
629 2 : w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn),
630 2 : record.LogWriterConfig{
631 2 : WALMinSyncInterval: ww.opts.minSyncInterval,
632 2 : WALFsyncLatency: ww.opts.fsyncLatency,
633 2 : QueueSemChan: ww.opts.queueSemChan,
634 2 : ExternalSyncQueueCallback: ww.doneSyncCallback,
635 2 : })
636 2 : closeWriter := func() bool {
637 2 : ww.mu.Lock()
638 2 : defer ww.mu.Unlock()
639 2 : if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed {
640 2 : // Not the latest writer or the writer was closed while this async
641 2 : // creation was ongoing.
642 2 : if ww.opts.writerCreatedForTest != nil {
643 1 : ww.opts.writerCreatedForTest <- struct{}{}
644 1 : }
645 2 : return true
646 : }
647 : // Latest writer.
648 2 : ww.mu.writers[writerIndex].w = w
649 2 : ww.mu.cond.Signal()
650 2 : // NB: snapshotAndSwitchWriter does not block on IO, since
651 2 : // SyncRecordGeneralized does no IO.
652 2 : ww.q.snapshotAndSwitchWriter(w,
653 2 : func(firstIndex uint32, entries []recordQueueEntry) (logSize int64) {
654 2 : for i := range entries {
655 2 : ww.psiForSwitchBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
656 2 : if entries[i].opts.Done != nil {
657 2 : ww.psiForSwitchBacking.Index = int64(firstIndex) + int64(i)
658 2 : }
659 2 : var err error
660 2 : logSize, err = w.SyncRecordGeneralized(entries[i].p, &ww.psiForSwitchBacking)
661 2 : if err != nil {
662 0 : // TODO(sumeer): log periodically. The err will also surface via
663 0 : // the latencyAndErrorRecorder, so if a switch is possible, it
664 0 : // will be done.
665 0 : ww.opts.logger.Errorf("%s", err)
666 0 : }
667 : }
668 2 : return logSize
669 : })
670 2 : if ww.opts.writerCreatedForTest != nil {
671 1 : ww.opts.writerCreatedForTest <- struct{}{}
672 1 : }
673 2 : return false
674 : }()
675 2 : if closeWriter {
676 2 : // Never wrote anything to this writer so don't care about the
677 2 : // returned error.
678 2 : ww.opts.stopper.runAsync(func() {
679 2 : _ = w.Close()
680 2 : // TODO(sumeer): consider deleting this file too, since
681 2 : // failoverWriter.Close may not wait for it. This is going to be
682 2 : // extremely rare, so the risk of garbage empty files piling up is
683 2 : // extremely low. Say failover happens daily and of those cases we
684 2 : // have to be very unlucky and the close happens while a failover was
685 2 : // ongoing and the previous LogWriter successfully wrote everything
686 2 : // (say 1% probability if we want to be pessimistic). A garbage file
687 2 : // every 100 days. Restarts will delete that garbage.
688 2 : })
689 : }
690 : })
691 2 : return nil
692 : }
693 :
694 : // doneSyncCallback is the record.ExternalSyncQueueCallback called by
695 : // record.LogWriter.
696 : //
697 : // recordQueue is popped from only when some work requests a sync (and
698 : // successfully syncs). In the worst case, if no syncs are requested, we could
699 : // queue all the records needed to fill up a memtable in the recordQueue. This
700 : // can have two negative effects: (a) in the case of failover, we need to
701 : // replay all the data in the current mutable memtable, which takes more time,
702 : // (b) the memory usage is proportional to the size of the memtable. We ignore
703 : // these negatives since (a) users like CockroachDB regularly sync, and (b)
704 : // the default memtable size is only 64MB.
705 2 : func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) {
706 2 : if err != nil {
707 1 : // Don't pop anything since we can retry after switching to a new
708 1 : // LogWriter.
709 1 : return
710 1 : }
711 : // NB: harmless after Close returns since numSyncsPopped will be 0.
712 2 : numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err)
713 2 : if ww.opts.queueSemChan != nil {
714 2 : for i := 0; i < numSyncsPopped; i++ {
715 2 : <-ww.opts.queueSemChan
716 2 : }
717 : }
718 : }
719 :
720 : // ongoingLatencyOrErrorForCurDir implements switchableWriter.
721 2 : func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
722 2 : r := ww.recorderForCurDir()
723 2 : if r == nil {
724 2 : return 0, nil
725 2 : }
726 2 : return r.ongoingLatencyOrError()
727 : }
728 :
729 : // For internal use and testing.
730 2 : func (ww *failoverWriter) recorderForCurDir() *latencyAndErrorRecorder {
731 2 : ww.mu.Lock()
732 2 : defer ww.mu.Unlock()
733 2 : if ww.mu.closed {
734 2 : return nil
735 2 : }
736 2 : return &ww.mu.writers[ww.mu.nextWriterIndex-1].r
737 : }
738 :
739 : // Close implements Writer.
740 : //
741 : // NB: ongoingLatencyOrErrorForCurDir and switchToNewDir can be
742 : // called after Close is called, and there is also a possibility that they get
743 : // called after Close returns and before failoverMonitor knows that the
744 : // failoverWriter is closed.
745 : //
746 : // doneSyncCallback can be called anytime after Close returns since there
747 : // could be stuck writes that finish arbitrarily later.
748 : //
749 : // See the long comment about Close behavior where failoverWriter is declared.
750 2 : func (ww *failoverWriter) Close() (logicalOffset int64, err error) {
751 2 : offset, err := ww.closeInternal()
752 2 : ww.opts.writerClosed(ww.getLog())
753 2 : return offset, err
754 2 : }
755 :
756 2 : func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
757 2 : logicalOffset = ww.logicalOffset.offset
758 2 : // [0, closeCalledCount) have had LogWriter.Close called (though may not
759 2 : // have finished) or the LogWriter will never be non-nil. Either way, they
760 2 : // have been "processed".
761 2 : closeCalledCount := LogNameIndex(0)
762 2 : // lastWriterState is the state for the last writer, for which we are
763 2 : // waiting for LogWriter.Close to finish or for creation to be unsuccessful.
764 2 : // What is considered the last writer can change. All state is protected by
765 2 : // ww.mu.
766 2 : type lastWriterState struct {
767 2 : index LogNameIndex
768 2 : closed bool
769 2 : err error
770 2 : metrics record.LogWriterMetrics
771 2 : }
772 2 : var lastWriter lastWriterState
773 2 : lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
774 2 : ww.mu.Lock()
775 2 : defer ww.mu.Unlock()
776 2 : numWriters := ww.mu.nextWriterIndex
777 2 : // Every iteration starts and ends with the mutex held.
778 2 : //
779 2 : // Invariant: numWriters >= 1.
780 2 : //
781 2 : // We will loop until we have closed the lastWriter (and use
782 2 : // lastWriter.err). We also need to call close on all LogWriters
783 2 : // that will not close themselves, i.e., those that have already been
784 2 : // created and installed in failoverWriter.writers (this set may change
785 2 : // while failoverWriter.Close runs).
786 2 : for !lastWriter.closed || numWriters > lastWriter.index+1 {
787 2 : if numWriters > closeCalledCount {
788 2 : // lastWriter.index may or may not have advanced. If it has advanced, we
789 2 : // need to reinitialize lastWriterState. If it hasn't advanced, and
790 2 : // numWriters > closeCalledCount, we know that we haven't called close
791 2 : // on it, so nothing in lastWriterState needs to be retained. For
792 2 : // simplicity, we overwrite in both cases.
793 2 : lastWriter = lastWriterState{
794 2 : index: numWriters - 1,
795 2 : }
796 2 : // Try to process [closeCalledCount, numWriters). Will surely process
797 2 : // [closeCalledCount, numWriters-1), since those writers are either done
798 2 : // initializing, or will close themselves. The writer at numWriters-1 we
799 2 : // can only process if it is done initializing, else we will iterate
800 2 : // again.
801 2 : for i := closeCalledCount; i < numWriters; i++ {
802 2 : w := ww.mu.writers[i].w
803 2 : cErr := ww.mu.writers[i].createError
804 2 : // Is the current index the last writer? If yes, this is also the last
805 2 : // loop iteration.
806 2 : isLastWriter := i == lastWriter.index
807 2 : if w != nil {
808 2 : // Can close it, so extend closeCalledCount.
809 2 : closeCalledCount = i + 1
810 2 : size := uint64(w.Size())
811 2 : if ww.mu.writers[i].approxFileSize < size {
812 2 : ww.mu.writers[i].approxFileSize = size
813 2 : }
814 2 : if isLastWriter {
815 2 : // We may care about its error and when it finishes closing.
816 2 : index := i
817 2 : ww.opts.stopper.runAsync(func() {
818 2 : // Last writer(s) (since new writers can be created and become
819 2 : // last, as we iterate) are guaranteed to have seen the last
820 2 : // record (since it was queued before Close was called). It is
821 2 : // possible that a writer got created after the last record was
822 2 : // dequeued and before this fact was realized by Close. In that
823 2 : // case we will harmlessly tell it that it synced that last
824 2 : // record, though it has already been written and synced by
825 2 : // another writer.
826 2 : err := w.CloseWithLastQueuedRecord(lastRecordIndex)
827 2 : ww.mu.Lock()
828 2 : defer ww.mu.Unlock()
829 2 : if lastWriter.index == index {
830 2 : lastWriter.closed = true
831 2 : lastWriter.err = err
832 2 : lastWriter.metrics = w.Metrics()
833 2 : ww.mu.cond.Signal()
834 2 : }
835 : })
836 2 : } else {
837 2 : // Don't care about the returned error since all the records we
838 2 : // relied on this writer for were already successfully written.
839 2 : ww.opts.stopper.runAsync(func() {
840 2 : _ = w.CloseWithLastQueuedRecord(record.PendingSyncIndex{Index: record.NoSyncIndex})
841 2 : })
842 : }
843 2 : } else if cErr != nil {
844 1 : // Have processed it, so extend closeCalledCount.
845 1 : closeCalledCount = i + 1
846 1 : if isLastWriter {
847 1 : lastWriter.closed = true
848 1 : lastWriter.err = cErr
849 1 : lastWriter.metrics = record.LogWriterMetrics{}
850 1 : }
851 : // Else, ignore.
852 2 : } else {
853 2 : if !isLastWriter {
854 2 : // Not last writer, so will close itself.
855 2 : closeCalledCount = i + 1
856 2 : }
857 : // Else, last writer, so we may have to close it.
858 : }
859 : }
860 : }
861 2 : if !lastWriter.closed {
862 2 : // Either waiting for creation of last writer or waiting for the close
863 2 : // to finish, or something else to become the last writer.
864 2 : //
865 2 : // It is possible that what we think of as the last writer (lastWriter)
866 2 : // closes itself while ww.mu is no longer held here, and a new LogWriter
867 2 : // is created too. All the records are synced, but the real last writer
868 2 : // may still be writing some records. Specifically, consider the
869 2 : // following sequence while this wait does not hold the mutex:
870 2 : //
871 2 : // - recordQueue has an entry, with index 50, that does not require a
872 2 : // sync.
873 2 : // - Last writer created at index 10 and entry 50 is handed to it.
874 2 : // - lastWriter.index is still 9 and it closes itself and signals this
875 2 : // cond. It has written entry 50 and synced (since close syncs).
876 2 : // - The wait completes.
877 2 : //
878 2 : // Now the writer at index 10 will never be closed and will never sync.
879 2 : // A crash can cause some part of what it writes to be lost. Note that
880 2 : // there is no data loss, but there are some unfortunate consequences:
881 2 : //
882 2 : // - We never closed a file descriptor.
883 2 : // - virtualWALReader.NextRecord can return an error on finding a
884 2 : // malformed chunk in the last writer (at index 10) instead of
885 2 : // swallowing the error. This can cause DB.Open to fail.
886 2 : //
887 2 : // To avoid this, we grab the latest value of numWriters on reacquiring
888 2 : // the mutex, and will continue looping until the writer at index 10 is
889 2 : // closed (or writer at index 11 is created).
890 2 : ww.mu.cond.Wait()
891 2 : numWriters = ww.mu.nextWriterIndex
892 2 : }
893 : }
894 2 : if ww.mu.writers[lastWriter.index].w != nil {
895 2 : // This permits log recycling.
896 2 : ww.mu.writers[lastWriter.index].synchronouslyClosed = true
897 2 : }
898 2 : err = lastWriter.err
899 2 : ww.mu.metrics = lastWriter.metrics
900 2 : ww.mu.closed = true
901 2 : n, m := ww.q.popAll(err)
902 2 : if err == nil && (n > 0 || m > 0) {
903 0 : panic(errors.AssertionFailedf("no error but recordQueue had %d records and %d syncs", n, m))
904 : }
905 2 : return logicalOffset, err
906 : }
907 :
908 : // Metrics implements Writer.
909 2 : func (ww *failoverWriter) Metrics() record.LogWriterMetrics {
910 2 : ww.mu.Lock()
911 2 : defer ww.mu.Unlock()
912 2 : return ww.mu.metrics
913 2 : }
914 :
915 : // getLog can be called at any time, including after Close returns.
916 2 : func (ww *failoverWriter) getLog() logicalLogWithSizesEtc {
917 2 : ww.mu.Lock()
918 2 : defer ww.mu.Unlock()
919 2 : ll := logicalLogWithSizesEtc{
920 2 : num: ww.opts.wn,
921 2 : }
922 2 : for i := range ww.mu.writers {
923 2 : if ww.mu.writers[i].w != nil {
924 2 : ll.segments = append(ll.segments, segmentWithSizeEtc{
925 2 : segment: segment{
926 2 : logNameIndex: LogNameIndex(i),
927 2 : dir: ww.mu.writers[i].dir,
928 2 : },
929 2 : approxFileSize: ww.mu.writers[i].approxFileSize,
930 2 : synchronouslyClosed: ww.mu.writers[i].synchronouslyClosed,
931 2 : })
932 2 : }
933 : }
934 2 : return ll
935 : }
936 :
937 : // latencyAndErrorRecorder records ongoing write and sync operations and errors
938 : // in those operations. record.LogWriter cannot continue functioning after any
939 : // error, so all errors are considered permanent.
940 : //
941 : // writeStart/writeEnd are used directly when creating a file. After the file
942 : // is successfully created, setWriter turns latencyAndErrorRecorder into an
943 : // implementation of writerSyncerCloser that will record for the Write and
944 : // Sync methods.
945 : type latencyAndErrorRecorder struct {
946 : ts timeSource
947 : ongoingOperationStart atomic.Int64
948 : error atomic.Pointer[error]
949 : writerSyncerCloser
950 : }
951 :
952 : type writerSyncerCloser interface {
953 : io.Writer
954 : io.Closer
955 : Sync() error
956 : }
957 :
958 2 : func (r *latencyAndErrorRecorder) writeStart() {
959 2 : r.ongoingOperationStart.Store(r.ts.now().UnixNano())
960 2 : }
961 :
962 2 : func (r *latencyAndErrorRecorder) writeEnd(err error) {
963 2 : if err != nil {
964 1 : ptr := &err
965 1 : r.error.Store(ptr)
966 1 : }
967 2 : r.ongoingOperationStart.Store(0)
968 : }
969 :
970 2 : func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) {
971 2 : r.writerSyncerCloser = w
972 2 : }
973 :
974 2 : func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) {
975 2 : startTime := r.ongoingOperationStart.Load()
976 2 : var latency time.Duration
977 2 : if startTime != 0 {
978 2 : l := r.ts.now().UnixNano() - startTime
979 2 : if l < 0 {
980 0 : l = 0
981 0 : }
982 2 : latency = time.Duration(l)
983 : }
984 2 : errPtr := r.error.Load()
985 2 : var err error
986 2 : if errPtr != nil {
987 1 : err = *errPtr
988 1 : }
989 2 : return latency, err
990 : }
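// Illustrative polling sketch (the real failoverMonitor lives elsewhere in
// this package): a monitor can periodically sample the recorder and switch
// WAL directories when an operation has failed or been stuck for too long.
// unhealthyThreshold is an assumed name, not something defined in this file.
//
//	latency, err := r.ongoingLatencyOrError()
//	if err != nil || latency > unhealthyThreshold {
//		// Ask the failoverWriter to switch to a different directory.
//	}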
991 :
992 : // Sync implements writerSyncerCloser.
993 2 : func (r *latencyAndErrorRecorder) Sync() error {
994 2 : r.writeStart()
995 2 : err := r.writerSyncerCloser.Sync()
996 2 : r.writeEnd(err)
997 2 : return err
998 2 : }
999 :
1000 : // Write implements io.Writer.
1001 2 : func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) {
1002 2 : r.writeStart()
1003 2 : n, err = r.writerSyncerCloser.Write(p)
1004 2 : r.writeEnd(err)
1005 2 : return n, err
1006 2 : }
|