Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "io"
9 : "sync"
10 : "sync/atomic"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/record"
16 : "github.com/cockroachdb/pebble/vfs"
17 : "github.com/prometheus/client_golang/prometheus"
18 : )
19 :
20 : // recordQueueEntry is an entry in recordQueue.
21 : type recordQueueEntry struct {
22 : p []byte
23 : opts SyncOptions
24 : }
25 :
26 : const initialBufferLen = 8192
27 :
28 : // recordQueue is a variable-size single-producer multiple-consumer queue. It
29 : // is not lock-free, but most operations only need mu.RLock. It needs a mutex
30 : // to grow the size, since there is no upper bound on the number of queued
31 : // records (which are all the records that are not synced, and will need to be
32 : // written again in case of failover). Additionally, it needs a mutex to
33 : // atomically grab a snapshot of the queued records and provide them to a new
34 : // LogWriter that is being switched to.
35 : type recordQueue struct {
36 : // Only held for reading for all pop operations and most push operations.
37 : // Held for writing when buffer needs to be grown or when switching to a new
38 : // writer.
39 : mu sync.RWMutex
40 :
41 : // queue is [tail, head). tail is the oldest entry and head is the index for
42 : // the next entry.
43 : //
44 : // Consumers: atomically read and write tail in pop (using
45 : // compare-and-swap). This is not the usual kind of queue consumer since
46 : // they already know the index that they are popping exists, hence don't
47 : // need to look at head.
48 : //
49 : // Producer: atomically reads tail in push. Writes to head.
50 : //
51 : // Based on the above we only need tail to be atomic. However, the producer
52 : // also populates entries in buffer, whose values need to be seen by the
53 : // consumers when doing a pop, which means they need to synchronize using a
54 : // release and acquire memory barrier pair, where the push does the release
55 : // and the pop does the acquire. For this reason we make head also atomic
56 : // and merge head and tail into a single atomic, so that the store of head
57 : // in push and the load of tail in pop accomplishes this release-acquire
58 : // pair.
59 : //
60 : // All updates to headTail hold mu at least for reading. So when mu is held
61 : // for writing, there is a guarantee that headTail is not being updated.
62 : //
63 : // head is most-significant 32 bits and tail is least-significant 32 bits.
64 : headTail atomic.Uint64
65 :
66 : // Access to buffer requires at least RLock.
67 : buffer []recordQueueEntry
68 :
69 : lastTailObservedByProducer uint32
70 :
71 : // Read requires RLock.
72 : writer *record.LogWriter
73 :
74 : // When writer != nil, this is the return value of the last call to
75 : // SyncRecordGeneralized. It is updated in (a) push (called by WriteRecord),
76 : // holding only RLock (since WriteRecord is externally synchronized), and (b)
77 : // snapshotAndSwitchWriter, holding Lock. (b) excludes (a).
78 : lastLogSize int64
79 : }
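The release-acquire reasoning in the headTail comment above is easy to get wrong, so here is a minimal standalone sketch (hypothetical names, plain sync/atomic, not this package's types) of the publish/observe pairing: the producer fills a buffer slot and then advances head, and a consumer that observes the advanced head is guaranteed to also observe the slot's contents.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var headTail atomic.Uint64 // head in high 32 bits, tail in low 32 bits
	buffer := make([]string, 8)

	go func() {
		buffer[0] = "record-0" // (1) populate the slot
		headTail.Add(1 << 32)  // (2) publish: head 0 -> 1 (the "release")
	}()

	for {
		ht := headTail.Load() // the "acquire"
		if h := uint32(ht >> 32); h > 0 {
			// Having observed the new head, the slot write must be visible.
			fmt.Println(buffer[0]) // prints "record-0"
			return
		}
	}
}
```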
80 :
81 1 : func (q *recordQueue) init() {
82 1 : *q = recordQueue{
83 1 : buffer: make([]recordQueueEntry, initialBufferLen),
84 1 : }
85 1 : }
86 :
87 : // NB: externally synchronized, i.e., no concurrent push calls.
88 : func (q *recordQueue) push(
89 : p []byte,
90 : opts SyncOptions,
91 : latestLogSizeInWriteRecord int64,
92 : latestWriterInWriteRecord *record.LogWriter,
93 1 : ) (index uint32, writer *record.LogWriter, lastLogSize int64) {
94 1 : ht := q.headTail.Load()
95 1 : h, t := unpackHeadTail(ht)
96 1 : n := int(h - t)
97 1 : if len(q.buffer) == n {
98 1 : // Full
99 1 : m := 2 * n
100 1 : newBuffer := make([]recordQueueEntry, m)
101 1 : for i := int(t); i < int(h); i++ {
102 1 : newBuffer[i%m] = q.buffer[i%n]
103 1 : }
104 1 : q.mu.Lock()
105 1 : q.buffer = newBuffer
106 1 : q.mu.Unlock()
107 : }
108 1 : q.mu.RLock()
109 1 : q.buffer[h%uint32(len(q.buffer))] = recordQueueEntry{
110 1 : p: p,
111 1 : opts: opts,
112 1 : }
113 1 : // Reclaim memory for consumed entries. We couldn't do that in pop since
114 1 : // multiple consumers are popping using CAS and that immediately transfers
115 1 : // ownership to the producer.
116 1 : for i := q.lastTailObservedByProducer; i < t; i++ {
117 1 : q.buffer[i%uint32(len(q.buffer))] = recordQueueEntry{}
118 1 : }
119 1 : q.lastTailObservedByProducer = t
120 1 : q.headTail.Add(1 << headTailBits)
121 1 : writer = q.writer
122 1 : if writer == latestWriterInWriteRecord {
123 1 : // WriteRecord has written to this writer since the switch.
124 1 : q.lastLogSize = latestLogSizeInWriteRecord
125 1 : }
126 : // Else writer is a new writer that was switched to, so ignore the
127 : // latestLogSizeInWriteRecord.
128 :
129 1 : lastLogSize = q.lastLogSize
130 1 : q.mu.RUnlock()
131 1 : return h, writer, lastLogSize
132 : }
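To see why the doubling copy in push re-places logical index i at i%m, here is a minimal standalone sketch (hypothetical grow helper, ints in place of recordQueueEntry): after growth, every queued entry is still found at its logical index modulo the new length.

```go
package main

import "fmt"

// grow doubles a ring buffer holding logical entries [tail, head), re-placing
// each logical index i at i%m in the new buffer, as push does above.
func grow(old []int, tail, head int) []int {
	n := len(old)
	m := 2 * n
	newBuf := make([]int, m)
	for i := tail; i < head; i++ {
		newBuf[i%m] = old[i%n]
	}
	return newBuf
}

func main() {
	// A full buffer of 4 holding logical entries 6..9: entries 8 and 9 have
	// wrapped to positions 0 and 1, while 6 and 7 sit at positions 2 and 3.
	old := []int{8, 9, 6, 7}
	newBuf := grow(old, 6, 10)
	for i := 6; i < 10; i++ {
		fmt.Println(i, "->", newBuf[i%len(newBuf)]) // each entry still at i%len
	}
}
```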
133 :
134 1 : func (q *recordQueue) length() int {
135 1 : ht := q.headTail.Load()
136 1 : h, t := unpackHeadTail(ht)
137 1 : return int(h - t)
138 1 : }
139 :
140 : // Pops all entries. Must be called only after the last push returns.
141 1 : func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
142 1 : ht := q.headTail.Load()
143 1 : h, t := unpackHeadTail(ht)
144 1 : n := int(h - t)
145 1 : if n == 0 {
146 1 : return 0, 0
147 1 : }
148 1 : return n, q.pop(h-1, err, false)
149 : }
150 :
151 : // Pops all entries up to and including index. The remaining queue is
152 : // [index+1, head).
153 : //
154 : // NB: we could slightly simplify to only have the latest writer be able to
155 : // pop. This would avoid the CAS below, but it seems better to reduce the
156 : // amount of queued work regardless of who has successfully written it.
157 1 : func (q *recordQueue) pop(index uint32, err error, runCb bool) (numSyncsPopped int) {
158 1 : var buf [512]SyncOptions
159 1 : // Tail can increase, and numEntriesToPop decrease, due to competition with
160 1 : // other consumers. Head can increase due to the concurrent producer.
161 1 : headTailEntriesToPop := func() (ht uint64, h uint32, t uint32, numEntriesToPop int) {
162 1 : ht = q.headTail.Load()
163 1 : h, t = unpackHeadTail(ht)
164 1 : tail := int(t)
165 1 : numEntriesToPop = int(index) - tail + 1
166 1 : return ht, h, t, numEntriesToPop
167 1 : }
168 1 : ht, head, tail, numEntriesToPop := headTailEntriesToPop()
169 1 : if numEntriesToPop <= 0 {
170 1 : return 0
171 1 : }
172 1 : var b []SyncOptions
173 1 : if numEntriesToPop <= len(buf) {
174 1 : b = buf[:numEntriesToPop]
175 1 : } else {
176 1 : // Do allocation before acquiring the mutex.
177 1 : b = make([]SyncOptions, numEntriesToPop)
178 1 : }
179 1 : q.mu.RLock()
180 1 : n := len(q.buffer)
181 1 : for i := 0; i < numEntriesToPop; i++ {
182 1 : // Grab all the possible entries before doing CAS, since successful CAS
183 1 : // will also release those buffer slots to the producer.
184 1 : b[i] = q.buffer[(i+int(tail))%n].opts
185 1 : }
186 : // CAS, with retry loop, since this pop can race with other consumers.
187 1 : for {
188 1 : newHT := makeHeadTail(head, index+1)
189 1 : if q.headTail.CompareAndSwap(ht, newHT) {
190 1 : break
191 : }
192 0 : ht, head, _, numEntriesToPop = headTailEntriesToPop()
193 0 : if numEntriesToPop <= 0 {
194 0 : break
195 : }
196 : }
197 1 : q.mu.RUnlock()
198 1 :
199 1 : // The current value of numEntriesToPop is the number of entries that were
200 1 : // popped.
201 1 : if numEntriesToPop <= 0 {
202 0 : return 0
203 0 : }
204 1 : bufLen := len(b)
205 1 : // [0, bufLen-numEntriesToPop) were not popped, since this pop raced with
206 1 : // other consumers that popped those entries.
207 1 : for i := bufLen - numEntriesToPop; i < bufLen; i++ {
208 1 : if b[i].Done != nil {
209 1 : numSyncsPopped++
210 1 : if err != nil {
211 1 : *b[i].Err = err
212 1 : }
213 1 : b[i].Done.Done()
214 : }
215 : }
216 1 : return numSyncsPopped
217 : }
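The CAS retry protocol in pop can be exercised in isolation. Below is a standalone sketch that assumes nothing from this package: pack/unpack mirror the helpers later in this file, and popUpTo plays the role of pop's retry loop. Several consumers race to pop the same range, and each entry is popped exactly once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const headTailBits = 32

func unpack(ht uint64) (h, t uint32) { return uint32(ht >> headTailBits), uint32(ht) }
func pack(h, t uint32) uint64        { return uint64(h)<<headTailBits | uint64(t) }

// popUpTo advances tail to index+1 if tail is still behind, returning how
// many entries this call popped. On a failed CAS it re-reads headTail and
// recomputes what is left, like the retry loop in pop.
func popUpTo(headTail *atomic.Uint64, index uint32) int {
	for {
		ht := headTail.Load()
		h, t := unpack(ht)
		n := int(index) - int(t) + 1
		if n <= 0 {
			return 0 // another consumer already popped past index
		}
		if headTail.CompareAndSwap(ht, pack(h, index+1)) {
			return n
		}
	}
}

func main() {
	var ht atomic.Uint64
	ht.Store(pack(100, 0)) // 100 queued entries: head=100, tail=0
	var total atomic.Int64
	var wg sync.WaitGroup
	for c := 0; c < 4; c++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			total.Add(int64(popUpTo(&ht, 99))) // all consumers try to pop everything
		}()
	}
	wg.Wait()
	fmt.Println(total.Load()) // 100: each entry popped exactly once
}
```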
218 :
219 : func (q *recordQueue) snapshotAndSwitchWriter(
220 : writer *record.LogWriter,
221 : snapshotFunc func(firstIndex uint32, entries []recordQueueEntry) (logSize int64),
222 1 : ) {
223 1 : q.mu.Lock()
224 1 : defer q.mu.Unlock()
225 1 : q.writer = writer
226 1 : h, t := unpackHeadTail(q.headTail.Load())
227 1 : n := h - t
228 1 : if n > 0 {
229 1 : m := uint32(len(q.buffer))
230 1 : b := make([]recordQueueEntry, n)
231 1 : for i := t; i < h; i++ {
232 1 : b[i-t] = q.buffer[i%m]
233 1 : }
234 1 : q.lastLogSize = snapshotFunc(t, b)
235 : }
236 : }
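The copy loop above reads the logical window [tail, head) out of the ring in oldest-first order. A tiny standalone sketch (hypothetical snapshot helper, strings in place of recordQueueEntry):

```go
package main

import "fmt"

// snapshot copies the logical window [tail, head) out of a ring buffer in
// oldest-first order, mirroring the copy loop in snapshotAndSwitchWriter.
func snapshot(buffer []string, tail, head uint32) []string {
	m := uint32(len(buffer))
	b := make([]string, head-tail)
	for i := tail; i < head; i++ {
		b[i-tail] = buffer[i%m]
	}
	return b
}

func main() {
	// Ring of 4 holding logical entries 5..7 ("r5" lives at 5%4=1, etc.).
	buffer := []string{"", "r5", "r6", "r7"}
	fmt.Println(snapshot(buffer, 5, 8)) // [r5 r6 r7]
}
```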
237 :
238 : // getLastIndex is used by failoverWriter.Close.
239 1 : func (q *recordQueue) getLastIndex() (lastIndex int64) {
240 1 : h, _ := unpackHeadTail(q.headTail.Load())
241 1 : return int64(h) - 1
242 1 : }
243 :
244 : const headTailBits = 32
245 :
246 1 : func unpackHeadTail(ht uint64) (head, tail uint32) {
247 1 : const mask = 1<<headTailBits - 1
248 1 : head = uint32((ht >> headTailBits) & mask)
249 1 : tail = uint32(ht & mask)
250 1 : return head, tail
251 1 : }
252 :
253 1 : func makeHeadTail(head, tail uint32) uint64 {
254 1 : return (uint64(head) << headTailBits) | uint64(tail)
255 1 : }
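A small standalone round-trip of these helpers, including the uint32 wraparound case; head - tail stays correct across the wrap because uint32 subtraction is modular.

```go
package main

import "fmt"

const headTailBits = 32

func unpackHeadTail(ht uint64) (head, tail uint32) {
	const mask = 1<<headTailBits - 1
	return uint32((ht >> headTailBits) & mask), uint32(ht & mask)
}

func makeHeadTail(head, tail uint32) uint64 {
	return (uint64(head) << headTailBits) | uint64(tail)
}

func main() {
	h, t := unpackHeadTail(makeHeadTail(8, 5))
	fmt.Println(h, t, h-t) // 8 5 3: three entries queued

	// Wraparound: head has passed 2^32 and restarted at 2.
	h, t = unpackHeadTail(makeHeadTail(2, 0xfffffffe))
	fmt.Println(h - t) // 4 entries queued
}
```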
256 :
257 : // Maximum number of physical log files when writing a virtual WAL. Arbitrarily
258 : // chosen value. Setting this to 2 will not simplify the code. We make this a
259 : // constant since we want a fixed size array for writer.writers.
260 : const maxPhysicalLogs = 10
261 :
262 : // failoverWriter is the implementation of Writer in failover mode. No Writer
263 : // method blocks for IO, except for Close.
264 : //
265 : // Loosely speaking, Close blocks until all records are successfully written
266 : // and synced to some log writer. Monitoring of log writer latency and errors
267 : // continues after Close is called, which means failoverWriter can be switched
268 : // to a new log writer after Close is called, to unblock Close.
269 : //
270 : // More precisely, Close does not block if there is an error in creating or
271 : // closing the latest LogWriter when Close was called. This is because errors
272 : // are considered indicative of misconfiguration, and the user of
273 : // failoverWriter can dampen switching when observing errors (e.g. see
274 : // failoverMonitor), so Close does not assume any liveness of calls to
275 : // switchToNewDir when such errors occur. Since the caller (see db.go) treats
276 : // an error on Writer.Close as fatal, this does mean that failoverWriter has
277 : // limited ability to mask errors (its primary task is to mask high latency).
278 : type failoverWriter struct {
279 : opts failoverWriterOpts
280 : q recordQueue
281 : writers [maxPhysicalLogs]logWriterAndRecorder
282 : mu struct {
283 : sync.Mutex
284 : // cond is signaled when the latest LogWriter is set in writers (or there
285 : // is a creation error), or when the latest LogWriter is successfully
286 : // closed. It is waited on in Close. We don't use channels and select
287 : // since what Close is waiting on is dynamic based on the local state in
288 : // Close, so using Cond is simpler.
289 : cond *sync.Cond
290 : // nextWriterIndex is advanced before creating the *LogWriter. That is, a
291 : // slot is reserved by taking the current value of nextWriterIndex and
292 : // incrementing it, and then the *LogWriter for that slot is created. When
293 : // newFailoverWriter returns, nextWriterIndex = 1.
294 : //
295 : // The latest *LogWriter is (will be) at nextWriterIndex-1.
296 : //
297 : // INVARIANT: nextWriterIndex <= len(writers)
298 : nextWriterIndex logNameIndex
299 : closed bool
300 : // metrics is initialized in Close. Currently we just use the metrics from
301 : // the latest writer after it is closed, since in the common case with
302 : // only one writer, that writer's flush loop will have finished and the
303 : // metrics will be current. With multiple writers, these metrics can be
304 : // quite inaccurate. The WriteThroughput metric includes an IdleDuration,
305 : // which can be high for a writer that was switched away from, and
306 : // therefore not indicative of overall work being done by the
307 : // failoverWriter. The PendingBufferLen and SyncQueueLen are similarly
308 : // inaccurate once there is no more work being given to a writer. We could
309 : // add a method to LogWriter to stop sampling metrics when it is not the
310 : // latest writer. Then we could aggregate all these metrics across all
311 : // writers.
312 : //
313 : // Note that CockroachDB does not use these metrics in any meaningful way.
314 : //
315 : // TODO(sumeer): do the improved solution outlined above.
316 : metrics record.LogWriterMetrics
317 : }
318 : // State for computing logical offset. The cumulative offset state is in
319 : // offset. Each time we call SyncRecordGeneralized from WriteRecord, we
320 : // compute the delta between the size returned by this LogWriter now and
321 : // the size returned by this LogWriter in the previous call to
322 : // SyncRecordGeneralized. That previous call to SyncRecordGeneralized may
323 : // have happened from WriteRecord, or asynchronously during a switch. So
324 : // that previous call state requires synchronization and is maintained in
325 : // recordQueue. The offset is incremented by this delta without any
326 : // synchronization, since we rely on external synchronization (like the
327 : // standaloneWriter).
328 : logicalOffset struct {
329 : latestWriterInWriteRecord *record.LogWriter
330 : latestLogSizeInWriteRecord int64
331 : offset int64
332 : // Transitions once from false => true when there is a non-nil writer.
333 : notEstimatedOffset bool
334 : }
335 : psiForWriteRecordBacking record.PendingSyncIndex
336 : psiForSwitchBacking record.PendingSyncIndex
337 : }
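A worked numeric sketch of the logicalOffset scheme described in the comment above (made-up sizes, no real APIs): the offset advances by the per-writer size delta, and a switch resets the per-writer baseline without re-counting replayed bytes.

```go
package main

import "fmt"

func main() {
	offset := int64(0)      // cumulative logical offset
	lastLogSize := int64(0) // size after the previous SyncRecordGeneralized on the current writer

	// Writer A: two records grow its log to 100, then 230 bytes.
	for _, size := range []int64{100, 230} {
		offset += size - lastLogSize
		lastLogSize = size
	}

	// Switch to writer B: the queue is replayed and B's log ends at 240 bytes.
	// recordQueue records this as lastLogSize; offset is NOT advanced, since
	// those bytes were already counted when first written.
	lastLogSize = 240

	// The next WriteRecord on B grows its log to 300 bytes.
	offset += 300 - lastLogSize
	fmt.Println(offset) // 290 = 230 (from A) + 60 (new bytes on B)
}
```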
338 :
339 : type logWriterAndRecorder struct {
340 : // This may never become non-nil, since the LogWriter may no longer be the
341 : // latest writer by the time it is finally created. Additionally, if there was
342 : // an error in creating the writer, w will remain nil and createError will be set.
343 : w *record.LogWriter
344 : // createError is set if there is an error creating the writer. This is
345 : // useful in Close since we need to know when the work for creating the
346 : // latest writer is done, whether it resulted in success or not.
347 : createError error
348 : r latencyAndErrorRecorder
349 : }
350 :
351 : var _ Writer = &failoverWriter{}
352 :
353 : var _ switchableWriter = &failoverWriter{}
354 :
355 : type failoverWriterOpts struct {
356 : wn NumWAL
357 : logger base.Logger
358 : timeSource
359 :
360 : // Options that feed into SyncingFileOptions.
361 : noSyncOnClose bool
362 : bytesPerSync int
363 : preallocateSize func() int
364 :
365 : // Options for record.LogWriter.
366 : minSyncInterval func() time.Duration
367 : fsyncLatency prometheus.Histogram
368 : queueSemChan chan struct{}
369 : stopper *stopper
370 :
371 : writerClosed func()
372 :
373 : writerCreatedForTest chan<- struct{}
374 : }
375 :
376 : func newFailoverWriter(
377 : opts failoverWriterOpts, initialDir dirAndFileHandle,
378 1 : ) (*failoverWriter, error) {
379 1 : ww := &failoverWriter{
380 1 : opts: opts,
381 1 : }
382 1 : ww.q.init()
383 1 : ww.mu.cond = sync.NewCond(&ww.mu)
384 1 : // The initial record.LogWriter creation also happens via a
385 1 : // switchToNewDir, since we don't want it to block newFailoverWriter.
386 1 : err := ww.switchToNewDir(initialDir)
387 1 : if err != nil {
388 0 : // Switching limit cannot be exceeded when creating.
389 0 : panic(err)
390 : }
391 1 : return ww, nil
392 : }
393 :
394 : // WriteRecord implements Writer.
395 1 : func (ww *failoverWriter) WriteRecord(p []byte, opts SyncOptions) (logicalOffset int64, err error) {
396 1 : recordIndex, writer, lastLogSize := ww.q.push(
397 1 : p, opts, ww.logicalOffset.latestLogSizeInWriteRecord, ww.logicalOffset.latestWriterInWriteRecord)
398 1 : if writer == nil {
399 1 : // Don't have a record.LogWriter yet, so use an estimate. This estimate
400 1 : // will get overwritten.
401 1 : ww.logicalOffset.offset += int64(len(p))
402 1 : return ww.logicalOffset.offset, nil
403 1 : }
404 : // INVARIANT: writer != nil.
405 1 : notEstimatedOffset := ww.logicalOffset.notEstimatedOffset
406 1 : if !notEstimatedOffset {
407 1 : ww.logicalOffset.notEstimatedOffset = true
408 1 : }
409 1 : ww.psiForWriteRecordBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
410 1 : if opts.Done != nil {
411 1 : ww.psiForWriteRecordBacking.Index = int64(recordIndex)
412 1 : }
413 1 : ww.logicalOffset.latestLogSizeInWriteRecord, err = writer.SyncRecordGeneralized(p, &ww.psiForWriteRecordBacking)
414 1 : ww.logicalOffset.latestWriterInWriteRecord = writer
415 1 : if notEstimatedOffset {
416 1 : delta := ww.logicalOffset.latestLogSizeInWriteRecord - lastLogSize
417 1 : ww.logicalOffset.offset += delta
418 1 : } else {
419 1 : // Overwrite the estimate. This is a best-effort improvement in that it is
420 1 : // accurate for the common case where writer is the first LogWriter.
421 1 : // Consider a failover scenario where there was no LogWriter for the first
422 1 : // 10 records, so they are all accumulated as an estimate. Then the first
423 1 : // LogWriter successfully writes and syncs the first 5 records and gets
424 1 : // stuck. A switch happens to a second LogWriter that is handed the
425 1 : // remaining 5 records, and the 11th record arrives via a WriteRecord.
426 1 : // The transition from !notEstimatedOffset to notEstimatedOffset will
427 1 : // happen on this 11th record, and the logic here will use the length of
428 1 : // the second LogWriter, that does not reflect the full length.
429 1 : //
430 1 : // TODO(sumeer): try to make this more correct, without adding much more
431 1 : // complexity, and without adding synchronization.
432 1 : ww.logicalOffset.offset = ww.logicalOffset.latestLogSizeInWriteRecord
433 1 : }
434 1 : return ww.logicalOffset.offset, err
435 : }
436 :
437 : // switchToNewDir starts switching to dir. It implements switchableWriter. All
438 : // work is async, and a non-nil error is returned only if the switching limit
439 : // is exceeded.
440 1 : func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
441 1 : ww.mu.Lock()
442 1 : // Can have a late switchToNewDir call if the failoverMonitor has not yet
443 1 : // been told that the writer is closed. Ignore it.
444 1 : if ww.mu.closed {
445 1 : ww.mu.Unlock()
446 1 : if ww.opts.writerCreatedForTest != nil {
447 1 : ww.opts.writerCreatedForTest <- struct{}{}
448 1 : }
449 1 : return nil
450 : }
451 : // writerIndex is the slot for this writer.
452 1 : writerIndex := ww.mu.nextWriterIndex
453 1 : if int(writerIndex) == len(ww.writers) {
454 1 : ww.mu.Unlock()
455 1 : return errors.Errorf("exceeded switching limit")
456 1 : }
457 1 : ww.mu.nextWriterIndex++
458 1 : ww.mu.Unlock()
459 1 :
460 1 : // Creation is async.
461 1 : ww.opts.stopper.runAsync(func() {
462 1 : // TODO(sumeer): recycling of logs.
463 1 : filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(ww.opts.wn, writerIndex))
464 1 : recorderAndWriter := &ww.writers[writerIndex].r
465 1 : recorderAndWriter.ts = ww.opts.timeSource
466 1 : var file vfs.File
467 1 : // handleErrFunc is called when err != nil. It handles the multiple IO error
468 1 : // cases below.
469 1 : handleErrFunc := func(err error) {
470 1 : if file != nil {
471 1 : file.Close()
472 1 : }
473 1 : ww.mu.Lock()
474 1 : defer ww.mu.Unlock()
475 1 : ww.writers[writerIndex].createError = err
476 1 : ww.mu.cond.Signal()
477 1 : if ww.opts.writerCreatedForTest != nil {
478 1 : ww.opts.writerCreatedForTest <- struct{}{}
479 1 : }
480 : }
481 1 : var err error
482 1 : // Create file.
483 1 : recorderAndWriter.writeStart()
484 1 : file, err = dir.FS.Create(filename)
485 1 : recorderAndWriter.writeEnd(err)
486 1 : // TODO(sumeer): should we fatal if primary dir? At some point it is better
487 1 : // to fatal instead of continuing to failover.
488 1 : // base.MustExist(dir.FS, filename, ww.opts.logger, err)
489 1 : if err != nil {
490 1 : handleErrFunc(err)
491 1 : return
492 1 : }
493 : // Sync dir.
494 1 : recorderAndWriter.writeStart()
495 1 : err = dir.Sync()
496 1 : recorderAndWriter.writeEnd(err)
497 1 : if err != nil {
498 1 : handleErrFunc(err)
499 1 : return
500 1 : }
501 : // Wrap in a syncingFile.
502 1 : syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
503 1 : NoSyncOnClose: ww.opts.noSyncOnClose,
504 1 : BytesPerSync: ww.opts.bytesPerSync,
505 1 : PreallocateSize: ww.opts.preallocateSize(),
506 1 : })
507 1 : // Wrap in the latencyAndErrorRecorder.
508 1 : recorderAndWriter.setWriter(syncingFile)
509 1 :
510 1 : // Using NumWAL as the DiskFileNum is fine since it is used only as
511 1 : // EOF trailer for safe log recycling. Even though many log files can
512 1 : // map to a single NumWAL, a file used for NumWAL n at index m will
513 1 : // never get recycled for NumWAL n at a later index (since recycling
514 1 : // happens when n as a whole is obsolete).
515 1 : w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn),
516 1 : record.LogWriterConfig{
517 1 : WALMinSyncInterval: ww.opts.minSyncInterval,
518 1 : WALFsyncLatency: ww.opts.fsyncLatency,
519 1 : QueueSemChan: ww.opts.queueSemChan,
520 1 : ExternalSyncQueueCallback: ww.doneSyncCallback,
521 1 : })
522 1 : closeWriter := func() bool {
523 1 : ww.mu.Lock()
524 1 : defer ww.mu.Unlock()
525 1 : if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed {
526 1 : // Not the latest writer or the writer was closed while this async
527 1 : // creation was ongoing.
528 1 : if ww.opts.writerCreatedForTest != nil {
529 1 : ww.opts.writerCreatedForTest <- struct{}{}
530 1 : }
531 1 : return true
532 : }
533 : // Latest writer.
534 1 : ww.writers[writerIndex].w = w
535 1 : ww.mu.cond.Signal()
536 1 : // NB: snapshotAndSwitchWriter does not block on IO, since
537 1 : // SyncRecordGeneralized does no IO.
538 1 : ww.q.snapshotAndSwitchWriter(w,
539 1 : func(firstIndex uint32, entries []recordQueueEntry) (logSize int64) {
540 1 : for i := range entries {
541 1 : ww.psiForSwitchBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
542 1 : if entries[i].opts.Done != nil {
543 1 : ww.psiForSwitchBacking.Index = int64(firstIndex) + int64(i)
544 1 : }
545 1 : var err error
546 1 : logSize, err = w.SyncRecordGeneralized(entries[i].p, &ww.psiForSwitchBacking)
547 1 : if err != nil {
548 0 : // TODO(sumeer): log periodically. The err will also surface via
549 0 : // the latencyAndErrorRecorder, so if a switch is possible, it
550 0 : // will be done.
551 0 : ww.opts.logger.Errorf("%s", err)
552 0 : }
553 : }
554 1 : return logSize
555 : })
556 1 : if ww.opts.writerCreatedForTest != nil {
557 1 : ww.opts.writerCreatedForTest <- struct{}{}
558 1 : }
559 1 : return false
560 : }()
561 1 : if closeWriter {
562 1 : // Never wrote anything to this writer so don't care about the
563 1 : // returned error.
564 1 : ww.opts.stopper.runAsync(func() {
565 1 : _ = w.Close()
566 1 : })
567 : }
568 : })
569 1 : return nil
570 : }
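The control flow above is a reserve-then-create pattern: a slot index is claimed under the mutex (bounded by the fixed writers array), and the slow file creation happens asynchronously so callers never block on IO. A standalone sketch with a hypothetical switcher type, where a plain goroutine stands in for stopper.runAsync:

```go
package main

import (
	"fmt"
	"sync"
)

const maxSlots = 10 // analogous to maxPhysicalLogs

type switcher struct {
	mu   sync.Mutex
	next int // next slot to reserve
	done sync.WaitGroup
}

// switchToNew reserves a slot under the mutex and runs the (slow) creation
// asynchronously, returning an error only when the slot limit is exhausted.
func (s *switcher) switchToNew(create func(slot int)) error {
	s.mu.Lock()
	if s.next == maxSlots {
		s.mu.Unlock()
		return fmt.Errorf("exceeded switching limit")
	}
	slot := s.next
	s.next++
	s.mu.Unlock()

	s.done.Add(1)
	go func() { // creation is async; the caller never blocks on IO
		defer s.done.Done()
		create(slot)
	}()
	return nil
}

func main() {
	var s switcher
	_ = s.switchToNew(func(slot int) { fmt.Println("creating writer in slot", slot) })
	s.done.Wait()
}
```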
571 :
572 : // doneSyncCallback is the record.ExternalSyncQueueCallback called by
573 : // record.LogWriter.
574 : //
575 : // recordQueue is popped from only when some work requests a sync (and
576 : // successfully syncs). In the worst case, if no syncs are requested, we could
577 : // queue all the records needed to fill up a memtable in the recordQueue. This
578 : // can have two negative effects: (a) in the case of failover, we need to
579 : // replay all the data in the current mutable memtable, which takes more time,
580 : // (b) the memory usage is proportional to the size of the memtable. We ignore
581 : // these negatives since, (a) users like CockroachDB regularly sync, and (b)
582 : // the default memtable size is only 64MB.
583 1 : func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) {
584 1 : if err != nil {
585 1 : // Don't pop anything since we can retry after switching to a new
586 1 : // LogWriter.
587 1 : return
588 1 : }
589 : // NB: harmless after Close returns since numSyncsPopped will be 0.
590 1 : numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err, true)
591 1 : if ww.opts.queueSemChan != nil {
592 1 : for i := 0; i < numSyncsPopped; i++ {
593 1 : <-ww.opts.queueSemChan
594 1 : }
595 : }
596 : }
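The Done/Err contract that pop and doneSyncCallback rely on, as a standalone sketch: the producer registers a WaitGroup and an error slot per record, and whoever pops the record sets Err on failure and calls Done. The SyncOptions shape here mirrors this package's, and complete is a hypothetical stand-in for pop's per-entry step.

```go
package main

import (
	"fmt"
	"sync"
)

// SyncOptions mirrors the shape used by this package.
type SyncOptions struct {
	Done *sync.WaitGroup
	Err  *error
}

// complete is what pop does for each popped entry with a non-nil Done.
func complete(opts SyncOptions, err error) {
	if opts.Done != nil {
		if err != nil {
			*opts.Err = err
		}
		opts.Done.Done()
	}
}

func main() {
	var wg sync.WaitGroup
	var err error
	wg.Add(1)
	opts := SyncOptions{Done: &wg, Err: &err}

	go complete(opts, nil) // e.g. a successful sync popped the record
	wg.Wait()              // the producer's caller blocks here until synced
	fmt.Println("synced, err =", err) // err stays nil on success
}
```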
597 :
598 : // ongoingLatencyOrErrorForCurDir implements switchableWriter.
599 1 : func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
600 1 : r := ww.recorderForCurDir()
601 1 : if r == nil {
602 1 : return 0, nil
603 1 : }
604 1 : return r.ongoingLatencyOrError()
605 : }
606 :
607 : // For internal use and testing.
608 1 : func (ww *failoverWriter) recorderForCurDir() *latencyAndErrorRecorder {
609 1 : ww.mu.Lock()
610 1 : defer ww.mu.Unlock()
611 1 : if ww.mu.closed {
612 1 : return nil
613 1 : }
614 1 : return &ww.writers[ww.mu.nextWriterIndex-1].r
615 : }
616 :
617 : // Close implements Writer.
618 : //
619 : // NB: getOngoingLatencyOrErrorForLatestWriter and switchToNewDir can be
620 : // called after Close is called, and there is also a possibility that they get
621 : // called after Close returns and before failoverMonitor knows that the
622 : // failoverWriter is closed.
623 : //
624 : // doneSyncCallback can be called anytime after Close returns since there
625 : // could be stuck writes that finish arbitrarily later.
626 : //
627 : // See the long comment about Close behavior where failoverWriter is declared.
628 1 : func (ww *failoverWriter) Close() (logicalOffset int64, err error) {
629 1 : offset, err := ww.closeInternal()
630 1 : ww.opts.writerClosed()
631 1 : return offset, err
632 1 : }
633 :
634 1 : func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
635 1 : logicalOffset = ww.logicalOffset.offset
636 1 : // [0, closeCalledCount) have had LogWriter.Close called (though may not
637 1 : // have finished) or the LogWriter will never be non-nil. Either way, they
638 1 : // have been "processed".
639 1 : closeCalledCount := logNameIndex(0)
640 1 : // lastWriterState is the state for the last writer, for which we are
641 1 : // waiting for LogWriter.Close to finish or for creation to be unsuccessful.
642 1 : // What is considered the last writer can change. All state is protected by
643 1 : // ww.mu.
644 1 : type lastWriterState struct {
645 1 : index logNameIndex
646 1 : closed bool
647 1 : err error
648 1 : metrics record.LogWriterMetrics
649 1 : }
650 1 : var lastWriter lastWriterState
651 1 : lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
652 1 : ww.mu.Lock()
653 1 : defer ww.mu.Unlock()
654 1 : // Every iteration starts and ends with the mutex held.
655 1 : //
656 1 : // Invariant: ww.mu.nextWriterIndex >= 1.
657 1 : //
658 1 : // We will loop until we have closed the lastWriter (and use
659 1 : // lastWriter.err). We also need to call close on all LogWriters
660 1 : // that will not close themselves, i.e., those that have already been
661 1 : // created and installed in failoverWriter.writers (this set may change
662 1 : // while failoverWriter.Close runs).
663 1 : for !lastWriter.closed {
664 1 : numWriters := ww.mu.nextWriterIndex
665 1 : if numWriters > closeCalledCount {
666 1 : // INVARIANT: numWriters > closeCalledCount.
667 1 : lastWriter = lastWriterState{
668 1 : index: numWriters - 1,
669 1 : }
670 1 : // Try to process [closeCalledCount, numWriters). Will surely process
671 1 : // [closeCalledCount, numWriters-1), since those writers are either done
672 1 : // initializing, or will close themselves. The writer at numWriters-1 we
673 1 : // can only process if it is done initializing, else we will iterate
674 1 : // again.
675 1 : for i := closeCalledCount; i < numWriters; i++ {
676 1 : w := ww.writers[i].w
677 1 : cErr := ww.writers[i].createError
678 1 : // Is the current index the last writer? If yes, this is also the last
679 1 : // loop iteration.
680 1 : isLastWriter := i == lastWriter.index
681 1 : if w != nil {
682 1 : // Can close it, so extend closeCalledCount.
683 1 : closeCalledCount = i + 1
684 1 : if isLastWriter {
685 1 : // We may care about its error and when it finishes closing.
686 1 : index := i
687 1 : ww.opts.stopper.runAsync(func() {
688 1 : // Last writer(s) (since new writers can be created and become
689 1 : // last, as we iterate) are guaranteed to have seen the last
690 1 : // record (since it was queued before Close was called). It is
691 1 : // possible that a writer got created after the last record was
692 1 : // dequeued and before this fact was realized by Close. In that
693 1 : // case we will harmlessly tell it that it synced that last
694 1 : // record, though it has already been written and synced by
695 1 : // another writer.
696 1 : err := w.CloseWithLastQueuedRecord(lastRecordIndex)
697 1 : ww.mu.Lock()
698 1 : defer ww.mu.Unlock()
699 1 : if lastWriter.index == index {
700 1 : lastWriter.closed = true
701 1 : lastWriter.err = err
702 1 : lastWriter.metrics = w.Metrics()
703 1 : ww.mu.cond.Signal()
704 1 : }
705 : })
706 1 : } else {
707 1 : // Don't care about the returned error since all the records we
708 1 : // relied on this writer for were already successfully written.
709 1 : ww.opts.stopper.runAsync(func() {
710 1 : _ = w.CloseWithLastQueuedRecord(record.PendingSyncIndex{Index: record.NoSyncIndex})
711 1 : })
712 : }
713 1 : } else if cErr != nil {
714 1 : // Have processed it, so extend closeCalledCount.
715 1 : closeCalledCount = i + 1
716 1 : if isLastWriter {
717 1 : lastWriter.closed = true
718 1 : lastWriter.err = cErr
719 1 : lastWriter.metrics = record.LogWriterMetrics{}
720 1 : }
721 : // Else, ignore.
722 1 : } else {
723 1 : if !isLastWriter {
724 1 : // Not last writer, so will close itself.
725 1 : closeCalledCount = i + 1
726 1 : }
727 : // Else, last writer, so we may have to close it.
728 : }
729 : }
730 : }
731 1 : if !lastWriter.closed {
732 1 : // Either waiting for creation of last writer or waiting for the close
733 1 : // to finish, or something else to become the last writer.
734 1 : ww.mu.cond.Wait()
735 1 : }
736 : }
737 1 : err = lastWriter.err
738 1 : ww.mu.metrics = lastWriter.metrics
739 1 : ww.mu.closed = true
740 1 : _, _ = ww.q.popAll(err)
741 1 : return logicalOffset, err
742 : }
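The waiting in closeInternal is a standard sync.Cond pattern: re-check state with the mutex held, Wait while the current last writer is still open, and let the async close Signal. Reduced to a standalone sketch:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	lastClosed := false

	// The async close of the "last" writer signals the waiter, as the
	// runAsync closure in closeInternal does.
	go func() {
		time.Sleep(10 * time.Millisecond)
		mu.Lock()
		lastClosed = true
		cond.Signal()
		mu.Unlock()
	}()

	mu.Lock()
	for !lastClosed {
		cond.Wait() // releases mu while waiting, reacquires before returning
	}
	mu.Unlock()
	fmt.Println("last writer closed")
}
```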
743 :
744 : // Metrics implements Writer.
745 1 : func (ww *failoverWriter) Metrics() record.LogWriterMetrics {
746 1 : ww.mu.Lock()
747 1 : defer ww.mu.Unlock()
748 1 : return ww.mu.metrics
749 1 : }
750 :
751 : // latencyAndErrorRecorder records ongoing write and sync operations and errors
752 : // in those operations. record.LogWriter cannot continue functioning after any
753 : // error, so all errors are considered permanent.
754 : //
755 : // writeStart/writeEnd are used directly when creating a file. After the file
756 : // is successfully created, setWriter turns latencyAndErrorRecorder into an
757 : // implementation of writerSyncerCloser that will record for the Write and
758 : // Sync methods.
759 : type latencyAndErrorRecorder struct {
760 : ts timeSource
761 : ongoingOperationStart atomic.Int64
762 : error atomic.Pointer[error]
763 : writerSyncerCloser
764 : }
765 :
766 : type writerSyncerCloser interface {
767 : io.Writer
768 : io.Closer
769 : Sync() error
770 : }
771 :
772 1 : func (r *latencyAndErrorRecorder) writeStart() {
773 1 : r.ongoingOperationStart.Store(r.ts.now().UnixNano())
774 1 : }
775 :
776 1 : func (r *latencyAndErrorRecorder) writeEnd(err error) {
777 1 : if err != nil {
778 1 : ptr := &err
779 1 : r.error.Store(ptr)
780 1 : }
781 1 : r.ongoingOperationStart.Store(0)
782 : }
783 :
784 1 : func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) {
785 1 : r.writerSyncerCloser = w
786 1 : }
787 :
788 1 : func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) {
789 1 : startTime := r.ongoingOperationStart.Load()
790 1 : var latency time.Duration
791 1 : if startTime != 0 {
792 1 : l := r.ts.now().UnixNano() - startTime
793 1 : if l < 0 {
794 0 : l = 0
795 0 : }
796 1 : latency = time.Duration(l)
797 : }
798 1 : errPtr := r.error.Load()
799 1 : var err error
800 1 : if errPtr != nil {
801 1 : err = *errPtr
802 1 : }
803 1 : return latency, err
804 : }
805 :
806 : // Sync implements writerSyncerCloser.
807 1 : func (r *latencyAndErrorRecorder) Sync() error {
808 1 : r.writeStart()
809 1 : err := r.writerSyncerCloser.Sync()
810 1 : r.writeEnd(err)
811 1 : return err
812 1 : }
813 :
814 : // Write implements io.Writer.
815 1 : func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) {
816 1 : r.writeStart()
817 1 : n, err = r.writerSyncerCloser.Write(p)
818 1 : r.writeEnd(err)
819 1 : return n, err
820 1 : }
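A sketch of how a caller such as failoverMonitor (defined elsewhere) might consume ongoingLatencyOrError: poll the recorder and decide to switch when an in-flight operation exceeds a threshold or has errored. The recorder below is a trimmed stand-in for latencyAndErrorRecorder, and the threshold is hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// recorder is a trimmed stand-in for latencyAndErrorRecorder.
type recorder struct {
	start atomic.Int64          // UnixNano of op start; 0 when nothing in flight
	err   atomic.Pointer[error] // first permanent error, if any
}

func (r *recorder) ongoingLatencyOrError() (time.Duration, error) {
	var lat time.Duration
	if s := r.start.Load(); s != 0 {
		lat = time.Duration(time.Now().UnixNano() - s)
	}
	var err error
	if p := r.err.Load(); p != nil {
		err = *p
	}
	return lat, err
}

func main() {
	var r recorder
	// Simulate an operation that has been in flight for 200ms.
	r.start.Store(time.Now().Add(-200 * time.Millisecond).UnixNano())

	const threshold = 100 * time.Millisecond // hypothetical switch threshold
	if lat, err := r.ongoingLatencyOrError(); err != nil || lat > threshold {
		fmt.Println("would switch dirs: latency =", lat, "err =", err)
	}
}
```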