// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"context"
	"runtime/pprof"
	"sync"
	"time"

	"github.com/cockroachdb/errors/oserror"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/tokenbucket"
)

// Cleaner exports the base.Cleaner type.
type Cleaner = base.Cleaner

// DeleteCleaner exports the base.DeleteCleaner type.
type DeleteCleaner = base.DeleteCleaner

// ArchiveCleaner exports the base.ArchiveCleaner type.
type ArchiveCleaner = base.ArchiveCleaner

// cleanupManager deletes obsolete files and objects on a background
// goroutine, pacing deletions of local sstables via a deletionPacer.
type cleanupManager struct {
	opts            *Options
	objProvider     objstorage.Provider
	onTableDeleteFn func(fileSize uint64)
	deletePacer     *deletionPacer

	// jobsCh is used as the cleanup job queue.
	jobsCh chan *cleanupJob
	// waitGroup is used to wait for the background goroutine to exit.
	waitGroup sync.WaitGroup

	mu struct {
		sync.Mutex
		// totalJobs is the total number of enqueued jobs (completed or in progress).
		totalJobs              int
		completedJobs          int
		completedJobsCond      sync.Cond
		jobsQueueWarningIssued bool
	}
}

// jobsQueueDepth is the number of jobs that can be queued before EnqueueJob
// blocks.
const jobsQueueDepth = 1000

// obsoleteFile holds information about a file that needs to be deleted soon.
type obsoleteFile struct {
	dir      string
	fileNum  base.DiskFileNum
	fileType fileType
	fileSize uint64
}

// cleanupJob is a batch of obsolete files to delete, tagged with the ID of
// the job that made them obsolete (used for event reporting).
type cleanupJob struct {
	jobID         int
	obsoleteFiles []obsoleteFile
}

// openCleanupManager creates a cleanupManager and starts its background goroutine.
// The cleanupManager must be Close()d.
func openCleanupManager(
	opts *Options,
	objProvider objstorage.Provider,
	onTableDeleteFn func(fileSize uint64),
	getDeletePacerInfo func() deletionPacerInfo,
) *cleanupManager {
	cm := &cleanupManager{
		opts:            opts,
		objProvider:     objProvider,
		onTableDeleteFn: onTableDeleteFn,
		deletePacer:     newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
		jobsCh:          make(chan *cleanupJob, jobsQueueDepth),
	}
	cm.mu.completedJobsCond.L = &cm.mu.Mutex
	cm.waitGroup.Add(1)

	go func() {
		pprof.Do(context.Background(), gcLabels, func(context.Context) {
			cm.mainLoop()
		})
	}()

	return cm
}

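// A minimal usage sketch of the manager's lifecycle (hypothetical; the real
// wiring lives in this package's DB open and close paths, and the arguments
// below are stand-ins for the caller's values):
//
//	cm := openCleanupManager(opts, objProvider, onTableDelete, getPacerInfo)
//	cm.EnqueueJob(jobID, obsoleteFiles)
//	cm.Wait()  // block until the job above has been processed
//	cm.Close() // drain any remaining jobs and stop the goroutine
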
// Close stops the background goroutine, waiting until all queued jobs are completed.
// Delete pacing is disabled for the remaining jobs.
func (cm *cleanupManager) Close() {
	close(cm.jobsCh)
	cm.waitGroup.Wait()
}

// EnqueueJob adds a cleanup job to the manager's queue.
func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
	job := &cleanupJob{
		jobID:         jobID,
		obsoleteFiles: obsoleteFiles,
	}

	// Report deleted bytes to the pacer, which can use this data to potentially
	// increase the deletion rate to keep up. We want to do this at enqueue time
	// rather than when we get to the job; otherwise the reported bytes would be
	// subject to the throttling rate, which defeats the purpose.
	var pacingBytes uint64
	for _, of := range obsoleteFiles {
		if cm.needsPacing(of.fileType, of.fileNum) {
			pacingBytes += of.fileSize
		}
	}
	if pacingBytes > 0 {
		cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
	}

	cm.mu.Lock()
	cm.mu.totalJobs++
	cm.maybeLogLocked()
	cm.mu.Unlock()

	if invariants.Enabled && len(cm.jobsCh) >= cap(cm.jobsCh)-2 {
		panic("cleanup jobs queue full")
	}

	cm.jobsCh <- job
}

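// As a concrete illustration of the enqueue-time reporting above: enqueueing
// two local sstables of 10MB each reports 20MB to the pacer immediately, even
// though the files themselves may only be deleted seconds later under pacing.
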
// Wait blocks until all jobs that were already queued have completed.
//
// Does not wait for jobs that are enqueued during the call.
//
// Note that DB.mu should not be held while calling this method; the background
// goroutine needs to acquire DB.mu to update deleted table metrics.
func (cm *cleanupManager) Wait() {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	n := cm.mu.totalJobs
	for cm.mu.completedJobs < n {
		cm.mu.completedJobsCond.Wait()
	}
}

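// For example, if five jobs are queued when Wait is called and a sixth is
// enqueued while it blocks, Wait returns as soon as the first five complete:
// the totalJobs snapshot taken at entry excludes the later arrival.
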
// mainLoop runs the manager's background goroutine.
func (cm *cleanupManager) mainLoop() {
	defer cm.waitGroup.Done()

	var tb tokenbucket.TokenBucket
	// Use a token bucket with 1 token / second refill rate and 1 token burst.
	tb.Init(1.0, 1.0)
	for job := range cm.jobsCh {
		for _, of := range job.obsoleteFiles {
			if of.fileType != fileTypeTable {
				path := base.MakeFilepath(cm.opts.FS, of.dir, of.fileType, of.fileNum)
				cm.deleteObsoleteFile(of.fileType, job.jobID, path, of.fileNum, of.fileSize)
			} else {
				cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
				cm.onTableDeleteFn(of.fileSize)
				cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.fileNum)
			}
		}
		cm.mu.Lock()
		cm.mu.completedJobs++
		cm.mu.completedJobsCond.Broadcast()
		cm.maybeLogLocked()
		cm.mu.Unlock()
	}
}

// needsPacing reports whether deletion of the given file should be paced.
func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNum base.DiskFileNum) bool {
	if fileType != fileTypeTable {
		return false
	}
	meta, err := cm.objProvider.Lookup(fileType, fileNum)
	if err != nil {
		// The object was already removed from the provider; we won't actually
		// delete anything, so we don't need to pace.
		return false
	}
	// Don't throttle deletion of remote objects.
	return !meta.IsRemote()
}

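// In practice, only local sstables are paced: WAL and MANIFEST deletions
// always proceed immediately, as do deletions of sstables backed by remote
// (shared) storage.
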
// maybePace sleeps before deleting an object if appropriate. It is always
// called from the background goroutine.
func (cm *cleanupManager) maybePace(
	tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
) {
	if !cm.needsPacing(fileType, fileNum) {
		return
	}

	tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
	if tokens == 0.0 {
		// The token bucket might be in debt; it could make us wait even for 0
		// tokens. We don't want that if the pacer decided throttling should be
		// disabled.
		return
	}
	// Wait for tokens. We use a token bucket instead of sleeping outright because
	// the token bucket accumulates up to one second of unused tokens.
	for {
		ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
		if ok {
			break
		}
		time.Sleep(d)
	}
}

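// A rough worked example of the pacing above: with the bucket's 1 token/second
// refill rate, a request for 2.5 tokens against an empty bucket fails to be
// fulfilled, and the goroutine sleeps until enough tokens have accumulated,
// roughly 2.5 seconds, less whatever burst the bucket banked while idle.
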
// deleteObsoleteFile deletes a (non-object) file that is no longer needed.
func (cm *cleanupManager) deleteObsoleteFile(
	fileType fileType, jobID int, path string, fileNum base.DiskFileNum, fileSize uint64,
) {
	// TODO(peter): need to handle this error, probably by re-adding the
	// file that couldn't be deleted to one of the obsolete file lists.
	err := cm.opts.Cleaner.Clean(cm.opts.FS, fileType, path)
	if oserror.IsNotExist(err) {
		return
	}

	switch fileType {
	case fileTypeLog:
		cm.opts.EventListener.WALDeleted(WALDeleteInfo{
			JobID:   jobID,
			Path:    path,
			FileNum: fileNum.FileNum(),
			Err:     err,
		})
	case fileTypeManifest:
		cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
			JobID:   jobID,
			Path:    path,
			FileNum: fileNum.FileNum(),
			Err:     err,
		})
	case fileTypeTable:
		panic("invalid deletion of object file")
	}
}

// deleteObsoleteObject removes an object (i.e. an sstable) that is no longer
// needed and notifies the event listener.
func (cm *cleanupManager) deleteObsoleteObject(
	fileType fileType, jobID int, fileNum base.DiskFileNum,
) {
	if fileType != fileTypeTable {
		panic("not an object")
	}

	var path string
	meta, err := cm.objProvider.Lookup(fileType, fileNum)
	if err != nil {
		path = "<nil>"
	} else {
		path = cm.objProvider.Path(meta)
		err = cm.objProvider.Remove(fileType, fileNum)
	}
	if cm.objProvider.IsNotExistError(err) {
		return
	}

	switch fileType {
	case fileTypeTable:
		cm.opts.EventListener.TableDeleted(TableDeleteInfo{
			JobID:   jobID,
			Path:    path,
			FileNum: fileNum.FileNum(),
			Err:     err,
		})
	}
}

// maybeLogLocked issues a warning log once the job queue is over 75% full, and
// a follow-up log once the queue drains back under 10% full.
//
// Must be called with cm.mu locked.
func (cm *cleanupManager) maybeLogLocked() {
	const highThreshold = jobsQueueDepth * 3 / 4
	const lowThreshold = jobsQueueDepth / 10

	jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs

	if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
		cm.mu.jobsQueueWarningIssued = true
		cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
	}

	if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
		cm.mu.jobsQueueWarningIssued = false
		cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
	}
}
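
// With jobsQueueDepth = 1000, highThreshold works out to 750 and lowThreshold
// to 100: the warning fires once more than 750 jobs are queued but incomplete,
// and the all-clear is logged once fewer than 100 remain.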
