LCOV - code coverage report
Current view: top level - pebble - cleaner.go (source / functions) Hit Total Coverage
Test: 2024-03-15 08:15Z 65193e03 - tests only.lcov Lines: 144 163 88.3 %
Date: 2024-03-15 08:16:14 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "runtime/pprof"
      10             :         "sync"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/errors/oserror"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/objstorage"
      16             :         "github.com/cockroachdb/pebble/vfs"
      17             :         "github.com/cockroachdb/pebble/wal"
      18             :         "github.com/cockroachdb/tokenbucket"
      19             : )
      20             : 
      21             : // Cleaner exports the base.Cleaner type.
      22             : type Cleaner = base.Cleaner
      23             : 
      24             : // DeleteCleaner exports the base.DeleteCleaner type.
      25             : type DeleteCleaner = base.DeleteCleaner
      26             : 
      27             : // ArchiveCleaner exports the base.ArchiveCleaner type.
      28             : type ArchiveCleaner = base.ArchiveCleaner
      29             : 
      30             : type cleanupManager struct {
      31             :         opts            *Options
      32             :         objProvider     objstorage.Provider
      33             :         onTableDeleteFn func(fileSize uint64)
      34             :         deletePacer     *deletionPacer
      35             : 
      36             :         // jobsCh is used as the cleanup job queue.
      37             :         jobsCh chan *cleanupJob
      38             :         // waitGroup is used to wait for the background goroutine to exit.
      39             :         waitGroup sync.WaitGroup
      40             : 
      41             :         mu struct {
      42             :                 sync.Mutex
      43             :                 // totalJobs is the total number of enqueued jobs (completed or in progress).
      44             :                 totalJobs              int
      45             :                 completedJobs          int
      46             :                 completedJobsCond      sync.Cond
      47             :                 jobsQueueWarningIssued bool
      48             :         }
      49             : }
      50             : 
      51             : // We can queue this many jobs before we have to block EnqueueJob.
      52             : const jobsQueueDepth = 1000
      53             : 
      54             : // deletableFile is used for non log files.
      55             : type deletableFile struct {
      56             :         dir      string
      57             :         fileNum  base.DiskFileNum
      58             :         fileSize uint64
      59             : }
      60             : 
      61             : // obsoleteFile holds information about a file that needs to be deleted soon.
      62             : type obsoleteFile struct {
      63             :         fileType fileType
      64             :         // nonLogFile is populated when fileType != fileTypeLog.
      65             :         nonLogFile deletableFile
      66             :         // logFile is populated when fileType == fileTypeLog.
      67             :         logFile wal.DeletableLog
      68             : }
      69             : 
      70             : type cleanupJob struct {
      71             :         jobID         int
      72             :         obsoleteFiles []obsoleteFile
      73             : }
      74             : 
      75             : // openCleanupManager creates a cleanupManager and starts its background goroutine.
      76             : // The cleanupManager must be Close()d.
      77             : func openCleanupManager(
      78             :         opts *Options,
      79             :         objProvider objstorage.Provider,
      80             :         onTableDeleteFn func(fileSize uint64),
      81             :         getDeletePacerInfo func() deletionPacerInfo,
      82           1 : ) *cleanupManager {
      83           1 :         cm := &cleanupManager{
      84           1 :                 opts:            opts,
      85           1 :                 objProvider:     objProvider,
      86           1 :                 onTableDeleteFn: onTableDeleteFn,
      87           1 :                 deletePacer:     newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
      88           1 :                 jobsCh:          make(chan *cleanupJob, jobsQueueDepth),
      89           1 :         }
      90           1 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex
      91           1 :         cm.waitGroup.Add(1)
      92           1 : 
      93           1 :         go func() {
      94           1 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
      95           1 :                         cm.mainLoop()
      96           1 :                 })
      97             :         }()
      98             : 
      99           1 :         return cm
     100             : }
     101             : 
     102             : // Close stops the background goroutine, waiting until all queued jobs are completed.
     103             : // Delete pacing is disabled for the remaining jobs.
     104           1 : func (cm *cleanupManager) Close() {
     105           1 :         close(cm.jobsCh)
     106           1 :         cm.waitGroup.Wait()
     107           1 : }
     108             : 
     109             : // EnqueueJob adds a cleanup job to the manager's queue.
     110           1 : func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
     111           1 :         job := &cleanupJob{
     112           1 :                 jobID:         jobID,
     113           1 :                 obsoleteFiles: obsoleteFiles,
     114           1 :         }
     115           1 : 
     116           1 :         // Report deleted bytes to the pacer, which can use this data to potentially
     117           1 :         // increase the deletion rate to keep up. We want to do this at enqueue time
     118           1 :         // rather than when we get to the job, otherwise the reported bytes will be
     119           1 :         // subject to the throttling rate which defeats the purpose.
     120           1 :         var pacingBytes uint64
     121           1 :         for _, of := range obsoleteFiles {
     122           1 :                 if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) {
     123           1 :                         pacingBytes += of.nonLogFile.fileSize
     124           1 :                 }
     125             :         }
     126           1 :         if pacingBytes > 0 {
     127           1 :                 cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
     128           1 :         }
     129             : 
     130           1 :         cm.mu.Lock()
     131           1 :         cm.mu.totalJobs++
     132           1 :         cm.maybeLogLocked()
     133           1 :         cm.mu.Unlock()
     134           1 : 
     135           1 :         cm.jobsCh <- job
     136             : }
     137             : 
     138             : // Wait until the completion of all jobs that were already queued.
     139             : //
     140             : // Does not wait for jobs that are enqueued during the call.
     141             : //
     142             : // Note that DB.mu should not be held while calling this method; the background
     143             : // goroutine needs to acquire DB.mu to update deleted table metrics.
     144           1 : func (cm *cleanupManager) Wait() {
     145           1 :         cm.mu.Lock()
     146           1 :         defer cm.mu.Unlock()
     147           1 :         n := cm.mu.totalJobs
     148           1 :         for cm.mu.completedJobs < n {
     149           1 :                 cm.mu.completedJobsCond.Wait()
     150           1 :         }
     151             : }
     152             : 
     153             : // mainLoop runs the manager's background goroutine.
     154           1 : func (cm *cleanupManager) mainLoop() {
     155           1 :         defer cm.waitGroup.Done()
     156           1 : 
     157           1 :         var tb tokenbucket.TokenBucket
     158           1 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     159           1 :         tb.Init(1.0, 1.0)
     160           1 :         for job := range cm.jobsCh {
     161           1 :                 for _, of := range job.obsoleteFiles {
     162           1 :                         switch of.fileType {
     163           1 :                         case fileTypeTable:
     164           1 :                                 cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
     165           1 :                                 cm.onTableDeleteFn(of.nonLogFile.fileSize)
     166           1 :                                 cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
     167           1 :                         case fileTypeLog:
     168           1 :                                 cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
     169           1 :                                         base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize)
     170           1 :                         default:
     171           1 :                                 path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
     172           1 :                                 cm.deleteObsoleteFile(
     173           1 :                                         cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
     174             :                         }
     175             :                 }
     176           1 :                 cm.mu.Lock()
     177           1 :                 cm.mu.completedJobs++
     178           1 :                 cm.mu.completedJobsCond.Broadcast()
     179           1 :                 cm.maybeLogLocked()
     180           1 :                 cm.mu.Unlock()
     181             :         }
     182             : }
     183             : 
     184             : // fileNumIfSST is read iff fileType is fileTypeTable.
     185           1 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
     186           1 :         if fileType != fileTypeTable {
     187           1 :                 return false
     188           1 :         }
     189           1 :         meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
     190           1 :         if err != nil {
     191           0 :                 // The object was already removed from the provider; we won't actually
     192           0 :                 // delete anything, so we don't need to pace.
     193           0 :                 return false
     194           0 :         }
     195             :         // Don't throttle deletion of remote objects.
     196           1 :         return !meta.IsRemote()
     197             : }
     198             : 
     199             : // maybePace sleeps before deleting an object if appropriate. It is always
     200             : // called from the background goroutine.
     201             : func (cm *cleanupManager) maybePace(
     202             :         tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
     203           1 : ) {
     204           1 :         if !cm.needsPacing(fileType, fileNum) {
     205           1 :                 return
     206           1 :         }
     207             : 
     208           1 :         tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
     209           1 :         if tokens == 0.0 {
     210           1 :                 // The token bucket might be in debt; it could make us wait even for 0
     211           1 :                 // tokens. We don't want that if the pacer decided throttling should be
     212           1 :                 // disabled.
     213           1 :                 return
     214           1 :         }
     215             :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     216             :         // the token bucket accumulates up to one second of unused tokens.
     217           1 :         for {
     218           1 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     219           1 :                 if ok {
     220           1 :                         break
     221             :                 }
     222           0 :                 time.Sleep(d)
     223             :         }
     224             : }
     225             : 
     226             : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
     227             : func (cm *cleanupManager) deleteObsoleteFile(
     228             :         fs vfs.FS, fileType fileType, jobID int, path string, fileNum base.DiskFileNum, fileSize uint64,
     229           1 : ) {
     230           1 :         // TODO(peter): need to handle this error, probably by re-adding the
     231           1 :         // file that couldn't be deleted to one of the obsolete slices map.
     232           1 :         err := cm.opts.Cleaner.Clean(fs, fileType, path)
     233           1 :         if oserror.IsNotExist(err) {
     234           0 :                 return
     235           0 :         }
     236             : 
     237           1 :         switch fileType {
     238           1 :         case fileTypeLog:
     239           1 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     240           1 :                         JobID:   jobID,
     241           1 :                         Path:    path,
     242           1 :                         FileNum: fileNum,
     243           1 :                         Err:     err,
     244           1 :                 })
     245           1 :         case fileTypeManifest:
     246           1 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     247           1 :                         JobID:   jobID,
     248           1 :                         Path:    path,
     249           1 :                         FileNum: fileNum,
     250           1 :                         Err:     err,
     251           1 :                 })
     252           0 :         case fileTypeTable:
     253           0 :                 panic("invalid deletion of object file")
     254             :         }
     255             : }
     256             : 
     257             : func (cm *cleanupManager) deleteObsoleteObject(
     258             :         fileType fileType, jobID int, fileNum base.DiskFileNum,
     259           1 : ) {
     260           1 :         if fileType != fileTypeTable {
     261           0 :                 panic("not an object")
     262             :         }
     263             : 
     264           1 :         var path string
     265           1 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     266           1 :         if err != nil {
     267           0 :                 path = "<nil>"
     268           1 :         } else {
     269           1 :                 path = cm.objProvider.Path(meta)
     270           1 :                 err = cm.objProvider.Remove(fileType, fileNum)
     271           1 :         }
     272           1 :         if cm.objProvider.IsNotExistError(err) {
     273           0 :                 return
     274           0 :         }
     275             : 
     276           1 :         switch fileType {
     277           1 :         case fileTypeTable:
     278           1 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     279           1 :                         JobID:   jobID,
     280           1 :                         Path:    path,
     281           1 :                         FileNum: fileNum,
     282           1 :                         Err:     err,
     283           1 :                 })
     284             :         }
     285             : }
     286             : 
     287             : // maybeLogLocked issues a log if the job queue gets 75% full and issues a log
     288             : // when the job queue gets back to less than 10% full.
     289             : //
     290             : // Must be called with cm.mu locked.
     291           1 : func (cm *cleanupManager) maybeLogLocked() {
     292           1 :         const highThreshold = jobsQueueDepth * 3 / 4
     293           1 :         const lowThreshold = jobsQueueDepth / 10
     294           1 : 
     295           1 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
     296           1 : 
     297           1 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
     298           0 :                 cm.mu.jobsQueueWarningIssued = true
     299           0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
     300           0 :         }
     301             : 
     302           1 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
     303           0 :                 cm.mu.jobsQueueWarningIssued = false
     304           0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
     305           0 :         }
     306             : }

Generated by: LCOV version 1.14