LCOV - code coverage report
Current view: top level - pebble/wal - standalone_manager.go (source / functions) Hit Total Coverage
Test: 2024-07-14 08:16Z 9a4ea4df - tests only.lcov Lines: 192 201 95.5 %
Date: 2024-07-14 08:16:55 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package wal
       6             : 
       7             : import (
       8             :         "os"
       9             :         "sync"
      10             : 
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/record"
      13             :         "github.com/cockroachdb/pebble/vfs"
      14             : )
      15             : 
      16             : // StandaloneManager implements Manager with a single log file per WAL (no
      17             : // failover capability).
      18             : type StandaloneManager struct {
      19             :         o        Options
      20             :         recycler LogRecycler
      21             :         walDir   vfs.File
      22             :         // initialObsolete holds the set of DeletableLogs that formed the logs
      23             :         // passed into Init. The initialObsolete logs are all obsolete. Once
      24             :         // returned via Manager.Obsolete, initialObsolete is cleared. The
      25             :         // initialObsolete logs are stored separately from mu.queue because they may
      26             :         // include logs that were NOT created by the standalone manager, and
      27             :         // multiple physical log files may form one logical WAL.
      28             :         initialObsolete []DeletableLog
      29             : 
      30             :         // External synchronization is relied on when accessing w in Manager.Create,
      31             :         // Writer.{WriteRecord,Close}.
      32             :         w *standaloneWriter
      33             : 
      34             :         mu struct {
      35             :                 sync.Mutex
      36             :                 // The queue of WALs, containing both flushed and unflushed WALs. The
      37             :                 // FileInfo.FileNum is also the NumWAL, since there is one log file for
      38             :                 // each WAL. The flushed logs are a prefix, the unflushed logs a suffix.
      39             :                 // If w != nil, the last entry here is that active WAL. For the active
      40             :                 // log, FileInfo.FileSize is the size when it was opened and can be
      41             :                 // greater than zero because of log recycling.
      42             :                 queue []base.FileInfo
      43             :         }
      44             : }
      45             : 
      46             : var _ Manager = &StandaloneManager{}
      47             : 
      48             : // init implements Manager.
      49           1 : func (m *StandaloneManager) init(o Options, initial Logs) error {
      50           1 :         if o.Secondary.FS != nil {
      51           0 :                 return base.AssertionFailedf("cannot create StandaloneManager with a secondary")
      52           0 :         }
      53           1 :         var err error
      54           1 :         var walDir vfs.File
      55           1 :         if walDir, err = o.Primary.FS.OpenDir(o.Primary.Dirname); err != nil {
      56           1 :                 return err
      57           1 :         }
      58           1 :         *m = StandaloneManager{
      59           1 :                 o:      o,
      60           1 :                 walDir: walDir,
      61           1 :         }
      62           1 :         m.recycler.Init(o.MaxNumRecyclableLogs)
      63           1 : 
      64           1 :         closeAndReturnErr := func(err error) error {
      65           0 :                 err = firstError(err, walDir.Close())
      66           0 :                 return err
      67           0 :         }
      68           1 :         for _, ll := range initial {
      69           1 :                 if m.recycler.MinRecycleLogNum() <= ll.Num {
      70           1 :                         m.recycler.SetMinRecycleLogNum(ll.Num + 1)
      71           1 :                 }
      72           1 :                 m.initialObsolete, err = appendDeletableLogs(m.initialObsolete, ll)
      73           1 :                 if err != nil {
      74           0 :                         return closeAndReturnErr(err)
      75           0 :                 }
      76             :         }
      77           1 :         return nil
      78             : }
      79             : 
      80             : // List implements Manager.
      81           1 : func (m *StandaloneManager) List() (Logs, error) {
      82           1 :         m.mu.Lock()
      83           1 :         defer m.mu.Unlock()
      84           1 :         wals := make(Logs, len(m.mu.queue))
      85           1 :         for i := range m.mu.queue {
      86           1 :                 wals[i] = LogicalLog{
      87           1 :                         Num:      NumWAL(m.mu.queue[i].FileNum),
      88           1 :                         segments: []segment{{dir: m.o.Primary}},
      89           1 :                 }
      90           1 :         }
      91           1 :         return wals, nil
      92             : }
      93             : 
      94             : // Obsolete implements Manager.
      95             : func (m *StandaloneManager) Obsolete(
      96             :         minUnflushedNum NumWAL, noRecycle bool,
      97           1 : ) (toDelete []DeletableLog, err error) {
      98           1 :         m.mu.Lock()
      99           1 :         defer m.mu.Unlock()
     100           1 : 
     101           1 :         // If this is the first call to Obsolete after Open, we may have deletable
     102           1 :         // logs outside the queue.
     103           1 :         toDelete, m.initialObsolete = m.initialObsolete, nil
     104           1 : 
     105           1 :         i := 0
     106           1 :         for ; i < len(m.mu.queue); i++ {
     107           1 :                 fi := m.mu.queue[i]
     108           1 :                 if fi.FileNum >= base.DiskFileNum(minUnflushedNum) {
     109           1 :                         break
     110             :                 }
     111           1 :                 if noRecycle || !m.recycler.Add(fi) {
     112           1 :                         toDelete = append(toDelete, DeletableLog{
     113           1 :                                 FS:             m.o.Primary.FS,
     114           1 :                                 Path:           m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(fi.FileNum), 000)),
     115           1 :                                 NumWAL:         NumWAL(fi.FileNum),
     116           1 :                                 ApproxFileSize: fi.FileSize,
     117           1 :                         })
     118           1 :                 }
     119             :         }
     120           1 :         m.mu.queue = m.mu.queue[i:]
     121           1 :         return toDelete, nil
     122             : }
     123             : 
     124             : // Create implements Manager.
     125           1 : func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error) {
     126           1 :         // TODO(sumeer): check monotonicity of wn.
     127           1 :         newLogNum := base.DiskFileNum(wn)
     128           1 :         newLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(wn, 0))
     129           1 : 
     130           1 :         // Try to use a recycled log file. Recycling log files is an important
     131           1 :         // performance optimization as it is faster to sync a file that has
     132           1 :         // already been written, than one which is being written for the first
     133           1 :         // time. This is due to the need to sync file metadata when a file is
     134           1 :         // being written for the first time. Note this is true even if file
     135           1 :         // preallocation is performed (e.g. fallocate).
     136           1 :         var recycleLog base.FileInfo
     137           1 :         var recycleOK bool
     138           1 :         var newLogFile vfs.File
     139           1 :         var err error
     140           1 :         recycleLog, recycleOK = m.recycler.Peek()
     141           1 :         if recycleOK {
     142           1 :                 recycleLogName := m.o.Primary.FS.PathJoin(m.o.Primary.Dirname, makeLogFilename(NumWAL(recycleLog.FileNum), 0))
     143           1 :                 newLogFile, err = m.o.Primary.FS.ReuseForWrite(recycleLogName, newLogName, "pebble-wal")
     144           1 :                 base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
     145           1 :         } else {
     146           1 :                 newLogFile, err = m.o.Primary.FS.Create(newLogName, "pebble-wal")
     147           1 :                 base.MustExist(m.o.Primary.FS, newLogName, m.o.Logger, err)
     148           1 :         }
     149           1 :         createInfo := CreateInfo{
     150           1 :                 JobID:           jobID,
     151           1 :                 Path:            newLogName,
     152           1 :                 IsSecondary:     false,
     153           1 :                 Num:             wn,
     154           1 :                 RecycledFileNum: recycleLog.FileNum,
     155           1 :                 Err:             nil,
     156           1 :         }
     157           1 :         defer func() {
     158           1 :                 createInfo.Err = err
     159           1 :                 m.o.EventListener.LogCreated(createInfo)
     160           1 :         }()
     161             : 
     162           1 :         if err != nil {
     163           1 :                 return nil, err
     164           1 :         }
     165           1 :         var newLogSize uint64
     166           1 :         if recycleOK {
     167           1 :                 // Figure out the recycled WAL size. This Stat is necessary
     168           1 :                 // because ReuseForWrite's contract allows for removing the
     169           1 :                 // old file and creating a new one. We don't know whether the
     170           1 :                 // WAL was actually recycled.
     171           1 :                 // TODO(jackson): Adding a boolean to the ReuseForWrite return
     172           1 :                 // value indicating whether or not the file was actually
     173           1 :                 // reused would allow us to skip the stat and use
     174           1 :                 // recycleLog.FileSize.
     175           1 :                 var finfo os.FileInfo
     176           1 :                 finfo, err = newLogFile.Stat()
     177           1 :                 if err == nil {
     178           1 :                         newLogSize = uint64(finfo.Size())
     179           1 :                 }
     180           1 :                 err = firstError(err, m.recycler.Pop(recycleLog.FileNum))
     181           1 :                 if err != nil {
     182           0 :                         return nil, firstError(err, newLogFile.Close())
     183           0 :                 }
     184             :         }
     185             :         // TODO(peter): RocksDB delays sync of the parent directory until the
     186             :         // first time the log is synced. Is that worthwhile?
     187           1 :         if err = m.walDir.Sync(); err != nil {
     188           1 :                 err = firstError(err, newLogFile.Close())
     189           1 :                 return nil, err
     190           1 :         }
     191           1 :         newLogFile = vfs.NewSyncingFile(newLogFile, vfs.SyncingFileOptions{
     192           1 :                 NoSyncOnClose:   m.o.NoSyncOnClose,
     193           1 :                 BytesPerSync:    m.o.BytesPerSync,
     194           1 :                 PreallocateSize: m.o.PreallocateSize(),
     195           1 :         })
     196           1 :         w := record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
     197           1 :                 WALFsyncLatency:    m.o.FsyncLatency,
     198           1 :                 WALMinSyncInterval: m.o.MinSyncInterval,
     199           1 :                 QueueSemChan:       m.o.QueueSemChan,
     200           1 :         })
     201           1 :         m.w = &standaloneWriter{
     202           1 :                 m: m,
     203           1 :                 w: w,
     204           1 :         }
     205           1 :         m.mu.Lock()
     206           1 :         defer m.mu.Unlock()
     207           1 :         m.mu.queue = append(m.mu.queue, base.FileInfo{FileNum: newLogNum, FileSize: newLogSize})
     208           1 :         return m.w, nil
     209             : }
     210             : 
     211             : // ElevateWriteStallThresholdForFailover implements Manager.
     212           1 : func (m *StandaloneManager) ElevateWriteStallThresholdForFailover() bool {
     213           1 :         return false
     214           1 : }
     215             : 
     216             : // Stats implements Manager.
     217           1 : func (m *StandaloneManager) Stats() Stats {
     218           1 :         obsoleteLogsCount, obsoleteLogSize := m.recycler.Stats()
     219           1 :         m.mu.Lock()
     220           1 :         defer m.mu.Unlock()
     221           1 :         var fileSize uint64
     222           1 :         for i := range m.mu.queue {
     223           1 :                 fileSize += m.mu.queue[i].FileSize
     224           1 :         }
     225           1 :         for i := range m.initialObsolete {
     226           1 :                 if i == 0 || m.initialObsolete[i].NumWAL != m.initialObsolete[i-1].NumWAL {
     227           1 :                         obsoleteLogsCount++
     228           1 :                 }
     229           1 :                 obsoleteLogSize += m.initialObsolete[i].ApproxFileSize
     230             :         }
     231           1 :         return Stats{
     232           1 :                 ObsoleteFileCount: obsoleteLogsCount,
     233           1 :                 ObsoleteFileSize:  obsoleteLogSize,
     234           1 :                 LiveFileCount:     len(m.mu.queue),
     235           1 :                 LiveFileSize:      fileSize,
     236           1 :         }
     237             : }
     238             : 
     239             : // Close implements Manager.
     240           1 : func (m *StandaloneManager) Close() error {
     241           1 :         var err error
     242           1 :         if m.w != nil {
     243           1 :                 _, err = m.w.Close()
     244           1 :         }
     245           1 :         return firstError(err, m.walDir.Close())
     246             : }
     247             : 
     248             : // RecyclerForTesting implements Manager.
     249           1 : func (m *StandaloneManager) RecyclerForTesting() *LogRecycler {
     250           1 :         return &m.recycler
     251           1 : }
     252             : 
     253             : // firstError returns the first non-nil error of err0 and err1, or nil if both
     254             : // are nil.
     255           1 : func firstError(err0, err1 error) error {
     256           1 :         if err0 != nil {
     257           1 :                 return err0
     258           1 :         }
     259           1 :         return err1
     260             : }
     261             : 
     262             : type standaloneWriter struct {
     263             :         m *StandaloneManager
     264             :         w *record.LogWriter
     265             : }
     266             : 
     267             : var _ Writer = &standaloneWriter{}
     268             : 
     269             : // WriteRecord implements Writer.
     270             : func (w *standaloneWriter) WriteRecord(
     271             :         p []byte, opts SyncOptions, _ RefCount,
     272           1 : ) (logicalOffset int64, err error) {
     273           1 :         return w.w.SyncRecord(p, opts.Done, opts.Err)
     274           1 : }
     275             : 
     276             : // Close implements Writer.
     277           1 : func (w *standaloneWriter) Close() (logicalOffset int64, err error) {
     278           1 :         logicalOffset = w.w.Size()
     279           1 :         // Close the log. This writes an EOF trailer signifying the end of the file
     280           1 :         // and syncs it to disk. The caller must close the previous log before
     281           1 :         // creating the new log file, otherwise a crash could leave both logs with
     282           1 :         // unclean tails, and DB.Open will treat the previous log as corrupt.
     283           1 :         err = w.w.Close()
     284           1 :         w.m.mu.Lock()
     285           1 :         defer w.m.mu.Unlock()
     286           1 :         i := len(w.m.mu.queue) - 1
     287           1 :         // The log may have grown past its original physical size. Update its file
     288           1 :         // size in the queue so we have a proper accounting of its file size.
     289           1 :         if w.m.mu.queue[i].FileSize < uint64(logicalOffset) {
     290           1 :                 w.m.mu.queue[i].FileSize = uint64(logicalOffset)
     291           1 :         }
     292           1 :         w.m.w = nil
     293           1 :         return logicalOffset, err
     294             : }
     295             : 
     296             : // Metrics implements Writer.
     297           1 : func (w *standaloneWriter) Metrics() record.LogWriterMetrics {
     298           1 :         return w.w.Metrics()
     299           1 : }

Generated by: LCOV version 1.14