LCOV - code coverage report
Current view: top level - pebble/wal/wal - standalone_manager.go (source / functions) Coverage Total Hit
Test: 2025-07-10 08:18Z dd0f8384 - tests + meta.lcov Lines: 95.7 % 210 201
Test Date: 2025-07-10 08:20:26 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package wal
       6              : 
       7              : import (
       8              :         "os"
       9              :         "slices"
      10              :         "sync"
      11              : 
      12              :         "github.com/cockroachdb/pebble/internal/base"
      13              :         "github.com/cockroachdb/pebble/record"
      14              :         "github.com/cockroachdb/pebble/vfs"
      15              : )
      16              : 
      17              : // StandaloneManager implements Manager with a single log file per WAL (no
      18              : // failover capability).
      19              : type StandaloneManager struct {
      20              :         o        Options
      21              :         recycler LogRecycler
      22              :         walDir   vfs.File
      23              :         // initialObsolete holds the set of DeletableLogs that formed the logs
      24              :         // passed into Init. The initialObsolete logs are all obsolete. Once
      25              :         // returned via Manager.Obsolete, initialObsolete is cleared. The
      26              :         // initialObsolete logs are stored separately from mu.queue because they may
      27              :         // include logs that were NOT created by the standalone manager, and
      28              :         // multiple physical log files may form one logical WAL.
      29              :         initialObsolete []DeletableLog
      30              : 
      31              :         // External synchronization is relied on when accessing w in Manager.Create,
      32              :         // Writer.{WriteRecord,Close}.
      33              :         w *standaloneWriter
      34              : 
      35              :         mu struct {
      36              :                 sync.Mutex
      37              :                 // The queue of WALs, containing both flushed and unflushed WALs. The
      38              :                 // FileInfo.FileNum is also the NumWAL, since there is one log file for
      39              :                 // each WAL. The flushed logs are a prefix, the unflushed logs a suffix.
      40              :                 // If w != nil, the last entry here is that active WAL. For the active
      41              :                 // log, FileInfo.FileSize is the size when it was opened and can be
      42              :                 // greater than zero because of log recycling.
      43              :                 queue []base.FileInfo
      44              :         }
      45              : }
      46              : 
      47              : var _ Manager = &StandaloneManager{}
      48              : 
      49              : // init implements Manager.
      50            2 : func (m *StandaloneManager) init(o Options, initial Logs) error {
      51            2 :         if o.Secondary.FS != nil {
      52            0 :                 return base.AssertionFailedf("cannot create StandaloneManager with a secondary")
      53            0 :         }
      54            2 :         var err error
      55            2 :         var walDir vfs.File
      56            2 :         if walDir, err = o.Primary.FS.OpenDir(o.Primary.Dirname); err != nil {
      57            1 :                 return err
      58            1 :         }
      59            2 :         *m = StandaloneManager{
      60            2 :                 o:      o,
      61            2 :                 walDir: walDir,
      62            2 :         }
      63            2 :         m.recycler.Init(o.MaxNumRecyclableLogs)
      64            2 : 
      65            2 :         closeAndReturnErr := func(err error) error {
      66            0 :                 err = firstError(err, walDir.Close())
      67            0 :                 return err
      68            0 :         }
      69            2 :         for _, ll := range initial {
      70            2 :                 if m.recycler.MinRecycleLogNum() <= ll.Num {
      71            2 :                         m.recycler.SetMinRecycleLogNum(ll.Num + 1)
      72            2 :                 }
      73            2 :                 m.initialObsolete, err = appendDeletableLogs(m.initialObsolete, ll)
      74            2 :                 if err != nil {
      75            0 :                         return closeAndReturnErr(err)
      76            0 :                 }
      77              :         }
      78            2 :         return nil
      79              : }
      80              : 
      81              : // List implements Manager.
      82            2 : func (m *StandaloneManager) List() Logs {
      83            2 :         m.mu.Lock()
      84            2 :         defer m.mu.Unlock()
      85            2 :         wals := make(Logs, len(m.mu.queue))
      86            2 :         for i := range m.mu.queue {
      87            2 :                 wals[i] = LogicalLog{
      88            2 :                         Num:      NumWAL(m.mu.queue[i].FileNum),
      89            2 :                         segments: []segment{{dir: m.o.Primary}},
      90            2 :                 }
      91            2 :         }
      92            2 :         return wals
      93              : }
      94              : 
      95              : // Obsolete implements Manager.
      96              : func (m *StandaloneManager) Obsolete(
      97              :         minUnflushedNum NumWAL, noRecycle bool,
      98            2 : ) (toDelete []DeletableLog, err error) {
      99            2 :         m.mu.Lock()
     100            2 :         defer m.mu.Unlock()
     101            2 : 
     102            2 :         // If the DB was recently opened, we may have deletable logs outside the
     103            2 :         // queue.
     104            2 :         m.initialObsolete = slices.DeleteFunc(m.initialObsolete, func(dl DeletableLog) bool {
     105            2 :                 if dl.NumWAL >= minUnflushedNum {
     106            2 :                         return false
     107            2 :                 }
     108            2 :                 toDelete = append(toDelete, dl)
     109            2 :                 return true
     110              :         })
     111              : 
     112            2 :         i := 0
     113            2 :         for ; i < len(m.mu.queue); i++ {
     114            2 :                 fi := m.mu.queue[i]
     115            2 :                 if fi.FileNum >= base.DiskFileNum(minUnflushedNum) {
     116            2 :                         break
     117              :                 }
     118            2 :                 if noRecycle || !m.recycler.Add(fi) {
     119            2 :                         toDelete = append(toDelete, DeletableLog{
     120            2 :                                 FS:             m.o.Primary.FS,
     121            2 :                                 Path:           m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(fi.FileNum), 000)),
     122            2 :                                 NumWAL:         NumWAL(fi.FileNum),
     123            2 :                                 ApproxFileSize: fi.FileSize,
     124            2 :                         })
     125            2 :                 }
     126              :         }
     127            2 :         m.mu.queue = m.mu.queue[i:]
     128            2 :         return toDelete, nil
     129              : }
     130              : 
     131              : // Create implements Manager.
     132            2 : func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
     133            2 :         // TODO(sumeer): check monotonicity of wn.
     134            2 :         newLogNum := base.DiskFileNum(wn)
     135            2 :         newLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(wn, 0))
     136            2 : 
     137            2 :         // Try to use a recycled log file. Recycling log files is an important
     138            2 :         // performance optimization as it is faster to sync a file that has
     139            2 :         // already been written, than one which is being written for the first
     140            2 :         // time. This is due to the need to sync file metadata when a file is
     141            2 :         // being written for the first time. Note this is true even if file
     142            2 :         // preallocation is performed (e.g. fallocate).
     143            2 :         var recycleLog base.FileInfo
     144            2 :         var recycleOK bool
     145            2 :         var newLogFile vfs.File
     146            2 :         var err error
     147            2 :         recycleLog, recycleOK = m.recycler.Peek()
     148            2 :         if recycleOK {
     149            1 :                 recycleLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
     150            1 :                 newLogFile, err = m.o.Primary.FS.ReuseForWrite(recycleLogName, newLogName, "pebble-wal")
     151            1 :                 base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
     152            2 :         } else {
     153            2 :                 newLogFile, err = m.o.Primary.FS.Create(newLogName, "pebble-wal")
     154            2 :                 base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
     155            2 :         }
     156            2 :         createInfo := CreateInfo{
     157            2 :                 JobID:           jobID,
     158            2 :                 Path:            newLogName,
     159            2 :                 IsSecondary:     false,
     160            2 :                 Num:             wn,
     161            2 :                 RecycledFileNum: recycleLog.FileNum,
     162            2 :                 Err:             nil,
     163            2 :         }
     164            2 :         defer func() {
     165            2 :                 createInfo.Err = err
     166            2 :                 m.o.EventListener.LogCreated(createInfo)
     167            2 :         }()
     168              : 
     169            2 :         if err != nil {
     170            1 :                 return nil, err
     171            1 :         }
     172            2 :         var newLogSize uint64
     173            2 :         if recycleOK {
     174            1 :                 // Figure out the recycled WAL size. This Stat is necessary
     175            1 :                 // because ReuseForWrite's contract allows for removing the
     176            1 :                 // old file and creating a new one. We don't know whether the
     177            1 :                 // WAL was actually recycled.
     178            1 :                 // TODO(jackson): Adding a boolean to the ReuseForWrite return
     179            1 :                 // value indicating whether or not the file was actually
     180            1 :                 // reused would allow us to skip the stat and use
     181            1 :                 // recycleLog.FileSize.
     182            1 :                 var finfo os.FileInfo
     183            1 :                 finfo, err = newLogFile.Stat()
     184            1 :                 if err == nil {
     185            1 :                         newLogSize = uint64(finfo.Size())
     186            1 :                 }
     187            1 :                 err = firstError(err, m.recycler.Pop(recycleLog.FileNum))
     188            1 :                 if err != nil {
     189            0 :                         return nil, firstError(err, newLogFile.Close())
     190            0 :                 }
     191              :         }
     192              :         // TODO(peter): RocksDB delays sync of the parent directory until the
     193              :         // first time the log is synced. Is that worthwhile?
     194            2 :         if err = m.walDir.Sync(); err != nil {
     195            1 :                 err = firstError(err, newLogFile.Close())
     196            1 :                 return nil, err
     197            1 :         }
     198            2 :         newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{
     199            2 :                 NoSyncOnClose:   m.o.NoSyncOnClose,
     200            2 :                 BytesPerSync:    m.o.BytesPerSync,
     201            2 :                 PreallocateSize: m.o.PreallocateSize(),
     202            2 :         })
     203            2 :         w := record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
     204            2 :                 WALFsyncLatency:     m.o.FsyncLatency,
     205            2 :                 WALMinSyncInterval:  m.o.MinSyncInterval,
     206            2 :                 QueueSemChan:        m.o.QueueSemChan,
     207            2 :                 WriteWALSyncOffsets: m.o.WriteWALSyncOffsets,
     208            2 :         })
     209            2 :         m.w = &standaloneWriter{
     210            2 :                 m: m,
     211            2 :                 w: w,
     212            2 :         }
     213            2 :         m.mu.Lock()
     214            2 :         defer m.mu.Unlock()
     215            2 :         m.mu.queue = append(m.mu.queue, base.FileInfo{FileNum: newLogNum, FileSize: newLogSize})
     216            2 :         return m.w, nil
     217              : }
     218              : 
     219              : // ElevateWriteStallThresholdForFailover implements Manager.
     220            2 : func (m *StandaloneManager) ElevateWriteStallThresholdForFailover() bool {
     221            2 :         return false
     222            2 : }
     223              : 
     224              : // Stats implements Manager.
     225            2 : func (m *StandaloneManager) Stats() Stats {
     226            2 :         obsoleteLogsCount, obsoleteLogSize := m.recycler.Stats()
     227            2 :         m.mu.Lock()
     228            2 :         defer m.mu.Unlock()
     229            2 :         var fileSize uint64
     230            2 :         for i := range m.mu.queue {
     231            2 :                 fileSize += m.mu.queue[i].FileSize
     232            2 :         }
     233            2 :         for i := range m.initialObsolete {
     234            1 :                 if i == 0 || m.initialObsolete[i].NumWAL != m.initialObsolete[i-1].NumWAL {
     235            1 :                         obsoleteLogsCount++
     236            1 :                 }
     237            1 :                 obsoleteLogSize += m.initialObsolete[i].ApproxFileSize
     238              :         }
     239            2 :         return Stats{
     240            2 :                 ObsoleteFileCount: obsoleteLogsCount,
     241            2 :                 ObsoleteFileSize:  obsoleteLogSize,
     242            2 :                 LiveFileCount:     len(m.mu.queue),
     243            2 :                 LiveFileSize:      fileSize,
     244            2 :         }
     245              : }
     246              : 
     247              : // Close implements Manager.
     248            2 : func (m *StandaloneManager) Close() error {
     249            2 :         var err error
     250            2 :         if m.w != nil {
     251            1 :                 _, err = m.w.Close()
     252            1 :         }
     253            2 :         err = firstError(err, m.walDir.Close())
     254            2 :         if m.o.Primary.Lock != nil {
     255            2 :                 err = firstError(err, m.o.Primary.Lock.Close())
     256            2 :         }
     257            2 :         return err
     258              : }
     259              : 
     260              : // RecyclerForTesting implements Manager.
     261            1 : func (m *StandaloneManager) RecyclerForTesting() *LogRecycler {
     262            1 :         return &m.recycler
     263            1 : }
     264              : 
     265              : // firstError returns the first non-nil error of err0 and err1, or nil if both
     266              : // are nil.
     267            2 : func firstError(err0, err1 error) error {
     268            2 :         if err0 != nil {
     269            1 :                 return err0
     270            1 :         }
     271            2 :         return err1
     272              : }
     273              : 
     274              : type standaloneWriter struct {
     275              :         m *StandaloneManager
     276              :         w *record.LogWriter
     277              : }
     278              : 
     279              : var _ Writer = &standaloneWriter{}
     280              : 
     281              : // WriteRecord implements Writer.
     282              : func (w *standaloneWriter) WriteRecord(
     283              :         p []byte, opts SyncOptions, _ RefCount,
     284            2 : ) (logicalOffset int64, err error) {
     285            2 :         return w.w.SyncRecord(p, opts.Done, opts.Err)
     286            2 : }
     287              : 
     288              : // Close implements Writer.
     289            2 : func (w *standaloneWriter) Close() (logicalOffset int64, err error) {
     290            2 :         logicalOffset = w.w.Size()
     291            2 :         // Close the log. This writes an EOF trailer signifying the end of the file
     292            2 :         // and syncs it to disk. The caller must close the previous log before
     293            2 :         // creating the new log file, otherwise a crash could leave both logs with
     294            2 :         // unclean tails, and DB.Open will treat the previous log as corrupt.
     295            2 :         err = w.w.Close()
     296            2 :         w.m.mu.Lock()
     297            2 :         defer w.m.mu.Unlock()
     298            2 :         i := len(w.m.mu.queue) - 1
     299            2 :         // The log may have grown past its original physical size. Update its file
     300            2 :         // size in the queue so we have a proper accounting of its file size.
     301            2 :         if w.m.mu.queue[i].FileSize < uint64(logicalOffset) {
     302            2 :                 w.m.mu.queue[i].FileSize = uint64(logicalOffset)
     303            2 :         }
     304            2 :         w.m.w = nil
     305            2 :         return logicalOffset, err
     306              : }
     307              : 
     308              : // Metrics implements Writer.
     309            2 : func (w *standaloneWriter) Metrics() record.LogWriterMetrics {
     310            2 :         return w.w.Metrics()
     311            2 : }
        

Generated by: LCOV version 2.0-1