Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "io"
9 : "sync"
10 : "sync/atomic"
11 : "time"
12 :
13 : "github.com/cockroachdb/crlib/crtime"
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/record"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/prometheus/client_golang/prometheus"
19 : )
20 :
21 : // recordQueueEntry is an entry in recordQueue.
22 : type recordQueueEntry struct {
23 : p []byte
24 : opts SyncOptions
25 : refCount RefCount
26 : writeStart crtime.Mono
27 : }
28 :
29 : type poppedEntry struct {
30 : opts SyncOptions
31 : refCount RefCount
32 : writeStart crtime.Mono
33 : }
34 :
35 : const initialBufferLen = 8192
36 :
37 : // recordQueue is a variable-size single-producer multiple-consumer queue. It
38 : // is not lock-free, but most operations only need mu.RLock. It needs a mutex
39 : // to grow the size, since there is no upper bound on the number of queued
40 : // records (which are all the records that are not synced, and will need to be
41 : // written again in case of failover). Additionally, it needs a mutex to
42 : // atomically grab a snapshot of the queued records and provide them to a new
43 : // LogWriter that is being switched to.
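//
// For illustration, a rough sketch of how the rest of this file drives a
// recordQueue (the argument names here are placeholders; see the actual call
// sites below for the precise values passed):
//
//	var q recordQueue
//	q.init(histogram)
//	// Producer (WriteRecord, externally synchronized):
//	index, writer, lastLogSize := q.push(p, opts, ref, writeStart, prevSize, prevWriter)
//	// Consumer (doneSyncCallback, after a successful sync up to index):
//	numSyncs := q.pop(index, nil)
//	// Failover: replay unsynced entries to a new LogWriter:
//	q.snapshotAndSwitchWriter(newWriter, replayFunc)
//	// Close, after the final push has returned:
//	numRecords, numSyncs := q.popAll(err)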
44 : type recordQueue struct {
45 : // Only held for reading for all pop operations and most push operations.
46 : // Held for writing when buffer needs to be grown or when switching to a new
47 : // writer.
48 : mu sync.RWMutex
49 :
50 : // queue is [tail, head). tail is the oldest entry and head is the index for
51 : // the next entry.
52 : //
53 : // Consumers: atomically read and write tail in pop. This is not the usual
54 : // kind of queue consumer since they already know the index that they are
55 : // popping exists, hence don't need to look at head.
56 : //
57 : // Producer: atomically reads tail in push. Writes to head.
58 : //
59 : // Based on the above we only need tail to be atomic. However, the producer
60 : // also populates entries in buffer, whose values need to be seen by the
61 : // consumers when doing a pop, which means they need to synchronize using a
62 : // release and acquire memory barrier pair, where the push does the release
63 : // and the pop does the acquire. For this reason we make head also atomic
64 : // and merge head and tail into a single atomic, so that the store of head
65 : // in push and the load of tail in pop accomplishes this release-acquire
66 : // pair.
67 : //
68 : // We initially implemented competition between multiple consumers solely
69 : // via atomic read-write of the tail using compare-and-swap (CAS). Since the
70 : // atomic read-write to tail in pop releases those buffer entries for reuse
71 : // to the producer, the consumer needs to grab the contents of
72 : // recordQueueEntry that it needs to do callbacks etc. (specifically the
73 : // contents corresponding to poppedEntry), *before* it succeeds with the
74 : // atomic read-write. This introduced a false data race in the Golang data
75 : // race detector
76 : // https://github.com/cockroachdb/pebble/issues/3380#issuecomment-1981188174.
77 : // Consider the case where the queue is [10,20), and consumer C1 is trying
78 : // to pop [10,12) and consumer C2 is trying to pop [10,14). The following
79 : // interleaving can happen:
80 : //
81 : // [C1] reads tail=10, head=20
82 : // [C2] reads tail=10, head=20
83 : // [C1] reads buffer contents [10,12) and makes local copy
84 : // [C1] CAS to make the queue [12,20)
85 : // [C2] reads buffer contents [10,14) and makes local copy, concurrently
86 : // with producer writing to 10, 11. *
87 : // [C2] CAS fails for popping [10,14), so revises to [12,14), and succeeds
88 : // in CAS. C2 only uses the contents it read into the local copy for
89 : // [12,14).
90 : //
91 : // * is a false data race since C2 is later deciding what contents it should
92 : // use from among the contents it read, based on what indices it
93 : // successfully popped. Unfortunately, we don't have a way to annotate the
94 : // code to tell the data race detector to ignore this false positive. So we
95 : // need to strengthen the synchronization to prevent such false positives.
96 : // We observe that usually a consumer will be popping a batch of entries
97 : // (based on a single successful fsync), and the number of consumers will be
98 : // small (usually 1). In comparison, producers can be highly concurrent (due
99 : // to workload concurrency). We don't want consumers to compete for a mutex
100 : // with producers, but we can afford to have multiple consumers compete for
101 : // a mutex. So we fix this false data race by using consumerMu to force
102 : // single-threaded popping.
103 : //
104 : // An alternative would be to pass the information contained in poppedEntry
105 : // to the LogWriter, so that it can pass it back when popping (so we don't
106 : // have to retrieve it from the recordQueue.buffer). We would still need
107 : // recordQueue.buffer, since writer switching needs those entries to be
108 : // replayed. We don't consider this solution for the same reason we replaced
109 : // record.pendingSyncsWithSyncQueue with
110 : // record.pendingSyncsWithHighestSyncIndex for the failoverWriter code path
111 : // -- we cannot bound the queue in the LogWriter by record.SyncConcurrency:
112 : // say SyncConcurrency was 4096, and LogWriter1's queue was full, and we
113 : // switched to LogWriter2, to which we replayed the same records and filled
114 : // up the queue. Then if LogWriter1 unblocks and pops all the 4096 entries,
115 : // the commit pipeline can send another 4096 entries, while LogWriter2 is
116 : // still blocked on trying to write and sync the previous 4096 entries. This
117 : // will overflow the queue in LogWriter2.
118 : //
119 : // All updates to headTail hold mu at least for reading. So when mu is held
120 : // for writing, there is a guarantee that headTail is not being updated.
121 : //
122 : // head is most-significant 32 bits and tail is least-significant 32 bits.
123 : headTail atomic.Uint64
124 :
125 : consumerMu sync.Mutex
126 :
127 : // Access to buffer requires at least RLock.
128 : buffer []recordQueueEntry
129 :
130 : lastTailObservedByProducer uint32
131 :
132 : // Read requires RLock.
133 : writer *record.LogWriter
134 :
135 : // When writer != nil, this is the return value of the last call to
136 : // SyncRecordGeneralized. It is updated (a) when WriteRecord calls push, using
137 : // only RLock (since WriteRecord is externally synchronized), and (b) in
138 : // snapshotAndSwitchWriter, using Lock. (b) excludes (a).
139 : lastLogSize int64
140 :
141 : failoverWriteAndSyncLatency prometheus.Histogram
142 : }
143 :
144 1 : func (q *recordQueue) init(failoverWriteAndSyncLatency prometheus.Histogram) {
145 1 : *q = recordQueue{
146 1 : buffer: make([]recordQueueEntry, initialBufferLen),
147 1 : failoverWriteAndSyncLatency: failoverWriteAndSyncLatency,
148 1 : }
149 1 : }
150 :
151 : // NB: externally synchronized, i.e., no concurrent push calls.
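//
// When the buffer is full, push doubles it while preserving queue positions.
// For example, with a buffer of length 8 and the queue at [6, 14), entry i
// moves from slot i%8 to slot i%16, so entries 6..13 land in slots 6..13 of
// the new length-16 buffer, and the newly pushed entry 14 goes to slot 14.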
152 : func (q *recordQueue) push(
153 : p []byte,
154 : opts SyncOptions,
155 : refCount RefCount,
156 : writeStart crtime.Mono,
157 : latestLogSizeInWriteRecord int64,
158 : latestWriterInWriteRecord *record.LogWriter,
159 1 : ) (index uint32, writer *record.LogWriter, lastLogSize int64) {
160 1 : ht := q.headTail.Load()
161 1 : h, t := unpackHeadTail(ht)
162 1 : n := int(h - t)
163 1 : m := len(q.buffer)
164 1 : if m == n {
165 1 : // Full
166 1 : m = 2 * n
167 1 : newBuffer := make([]recordQueueEntry, m)
168 1 : for i := int(t); i < int(h); i++ {
169 1 : newBuffer[i%m] = q.buffer[i%n]
170 1 : }
171 1 : q.mu.Lock()
172 1 : q.buffer = newBuffer
173 1 : q.mu.Unlock()
174 : }
175 1 : q.mu.RLock()
176 1 : q.buffer[int(h)%m] = recordQueueEntry{
177 1 : p: p,
178 1 : opts: opts,
179 1 : refCount: refCount,
180 1 : writeStart: writeStart,
181 1 : }
182 1 : // Reclaim memory for consumed entries. We couldn't do that in pop since
183 1 : // advancing tail there immediately transfers ownership of those slots back
184 1 : // to the producer.
185 1 : for i := q.lastTailObservedByProducer; i < t; i++ {
186 1 : q.buffer[int(i)%m] = recordQueueEntry{}
187 1 : }
188 1 : q.lastTailObservedByProducer = t
189 1 : q.headTail.Add(1 << headTailBits)
190 1 : writer = q.writer
191 1 : if writer == latestWriterInWriteRecord {
192 1 : // WriteRecord has written to this writer since the switch.
193 1 : q.lastLogSize = latestLogSizeInWriteRecord
194 1 : }
195 : // Else writer is a new writer that was switched to, so ignore the
196 : // latestLogSizeInWriteRecord.
197 :
198 1 : lastLogSize = q.lastLogSize
199 1 : q.mu.RUnlock()
200 1 : return h, writer, lastLogSize
201 : }
202 :
203 1 : func (q *recordQueue) length() int {
204 1 : ht := q.headTail.Load()
205 1 : h, t := unpackHeadTail(ht)
206 1 : return int(h - t)
207 1 : }
208 :
209 : // Pops all entries. Must be called only after the last push returns.
210 1 : func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
211 1 : ht := q.headTail.Load()
212 1 : h, t := unpackHeadTail(ht)
213 1 : n := int(h - t)
214 1 : if n == 0 {
215 1 : return 0, 0
216 1 : }
217 1 : return n, q.pop(h-1, err)
218 : }
219 :
220 : // Pops all entries up to and including index. The remaining queue is
221 : // [index+1, head).
222 : //
223 : // NB: we could slightly simplify to only have the latest writer be able to
224 : // pop. This would avoid the consumer competition below, but it seems better to
225 : // reduce the amount of queued work regardless of who has successfully written it.
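//
// For example, if the queue is [5, 12) and pop(8, nil) is called, entries 5
// through 8 are popped (their Done callbacks fired if they requested a sync),
// and the remaining queue is [9, 12).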
226 1 : func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
227 1 : now := crtime.NowMono()
228 1 : var buf [512]poppedEntry
229 1 : tailEntriesToPop := func() (t uint32, numEntriesToPop int) {
230 1 : ht := q.headTail.Load()
231 1 : _, t = unpackHeadTail(ht)
232 1 : tail := int(t)
233 1 : numEntriesToPop = int(index) - tail + 1
234 1 : return t, numEntriesToPop
235 1 : }
236 1 : q.consumerMu.Lock()
237 1 : // numEntriesToPop is a function of index and tail. The value of tail cannot
238 1 : // change since consumerMu is held.
239 1 : tail, numEntriesToPop := tailEntriesToPop()
240 1 : if numEntriesToPop <= 0 {
241 1 : q.consumerMu.Unlock()
242 1 : return 0
243 1 : }
244 1 : var b []poppedEntry
245 1 : if numEntriesToPop <= len(buf) {
246 1 : b = buf[:numEntriesToPop]
247 1 : } else {
248 1 : // Do allocation before acquiring the mutex.
249 1 : b = make([]poppedEntry, numEntriesToPop)
250 1 : }
251 1 : q.mu.RLock()
252 1 : n := len(q.buffer)
253 1 : for i := 0; i < numEntriesToPop; i++ {
254 1 : // Grab the popped entries before incrementing tail, since that will
255 1 : // release those buffer slots to the producer.
256 1 : idx := (i + int(tail)) % n
257 1 : b[i] = poppedEntry{
258 1 : opts: q.buffer[idx].opts,
259 1 : refCount: q.buffer[idx].refCount,
260 1 : writeStart: q.buffer[idx].writeStart,
261 1 : }
262 1 : }
263 : // Since tail cannot change, we don't need to do a compare-and-swap.
264 1 : q.headTail.Add(uint64(numEntriesToPop))
265 1 : q.mu.RUnlock()
266 1 : q.consumerMu.Unlock()
267 1 : addLatencySample := false
268 1 : var maxLatency time.Duration
269 1 : for i := 0; i < numEntriesToPop; i++ {
270 1 : // Now that we've synced the entry, we can unref it to signal that we
271 1 : // will not read the written byte slice again.
272 1 : if b[i].refCount != nil {
273 1 : b[i].refCount.Unref()
274 1 : }
275 1 : if b[i].opts.Done != nil {
276 1 : numSyncsPopped++
277 1 : if err != nil {
278 1 : *b[i].opts.Err = err
279 1 : }
280 1 : b[i].opts.Done.Done()
281 1 : latency := now.Sub(b[i].writeStart)
282 1 : if !addLatencySample {
283 1 : addLatencySample = true
284 1 : maxLatency = latency
285 1 : } else if maxLatency < latency {
286 0 : maxLatency = latency
287 0 : }
288 : }
289 : }
290 1 : if addLatencySample {
291 1 : if maxLatency < 0 {
292 0 : maxLatency = 0
293 0 : }
294 1 : q.failoverWriteAndSyncLatency.Observe(float64(maxLatency))
295 : }
296 1 : return numSyncsPopped
297 : }
298 :
299 : func (q *recordQueue) snapshotAndSwitchWriter(
300 : writer *record.LogWriter,
301 : snapshotFunc func(firstIndex uint32, entries []recordQueueEntry) (logSize int64),
302 1 : ) {
303 1 : q.mu.Lock()
304 1 : defer q.mu.Unlock()
305 1 : q.writer = writer
306 1 : h, t := unpackHeadTail(q.headTail.Load())
307 1 : n := h - t
308 1 : if n > 0 {
309 1 : m := uint32(len(q.buffer))
310 1 : b := make([]recordQueueEntry, n)
311 1 : for i := t; i < h; i++ {
312 1 : b[i-t] = q.buffer[i%m]
313 1 : }
314 1 : q.lastLogSize = snapshotFunc(t, b)
315 : }
316 : }
317 :
318 : // getLastIndex is used by failoverWriter.Close.
319 1 : func (q *recordQueue) getLastIndex() (lastIndex int64) {
320 1 : h, _ := unpackHeadTail(q.headTail.Load())
321 1 : return int64(h) - 1
322 1 : }
323 :
324 : const headTailBits = 32
325 :
326 1 : func unpackHeadTail(ht uint64) (head, tail uint32) {
327 1 : const mask = 1<<headTailBits - 1
328 1 : head = uint32((ht >> headTailBits) & mask)
329 1 : tail = uint32(ht & mask)
330 1 : return head, tail
331 1 : }
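
// For illustration, a minimal sketch of the packing convention: a queue with
// tail=3 and head=7 (four queued entries) is encoded as
//
//	ht := uint64(7)<<headTailBits | uint64(3)
//	h, t := unpackHeadTail(ht) // h == 7, t == 3
//
// push advances head via headTail.Add(1 << headTailBits), and pop advances
// tail via headTail.Add(uint64(numEntriesToPop)).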
332 :
333 : // Maximum number of physical log files when writing a virtual WAL. Arbitrarily
334 : // chosen value. Setting this to 2 will not simplify the code. We make this a
335 : // constant since we want a fixed-size array for failoverWriter.mu.writers.
336 : const maxPhysicalLogs = 10
337 :
338 : // failoverWriter is the implementation of Writer in failover mode. No Writer
339 : // method blocks for IO, except for Close.
340 : //
341 : // Loosely speaking, Close blocks until all records are successfully written
342 : // and synced to some log writer. Monitoring of log writer latency and errors
343 : // continues after Close is called, which means failoverWriter can be switched
344 : // to a new log writer after Close is called, to unblock Close.
345 : //
346 : // More precisely, Close does not block if there is an error in creating or
347 : // closing the latest LogWriter when Close was called. This is because errors
348 : // are considered indicative of misconfiguration, and the user of
349 : // failoverWriter can dampen switching when observing errors (e.g. see
350 : // failoverMonitor), so Close does not assume any liveness of calls to
351 : // switchToNewDir when such errors occur. Since the caller (see db.go) treats
352 : // an error on Writer.Close as fatal, this does mean that failoverWriter has
353 : // limited ability to mask errors (its primary task is to mask high latency).
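//
// For illustration, the rough call pattern (the WAL manager and the
// failoverMonitor drive these calls; the literal sequence below is a sketch
// with placeholder values, not taken from a real caller):
//
//	ww, err := newFailoverWriter(opts, initialDir)
//	offset, err := ww.WriteRecord(p, SyncOptions{Done: wg, Err: errPtr}, ref)
//	_ = ww.switchToNewDir(newDir) // on observed latency or errors
//	offset, err = ww.Close()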
354 : type failoverWriter struct {
355 : opts failoverWriterOpts
356 : q recordQueue
357 : mu struct {
358 : sync.Mutex
359 : // writers is protected by mu, except for updates to the
360 : // latencyAndErrorRecorder field. WriteRecord does not acquire mu, so the
361 : // protection by mu is for handling concurrent calls to switchToNewDir,
362 : // Close, and getLog.
363 : writers [maxPhysicalLogs]logWriterAndRecorder
364 :
365 : // cond is signaled when the latest LogWriter is set in writers (or there
366 : // is a creation error), or when the latest LogWriter is successfully
367 : // closed. It is waited on in Close. We don't use channels and select
368 : // since what Close is waiting on is dynamic based on the local state in
369 : // Close, so using Cond is simpler.
370 : cond *sync.Cond
371 : // nextWriterIndex is advanced before creating the *LogWriter. That is, a
372 : // slot is reserved by taking the current value of nextWriterIndex and
373 : // incrementing it, and then the *LogWriter for that slot is created. When
374 : // newFailoverWriter returns, nextWriterIndex = 1.
375 : //
376 : // The latest *LogWriter is (will be) at nextWriterIndex-1.
377 : //
378 : // INVARIANT: nextWriterIndex <= len(writers)
379 : nextWriterIndex LogNameIndex
380 : closed bool
381 : // metrics is initialized in Close. Currently we just use the metrics from
382 : // the latest writer after it is closed, since in the common case with
383 : // only one writer, that writer's flush loop will have finished and the
384 : // metrics will be current. With multiple writers, these metrics can be
385 : // quite inaccurate. The WriteThroughput metric includes an IdleDuration,
386 : // which can be high for a writer that was switched away from, and
387 : // therefore not indicative of overall work being done by the
388 : // failoverWriter. The PendingBufferLen and SyncQueueLen are similarly
389 : // inaccurate once there is no more work being given to a writer. We could
390 : // add a method to LogWriter to stop sampling metrics when it is not the
391 : // latest writer. Then we could aggregate all these metrics across all
392 : // writers.
393 : //
394 : // Note that CockroachDB does not use these metrics in any meaningful way.
395 : //
396 : // TODO(sumeer): do the improved solution outlined above.
397 : metrics record.LogWriterMetrics
398 : }
399 : // State for computing logical offset. The cumulative offset state is in
400 : // offset. Each time we call SyncRecordGeneralized from WriteRecord, we
401 : // compute the delta from the size returned by this LogWriter now, and the
402 : // size returned by this LogWriter in the previous call to
403 : // SyncRecordGeneralized. That previous call to SyncRecordGeneralized may
404 : // have happened from WriteRecord, or asynchronously during a switch. So
405 : // that previous call state requires synchronization and is maintained in
406 : // recordQueue. The offset is incremented by this delta without any
407 : // synchronization, since we rely on external synchronization (like the
408 : // standaloneWriter).
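//
// For example, if the previous SyncRecordGeneralized call on this LogWriter
// returned a size of 1000 and the call for the current record returns 1040,
// offset is advanced by the delta of 40.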
409 : logicalOffset struct {
410 : latestWriterInWriteRecord *record.LogWriter
411 : latestLogSizeInWriteRecord int64
412 : offset int64
413 : // Transitions once from false => true when there is a non-nil writer.
414 : notEstimatedOffset bool
415 : }
416 : psiForWriteRecordBacking record.PendingSyncIndex
417 : psiForSwitchBacking record.PendingSyncIndex
418 : }
419 :
420 : type logWriterAndRecorder struct {
421 : // This may never become non-nil: if, by the time the LogWriter was finally
422 : // created, it was no longer the latest writer, w stays nil. Additionally, if
423 : // there was an error in creating the writer, w remains nil and createError is set.
424 : w *record.LogWriter
425 : // createError is set if there is an error creating the writer. This is
426 : // useful in Close since we need to know when the work for creating the
427 : // latest writer is done, whether it resulted in success or not.
428 : createError error
429 : r latencyAndErrorRecorder
430 :
431 : // dir, approxFileSize, synchronouslyClosed are kept for initializing
432 : // segmentWithSizeEtc. The approxFileSize is initially set to whatever is
433 : // returned by logCreator. When failoverWriter.Close is called,
434 : // approxFileSize and synchronouslyClosed may be updated.
435 : dir Dir
436 : approxFileSize uint64
437 : synchronouslyClosed bool
438 : }
439 :
440 : var _ Writer = &failoverWriter{}
441 :
442 : var _ switchableWriter = &failoverWriter{}
443 :
444 : type failoverWriterOpts struct {
445 : wn NumWAL
446 : logger base.Logger
447 : timeSource
448 : jobID int
449 : logCreator
450 :
451 : // Options that feed into SyncingFileOptions.
452 : noSyncOnClose bool
453 : bytesPerSync int
454 : preallocateSize func() int
455 :
456 : // Options for record.LogWriter.
457 : minSyncInterval func() time.Duration
458 : fsyncLatency prometheus.Histogram
459 : queueSemChan chan struct{}
460 : stopper *stopper
461 :
462 : failoverWriteAndSyncLatency prometheus.Histogram
463 : writerClosed func(logicalLogWithSizesEtc)
464 :
465 : writerCreatedForTest chan<- struct{}
466 : }
467 :
468 : func simpleLogCreator(
469 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
470 1 : ) (f vfs.File, initialFileSize uint64, err error) {
471 1 : filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
472 1 : // Create file.
473 1 : r.writeStart()
474 1 : f, err = dir.FS.Create(filename, "pebble-wal")
475 1 : r.writeEnd(err)
476 1 : return f, 0, err
477 1 : }
478 :
479 : type logCreator func(
480 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
481 : ) (f vfs.File, initialFileSize uint64, err error)
482 :
483 : func newFailoverWriter(
484 : opts failoverWriterOpts, initialDir dirAndFileHandle,
485 1 : ) (*failoverWriter, error) {
486 1 : ww := &failoverWriter{
487 1 : opts: opts,
488 1 : }
489 1 : ww.q.init(opts.failoverWriteAndSyncLatency)
490 1 : ww.mu.cond = sync.NewCond(&ww.mu)
491 1 : // The initial record.LogWriter creation also happens via a
492 1 : // switchToNewDir since we don't want it to block newFailoverWriter.
493 1 : err := ww.switchToNewDir(initialDir)
494 1 : if err != nil {
495 0 : // Switching limit cannot be exceeded when creating.
496 0 : panic(err)
497 : }
498 1 : return ww, nil
499 : }
500 :
501 : // WriteRecord implements Writer.
502 : func (ww *failoverWriter) WriteRecord(
503 : p []byte, opts SyncOptions, ref RefCount,
504 1 : ) (logicalOffset int64, err error) {
505 1 : if ref != nil {
506 1 : ref.Ref()
507 1 : }
508 1 : var writeStart crtime.Mono
509 1 : if opts.Done != nil {
510 1 : writeStart = crtime.NowMono()
511 1 : }
512 1 : recordIndex, writer, lastLogSize := ww.q.push(
513 1 : p,
514 1 : opts,
515 1 : ref,
516 1 : writeStart,
517 1 : ww.logicalOffset.latestLogSizeInWriteRecord,
518 1 : ww.logicalOffset.latestWriterInWriteRecord,
519 1 : )
520 1 : if writer == nil {
521 1 : // Don't have a record.LogWriter yet, so use an estimate. This estimate
522 1 : // will get overwritten.
523 1 : ww.logicalOffset.offset += int64(len(p))
524 1 : return ww.logicalOffset.offset, nil
525 1 : }
526 : // INVARIANT: writer != nil.
527 1 : notEstimatedOffset := ww.logicalOffset.notEstimatedOffset
528 1 : if !notEstimatedOffset {
529 1 : ww.logicalOffset.notEstimatedOffset = true
530 1 : }
531 1 : ww.psiForWriteRecordBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
532 1 : if opts.Done != nil {
533 1 : ww.psiForWriteRecordBacking.Index = int64(recordIndex)
534 1 : }
535 1 : ww.logicalOffset.latestLogSizeInWriteRecord, err = writer.SyncRecordGeneralized(p, &ww.psiForWriteRecordBacking)
536 1 : ww.logicalOffset.latestWriterInWriteRecord = writer
537 1 : if notEstimatedOffset {
538 1 : delta := ww.logicalOffset.latestLogSizeInWriteRecord - lastLogSize
539 1 : ww.logicalOffset.offset += delta
540 1 : } else {
541 1 : // Overwrite the estimate. This is a best-effort improvement in that it is
542 1 : // accurate for the common case where writer is the first LogWriter.
543 1 : // Consider a failover scenario where there was no LogWriter for the first
544 1 : // 10 records, so they are all accumulated as an estimate. Then the first
545 1 : // LogWriter successfully writes and syncs the first 5 records and gets
546 1 : // stuck. A switch happens to a second LogWriter that is handed the
547 1 : // remaining 5 records, and the 11th record arrives via a WriteRecord.
548 1 : // The transition from !notEstimatedOffset to notEstimatedOffset will
549 1 : // happen on this 11th record, and the logic here will use the length of
550 1 : // the second LogWriter, which does not reflect the full length.
551 1 : //
552 1 : // TODO(sumeer): try to make this more correct, without adding much more
553 1 : // complexity, and without adding synchronization.
554 1 : ww.logicalOffset.offset = ww.logicalOffset.latestLogSizeInWriteRecord
555 1 : }
556 1 : return ww.logicalOffset.offset, err
557 : }
558 :
559 : // switchToNewDir starts switching to dir. It implements switchableWriter. All
560 : // work is async, and a non-nil error is returned only if the switching limit
561 : // is exceeded.
562 1 : func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
563 1 : ww.mu.Lock()
564 1 : // Can have a late switchToNewDir call if the failoverMonitor has not yet
565 1 : // been told that the writer is closed. Ignore.
566 1 : if ww.mu.closed {
567 1 : ww.mu.Unlock()
568 1 : if ww.opts.writerCreatedForTest != nil {
569 1 : ww.opts.writerCreatedForTest <- struct{}{}
570 1 : }
571 1 : return nil
572 : }
573 : // writerIndex is the slot for this writer.
574 1 : writerIndex := ww.mu.nextWriterIndex
575 1 : if int(writerIndex) == len(ww.mu.writers) {
576 1 : ww.mu.Unlock()
577 1 : return errors.Errorf("exceeded switching limit")
578 1 : }
579 1 : ww.mu.writers[writerIndex].dir = dir.Dir
580 1 : ww.mu.nextWriterIndex++
581 1 : ww.mu.Unlock()
582 1 :
583 1 : // Creation is async.
584 1 : ww.opts.stopper.runAsync(func() {
585 1 : recorderAndWriter := &ww.mu.writers[writerIndex].r
586 1 : recorderAndWriter.ts = ww.opts.timeSource
587 1 : file, initialFileSize, err := ww.opts.logCreator(
588 1 : dir.Dir, ww.opts.wn, writerIndex, recorderAndWriter, ww.opts.jobID)
589 1 : ww.mu.writers[writerIndex].approxFileSize = initialFileSize
590 1 : // handleErrFunc is called when err != nil. It handles the multiple IO error
591 1 : // cases below.
592 1 : handleErrFunc := func(err error) {
593 1 : if file != nil {
594 1 : file.Close()
595 1 : }
596 1 : ww.mu.Lock()
597 1 : defer ww.mu.Unlock()
598 1 : ww.mu.writers[writerIndex].createError = err
599 1 : ww.mu.cond.Signal()
600 1 : if ww.opts.writerCreatedForTest != nil {
601 1 : ww.opts.writerCreatedForTest <- struct{}{}
602 1 : }
603 : }
604 1 : if err != nil {
605 1 : handleErrFunc(err)
606 1 : return
607 1 : }
608 : // Sync dir.
609 1 : recorderAndWriter.writeStart()
610 1 : err = dir.Sync()
611 1 : recorderAndWriter.writeEnd(err)
612 1 : if err != nil {
613 1 : handleErrFunc(err)
614 1 : return
615 1 : }
616 : // Wrap in a syncingFile.
617 1 : syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
618 1 : NoSyncOnClose: ww.opts.noSyncOnClose,
619 1 : BytesPerSync: ww.opts.bytesPerSync,
620 1 : PreallocateSize: ww.opts.preallocateSize(),
621 1 : })
622 1 : // Wrap in the latencyAndErrorRecorder.
623 1 : recorderAndWriter.setWriter(syncingFile)
624 1 :
625 1 : // Using NumWAL as the DiskFileNum is fine since it is used only as
626 1 : // EOF trailer for safe log recycling. Even though many log files can
627 1 : // map to a single NumWAL, a file used for NumWAL n at index m will
628 1 : // never get recycled for NumWAL n at a later index (since recycling
629 1 : // happens when n as a whole is obsolete).
630 1 : w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn),
631 1 : record.LogWriterConfig{
632 1 : WALMinSyncInterval: ww.opts.minSyncInterval,
633 1 : WALFsyncLatency: ww.opts.fsyncLatency,
634 1 : QueueSemChan: ww.opts.queueSemChan,
635 1 : ExternalSyncQueueCallback: ww.doneSyncCallback,
636 1 : })
637 1 : closeWriter := func() bool {
638 1 : ww.mu.Lock()
639 1 : defer ww.mu.Unlock()
640 1 : if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed {
641 1 : // Not the latest writer or the writer was closed while this async
642 1 : // creation was ongoing.
643 1 : if ww.opts.writerCreatedForTest != nil {
644 1 : ww.opts.writerCreatedForTest <- struct{}{}
645 1 : }
646 1 : return true
647 : }
648 : // Latest writer.
649 1 : ww.mu.writers[writerIndex].w = w
650 1 : ww.mu.cond.Signal()
651 1 : // NB: snapshotAndSwitchWriter does not block on IO, since
652 1 : // SyncRecordGeneralized does no IO.
653 1 : ww.q.snapshotAndSwitchWriter(w,
654 1 : func(firstIndex uint32, entries []recordQueueEntry) (logSize int64) {
655 1 : for i := range entries {
656 1 : ww.psiForSwitchBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
657 1 : if entries[i].opts.Done != nil {
658 1 : ww.psiForSwitchBacking.Index = int64(firstIndex) + int64(i)
659 1 : }
660 1 : var err error
661 1 : logSize, err = w.SyncRecordGeneralized(entries[i].p, &ww.psiForSwitchBacking)
662 1 : if err != nil {
663 0 : // TODO(sumeer): log periodically. The err will also surface via
664 0 : // the latencyAndErrorRecorder, so if a switch is possible, it
665 0 : // will be done.
666 0 : ww.opts.logger.Errorf("%s", err)
667 0 : }
668 : }
669 1 : return logSize
670 : })
671 1 : if ww.opts.writerCreatedForTest != nil {
672 1 : ww.opts.writerCreatedForTest <- struct{}{}
673 1 : }
674 1 : return false
675 : }()
676 1 : if closeWriter {
677 1 : // Never wrote anything to this writer so don't care about the
678 1 : // returned error.
679 1 : ww.opts.stopper.runAsync(func() {
680 1 : _ = w.Close()
681 1 : // TODO(sumeer): consider deleting this file too, since
682 1 : // failoverWriter.Close may not wait for it. This is going to be
683 1 : // extremely rare, so the risk of garbage empty files piling up is
684 1 : // extremely low. Say failover happens daily and of those cases we
685 1 : // have to be very unlucky and the close happens while a failover was
686 1 : // ongoing and the previous LogWriter successfully wrote everything
687 1 : // (say 1% probability if we want to be pessimistic). A garbage file
688 1 : // every 100 days. Restarts will delete that garbage.
689 1 : })
690 : }
691 : })
692 1 : return nil
693 : }
694 :
695 : // doneSyncCallback is the record.ExternalSyncQueueCallback called by
696 : // record.LogWriter.
697 : //
698 : // recordQueue is popped from only when some work requests a sync (and
699 : // successfully syncs). In the worst case, if no syncs are requested, we could
700 : // queue all the records needed to fill up a memtable in the recordQueue. This
701 : // can have two negative effects: (a) in the case of failover, we need to
702 : // replay all the data in the current mutable memtable, which takes more time,
703 : // (b) the memory usage is proportional to the size of the memtable. We ignore
704 : // these negatives since (a) users like CockroachDB regularly sync, and (b)
705 : // the default memtable size is only 64MB.
706 1 : func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) {
707 1 : if err != nil {
708 1 : // Don't pop anything since we can retry after switching to a new
709 1 : // LogWriter.
710 1 : return
711 1 : }
712 : // NB: harmless after Close returns since numSyncsPopped will be 0.
713 1 : numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err)
714 1 : if ww.opts.queueSemChan != nil {
715 1 : for i := 0; i < numSyncsPopped; i++ {
716 1 : <-ww.opts.queueSemChan
717 1 : }
718 : }
719 : }
720 :
721 : // ongoingLatencyOrErrorForCurDir implements switchableWriter.
722 1 : func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
723 1 : r := ww.recorderForCurDir()
724 1 : if r == nil {
725 1 : return 0, nil
726 1 : }
727 1 : return r.ongoingLatencyOrError()
728 : }
729 :
730 : // For internal use and testing.
731 1 : func (ww *failoverWriter) recorderForCurDir() *latencyAndErrorRecorder {
732 1 : ww.mu.Lock()
733 1 : defer ww.mu.Unlock()
734 1 : if ww.mu.closed {
735 1 : return nil
736 1 : }
737 1 : return &ww.mu.writers[ww.mu.nextWriterIndex-1].r
738 : }
739 :
740 : // Close implements Writer.
741 : //
742 : // NB: ongoingLatencyOrErrorForCurDir and switchToNewDir can be
743 : // called after Close is called, and there is also a possibility that they get
744 : // called after Close returns and before failoverMonitor knows that the
745 : // failoverWriter is closed.
746 : //
747 : // doneSyncCallback can be called anytime after Close returns since there
748 : // could be stuck writes that finish arbitrarily later.
749 : //
750 : // See the long comment about Close behavior where failoverWriter is declared.
751 1 : func (ww *failoverWriter) Close() (logicalOffset int64, err error) {
752 1 : offset, err := ww.closeInternal()
753 1 : ww.opts.writerClosed(ww.getLog())
754 1 : return offset, err
755 1 : }
756 :
757 1 : func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
758 1 : logicalOffset = ww.logicalOffset.offset
759 1 : // [0, closeCalledCount) have had LogWriter.Close called (though those calls
760 1 : // may not have finished) or the LogWriter will never be non-nil. Either way, they
761 1 : // have been "processed".
762 1 : closeCalledCount := LogNameIndex(0)
763 1 : // lastWriterState is the state for the last writer, for which we are
764 1 : // waiting for LogWriter.Close to finish or for creation to be unsuccessful.
765 1 : // What is considered the last writer can change. All state is protected by
766 1 : // ww.mu.
767 1 : type lastWriterState struct {
768 1 : index LogNameIndex
769 1 : closed bool
770 1 : err error
771 1 : metrics record.LogWriterMetrics
772 1 : }
773 1 : var lastWriter lastWriterState
774 1 : lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
775 1 : ww.mu.Lock()
776 1 : defer ww.mu.Unlock()
777 1 : numWriters := ww.mu.nextWriterIndex
778 1 : // Every iteration starts and ends with the mutex held.
779 1 : //
780 1 : // Invariant: numWriters >= 1.
781 1 : //
782 1 : // We will loop until we have closed the lastWriter (and use
783 1 : // lastWriter.err). We also need to call close on all LogWriters
784 1 : // that will not close themselves, i.e., those that have already been
785 1 : // created and installed in failoverWriter.writers (this set may change
786 1 : // while failoverWriter.Close runs).
787 1 : for !lastWriter.closed || numWriters > lastWriter.index+1 {
788 1 : if numWriters > closeCalledCount {
789 1 : // lastWriter.index may or may not have advanced. If it has advanced, we
790 1 : // need to reinitialize lastWriterState. If it hasn't advanced, and
791 1 : // numWriters > closeCalledCount, we know that we haven't called close
792 1 : // on it, so nothing in lastWriterState needs to be retained. For
793 1 : // simplicity, we overwrite in both cases.
794 1 : lastWriter = lastWriterState{
795 1 : index: numWriters - 1,
796 1 : }
797 1 : // Try to process [closeCalledCount, numWriters). Will surely process
798 1 : // [closeCalledCount, numWriters-1), since those writers are either done
799 1 : // initializing, or will close themselves. The writer at numWriters-1 we
800 1 : // can only process if it is done initializing, else we will iterate
801 1 : // again.
802 1 : for i := closeCalledCount; i < numWriters; i++ {
803 1 : w := ww.mu.writers[i].w
804 1 : cErr := ww.mu.writers[i].createError
805 1 : // Is the current index the last writer? If yes, this is also the last
806 1 : // loop iteration.
807 1 : isLastWriter := i == lastWriter.index
808 1 : if w != nil {
809 1 : // Can close it, so extend closeCalledCount.
810 1 : closeCalledCount = i + 1
811 1 : size := uint64(w.Size())
812 1 : if ww.mu.writers[i].approxFileSize < size {
813 1 : ww.mu.writers[i].approxFileSize = size
814 1 : }
815 1 : if isLastWriter {
816 1 : // We may care about its error and when it finishes closing.
817 1 : index := i
818 1 : ww.opts.stopper.runAsync(func() {
819 1 : // Last writer(s) (since new writers can be created and become
820 1 : // last, as we iterate) are guaranteed to have seen the last
821 1 : // record (since it was queued before Close was called). It is
822 1 : // possible that a writer got created after the last record was
823 1 : // dequeued and before this fact was realized by Close. In that
824 1 : // case we will harmlessly tell it that it synced that last
825 1 : // record, though it has already been written and synced by
826 1 : // another writer.
827 1 : err := w.CloseWithLastQueuedRecord(lastRecordIndex)
828 1 : ww.mu.Lock()
829 1 : defer ww.mu.Unlock()
830 1 : if lastWriter.index == index {
831 1 : lastWriter.closed = true
832 1 : lastWriter.err = err
833 1 : lastWriter.metrics = w.Metrics()
834 1 : ww.mu.cond.Signal()
835 1 : }
836 : })
837 1 : } else {
838 1 : // Don't care about the returned error since all the records we
839 1 : // relied on this writer for were already successfully written.
840 1 : ww.opts.stopper.runAsync(func() {
841 1 : _ = w.CloseWithLastQueuedRecord(record.PendingSyncIndex{Index: record.NoSyncIndex})
842 1 : })
843 : }
844 1 : } else if cErr != nil {
845 1 : // Have processed it, so extend closeCalledCount.
846 1 : closeCalledCount = i + 1
847 1 : if isLastWriter {
848 1 : lastWriter.closed = true
849 1 : lastWriter.err = cErr
850 1 : lastWriter.metrics = record.LogWriterMetrics{}
851 1 : }
852 : // Else, ignore.
853 1 : } else {
854 1 : if !isLastWriter {
855 1 : // Not last writer, so will close itself.
856 1 : closeCalledCount = i + 1
857 1 : }
858 : // Else, last writer, so we may have to close it.
859 : }
860 : }
861 : }
862 1 : if !lastWriter.closed {
863 1 : // Either waiting for creation of last writer or waiting for the close
864 1 : // to finish, or something else to become the last writer.
865 1 : //
866 1 : // It is possible that what we think of as the last writer (lastWriter)
867 1 : // closes itself while ww.mu is no longer held here, and a new LogWriter
868 1 : // is created too. All the records are synced, but the real last writer
869 1 : // may still be writing some records. Specifically, consider the
870 1 : // following sequence while this wait does not hold the mutex:
871 1 : //
872 1 : // - recordQueue has an entry, with index 50, that does not require a
873 1 : // sync.
874 1 : // - Last writer created at index 10 and entry 50 is handed to it.
875 1 : // - lastWriter.index is still 9 and it closes itself and signals this
876 1 : // cond. It has written entry 50 and synced (since close syncs).
877 1 : // - The wait completes.
878 1 : //
879 1 : // Now the writer at index 10 will never be closed and will never sync.
880 1 : // A crash can cause some part of what it writes to be lost. Note that
881 1 : // there is no data loss, but there are some unfortunate consequences:
882 1 : //
883 1 : // - We never closed a file descriptor.
884 1 : // - virtualWALReader.NextRecord can return an error on finding a
885 1 : // malformed chunk in the last writer (at index 10) instead of
886 1 : // swallowing the error. This can cause DB.Open to fail.
887 1 : //
888 1 : // To avoid this, we grab the latest value of numWriters on reacquiring
889 1 : // the mutex, and will continue looping until the writer at index 10 is
890 1 : // closed (or writer at index 11 is created).
891 1 : ww.mu.cond.Wait()
892 1 : numWriters = ww.mu.nextWriterIndex
893 1 : }
894 : }
895 1 : if ww.mu.writers[lastWriter.index].w != nil {
896 1 : // This permits log recycling.
897 1 : ww.mu.writers[lastWriter.index].synchronouslyClosed = true
898 1 : }
899 1 : err = lastWriter.err
900 1 : ww.mu.metrics = lastWriter.metrics
901 1 : ww.mu.closed = true
902 1 : n, m := ww.q.popAll(err)
903 1 : if err == nil && (n > 0 || m > 0) {
904 0 : panic(errors.AssertionFailedf("no error but recordQueue had %d records and %d syncs", n, m))
905 : }
906 1 : return logicalOffset, err
907 : }
908 :
909 : // Metrics implements Writer.
910 1 : func (ww *failoverWriter) Metrics() record.LogWriterMetrics {
911 1 : ww.mu.Lock()
912 1 : defer ww.mu.Unlock()
913 1 : return ww.mu.metrics
914 1 : }
915 :
916 : // getLog can be called at any time, including after Close returns.
917 1 : func (ww *failoverWriter) getLog() logicalLogWithSizesEtc {
918 1 : ww.mu.Lock()
919 1 : defer ww.mu.Unlock()
920 1 : ll := logicalLogWithSizesEtc{
921 1 : num: ww.opts.wn,
922 1 : }
923 1 : for i := range ww.mu.writers {
924 1 : if ww.mu.writers[i].w != nil {
925 1 : ll.segments = append(ll.segments, segmentWithSizeEtc{
926 1 : segment: segment{
927 1 : logNameIndex: LogNameIndex(i),
928 1 : dir: ww.mu.writers[i].dir,
929 1 : },
930 1 : approxFileSize: ww.mu.writers[i].approxFileSize,
931 1 : synchronouslyClosed: ww.mu.writers[i].synchronouslyClosed,
932 1 : })
933 1 : }
934 : }
935 1 : return ll
936 : }
937 :
938 : // latencyAndErrorRecorder records ongoing write and sync operations and errors
939 : // in those operations. record.LogWriter cannot continue functioning after any
940 : // error, so all errors are considered permanent.
941 : //
942 : // writeStart/writeEnd are used directly when creating a file. After the file
943 : // is successfully created, setWriter turns latencyAndErrorRecorder into an
944 : // implementation of writerSyncerCloser that will record for the Write and
945 : // Sync methods.
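//
// For illustration, the intended call pattern around a single operation
// (doFileOperation is a placeholder for a create, write, or sync call):
//
//	r.writeStart()
//	err := doFileOperation()
//	r.writeEnd(err)
//
// while, concurrently, the failover monitor may poll
// r.ongoingLatencyOrError() to observe how long the operation has been
// outstanding and whether any permanent error has occurred.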
946 : type latencyAndErrorRecorder struct {
947 : ts timeSource
948 : ongoingOperationStart atomic.Int64
949 : error atomic.Pointer[error]
950 : writerSyncerCloser
951 : }
952 :
953 : type writerSyncerCloser interface {
954 : io.Writer
955 : io.Closer
956 : Sync() error
957 : }
958 :
959 1 : func (r *latencyAndErrorRecorder) writeStart() {
960 1 : r.ongoingOperationStart.Store(r.ts.now().UnixNano())
961 1 : }
962 :
963 1 : func (r *latencyAndErrorRecorder) writeEnd(err error) {
964 1 : if err != nil {
965 1 : ptr := &err
966 1 : r.error.Store(ptr)
967 1 : }
968 1 : r.ongoingOperationStart.Store(0)
969 : }
970 :
971 1 : func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) {
972 1 : r.writerSyncerCloser = w
973 1 : }
974 :
975 1 : func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) {
976 1 : startTime := r.ongoingOperationStart.Load()
977 1 : var latency time.Duration
978 1 : if startTime != 0 {
979 1 : l := r.ts.now().UnixNano() - startTime
980 1 : if l < 0 {
981 0 : l = 0
982 0 : }
983 1 : latency = time.Duration(l)
984 : }
985 1 : errPtr := r.error.Load()
986 1 : var err error
987 1 : if errPtr != nil {
988 1 : err = *errPtr
989 1 : }
990 1 : return latency, err
991 : }
992 :
993 : // Sync implements writerSyncerCloser.
994 1 : func (r *latencyAndErrorRecorder) Sync() error {
995 1 : r.writeStart()
996 1 : err := r.writerSyncerCloser.Sync()
997 1 : r.writeEnd(err)
998 1 : return err
999 1 : }
1000 :
1001 : // Write implements io.Writer.
1002 1 : func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) {
1003 1 : r.writeStart()
1004 1 : n, err = r.writerSyncerCloser.Write(p)
1005 1 : r.writeEnd(err)
1006 1 : return n, err
1007 1 : }
|