LCOV - code coverage report
Current view: top level - pebble/wal - wal.go (source / functions) Hit Total Coverage
Test: 2024-12-26 08:17Z 78d53457 - meta test only.lcov Lines: 54 82 65.9 %
Date: 2024-12-26 08:19:09 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "io"
      10             :         "strconv"
      11             :         "strings"
      12             :         "sync"
      13             :         "time"
      14             : 
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/record"
      17             :         "github.com/cockroachdb/pebble/vfs"
      18             :         "github.com/cockroachdb/redact"
      19             :         "github.com/prometheus/client_golang/prometheus"
      20             : )
      21             : 
      22             : // TODO(sumeer): write a high-level comment describing the approach.
      23             : 
// Dir is used for storing log files.
//
// The zero Dir (Dir{}) denotes an unconfigured directory: Init and
// Options.Dirs treat a zero Options.Secondary as "no secondary dir".
type Dir struct {
	// FS is the filesystem through which Dirname is accessed.
	FS vfs.FS
	// Dirname is the path of the directory within FS.
	Dirname string
}
      29             : 
      30             : // NumWAL is the number of the virtual WAL. It can map to one or more physical
      31             : // log files. In standalone mode, it will map to exactly one log file. In
      32             : // failover mode, it can map to many log files, which are totally ordered
      33             : // (using a dense logNameIndex).
      34             : //
      35             : // In general, WAL refers to the virtual WAL, and file refers to a log file.
      36             : // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to
      37             : // them. Additional mapping to one or more files happens in this package. If a
      38             : // WAL maps to multiple files, the source of truth regarding that mapping is
      39             : // the contents of the directories.
      40             : type NumWAL base.DiskFileNum
      41             : 
      42             : // String implements fmt.Stringer.
      43           0 : func (s NumWAL) String() string { return base.DiskFileNum(s).String() }
      44             : 
      45             : // LogNameIndex numbers log files within a WAL.
      46             : type LogNameIndex uint32
      47             : 
      48             : // String implements fmt.Stringer.
      49           1 : func (li LogNameIndex) String() string {
      50           1 :         return fmt.Sprintf("%03d", li)
      51           1 : }
      52             : 
      53             : // makeLogFilename makes a log filename.
      54           1 : func makeLogFilename(wn NumWAL, index LogNameIndex) string {
      55           1 :         if index == 0 {
      56           1 :                 // Use a backward compatible name, for simplicity.
      57           1 :                 return fmt.Sprintf("%s.log", base.DiskFileNum(wn).String())
      58           1 :         }
      59           1 :         return fmt.Sprintf("%s-%s.log", base.DiskFileNum(wn).String(), index)
      60             : }
      61             : 
      62             : // ParseLogFilename takes a base filename and parses it into its constituent
      63             : // NumWAL and LogNameIndex. If the filename is not a log file, it returns false
      64             : // for the final return value.
      65           1 : func ParseLogFilename(name string) (NumWAL, LogNameIndex, bool) {
      66           1 :         i := strings.IndexByte(name, '.')
      67           1 :         if i < 0 || name[i:] != ".log" {
      68           1 :                 return 0, 0, false
      69           1 :         }
      70           1 :         j := strings.IndexByte(name[:i], '-')
      71           1 :         if j < 0 {
      72           1 :                 dfn, ok := base.ParseDiskFileNum(name[:i])
      73           1 :                 if !ok {
      74           0 :                         // We've considered returning an error for filenames that end in
      75           0 :                         // '.log' but fail to parse correctly. We decided against it because
      76           0 :                         // the '.log' suffix is used by Cockroach's daignostics log files.
      77           0 :                         // It's conceivable that some of these found their way into a data
      78           0 :                         // directory, and erroring would cause an issue for an existing
      79           0 :                         // Cockroach deployment.
      80           0 :                         return 0, 0, false
      81           0 :                 }
      82           1 :                 return NumWAL(dfn), 0, true
      83             :         }
      84           1 :         dfn, ok := base.ParseDiskFileNum(name[:j])
      85           1 :         if !ok {
      86           0 :                 return 0, 0, false
      87           0 :         }
      88           1 :         li, err := strconv.ParseUint(name[j+1:i], 10, 64)
      89           1 :         if err != nil {
      90           0 :                 return 0, 0, false
      91           0 :         }
      92           1 :         return NumWAL(dfn), LogNameIndex(li), true
      93             : }
      94             : 
      95             : // Options provides configuration for the Manager.
      96             : type Options struct {
      97             :         // Primary dir for storing WAL files. It must already be created and synced
      98             :         // up to the root.
      99             :         Primary Dir
     100             :         // Secondary is used for failover. Optional. It must already be created and
     101             :         // synced up to the root.
     102             :         Secondary Dir
     103             : 
     104             :         // MinUnflushedLogNum is the smallest WAL number corresponding to
     105             :         // mutations that have not been flushed to a sstable.
     106             :         MinUnflushedWALNum NumWAL
     107             : 
     108             :         // Recycling configuration. Only files in the primary dir are recycled.
     109             : 
     110             :         // MaxNumRecyclableLogs is the maximum number of log files to maintain for
     111             :         // recycling.
     112             :         MaxNumRecyclableLogs int
     113             : 
     114             :         // Configuration for calling vfs.NewSyncingFile.
     115             : 
     116             :         // NoSyncOnClose is documented in SyncingFileOptions.
     117             :         NoSyncOnClose bool
     118             :         // BytesPerSync is documented in SyncingFileOptions.
     119             :         BytesPerSync int
     120             :         // PreallocateSize is documented in SyncingFileOptions.
     121             :         PreallocateSize func() int
     122             : 
     123             :         // MinSyncInterval is documented in Options.WALMinSyncInterval.
     124             :         MinSyncInterval func() time.Duration
     125             :         // FsyncLatency records fsync latency. This doesn't differentiate between
     126             :         // fsyncs on the primary and secondary dir.
     127             :         //
     128             :         // TODO(sumeer): consider separating out into two histograms.
     129             :         FsyncLatency prometheus.Histogram
     130             :         // QueueSemChan is the channel to pop from when popping from queued records
     131             :         // that have requested a sync. It's original purpose was to function as a
     132             :         // semaphore that prevents the record.LogWriter.flusher.syncQueue from
     133             :         // overflowing (which will cause a panic). It is still useful in that role
     134             :         // when the WALManager is configured in standalone mode. In failover mode
     135             :         // there is no syncQueue, so the pushback into the commit pipeline is
     136             :         // unnecessary, but possibly harmless.
     137             :         QueueSemChan chan struct{}
     138             : 
     139             :         // Logger for logging.
     140             :         Logger base.Logger
     141             : 
     142             :         // EventListener is called on events, like log file creation.
     143             :         EventListener EventListener
     144             : 
     145             :         FailoverOptions
     146             :         // FailoverWriteAndSyncLatency is only populated when WAL failover is
     147             :         // configured.
     148             :         FailoverWriteAndSyncLatency prometheus.Histogram
     149             : }
     150             : 
     151             : // Init constructs and initializes a WAL manager from the provided options and
     152             : // the set of initial logs.
     153           1 : func Init(o Options, initial Logs) (Manager, error) {
     154           1 :         var m Manager
     155           1 :         if o.Secondary == (Dir{}) {
     156           1 :                 m = new(StandaloneManager)
     157           1 :         } else {
     158           1 :                 m = new(failoverManager)
     159           1 :         }
     160           1 :         if err := m.init(o, initial); err != nil {
     161           0 :                 return nil, err
     162           0 :         }
     163           1 :         return m, nil
     164             : }
     165             : 
     166             : // Dirs returns the primary Dir and the secondary if provided.
     167           1 : func (o *Options) Dirs() []Dir {
     168           1 :         if o.Secondary == (Dir{}) {
     169           1 :                 return []Dir{o.Primary}
     170           1 :         }
     171           1 :         return []Dir{o.Primary, o.Secondary}
     172             : }
     173             : 
// FailoverOptions are options that are specific to failover mode. Zero-valued
// fields are replaced with defaults by EnsureDefaults.
type FailoverOptions struct {
	// PrimaryDirProbeInterval is the interval for probing the primary dir, when
	// the WAL is being written to the secondary, to decide when to fail back.
	// EnsureDefaults sets it to 1s when zero.
	PrimaryDirProbeInterval time.Duration
	// HealthyProbeLatencyThreshold is the latency threshold to declare that the
	// primary is healthy again. EnsureDefaults sets it to 25ms when zero.
	HealthyProbeLatencyThreshold time.Duration
	// HealthyInterval is the time interval over which the probes have to be
	// healthy. That is, we look at probe history of length
	// HealthyInterval/PrimaryDirProbeInterval. EnsureDefaults sets it to 15s
	// when zero.
	HealthyInterval time.Duration

	// UnhealthySamplingInterval is the interval for sampling ongoing calls and
	// errors in the latest LogWriter. EnsureDefaults sets it to 100ms when
	// zero.
	UnhealthySamplingInterval time.Duration
	// UnhealthyOperationLatencyThreshold is the latency threshold that is
	// considered unhealthy, for operations done by a LogWriter. The second return
	// value indicates whether we should consider failover at all. If the second
	// return value is false, failover is disabled. When nil, EnsureDefaults
	// installs a func that returns (100ms, true).
	UnhealthyOperationLatencyThreshold func() (time.Duration, bool)

	// ElevatedWriteStallThresholdLag is the duration for which an elevated
	// threshold should continue after a switch back to the primary dir. This is
	// because we may have accumulated many unflushed memtables and flushing
	// them can take some time. EnsureDefaults sets it to 60s when zero.
	ElevatedWriteStallThresholdLag time.Duration

	// timeSource is only non-nil for tests.
	timeSource

	// The remaining fields are test-only hooks, as their ForTesting suffix
	// indicates. NOTE(review): when and how each hook is signaled or invoked
	// is determined by the failover manager code outside this file; confirm
	// there before relying on specific semantics.
	monitorIterationForTesting chan<- struct{}
	proberIterationForTesting  chan<- struct{}
	monitorStateForTesting     func(numSwitches int, ongoingLatencyAtSwitch time.Duration)
	logWriterCreatedForTesting chan<- struct{}
}
     210             : 
     211             : // EnsureDefaults ensures that the default values for all options are set if a
     212             : // valid value was not already specified.
     213           1 : func (o *FailoverOptions) EnsureDefaults() {
     214           1 :         if o.PrimaryDirProbeInterval == 0 {
     215           0 :                 o.PrimaryDirProbeInterval = time.Second
     216           0 :         }
     217           1 :         if o.HealthyProbeLatencyThreshold == 0 {
     218           0 :                 o.HealthyProbeLatencyThreshold = 25 * time.Millisecond
     219           0 :         }
     220           1 :         if o.HealthyInterval == 0 {
     221           0 :                 o.HealthyInterval = 15 * time.Second
     222           0 :         }
     223           1 :         if o.UnhealthySamplingInterval == 0 {
     224           0 :                 o.UnhealthySamplingInterval = 100 * time.Millisecond
     225           0 :         }
     226           1 :         if o.UnhealthyOperationLatencyThreshold == nil {
     227           0 :                 o.UnhealthyOperationLatencyThreshold = func() (time.Duration, bool) {
     228           0 :                         return 100 * time.Millisecond, true
     229           0 :                 }
     230             :         }
     231           1 :         if o.ElevatedWriteStallThresholdLag == 0 {
     232           0 :                 o.ElevatedWriteStallThresholdLag = 60 * time.Second
     233           0 :         }
     234             : }
     235             : 
// EventListener is called on events, like log file creation.
type EventListener interface {
	// LogCreated informs the listener of a log file creation. The CreateInfo
	// describes the new file and carries any error that occurred
	// (CreateInfo.Err).
	LogCreated(CreateInfo)
}
     241             : 
     242             : // CreateInfo contains info about a log file creation event.
     243             : type CreateInfo struct {
     244             :         // JobID is the ID of the job the caused the WAL to be created.
     245             :         //
     246             :         // TODO(sumeer): for a file created later due to the need to failover, we
     247             :         // need to provide a JobID generator func in Options.
     248             :         JobID int
     249             :         // Path to the file. This includes the NumWAL, and implicitly or explicitly
     250             :         // includes the logNameIndex.
     251             :         Path string
     252             :         // IsSecondary is true if the file was created on the secondary.
     253             :         IsSecondary bool
     254             :         // Num is the WAL number.
     255             :         Num NumWAL
     256             :         // RecycledFileNum is the file number of a previous log file which was
     257             :         // recycled to create this one. Zero if recycling did not take place.
     258             :         RecycledFileNum base.DiskFileNum
     259             :         // Err contains any error.
     260             :         Err error
     261             : }
     262             : 
// Stats exposes stats used in Pebble metrics.
//
// NB: Metrics.WAL.{Size,BytesIn,BytesWritten} are not maintained by the wal
// package.
//
// TODO(sumeer): with failover, Metrics.WAL.BytesWritten needs to be
// maintained here.
type Stats struct {
	// ObsoleteFileCount is the number of obsolete log files.
	ObsoleteFileCount int
	// ObsoleteFileSize is the total size of obsolete log files.
	ObsoleteFileSize uint64
	// LiveFileCount is the number of live log files.
	LiveFileCount int
	// LiveFileSize is the total size of live log files. This can be higher than
	// LiveSize due to log recycling (a live log file may be larger than the
	// size used in its latest incarnation), or failover (resulting in multiple
	// log files containing the same records).
	//
	// NOTE(review): Stats has no LiveSize field. "LiveSize" presumably means
	// the logical size of live WAL data (e.g. Metrics.WAL.Size); confirm.
	//
	// This is updated only when log files are closed, to minimize
	// synchronization.
	LiveFileSize uint64
	// Failover contains failover stats.
	Failover FailoverStats
}
     288             : 
// FailoverStats contains stats about WAL failover. These are empty if
// failover is not configured.
type FailoverStats struct {
	// DirSwitchCount is the number of times WAL writing has switched to a
	// different directory, either due to failover, when the current dir is
	// unhealthy, or to failback to the primary, when the primary is healthy
	// again.
	DirSwitchCount int64
	// The following durations do not account for continued background writes to
	// a directory that has been switched away from. These background writes can
	// happen because of queued records.

	// PrimaryWriteDuration is the cumulative duration for which WAL writes are
	// using the primary directory.
	PrimaryWriteDuration time.Duration
	// SecondaryWriteDuration is the cumulative duration for which WAL writes
	// are using the secondary directory.
	SecondaryWriteDuration time.Duration

	// FailoverWriteAndSyncLatency measures the latency of writing and syncing a
	// set of writes that were synced together. Each sample represents the
	// highest latency observed across the writes in the set of writes. It gives
	// us a sense of the user-observed latency, which can be much lower than the
	// underlying fsync latency, when WAL failover is working effectively.
	// Like the rest of FailoverStats, it is empty (nil) when failover is not
	// configured.
	FailoverWriteAndSyncLatency prometheus.Histogram
}
     315             : 
// Manager handles all WAL work. Init constructs the concrete implementation.
//
//   - Obsolete can be called concurrently with WAL writing.
//   - WAL writing: Is done via Create, and the various Writer methods. These
//     are required to be serialized via external synchronization (specifically,
//     the caller does it via commitPipeline.mu).
type Manager interface {
	// init initializes the Manager. init is called during DB initialization.
	init(o Options, initial Logs) error

	// List returns the virtual WALs in ascending order. List must not perform
	// I/O.
	List() Logs
	// Obsolete informs the manager that all virtual WALs less than
	// minUnflushedNum are obsolete. The callee can choose to recycle some
	// underlying log files, if !noRecycle. The log files that are not recycled,
	// and therefore can be deleted, are returned. The deletable files are no
	// longer tracked by the manager.
	Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
	// Create creates a new virtual WAL.
	//
	// NumWALs passed to successive Create calls must be monotonically
	// increasing, and be greater than any NumWAL seen earlier. The caller must
	// close the previous Writer before calling Create.
	//
	// jobID is used for the WAL EventListener (Options.EventListener),
	// presumably surfacing as CreateInfo.JobID.
	Create(wn NumWAL, jobID int) (Writer, error)
	// ElevateWriteStallThresholdForFailover returns true if the caller should
	// use a high write stall threshold because the WALs are being written to
	// the secondary dir.
	ElevateWriteStallThresholdForFailover() bool
	// Stats returns the latest Stats.
	Stats() Stats
	// Close the manager.
	// REQUIRES: Writers and Readers have already been closed.
	Close() error

	// RecyclerForTesting exposes the internal LogRecycler.
	RecyclerForTesting() *LogRecycler
}
     356             : 
// DeletableLog contains information about a log file that can be deleted.
// Manager.Obsolete returns these for files it no longer tracks.
type DeletableLog struct {
	// FS is the filesystem through which the file at Path is accessed.
	vfs.FS
	// Path to the file.
	Path string
	// NumWAL is the virtual WAL the file belonged to.
	NumWAL
	// ApproxFileSize is the approximate size of the file (presumably in
	// bytes; confirm with the producer).
	ApproxFileSize uint64
}
     365             : 
// SyncOptions has non-nil Done and Err when fsync is requested, else both are
// nil.
type SyncOptions struct {
	// Done, when non-nil, is notified once durability is guaranteed or an
	// error has occurred (see Writer.WriteRecord).
	Done *sync.WaitGroup
	// Err, when non-nil, is where such an error is reported.
	Err *error
}
     372             : 
     373             : // Writer writes to a virtual WAL. A Writer in standalone mode maps to a
     374             : // single record.LogWriter. In failover mode, it can failover across multiple
     375             : // physical log files.
     376             : type Writer interface {
     377             :         // WriteRecord writes a complete record. The record is asynchronously
     378             :         // persisted to the underlying writer. If SyncOptions.Done != nil, the wait
     379             :         // group will be notified when durability is guaranteed or an error has
     380             :         // occurred (set in SyncOptions.Err). External synchronisation provided by
     381             :         // commitPipeline.mu guarantees that WriteRecord calls are serialized.
     382             :         //
     383             :         // The logicalOffset is the logical size of the WAL after this record is
     384             :         // written. If the WAL corresponds to a single log file, this is the offset
     385             :         // in that log file.
     386             :         //
     387             :         // Some Writer implementations may continue to read p after WriteRecord
     388             :         // returns. This is an obstacle to reusing p's memory. If the caller would
     389             :         // like to reuse p's memory, the caller may pass a non-nil [RefCount].  If
     390             :         // the Writer will retain p, it will invoke the [RefCount] before returning.
     391             :         // When it's finished, it will invoke [RefCount.Unref] to release its
     392             :         // reference.
     393             :         WriteRecord(p []byte, opts SyncOptions, ref RefCount) (logicalOffset int64, err error)
     394             :         // Close the writer.
     395             :         Close() (logicalOffset int64, err error)
     396             :         // Metrics must be called after Close. The callee will no longer modify the
     397             :         // returned LogWriterMetrics.
     398             :         Metrics() record.LogWriterMetrics
     399             : }
     400             : 
     401             : // RefCount is a reference count associated with a record passed to
     402             : // [Writer.WriteRecord]. See the comment on WriteRecord.
     403             : type RefCount interface {
     404             :         // Ref increments the reference count.
     405             :         Ref()
     406             :         // Unref increments the reference count.
     407             :         Unref()
     408             : }
     409             : 
// Reader reads a virtual WAL.
type Reader interface {
	// NextRecord returns a reader for the next record, together with the
	// Offset identifying where that record lives in the WAL. It returns io.EOF
	// if there are no more records. The reader returned becomes stale after
	// the next NextRecord call, and should no longer be used.
	NextRecord() (io.Reader, Offset, error)
	// Close the reader.
	Close() error
}
     419             : 
     420             : // Offset indicates the offset or position of a record within a WAL.
     421             : type Offset struct {
     422             :         // PhysicalFile is the path to the physical file containing a particular
     423             :         // record.
     424             :         PhysicalFile string
     425             :         // Physical indicates the file offset at which a record begins within
     426             :         // the physical file named by PhysicalFile.
     427             :         Physical int64
     428             :         // PreviousFilesBytes is the bytes read from all the previous physical
     429             :         // segment files that have been read up to the current log segment. If WAL
     430             :         // failover is not in use, PreviousFileBytes will always be zero. Otherwise,
     431             :         // it may be non-zero when replaying records from multiple segment files
     432             :         // that make up a single logical WAL.
     433             :         PreviousFilesBytes int64
     434             : }
     435             : 
     436             : // String implements fmt.Stringer, returning a string representation of the
     437             : // offset.
     438           1 : func (o Offset) String() string {
     439           1 :         return redact.StringWithoutMarkers(o)
     440           1 : }
     441             : 
     442             : // SafeFormat implements redact.SafeFormatter.
     443           1 : func (o Offset) SafeFormat(w redact.SafePrinter, _ rune) {
     444           1 :         if o.PreviousFilesBytes > 0 {
     445           1 :                 w.Printf("(%s: %d), %d from previous files", o.PhysicalFile, o.Physical, o.PreviousFilesBytes)
     446           1 :                 return
     447           1 :         }
     448           1 :         w.Printf("(%s: %d)", o.PhysicalFile, o.Physical)
     449             : 
     450             : }

Generated by: LCOV version 1.14