LCOV - code coverage report
Current view: top level - pebble/wal - wal.go (source / functions) Hit Total Coverage
Test: 2024-03-19 08:15Z 13bbeea1 - tests + meta.lcov Lines: 76 76 100.0 %
Date: 2024-03-19 08:17:12 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "io"
      10             :         "strconv"
      11             :         "strings"
      12             :         "sync"
      13             :         "time"
      14             : 
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/record"
      17             :         "github.com/cockroachdb/pebble/vfs"
      18             :         "github.com/prometheus/client_golang/prometheus"
      19             : )
      20             : 
      21             : // TODO(sumeer): write a high-level comment describing the approach.
      22             : 
      23             : // Dir is used for storing log files.
      24             : type Dir struct {
      25             :         FS      vfs.FS
      26             :         Dirname string
      27             : }
      28             : 
      29             : // NumWAL is the number of the virtual WAL. It can map to one or more physical
      30             : // log files. In standalone mode, it will map to exactly one log file. In
      31             : // failover mode, it can map to many log files, which are totally ordered
      32             : // (using a dense logNameIndex).
      33             : //
      34             : // In general, WAL refers to the virtual WAL, and file refers to a log file.
      35             : // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to
      36             : // them. Additional mapping to one or more files happens in this package. If a
      37             : // WAL maps to multiple files, the source of truth regarding that mapping is
      38             : // the contents of the directories.
      39             : type NumWAL base.DiskFileNum
      40             : 
      41             : // String implements fmt.Stringer.
      42           1 : func (s NumWAL) String() string { return base.DiskFileNum(s).String() }
      43             : 
      44             : // LogNameIndex numbers log files within a WAL.
      45             : type LogNameIndex uint32
      46             : 
      47             : // String implements fmt.Stringer.
      48           2 : func (li LogNameIndex) String() string {
      49           2 :         return fmt.Sprintf("%03d", li)
      50           2 : }
      51             : 
      52             : // makeLogFilename makes a log filename.
      53           2 : func makeLogFilename(wn NumWAL, index LogNameIndex) string {
      54           2 :         if index == 0 {
      55           2 :                 // Use a backward compatible name, for simplicity.
      56           2 :                 return fmt.Sprintf("%s.log", base.DiskFileNum(wn).String())
      57           2 :         }
      58           2 :         return fmt.Sprintf("%s-%s.log", base.DiskFileNum(wn).String(), index)
      59             : }
      60             : 
      61             : // ParseLogFilename takes a base filename and parses it into its constituent
      62             : // NumWAL and LogNameIndex. If the filename is not a log file, it returns false
      63             : // for the final return value.
      64           2 : func ParseLogFilename(name string) (NumWAL, LogNameIndex, bool) {
      65           2 :         i := strings.IndexByte(name, '.')
      66           2 :         if i < 0 || name[i:] != ".log" {
      67           2 :                 return 0, 0, false
      68           2 :         }
      69           2 :         j := strings.IndexByte(name[:i], '-')
      70           2 :         if j < 0 {
      71           2 :                 dfn, ok := base.ParseDiskFileNum(name[:i])
      72           2 :                 if !ok {
      73           1 :                         // We've considered returning an error for filenames that end in
      74           1 :                         // '.log' but fail to parse correctly. We decided against it because
      75           1 :                         // the '.log' suffix is used by Cockroach's diagnostics log files.
      76           1 :                         // It's conceivable that some of these found their way into a data
      77           1 :                         // directory, and erroring would cause an issue for an existing
      78           1 :                         // Cockroach deployment.
      79           1 :                         return 0, 0, false
      80           1 :                 }
      81           2 :                 return NumWAL(dfn), 0, true
      82             :         }
      83           2 :         dfn, ok := base.ParseDiskFileNum(name[:j])
      84           2 :         if !ok {
      85           1 :                 return 0, 0, false
      86           1 :         }
      87           2 :         li, err := strconv.ParseUint(name[j+1:i], 10, 64)
      88           2 :         if err != nil {
      89           1 :                 return 0, 0, false
      90           1 :         }
      91           2 :         return NumWAL(dfn), LogNameIndex(li), true
      92             : }
      93             : 
      94             : // Options provides configuration for the Manager.
      95             : type Options struct {
      96             :         // Primary dir for storing WAL files. It must already be created and synced
      97             :         // up to the root.
      98             :         Primary Dir
      99             :         // Secondary is used for failover. Optional. It must already be created and
     100             :         // synced up to the root.
     101             :         Secondary Dir
     102             : 
     103             :         // MinUnflushedWALNum is the smallest WAL number corresponding to
     104             :         // mutations that have not been flushed to a sstable.
     105             :         MinUnflushedWALNum NumWAL
     106             : 
     107             :         // Recycling configuration. Only files in the primary dir are recycled.
     108             : 
     109             :         // MaxNumRecyclableLogs is the maximum number of log files to maintain for
     110             :         // recycling.
     111             :         MaxNumRecyclableLogs int
     112             : 
     113             :         // Configuration for calling vfs.NewSyncingFile.
     114             : 
     115             :         // NoSyncOnClose is documented in SyncingFileOptions.
     116             :         NoSyncOnClose bool
     117             :         // BytesPerSync is documented in SyncingFileOptions.
     118             :         BytesPerSync int
     119             :         // PreallocateSize is documented in SyncingFileOptions.
     120             :         PreallocateSize func() int
     121             : 
     122             :         // MinSyncInterval is documented in Options.WALMinSyncInterval.
     123             :         MinSyncInterval func() time.Duration
     124             :         // FsyncLatency records fsync latency. This doesn't differentiate between
     125             :         // fsyncs on the primary and secondary dir.
     126             :         //
     127             :         // TODO(sumeer): consider separating out into two histograms.
     128             :         FsyncLatency prometheus.Histogram
     129             :         // QueueSemChan is the channel to pop from when popping from queued records
     130             :         // that have requested a sync. Its original purpose was to function as a
     131             :         // semaphore that prevents the record.LogWriter.flusher.syncQueue from
     132             :         // overflowing (which will cause a panic). It is still useful in that role
     133             :         // when the WALManager is configured in standalone mode. In failover mode
     134             :         // there is no syncQueue, so the pushback into the commit pipeline is
     135             :         // unnecessary, but possibly harmless.
     136             :         QueueSemChan chan struct{}
     137             : 
     138             :         // Logger for logging.
     139             :         Logger base.Logger
     140             : 
     141             :         // EventListener is called on events, like log file creation.
     142             :         EventListener EventListener
     143             : 
     144             :         FailoverOptions
     145             : }
     146             : 
     147             : // Init constructs and initializes a WAL manager from the provided options and
     148             : // the set of initial logs.
     149           2 : func Init(o Options, initial Logs) (Manager, error) {
     150           2 :         var m Manager
     151           2 :         if o.Secondary == (Dir{}) {
     152           2 :                 m = new(StandaloneManager)
     153           2 :         } else {
     154           2 :                 m = new(failoverManager)
     155           2 :         }
     156           2 :         if err := m.init(o, initial); err != nil {
     157           1 :                 return nil, err
     158           1 :         }
     159           2 :         return m, nil
     160             : }
     161             : 
     162             : // Dirs returns the primary Dir and the secondary if provided.
     163           2 : func (o *Options) Dirs() []Dir {
     164           2 :         if o.Secondary == (Dir{}) {
     165           2 :                 return []Dir{o.Primary}
     166           2 :         }
     167           2 :         return []Dir{o.Primary, o.Secondary}
     168             : }
     169             : 
     170             : // FailoverOptions are options that are specific to failover mode.
     171             : type FailoverOptions struct {
     172             :         // PrimaryDirProbeInterval is the interval for probing the primary dir, when
     173             :         // the WAL is being written to the secondary, to decide when to fail back.
     174             :         PrimaryDirProbeInterval time.Duration
     175             :         // HealthyProbeLatencyThreshold is the latency threshold to declare that the
     176             :         // primary is healthy again.
     177             :         HealthyProbeLatencyThreshold time.Duration
     178             :         // HealthyInterval is the time interval over which the probes have to be
     179             :         // healthy. That is, we look at probe history of length
     180             :         // HealthyInterval/PrimaryDirProbeInterval.
     181             :         HealthyInterval time.Duration
     182             : 
     183             :         // UnhealthySamplingInterval is the interval for sampling ongoing calls and
     184             :         // errors in the latest LogWriter.
     185             :         UnhealthySamplingInterval time.Duration
     186             :         // UnhealthyOperationLatencyThreshold is the latency threshold that is
     187             :         // considered unhealthy, for operations done by a LogWriter. The second return
     188             :         // value indicates whether we should consider failover at all. If the second
     189             :         // return value is false, failover is disabled.
     190             :         UnhealthyOperationLatencyThreshold func() (time.Duration, bool)
     191             : 
     192             :         // ElevatedWriteStallThresholdLag is the duration for which an elevated
     193             :         // threshold should continue after a switch back to the primary dir. This is
     194             :         // because we may have accumulated many unflushed memtables and flushing
     195             :         // them can take some time. Maybe set to 60s.
     196             :         ElevatedWriteStallThresholdLag time.Duration
     197             : 
     198             :         // timeSource is only non-nil for tests.
     199             :         timeSource
     200             : 
     201             :         monitorIterationForTesting chan<- struct{}
     202             :         proberIterationForTesting  chan<- struct{}
     203             :         monitorStateForTesting     func(numSwitches int, ongoingLatencyAtSwitch time.Duration)
     204             :         logWriterCreatedForTesting chan<- struct{}
     205             : }
     206             : 
     207             : // EnsureDefaults ensures that the default values for all options are set if a
     208             : // valid value was not already specified.
     209           2 : func (o *FailoverOptions) EnsureDefaults() {
     210           2 :         if o.PrimaryDirProbeInterval == 0 {
     211           1 :                 o.PrimaryDirProbeInterval = time.Second
     212           1 :         }
     213           2 :         if o.HealthyProbeLatencyThreshold == 0 {
     214           1 :                 o.HealthyProbeLatencyThreshold = 25 * time.Millisecond
     215           1 :         }
     216           2 :         if o.HealthyInterval == 0 {
     217           1 :                 o.HealthyInterval = 15 * time.Second
     218           1 :         }
     219           2 :         if o.UnhealthySamplingInterval == 0 {
     220           1 :                 o.UnhealthySamplingInterval = 100 * time.Millisecond
     221           1 :         }
     222           2 :         if o.UnhealthyOperationLatencyThreshold == nil {
     223           1 :                 o.UnhealthyOperationLatencyThreshold = func() (time.Duration, bool) {
     224           1 :                         return 100 * time.Millisecond, true
     225           1 :                 }
     226             :         }
     227           2 :         if o.ElevatedWriteStallThresholdLag == 0 {
     228           1 :                 o.ElevatedWriteStallThresholdLag = 60 * time.Second
     229           1 :         }
     230             : }
     231             : 
     232             : // EventListener is called on events, like log file creation.
     233             : type EventListener interface {
     234             :         // LogCreated informs the listener of a log file creation.
     235             :         LogCreated(CreateInfo)
     236             : }
     237             : 
     238             : // CreateInfo contains info about a log file creation event.
     239             : type CreateInfo struct {
     240             :         // JobID is the ID of the job that caused the WAL to be created.
     241             :         //
     242             :         // TODO(sumeer): for a file created later due to the need to failover, we
     243             :         // need to provide a JobID generator func in Options.
     244             :         JobID int
     245             :         // Path to the file. This includes the NumWAL, and implicitly or explicitly
     246             :         // includes the logNameIndex.
     247             :         Path string
     248             :         // IsSecondary is true if the file was created on the secondary.
     249             :         IsSecondary bool
     250             :         // Num is the WAL number.
     251             :         Num NumWAL
     252             :         // RecycledFileNum is the file number of a previous log file which was
     253             :         // recycled to create this one. Zero if recycling did not take place.
     254             :         RecycledFileNum base.DiskFileNum
     255             :         // Err contains any error.
     256             :         Err error
     257             : }
     258             : 
     259             : // Stats exposes stats used in Pebble metrics.
     260             : //
     261             : // NB: Metrics.WAL.{Size,BytesIn,BytesWritten} are not maintained by the wal
     262             : // package.
     263             : //
     264             : // TODO(sumeer): with failover, Metrics.WAL.BytesWritten needs to be
     265             : // maintained here.
     266             : type Stats struct {
     267             :         // ObsoleteFileCount is the number of obsolete log files.
     268             :         ObsoleteFileCount int
     269             :         // ObsoleteFileSize is the total size of obsolete log files.
     270             :         ObsoleteFileSize uint64
     271             :         // LiveFileCount is the number of live log files.
     272             :         LiveFileCount int
     273             :         // LiveFileSize is the total size of live log files. This can be higher than
     274             :         // LiveSize due to log recycling (a live log file may be larger than the
     275             :         // size used in its latest incarnation), or failover (resulting in multiple
     276             :         // log files containing the same records).
     277             :         //
     278             :         // This is updated only when log files are closed, to minimize
     279             :         // synchronization.
     280             :         LiveFileSize uint64
     281             :         // Failover contains failover stats.
     282             :         Failover FailoverStats
     283             : }
     284             : 
     285             : // FailoverStats contains stats about WAL failover. These are empty if
     286             : // failover is not configured.
     287             : type FailoverStats struct {
     288             :         // DirSwitchCount is the number of times WAL writing has switched to a
     289             :         // different directory, either due to failover, when the current dir is
     290             :         // unhealthy, or to failback to the primary, when the primary is healthy
     291             :         // again.
     292             :         DirSwitchCount int64
     293             :         // The following durations do not account for continued background writes to
     294             :         // a directory that has been switched away from. These background writes can
     295             :         // happen because of queued records.
     296             : 
     297             :         // PrimaryWriteDuration is the cumulative duration for which WAL writes are
     298             :         // using the primary directory.
     299             :         PrimaryWriteDuration time.Duration
     300             :         // SecondaryWriteDuration is the cumulative duration for which WAL writes
     301             :         // are using the secondary directory.
     302             :         SecondaryWriteDuration time.Duration
     303             : }
     304             : 
     305             : // Manager handles all WAL work.
     306             : //
     307             : //   - Obsolete can be called concurrently with WAL writing.
     308             : //   - WAL writing: Is done via Create, and the various Writer methods. These
     309             : //     are required to be serialized via external synchronization (specifically,
     310             : //     the caller does it via commitPipeline.mu).
     311             : type Manager interface {
     312             :         // init initializes the Manager. init is called during DB initialization.
     313             :         init(o Options, initial Logs) error
     314             : 
     315             :         // List returns the virtual WALs in ascending order.
     316             :         List() (Logs, error)
     317             :         // Obsolete informs the manager that all virtual WALs less than
     318             :         // minUnflushedNum are obsolete. The callee can choose to recycle some
     319             :         // underlying log files, if !noRecycle. The log files that are not recycled,
     320             :         // and therefore can be deleted, are returned. The deletable files are no
     321             :         // longer tracked by the manager.
     322             :         Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
     323             :         // Create creates a new virtual WAL.
     324             :         //
     325             :         // NumWALs passed to successive Create calls must be monotonically
     326             :         // increasing, and be greater than any NumWAL seen earlier. The caller must
     327             :         // close the previous Writer before calling Create.
     328             :         //
     329             :         // jobID is used for the WALEventListener.
     330             :         Create(wn NumWAL, jobID int) (Writer, error)
     331             :         // ElevateWriteStallThresholdForFailover returns true if the caller should
     332             :         // use a high write stall threshold because the WALs are being written to
     333             :         // the secondary dir.
     334             :         ElevateWriteStallThresholdForFailover() bool
     335             :         // Stats returns the latest Stats.
     336             :         Stats() Stats
     337             :         // Close the manager.
     338             :         // REQUIRES: Writers and Readers have already been closed.
     339             :         Close() error
     340             : 
     341             :         // RecyclerForTesting exposes the internal LogRecycler.
     342             :         RecyclerForTesting() *LogRecycler
     343             : }
     344             : 
     345             : // DeletableLog contains information about a log file that can be deleted.
     346             : type DeletableLog struct {
     347             :         vfs.FS
     348             :         // Path to the file.
     349             :         Path string
     350             :         NumWAL
     351             :         ApproxFileSize uint64
     352             : }
     353             : 
     354             : // SyncOptions has non-nil Done and Err when fsync is requested, else both are
     355             : // nil.
     356             : type SyncOptions struct {
     357             :         Done *sync.WaitGroup
     358             :         Err  *error
     359             : }
     360             : 
     361             : // Writer writes to a virtual WAL. A Writer in standalone mode maps to a
     362             : // single record.LogWriter. In failover mode, it can failover across multiple
     363             : // physical log files.
     364             : type Writer interface {
     365             :         // WriteRecord writes a complete record. The record is asynchronously
     366             :         // persisted to the underlying writer. If SyncOptions.Done != nil, the wait
     367             :         // group will be notified when durability is guaranteed or an error has
     368             :         // occurred (set in SyncOptions.Err). External synchronisation provided by
     369             :         // commitPipeline.mu guarantees that WriteRecord calls are serialized.
     370             :         //
     371             :         // The logicalOffset is the logical size of the WAL after this record is
     372             :         // written. If the WAL corresponds to a single log file, this is the offset
     373             :         // in that log file.
     374             :         //
     375             :         // Some Writer implementations may continue to read p after WriteRecord
     376             :         // returns. This is an obstacle to reusing p's memory. If the caller would
     377             :         // like to reuse p's memory, the caller may pass a non-nil [RefFunc].
     378             :         // If the Writer will retain p, it will invoke the [RefFunc] before
     379             :         // returning. When it's finished, it will invoke the func returned by the
     380             :         // [RefFunc] to release its reference.
     381             :         WriteRecord(p []byte, opts SyncOptions, ref RefFunc) (logicalOffset int64, err error)
     382             :         // Close the writer.
     383             :         Close() (logicalOffset int64, err error)
     384             :         // Metrics must be called after Close. The callee will no longer modify the
     385             :         // returned LogWriterMetrics.
     386             :         Metrics() record.LogWriterMetrics
     387             : }
     388             : 
     389             : // RefFunc holds funcs to increment a reference count associated with a record
     390             : // passed to [Writer.WriteRecord]. See the comment on WriteRecord.
     391             : type RefFunc func() (unref func())
     392             : 
     393             : // Reader reads a virtual WAL.
     394             : type Reader interface {
     395             :         // NextRecord returns a reader for the next record. It returns io.EOF if there
     396             :         // are no more records. The reader returned becomes stale after the next NextRecord
     397             :         // call, and should no longer be used.
     398             :         NextRecord() (io.Reader, Offset, error)
     399             :         // Close the reader.
     400             :         Close() error
     401             : }
     402             : 
     403             : // Offset indicates the offset or position of a record within a WAL.
     404             : type Offset struct {
     405             :         // PhysicalFile is the path to the physical file containing a particular
     406             :         // record.
     407             :         PhysicalFile string
     408             :         // Physical indicates the file offset at which a record begins within
     409             :         // the physical file named by PhysicalFile.
     410             :         Physical int64
     411             : }
     412             : 
     413             : // String implements fmt.Stringer, returning a string representation of the
     414             : // offset.
     415           1 : func (o Offset) String() string {
     416           1 :         return fmt.Sprintf("(%s: %d)", o.PhysicalFile, o.Physical)
     417           1 : }

Generated by: LCOV version 1.14