LCOV - code coverage report
Current view: top level - pebble/wal - wal.go (source / functions) Hit Total Coverage
Test: 2024-03-04 08:25Z dd51d85c - tests + meta.lcov Lines: 73 73 100.0 %
Date: 2024-03-04 08:26:43 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "io"
      10             :         "strconv"
      11             :         "strings"
      12             :         "sync"
      13             :         "time"
      14             : 
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/record"
      17             :         "github.com/cockroachdb/pebble/vfs"
      18             :         "github.com/prometheus/client_golang/prometheus"
      19             : )
      20             : 
      21             : // TODO(sumeer): write a high-level comment describing the approach.
      22             : 
      23             : // Dir is used for storing log files.
      24             : type Dir struct {
      25             :         FS      vfs.FS
      26             :         Dirname string
      27             : }
      28             : 
      29             : // NumWAL is the number of the virtual WAL. It can map to one or more physical
      30             : // log files. In standalone mode, it will map to exactly one log file. In
      31             : // failover mode, it can map to many log files, which are totally ordered
      32             : // (using a dense logNameIndex).
      33             : //
      34             : // In general, WAL refers to the virtual WAL, and file refers to a log file.
      35             : // The Pebble MANIFEST only knows about virtual WALs and assigns numbers to
      36             : // them. Additional mapping to one or more files happens in this package. If a
      37             : // WAL maps to multiple files, the source of truth regarding that mapping is
      38             : // the contents of the directories.
      39             : type NumWAL base.DiskFileNum
      40             : 
      41             : // String implements fmt.Stringer.
      42           1 : func (s NumWAL) String() string { return base.DiskFileNum(s).String() }
      43             : 
      44             : // LogNameIndex numbers log files within a WAL.
      45             : type LogNameIndex uint32
      46             : 
      47             : // String implements fmt.Stringer.
      48           1 : func (li LogNameIndex) String() string {
      49           1 :         return fmt.Sprintf("%03d", li)
      50           1 : }
      51             : 
      52             : // makeLogFilename makes a log filename.
      53           2 : func makeLogFilename(wn NumWAL, index LogNameIndex) string {
      54           2 :         if index == 0 {
      55           2 :                 // Use a backward compatible name, for simplicity.
      56           2 :                 return fmt.Sprintf("%s.log", base.DiskFileNum(wn).String())
      57           2 :         }
      58           1 :         return fmt.Sprintf("%s-%s.log", base.DiskFileNum(wn).String(), index)
      59             : }
      60             : 
      61             : // ParseLogFilename takes a base filename and parses it into its constituent
      62             : // NumWAL and LogNameIndex. If the filename is not a log file, it returns false
      63             : // for the final return value.
      64           2 : func ParseLogFilename(name string) (NumWAL, LogNameIndex, bool) {
      65           2 :         i := strings.IndexByte(name, '.')
      66           2 :         if i < 0 || name[i:] != ".log" {
      67           2 :                 return 0, 0, false
      68           2 :         }
      69           2 :         j := strings.IndexByte(name[:i], '-')
      70           2 :         if j < 0 {
      71           2 :                 dfn, ok := base.ParseDiskFileNum(name[:i])
      72           2 :                 if !ok {
      73           1 :                         // We've considered returning an error for filenames that end in
      74           1 :                         // '.log' but fail to parse correctly. We decided against it because
      75           1 :                         // the '.log' suffix is used by Cockroach's daignostics log files.
      76           1 :                         // It's conceivable that some of these found their way into a data
      77           1 :                         // directory, and erroring would cause an issue for an existing
      78           1 :                         // Cockroach deployment.
      79           1 :                         return 0, 0, false
      80           1 :                 }
      81           2 :                 return NumWAL(dfn), 0, true
      82             :         }
      83           1 :         dfn, ok := base.ParseDiskFileNum(name[:j])
      84           1 :         if !ok {
      85           1 :                 return 0, 0, false
      86           1 :         }
      87           1 :         li, err := strconv.ParseUint(name[j+1:i], 10, 64)
      88           1 :         if err != nil {
      89           1 :                 return 0, 0, false
      90           1 :         }
      91           1 :         return NumWAL(dfn), LogNameIndex(li), true
      92             : }
      93             : 
      94             : // Options provides configuration for the Manager.
      95             : type Options struct {
      96             :         // Primary dir for storing WAL files. It must already be created and synced
      97             :         // up to the root.
      98             :         Primary Dir
      99             :         // Secondary is used for failover. Optional. It must already be created and
     100             :         // synced up to the root.
     101             :         Secondary Dir
     102             : 
     103             :         // MinUnflushedWALNum is the smallest WAL number corresponding to
     104             :         // mutations that have not been flushed to an sstable.
     105             :         MinUnflushedWALNum NumWAL
     106             : 
     107             :         // Recycling configuration. Only files in the primary dir are recycled.
     108             : 
     109             :         // MaxNumRecyclableLogs is the maximum number of log files to maintain for
     110             :         // recycling.
     111             :         MaxNumRecyclableLogs int
     112             : 
     113             :         // Configuration for calling vfs.NewSyncingFile.
     114             : 
     115             :         // NoSyncOnClose is documented in SyncingFileOptions.
     116             :         NoSyncOnClose bool
     117             :         // BytesPerSync is documented in SyncingFileOptions.
     118             :         BytesPerSync int
     119             :         // PreallocateSize is documented in SyncingFileOptions.
     120             :         PreallocateSize func() int
     121             : 
     122             :         // MinSyncInterval is documented in Options.WALMinSyncInterval.
     123             :         MinSyncInterval func() time.Duration
     124             :         // FsyncLatency records fsync latency. This doesn't differentiate between
     125             :         // fsyncs on the primary and secondary dir.
     126             :         //
     127             :         // TODO(sumeer): consider separating out into two histograms.
     128             :         FsyncLatency prometheus.Histogram
     129             :         // QueueSemChan is the channel to pop from when popping from queued records
     130             :         // that have requested a sync. Its original purpose was to function as a
     131             :         // semaphore that prevents the record.LogWriter.flusher.syncQueue from
     132             :         // overflowing (which will cause a panic). It is still useful in that role
     133             :         // when the WALManager is configured in standalone mode. In failover mode
     134             :         // there is no syncQueue, so the pushback into the commit pipeline is
     135             :         // unnecessary, but possibly harmless.
     136             :         QueueSemChan chan struct{}
     137             : 
     138             :         // Logger for logging.
     139             :         Logger base.Logger
     140             : 
     141             :         // EventListener is called on events, like log file creation.
     142             :         EventListener EventListener
     143             : 
     144             :         FailoverOptions
     145             : }
     146             : 
     147             : // Init constructs and initializes a WAL manager from the provided options and
     148             : // the set of initial logs.
     149           2 : func Init(o Options, initial Logs) (Manager, error) {
     150           2 :         var m Manager
     151           2 :         if o.Secondary == (Dir{}) {
     152           2 :                 m = new(StandaloneManager)
     153           2 :         } else {
     154           1 :                 m = new(failoverManager)
     155           1 :         }
     156           2 :         if err := m.init(o, initial); err != nil {
     157           1 :                 return nil, err
     158           1 :         }
     159           2 :         return m, nil
     160             : }
     161             : 
     162             : // Dirs returns the primary Dir and the secondary if provided.
     163           2 : func (o *Options) Dirs() []Dir {
     164           2 :         if o.Secondary == (Dir{}) {
     165           2 :                 return []Dir{o.Primary}
     166           2 :         }
     167           1 :         return []Dir{o.Primary, o.Secondary}
     168             : }
     169             : 
     170             : // FailoverOptions are options that are specific to failover mode.
     171             : type FailoverOptions struct {
     172             :         // PrimaryDirProbeInterval is the interval for probing the primary dir, when
     173             :         // the WAL is being written to the secondary, to decide when to fail back.
     174             :         PrimaryDirProbeInterval time.Duration
     175             :         // HealthyProbeLatencyThreshold is the latency threshold to declare that the
     176             :         // primary is healthy again.
     177             :         HealthyProbeLatencyThreshold time.Duration
     178             :         // HealthyInterval is the time interval over which the probes have to be
     179             :         // healthy. That is, we look at probe history of length
     180             :         // HealthyInterval/PrimaryDirProbeInterval.
     181             :         HealthyInterval time.Duration
     182             : 
     183             :         // UnhealthySamplingInterval is the interval for sampling ongoing calls and
     184             :         // errors in the latest LogWriter.
     185             :         UnhealthySamplingInterval time.Duration
     186             :         // UnhealthyOperationLatencyThreshold is the latency threshold that is
     187             :         // considered unhealthy, for operations done by a LogWriter.
     188             :         UnhealthyOperationLatencyThreshold func() time.Duration
     189             : 
     190             :         // ElevatedWriteStallThresholdLag is the duration for which an elevated
     191             :         // threshold should continue after a switch back to the primary dir. This is
     192             :         // because we may have accumulated many unflushed memtables and flushing
     193             :         // them can take some time. Maybe set to 60s.
     194             :         ElevatedWriteStallThresholdLag time.Duration
     195             : 
     196             :         // timeSource is only non-nil for tests.
     197             :         timeSource
     198             : 
     199             :         monitorIterationForTesting chan<- struct{}
     200             :         proberIterationForTesting  chan<- struct{}
     201             :         monitorStateForTesting     func(numSwitches int, ongoingLatencyAtSwitch time.Duration)
     202             :         logWriterCreatedForTesting chan<- struct{}
     203             : }
     204             : 
     205             : // EnsureDefaults ensures that the default values for all options are set if a
     206             : // valid value was not already specified.
     207           1 : func (o *FailoverOptions) EnsureDefaults() {
     208           1 :         if o.PrimaryDirProbeInterval == 0 {
     209           1 :                 o.PrimaryDirProbeInterval = time.Second
     210           1 :         }
     211           1 :         if o.HealthyProbeLatencyThreshold == 0 {
     212           1 :                 o.HealthyProbeLatencyThreshold = 100 * time.Millisecond
     213           1 :         }
     214           1 :         if o.HealthyInterval == 0 {
     215           1 :                 o.HealthyInterval = 2 * time.Minute
     216           1 :         }
     217           1 :         if o.UnhealthySamplingInterval == 0 {
     218           1 :                 o.UnhealthySamplingInterval = 100 * time.Millisecond
     219           1 :         }
     220           1 :         if o.UnhealthyOperationLatencyThreshold == nil {
     221           1 :                 o.UnhealthyOperationLatencyThreshold = func() time.Duration {
     222           1 :                         return 200 * time.Millisecond
     223           1 :                 }
     224             :         }
     225             : }
     226             : 
     227             : // EventListener is called on events, like log file creation.
     228             : type EventListener interface {
     229             :         // LogCreated informs the listener of a log file creation.
     230             :         LogCreated(CreateInfo)
     231             : }
     232             : 
     233             : // CreateInfo contains info about a log file creation event.
     234             : type CreateInfo struct {
     235             :         // JobID is the ID of the job that caused the WAL to be created.
     236             :         //
     237             :         // TODO(sumeer): for a file created later due to the need to failover, we
     238             :         // need to provide a JobID generator func in Options.
     239             :         JobID int
     240             :         // Path to the file. This includes the NumWAL, and implicitly or explicitly
     241             :         // includes the logNameIndex.
     242             :         Path string
     243             :         // IsSecondary is true if the file was created on the secondary.
     244             :         IsSecondary bool
     245             :         // Num is the WAL number.
     246             :         Num NumWAL
     247             :         // RecycledFileNum is the file number of a previous log file which was
     248             :         // recycled to create this one. Zero if recycling did not take place.
     249             :         RecycledFileNum base.DiskFileNum
     250             :         // Err contains any error.
     251             :         Err error
     252             : }
     253             : 
     254             : // Stats exposes stats used in Pebble metrics.
     255             : //
     256             : // NB: Metrics.WAL.{Size,BytesIn,BytesWritten} are not maintained by the wal
     257             : // package.
     258             : //
     259             : // TODO(sumeer): with failover, Metrics.WAL.BytesWritten needs to be
     260             : // maintained here.
     261             : type Stats struct {
     262             :         // ObsoleteFileCount is the number of obsolete log files.
     263             :         ObsoleteFileCount int
     264             :         // ObsoleteFileSize is the total size of obsolete log files.
     265             :         ObsoleteFileSize uint64
     266             :         // LiveFileCount is the number of live log files.
     267             :         LiveFileCount int
     268             :         // LiveFileSize is the total size of live log files. This can be higher than
     269             :         // LiveSize due to log recycling (a live log file may be larger than the
     270             :         // size used in its latest incarnation), or failover (resulting in multiple
     271             :         // log files containing the same records).
     272             :         //
     273             :         // This is updated only when log files are closed, to minimize
     274             :         // synchronization.
     275             :         LiveFileSize uint64
     276             :         // Failover contains failover stats.
     277             :         Failover FailoverStats
     278             : }
     279             : 
     280             : // FailoverStats contains stats about WAL failover. These are empty if
     281             : // failover is not configured.
     282             : type FailoverStats struct {
     283             :         // DirSwitchCount is the number of times WAL writing has switched to a
     284             :         // different directory, either due to failover, when the current dir is
     285             :         // unhealthy, or to failback to the primary, when the primary is healthy
     286             :         // again.
     287             :         DirSwitchCount int64
     288             :         // The following durations do not account for continued background writes to
     289             :         // a directory that has been switched away from. These background writes can
     290             :         // happen because of queued records.
     291             : 
     292             :         // PrimaryWriteDuration is the cumulative duration for which WAL writes are
     293             :         // using the primary directory.
     294             :         PrimaryWriteDuration time.Duration
     295             :         // SecondaryWriteDuration is the cumulative duration for which WAL writes
     296             :         // are using the secondary directory.
     297             :         SecondaryWriteDuration time.Duration
     298             : }
     299             : 
     300             : // Manager handles all WAL work.
     301             : //
     302             : //   - Obsolete can be called concurrently with WAL writing.
     303             : //   - WAL writing: Is done via Create, and the various Writer methods. These
     304             : //     are required to be serialized via external synchronization (specifically,
     305             : //     the caller does it via commitPipeline.mu).
     306             : type Manager interface {
     307             :         // init initializes the Manager. init is called during DB initialization.
     308             :         init(o Options, initial Logs) error
     309             : 
     310             :         // List returns the virtual WALs in ascending order.
     311             :         List() (Logs, error)
     312             :         // Obsolete informs the manager that all virtual WALs less than
     313             :         // minUnflushedNum are obsolete. The callee can choose to recycle some
     314             :         // underlying log files, if !noRecycle. The log files that are not recycled,
     315             :         // and therefore can be deleted, are returned. The deletable files are no
     316             :         // longer tracked by the manager.
     317             :         Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
     318             :         // Create creates a new virtual WAL.
     319             :         //
     320             :         // NumWALs passed to successive Create calls must be monotonically
     321             :         // increasing, and be greater than any NumWAL seen earlier. The caller must
     322             :         // close the previous Writer before calling Create.
     323             :         //
     324             :         // jobID is used for the EventListener.
     325             :         Create(wn NumWAL, jobID int) (Writer, error)
     326             :         // ElevateWriteStallThresholdForFailover returns true if the caller should
     327             :         // use a high write stall threshold because the WALs are being written to
     328             :         // the secondary dir.
     329             :         ElevateWriteStallThresholdForFailover() bool
     330             :         // Stats returns the latest Stats.
     331             :         Stats() Stats
     332             :         // Close the manager.
     333             :         // REQUIRES: Writers and Readers have already been closed.
     334             :         Close() error
     335             : 
     336             :         // RecyclerForTesting exposes the internal LogRecycler.
     337             :         RecyclerForTesting() *LogRecycler
     338             : }
     339             : 
     340             : // DeletableLog contains information about a log file that can be deleted.
     341             : type DeletableLog struct {
     342             :         vfs.FS
     343             :         // Path to the file.
     344             :         Path string
     345             :         NumWAL
     346             :         ApproxFileSize uint64
     347             : }
     348             : 
     349             : // SyncOptions has non-nil Done and Err when fsync is requested, else both are
     350             : // nil.
     351             : type SyncOptions struct {
     352             :         Done *sync.WaitGroup
     353             :         Err  *error
     354             : }
     355             : 
     356             : // Writer writes to a virtual WAL. A Writer in standalone mode maps to a
     357             : // single record.LogWriter. In failover mode, it can failover across multiple
     358             : // physical log files.
     359             : type Writer interface {
     360             :         // WriteRecord writes a complete record. The record is asynchronously
     361             :         // persisted to the underlying writer. If SyncOptions.Done != nil, the wait
     362             :         // group will be notified when durability is guaranteed or an error has
     363             :         // occurred (set in SyncOptions.Err). External synchronisation provided by
     364             :         // commitPipeline.mu guarantees that WriteRecord calls are serialized.
     365             :         //
     366             :         // The logicalOffset is the logical size of the WAL after this record is
     367             :         // written. If the WAL corresponds to a single log file, this is the offset
     368             :         // in that log file.
     369             :         WriteRecord(p []byte, opts SyncOptions) (logicalOffset int64, err error)
     370             :         // Close the writer.
     371             :         Close() (logicalOffset int64, err error)
     372             :         // Metrics must be called after Close. The callee will no longer modify the
     373             :         // returned LogWriterMetrics.
     374             :         Metrics() record.LogWriterMetrics
     375             : }
     376             : 
     377             : // Reader reads a virtual WAL.
     378             : type Reader interface {
     379             :         // NextRecord returns a reader for the next record. It returns io.EOF if there
     380             :         // are no more records. The reader returned becomes stale after the next
     381             :         // NextRecord call, and should no longer be used.
     382             :         NextRecord() (io.Reader, Offset, error)
     383             :         // Close the reader.
     384             :         Close() error
     385             : }
     386             : 
     387             : // Offset indicates the offset or position of a record within a WAL.
     388             : type Offset struct {
     389             :         // PhysicalFile is the path to the physical file containing a particular
     390             :         // record.
     391             :         PhysicalFile string
     392             :         // Physical indicates the file offset at which a record begins within
     393             :         // the physical file named by PhysicalFile.
     394             :         Physical int64
     395             : }
     396             : 
     397             : // String implements fmt.Stringer, returning a string representation of the
     398             : // offset.
     399           1 : func (o Offset) String() string {
     400           1 :         return fmt.Sprintf("(%s: %d)", o.PhysicalFile, o.Physical)
     401           1 : }

Generated by: LCOV version 1.14