Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package wal
6 :
7 : import (
8 : "io"
9 : "sync"
10 : "sync/atomic"
11 : "time"
12 :
13 : "github.com/cockroachdb/crlib/crtime"
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/record"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/prometheus/client_golang/prometheus"
19 : )
20 :
21 : // recordQueueEntry is an entry in recordQueue.
22 : type recordQueueEntry struct {
23 : p []byte
24 : opts SyncOptions
25 : refCount RefCount
26 : writeStart crtime.Mono
27 : }
28 :
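// poppedEntry is the subset of a recordQueueEntry that pop copies out before
// it advances tail and thereby releases the entry's buffer slot back to the
// producer.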
29 : type poppedEntry struct {
30 : opts SyncOptions
31 : refCount RefCount
32 : writeStart crtime.Mono
33 : }
34 :
35 : const initialBufferLen = 8192
36 :
37 : // recordQueue is a variable-size single-producer multiple-consumer queue. It
38 : // is not lock-free, but most operations only need mu.RLock. It needs a mutex
39 : // to grow the size, since there is no upper bound on the number of queued
40 : // records (which are all the records that are not synced, and will need to be
41 : // written again in case of failover). Additionally, it needs a mutex to
42 : // atomically grab a snapshot of the queued records and provide them to a new
43 : // LogWriter that is being switched to.
44 : type recordQueue struct {
45 : // Only held for reading for all pop operations and most push operations.
46 : // Held for writing when buffer needs to be grown or when switching to a new
47 : // writer.
48 : mu sync.RWMutex
49 :
50 : // queue is [tail, head). tail is the oldest entry and head is the index for
51 : // the next entry.
52 : //
53 : // Consumers: atomically read and write tail in pop. This is not the usual
54 : // kind of queue consumer since they already know the index that they are
55 : // popping exists, hence don't need to look at head.
56 : //
57 : // Producer: atomically reads tail in push. Writes to head.
58 : //
59 : // Based on the above we only need tail to be atomic. However, the producer
60 : // also populates entries in buffer, whose values need to be seen by the
61 : // consumers when doing a pop, which means they need to synchronize using a
62 : // release and acquire memory barrier pair, where the push does the release
63 : // and the pop does the acquire. For this reason we make head also atomic
64 : // and merge head and tail into a single atomic, so that the store of head
65 : // in push and the load of tail in pop accomplishes this release-acquire
66 : // pair.
67 : //
68 : // We initially implemented competition between multiple consumers solely
69 : // via atomic read-write of the tail using compare-and-swap (CAS). Since the
70 : // atomic read-write to tail in pop releases those buffer entries for reuse
71 : // to the producer, the consumer needs to grab the contents of
72 : // recordQueueEntry that it needs to do callbacks etc. (specifically the
73 : // contents corresponding to poppedEntry), *before* it succeeds with the
74 : // atomic read-write. This introduced a false data race in the Golang data
75 : // race detector
76 : // https://github.com/cockroachdb/pebble/issues/3380#issuecomment-1981188174.
77 : // Consider the case where the queue is [10,20), and consumer C1 is trying
78 : // to pop [10,12) and consumer C2 is trying to pop [10,14). The following
79 : // interleaving can happen:
80 : //
81 : // [C1] reads tail=10, head=20
82 : // [C2] reads tail=10, head=20
83 : // [C1] reads buffer contents [10,12) and makes local copy
84 : // [C1] CAS to make the queue [12,20)
85 : // [C2] reads buffer contents [10,14) and makes local copy, concurrently
86 : // with producer writing to 10, 11. *
87 : // [C2] CAS fails for popping [10,14), so revises to [12,14), and succeeds
88 : // in CAS. C2 only uses the contents it read into the local copy for
89 : // [12,14).
90 : //
91 : // * is a false data race since C2 is later deciding what contents it should
92 : // use from among the contents it read, based on what indices it
93 : // successfully popped. Unfortunately, we don't have a way to annotate the
94 : // code to tell the data race detector to ignore this false positive. So we
95 : // need to strengthen the synchronization to prevent such false positives.
96 : // We observe that usually a consumer will be popping a batch of entries
97 : // (based on a single successful fsync), and the number of consumers will be
98 : // small (usually 1). In comparison, producers can be highly concurrent (due
99 : // to workload concurrency). We don't want consumers to compete for a mutex
100 : // with producers, but we can afford to have multiple consumers compete for
101 : // a mutex. So we fix this false data race by using consumerMu to force
102 : // single-threaded popping.
103 : //
104 : // An alternative would be to pass the information contained in poppedEntry
105 : // to the LogWriter, so that it can pass it back when popping (so we don't
106 : // have to retrieve it from the recordQueue.buffer). We would still need
107 : // recordQueue.buffer, since writer switching needs those entries to be
108 : // replayed. We don't consider this solution for the same reason we replaced
109 : // record.pendingSyncsWithSyncQueue with
110 : // record.pendingSyncsWithHighestSyncIndex for the failoverWriter code path
111 : // -- we cannot bound the queue in the LogWriter by record.SyncConcurrency:
112 : // say SyncConcurrency was 4096, and LogWriter1's queue was full, and we
113 : // switched to LogWriter2, to which we replayed the same records and filled
114 : // up the queue. Then if LogWriter1 unblocks and pops all the 4096 entries,
115 : // the commit pipeline can send another 4096 entries, while LogWriter2 is
116 : // still blocked on trying to write and sync the previous 4096 entries. This
117 : // will overflow the queue in LogWriter2.
118 : //
119 : // All updates to headTail hold mu at least for reading. So when mu is held
120 : // for writing, there is a guarantee that headTail is not being updated.
121 : //
122 : // head is most-significant 32 bits and tail is least-significant 32 bits.
123 : headTail atomic.Uint64
124 :
125 : consumerMu sync.Mutex
126 :
127 : // Access to buffer requires at least RLock.
128 : buffer []recordQueueEntry
129 :
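// lastTailObservedByProducer is the tail value up to which the producer has
// already zeroed out consumed entries in buffer; push uses it to reclaim only
// the newly consumed slots. It is accessed only by the single producer.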
130 : lastTailObservedByProducer uint32
131 :
132 : // Read requires RLock.
133 : writer *record.LogWriter
134 :
135 : // When writer != nil, this is the return value of the last call to
136 : // SyncRecordGeneralized. It is updated (a) when WriteRecord calls push, using
137 : // only RLock (since WriteRecord is externally synchronized), and (b) in
138 : // snapshotAndSwitchWriter, using Lock. (b) excludes (a).
139 : lastLogSize int64
140 :
141 : failoverWriteAndSyncLatency prometheus.Histogram
142 : }
143 :
144 2 : func (q *recordQueue) init(failoverWriteAndSyncLatency prometheus.Histogram) {
145 2 : *q = recordQueue{
146 2 : buffer: make([]recordQueueEntry, initialBufferLen),
147 2 : failoverWriteAndSyncLatency: failoverWriteAndSyncLatency,
148 2 : }
149 2 : }
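// Illustrative sketch: exampleRecordQueueUsage is a hypothetical helper (not
// used by the production code) showing the basic single-producer contract of
// recordQueue. push is externally synchronized (one producer at a time), and
// pop releases every entry up to and including the given index once it has
// been durably written. The zero-value SyncOptions (no sync waiter), nil
// RefCount, and nil histogram used here are all tolerated by the queue.
func exampleRecordQueueUsage() {
	var q recordQueue
	q.init(nil /* failoverWriteAndSyncLatency */)
	index, _, _ := q.push([]byte("payload"), SyncOptions{}, nil, 0, 0, nil)
	// ... once the record at index has been written and synced ...
	_ = q.pop(index, nil)
}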
150 :
151 : // NB: externally synchronized, i.e., no concurrent push calls.
152 : func (q *recordQueue) push(
153 : p []byte,
154 : opts SyncOptions,
155 : refCount RefCount,
156 : writeStart crtime.Mono,
157 : latestLogSizeInWriteRecord int64,
158 : latestWriterInWriteRecord *record.LogWriter,
159 2 : ) (index uint32, writer *record.LogWriter, lastLogSize int64) {
160 2 : ht := q.headTail.Load()
161 2 : h, t := unpackHeadTail(ht)
162 2 : n := int(h - t)
163 2 : m := len(q.buffer)
164 2 : if m == n {
165 1 : // Full
166 1 : m = 2 * n
167 1 : newBuffer := make([]recordQueueEntry, m)
168 1 : for i := int(t); i < int(h); i++ {
169 1 : newBuffer[i%m] = q.buffer[i%n]
170 1 : }
171 1 : q.mu.Lock()
172 1 : q.buffer = newBuffer
173 1 : q.mu.Unlock()
174 : }
175 2 : q.mu.RLock()
176 2 : q.buffer[int(h)%m] = recordQueueEntry{
177 2 : p: p,
178 2 : opts: opts,
179 2 : refCount: refCount,
180 2 : writeStart: writeStart,
181 2 : }
182 2 : // Reclaim memory for consumed entries. We can't easily do this in pop, since
183 2 : // the atomic advance of tail immediately transfers ownership of those slots
184 2 : // back to the producer.
185 2 : for i := q.lastTailObservedByProducer; i < t; i++ {
186 2 : q.buffer[int(i)%m] = recordQueueEntry{}
187 2 : }
188 2 : q.lastTailObservedByProducer = t
189 2 : q.headTail.Add(1 << headTailBits)
190 2 : writer = q.writer
191 2 : if writer == latestWriterInWriteRecord {
192 2 : // WriteRecord has written to this writer since the switch.
193 2 : q.lastLogSize = latestLogSizeInWriteRecord
194 2 : }
195 : // Else writer is a new writer that was switched to, so ignore the
196 : // latestLogSizeInWriteRecord.
197 :
198 2 : lastLogSize = q.lastLogSize
199 2 : q.mu.RUnlock()
200 2 : return h, writer, lastLogSize
201 : }
202 :
203 1 : func (q *recordQueue) length() int {
204 1 : ht := q.headTail.Load()
205 1 : h, t := unpackHeadTail(ht)
206 1 : return int(h - t)
207 1 : }
208 :
209 : // Pops all entries. Must be called only after the last push returns.
210 2 : func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) {
211 2 : ht := q.headTail.Load()
212 2 : h, t := unpackHeadTail(ht)
213 2 : n := int(h - t)
214 2 : if n == 0 {
215 2 : return 0, 0
216 2 : }
217 1 : return n, q.pop(h-1, err)
218 : }
219 :
220 : // Pops all entries up to and including index. The remaining queue is
221 : // [index+1, head).
222 : //
223 : // NB: we could slightly simplify to only have the latest writer be able to
224 : // pop. This would avoid the consumer synchronization below, but it seems better
225 : // to reduce the amount of queued work regardless of who has successfully written it.
226 2 : func (q *recordQueue) pop(index uint32, err error) (numSyncsPopped int) {
227 2 : now := crtime.NowMono()
228 2 : var buf [512]poppedEntry
229 2 : tailEntriesToPop := func() (t uint32, numEntriesToPop int) {
230 2 : ht := q.headTail.Load()
231 2 : _, t = unpackHeadTail(ht)
232 2 : tail := int(t)
233 2 : numEntriesToPop = int(index) - tail + 1
234 2 : return t, numEntriesToPop
235 2 : }
236 2 : q.consumerMu.Lock()
237 2 : // numEntriesToPop is a function of index and tail. The value of tail cannot
238 2 : // change since consumerMu is held.
239 2 : tail, numEntriesToPop := tailEntriesToPop()
240 2 : if numEntriesToPop <= 0 {
241 2 : q.consumerMu.Unlock()
242 2 : return 0
243 2 : }
244 2 : var b []poppedEntry
245 2 : if numEntriesToPop <= len(buf) {
246 2 : b = buf[:numEntriesToPop]
247 2 : } else {
248 1 : // Do allocation before acquiring the mutex.
249 1 : b = make([]poppedEntry, numEntriesToPop)
250 1 : }
251 2 : q.mu.RLock()
252 2 : n := len(q.buffer)
253 2 : for i := 0; i < numEntriesToPop; i++ {
254 2 : // Grab the popped entries before incrementing tail, since that will
255 2 : // release those buffer slots to the producer.
256 2 : idx := (i + int(tail)) % n
257 2 : b[i] = poppedEntry{
258 2 : opts: q.buffer[idx].opts,
259 2 : refCount: q.buffer[idx].refCount,
260 2 : writeStart: q.buffer[idx].writeStart,
261 2 : }
262 2 : }
263 : // Since tail cannot change, we don't need to do a compare-and-swap.
264 2 : q.headTail.Add(uint64(numEntriesToPop))
265 2 : q.mu.RUnlock()
266 2 : q.consumerMu.Unlock()
267 2 : addLatencySample := false
268 2 : var maxLatency time.Duration
269 2 : for i := 0; i < numEntriesToPop; i++ {
270 2 : // Now that we've synced the entry, we can unref it to signal that we
271 2 : // will not read the written byte slice again.
272 2 : if b[i].refCount != nil {
273 2 : b[i].refCount.Unref()
274 2 : }
275 2 : if b[i].opts.Done != nil {
276 2 : numSyncsPopped++
277 2 : if err != nil {
278 1 : *b[i].opts.Err = err
279 1 : }
280 2 : b[i].opts.Done.Done()
281 2 : latency := now.Sub(b[i].writeStart)
282 2 : if !addLatencySample {
283 2 : addLatencySample = true
284 2 : maxLatency = latency
285 2 : } else if maxLatency < latency {
286 0 : maxLatency = latency
287 0 : }
288 : }
289 : }
290 2 : if addLatencySample {
291 2 : if maxLatency < 0 {
292 0 : maxLatency = 0
293 0 : }
294 2 : q.failoverWriteAndSyncLatency.Observe(float64(maxLatency))
295 : }
296 2 : return numSyncsPopped
297 : }
298 :
299 : func (q *recordQueue) snapshotAndSwitchWriter(
300 : writer *record.LogWriter,
301 : snapshotFunc func(firstIndex uint32, entries []recordQueueEntry) (logSize int64),
302 2 : ) {
303 2 : q.mu.Lock()
304 2 : defer q.mu.Unlock()
305 2 : q.writer = writer
306 2 : h, t := unpackHeadTail(q.headTail.Load())
307 2 : n := h - t
308 2 : if n > 0 {
309 2 : m := uint32(len(q.buffer))
310 2 : b := make([]recordQueueEntry, n)
311 2 : for i := t; i < h; i++ {
312 2 : b[i-t] = q.buffer[i%m]
313 2 : }
314 2 : q.lastLogSize = snapshotFunc(t, b)
315 : }
316 : }
317 :
318 : // getLastIndex is used by failoverWriter.Close.
319 2 : func (q *recordQueue) getLastIndex() (lastIndex int64) {
320 2 : h, _ := unpackHeadTail(q.headTail.Load())
321 2 : return int64(h) - 1
322 2 : }
323 :
324 : const headTailBits = 32
325 :
326 2 : func unpackHeadTail(ht uint64) (head, tail uint32) {
327 2 : const mask = 1<<headTailBits - 1
328 2 : head = uint32((ht >> headTailBits) & mask)
329 2 : tail = uint32(ht & mask)
330 2 : return head, tail
331 2 : }
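// Illustrative sketch: examplePackHeadTail is a hypothetical inverse of
// unpackHeadTail (not used by the production code), shown to make the
// encoding concrete. push advances head by adding 1<<headTailBits to the
// packed word, and pop advances tail by adding the number of popped entries
// to the low 32 bits; keeping both halves in a single atomic word is what
// provides the release-acquire pairing described on recordQueue.headTail.
func examplePackHeadTail(head, tail uint32) uint64 {
	return uint64(head)<<headTailBits | uint64(tail)
}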
332 :
333 : // Maximum number of physical log files when writing a virtual WAL. Arbitrarily
334 : // chosen value. Setting this to 2 will not simplify the code. We make this a
335 : // constant since we want a fixed size array for failoverWriter.mu.writers.
336 : const maxPhysicalLogs = 10
337 :
338 : // failoverWriter is the implementation of Writer in failover mode. No Writer
339 : // method blocks for IO, except for Close.
340 : //
341 : // Loosely speaking, Close blocks until all records are successfully written
342 : // and synced to some log writer. Monitoring of log writer latency and errors
343 : // continues after Close is called, which means failoverWriter can be switched
344 : // to a new log writer after Close is called, to unblock Close.
345 : //
346 : // More precisely, Close does not block if there is an error in creating or
347 : // closing the latest LogWriter when Close was called. This is because errors
348 : // are considered indicative of misconfiguration, and the user of
349 : // failoverWriter can dampen switching when observing errors (e.g. see
350 : // failoverMonitor), so Close does not assume any liveness of calls to
351 : // switchToNewDir when such errors occur. Since the caller (see db.go) treats
352 : // an error on Writer.Close as fatal, this does mean that failoverWriter has
353 : // limited ability to mask errors (its primary task is to mask high latency).
354 : type failoverWriter struct {
355 : opts failoverWriterOpts
356 : q recordQueue
357 : mu struct {
358 : sync.Mutex
359 : // writers is protected by mu, except for updates to the
360 : // latencyAndErrorRecorder field. WriteRecord does not acquire mu, so the
361 : // protection by mu is for handling concurrent calls to switchToNewDir,
362 : // Close, and getLog.
363 : writers [maxPhysicalLogs]logWriterAndRecorder
364 :
365 : // cond is signaled when the latest LogWriter is set in writers (or there
366 : // is a creation error), or when the latest LogWriter is successfully
367 : // closed. It is waited on in Close. We don't use channels and select
368 : // since what Close is waiting on is dynamic based on the local state in
369 : // Close, so using Cond is simpler.
370 : cond *sync.Cond
371 : // nextWriterIndex is advanced before creating the *LogWriter. That is, a
372 : // slot is reserved by taking the current value of nextWriterIndex and
373 : // incrementing it, and then the *LogWriter for that slot is created. When
374 : // newFailoverWriter returns, nextWriterIndex = 1.
375 : //
376 : // The latest *LogWriter is (will be) at nextWriterIndex-1.
377 : //
378 : // INVARIANT: nextWriterIndex <= len(writers)
379 : nextWriterIndex LogNameIndex
380 : closed bool
381 : // metrics is initialized in Close. Currently we just use the metrics from
382 : // the latest writer after it is closed, since in the common case with
383 : // only one writer, that writer's flush loop will have finished and the
384 : // metrics will be current. With multiple writers, these metrics can be
385 : // quite inaccurate. The WriteThroughput metric includes an IdleDuration,
386 : // which can be high for a writer that was switched away from, and
387 : // therefore not indicative of overall work being done by the
388 : // failoverWriter. The PendingBufferLen and SyncQueueLen are similarly
389 : // inaccurate once there is no more work being given to a writer. We could
390 : // add a method to LogWriter to stop sampling metrics when it is not the
391 : // latest writer. Then we could aggregate all these metrics across all
392 : // writers.
393 : //
394 : // Note that CockroachDB does not use these metrics in any meaningful way.
395 : //
396 : // TODO(sumeer): do the improved solution outlined above.
397 : metrics record.LogWriterMetrics
398 : }
399 : // State for computing logical offset. The cumulative offset state is in
400 : // offset. Each time we call SyncRecordGeneralized from WriteRecord, we
401 : // compute the delta from the size returned by this LogWriter now, and the
402 : // size returned by this LogWriter in the previous call to
403 : // SyncRecordGeneralized. That previous call to SyncRecordGeneralized may
404 : // have happened from WriteRecord, or asynchronously during a switch. So
405 : // that previous call state requires synchronization and is maintained in
406 : // recordQueue. The offset is incremented by this delta without any
407 : // synchronization, since we rely on external synchronization (like the
408 : // standaloneWriter).
409 : logicalOffset struct {
410 : latestWriterInWriteRecord *record.LogWriter
411 : latestLogSizeInWriteRecord int64
412 : offset int64
413 : // Transitions once from false => true when there is a non-nil writer.
414 : notEstimatedOffset bool
415 : }
416 : psiForWriteRecordBacking record.PendingSyncIndex
417 : psiForSwitchBacking record.PendingSyncIndex
418 : }
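// Illustrative sketch: exampleLogicalOffsetAdvance is a hypothetical helper
// (not used by the production code) that reduces the logicalOffset
// bookkeeping above to its arithmetic. Before any record.LogWriter exists,
// the offset grows by the record length (an estimate); once a writer exists,
// it grows by the delta between the size the writer reports now and the size
// it reported for the previous SyncRecordGeneralized call, which may have
// been issued by WriteRecord or by a switch replay.
func exampleLogicalOffsetAdvance(
	offset int64, haveWriter bool, recordLen int, prevLogSize, newLogSize int64,
) int64 {
	if !haveWriter {
		return offset + int64(recordLen)
	}
	return offset + (newLogSize - prevLogSize)
}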
419 :
420 : type logWriterAndRecorder struct {
421 : // This may never become non-nil: if, by the time the LogWriter was finally created,
422 : // it was no longer the latest writer. Additionally, if there was an error
423 : // in creating the writer, w will remain nil and createError will be set.
424 : w *record.LogWriter
425 : // createError is set if there is an error creating the writer. This is
426 : // useful in Close since we need to know when the work for creating the
427 : // latest writer is done, whether it resulted in success or not.
428 : createError error
429 : r latencyAndErrorRecorder
430 :
431 : // dir, approxFileSize, synchronouslyClosed are kept for initializing
432 : // segmentWithSizeEtc. The approxFileSize is initially set to whatever is
433 : // returned by logCreator. When failoverWriter.Close is called,
434 : // approxFileSize and synchronouslyClosed may be updated.
435 : dir Dir
436 : approxFileSize uint64
437 : synchronouslyClosed bool
438 : }
439 :
440 : var _ Writer = &failoverWriter{}
441 :
442 : var _ switchableWriter = &failoverWriter{}
443 :
444 : type failoverWriterOpts struct {
445 : wn NumWAL
446 : logger base.Logger
447 : timeSource
448 : jobID int
449 : logCreator
450 :
451 : // Options that feed into SyncingFileOptions.
452 : noSyncOnClose bool
453 : bytesPerSync int
454 : preallocateSize func() int
455 :
456 : // Options for record.LogWriter.
457 : minSyncInterval func() time.Duration
458 : fsyncLatency prometheus.Histogram
459 : queueSemChan chan struct{}
460 : stopper *stopper
461 :
462 : failoverWriteAndSyncLatency prometheus.Histogram
463 : // writerClosed is a callback invoked by the FailoverWriter when it's
464 : // closed. It notifies the FailoverManager that the writer is now closed and
465 : // propagates information about the various physical segment files that have
466 : // been created.
467 : //
468 : // Note that the asynchronous creation of physical segment files means that
469 : // the writerClosed invocation is not guaranteed to include all physical
470 : // segment files that will ultimately be created for this logical WAL. If a
471 : // new segment file is created after writerClosed is invoked, it will be
472 : // propagated to the FailoverManager via the segmentClosed callback.
473 : writerClosed func(logicalLogWithSizesEtc)
474 : // segmentClosed is a callback invoked by the FailoverWriter when a segment
475 : // file creation completes but the writerClosed callback has already been
476 : // invoked. It's used to ensure that we reclaim all physical segment files,
477 : // including ones that did not complete creation before the Writer was
478 : // closed.
479 : segmentClosed func(logicalLogWithSizesEtc)
480 :
481 : writerCreatedForTest chan<- struct{}
482 :
483 : // writeWALSyncOffsets determines whether to write WAL sync chunk offsets.
484 : // The format major version can change (ratchet) at runtime, so this must be
485 : // a function rather than a static bool to ensure we use the latest format version.
486 : writeWALSyncOffsets func() bool
487 : }
488 :
489 : func simpleLogCreator(
490 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
491 1 : ) (f vfs.File, initialFileSize uint64, err error) {
492 1 : filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(wn, li))
493 1 : // Create file.
494 1 : r.writeStart()
495 1 : f, err = dir.FS.Create(filename, "pebble-wal")
496 1 : r.writeEnd(err)
497 1 : return f, 0, err
498 1 : }
499 :
500 : type logCreator func(
501 : dir Dir, wn NumWAL, li LogNameIndex, r *latencyAndErrorRecorder, jobID int,
502 : ) (f vfs.File, initialFileSize uint64, err error)
503 :
504 : func newFailoverWriter(
505 : opts failoverWriterOpts, initialDir dirAndFileHandle,
506 2 : ) (*failoverWriter, error) {
507 2 : ww := &failoverWriter{
508 2 : opts: opts,
509 2 : }
510 2 : ww.q.init(opts.failoverWriteAndSyncLatency)
511 2 : ww.mu.cond = sync.NewCond(&ww.mu)
512 2 : // The initial record.LogWriter creation also happens via a
513 2 : // switchToNewDir call, since we don't want it to block newFailoverWriter.
514 2 : err := ww.switchToNewDir(initialDir)
515 2 : if err != nil {
516 0 : // Switching limit cannot be exceeded when creating.
517 0 : panic(err)
518 : }
519 2 : return ww, nil
520 : }
521 :
522 : // WriteRecord implements Writer.
523 : func (ww *failoverWriter) WriteRecord(
524 : p []byte, opts SyncOptions, ref RefCount,
525 2 : ) (logicalOffset int64, err error) {
526 2 : if ref != nil {
527 2 : ref.Ref()
528 2 : }
529 2 : var writeStart crtime.Mono
530 2 : if opts.Done != nil {
531 2 : writeStart = crtime.NowMono()
532 2 : }
533 2 : recordIndex, writer, lastLogSize := ww.q.push(
534 2 : p,
535 2 : opts,
536 2 : ref,
537 2 : writeStart,
538 2 : ww.logicalOffset.latestLogSizeInWriteRecord,
539 2 : ww.logicalOffset.latestWriterInWriteRecord,
540 2 : )
541 2 : if writer == nil {
542 2 : // Don't have a record.LogWriter yet, so use an estimate. This estimate
543 2 : // will get overwritten.
544 2 : ww.logicalOffset.offset += int64(len(p))
545 2 : return ww.logicalOffset.offset, nil
546 2 : }
547 : // INVARIANT: writer != nil.
548 2 : notEstimatedOffset := ww.logicalOffset.notEstimatedOffset
549 2 : if !notEstimatedOffset {
550 2 : ww.logicalOffset.notEstimatedOffset = true
551 2 : }
552 2 : ww.psiForWriteRecordBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
553 2 : if opts.Done != nil {
554 2 : ww.psiForWriteRecordBacking.Index = int64(recordIndex)
555 2 : }
556 2 : ww.logicalOffset.latestLogSizeInWriteRecord, err = writer.SyncRecordGeneralized(p, &ww.psiForWriteRecordBacking)
557 2 : ww.logicalOffset.latestWriterInWriteRecord = writer
558 2 : if notEstimatedOffset {
559 2 : delta := ww.logicalOffset.latestLogSizeInWriteRecord - lastLogSize
560 2 : ww.logicalOffset.offset += delta
561 2 : } else {
562 2 : // Overwrite the estimate. This is a best-effort improvement in that it is
563 2 : // accurate for the common case where writer is the first LogWriter.
564 2 : // Consider a failover scenario where there was no LogWriter for the first
565 2 : // 10 records, so they are all accumulated as an estimate. Then the first
566 2 : // LogWriter successfully writes and syncs the first 5 records and gets
567 2 : // stuck. A switch happens to a second LogWriter that is handed the
568 2 : // remaining 5 records, and the 11th record arrives via a WriteRecord.
569 2 : // The transition from !notEstimatedOffset to notEstimatedOffset will
570 2 : // happen on this 11th record, and the logic here will use the length of
571 2 : // the second LogWriter, that does not reflect the full length.
572 2 : //
573 2 : // TODO(sumeer): try to make this more correct, without adding much more
574 2 : // complexity, and without adding synchronization.
575 2 : ww.logicalOffset.offset = ww.logicalOffset.latestLogSizeInWriteRecord
576 2 : }
577 2 : return ww.logicalOffset.offset, err
578 : }
579 :
580 : // switchToNewDir starts switching to dir. It implements switchableWriter. All
581 : // work is async, and a non-nil error is returned only if the switching limit
582 : // is exceeded.
583 2 : func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
584 2 : ww.mu.Lock()
585 2 : // Can have a late switchToNewDir call is the failoverMonitor has not yet
586 2 : // been told that the writer is closed. Ignore.
587 2 : if ww.mu.closed {
588 1 : ww.mu.Unlock()
589 1 : if ww.opts.writerCreatedForTest != nil {
590 1 : ww.opts.writerCreatedForTest <- struct{}{}
591 1 : }
592 1 : return nil
593 : }
594 : // writerIndex is the slot for this writer.
595 2 : writerIndex := ww.mu.nextWriterIndex
596 2 : if int(writerIndex) == len(ww.mu.writers) {
597 1 : ww.mu.Unlock()
598 1 : return errors.Errorf("exceeded switching limit")
599 1 : }
600 2 : ww.mu.writers[writerIndex].dir = dir.Dir
601 2 : ww.mu.nextWriterIndex++
602 2 : ww.mu.Unlock()
603 2 :
604 2 : // Creation is async.
605 2 : ww.opts.stopper.runAsync(func() {
606 2 : recorderAndWriter := &ww.mu.writers[writerIndex].r
607 2 : recorderAndWriter.ts = ww.opts.timeSource
608 2 : file, initialFileSize, err := ww.opts.logCreator(
609 2 : dir.Dir, ww.opts.wn, writerIndex, recorderAndWriter, ww.opts.jobID)
610 2 : ww.mu.writers[writerIndex].approxFileSize = initialFileSize
611 2 : // handleErrFunc is called when err != nil. It handles the multiple IO error
612 2 : // cases below.
613 2 : handleErrFunc := func(err error) {
614 1 : if file != nil {
615 1 : file.Close()
616 1 : }
617 1 : ww.mu.Lock()
618 1 : defer ww.mu.Unlock()
619 1 : ww.mu.writers[writerIndex].createError = err
620 1 : ww.mu.cond.Signal()
621 1 : if ww.opts.writerCreatedForTest != nil {
622 1 : ww.opts.writerCreatedForTest <- struct{}{}
623 1 : }
624 : }
625 2 : if err != nil {
626 1 : handleErrFunc(err)
627 1 : return
628 1 : }
629 : // Sync dir.
630 2 : recorderAndWriter.writeStart()
631 2 : err = dir.Sync()
632 2 : recorderAndWriter.writeEnd(err)
633 2 : if err != nil {
634 1 : handleErrFunc(err)
635 1 : return
636 1 : }
637 : // Wrap in a syncingFile.
638 2 : syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
639 2 : NoSyncOnClose: ww.opts.noSyncOnClose,
640 2 : BytesPerSync: ww.opts.bytesPerSync,
641 2 : PreallocateSize: ww.opts.preallocateSize(),
642 2 : })
643 2 : // Wrap in the latencyAndErrorRecorder.
644 2 : recorderAndWriter.setWriter(syncingFile)
645 2 :
646 2 : // Using NumWAL as the DiskFileNum is fine since it is used only as
647 2 : // EOF trailer for safe log recycling. Even though many log files can
648 2 : // map to a single NumWAL, a file used for NumWAL n at index m will
649 2 : // never get recycled for NumWAL n at a later index (since recycling
650 2 : // happens when n as a whole is obsolete).
651 2 : w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn),
652 2 : record.LogWriterConfig{
653 2 : WALMinSyncInterval: ww.opts.minSyncInterval,
654 2 : WALFsyncLatency: ww.opts.fsyncLatency,
655 2 : QueueSemChan: ww.opts.queueSemChan,
656 2 : ExternalSyncQueueCallback: ww.doneSyncCallback,
657 2 : WriteWALSyncOffsets: ww.opts.writeWALSyncOffsets,
658 2 : })
659 2 : closeWriter := func() bool {
660 2 : ww.mu.Lock()
661 2 : defer ww.mu.Unlock()
662 2 : if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed {
663 2 : // Not the latest writer or the writer was closed while this async
664 2 : // creation was ongoing.
665 2 : if ww.opts.writerCreatedForTest != nil {
666 1 : ww.opts.writerCreatedForTest <- struct{}{}
667 1 : }
668 2 : return true
669 : }
670 : // Latest writer.
671 2 : ww.mu.writers[writerIndex].w = w
672 2 : ww.mu.cond.Signal()
673 2 : // NB: snapshotAndSwitchWriter does not block on IO, since
674 2 : // SyncRecordGeneralized does no IO.
675 2 : ww.q.snapshotAndSwitchWriter(w,
676 2 : func(firstIndex uint32, entries []recordQueueEntry) (logSize int64) {
677 2 : for i := range entries {
678 2 : ww.psiForSwitchBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
679 2 : if entries[i].opts.Done != nil {
680 2 : ww.psiForSwitchBacking.Index = int64(firstIndex) + int64(i)
681 2 : }
682 2 : var err error
683 2 : logSize, err = w.SyncRecordGeneralized(entries[i].p, &ww.psiForSwitchBacking)
684 2 : if err != nil {
685 0 : // TODO(sumeer): log periodically. The err will also surface via
686 0 : // the latencyAndErrorRecorder, so if a switch is possible, it
687 0 : // will be done.
688 0 : ww.opts.logger.Errorf("%s", err)
689 0 : }
690 : }
691 2 : return logSize
692 : })
693 2 : if ww.opts.writerCreatedForTest != nil {
694 1 : ww.opts.writerCreatedForTest <- struct{}{}
695 1 : }
696 2 : return false
697 : }()
698 2 : if closeWriter {
699 2 : // Never wrote anything to this writer so don't care about the
700 2 : // returned error.
701 2 : ww.opts.stopper.runAsync(func() {
702 2 : _ = w.Close()
703 2 : // Invoke the segmentClosed callback to propagate knowledge that
704 2 : // there's an obsolete segment file we should clean up. Note
705 2 : // that the file may be occupying non-negligible disk space even
706 2 : // though we never wrote to it due to preallocation.
707 2 : ww.opts.segmentClosed(logicalLogWithSizesEtc{
708 2 : num: ww.opts.wn,
709 2 : segments: []segmentWithSizeEtc{
710 2 : {
711 2 : segment: segment{
712 2 : logNameIndex: LogNameIndex(writerIndex),
713 2 : dir: dir.Dir,
714 2 : },
715 2 : approxFileSize: initialFileSize,
716 2 : synchronouslyClosed: false,
717 2 : },
718 2 : },
719 2 : })
720 2 : })
721 : }
722 : })
723 2 : return nil
724 : }
725 :
726 : // doneSyncCallback is the record.ExternalSyncQueueCallback called by
727 : // record.LogWriter.
728 : //
729 : // recordQueue is popped from only when some work requests a sync (and
730 : // successfully syncs). In the worst case, if no syncs are requested, we could
731 : // queue all the records needed to fill up a memtable in the recordQueue. This
732 : // can have two negative effects: (a) in the case of failover, we need to
733 : // replay all the data in the current mutable memtable, which takes more time,
734 : // (b) the memory usage is proportional to the size of the memtable. We ignore
735 : // these negatives since, (a) users like CockroachDB regularly sync, and (b)
736 : // the default memtable size is only 64MB.
737 2 : func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) {
738 2 : if err != nil {
739 1 : // Don't pop anything since we can retry after switching to a new
740 1 : // LogWriter.
741 1 : return
742 1 : }
743 : // NB: harmless after Close returns since numSyncsPopped will be 0.
744 2 : numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err)
745 2 : if ww.opts.queueSemChan != nil {
746 2 : for i := 0; i < numSyncsPopped; i++ {
747 2 : <-ww.opts.queueSemChan
748 2 : }
749 : }
750 : }
751 :
752 : // ongoingLatencyOrErrorForCurDir implements switchableWriter.
753 2 : func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) {
754 2 : r := ww.recorderForCurDir()
755 2 : if r == nil {
756 1 : return 0, nil
757 1 : }
758 2 : return r.ongoingLatencyOrError()
759 : }
760 :
761 : // For internal use and testing.
762 2 : func (ww *failoverWriter) recorderForCurDir() *latencyAndErrorRecorder {
763 2 : ww.mu.Lock()
764 2 : defer ww.mu.Unlock()
765 2 : if ww.mu.closed {
766 1 : return nil
767 1 : }
768 2 : return &ww.mu.writers[ww.mu.nextWriterIndex-1].r
769 : }
770 :
771 : // Close implements Writer.
772 : //
773 : // NB: ongoingLatencyOrErrorForCurDir and switchToNewDir can be
774 : // called after Close is called, and there is also a possibility that they get
775 : // called after Close returns and before failoverMonitor knows that the
776 : // failoverWriter is closed.
777 : //
778 : // doneSyncCallback can be called anytime after Close returns since there
779 : // could be stuck writes that finish arbitrarily later.
780 : //
781 : // See the long comment about Close behavior where failoverWriter is declared.
782 2 : func (ww *failoverWriter) Close() (logicalOffset int64, err error) {
783 2 : offset, err := ww.closeInternal()
784 2 : ww.opts.writerClosed(ww.getLog())
785 2 : return offset, err
786 2 : }
787 :
788 2 : func (ww *failoverWriter) closeInternal() (logicalOffset int64, err error) {
789 2 : logicalOffset = ww.logicalOffset.offset
790 2 : // [0, closeCalledCount) have had LogWriter.Close called (though may not
791 2 : // have finished) or the LogWriter will never be non-nil. Either way, they
792 2 : // have been "processed".
793 2 : closeCalledCount := LogNameIndex(0)
794 2 : // lastWriterState is the state for the last writer, for which we are
795 2 : // waiting for LogWriter.Close to finish or for creation to be unsuccessful.
796 2 : // What is considered the last writer can change. All state is protected by
797 2 : // ww.mu.
798 2 : type lastWriterState struct {
799 2 : index LogNameIndex
800 2 : closed bool
801 2 : err error
802 2 : metrics record.LogWriterMetrics
803 2 : }
804 2 : var lastWriter lastWriterState
805 2 : lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()}
806 2 : ww.mu.Lock()
807 2 : defer ww.mu.Unlock()
808 2 : numWriters := ww.mu.nextWriterIndex
809 2 : // Every iteration starts and ends with the mutex held.
810 2 : //
811 2 : // Invariant: numWriters >= 1.
812 2 : //
813 2 : // We will loop until we have closed the lastWriter (and use
814 2 : // lastWriter.err). We also need to call close on all LogWriters
815 2 : // that will not close themselves, i.e., those that have already been
816 2 : // created and installed in failoverWriter.writers (this set may change
817 2 : // while failoverWriter.Close runs).
818 2 : for !lastWriter.closed || numWriters > lastWriter.index+1 {
819 2 : if numWriters > closeCalledCount {
820 2 : // lastWriter.index may or may not have advanced. If it has advanced, we
821 2 : // need to reinitialize lastWriterState. If it hasn't advanced, and
822 2 : // numWriters > closeCalledCount, we know that we haven't called close
823 2 : // on it, so nothing in lastWriterState needs to be retained. For
824 2 : // simplicity, we overwrite in both cases.
825 2 : lastWriter = lastWriterState{
826 2 : index: numWriters - 1,
827 2 : }
828 2 : // Try to process [closeCalledCount, numWriters). Will surely process
829 2 : // [closeCalledCount, numWriters-1), since those writers are either done
830 2 : // initializing, or will close themselves. The writer at numWriters-1 we
831 2 : // can only process if it is done initializing, else we will iterate
832 2 : // again.
833 2 : for i := closeCalledCount; i < numWriters; i++ {
834 2 : w := ww.mu.writers[i].w
835 2 : cErr := ww.mu.writers[i].createError
836 2 : // Is the current index the last writer. If yes, this is also the last
837 2 : // loop iteration.
838 2 : isLastWriter := i == lastWriter.index
839 2 : if w != nil {
840 2 : // Can close it, so extend closeCalledCount.
841 2 : closeCalledCount = i + 1
842 2 : size := uint64(w.Size())
843 2 : if ww.mu.writers[i].approxFileSize < size {
844 2 : ww.mu.writers[i].approxFileSize = size
845 2 : }
846 2 : if isLastWriter {
847 2 : // We may care about its error and when it finishes closing.
848 2 : index := i
849 2 : ww.opts.stopper.runAsync(func() {
850 2 : // Last writer(s) (since new writers can be created and become
851 2 : // last, as we iterate) are guaranteed to have seen the last
852 2 : // record (since it was queued before Close was called). It is
853 2 : // possible that a writer got created after the last record was
854 2 : // dequeued and before this fact was realized by Close. In that
855 2 : // case we will harmlessly tell it that it synced that last
856 2 : // record, though it has already been written and synced by
857 2 : // another writer.
858 2 : err := w.CloseWithLastQueuedRecord(lastRecordIndex)
859 2 : ww.mu.Lock()
860 2 : defer ww.mu.Unlock()
861 2 : if lastWriter.index == index {
862 2 : lastWriter.closed = true
863 2 : lastWriter.err = err
864 2 : lastWriter.metrics = w.Metrics()
865 2 : ww.mu.cond.Signal()
866 2 : }
867 : })
868 2 : } else {
869 2 : // Don't care about the returned error since all the records we
870 2 : // relied on this writer for were already successfully written.
871 2 : ww.opts.stopper.runAsync(func() {
872 2 : _ = w.CloseWithLastQueuedRecord(record.PendingSyncIndex{Index: record.NoSyncIndex})
873 2 : })
874 : }
875 2 : } else if cErr != nil {
876 1 : // Have processed it, so extend closeCalledCount.
877 1 : closeCalledCount = i + 1
878 1 : if isLastWriter {
879 1 : lastWriter.closed = true
880 1 : lastWriter.err = cErr
881 1 : lastWriter.metrics = record.LogWriterMetrics{}
882 1 : }
883 : // Else, ignore.
884 2 : } else {
885 2 : if !isLastWriter {
886 2 : // Not last writer, so will close itself.
887 2 : closeCalledCount = i + 1
888 2 : }
889 : // Else, last writer, so we may have to close it.
890 : }
891 : }
892 : }
893 2 : if !lastWriter.closed {
894 2 : // Either waiting for creation of last writer or waiting for the close
895 2 : // to finish, or something else to become the last writer.
896 2 : //
897 2 : // It is possible that what we think of as the last writer (lastWriter)
898 2 : // closes itself while ww.mu is no longer held here, and a new LogWriter
899 2 : // is created too. All the records are synced, but the real last writer
900 2 : // may still be writing some records. Specifically, consider the
901 2 : // following sequence while this wait does not hold the mutex:
902 2 : //
903 2 : // - recordQueue has an entry, with index 50, that does not require a
904 2 : // sync.
905 2 : // - Last writer created at index 10 and entry 50 is handed to it.
906 2 : // - lastWriter.index is still 9 and it closes itself and signals this
907 2 : // cond. It has written entry 50 and synced (since close syncs).
908 2 : // - The wait completes.
909 2 : //
910 2 : // Now the writer at index 10 will never be closed and will never sync.
911 2 : // A crash can cause some part of what it writes to be lost. Note that
912 2 : // there is no data loss, but there are some unfortunate consequences:
913 2 : //
914 2 : // - We never closed a file descriptor.
915 2 : // - virtualWALReader.NextRecord can return an error on finding a
916 2 : // malformed chunk in the last writer (at index 10) instead of
917 2 : // swallowing the error. This can cause DB.Open to fail.
918 2 : //
919 2 : // To avoid this, we grab the latest value of numWriters on reacquiring
920 2 : // the mutex, and will continue looping until the writer at index 10 is
921 2 : // closed (or writer at index 11 is created).
922 2 : ww.mu.cond.Wait()
923 2 : numWriters = ww.mu.nextWriterIndex
924 2 : }
925 : }
926 2 : if ww.mu.writers[lastWriter.index].w != nil {
927 2 : // This permits log recycling.
928 2 : ww.mu.writers[lastWriter.index].synchronouslyClosed = true
929 2 : }
930 2 : err = lastWriter.err
931 2 : ww.mu.metrics = lastWriter.metrics
932 2 : ww.mu.closed = true
933 2 : n, m := ww.q.popAll(err)
934 2 : if err == nil && (n > 0 || m > 0) {
935 0 : panic(errors.AssertionFailedf("no error but recordQueue had %d records and %d syncs", n, m))
936 : }
937 2 : return logicalOffset, err
938 : }
939 :
940 : // Metrics implements Writer.
941 2 : func (ww *failoverWriter) Metrics() record.LogWriterMetrics {
942 2 : ww.mu.Lock()
943 2 : defer ww.mu.Unlock()
944 2 : return ww.mu.metrics
945 2 : }
946 :
947 : // getLog can be called at any time, including after Close returns.
948 2 : func (ww *failoverWriter) getLog() logicalLogWithSizesEtc {
949 2 : ww.mu.Lock()
950 2 : defer ww.mu.Unlock()
951 2 : ll := logicalLogWithSizesEtc{
952 2 : num: ww.opts.wn,
953 2 : }
954 2 : for i := range ww.mu.writers {
955 2 : if ww.mu.writers[i].w != nil {
956 2 : ll.segments = append(ll.segments, segmentWithSizeEtc{
957 2 : segment: segment{
958 2 : logNameIndex: LogNameIndex(i),
959 2 : dir: ww.mu.writers[i].dir,
960 2 : },
961 2 : approxFileSize: ww.mu.writers[i].approxFileSize,
962 2 : synchronouslyClosed: ww.mu.writers[i].synchronouslyClosed,
963 2 : })
964 2 : }
965 : }
966 2 : return ll
967 : }
968 :
969 : // latencyAndErrorRecorder records ongoing write and sync operations and errors
970 : // in those operations. record.LogWriter cannot continue functioning after any
971 : // error, so all errors are considered permanent.
972 : //
973 : // writeStart/writeEnd are used directly when creating a file. After the file
974 : // is successfully created, setWriter turns latencyAndErrorRecorder into an
975 : // implementation of writerSyncerCloser that will record for the Write and
976 : // Sync methods.
977 : type latencyAndErrorRecorder struct {
978 : ts timeSource
979 : ongoingOperationStart atomic.Int64
980 : error atomic.Pointer[error]
981 : writerSyncerCloser
982 : }
983 :
984 : type writerSyncerCloser interface {
985 : io.Writer
986 : io.Closer
987 : Sync() error
988 : }
989 :
990 2 : func (r *latencyAndErrorRecorder) writeStart() {
991 2 : r.ongoingOperationStart.Store(r.ts.now().UnixNano())
992 2 : }
993 :
994 2 : func (r *latencyAndErrorRecorder) writeEnd(err error) {
995 2 : if err != nil {
996 1 : ptr := &err
997 1 : r.error.Store(ptr)
998 1 : }
999 2 : r.ongoingOperationStart.Store(0)
1000 : }
1001 :
1002 2 : func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) {
1003 2 : r.writerSyncerCloser = w
1004 2 : }
1005 :
1006 2 : func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) {
1007 2 : startTime := r.ongoingOperationStart.Load()
1008 2 : var latency time.Duration
1009 2 : if startTime != 0 {
1010 2 : l := r.ts.now().UnixNano() - startTime
1011 2 : if l < 0 {
1012 0 : l = 0
1013 0 : }
1014 2 : latency = time.Duration(l)
1015 : }
1016 2 : errPtr := r.error.Load()
1017 2 : var err error
1018 2 : if errPtr != nil {
1019 1 : err = *errPtr
1020 1 : }
1021 2 : return latency, err
1022 : }
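// Illustrative sketch: exampleShouldSwitchDir is a hypothetical policy (not
// the actual failoverMonitor logic) showing how a caller might consume
// ongoingLatencyOrError to decide whether to switch directories; the
// threshold and the switch-on-any-error rule are assumptions made for the
// example.
func exampleShouldSwitchDir(r *latencyAndErrorRecorder, latencyThreshold time.Duration) bool {
	latency, err := r.ongoingLatencyOrError()
	// Errors are permanent for a record.LogWriter, so any error is a reason
	// to switch; otherwise switch if an in-flight write or sync has been
	// stuck for longer than the threshold.
	return err != nil || latency > latencyThreshold
}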
1023 :
1024 : // Sync implements writerSyncerCloser.
1025 2 : func (r *latencyAndErrorRecorder) Sync() error {
1026 2 : r.writeStart()
1027 2 : err := r.writerSyncerCloser.Sync()
1028 2 : r.writeEnd(err)
1029 2 : return err
1030 2 : }
1031 :
1032 : // Write implements io.Writer.
1033 2 : func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) {
1034 2 : r.writeStart()
1035 2 : n, err = r.writerSyncerCloser.Write(p)
1036 2 : r.writeEnd(err)
1037 2 : return n, err
1038 2 : }
|