LCOV - code coverage report
Current view: top level - pebble - cleaner.go (source / functions)
Test: 2024-04-17 08:15Z 990bac8d - tests + meta.lcov
Date: 2024-04-17 08:18:29

                Hit    Total    Coverage
Lines:          151    163      92.6 %
Functions:      0      0        -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "runtime/pprof"
      10             :         "sync"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/errors/oserror"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/objstorage"
      16             :         "github.com/cockroachdb/pebble/vfs"
      17             :         "github.com/cockroachdb/pebble/wal"
      18             :         "github.com/cockroachdb/tokenbucket"
      19             : )
      20             : 
      21             : // Cleaner exports the base.Cleaner type.
      22             : type Cleaner = base.Cleaner
      23             : 
      24             : // DeleteCleaner exports the base.DeleteCleaner type.
      25             : type DeleteCleaner = base.DeleteCleaner
      26             : 
      27             : // ArchiveCleaner exports the base.ArchiveCleaner type.
      28             : type ArchiveCleaner = base.ArchiveCleaner
      29             : 
      30             : type cleanupManager struct {
      31             :         opts            *Options
      32             :         objProvider     objstorage.Provider
      33             :         onTableDeleteFn func(fileSize uint64, isLocal bool)
      34             :         deletePacer     *deletionPacer
      35             : 
      36             :         // jobsCh is used as the cleanup job queue.
      37             :         jobsCh chan *cleanupJob
      38             :         // waitGroup is used to wait for the background goroutine to exit.
      39             :         waitGroup sync.WaitGroup
      40             : 
      41             :         mu struct {
      42             :                 sync.Mutex
      43             :                 // totalJobs is the total number of enqueued jobs (completed or in progress).
      44             :                 totalJobs              int
      45             :                 completedJobs          int
      46             :                 completedJobsCond      sync.Cond
      47             :                 jobsQueueWarningIssued bool
      48             :         }
      49             : }
      50             : 
      51             : // We can queue this many jobs before we have to block EnqueueJob.
      52             : const jobsQueueDepth = 1000
      53             : 
      54             : // deletableFile is used for non-log files.
      55             : type deletableFile struct {
      56             :         dir      string
      57             :         fileNum  base.DiskFileNum
      58             :         fileSize uint64
      59             :         isLocal  bool
      60             : }
      61             : 
      62             : // obsoleteFile holds information about a file that needs to be deleted soon.
      63             : type obsoleteFile struct {
      64             :         fileType fileType
      65             :         // nonLogFile is populated when fileType != fileTypeLog.
      66             :         nonLogFile deletableFile
      67             :         // logFile is populated when fileType == fileTypeLog.
      68             :         logFile wal.DeletableLog
      69             : }
      70             : 
      71             : type cleanupJob struct {
      72             :         jobID         JobID
      73             :         obsoleteFiles []obsoleteFile
      74             : }
      75             : 
      76             : // openCleanupManager creates a cleanupManager and starts its background goroutine.
      77             : // The cleanupManager must be Close()d.
      78             : func openCleanupManager(
      79             :         opts *Options,
      80             :         objProvider objstorage.Provider,
      81             :         onTableDeleteFn func(fileSize uint64, isLocal bool),
      82             :         getDeletePacerInfo func() deletionPacerInfo,
      83           2 : ) *cleanupManager {
      84           2 :         cm := &cleanupManager{
      85           2 :                 opts:            opts,
      86           2 :                 objProvider:     objProvider,
      87           2 :                 onTableDeleteFn: onTableDeleteFn,
      88           2 :                 deletePacer:     newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
      89           2 :                 jobsCh:          make(chan *cleanupJob, jobsQueueDepth),
      90           2 :         }
      91           2 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex
      92           2 :         cm.waitGroup.Add(1)
      93           2 : 
      94           2 :         go func() {
      95           2 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
      96           2 :                         cm.mainLoop()
      97           2 :                 })
      98             :         }()
      99             : 
     100           2 :         return cm
     101             : }
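
The constructor's lifecycle contract (open, enqueue work, optionally wait, then Close) can be sketched from the point of view of a hypothetical in-package caller. This is an illustration only; opts, provider, onTableDelete, getPacerInfo, jobID, and obsoleteFiles are placeholder names, not identifiers from this file.

    // Hypothetical in-package caller (sketch, not part of cleaner.go).
    cm := openCleanupManager(opts, provider, onTableDelete, getPacerInfo)
    defer cm.Close() // Close drains the queue and stops the background goroutine.

    // Queue a batch of obsolete files; this only blocks if more than
    // jobsQueueDepth jobs are already pending.
    cm.EnqueueJob(jobID, obsoleteFiles)

    // Optionally block until every job queued so far has been processed.
    cm.Wait()
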
     102             : 
     103             : // Close stops the background goroutine, waiting until all queued jobs are completed.
     104             : // Delete pacing is disabled for the remaining jobs.
     105           2 : func (cm *cleanupManager) Close() {
     106           2 :         close(cm.jobsCh)
     107           2 :         cm.waitGroup.Wait()
     108           2 : }
     109             : 
     110             : // EnqueueJob adds a cleanup job to the manager's queue.
     111           2 : func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
     112           2 :         job := &cleanupJob{
     113           2 :                 jobID:         jobID,
     114           2 :                 obsoleteFiles: obsoleteFiles,
     115           2 :         }
     116           2 : 
     117           2 :         // Report deleted bytes to the pacer, which can use this data to potentially
     118           2 :         // increase the deletion rate to keep up. We want to do this at enqueue time
     119           2 :         // rather than when we get to the job, otherwise the reported bytes will be
     120           2 :         // subject to the throttling rate which defeats the purpose.
     121           2 :         var pacingBytes uint64
     122           2 :         for _, of := range obsoleteFiles {
     123           2 :                 if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) {
     124           2 :                         pacingBytes += of.nonLogFile.fileSize
     125           2 :                 }
     126             :         }
     127           2 :         if pacingBytes > 0 {
     128           2 :                 cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
     129           2 :         }
     130             : 
     131           2 :         cm.mu.Lock()
     132           2 :         cm.mu.totalJobs++
     133           2 :         cm.maybeLogLocked()
     134           2 :         cm.mu.Unlock()
     135           2 : 
     136           2 :         cm.jobsCh <- job
     137             : }
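
The enqueue-time accounting described in the comment above can be shown in isolation. The sketch below is hypothetical: pacerLike only approximates the deletionPacer method shapes visible in this file's call sites, and throttledDelete sleeps directly instead of routing the delay through the token bucket the way mainLoop and maybePace do.

    package pacingsketch

    import "time"

    // pacerLike approximates (from the call sites above) the two deletionPacer
    // methods this file uses; the real type lives elsewhere in the package.
    type pacerLike interface {
            ReportDeletion(now time.Time, bytes uint64)
            PacingDelay(now time.Time, bytes uint64) float64
    }

    // reportAtEnqueueTime feeds the whole batch to the pacer immediately, so
    // the rate it computes reflects how quickly obsolete bytes accumulate
    // rather than how quickly throttled deletions drain.
    func reportAtEnqueueTime(p pacerLike, sizes []uint64) {
            var total uint64
            for _, sz := range sizes {
                    total += sz
            }
            if total > 0 {
                    p.ReportDeletion(time.Now(), total)
            }
    }

    // throttledDelete is where PacingDelay would be consulted, one file at a
    // time; if the bytes had been reported here instead, the reporting rate
    // itself would be capped by the throttle.
    func throttledDelete(p pacerLike, size uint64, deleteFn func()) {
            delaySeconds := p.PacingDelay(time.Now(), size)
            time.Sleep(time.Duration(delaySeconds * float64(time.Second)))
            deleteFn()
    }
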
     138             : 
     139             : // Wait until the completion of all jobs that were already queued.
     140             : //
     141             : // Does not wait for jobs that are enqueued during the call.
     142             : //
     143             : // Note that DB.mu should not be held while calling this method; the background
     144             : // goroutine needs to acquire DB.mu to update deleted table metrics.
     145           2 : func (cm *cleanupManager) Wait() {
     146           2 :         cm.mu.Lock()
     147           2 :         defer cm.mu.Unlock()
     148           2 :         n := cm.mu.totalJobs
     149           2 :         for cm.mu.completedJobs < n {
     150           2 :                 cm.mu.completedJobsCond.Wait()
     151           2 :         }
     152             : }
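
The snapshot-and-wait idiom above is worth seeing on its own: Wait records totalJobs once, so jobs enqueued during the call raise the counter but not the snapshot and are therefore not waited for. A minimal, self-contained sketch (hypothetical jobCounter type, not this package's cleanupManager):

    package waitsketch

    import "sync"

    type jobCounter struct {
            mu            sync.Mutex
            cond          sync.Cond // cond.L must point at &mu before use
            totalJobs     int
            completedJobs int
    }

    func (c *jobCounter) waitForQueued() {
            c.mu.Lock()
            defer c.mu.Unlock()
            n := c.totalJobs // snapshot: only jobs queued before this call
            for c.completedJobs < n {
                    c.cond.Wait() // releases mu while blocked, reacquires when woken
            }
    }

    func (c *jobCounter) markCompleted() {
            c.mu.Lock()
            c.completedJobs++
            c.cond.Broadcast() // wake all waiters so they can re-check the condition
            c.mu.Unlock()
    }
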
     153             : 
     154             : // mainLoop runs the manager's background goroutine.
     155           2 : func (cm *cleanupManager) mainLoop() {
     156           2 :         defer cm.waitGroup.Done()
     157           2 : 
     158           2 :         var tb tokenbucket.TokenBucket
     159           2 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     160           2 :         tb.Init(1.0, 1.0)
     161           2 :         for job := range cm.jobsCh {
     162           2 :                 for _, of := range job.obsoleteFiles {
     163           2 :                         switch of.fileType {
     164           2 :                         case fileTypeTable:
     165           2 :                                 cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
     166           2 :                                 cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal)
     167           2 :                                 cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
     168           2 :                         case fileTypeLog:
     169           2 :                                 cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
     170           2 :                                         base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize)
     171           2 :                         default:
     172           2 :                                 path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
     173           2 :                                 cm.deleteObsoleteFile(
     174           2 :                                         cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
     175             :                         }
     176             :                 }
     177           2 :                 cm.mu.Lock()
     178           2 :                 cm.mu.completedJobs++
     179           2 :                 cm.mu.completedJobsCond.Broadcast()
     180           2 :                 cm.maybeLogLocked()
     181           2 :                 cm.mu.Unlock()
     182             :         }
     183             : }
     184             : 
     185             : // fileNumIfSST is read iff fileType is fileTypeTable.
     186           2 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
     187           2 :         if fileType != fileTypeTable {
     188           2 :                 return false
     189           2 :         }
     190           2 :         meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
     191           2 :         if err != nil {
     192           1 :                 // The object was already removed from the provider; we won't actually
     193           1 :                 // delete anything, so we don't need to pace.
     194           1 :                 return false
     195           1 :         }
     196             :         // Don't throttle deletion of remote objects.
     197           2 :         return !meta.IsRemote()
     198             : }
     199             : 
     200             : // maybePace sleeps before deleting an object if appropriate. It is always
     201             : // called from the background goroutine.
     202             : func (cm *cleanupManager) maybePace(
     203             :         tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
     204           2 : ) {
     205           2 :         if !cm.needsPacing(fileType, fileNum) {
     206           2 :                 return
     207           2 :         }
     208             : 
     209           2 :         tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
     210           2 :         if tokens == 0.0 {
     211           2 :                 // The token bucket might be in debt; it could make us wait even for 0
     212           2 :                 // tokens. We don't want that if the pacer decided throttling should be
     213           2 :                 // disabled.
     214           2 :                 return
     215           2 :         }
     216             :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     217             :         // the token bucket accumulates up to one second of unused tokens.
     218           2 :         for {
     219           2 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     220           2 :                 if ok {
     221           2 :                         break
     222             :                 }
     223           0 :                 time.Sleep(d)
     224             :         }
     225             : }
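
The pacing loop above can be reduced to a small, self-contained sketch using the same github.com/cockroachdb/tokenbucket calls that appear in this file. The token amounts passed to paceDeletion below are hypothetical stand-ins for what deletePacer.PacingDelay would return.

    package main

    import (
            "time"

            "github.com/cockroachdb/tokenbucket"
    )

    // paceDeletion mirrors the wait loop above: TryToFulfill either deducts
    // the tokens (possibly leaving the bucket in debt) or reports how long to
    // sleep before retrying.
    func paceDeletion(tb *tokenbucket.TokenBucket, tokens float64) {
            for {
                    ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
                    if ok {
                            return // tokens granted; proceed with the deletion
                    }
                    time.Sleep(d) // not enough tokens yet; wait and retry
            }
    }

    func main() {
            var tb tokenbucket.TokenBucket
            tb.Init(1.0, 1.0)      // 1 token/second refill, 1 token burst, reused across files
            paceDeletion(&tb, 0.5) // hypothetical: half a second's worth of pacing
            paceDeletion(&tb, 2.0) // hypothetical: two seconds' worth of pacing
    }
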
     226             : 
     227             : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
     228             : func (cm *cleanupManager) deleteObsoleteFile(
     229             :         fs vfs.FS, fileType fileType, jobID JobID, path string, fileNum base.DiskFileNum, fileSize uint64,
     230           2 : ) {
     231           2 :         // TODO(peter): need to handle this error, probably by re-adding the
     232           2 :         // file that couldn't be deleted to one of the obsolete file slices.
     233           2 :         err := cm.opts.Cleaner.Clean(fs, fileType, path)
     234           2 :         if oserror.IsNotExist(err) {
     235           0 :                 return
     236           0 :         }
     237             : 
     238           2 :         switch fileType {
     239           2 :         case fileTypeLog:
     240           2 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     241           2 :                         JobID:   int(jobID),
     242           2 :                         Path:    path,
     243           2 :                         FileNum: fileNum,
     244           2 :                         Err:     err,
     245           2 :                 })
     246           2 :         case fileTypeManifest:
     247           2 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     248           2 :                         JobID:   int(jobID),
     249           2 :                         Path:    path,
     250           2 :                         FileNum: fileNum,
     251           2 :                         Err:     err,
     252           2 :                 })
     253           0 :         case fileTypeTable:
     254           0 :                 panic("invalid deletion of object file")
     255             :         }
     256             : }
     257             : 
     258             : func (cm *cleanupManager) deleteObsoleteObject(
     259             :         fileType fileType, jobID JobID, fileNum base.DiskFileNum,
     260           2 : ) {
     261           2 :         if fileType != fileTypeTable {
     262           0 :                 panic("not an object")
     263             :         }
     264             : 
     265           2 :         var path string
     266           2 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     267           2 :         if err != nil {
     268           1 :                 path = "<nil>"
     269           2 :         } else {
     270           2 :                 path = cm.objProvider.Path(meta)
     271           2 :                 err = cm.objProvider.Remove(fileType, fileNum)
     272           2 :         }
     273           2 :         if cm.objProvider.IsNotExistError(err) {
     274           1 :                 return
     275           1 :         }
     276             : 
     277           2 :         switch fileType {
     278           2 :         case fileTypeTable:
     279           2 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     280           2 :                         JobID:   int(jobID),
     281           2 :                         Path:    path,
     282           2 :                         FileNum: fileNum,
     283           2 :                         Err:     err,
     284           2 :                 })
     285             :         }
     286             : }
     287             : 
     288             : // maybeLogLocked issues a log when the job queue becomes 75% full, and another
     289             : // log when the queue drops back below 10% full.
     290             : //
     291             : // Must be called with cm.mu locked.
     292           2 : func (cm *cleanupManager) maybeLogLocked() {
     293           2 :         const highThreshold = jobsQueueDepth * 3 / 4
     294           2 :         const lowThreshold = jobsQueueDepth / 10
     295           2 : 
     296           2 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
     297           2 : 
     298           2 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
     299           0 :                 cm.mu.jobsQueueWarningIssued = true
     300           0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
     301           0 :         }
     302             : 
     303           2 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
     304           0 :                 cm.mu.jobsQueueWarningIssued = false
     305           0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
     306           0 :         }
     307             : }
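
The two thresholds form a hysteresis band: with jobsQueueDepth = 1000, the warning fires once more than 750 jobs are pending and clears only after the backlog falls back under 100, which keeps the log from flapping while the queue hovers near a single value. A self-contained sketch of the same arithmetic (an illustration, not code from this file):

    package hysteresissketch

    const (
            queueDepth    = 1000
            highThreshold = queueDepth * 3 / 4 // 750
            lowThreshold  = queueDepth / 10    // 100
    )

    // updateWarning returns the new warning state given the current state and
    // the number of jobs still in the queue.
    func updateWarning(warningIssued bool, pending int) bool {
            switch {
            case !warningIssued && pending > highThreshold:
                    return true // falling behind: issue the warning
            case warningIssued && pending < lowThreshold:
                    return false // back to normal: clear the warning
            default:
                    return warningIssued
            }
    }
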

Generated by: LCOV version 1.14