1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "io"
9 : "sync"
10 : "sync/atomic"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/record"
16 : "github.com/cockroachdb/pebble/vfs"
17 : "github.com/prometheus/client_golang/prometheus"
18 : )
19 :
20 : // recordQueueEntry is an entry in recordQueue.
21 : type recordQueueEntry struct {
22 : p []byte
23 : opts SyncOptions
24 : refCount RefCount
25 : writeStartUnixNanos int64
26 : }
27 :
// poppedEntry is the subset of a recordQueueEntry that pop retains in order
// to complete syncs (Done/Err), unref the payload, and record latency.
28 : type poppedEntry struct {
29 : opts SyncOptions
30 : refCount RefCount
31 : writeStartUnixNanos int64
32 : }
33 :
34 : const initialBufferLen = 8192
35 :
36 : // recordQueue is a variable-size single-producer multiple-consumer queue. It
37 : // is not lock-free, but most operations only need mu.RLock. It needs a mutex
38 : // to grow the size, since there is no upper bound on the number of queued
39 : // records (which are all the records that are not synced, and will need to be
40 : // written again in case of failover). Additionally, it needs a mutex to
41 : // atomically grab a snapshot of the queued records and provide them to a new
42 : // LogWriter that is being switched to.
43 : type recordQueue struct {
44 : // Only held for reading for all pop operations and most push operations.
45 : // Held for writing when buffer needs to be grown or when switching to a new
46 : // writer.
47 : mu sync.RWMutex
48 :
49 : // queue is [tail, head). tail is the oldest entry and head is the index for
50 : // the next entry.
51 : //
52 : // Consumers: atomically read and write tail in pop. This is not the usual
53 : // kind of queue consumer since they already know the index that they are
54 : // popping exists, hence don't need to look at head.
55 : //
56 : // Producer: atomically reads tail in push. Writes to head.
57 : //
58 : // Based on the above we only need tail to be atomic. However, the producer
59 : // also populates entries in buffer, whose values need to be seen by the
60 : // consumers when doing a pop, which means they need to synchronize using a
61 : // release and acquire memory barrier pair, where the push does the release
62 : // and the pop does the acquire. For this reason we make head also atomic
63 : // and merge head and tail into a single atomic, so that the store of head
64 : // in push and the load of tail in pop accomplishes this release-acquire
65 : // pair.
66 : //
67 : // We initially implemented competition between multiple consumers solely
68 : // via atomic read-write of the tail using compare-and-swap (CAS). Since the
69 : // atomic read-write to tail in pop releases those buffer entries for reuse
70 : // to the producer, the consumer needs to grab the contents of
71 : // recordQueueEntry that it needs to do callbacks etc. (specifically the
72 : // contents corresponding to poppedEntry), *before* it succeeds with the
73 : // atomic read-write. This introduced a false data race in the Golang data
74 : // race detector
75 : // https://github.com/cockroachdb/pebble/issues/3380#issuecomment-1981188174.
76 : // Consider the case where the queue is [10,20), and consumer C1 is trying
77 : // to pop [10,12) and consumer C2 is trying to pop [10,14). The following
78 : // interleaving can happen:
79 : //
80 : // [C1] reads tail=10, head=20
81 : // [C2] reads tail=10, head=20
82 : // [C1] reads buffer contents [10,12) and makes local copy
83 : // [C1] CAS to make the queue [12,20)
84 : // [C2] reads buffer contents [10,14) and makes local copy, concurrently
85 : // with producer writing to 10, 11. *
86 : // [C2] CAS fails for popping [10,14), so revises to [12,14), and succeeds
87 : // in CAS. C2 only uses the contents it read into the local copy for
88 : // [12,14).
89 : //
90 : // * is a false data race since C2 is later deciding what contents it should
91 : // use from among the contents it read, based on what indices it
92 : // successfully popped. Unfortunately, we don't have a way to annotate the
93 : // code to tell the data race detector to ignore this false positive. So we
94 : // need to strengthen the synchronization to prevent such false positives.
95 : // We observe that usually a consumer will be popping a batch of entries
96 : // (based on a single successful fsync), and the number of consumers will be
97 : // small (usually 1). In comparison, producers can be highly concurrent (due
98 : // to workload concurrency). We don't want consumers to compete for a mutex
99 : // with producers, but we can afford to have multiple consumers compete for
100 : // a mutex. So we fix this false data race by using consumerMu to force
101 : // single-threaded popping.
102 : //
103 : // An alternative would be to pass the information contained in poppedEntry
104 : // to the LogWriter, so that it can pass it back when popping (so we don't
105 : // have to retrieve it from the recordQueue.buffer). We would still need
106 : // recordQueue.buffer, since writer switching needs those entries to be
107 : // replayed. We don't consider this solution for the same reason we replaced
108 : // record.pendingSyncsWithSyncQueue with
109 : // record.pendingSyncsWithHighestSyncIndex for the failoverWriter code path
110 : // -- we cannot bound the queue in the LogWriter by record.SyncConcurrency:
111 : // say SyncConcurrency was 4096, and LogWriter1's queue was full, and we
112 : // switched to LogWriter2, to which we replayed the same records and filled
113 : // up the queue. Then if LogWriter1 unblocks and pops all the 4096 entries,
114 : // the commit pipeline can send another 4096 entries, while LogWriter2 is
115 : // still blocked on trying to write and sync the previous 4096 entries. This
116 : // will overflow the queue in LogWriter2.
117 : //
118 : // All updates to headTail hold mu at least for reading. So when mu is held
119 : // for writing, there is a guarantee that headTail is not being updated.
120 : //
121 : // head is most-significant 32 bits and tail is least-significant 32 bits.
122 : headTail atomic.Uint64
123 :
124 : consumerMu sync.Mutex
125 :
126 : // Access to buffer requires at least RLock.
127 : buffer []recordQueueEntry
128 :
129 : lastTailObservedByProducer uint32
130 :
131 : // Read requires RLock.
132 : writer *record.LogWriter
133 :
134 : // When writer != nil, this is the return value of the last call to
135 : // SyncRecordGeneralized. It is updated (a) when WriteRecord calls push, using
136 : // only RLock (since WriteRecord is externally synchronized), and (b) in
137 : // snapshotAndSwitchWriter, using Lock. (b) excludes (a).
138 : lastLogSize int64
139 :
140 : failoverWriteAndSyncLatency prometheus.Histogram
141 : }
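// The following is an illustrative sketch added for exposition and is not
// part of the package's logic: it shows the release-acquire pairing that
// motivates packing head into the same atomic as tail. The producer's plain
// write to a buffer slot is sequenced before its headTail.Add (which
// publishes the new head), so a consumer that loads headTail and observes
// that head also observes the slot contents, per the Go memory model. Locking
// (mu) is omitted here for brevity, and the function name is hypothetical.
func exampleHeadPublicationSketch(q *recordQueue, e recordQueueEntry) {
    h, _ := unpackHeadTail(q.headTail.Load())
    q.buffer[int(h)%len(q.buffer)] = e // plain write to the reserved slot
    q.headTail.Add(1 << headTailBits)  // publishes head = h+1 to consumers
    // A consumer that loads headTail and sees head > h may now safely read
    // q.buffer[int(h)%len(q.buffer)] without a data race.
}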
142 :
143 1 : func (q *recordQueue) init(failoverWriteAndSyncLatency prometheus.Histogram) {
144 1 : *q = recordQueue{
145 1 : buffer: make([]recordQueueEntry, initialBufferLen),
146 1 : failoverWriteAndSyncLatency: failoverWriteAndSyncLatency,
147 1 : }
148 1 : }
149 :
150 : // NB: externally synchronized, i.e., no concurrent push calls.
151 : func (q *recordQueue) push(
152 : p []byte,
153 : opts SyncOptions,
154 : refCount RefCount,
155 : writeStartUnixNanos int64,
156 : latestLogSizeInWriteRecord int64,
157 : latestWriterInWriteRecord *record.LogWriter,
158 1 : ) (index uint32, writer *record.LogWriter, lastLogSize int64) {
159 1 : ht := q.headTail.Load()
160 1 : h, t := unpackHeadTail(ht)
161 1 : n := int(h - t)
162 1 : m := len(q.buffer)
163 1 : if m == n {
164 1 : // Full
165 1 : m = 2 * n
166 1 : newBuffer := make([]recordQueueEntry, m)
167 1 : for i := int(t); i < int(h); i++ {
168 1 : newBuffer[i%m] = q.buffer[i%n]
169 1 : }
170 1 : q.mu.Lock()
171 1 : q.buffer = newBuffer
172 1 : q.mu.Unlock()
173 : }
174 1 : q.mu.RLock()
175 1 : q.buffer[int(h)%m] = recordQueueEntry{
176 1 : p: p,
177 1 : opts: opts,
178 1 : refCount: refCount,
179 1 : writeStartUnixNanos: writeStartUnixNanos,
180 1 : }
181 1 : // Reclaim memory for consumed entries. We can't do that in pop since
182 1 : // advancing tail immediately transfers ownership of those buffer slots to
183 1 : // the producer.
184 1 : for i := q.lastTailObservedByProducer; i < t; i++ {
185 1 : q.buffer[int(i)%m] = recordQueueEntry{}
186 1 : }
187 1 : q.lastTailObservedByProducer = t
188 1 : q.headTail.Add(1 << headTailBits)
189 1 : writer = q.writer
190 1 : if writer == latestWriterInWriteRecord {
191 1 : // WriteRecord has written to this writer since the switch.
192 1 : q.lastLogSize = latestLogSizeInWriteRecord
193 1 : }
194 : // Else writer is a new writer that was switched to, so ignore the
195 : // latestLogSizeInWriteRecord.
196 :
197 1 : lastLogSize = q.lastLogSize
198 1 : q.mu.RUnlock()
199 1 : return h, writer, lastLogSize
200 : }
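// Illustrative sketch (not part of the package's logic): the reindexing that
// push performs when the ring buffer is full and doubles. A logical index i
// lives at physical slot i%len(buffer), so entries generally move to
// different physical slots after the buffer grows. The function name and the
// numbers are made up for the example.
func exampleBufferGrowthSketch() {
    old := make([]recordQueueEntry, 4)   // full: tail=6, head=10
    grown := make([]recordQueueEntry, 8) // doubled capacity
    for i := 6; i < 10; i++ {
        // e.g. logical index 6 moves from old slot 6%4=2 to new slot 6%8=6.
        grown[i%8] = old[i%4]
    }
}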
201 :
202 1 : func (q *recordQueue) length() int {
203 1 : ht := q.headTail.Load()
204 1 : h, t := unpackHeadTail(ht)
205 1 : return int(h - t)
206 1 : }
207 :
208 : // Pops all entries. Must be called only after the last push returns.
209 1 : func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
210 1 : ht := q.headTail.Load()
211 1 : h, t := unpackHeadTail(ht)
212 1 : n := int(h - t)
213 1 : if n == 0 {
214 1 : return 0, 0
215 1 : }
216 1 : return n, q.pop(h-1, err)
217 : }
218 :
219 : // Pops all entries up to and including index. The remaining queue is
220 : // [index+1, head).
221 : //
222 : // NB: we could slightly simplify to only have the latest writer be able to
223 : // pop. This would avoid the consumer mutex below, but it seems better to
224 : // reduce the amount of queued work regardless of who successfully wrote it.
225 1 : func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
226 1 : nowUnixNanos := time.Now().UnixNano()
227 1 : var buf [512]poppedEntry
228 1 : tailEntriesToPop := func() (t uint32, numEntriesToPop int) {
229 1 : ht := q.headTail.Load()
230 1 : _, t = unpackHeadTail(ht)
231 1 : tail := int(t)
232 1 : numEntriesToPop = int(index) - tail + 1
233 1 : return t, numEntriesToPop
234 1 : }
235 1 : q.consumerMu.Lock()
236 1 : // numEntriesToPop is a function of index and tail. The value of tail cannot
237 1 : // change since consumerMu is held.
238 1 : tail, numEntriesToPop := tailEntriesToPop()
239 1 : if numEntriesToPop <= 0 {
240 1 : q.consumerMu.Unlock()
241 1 : return 0
242 1 : }
243 1 : var b []poppedEntry
244 1 : if numEntriesToPop <= len(buf) {
245 1 : b = buf[:numEntriesToPop]
246 1 : } else {
247 1 : // Do allocation before acquiring the mutex.
248 1 : b = make([]poppedEntry, numEntriesToPop)
249 1 : }
250 1 : q.mu.RLock()
251 1 : n := len(q.buffer)
252 1 : for i := 0; i < numEntriesToPop; i++ {
253 1 : // Grab the popped entries before incrementing tail, since that will
254 1 : // release those buffer slots to the producer.
255 1 : idx := (i + int(tail)) % n
256 1 : b[i] = poppedEntry{
257 1 : opts: q.buffer[idx].opts,
258 1 : refCount: q.buffer[idx].refCount,
259 1 : writeStartUnixNanos: q.buffer[idx].writeStartUnixNanos,
260 1 : }
261 1 : }
262 : // Since tail cannot change, we don't need to do a compare-and-swap.
263 1 : q.headTail.Add(uint64(numEntriesToPop))
264 1 : q.mu.RUnlock()
265 1 : q.consumerMu.Unlock()
266 1 : addLatencySample := false
267 1 : var maxLatencyNanos int64
268 1 : for i := 0; i < numEntriesToPop; i++ {
269 1 : // Now that we've synced the entry, we can unref it to signal that we
270 1 : // will not read the written byte slice again.
271 1 : if b[i].refCount != nil {
272 1 : b[i].refCount.Unref()
273 1 : }
274 1 : if b[i].opts.Done != nil {
275 1 : numSyncsPopped++
276 1 : if err != nil {
277 1 : *b[i].opts.Err = err
278 1 : }
279 1 : b[i].opts.Done.Done()
280 1 : latency := nowUnixNanos - b[i].writeStartUnixNanos
281 1 : if !addLatencySample {
282 1 : addLatencySample = true
283 1 : maxLatencyNanos = latency
284 1 : } else if maxLatencyNanos < latency {
285 0 : maxLatencyNanos = latency
286 0 : }
287 : }
288 : }
289 1 : if addLatencySample {
290 1 : if maxLatencyNanos < 0 {
291 0 : maxLatencyNanos = 0
292 0 : }
293 1 : q.failoverWriteAndSyncLatency.Observe(float64(maxLatencyNanos))
294 : }
295 1 : return numSyncsPopped
296 : }
297 :
298 : func (q *recordQueue) snapshotAndSwitchWriter(
299 : writer *record.LogWriter,
300 : snapshotFunc func(firstIndex uint32, entries []recordQueueEntry) (logSize int64),
301 1 : ) {
302 1 : q.mu.Lock()
303 1 : defer q.mu.Unlock()
304 1 : q.writer = writer
305 1 : h, t := unpackHeadTail(q.headTail.Load())
306 1 : n := h - t
307 1 : if n > 0 {
308 1 : m := uint32(len(q.buffer))
309 1 : b := make([]recordQueueEntry, n)
310 1 : for i := t; i < h; i++ {
311 1 : b[i-t] = q.buffer[i%m]
312 1 : }
313 1 : q.lastLogSize = snapshotFunc(t, b)
314 : }
315 : }
316 :
317 : // getLastIndex is used by failoverWriter.Close.
318 1 : func (q *recordQueue) getLastIndex() (lastIndex int64) {
319 1 : h, _ := unpackHeadTail(q.headTail.Load())
320 1 : return int64(h) - 1
321 1 : }
322 :
323 : const headTailBits = 32
324 :
325 1 : func unpackHeadTail(ht uint64) (head, tail uint32) {
326 1 : const mask = 1<<headTailBits - 1
327 1 : head = uint32((ht >> headTailBits) & mask)
328 1 : tail = uint32(ht & mask)
329 1 : return head, tail
330 1 : }
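// Illustrative sketch (not part of the package's logic): how headTail encodes
// the queue bounds and how push and pop advance them. The function name and
// values are made up.
func exampleHeadTailEncodingSketch() {
    var ht atomic.Uint64
    // Queue is [2, 5): tail=2 in the low 32 bits, head=5 in the high 32 bits.
    ht.Store(uint64(5)<<headTailBits | 2)
    ht.Add(1 << headTailBits) // push of one entry advances head: queue is [2, 6)
    ht.Add(2)                 // pop of two entries advances tail: queue is [4, 6)
    h, t := unpackHeadTail(ht.Load())
    _, _ = h, t // h == 6, t == 4
}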
331 :
332 : // Maximum number of physical log files when writing a virtual WAL. Arbitrarily
333 : // chosen value. Setting this to 2 will not simplify the code. We make this a
334 : // constant since we want a fixed size array for writer.writers.
335 : const maxPhysicalLogs = 10
336 :
337 : // failoverWriter is the implementation of Writer in failover mode. No Writer
338 : // method blocks for IO, except for Close.
339 : //
340 : // Loosely speaking, Close blocks until all records are successfully written
341 : // and synced to some log writer. Monitoring of log writer latency and errors
342 : // continues after Close is called, which means failoverWriter can be switched
343 : // to a new log writer after Close is called, to unblock Close.
344 : //
345 : // More precisely, Close does not block if there is an error in creating or
346 : // closing the latest LogWriter when Close was called. This is because errors
347 : // are considered indicative of misconfiguration, and the user of
348 : // failoverWriter can dampen switching when observing errors (e.g. see
349 : // failoverMonitor), so Close does not assume any liveness of calls to
350 : // switchToNewDir when such errors occur. Since the caller (see db.go) treats
351 : // an error on Writer.Close as fatal, this does mean that failoverWriter has
352 : // limited ability to mask errors (its primary task is to mask high latency).
353 : type failoverWriter struct {
354 : opts failoverWriterOpts
355 : q recordQueue
356 : mu struct {
357 : sync.Mutex
358 : // writers is protected by mu, except for updates to the
359 : // latencyAndErrorRecorder field. WriteRecord does not acquire mu, so the
360 : // protection by mu is for handling concurrent calls to switchToNewDir,
361 : // Close, and getLog.
362 : writers [maxPhysicalLogs]logWriterAndRecorder
363 :
364 : // cond is signaled when the latest LogWriter is set in writers (or there
365 : // is a creation error), or when the latest LogWriter is successfully
366 : // closed. It is waited on in Close. We don't use channels and select
367 : // since what Close is waiting on is dynamic based on the local state in
368 : // Close, so using Cond is simpler.
369 : cond *sync.Cond
370 : // nextWriterIndex is advanced before creating the *LogWriter. That is, a
371 : // slot is reserved by taking the current value of nextWriterIndex and
372 : // incrementing it, and then the *LogWriter for that slot is created. When
373 : // newFailoverWriter returns, nextWriterIndex = 1.
374 : //
375 : // The latest *LogWriter is (will be) at nextWriterIndex-1.
376 : //
377 : // INVARIANT: nextWriterIndex <= len(writers)
378 : nextWriterIndex LogNameIndex
379 : closed bool
380 : // metrics is initialized in Close. Currently we just use the metrics from
381 : // the latest writer after it is closed, since in the common case with
382 : // only one writer, that writer's flush loop will have finished and the
383 : // metrics will be current. With multiple writers, these metrics can be
384 : // quite inaccurate. The WriteThroughput metric includes an IdleDuration,
385 : // which can be high for a writer that was switched away from, and
386 : // therefore not indicative of overall work being done by the
387 : // failoverWriter. The PendingBufferLen and SyncQueueLen are similarly
388 : // inaccurate once there is no more work being given to a writer. We could
389 : // add a method to LogWriter to stop sampling metrics when it is not the
390 : // latest writer. Then we could aggregate all these metrics across all
391 : // writers.
392 : //
393 : // Note that CockroachDB does not use these metrics in any meaningful way.
394 : //
395 : // TODO(sumeer): do the improved solution outlined above.
396 : metrics record.LogWriterMetrics
397 : }
398 : // State for computing logical offset. The cumulative offset state is in
399 : // offset. Each time we call SyncRecordGeneralized from WriteRecord, we
400 : // compute the delta from the size returned by this LogWriter now, and the
401 : // size returned by this LogWriter in the previous call to
402 : // SyncRecordGeneralized. That previous call to SyncRecordGeneralized may
403 : // have happened from WriteRecord, or asynchronously during a switch. So
404 : // that previous call state requires synchronization and is maintained in
405 : // recordQueue. The offset is incremented by this delta without any
406 : // synchronization, since we rely on external synchronization (like the
407 : // standaloneWriter).
408 : logicalOffset struct {
409 : latestWriterInWriteRecord *record.LogWriter
410 : latestLogSizeInWriteRecord int64
411 : offset int64
412 : // Transitions once from false => true when there is a non-nil writer.
413 : notEstimatedOffset bool
414 : }
415 : psiForWriteRecordBacking record.PendingSyncIndex
416 : psiForSwitchBacking record.PendingSyncIndex
417 : }
418 :
419 : type logWriterAndRecorder struct {
420 : // This may never become non-nil: if, by the time the LogWriter was finally
421 : // created, it was no longer the latest writer. Additionally, if there was an
422 : // error in creating the writer, w will remain nil and createError will be set.
423 : w *record.LogWriter
424 : // createError is set if there is an error creating the writer. This is
425 : // useful in Close since we need to know when the work for creating the
426 : // latest writer is done, whether it resulted in success or not.
427 : createError error
428 : r latencyAndErrorRecorder
429 :
430 : // dir, approxFileSize, synchronouslyClosed are kept for initializing
431 : // segmentWithSizeEtc. The approxFileSize is initially set to whatever is
432 : // returned by logCreator. When failoverWriter.Close is called,
433 : // approxFileSize and synchronouslyClosed may be updated.
434 : dir Dir
435 : approxFileSize uint64
436 : synchronouslyClosed bool
437 : }
438 :
439 : var _ Writer = &failoverWriter{}
440 :
441 : var _ switchableWriter = &failoverWriter{}
442 :
443 : type failoverWriterOpts struct {
444 : wn NumWAL
445 : logger base.Logger
446 : timeSource
447 : jobID int
448 : logCreator
449 :
450 : // Options that feed into SyncingFileOptions.
451 : noSyncOnClose bool
452 : bytesPerSync int
453 : preallocateSize func() int
454 :
455 : // Options for record.LogWriter.
456 : minSyncInterval func() time.Duration
457 : fsyncLatency prometheus.Histogram
458 : queueSemChan chan struct{}
459 : stopper *stopper
460 :
461 : failoverWriteAndSyncLatency prometheus.Histogram
462 : writerClosed func(logicalLogWithSizesEtc)
463 :
464 : writerCreatedForTest chan<- struct{}
465 : }
466 :
467 : func simpleLogCreator(
468 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
469 1 : ) (f vfs.File, initialFileSize uint64, err error) {
470 1 : filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
471 1 : // Create file.
472 1 : r.writeStart()
473 1 : f, err = dir.FS.Create(filename, "pebble-wal")
474 1 : r.writeEnd(err)
475 1 : return f, 0, err
476 1 : }
477 :
478 : type logCreator func(
479 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
480 : ) (f vfs.File, initialFileSize uint64, err error)
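// Illustrative sketch (not part of the package's logic): a logCreator can be
// any function with the signature above, which is how tests or callers could
// layer extra behavior over file creation. countingLogCreator is a
// hypothetical name; it simply counts invocations and delegates to
// simpleLogCreator.
func countingLogCreator(calls *atomic.Int64) logCreator {
    return func(
        dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
    ) (vfs.File, uint64, error) {
        calls.Add(1)
        return simpleLogCreator(dir, wn, li, r, jobID)
    }
}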
481 :
482 : func newFailoverWriter(
483 : opts failoverWriterOpts, initialDir dirAndFileHandle,
484 1 : ) (*failoverWriter, error) {
485 1 : ww := &failoverWriter{
486 1 : opts: opts,
487 1 : }
488 1 : ww.q.init(opts.failoverWriteAndSyncLatency)
489 1 : ww.mu.cond = sync.NewCond(&ww.mu)
490 1 : // The initial record.LogWriter creation also happens via a
491 1 : // switchToNewDir call since we don't want it to block newFailoverWriter.
492 1 : err := ww.switchToNewDir(initialDir)
493 1 : if err != nil {
494 0 : // Switching limit cannot be exceeded when creating.
495 0 : panic(err)
496 : }
497 1 : return ww, nil
498 : }
499 :
500 : // WriteRecord implements Writer.
501 : func (ww *failoverWriter) WriteRecord(
502 : p []byte, opts SyncOptions, ref RefCount,
503 1 : ) (logicalOffset int64, err error) {
504 1 : if ref != nil {
505 1 : ref.Ref()
506 1 : }
507 1 : var writeStartUnixNanos int64
508 1 : if opts.Done != nil {
509 1 : writeStartUnixNanos = time.Now().UnixNano()
510 1 : }
511 1 : recordIndex, writer, lastLogSize := ww.q.push(
512 1 : p,
513 1 : opts,
514 1 : ref,
515 1 : writeStartUnixNanos,
516 1 : ww.logicalOffset.latestLogSizeInWriteRecord,
517 1 : ww.logicalOffset.latestWriterInWriteRecord,
518 1 : )
519 1 : if writer == nil {
520 1 : // Don't have a record.LogWriter yet, so use an estimate. This estimate
521 1 : // will get overwritten.
522 1 : ww.logicalOffset.offset += int64(len(p))
523 1 : return ww.logicalOffset.offset, nil
524 1 : }
525 : // INVARIANT: writer != nil.
526 1 : notEstimatedOffset := ww.logicalOffset.notEstimatedOffset
527 1 : if !notEstimatedOffset {
528 1 : ww.logicalOffset.notEstimatedOffset = true
529 1 : }
530 1 : ww.psiForWriteRecordBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
531 1 : if opts.Done != nil {
532 1 : ww.psiForWriteRecordBacking.Index = int64(recordIndex)
533 1 : }
534 1 : ww.logicalOffset.latestLogSizeInWriteRecord, err = writer.SyncRecordGeneralized(p, &ww.psiForWriteRecordBacking)
535 1 : ww.logicalOffset.latestWriterInWriteRecord = writer
536 1 : if notEstimatedOffset {
537 1 : delta := ww.logicalOffset.latestLogSizeInWriteRecord - lastLogSize
538 1 : ww.logicalOffset.offset += delta
539 1 : } else {
540 1 : // Overwrite the estimate. This is a best-effort improvement in that it is
541 1 : // accurate for the common case where writer is the first LogWriter.
542 1 : // Consider a failover scenario where there was no LogWriter for the first
543 1 : // 10 records, so they are all accumulated as an estimate. Then the first
544 1 : // LogWriter successfully writes and syncs the first 5 records and gets
545 1 : // stuck. A switch happens to a second LogWriter that is handed the
546 1 : // remaining 5 records, and the 11th record arrives via a WriteRecord.
547 1 : // The transition from !notEstimatedOffset to notEstimatedOffset will
548 1 : // happen on this 11th record, and the logic here will use the length of
549 1 : // the second LogWriter, that does not reflect the full length.
550 1 : //
551 1 : // TODO(sumeer): try to make this more correct, without adding much more
552 1 : // complexity, and without adding synchronization.
553 1 : ww.logicalOffset.offset = ww.logicalOffset.latestLogSizeInWriteRecord
554 1 : }
555 1 : return ww.logicalOffset.offset, err
556 : }
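// Illustrative sketch (not part of the package's logic): the delta arithmetic
// WriteRecord uses to maintain logicalOffset.offset once a LogWriter exists.
// The function name and sizes are made up.
func exampleLogicalOffsetDeltaSketch() {
    offset := int64(100)       // cumulative logical offset so far
    lastLogSize := int64(40)   // current LogWriter's size after its previous record
    latestLogSize := int64(65) // current LogWriter's size after this record
    offset += latestLogSize - lastLogSize // offset is now 125
    _ = offset
}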
557 :
558 : // switchToNewDir starts switching to dir. It implements switchableWriter. All
559 : // work is async, and a non-nil error is returned only if the switching limit
560 : // is exceeded.
561 1 : func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
562 1 : ww.mu.Lock()
563 1 : // Can have a late switchToNewDir call if the failoverMonitor has not yet
564 1 : // been told that the writer is closed. Ignore.
565 1 : if ww.mu.closed {
566 1 : ww.mu.Unlock()
567 1 : if ww.opts.writerCreatedForTest != nil {
568 1 : ww.opts.writerCreatedForTest <- struct{}{}
569 1 : }
570 1 : return nil
571 : }
572 : // writerIndex is the slot for this writer.
573 1 : writerIndex := ww.mu.nextWriterIndex
574 1 : if int(writerIndex) == len(ww.mu.writers) {
575 1 : ww.mu.Unlock()
576 1 : return errors.Errorf("exceeded switching limit")
577 1 : }
578 1 : ww.mu.writers[writerIndex].dir = dir.Dir
579 1 : ww.mu.nextWriterIndex++
580 1 : ww.mu.Unlock()
581 1 :
582 1 : // Creation is async.
583 1 : ww.opts.stopper.runAsync(func() {
584 1 : recorderAndWriter := &ww.mu.writers[writerIndex].r
585 1 : recorderAndWriter.ts = ww.opts.timeSource
586 1 : file, initialFileSize, err := ww.opts.logCreator(
587 1 : dir.Dir, ww.opts.wn, writerIndex, recorderAndWriter, ww.opts.jobID)
588 1 : ww.mu.writers[writerIndex].approxFileSize = initialFileSize
589 1 : // handleErrFunc is called when err != nil. It handles the multiple IO error
590 1 : // cases below.
591 1 : handleErrFunc := func(err error) {
592 1 : if file != nil {
593 1 : file.Close()
594 1 : }
595 1 : ww.mu.Lock()
596 1 : defer ww.mu.Unlock()
597 1 : ww.mu.writers[writerIndex].createError = err
598 1 : ww.mu.cond.Signal()
599 1 : if ww.opts.writerCreatedForTest != nil {
600 1 : ww.opts.writerCreatedForTest <- struct{}{}
601 1 : }
602 : }
603 1 : if err != nil {
604 1 : handleErrFunc(err)
605 1 : return
606 1 : }
607 : // Sync dir.
608 1 : recorderAndWriter.writeStart()
609 1 : err = dir.Sync()
610 1 : recorderAndWriter.writeEnd(err)
611 1 : if err != nil {
612 1 : handleErrFunc(err)
613 1 : return
614 1 : }
615 : // Wrap in a syncingFile.
616 1 : syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
617 1 : NoSyncOnClose: ww.opts.noSyncOnClose,
618 1 : BytesPerSync: ww.opts.bytesPerSync,
619 1 : PreallocateSize: ww.opts.preallocateSize(),
620 1 : })
621 1 : // Wrap in the latencyAndErrorRecorder.
622 1 : recorderAndWriter.setWriter(syncingFile)
623 1 :
624 1 : // Using NumWAL as the DiskFileNum is fine since it is used only as
625 1 : // EOF trailer for safe log recycling. Even though many log files can
626 1 : // map to a single NumWAL, a file used for NumWAL n at index m will
627 1 : // never get recycled for NumWAL n at a later index (since recycling
628 1 : // happens when n as a whole is obsolete).
629 1 : w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn),
630 1 : record.LogWriterConfig{
631 1 : WALMinSyncInterval: ww.opts.minSyncInterval,
632 1 : WALFsyncLatency: ww.opts.fsyncLatency,
633 1 : QueueSemChan: ww.opts.queueSemChan,
634 1 : ExternalSyncQueueCallback: ww.doneSyncCallback,
635 1 : })
636 1 : closeWriter := func() bool {
637 1 : ww.mu.Lock()
638 1 : defer ww.mu.Unlock()
639 1 : if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed {
640 1 : // Not the latest writer or the writer was closed while this async
641 1 : // creation was ongoing.
642 1 : if ww.opts.writerCreatedForTest != nil {
643 1 : ww.opts.writerCreatedForTest <- struct{}{}
644 1 : }
645 1 : return true
646 : }
647 : // Latest writer.
648 1 : ww.mu.writers[writerIndex].w = w
649 1 : ww.mu.cond.Signal()
650 1 : // NB: snapshotAndSwitchWriter does not block on IO, since
651 1 : // SyncRecordGeneralized does no IO.
652 1 : ww.q.snapshotAndSwitchWriter(w,
653 1 : func(firstIndex uint32, entries []recordQueueEntry) (logSize int64) {
654 1 : for i := range entries {
655 1 : ww.psiForSwitchBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
656 1 : if entries[i].opts.Done != nil {
657 1 : ww.psiForSwitchBacking.Index = int64(firstIndex) + int64(i)
658 1 : }
659 1 : var err error
660 1 : logSize, err = w.SyncRecordGeneralized(entries[i].p, &ww.psiForSwitchBacking)
661 1 : if err != nil {
662 0 : // TODO(sumeer): log periodically. The err will also surface via
663 0 : // the latencyAndErrorRecorder, so if a switch is possible, it
664 0 : // will be done.
665 0 : ww.opts.logger.Errorf("%s", err)
666 0 : }
667 : }
668 1 : return logSize
669 : })
670 1 : if ww.opts.writerCreatedForTest != nil {
671 1 : ww.opts.writerCreatedForTest <- struct{}{}
672 1 : }
673 1 : return false
674 : }()
675 1 : if closeWriter {
676 1 : // Never wrote anything to this writer so don't care about the
677 1 : // returned error.
678 1 : ww.opts.stopper.runAsync(func() {
679 1 : _ = w.Close()
680 1 : // TODO(sumeer): consider deleting this file too, since
681 1 : // failoverWriter.Close may not wait for it. This is going to be
682 1 : // extremely rare, so the risk of garbage empty files piling up is
683 1 : // extremely low. Say failover happens daily and of those cases we
684 1 : // have to be very unlucky and the close happens while a failover was
685 1 : // ongoing and the previous LogWriter successfully wrote everything
686 1 : // (say 1% probability if we want to be pessimistic). A garbage file
687 1 : // every 100 days. Restarts will delete that garbage.
688 1 : })
689 : }
690 : })
691 1 : return nil
692 : }
693 :
694 : // doneSyncCallback is the record.ExternalSyncQueueCallback called by
695 : // record.LogWriter.
696 : //
697 : // recordQueue is popped from only when some work requests a sync (and
698 : // successfully syncs). In the worst case, if no syncs are requested, we could
699 : // queue all the records needed to fill up a memtable in the recordQueue. This
700 : // can have two negative effects: (a) in the case of failover, we need to
701 : // replay all the data in the current mutable memtable, which takes more time,
702 : // (b) the memory usage is proportional to the size of the memtable. We ignore
703 : // these negatives since, (a) users like CockroachDB regularly sync, and (b)
704 : // the default memtable size is only 64MB.
705 1 : func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) {
706 1 : if err != nil {
707 1 : // Don't pop anything since we can retry after switching to a new
708 1 : // LogWriter.
709 1 : return
710 1 : }
711 : // NB: harmless after Close returns since numSyncsPopped will be 0.
712 1 : numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err)
713 1 : if ww.opts.queueSemChan != nil {
714 1 : for i := 0; i < numSyncsPopped; i++ {
715 1 : <-ww.opts.queueSemChan
716 1 : }
717 : }
718 : }
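// Illustrative sketch (not part of the package's logic): queueSemChan acts as
// a counting semaphore. Conceptually, the producer side acquires a slot per
// sync request before queueing a record, and doneSyncCallback releases one
// slot per completed sync, bounding the number of in-flight sync requests.
// The names below are hypothetical.
func exampleQueueSemSketch() {
    sem := make(chan struct{}, 4) // at most 4 in-flight sync requests
    acquire := func() { sem <- struct{}{} }
    release := func() { <-sem }
    acquire() // before enqueueing a record that requests a sync
    release() // after the sync for that record completes
}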
719 :
720 : // ongoingLatencyOrErrorForCurDir implements switchableWriter.
721 1 : func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
722 1 : r := ww.recorderForCurDir()
723 1 : if r == nil {
724 0 : return 0, nil
725 0 : }
726 1 : return r.ongoingLatencyOrError()
727 : }
728 :
729 : // For internal use and testing.
730 1 : func (ww *failoverWriter) recorderForCurDir() *latencyAndErrorRecorder {
731 1 : ww.mu.Lock()
732 1 : defer ww.mu.Unlock()
733 1 : if ww.mu.closed {
734 0 : return nil
735 0 : }
736 1 : return &ww.mu.writers[ww.mu.nextWriterIndex-1].r
737 : }
738 :
739 : // Close implements Writer.
740 : //
741 : // NB: ongoingLatencyOrErrorForCurDir and switchToNewDir can be
742 : // called after Close is called, and there is also a possibility that they get
743 : // called after Close returns and before failoverMonitor knows that the
744 : // failoverWriter is closed.
745 : //
746 : // doneSyncCallback can be called anytime after Close returns since there
747 : // could be stuck writes that finish arbitrarily later.
748 : //
749 : // See the long comment about Close behavior where failoverWriter is declared.
750 1 : func (ww *failoverWriter) Close() (logicalOffset int64, err error) {
751 1 : offset, err := ww.closeInternal()
752 1 : ww.opts.writerClosed(ww.getLog())
753 1 : return offset, err
754 1 : }
755 :
756 1 : func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
757 1 : logicalOffset = ww.logicalOffset.offset
758 1 : // [0, closeCalledCount) have had LogWriter.Close called (though may not
759 1 : // have finished) or the LogWriter will never be non-nil. Either way, they
760 1 : // have been "processed".
761 1 : closeCalledCount := LogNameIndex(0)
762 1 : // lastWriterState is the state for the last writer, for which we are
763 1 : // waiting for LogWriter.Close to finish or for creation to be unsuccessful.
764 1 : // What is considered the last writer can change. All state is protected by
765 1 : // ww.mu.
766 1 : type lastWriterState struct {
767 1 : index LogNameIndex
768 1 : closed bool
769 1 : err error
770 1 : metrics record.LogWriterMetrics
771 1 : }
772 1 : var lastWriter lastWriterState
773 1 : lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
774 1 : ww.mu.Lock()
775 1 : defer ww.mu.Unlock()
776 1 : // Every iteration starts and ends with the mutex held.
777 1 : //
778 1 : // Invariant: ww.mu.nextWriterIndex >= 1.
779 1 : //
780 1 : // We will loop until we have closed the lastWriter (and use
781 1 : // lastWriter.err). We also need to call close on all LogWriters
782 1 : // that will not close themselves, i.e., those that have already been
783 1 : // created and installed in failoverWriter.writers (this set may change
784 1 : // while failoverWriter.Close runs).
785 1 : for !lastWriter.closed {
786 1 : numWriters := ww.mu.nextWriterIndex
787 1 : if numWriters > closeCalledCount {
788 1 : // INVARIANT: numWriters > closeCalledCount.
789 1 : lastWriter = lastWriterState{
790 1 : index: numWriters - 1,
791 1 : }
792 1 : // Try to process [closeCalledCount, numWriters). Will surely process
793 1 : // [closeCalledCount, numWriters-1), since those writers are either done
794 1 : // initializing, or will close themselves. The writer at numWriters-1 we
795 1 : // can only process if it is done initializing, else we will iterate
796 1 : // again.
797 1 : for i := closeCalledCount; i < numWriters; i++ {
798 1 : w := ww.mu.writers[i].w
799 1 : cErr := ww.mu.writers[i].createError
800 1 : // Is the current index the last writer? If yes, this is also the last
801 1 : // loop iteration.
802 1 : isLastWriter := i == lastWriter.index
803 1 : if w != nil {
804 1 : // Can close it, so extend closeCalledCount.
805 1 : closeCalledCount = i + 1
806 1 : size := uint64(w.Size())
807 1 : if ww.mu.writers[i].approxFileSize < size {
808 1 : ww.mu.writers[i].approxFileSize = size
809 1 : }
810 1 : if isLastWriter {
811 1 : // We may care about its error and when it finishes closing.
812 1 : index := i
813 1 : ww.opts.stopper.runAsync(func() {
814 1 : // Last writer(s) (since new writers can be created and become
815 1 : // last, as we iterate) are guaranteed to have seen the last
816 1 : // record (since it was queued before Close was called). It is
817 1 : // possible that a writer got created after the last record was
818 1 : // dequeued and before this fact was realized by Close. In that
819 1 : // case we will harmlessly tell it that it synced that last
820 1 : // record, though it has already been written and synced by
821 1 : // another writer.
822 1 : err := w.CloseWithLastQueuedRecord(lastRecordIndex)
823 1 : ww.mu.Lock()
824 1 : defer ww.mu.Unlock()
825 1 : if lastWriter.index == index {
826 1 : lastWriter.closed = true
827 1 : lastWriter.err = err
828 1 : lastWriter.metrics = w.Metrics()
829 1 : ww.mu.cond.Signal()
830 1 : }
831 : })
832 1 : } else {
833 1 : // Don't care about the returned error since all the records we
834 1 : // relied on this writer for were already successfully written.
835 1 : ww.opts.stopper.runAsync(func() {
836 1 : _ = w.CloseWithLastQueuedRecord(record.PendingSyncIndex{Index: record.NoSyncIndex})
837 1 : })
838 : }
839 1 : } else if cErr != nil {
840 1 : // Have processed it, so extend closeCalledCount.
841 1 : closeCalledCount = i + 1
842 1 : if isLastWriter {
843 1 : lastWriter.closed = true
844 1 : lastWriter.err = cErr
845 1 : lastWriter.metrics = record.LogWriterMetrics{}
846 1 : }
847 : // Else, ignore.
848 1 : } else {
849 1 : if !isLastWriter {
850 1 : // Not last writer, so will close itself.
851 1 : closeCalledCount = i + 1
852 1 : }
853 : // Else, last writer, so we may have to close it.
854 : }
855 : }
856 : }
857 1 : if !lastWriter.closed {
858 1 : // Either waiting for creation of last writer or waiting for the close
859 1 : // to finish, or something else to become the last writer.
860 1 : ww.mu.cond.Wait()
861 1 : }
862 : }
863 1 : if ww.mu.writers[lastWriter.index].w != nil {
864 1 : // This permits log recycling.
865 1 : ww.mu.writers[lastWriter.index].synchronouslyClosed = true
866 1 : }
867 1 : err = lastWriter.err
868 1 : ww.mu.metrics = lastWriter.metrics
869 1 : ww.mu.closed = true
870 1 : _, _ = ww.q.popAll(err)
871 1 : return logicalOffset, err
872 : }
873 :
874 : // Metrics implements Writer.
875 1 : func (ww *failoverWriter) Metrics() record.LogWriterMetrics {
876 1 : ww.mu.Lock()
877 1 : defer ww.mu.Unlock()
878 1 : return ww.mu.metrics
879 1 : }
880 :
881 : // getLog can be called at any time, including after Close returns.
882 1 : func (ww *failoverWriter) getLog() logicalLogWithSizesEtc {
883 1 : ww.mu.Lock()
884 1 : defer ww.mu.Unlock()
885 1 : ll := logicalLogWithSizesEtc{
886 1 : num: ww.opts.wn,
887 1 : }
888 1 : for i := range ww.mu.writers {
889 1 : if ww.mu.writers[i].w != nil {
890 1 : ll.segments = append(ll.segments, segmentWithSizeEtc{
891 1 : segment: segment{
892 1 : logNameIndex: LogNameIndex(i),
893 1 : dir: ww.mu.writers[i].dir,
894 1 : },
895 1 : approxFileSize: ww.mu.writers[i].approxFileSize,
896 1 : synchronouslyClosed: ww.mu.writers[i].synchronouslyClosed,
897 1 : })
898 1 : }
899 : }
900 1 : return ll
901 : }
902 :
903 : // latencyAndErrorRecorder records ongoing write and sync operations and errors
904 : // in those operations. record.LogWriter cannot continue functioning after any
905 : // error, so all errors are considered permanent.
906 : //
907 : // writeStart/writeEnd are used directly when creating a file. After the file
908 : // is successfully created, setWriter turns latencyAndErrorRecorder into an
909 : // implementation of writerSyncerCloser that will record for the Write and
910 : // Sync methods.
911 : type latencyAndErrorRecorder struct {
912 : ts timeSource
913 : ongoingOperationStart atomic.Int64
914 : error atomic.Pointer[error]
915 : writerSyncerCloser
916 : }
917 :
918 : type writerSyncerCloser interface {
919 : io.Writer
920 : io.Closer
921 : Sync() error
922 : }
923 :
924 1 : func (r *latencyAndErrorRecorder) writeStart() {
925 1 : r.ongoingOperationStart.Store(r.ts.now().UnixNano())
926 1 : }
927 :
928 1 : func (r *latencyAndErrorRecorder) writeEnd(err error) {
929 1 : if err != nil {
930 1 : ptr := &err
931 1 : r.error.Store(ptr)
932 1 : }
933 1 : r.ongoingOperationStart.Store(0)
934 : }
935 :
936 1 : func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) {
937 1 : r.writerSyncerCloser = w
938 1 : }
939 :
940 1 : func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) {
941 1 : startTime := r.ongoingOperationStart.Load()
942 1 : var latency time.Duration
943 1 : if startTime != 0 {
944 1 : l := r.ts.now().UnixNano() - startTime
945 1 : if l < 0 {
946 0 : l = 0
947 0 : }
948 1 : latency = time.Duration(l)
949 : }
950 1 : errPtr := r.error.Load()
951 1 : var err error
952 1 : if errPtr != nil {
953 1 : err = *errPtr
954 1 : }
955 1 : return latency, err
956 : }
957 :
958 : // Sync implements writerSyncerCloser.
959 1 : func (r *latencyAndErrorRecorder) Sync() error {
960 1 : r.writeStart()
961 1 : err := r.writerSyncerCloser.Sync()
962 1 : r.writeEnd(err)
963 1 : return err
964 1 : }
965 :
966 : // Write implements io.Writer.
967 1 : func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) {
968 1 : r.writeStart()
969 1 : n, err = r.writerSyncerCloser.Write(p)
970 1 : r.writeEnd(err)
971 1 : return n, err
972 1 : }
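// Illustrative sketch (not part of the package's logic): how a caller such as
// a failover monitor might use ongoingLatencyOrError to decide that the
// current directory is unhealthy. The threshold and function name are
// hypothetical.
func exampleRecorderLooksUnhealthy(r *latencyAndErrorRecorder, threshold time.Duration) bool {
    latency, err := r.ongoingLatencyOrError()
    return err != nil || latency > threshold
}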
|