LCOV - code coverage report
Current view: top level - pebble - obsolete_files.go (source / functions) Coverage Total Hit
Test: 2025-02-09 08:17Z 12f37e44 - tests + meta.lcov Lines: 96.4 % 412 397
Test Date: 2025-02-09 08:18:31 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "context"
      10              :         "runtime/pprof"
      11              :         "slices"
      12              :         "sync"
      13              :         "time"
      14              : 
      15              :         "github.com/cockroachdb/crlib/crtime"
      16              :         "github.com/cockroachdb/errors/oserror"
      17              :         "github.com/cockroachdb/pebble/internal/base"
      18              :         "github.com/cockroachdb/pebble/objstorage"
      19              :         "github.com/cockroachdb/pebble/vfs"
      20              :         "github.com/cockroachdb/pebble/wal"
      21              :         "github.com/cockroachdb/tokenbucket"
      22              : )
      23              : 
      24              : // Cleaner exports the base.Cleaner type.
      25              : type Cleaner = base.Cleaner
      26              : 
      27              : // DeleteCleaner exports the base.DeleteCleaner type.
      28              : type DeleteCleaner = base.DeleteCleaner
      29              : 
      30              : // ArchiveCleaner exports the base.ArchiveCleaner type.
      31              : type ArchiveCleaner = base.ArchiveCleaner
      32              : 
      33              : type cleanupManager struct {
      34              :         opts            *Options
      35              :         objProvider     objstorage.Provider
      36              :         onTableDeleteFn func(fileSize uint64, isLocal bool)
      37              :         deletePacer     *deletionPacer
      38              : 
      39              :         // jobsCh is used as the cleanup job queue.
      40              :         jobsCh chan *cleanupJob
      41              :         // waitGroup is used to wait for the background goroutine to exit.
      42              :         waitGroup sync.WaitGroup
      43              : 
      44              :         mu struct {
      45              :                 sync.Mutex
      46              :                 // totalJobs is the total number of enqueued jobs (completed or in progress).
      47              :                 totalJobs              int
      48              :                 completedJobs          int
      49              :                 completedJobsCond      sync.Cond
      50              :                 jobsQueueWarningIssued bool
      51              :         }
      52              : }
      53              : 
      54              : // We can queue this many jobs before we have to block EnqueueJob.
      55              : const jobsQueueDepth = 1000
      56              : 
      57              : // deletableFile is used for non log files.
      58              : type deletableFile struct {
      59              :         dir      string
      60              :         fileNum  base.DiskFileNum
      61              :         fileSize uint64
      62              :         isLocal  bool
      63              : }
      64              : 
      65              : // obsoleteFile holds information about a file that needs to be deleted soon.
      66              : type obsoleteFile struct {
      67              :         fileType base.FileType
      68              :         // nonLogFile is populated when fileType != fileTypeLog.
      69              :         nonLogFile deletableFile
      70              :         // logFile is populated when fileType == fileTypeLog.
      71              :         logFile wal.DeletableLog
      72              : }
      73              : 
      74              : type cleanupJob struct {
      75              :         jobID         JobID
      76              :         obsoleteFiles []obsoleteFile
      77              : }
      78              : 
      79              : // openCleanupManager creates a cleanupManager and starts its background goroutine.
      80              : // The cleanupManager must be Close()d.
      81              : func openCleanupManager(
      82              :         opts *Options,
      83              :         objProvider objstorage.Provider,
      84              :         onTableDeleteFn func(fileSize uint64, isLocal bool),
      85              :         getDeletePacerInfo func() deletionPacerInfo,
      86            2 : ) *cleanupManager {
      87            2 :         cm := &cleanupManager{
      88            2 :                 opts:            opts,
      89            2 :                 objProvider:     objProvider,
      90            2 :                 onTableDeleteFn: onTableDeleteFn,
      91            2 :                 deletePacer:     newDeletionPacer(crtime.NowMono(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
      92            2 :                 jobsCh:          make(chan *cleanupJob, jobsQueueDepth),
      93            2 :         }
      94            2 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex
      95            2 :         cm.waitGroup.Add(1)
      96            2 : 
      97            2 :         go func() {
      98            2 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
      99            2 :                         cm.mainLoop()
     100            2 :                 })
     101              :         }()
     102              : 
     103            2 :         return cm
     104              : }
     105              : 
     106              : // Close stops the background goroutine, waiting until all queued jobs are completed.
     107              : // Delete pacing is disabled for the remaining jobs.
     108            2 : func (cm *cleanupManager) Close() {
     109            2 :         close(cm.jobsCh)
     110            2 :         cm.waitGroup.Wait()
     111            2 : }
     112              : 
     113              : // EnqueueJob adds a cleanup job to the manager's queue.
     114            2 : func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
     115            2 :         job := &cleanupJob{
     116            2 :                 jobID:         jobID,
     117            2 :                 obsoleteFiles: obsoleteFiles,
     118            2 :         }
     119            2 : 
     120            2 :         // Report deleted bytes to the pacer, which can use this data to potentially
     121            2 :         // increase the deletion rate to keep up. We want to do this at enqueue time
     122            2 :         // rather than when we get to the job, otherwise the reported bytes will be
     123            2 :         // subject to the throttling rate which defeats the purpose.
     124            2 :         var pacingBytes uint64
     125            2 :         for _, of := range obsoleteFiles {
     126            2 :                 if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) {
     127            2 :                         pacingBytes += of.nonLogFile.fileSize
     128            2 :                 }
     129              :         }
     130            2 :         if pacingBytes > 0 {
     131            2 :                 cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
     132            2 :         }
     133              : 
     134            2 :         cm.mu.Lock()
     135            2 :         cm.mu.totalJobs++
     136            2 :         cm.maybeLogLocked()
     137            2 :         cm.mu.Unlock()
     138            2 : 
     139            2 :         cm.jobsCh <- job
     140              : }
     141              : 
     142              : // Wait blocks until the completion of all jobs that were already queued.
     143              : //
     144              : // Does not wait for jobs that are enqueued during the call.
     145              : //
     146              : // Note that DB.mu should not be held while calling this method; the background
     147              : // goroutine needs to acquire DB.mu to update deleted table metrics.
     148            2 : func (cm *cleanupManager) Wait() {
     149            2 :         cm.mu.Lock()
     150            2 :         defer cm.mu.Unlock()
     151            2 :         n := cm.mu.totalJobs
     152            2 :         for cm.mu.completedJobs < n {
     153            2 :                 cm.mu.completedJobsCond.Wait()
     154            2 :         }
     155              : }
     156              : 
     157              : // mainLoop runs the manager's background goroutine.
     158            2 : func (cm *cleanupManager) mainLoop() {
     159            2 :         defer cm.waitGroup.Done()
     160            2 : 
     161            2 :         var tb tokenbucket.TokenBucket
     162            2 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     163            2 :         tb.Init(1.0, 1.0)
     164            2 :         for job := range cm.jobsCh {
     165            2 :                 for _, of := range job.obsoleteFiles {
     166            2 :                         switch of.fileType {
     167            2 :                         case base.FileTypeTable:
     168            2 :                                 cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
     169            2 :                                 cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal)
     170            2 :                                 cm.deleteObsoleteObject(of.fileType, job.jobID, of.nonLogFile.fileNum)
     171            1 :                         case base.FileTypeBlob:
     172            1 :                                 cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
     173            1 :                                 cm.deleteObsoleteObject(of.fileType, job.jobID, of.nonLogFile.fileNum)
     174            2 :                         case base.FileTypeLog:
     175            2 :                                 cm.deleteObsoleteFile(of.logFile.FS, base.FileTypeLog, job.jobID, of.logFile.Path,
     176            2 :                                         base.DiskFileNum(of.logFile.NumWAL))
     177            2 :                         default:
     178            2 :                                 path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
     179            2 :                                 cm.deleteObsoleteFile(
     180            2 :                                         cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum)
     181              :                         }
     182              :                 }
     183            2 :                 cm.mu.Lock()
     184            2 :                 cm.mu.completedJobs++
     185            2 :                 cm.mu.completedJobsCond.Broadcast()
     186            2 :                 cm.maybeLogLocked()
     187            2 :                 cm.mu.Unlock()
     188              :         }
     189              : }
     190              : 
     191              : // fileNumIfSST is read iff fileType is FileTypeTable or FileTypeBlob.
     192            2 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
     193            2 :         if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
     194            2 :                 return false
     195            2 :         }
     196            2 :         meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
     197            2 :         if err != nil {
     198            2 :                 // The object was already removed from the provider; we won't actually
     199            2 :                 // delete anything, so we don't need to pace.
     200            2 :                 return false
     201            2 :         }
     202              :         // Don't throttle deletion of remote objects.
     203            2 :         return !meta.IsRemote()
     204              : }
     205              : 
     206              : // maybePace sleeps before deleting an object if appropriate. It is always
     207              : // called from the background goroutine.
     208              : func (cm *cleanupManager) maybePace(
     209              :         tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
     210            2 : ) {
     211            2 :         if !cm.needsPacing(fileType, fileNum) {
     212            2 :                 return
     213            2 :         }
     214              : 
     215            2 :         tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), fileSize)
     216            2 :         if tokens == 0.0 {
     217            2 :                 // The token bucket might be in debt; it could make us wait even for 0
     218            2 :                 // tokens. We don't want that if the pacer decided throttling should be
     219            2 :                 // disabled.
     220            2 :                 return
     221            2 :         }
     222              :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     223              :         // the token bucket accumulates up to one second of unused tokens.
     224            2 :         for {
     225            2 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     226            2 :                 if ok {
     227            2 :                         break
     228              :                 }
     229            1 :                 time.Sleep(d)
     230              :         }
     231              : }
     232              : 
     233              : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
     234              : func (cm *cleanupManager) deleteObsoleteFile(
     235              :         fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
     236            2 : ) {
     237            2 :         // TODO(peter): need to handle this error, probably by re-adding the
     238            2 :         // file that couldn't be deleted to one of the obsolete slices map.
     239            2 :         err := cm.opts.Cleaner.Clean(fs, fileType, path)
     240            2 :         if oserror.IsNotExist(err) {
     241            2 :                 return
     242            2 :         }
     243              : 
     244            2 :         switch fileType {
     245            2 :         case base.FileTypeLog:
     246            2 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     247            2 :                         JobID:   int(jobID),
     248            2 :                         Path:    path,
     249            2 :                         FileNum: fileNum,
     250            2 :                         Err:     err,
     251            2 :                 })
     252            2 :         case base.FileTypeManifest:
     253            2 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     254            2 :                         JobID:   int(jobID),
     255            2 :                         Path:    path,
     256            2 :                         FileNum: fileNum,
     257            2 :                         Err:     err,
     258            2 :                 })
     259            0 :         case base.FileTypeTable, base.FileTypeBlob:
     260            0 :                 panic("invalid deletion of object file")
     261              :         }
     262              : }
     263              : 
     264              : func (cm *cleanupManager) deleteObsoleteObject(
     265              :         fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
     266            2 : ) {
     267            2 :         if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
     268            0 :                 panic("not an object")
     269              :         }
     270              : 
     271            2 :         var path string
     272            2 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     273            2 :         if err != nil {
     274            2 :                 path = "<nil>"
     275            2 :         } else {
     276            2 :                 path = cm.objProvider.Path(meta)
     277            2 :                 err = cm.objProvider.Remove(fileType, fileNum)
     278            2 :         }
     279            2 :         if cm.objProvider.IsNotExistError(err) {
     280            2 :                 return
     281            2 :         }
     282              : 
     283            2 :         switch fileType {
     284            2 :         case base.FileTypeTable:
     285            2 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     286            2 :                         JobID:   int(jobID),
     287            2 :                         Path:    path,
     288            2 :                         FileNum: fileNum,
     289            2 :                         Err:     err,
     290            2 :                 })
     291              :                 // TODO(jackson): Add BlobFileDeleted event.
     292              :         }
     293              : }
     294              : 
     295              : // maybeLogLocked issues a log if the job queue gets 75% full and issues a log
     296              : // when the job queue gets back to less than 10% full.
     297              : //
     298              : // Must be called with cm.mu locked.
     299            2 : func (cm *cleanupManager) maybeLogLocked() {
     300            2 :         const highThreshold = jobsQueueDepth * 3 / 4
     301            2 :         const lowThreshold = jobsQueueDepth / 10
     302            2 : 
     303            2 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
     304            2 : 
     305            2 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
     306            0 :                 cm.mu.jobsQueueWarningIssued = true
     307            0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
     308            0 :         }
     309              : 
     310            2 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
     311            0 :                 cm.mu.jobsQueueWarningIssued = false
     312            0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
     313            0 :         }
     314              : }
     315              : 
     316            2 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
     317            2 :         var pacerInfo deletionPacerInfo
     318            2 :         // Call GetDiskUsage after every file deletion. This may seem inefficient,
     319            2 :         // but in practice this was observed to take constant time, regardless of
     320            2 :         // volume size used, at least on linux with ext4 and zfs. All invocations
     321            2 :         // take 10 microseconds or less.
     322            2 :         pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
     323            2 :         d.mu.Lock()
     324            2 :         pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
     325            2 :         pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
     326            2 :         d.mu.Unlock()
     327            2 :         return pacerInfo
     328            2 : }
     329              : 
     330              : // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
     331            2 : func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
     332            2 :         d.mu.Lock()
     333            2 :         d.mu.versions.metrics.Table.ObsoleteCount--
     334            2 :         d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
     335            2 :         if isLocal {
     336            2 :                 d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
     337            2 :         }
     338            2 :         d.mu.Unlock()
     339              : }
     340              : 
     341              : // scanObsoleteFiles scans the filesystem for files that are no longer needed
     342              : // and adds those to the internal lists of obsolete files. Note that the files
     343              : // are not actually deleted by this method. A subsequent call to
     344              : // deleteObsoleteFiles must be performed. Must not be called concurrently
     345              : // with compactions and flushes. db.mu must be held when calling this function.
     346            2 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
     347            2 :         // Disable automatic compactions temporarily to avoid concurrent compactions /
     348            2 :         // flushes from interfering. The original value is restored on completion.
     349            2 :         disabledPrev := d.opts.DisableAutomaticCompactions
     350            2 :         defer func() {
     351            2 :                 d.opts.DisableAutomaticCompactions = disabledPrev
     352            2 :         }()
     353            2 :         d.opts.DisableAutomaticCompactions = true
     354            2 : 
     355            2 :         // Wait for any ongoing compaction to complete before continuing.
     356            2 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
     357            2 :                 d.mu.compact.cond.Wait()
     358            2 :         }
     359              : 
     360            2 :         liveFileNums := make(map[base.DiskFileNum]struct{})
     361            2 :         d.mu.versions.addLiveFileNums(liveFileNums)
     362            2 :         // Protect against files which are only referred to by the ingestedFlushable
     363            2 :         // from being deleted. These are added to the flushable queue on WAL replay
     364            2 :         // and handle their own obsoletion/deletion. We exclude them from this obsolete
     365            2 :         // file scan to avoid double-deleting these files.
     366            2 :         for _, f := range flushableIngests {
     367            2 :                 for _, file := range f.files {
     368            2 :                         liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
     369            2 :                 }
     370              :         }
     371              : 
     372            2 :         manifestFileNum := d.mu.versions.manifestFileNum
     373            2 : 
     374            2 :         var obsoleteTables []objectInfo
     375            2 :         var obsoleteBlobs []objectInfo
     376            2 :         var obsoleteManifests []fileInfo
     377            2 :         var obsoleteOptions []fileInfo
     378            2 : 
     379            2 :         for _, filename := range list {
     380            2 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
     381            2 :                 if !ok {
     382            2 :                         continue
     383              :                 }
     384            2 :                 switch fileType {
     385            2 :                 case base.FileTypeManifest:
     386            2 :                         if diskFileNum >= manifestFileNum {
     387            2 :                                 continue
     388              :                         }
     389            2 :                         fi := fileInfo{FileNum: diskFileNum}
     390            2 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
     391            1 :                                 fi.FileSize = uint64(stat.Size())
     392            1 :                         }
     393            2 :                         obsoleteManifests = append(obsoleteManifests, fi)
     394            2 :                 case base.FileTypeOptions:
     395            2 :                         if diskFileNum >= d.optionsFileNum {
     396            2 :                                 continue
     397              :                         }
     398            2 :                         fi := fileInfo{FileNum: diskFileNum}
     399            2 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
     400            1 :                                 fi.FileSize = uint64(stat.Size())
     401            1 :                         }
     402            2 :                         obsoleteOptions = append(obsoleteOptions, fi)
     403            2 :                 case base.FileTypeTable, base.FileTypeBlob:
     404              :                         // Objects are handled through the objstorage provider below.
     405            2 :                 default:
     406              :                         // Don't delete files we don't know about.
     407              :                 }
     408              :         }
     409              : 
     410            2 :         objects := d.objProvider.List()
     411            2 :         for _, obj := range objects {
     412            2 :                 if _, ok := liveFileNums[obj.DiskFileNum]; ok {
     413            2 :                         continue
     414              :                 }
     415            2 :                 makeObjectInfo := func() objectInfo {
     416            2 :                         fileInfo := fileInfo{FileNum: obj.DiskFileNum}
     417            2 :                         if size, err := d.objProvider.Size(obj); err == nil {
     418            2 :                                 fileInfo.FileSize = uint64(size)
     419            2 :                         }
     420            2 :                         return objectInfo{
     421            2 :                                 fileInfo: fileInfo,
     422            2 :                                 isLocal:  !obj.IsRemote(),
     423            2 :                         }
     424              :                 }
     425              : 
     426            2 :                 switch obj.FileType {
     427            2 :                 case base.FileTypeTable:
     428            2 :                         obsoleteTables = append(obsoleteTables, makeObjectInfo())
     429            1 :                 case base.FileTypeBlob:
     430            1 :                         obsoleteBlobs = append(obsoleteBlobs, makeObjectInfo())
     431            0 :                 default:
     432              :                         // Ignore object types we don't know about.
     433              :                 }
     434              :         }
     435              : 
     436            2 :         d.mu.versions.obsoleteTables = mergeObjectInfos(d.mu.versions.obsoleteTables, obsoleteTables)
     437            2 :         d.mu.versions.obsoleteBlobs = mergeObjectInfos(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
     438            2 :         d.mu.versions.updateObsoleteTableMetricsLocked()
     439            2 :         d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
     440            2 :         d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
     441              : }
     442              : 
     443              : // disableFileDeletions disables file deletions and then waits for any
     444              : // in-progress deletion to finish. The caller is required to call
     445              : // enableFileDeletions in order to enable file deletions again. It is ok for
     446              : // multiple callers to disable file deletions simultaneously, though they must
     447              : // all invoke enableFileDeletions in order for file deletions to be re-enabled
     448              : // (there is an internal reference count on file deletion disablement).
     449              : //
     450              : // d.mu must be held when calling this method.
     451            2 : func (d *DB) disableFileDeletions() {
     452            2 :         d.mu.disableFileDeletions++
     453            2 :         d.mu.Unlock()
     454            2 :         defer d.mu.Lock()
     455            2 :         d.cleanupManager.Wait()
     456            2 : }
     457              : 
     458              : // enableFileDeletions enables previously disabled file deletions. A cleanup job
     459              : // is queued if necessary.
     460              : //
     461              : // d.mu must be held when calling this method.
     462            2 : func (d *DB) enableFileDeletions() {
     463            2 :         if d.mu.disableFileDeletions <= 0 {
     464            1 :                 panic("pebble: file deletion disablement invariant violated")
     465              :         }
     466            2 :         d.mu.disableFileDeletions--
     467            2 :         if d.mu.disableFileDeletions > 0 {
     468            0 :                 return
     469            0 :         }
     470            2 :         d.deleteObsoleteFiles(d.newJobIDLocked())
     471              : }
     472              : 
     473              : type fileInfo = base.FileInfo
     474              : 
     475              : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
     476              : //
     477              : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
     478              : //
     479              : // Does nothing if file deletions are disabled (see disableFileDeletions). A
     480              : // cleanup job will be scheduled when file deletions are re-enabled.
     481            2 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
     482            2 :         if d.mu.disableFileDeletions > 0 {
     483            2 :                 return
     484            2 :         }
     485            2 :         _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
     486            2 : 
     487            2 :         // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
     488            2 :         // log that has not had its contents flushed to an sstable.
     489            2 :         obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
     490            2 :         if err != nil {
     491            0 :                 panic(err)
     492              :         }
     493              : 
     494            2 :         obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
     495            2 :         d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
     496            2 :         obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
     497            2 :         d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
     498            2 : 
     499            2 :         for _, tbl := range obsoleteTables {
     500            2 :                 delete(d.mu.versions.zombieTables, tbl.FileNum)
     501            2 :         }
     502              : 
     503              :         // Sort the manifests cause we want to delete some contiguous prefix
     504              :         // of the older manifests.
     505            2 :         slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
     506            2 :                 return cmp.Compare(a.FileNum, b.FileNum)
     507            2 :         })
     508              : 
     509            2 :         var obsoleteManifests []fileInfo
     510            2 :         manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
     511            2 :         if manifestsToDelete > 0 {
     512            2 :                 obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
     513            2 :                 d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
     514            2 :                 if len(d.mu.versions.obsoleteManifests) == 0 {
     515            0 :                         d.mu.versions.obsoleteManifests = nil
     516            0 :                 }
     517              :         }
     518              : 
     519            2 :         obsoleteOptions := d.mu.versions.obsoleteOptions
     520            2 :         d.mu.versions.obsoleteOptions = nil
     521            2 : 
     522            2 :         // Release d.mu while preparing the cleanup job and possibly waiting.
     523            2 :         // Note the unusual order: Unlock and then Lock.
     524            2 :         d.mu.Unlock()
     525            2 :         defer d.mu.Lock()
     526            2 : 
     527            2 :         filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
     528            2 :         for _, f := range obsoleteLogs {
     529            2 :                 filesToDelete = append(filesToDelete, obsoleteFile{fileType: base.FileTypeLog, logFile: f})
     530            2 :         }
     531              :         // We sort to make the order of deletions deterministic, which is nice for
     532              :         // tests.
     533            2 :         slices.SortFunc(obsoleteTables, func(a, b objectInfo) int {
     534            2 :                 return cmp.Compare(a.FileNum, b.FileNum)
     535            2 :         })
     536            2 :         slices.SortFunc(obsoleteBlobs, func(a, b objectInfo) int {
     537            1 :                 return cmp.Compare(a.FileNum, b.FileNum)
     538            1 :         })
     539            2 :         for _, f := range obsoleteTables {
     540            2 :                 d.fileCache.evict(f.FileNum)
     541            2 :                 filesToDelete = append(filesToDelete, obsoleteFile{
     542            2 :                         fileType: base.FileTypeTable,
     543            2 :                         nonLogFile: deletableFile{
     544            2 :                                 dir:      d.dirname,
     545            2 :                                 fileNum:  f.FileNum,
     546            2 :                                 fileSize: f.FileSize,
     547            2 :                                 isLocal:  f.isLocal,
     548            2 :                         },
     549            2 :                 })
     550            2 :         }
     551            2 :         for _, f := range obsoleteBlobs {
     552            1 :                 d.fileCache.evict(f.FileNum)
     553            1 :                 filesToDelete = append(filesToDelete, obsoleteFile{
     554            1 :                         fileType: base.FileTypeBlob,
     555            1 :                         nonLogFile: deletableFile{
     556            1 :                                 dir:      d.dirname,
     557            1 :                                 fileNum:  f.FileNum,
     558            1 :                                 fileSize: f.FileSize,
     559            1 :                                 isLocal:  f.isLocal,
     560            1 :                         },
     561            1 :                 })
     562            1 :         }
     563              : 
     564            2 :         files := [2]struct {
     565            2 :                 fileType base.FileType
     566            2 :                 obsolete []fileInfo
     567            2 :         }{
     568            2 :                 {base.FileTypeManifest, obsoleteManifests},
     569            2 :                 {base.FileTypeOptions, obsoleteOptions},
     570            2 :         }
     571            2 :         for _, f := range files {
     572            2 :                 // We sort to make the order of deletions deterministic, which is nice for
     573            2 :                 // tests.
     574            2 :                 slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
     575            2 :                         return cmp.Compare(a.FileNum, b.FileNum)
     576            2 :                 })
     577            2 :                 for _, fi := range f.obsolete {
     578            2 :                         dir := d.dirname
     579            2 :                         filesToDelete = append(filesToDelete, obsoleteFile{
     580            2 :                                 fileType: f.fileType,
     581            2 :                                 nonLogFile: deletableFile{
     582            2 :                                         dir:      dir,
     583            2 :                                         fileNum:  fi.FileNum,
     584            2 :                                         fileSize: fi.FileSize,
     585            2 :                                         isLocal:  true,
     586            2 :                                 },
     587            2 :                         })
     588            2 :                 }
     589              :         }
     590            2 :         if len(filesToDelete) > 0 {
     591            2 :                 d.cleanupManager.EnqueueJob(jobID, filesToDelete)
     592            2 :         }
     593            2 :         if d.opts.private.testingAlwaysWaitForCleanup {
     594            1 :                 d.cleanupManager.Wait()
     595            1 :         }
     596              : }
     597              : 
// maybeScheduleObsoleteTableDeletion acquires d.mu and delegates to
// maybeScheduleObsoleteTableDeletionLocked. It must be called without d.mu
// held.
func (d *DB) maybeScheduleObsoleteTableDeletion() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.maybeScheduleObsoleteTableDeletionLocked()
}
     603              : 
     604            2 : func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
     605            2 :         if len(d.mu.versions.obsoleteTables) > 0 {
     606            2 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
     607            2 :         }
     608              : }
     609              : 
     610            2 : func merge(a, b []fileInfo) []fileInfo {
     611            2 :         if len(b) == 0 {
     612            2 :                 return a
     613            2 :         }
     614              : 
     615            2 :         a = append(a, b...)
     616            2 :         slices.SortFunc(a, func(a, b fileInfo) int {
     617            2 :                 return cmp.Compare(a.FileNum, b.FileNum)
     618            2 :         })
     619            2 :         return slices.CompactFunc(a, func(a, b fileInfo) bool {
     620            2 :                 return a.FileNum == b.FileNum
     621            2 :         })
     622              : }
     623              : 
     624            2 : func mergeObjectInfos(a, b []objectInfo) []objectInfo {
     625            2 :         if len(b) == 0 {
     626            2 :                 return a
     627            2 :         }
     628              : 
     629            2 :         a = append(a, b...)
     630            2 :         slices.SortFunc(a, func(a, b objectInfo) int {
     631            2 :                 return cmp.Compare(a.FileNum, b.FileNum)
     632            2 :         })
     633            2 :         return slices.CompactFunc(a, func(a, b objectInfo) bool {
     634            2 :                 return a.FileNum == b.FileNum
     635            2 :         })
     636              : }
        

Generated by: LCOV version 2.0-1