LCOV - code coverage report
Current view: top level - pebble - cleaner.go (source / functions) Hit Total Coverage
Test: 2024-01-01 08:15Z 1cce3d01 - tests + meta.lcov Lines: 141 161 87.6 %
Date: 2024-01-01 08:16:54 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "runtime/pprof"
      10             :         "sync"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/errors/oserror"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/invariants"
      16             :         "github.com/cockroachdb/pebble/objstorage"
      17             :         "github.com/cockroachdb/tokenbucket"
      18             : )
      19             : 
      20             : // Cleaner exports the base.Cleaner type.
      21             : type Cleaner = base.Cleaner
      22             : 
      23             : // DeleteCleaner exports the base.DeleteCleaner type.
      24             : type DeleteCleaner = base.DeleteCleaner
      25             : 
      26             : // ArchiveCleaner exports the base.ArchiveCleaner type.
      27             : type ArchiveCleaner = base.ArchiveCleaner
      28             : 
      29             : type cleanupManager struct {
      30             :         opts            *Options
      31             :         objProvider     objstorage.Provider
      32             :         onTableDeleteFn func(fileSize uint64)
      33             :         deletePacer     *deletionPacer
      34             : 
      35             :         // jobsCh is used as the cleanup job queue.
      36             :         jobsCh chan *cleanupJob
      37             :         // waitGroup is used to wait for the background goroutine to exit.
      38             :         waitGroup sync.WaitGroup
      39             : 
      40             :         mu struct {
      41             :                 sync.Mutex
      42             :                 // totalJobs is the total number of enqueued jobs (completed or in progress).
      43             :                 totalJobs              int
      44             :                 completedJobs          int
      45             :                 completedJobsCond      sync.Cond
      46             :                 jobsQueueWarningIssued bool
      47             :         }
      48             : }
      49             : 
      50             : // We can queue this many jobs before we have to block EnqueueJob.
      51             : const jobsQueueDepth = 1000
      52             : 
      53             : // obsoleteFile holds information about a file that needs to be deleted soon.
      54             : type obsoleteFile struct {
      55             :         dir      string
      56             :         fileNum  base.DiskFileNum
      57             :         fileType fileType
      58             :         fileSize uint64
      59             : }
      60             : 
      61             : type cleanupJob struct {
      62             :         jobID         int
      63             :         obsoleteFiles []obsoleteFile
      64             : }
      65             : 
      66             : // openCleanupManager creates a cleanupManager and starts its background goroutine.
      67             : // The cleanupManager must be Close()d.
      68             : func openCleanupManager(
      69             :         opts *Options,
      70             :         objProvider objstorage.Provider,
      71             :         onTableDeleteFn func(fileSize uint64),
      72             :         getDeletePacerInfo func() deletionPacerInfo,
      73           2 : ) *cleanupManager {
      74           2 :         cm := &cleanupManager{
      75           2 :                 opts:            opts,
      76           2 :                 objProvider:     objProvider,
      77           2 :                 onTableDeleteFn: onTableDeleteFn,
      78           2 :                 deletePacer:     newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
      79           2 :                 jobsCh:          make(chan *cleanupJob, jobsQueueDepth),
      80           2 :         }
      81           2 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex
      82           2 :         cm.waitGroup.Add(1)
      83           2 : 
      84           2 :         go func() {
      85           2 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
      86           2 :                         cm.mainLoop()
      87           2 :                 })
      88             :         }()
      89             : 
      90           2 :         return cm
      91             : }
      92             : 
      93             : // Close stops the background goroutine, waiting until all queued jobs are completed.
      94             : // Delete pacing is disabled for the remaining jobs.
      95           2 : func (cm *cleanupManager) Close() {
      96           2 :         close(cm.jobsCh)
      97           2 :         cm.waitGroup.Wait()
      98           2 : }
      99             : 
     100             : // EnqueueJob adds a cleanup job to the manager's queue.
     101           2 : func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
     102           2 :         job := &cleanupJob{
     103           2 :                 jobID:         jobID,
     104           2 :                 obsoleteFiles: obsoleteFiles,
     105           2 :         }
     106           2 : 
     107           2 :         // Report deleted bytes to the pacer, which can use this data to potentially
     108           2 :         // increase the deletion rate to keep up. We want to do this at enqueue time
     109           2 :         // rather than when we get to the job, otherwise the reported bytes will be
     110           2 :         // subject to the throttling rate which defeats the purpose.
     111           2 :         var pacingBytes uint64
     112           2 :         for _, of := range obsoleteFiles {
     113           2 :                 if cm.needsPacing(of.fileType, of.fileNum) {
     114           2 :                         pacingBytes += of.fileSize
     115           2 :                 }
     116             :         }
     117           2 :         if pacingBytes > 0 {
     118           2 :                 cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
     119           2 :         }
     120             : 
     121           2 :         cm.mu.Lock()
     122           2 :         cm.mu.totalJobs++
     123           2 :         cm.maybeLogLocked()
     124           2 :         cm.mu.Unlock()
     125           2 : 
     126           2 :         if invariants.Enabled && len(cm.jobsCh) >= cap(cm.jobsCh)-2 {
     127           0 :                 panic("cleanup jobs queue full")
     128             :         }
     129             : 
     130           2 :         cm.jobsCh <- job
     131             : }
     132             : 
     133             : // Wait until the completion of all jobs that were already queued.
     134             : //
     135             : // Does not wait for jobs that are enqueued during the call.
     136             : //
     137             : // Note that DB.mu should not be held while calling this method; the background
     138             : // goroutine needs to acquire DB.mu to update deleted table metrics.
     139           2 : func (cm *cleanupManager) Wait() {
     140           2 :         cm.mu.Lock()
     141           2 :         defer cm.mu.Unlock()
     142           2 :         n := cm.mu.totalJobs
     143           2 :         for cm.mu.completedJobs < n {
     144           2 :                 cm.mu.completedJobsCond.Wait()
     145           2 :         }
     146             : }
     147             : 
     148             : // mainLoop runs the manager's background goroutine.
     149           2 : func (cm *cleanupManager) mainLoop() {
     150           2 :         defer cm.waitGroup.Done()
     151           2 : 
     152           2 :         var tb tokenbucket.TokenBucket
     153           2 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     154           2 :         tb.Init(1.0, 1.0)
     155           2 :         for job := range cm.jobsCh {
     156           2 :                 for _, of := range job.obsoleteFiles {
     157           2 :                         if of.fileType != fileTypeTable {
     158           2 :                                 path := base.MakeFilepath(cm.opts.FS, of.dir, of.fileType, of.fileNum)
     159           2 :                                 cm.deleteObsoleteFile(of.fileType, job.jobID, path, of.fileNum, of.fileSize)
     160           2 :                         } else {
     161           2 :                                 cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
     162           2 :                                 cm.onTableDeleteFn(of.fileSize)
     163           2 :                                 cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.fileNum)
     164           2 :                         }
     165             :                 }
     166           2 :                 cm.mu.Lock()
     167           2 :                 cm.mu.completedJobs++
     168           2 :                 cm.mu.completedJobsCond.Broadcast()
     169           2 :                 cm.maybeLogLocked()
     170           2 :                 cm.mu.Unlock()
     171             :         }
     172             : }
     173             : 
     174           2 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNum base.DiskFileNum) bool {
     175           2 :         if fileType != fileTypeTable {
     176           2 :                 return false
     177           2 :         }
     178           2 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     179           2 :         if err != nil {
     180           0 :                 // The object was already removed from the provider; we won't actually
     181           0 :                 // delete anything, so we don't need to pace.
     182           0 :                 return false
     183           0 :         }
     184             :         // Don't throttle deletion of remote objects.
     185           2 :         return !meta.IsRemote()
     186             : }
     187             : 
     188             : // maybePace sleeps before deleting an object if appropriate. It is always
     189             : // called from the background goroutine.
     190             : func (cm *cleanupManager) maybePace(
     191             :         tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
     192           2 : ) {
     193           2 :         if !cm.needsPacing(fileType, fileNum) {
     194           2 :                 return
     195           2 :         }
     196             : 
     197           2 :         tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
     198           2 :         if tokens == 0.0 {
     199           2 :                 // The token bucket might be in debt; it could make us wait even for 0
     200           2 :                 // tokens. We don't want that if the pacer decided throttling should be
     201           2 :                 // disabled.
     202           2 :                 return
     203           2 :         }
     204             :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     205             :         // the token bucket accumulates up to one second of unused tokens.
     206           2 :         for {
     207           2 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     208           2 :                 if ok {
     209           2 :                         break
     210             :                 }
     211           0 :                 time.Sleep(d)
     212             :         }
     213             : }
     214             : 
     215             : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
     216             : func (cm *cleanupManager) deleteObsoleteFile(
     217             :         fileType fileType, jobID int, path string, fileNum base.DiskFileNum, fileSize uint64,
     218           2 : ) {
     219           2 :         // TODO(peter): need to handle this error, probably by re-adding the
     220           2 :         // file that couldn't be deleted to one of the obsolete slices map.
     221           2 :         err := cm.opts.Cleaner.Clean(cm.opts.FS, fileType, path)
     222           2 :         if oserror.IsNotExist(err) {
     223           0 :                 return
     224           0 :         }
     225             : 
     226           2 :         switch fileType {
     227           2 :         case fileTypeLog:
     228           2 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     229           2 :                         JobID:   jobID,
     230           2 :                         Path:    path,
     231           2 :                         FileNum: fileNum.FileNum(),
     232           2 :                         Err:     err,
     233           2 :                 })
     234           2 :         case fileTypeManifest:
     235           2 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     236           2 :                         JobID:   jobID,
     237           2 :                         Path:    path,
     238           2 :                         FileNum: fileNum.FileNum(),
     239           2 :                         Err:     err,
     240           2 :                 })
     241           0 :         case fileTypeTable:
     242           0 :                 panic("invalid deletion of object file")
     243             :         }
     244             : }
     245             : 
     246             : func (cm *cleanupManager) deleteObsoleteObject(
     247             :         fileType fileType, jobID int, fileNum base.DiskFileNum,
     248           2 : ) {
     249           2 :         if fileType != fileTypeTable {
     250           0 :                 panic("not an object")
     251             :         }
     252             : 
     253           2 :         var path string
     254           2 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     255           2 :         if err != nil {
     256           0 :                 path = "<nil>"
     257           2 :         } else {
     258           2 :                 path = cm.objProvider.Path(meta)
     259           2 :                 err = cm.objProvider.Remove(fileType, fileNum)
     260           2 :         }
     261           2 :         if cm.objProvider.IsNotExistError(err) {
     262           0 :                 return
     263           0 :         }
     264             : 
     265           2 :         switch fileType {
     266           2 :         case fileTypeTable:
     267           2 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     268           2 :                         JobID:   jobID,
     269           2 :                         Path:    path,
     270           2 :                         FileNum: fileNum.FileNum(),
     271           2 :                         Err:     err,
     272           2 :                 })
     273             :         }
     274             : }
     275             : 
     276             : // maybeLogLocked issues a log if the job queue gets 75% full and issues a log
     277             : // when the job queue gets back to less than 10% full.
     278             : //
     279             : // Must be called with cm.mu locked.
     280           2 : func (cm *cleanupManager) maybeLogLocked() {
     281           2 :         const highThreshold = jobsQueueDepth * 3 / 4
     282           2 :         const lowThreshold = jobsQueueDepth / 10
     283           2 : 
     284           2 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
     285           2 : 
     286           2 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
     287           0 :                 cm.mu.jobsQueueWarningIssued = true
     288           0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
     289           0 :         }
     290             : 
     291           2 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
     292           0 :                 cm.mu.jobsQueueWarningIssued = false
     293           0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
     294           0 :         }
     295             : }

Generated by: LCOV version 1.14