LCOV - code coverage report
Current view: top level - pebble/wal - wal.go (source / functions) Hit Total Coverage
Test: 2024-07-21 08:15Z 72c3f550 - meta test only.lcov Lines: 45 76 59.2 %
Date: 2024-07-21 08:16:40 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "io"
      10             :         "strconv"
      11             :         "strings"
      12             :         "sync"
      13             :         "time"
      14             : 
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/record"
      17             :         "github.com/cockroachdb/pebble/vfs"
      18             :         "github.com/prometheus/client_golang/prometheus"
      19             : )
      20             : 
      21             : // TODO(sumeer): write a high-level comment describing the approach.
      22             : 
      23             : // Dir is used for storing log files.
      24             : type Dir struct {
      25             :         FS      vfs.FS
      26             :         Dirname string
      27             : }
      28             : 
      29             : // NumWAL is the number of the virtual WAL. It can map to one or more physical
      30             : // log files. In standalone mode, it will map to exactly one log file. In
      31             : // failover mode, it can map to many log files, which are totally ordered
      32             : // (using a dense logNameIndex).
      33             : //
      34             : // In general, WAL refers to the virtual WAL, and file refers to a log file.
      35             : // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to
      36             : // them. Additional mapping to one or more files happens in this package. If a
      37             : // WAL maps to multiple files, the source of truth regarding that mapping is
      38             : // the contents of the directories.
      39             : type NumWAL base.DiskFileNum
      40             : 
      41             : // String implements fmt.Stringer.
      42           0 : func (s NumWAL) String() string { return base.DiskFileNum(s).String() }
      43             : 
      44             : // LogNameIndex numbers log files within a WAL.
      45             : type LogNameIndex uint32
      46             : 
      47             : // String implements fmt.Stringer.
      48           1 : func (li LogNameIndex) String() string {
      49           1 :         return fmt.Sprintf("%03d", li)
      50           1 : }
      51             : 
      52             : // makeLogFilename makes a log filename.
      53           1 : func makeLogFilename(wn NumWAL, index LogNameIndex) string {
      54           1 :         if index == 0 {
      55           1 :                 // Use a backward compatible name, for simplicity.
      56           1 :                 return fmt.Sprintf("%s.log", base.DiskFileNum(wn).String())
      57           1 :         }
      58           1 :         return fmt.Sprintf("%s-%s.log", base.DiskFileNum(wn).String(), index)
      59             : }
      60             : 
      61             : // ParseLogFilename takes a base filename and parses it into its constituent
      62             : // NumWAL and LogNameIndex. If the filename is not a log file, it returns false
      63             : // for the final return value.
      64           1 : func ParseLogFilename(name string) (NumWAL, LogNameIndex, bool) {
      65           1 :         i := strings.IndexByte(name, '.')
      66           1 :         if i < 0 || name[i:] != ".log" {
      67           1 :                 return 0, 0, false
      68           1 :         }
      69           1 :         j := strings.IndexByte(name[:i], '-')
      70           1 :         if j < 0 {
      71           1 :                 dfn, ok := base.ParseDiskFileNum(name[:i])
      72           1 :                 if !ok {
      73           0 :                         // We've considered returning an error for filenames that end in
      74           0 :                         // '.log' but fail to parse correctly. We decided against it because
      75           0 :                         // the '.log' suffix is used by Cockroach's diagnostics log files.
      76           0 :                         // It's conceivable that some of these found their way into a data
      77           0 :                         // directory, and erroring would cause an issue for an existing
      78           0 :                         // Cockroach deployment.
      79           0 :                         return 0, 0, false
      80           0 :                 }
      81           1 :                 return NumWAL(dfn), 0, true
      82             :         }
      83           1 :         dfn, ok := base.ParseDiskFileNum(name[:j])
      84           1 :         if !ok {
      85           0 :                 return 0, 0, false
      86           0 :         }
      87           1 :         li, err := strconv.ParseUint(name[j+1:i], 10, 64)
      88           1 :         if err != nil {
      89           0 :                 return 0, 0, false
      90           0 :         }
      91           1 :         return NumWAL(dfn), LogNameIndex(li), true
      92             : }
      93             : 
      94             : // Options provides configuration for the Manager.
      95             : type Options struct {
      96             :         // Primary dir for storing WAL files. It must already be created and synced
      97             :         // up to the root.
      98             :         Primary Dir
      99             :         // Secondary is used for failover. Optional. It must already be created and
     100             :         // synced up to the root.
     101             :         Secondary Dir
     102             : 
     103             :         // MinUnflushedWALNum is the smallest WAL number corresponding to
     104             :         // mutations that have not been flushed to a sstable.
     105             :         MinUnflushedWALNum NumWAL
     106             : 
     107             :         // Recycling configuration. Only files in the primary dir are recycled.
     108             : 
     109             :         // MaxNumRecyclableLogs is the maximum number of log files to maintain for
     110             :         // recycling.
     111             :         MaxNumRecyclableLogs int
     112             : 
     113             :         // Configuration for calling vfs.NewSyncingFile.
     114             : 
     115             :         // NoSyncOnClose is documented in SyncingFileOptions.
     116             :         NoSyncOnClose bool
     117             :         // BytesPerSync is documented in SyncingFileOptions.
     118             :         BytesPerSync int
     119             :         // PreallocateSize is documented in SyncingFileOptions.
     120             :         PreallocateSize func() int
     121             : 
     122             :         // MinSyncInterval is documented in Options.WALMinSyncInterval.
     123             :         MinSyncInterval func() time.Duration
     124             :         // FsyncLatency records fsync latency. This doesn't differentiate between
     125             :         // fsyncs on the primary and secondary dir.
     126             :         //
     127             :         // TODO(sumeer): consider separating out into two histograms.
     128             :         FsyncLatency prometheus.Histogram
     129             :         // QueueSemChan is the channel to pop from when popping from queued records
     130             :         // that have requested a sync. Its original purpose was to function as a
     131             :         // semaphore that prevents the record.LogWriter.flusher.syncQueue from
     132             :         // overflowing (which will cause a panic). It is still useful in that role
     133             :         // when the WALManager is configured in standalone mode. In failover mode
     134             :         // there is no syncQueue, so the pushback into the commit pipeline is
     135             :         // unnecessary, but possibly harmless.
     136             :         QueueSemChan chan struct{}
     137             : 
     138             :         // Logger for logging.
     139             :         Logger base.Logger
     140             : 
     141             :         // EventListener is called on events, like log file creation.
     142             :         EventListener EventListener
     143             : 
     144             :         FailoverOptions
     145             :         // FailoverWriteAndSyncLatency is only populated when WAL failover is
     146             :         // configured.
     147             :         FailoverWriteAndSyncLatency prometheus.Histogram
     148             : }
     149             : 
     150             : // Init constructs and initializes a WAL manager from the provided options and
     151             : // the set of initial logs.
     152           1 : func Init(o Options, initial Logs) (Manager, error) {
     153           1 :         var m Manager
     154           1 :         if o.Secondary == (Dir{}) {
     155           1 :                 m = new(StandaloneManager)
     156           1 :         } else {
     157           1 :                 m = new(failoverManager)
     158           1 :         }
     159           1 :         if err := m.init(o, initial); err != nil {
     160           0 :                 return nil, err
     161           0 :         }
     162           1 :         return m, nil
     163             : }
     164             : 
     165             : // Dirs returns the primary Dir and the secondary if provided.
     166           1 : func (o *Options) Dirs() []Dir {
     167           1 :         if o.Secondary == (Dir{}) {
     168           1 :                 return []Dir{o.Primary}
     169           1 :         }
     170           1 :         return []Dir{o.Primary, o.Secondary}
     171             : }
     172             : 
     173             : // FailoverOptions are options that are specific to failover mode.
     174             : type FailoverOptions struct {
     175             :         // PrimaryDirProbeInterval is the interval for probing the primary dir, when
     176             :         // the WAL is being written to the secondary, to decide when to fail back.
     177             :         PrimaryDirProbeInterval time.Duration
     178             :         // HealthyProbeLatencyThreshold is the latency threshold to declare that the
     179             :         // primary is healthy again.
     180             :         HealthyProbeLatencyThreshold time.Duration
     181             :         // HealthyInterval is the time interval over which the probes have to be
     182             :         // healthy. That is, we look at probe history of length
     183             :         // HealthyInterval/PrimaryDirProbeInterval.
     184             :         HealthyInterval time.Duration
     185             : 
     186             :         // UnhealthySamplingInterval is the interval for sampling ongoing calls and
     187             :         // errors in the latest LogWriter.
     188             :         UnhealthySamplingInterval time.Duration
     189             :         // UnhealthyOperationLatencyThreshold is the latency threshold that is
     190             :         // considered unhealthy, for operations done by a LogWriter. The second return
     191             :         // value indicates whether we should consider failover at all. If the second
     192             :         // return value is false, failover is disabled.
     193             :         UnhealthyOperationLatencyThreshold func() (time.Duration, bool)
     194             : 
     195             :         // ElevatedWriteStallThresholdLag is the duration for which an elevated
     196             :         // threshold should continue after a switch back to the primary dir. This is
     197             :         // because we may have accumulated many unflushed memtables and flushing
     198             :         // them can take some time. Maybe set to 60s.
     199             :         ElevatedWriteStallThresholdLag time.Duration
     200             : 
     201             :         // timeSource is only non-nil for tests.
     202             :         timeSource
     203             : 
     204             :         monitorIterationForTesting chan<- struct{}
     205             :         proberIterationForTesting  chan<- struct{}
     206             :         monitorStateForTesting     func(numSwitches int, ongoingLatencyAtSwitch time.Duration)
     207             :         logWriterCreatedForTesting chan<- struct{}
     208             : }
     209             : 
     210             : // EnsureDefaults ensures that the default values for all options are set if a
     211             : // valid value was not already specified.
     212           1 : func (o *FailoverOptions) EnsureDefaults() {
     213           1 :         if o.PrimaryDirProbeInterval == 0 {
     214           0 :                 o.PrimaryDirProbeInterval = time.Second
     215           0 :         }
     216           1 :         if o.HealthyProbeLatencyThreshold == 0 {
     217           0 :                 o.HealthyProbeLatencyThreshold = 25 * time.Millisecond
     218           0 :         }
     219           1 :         if o.HealthyInterval == 0 {
     220           0 :                 o.HealthyInterval = 15 * time.Second
     221           0 :         }
     222           1 :         if o.UnhealthySamplingInterval == 0 {
     223           0 :                 o.UnhealthySamplingInterval = 100 * time.Millisecond
     224           0 :         }
     225           1 :         if o.UnhealthyOperationLatencyThreshold == nil {
     226           0 :                 o.UnhealthyOperationLatencyThreshold = func() (time.Duration, bool) {
     227           0 :                         return 100 * time.Millisecond, true
     228           0 :                 }
     229             :         }
     230           1 :         if o.ElevatedWriteStallThresholdLag == 0 {
     231           0 :                 o.ElevatedWriteStallThresholdLag = 60 * time.Second
     232           0 :         }
     233             : }
     234             : 
     235             : // EventListener is called on events, like log file creation.
     236             : type EventListener interface {
     237             :         // LogCreated informs the listener of a log file creation.
     238             :         LogCreated(CreateInfo)
     239             : }
     240             : 
     241             : // CreateInfo contains info about a log file creation event.
     242             : type CreateInfo struct {
     243             :         // JobID is the ID of the job that caused the WAL to be created.
     244             :         //
     245             :         // TODO(sumeer): for a file created later due to the need to failover, we
     246             :         // need to provide a JobID generator func in Options.
     247             :         JobID int
     248             :         // Path to the file. This includes the NumWAL, and implicitly or explicitly
     249             :         // includes the logNameIndex.
     250             :         Path string
     251             :         // IsSecondary is true if the file was created on the secondary.
     252             :         IsSecondary bool
     253             :         // Num is the WAL number.
     254             :         Num NumWAL
     255             :         // RecycledFileNum is the file number of a previous log file which was
     256             :         // recycled to create this one. Zero if recycling did not take place.
     257             :         RecycledFileNum base.DiskFileNum
     258             :         // Err contains any error.
     259             :         Err error
     260             : }
     261             : 
     262             : // Stats exposes stats used in Pebble metrics.
     263             : //
     264             : // NB: Metrics.WAL.{Size,BytesIn,BytesWritten} are not maintained by the wal
     265             : // package.
     266             : //
     267             : // TODO(sumeer): with failover, Metrics.WAL.BytesWritten needs to be
     268             : // maintained here.
     269             : type Stats struct {
     270             :         // ObsoleteFileCount is the number of obsolete log files.
     271             :         ObsoleteFileCount int
     272             :         // ObsoleteFileSize is the total size of obsolete log files.
     273             :         ObsoleteFileSize uint64
     274             :         // LiveFileCount is the number of live log files.
     275             :         LiveFileCount int
     276             :         // LiveFileSize is the total size of live log files. This can be higher than
     277             :         // LiveSize due to log recycling (a live log file may be larger than the
     278             :         // size used in its latest incarnation), or failover (resulting in multiple
     279             :         // log files containing the same records).
     280             :         //
     281             :         // This is updated only when log files are closed, to minimize
     282             :         // synchronization.
     283             :         LiveFileSize uint64
     284             :         // Failover contains failover stats.
     285             :         Failover FailoverStats
     286             : }
     287             : 
     288             : // FailoverStats contains stats about WAL failover. These are empty if
     289             : // failover is not configured.
     290             : type FailoverStats struct {
     291             :         // DirSwitchCount is the number of times WAL writing has switched to a
     292             :         // different directory, either due to failover, when the current dir is
     293             :         // unhealthy, or to failback to the primary, when the primary is healthy
     294             :         // again.
     295             :         DirSwitchCount int64
     296             :         // The following durations do not account for continued background writes to
     297             :         // a directory that has been switched away from. These background writes can
     298             :         // happen because of queued records.
     299             : 
     300             :         // PrimaryWriteDuration is the cumulative duration for which WAL writes are
     301             :         // using the primary directory.
     302             :         PrimaryWriteDuration time.Duration
     303             :         // SecondaryWriteDuration is the cumulative duration for which WAL writes
     304             :         // are using the secondary directory.
     305             :         SecondaryWriteDuration time.Duration
     306             : 
     307             :         // FailoverWriteAndSyncLatency measures the latency of writing and syncing a
     308             :         // set of writes that were synced together. Each sample represents the
     309             :         // highest latency observed across the writes in the set of writes. It gives
     310             :         // us a sense of the user-observed latency, which can be much lower than the
     311             :         // underlying fsync latency, when WAL failover is working effectively.
     312             :         FailoverWriteAndSyncLatency prometheus.Histogram
     313             : }
     314             : 
     315             : // Manager handles all WAL work.
     316             : //
     317             : //   - Obsolete can be called concurrently with WAL writing.
     318             : //   - WAL writing: Is done via Create, and the various Writer methods. These
     319             : //     are required to be serialized via external synchronization (specifically,
     320             : //     the caller does it via commitPipeline.mu).
     321             : type Manager interface {
     322             :         // init initializes the Manager. init is called during DB initialization.
     323             :         init(o Options, initial Logs) error
     324             : 
     325             :         // List returns the virtual WALs in ascending order.
     326             :         List() (Logs, error)
     327             :         // Obsolete informs the manager that all virtual WALs less than
     328             :         // minUnflushedNum are obsolete. The callee can choose to recycle some
     329             :         // underlying log files, if !noRecycle. The log files that are not recycled,
     330             :         // and therefore can be deleted, are returned. The deletable files are no
     331             :         // longer tracked by the manager.
     332             :         Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
     333             :         // Create creates a new virtual WAL.
     334             :         //
     335             :         // NumWALs passed to successive Create calls must be monotonically
     336             :         // increasing, and be greater than any NumWAL seen earlier. The caller must
     337             :         // close the previous Writer before calling Create.
     338             :         //
     339             :         // jobID is used for the WALEventListener.
     340             :         Create(wn NumWAL, jobID int) (Writer, error)
     341             :         // ElevateWriteStallThresholdForFailover returns true if the caller should
     342             :         // use a high write stall threshold because the WALs are being written to
     343             :         // the secondary dir.
     344             :         ElevateWriteStallThresholdForFailover() bool
     345             :         // Stats returns the latest Stats.
     346             :         Stats() Stats
     347             :         // Close the manager.
     348             :         // REQUIRES: Writers and Readers have already been closed.
     349             :         Close() error
     350             : 
     351             :         // RecyclerForTesting exposes the internal LogRecycler.
     352             :         RecyclerForTesting() *LogRecycler
     353             : }
     354             : 
     355             : // DeletableLog contains information about a log file that can be deleted.
     356             : type DeletableLog struct {
     357             :         vfs.FS
     358             :         // Path to the file.
     359             :         Path string
     360             :         NumWAL
     361             :         ApproxFileSize uint64
     362             : }
     363             : 
     364             : // SyncOptions has non-nil Done and Err when fsync is requested, else both are
     365             : // nil.
     366             : type SyncOptions struct {
     367             :         Done *sync.WaitGroup
     368             :         Err  *error
     369             : }
     370             : 
     371             : // Writer writes to a virtual WAL. A Writer in standalone mode maps to a
     372             : // single record.LogWriter. In failover mode, it can failover across multiple
     373             : // physical log files.
     374             : type Writer interface {
     375             :         // WriteRecord writes a complete record. The record is asynchronously
     376             :         // persisted to the underlying writer. If SyncOptions.Done != nil, the wait
     377             :         // group will be notified when durability is guaranteed or an error has
     378             :         // occurred (set in SyncOptions.Err). External synchronisation provided by
     379             :         // commitPipeline.mu guarantees that WriteRecord calls are serialized.
     380             :         //
     381             :         // The logicalOffset is the logical size of the WAL after this record is
     382             :         // written. If the WAL corresponds to a single log file, this is the offset
     383             :         // in that log file.
     384             :         //
     385             :         // Some Writer implementations may continue to read p after WriteRecord
     386             :         // returns. This is an obstacle to reusing p's memory. If the caller would
     387             :         // like to reuse p's memory, the caller may pass a non-nil [RefCount].  If
     388             :         // the Writer will retain p, it will invoke the [RefCount] before returning.
     389             :         // When it's finished, it will invoke [RefCount.Unref] to release its
     390             :         // reference.
     391             :         WriteRecord(p []byte, opts SyncOptions, ref RefCount) (logicalOffset int64, err error)
     392             :         // Close the writer.
     393             :         Close() (logicalOffset int64, err error)
     394             :         // Metrics must be called after Close. The callee will no longer modify the
     395             :         // returned LogWriterMetrics.
     396             :         Metrics() record.LogWriterMetrics
     397             : }
     398             : 
     399             : // RefCount is a reference count associated with a record passed to
     400             : // [Writer.WriteRecord]. See the comment on WriteRecord.
     401             : type RefCount interface {
     402             :         // Ref increments the reference count.
     403             :         Ref()
     404             :         // Unref decrements the reference count.
     405             :         Unref()
     406             : }
     407             : 
     408             : // Reader reads a virtual WAL.
     409             : type Reader interface {
     410             :         // NextRecord returns a reader for the next record. It returns io.EOF if there
     411             :         // are no more records. The reader returned becomes stale after the next
     412             :         // NextRecord call, and should no longer be used.
     413             :         NextRecord() (io.Reader, Offset, error)
     414             :         // Close the reader.
     415             :         Close() error
     416             : }
     417             : 
     418             : // Offset indicates the offset or position of a record within a WAL.
     419             : type Offset struct {
     420             :         // PhysicalFile is the path to the physical file containing a particular
     421             :         // record.
     422             :         PhysicalFile string
     423             :         // Physical indicates the file offset at which a record begins within
     424             :         // the physical file named by PhysicalFile.
     425             :         Physical int64
     426             : }
     427             : 
     428             : // String implements fmt.Stringer, returning a string representation of the
     429             : // offset.
     430           0 : func (o Offset) String() string {
     431           0 :         return fmt.Sprintf("(%s: %d)", o.PhysicalFile, o.Physical)
     432           0 : }

Generated by: LCOV version 1.14