LCOV - code coverage report
Current view: top level - pebble - obsolete_files.go (source / functions)
Test:         2024-07-21 08:15Z 72c3f550 - tests + meta.lcov
Date:         2024-07-21 08:16:56
                     Hit    Total  Coverage
Lines:               367      390    94.1 %
Functions:             0        0         -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "context"
      10             :         "runtime/pprof"
      11             :         "slices"
      12             :         "sync"
      13             :         "time"
      14             : 
      15             :         "github.com/cockroachdb/errors/oserror"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/objstorage"
      18             :         "github.com/cockroachdb/pebble/vfs"
      19             :         "github.com/cockroachdb/pebble/wal"
      20             :         "github.com/cockroachdb/tokenbucket"
      21             : )
      22             : 
      23             : // Cleaner exports the base.Cleaner type.
      24             : type Cleaner = base.Cleaner
      25             : 
      26             : // DeleteCleaner exports the base.DeleteCleaner type.
      27             : type DeleteCleaner = base.DeleteCleaner
      28             : 
      29             : // ArchiveCleaner exports the base.ArchiveCleaner type.
      30             : type ArchiveCleaner = base.ArchiveCleaner
      31             : 
      32             : type cleanupManager struct {
      33             :         opts            *Options
      34             :         objProvider     objstorage.Provider
      35             :         onTableDeleteFn func(fileSize uint64, isLocal bool)
      36             :         deletePacer     *deletionPacer
      37             : 
      38             :         // jobsCh is used as the cleanup job queue.
      39             :         jobsCh chan *cleanupJob
      40             :         // waitGroup is used to wait for the background goroutine to exit.
      41             :         waitGroup sync.WaitGroup
      42             : 
      43             :         mu struct {
      44             :                 sync.Mutex
      45             :                 // totalJobs is the total number of enqueued jobs (completed or in progress).
      46             :                 totalJobs              int
      47             :                 completedJobs          int
      48             :                 completedJobsCond      sync.Cond
      49             :                 jobsQueueWarningIssued bool
      50             :         }
      51             : }
      52             : 
      53             : // We can queue this many jobs before we have to block EnqueueJob.
      54             : const jobsQueueDepth = 1000
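
The constant above is the capacity of the jobsCh buffer created in openCleanupManager, so the channel send in EnqueueJob blocks once 1000 jobs are outstanding, applying backpressure to callers. A minimal standalone sketch of that bounded-queue pattern (illustrative only, not part of this file):

        package main

        import (
                "fmt"
                "sync"
        )

        // queueDepth plays the role of jobsQueueDepth: once this many jobs are
        // pending, further sends block until the worker catches up.
        const queueDepth = 4

        func main() {
                jobs := make(chan int, queueDepth)
                var wg sync.WaitGroup
                wg.Add(1)
                go func() {
                        defer wg.Done()
                        for j := range jobs {
                                fmt.Println("processed job", j)
                        }
                }()
                for i := 0; i < 10; i++ {
                        jobs <- i // blocks whenever queueDepth jobs are already queued
                }
                close(jobs)
                wg.Wait()
        }
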
      55             : 
       56             : // deletableFile is used for non-log files.
      57             : type deletableFile struct {
      58             :         dir      string
      59             :         fileNum  base.DiskFileNum
      60             :         fileSize uint64
      61             :         isLocal  bool
      62             : }
      63             : 
      64             : // obsoleteFile holds information about a file that needs to be deleted soon.
      65             : type obsoleteFile struct {
      66             :         fileType fileType
      67             :         // nonLogFile is populated when fileType != fileTypeLog.
      68             :         nonLogFile deletableFile
      69             :         // logFile is populated when fileType == fileTypeLog.
      70             :         logFile wal.DeletableLog
      71             : }
      72             : 
      73             : type cleanupJob struct {
      74             :         jobID         JobID
      75             :         obsoleteFiles []obsoleteFile
      76             : }
      77             : 
      78             : // openCleanupManager creates a cleanupManager and starts its background goroutine.
      79             : // The cleanupManager must be Close()d.
      80             : func openCleanupManager(
      81             :         opts *Options,
      82             :         objProvider objstorage.Provider,
      83             :         onTableDeleteFn func(fileSize uint64, isLocal bool),
      84             :         getDeletePacerInfo func() deletionPacerInfo,
      85           2 : ) *cleanupManager {
      86           2 :         cm := &cleanupManager{
      87           2 :                 opts:            opts,
      88           2 :                 objProvider:     objProvider,
      89           2 :                 onTableDeleteFn: onTableDeleteFn,
      90           2 :                 deletePacer:     newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
      91           2 :                 jobsCh:          make(chan *cleanupJob, jobsQueueDepth),
      92           2 :         }
      93           2 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex
      94           2 :         cm.waitGroup.Add(1)
      95           2 : 
      96           2 :         go func() {
      97           2 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
      98           2 :                         cm.mainLoop()
      99           2 :                 })
     100             :         }()
     101             : 
     102           2 :         return cm
     103             : }
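
pprof.Do above runs mainLoop with the gcLabels profiler labels attached, so samples from the cleanup goroutine are attributed to a recognizable label in CPU and goroutine profiles. A standalone sketch of that labeling pattern with assumed label values (the actual contents of gcLabels are defined elsewhere in the package):

        package main

        import (
                "context"
                "fmt"
                "runtime/pprof"
                "sync"
        )

        func main() {
                var wg sync.WaitGroup
                wg.Add(1)
                go func() {
                        defer wg.Done()
                        // Labels apply for the duration of the function; profiles taken
                        // while it runs attribute its samples to pebble=gc (values assumed).
                        pprof.Do(context.Background(), pprof.Labels("pebble", "gc"), func(context.Context) {
                                fmt.Println("background cleanup loop would run here")
                        })
                }()
                wg.Wait()
        }
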
     104             : 
     105             : // Close stops the background goroutine, waiting until all queued jobs are completed.
     106             : // Delete pacing is disabled for the remaining jobs.
     107           2 : func (cm *cleanupManager) Close() {
     108           2 :         close(cm.jobsCh)
     109           2 :         cm.waitGroup.Wait()
     110           2 : }
     111             : 
     112             : // EnqueueJob adds a cleanup job to the manager's queue.
     113           2 : func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
     114           2 :         job := &cleanupJob{
     115           2 :                 jobID:         jobID,
     116           2 :                 obsoleteFiles: obsoleteFiles,
     117           2 :         }
     118           2 : 
     119           2 :         // Report deleted bytes to the pacer, which can use this data to potentially
     120           2 :         // increase the deletion rate to keep up. We want to do this at enqueue time
     121           2 :         // rather than when we get to the job, otherwise the reported bytes will be
     122           2 :         // subject to the throttling rate which defeats the purpose.
     123           2 :         var pacingBytes uint64
     124           2 :         for _, of := range obsoleteFiles {
     125           2 :                 if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) {
     126           2 :                         pacingBytes += of.nonLogFile.fileSize
     127           2 :                 }
     128             :         }
     129           2 :         if pacingBytes > 0 {
     130           2 :                 cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
     131           2 :         }
     132             : 
     133           2 :         cm.mu.Lock()
     134           2 :         cm.mu.totalJobs++
     135           2 :         cm.maybeLogLocked()
     136           2 :         cm.mu.Unlock()
     137           2 : 
     138           2 :         cm.jobsCh <- job
     139             : }
     140             : 
     141             : // Wait until the completion of all jobs that were already queued.
     142             : //
     143             : // Does not wait for jobs that are enqueued during the call.
     144             : //
     145             : // Note that DB.mu should not be held while calling this method; the background
     146             : // goroutine needs to acquire DB.mu to update deleted table metrics.
     147           2 : func (cm *cleanupManager) Wait() {
     148           2 :         cm.mu.Lock()
     149           2 :         defer cm.mu.Unlock()
     150           2 :         n := cm.mu.totalJobs
     151           2 :         for cm.mu.completedJobs < n {
     152           2 :                 cm.mu.completedJobsCond.Wait()
     153           2 :         }
     154             : }
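
Wait snapshots totalJobs under the mutex and then sleeps on completedJobsCond until completedJobs catches up, while mainLoop broadcasts after finishing each job. A minimal standalone sketch of that condition-variable handshake (hypothetical names, illustrative only):

        package main

        import (
                "fmt"
                "sync"
                "time"
        )

        type tracker struct {
                mu        sync.Mutex
                cond      sync.Cond // broadcast whenever completed changes
                enqueued  int
                completed int
        }

        func (t *tracker) markDone() {
                t.mu.Lock()
                t.completed++
                t.cond.Broadcast()
                t.mu.Unlock()
        }

        // wait blocks until every job enqueued before the call has completed,
        // ignoring jobs enqueued afterwards (as in cleanupManager.Wait).
        func (t *tracker) wait() {
                t.mu.Lock()
                defer t.mu.Unlock()
                n := t.enqueued
                for t.completed < n {
                        t.cond.Wait()
                }
        }

        func main() {
                t := &tracker{enqueued: 3}
                t.cond.L = &t.mu
                for i := 0; i < 3; i++ {
                        go func() {
                                time.Sleep(10 * time.Millisecond)
                                t.markDone()
                        }()
                }
                t.wait()
                fmt.Println("all previously queued jobs completed")
        }
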
     155             : 
     156             : // mainLoop runs the manager's background goroutine.
     157           2 : func (cm *cleanupManager) mainLoop() {
     158           2 :         defer cm.waitGroup.Done()
     159           2 : 
     160           2 :         var tb tokenbucket.TokenBucket
     161           2 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     162           2 :         tb.Init(1.0, 1.0)
     163           2 :         for job := range cm.jobsCh {
     164           2 :                 for _, of := range job.obsoleteFiles {
     165           2 :                         switch of.fileType {
     166           2 :                         case fileTypeTable:
     167           2 :                                 cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
     168           2 :                                 cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal)
     169           2 :                                 cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
     170           2 :                         case fileTypeLog:
     171           2 :                                 cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
     172           2 :                                         base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize)
     173           2 :                         default:
     174           2 :                                 path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
     175           2 :                                 cm.deleteObsoleteFile(
     176           2 :                                         cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
     177             :                         }
     178             :                 }
     179           2 :                 cm.mu.Lock()
     180           2 :                 cm.mu.completedJobs++
     181           2 :                 cm.mu.completedJobsCond.Broadcast()
     182           2 :                 cm.maybeLogLocked()
     183           2 :                 cm.mu.Unlock()
     184             :         }
     185             : }
     186             : 
     187             : // fileNumIfSST is read iff fileType is fileTypeTable.
     188           2 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
     189           2 :         if fileType != fileTypeTable {
     190           2 :                 return false
     191           2 :         }
     192           2 :         meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
     193           2 :         if err != nil {
     194           1 :                 // The object was already removed from the provider; we won't actually
     195           1 :                 // delete anything, so we don't need to pace.
     196           1 :                 return false
     197           1 :         }
     198             :         // Don't throttle deletion of remote objects.
     199           2 :         return !meta.IsRemote()
     200             : }
     201             : 
     202             : // maybePace sleeps before deleting an object if appropriate. It is always
     203             : // called from the background goroutine.
     204             : func (cm *cleanupManager) maybePace(
     205             :         tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
     206           2 : ) {
     207           2 :         if !cm.needsPacing(fileType, fileNum) {
     208           2 :                 return
     209           2 :         }
     210             : 
     211           2 :         tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
     212           2 :         if tokens == 0.0 {
     213           2 :                 // The token bucket might be in debt; it could make us wait even for 0
     214           2 :                 // tokens. We don't want that if the pacer decided throttling should be
     215           2 :                 // disabled.
     216           2 :                 return
     217           2 :         }
     218             :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     219             :         // the token bucket accumulates up to one second of unused tokens.
     220           2 :         for {
     221           2 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     222           2 :                 if ok {
     223           2 :                         break
     224             :                 }
     225           1 :                 time.Sleep(d)
     226             :         }
     227             : }
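
The loop above asks the bucket for the computed number of tokens; TryToFulfill either grants them or returns how long to sleep before retrying, and idle time banks up to the configured burst (one token, about one second at this refill rate), so small deletions after a pause need not sleep at all. A standalone usage sketch of the same package and calls (token amounts are made up for illustration):

        package main

        import (
                "fmt"
                "time"

                "github.com/cockroachdb/tokenbucket"
        )

        // waitForTokens blocks until the bucket can supply the requested tokens,
        // sleeping for the durations suggested by TryToFulfill, mirroring the
        // loop in maybePace above.
        func waitForTokens(tb *tokenbucket.TokenBucket, tokens tokenbucket.Tokens) {
                for {
                        ok, d := tb.TryToFulfill(tokens)
                        if ok {
                                return
                        }
                        time.Sleep(d)
                }
        }

        func main() {
                var tb tokenbucket.TokenBucket
                tb.Init(1.0, 1.0) // 1 token/sec refill, 1 token burst, as in mainLoop
                start := time.Now()
                waitForTokens(&tb, 0.5) // a small request, likely served without sleeping
                waitForTokens(&tb, 1.0) // a larger request that may have to wait
                fmt.Println("paced requests took", time.Since(start).Round(100*time.Millisecond))
        }
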
     228             : 
     229             : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
     230             : func (cm *cleanupManager) deleteObsoleteFile(
     231             :         fs vfs.FS, fileType fileType, jobID JobID, path string, fileNum base.DiskFileNum, fileSize uint64,
     232           2 : ) {
      233           2 :         // TODO(peter): need to handle this error, probably by re-adding the
      234           2 :         // file that couldn't be deleted to one of the obsolete file slices.
     235           2 :         err := cm.opts.Cleaner.Clean(fs, fileType, path)
     236           2 :         if oserror.IsNotExist(err) {
     237           0 :                 return
     238           0 :         }
     239             : 
     240           2 :         switch fileType {
     241           2 :         case fileTypeLog:
     242           2 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     243           2 :                         JobID:   int(jobID),
     244           2 :                         Path:    path,
     245           2 :                         FileNum: fileNum,
     246           2 :                         Err:     err,
     247           2 :                 })
     248           2 :         case fileTypeManifest:
     249           2 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     250           2 :                         JobID:   int(jobID),
     251           2 :                         Path:    path,
     252           2 :                         FileNum: fileNum,
     253           2 :                         Err:     err,
     254           2 :                 })
     255           0 :         case fileTypeTable:
     256           0 :                 panic("invalid deletion of object file")
     257             :         }
     258             : }
     259             : 
     260             : func (cm *cleanupManager) deleteObsoleteObject(
     261             :         fileType fileType, jobID JobID, fileNum base.DiskFileNum,
     262           2 : ) {
     263           2 :         if fileType != fileTypeTable {
     264           0 :                 panic("not an object")
     265             :         }
     266             : 
     267           2 :         var path string
     268           2 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     269           2 :         if err != nil {
     270           1 :                 path = "<nil>"
     271           2 :         } else {
     272           2 :                 path = cm.objProvider.Path(meta)
     273           2 :                 err = cm.objProvider.Remove(fileType, fileNum)
     274           2 :         }
     275           2 :         if cm.objProvider.IsNotExistError(err) {
     276           1 :                 return
     277           1 :         }
     278             : 
     279           2 :         switch fileType {
     280           2 :         case fileTypeTable:
     281           2 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     282           2 :                         JobID:   int(jobID),
     283           2 :                         Path:    path,
     284           2 :                         FileNum: fileNum,
     285           2 :                         Err:     err,
     286           2 :                 })
     287             :         }
     288             : }
     289             : 
      290             : // maybeLogLocked logs when the job queue becomes more than 75% full, and logs
      291             : // again once the queue drains back below 10% full.
     292             : //
     293             : // Must be called with cm.mu locked.
     294           2 : func (cm *cleanupManager) maybeLogLocked() {
     295           2 :         const highThreshold = jobsQueueDepth * 3 / 4
     296           2 :         const lowThreshold = jobsQueueDepth / 10
     297           2 : 
     298           2 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
     299           2 : 
     300           2 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
     301           0 :                 cm.mu.jobsQueueWarningIssued = true
     302           0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
     303           0 :         }
     304             : 
     305           2 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
     306           0 :                 cm.mu.jobsQueueWarningIssued = false
     307           0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
     308           0 :         }
     309             : }
     310             : 
     311           2 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
     312           2 :         var pacerInfo deletionPacerInfo
     313           2 :         // Call GetDiskUsage after every file deletion. This may seem inefficient,
     314           2 :         // but in practice this was observed to take constant time, regardless of
     315           2 :         // volume size used, at least on linux with ext4 and zfs. All invocations
     316           2 :         // take 10 microseconds or less.
     317           2 :         pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
     318           2 :         d.mu.Lock()
     319           2 :         pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
     320           2 :         pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
     321           2 :         d.mu.Unlock()
     322           2 :         return pacerInfo
     323           2 : }
     324             : 
     325             : // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
     326           2 : func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
     327           2 :         d.mu.Lock()
     328           2 :         d.mu.versions.metrics.Table.ObsoleteCount--
     329           2 :         d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
     330           2 :         if isLocal {
     331           2 :                 d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
     332           2 :         }
     333           2 :         d.mu.Unlock()
     334             : }
     335             : 
     336             : // scanObsoleteFiles scans the filesystem for files that are no longer needed
     337             : // and adds those to the internal lists of obsolete files. Note that the files
     338             : // are not actually deleted by this method. A subsequent call to
      339             : // deleteObsoleteFiles must be performed. Must not be called concurrently
     340             : // with compactions and flushes. db.mu must be held when calling this function.
     341           2 : func (d *DB) scanObsoleteFiles(list []string) {
      342           2 :         // Disable automatic compactions temporarily to prevent concurrent compactions /
     343           2 :         // flushes from interfering. The original value is restored on completion.
     344           2 :         disabledPrev := d.opts.DisableAutomaticCompactions
     345           2 :         defer func() {
     346           2 :                 d.opts.DisableAutomaticCompactions = disabledPrev
     347           2 :         }()
     348           2 :         d.opts.DisableAutomaticCompactions = true
     349           2 : 
     350           2 :         // Wait for any ongoing compaction to complete before continuing.
     351           2 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
     352           0 :                 d.mu.compact.cond.Wait()
     353           0 :         }
     354             : 
     355           2 :         liveFileNums := make(map[base.DiskFileNum]struct{})
     356           2 :         d.mu.versions.addLiveFileNums(liveFileNums)
     357           2 :         // Protect against files which are only referred to by the ingestedFlushable
     358           2 :         // from being deleted. These are added to the flushable queue on WAL replay
     359           2 :         // during read only mode and aren't part of the Version. Note that if
     360           2 :         // !d.opts.ReadOnly, then all flushables of type ingestedFlushable have
     361           2 :         // already been flushed.
     362           2 :         for _, fEntry := range d.mu.mem.queue {
     363           2 :                 if f, ok := fEntry.flushable.(*ingestedFlushable); ok {
     364           0 :                         for _, file := range f.files {
     365           0 :                                 liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
     366           0 :                         }
     367             :                 }
     368             :         }
     369             : 
     370           2 :         manifestFileNum := d.mu.versions.manifestFileNum
     371           2 : 
     372           2 :         var obsoleteTables []tableInfo
     373           2 :         var obsoleteManifests []fileInfo
     374           2 :         var obsoleteOptions []fileInfo
     375           2 : 
     376           2 :         for _, filename := range list {
     377           2 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
     378           2 :                 if !ok {
     379           2 :                         continue
     380             :                 }
     381           2 :                 switch fileType {
     382           2 :                 case fileTypeManifest:
     383           2 :                         if diskFileNum >= manifestFileNum {
     384           1 :                                 continue
     385             :                         }
     386           2 :                         fi := fileInfo{FileNum: diskFileNum}
     387           2 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
     388           1 :                                 fi.FileSize = uint64(stat.Size())
     389           1 :                         }
     390           2 :                         obsoleteManifests = append(obsoleteManifests, fi)
     391           2 :                 case fileTypeOptions:
     392           2 :                         if diskFileNum >= d.optionsFileNum {
     393           0 :                                 continue
     394             :                         }
     395           2 :                         fi := fileInfo{FileNum: diskFileNum}
     396           2 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
     397           1 :                                 fi.FileSize = uint64(stat.Size())
     398           1 :                         }
     399           2 :                         obsoleteOptions = append(obsoleteOptions, fi)
     400           2 :                 case fileTypeTable:
     401             :                         // Objects are handled through the objstorage provider below.
     402           2 :                 default:
     403             :                         // Don't delete files we don't know about.
     404             :                 }
     405             :         }
     406             : 
     407           2 :         objects := d.objProvider.List()
     408           2 :         for _, obj := range objects {
     409           2 :                 switch obj.FileType {
     410           2 :                 case fileTypeTable:
     411           2 :                         if _, ok := liveFileNums[obj.DiskFileNum]; ok {
     412           2 :                                 continue
     413             :                         }
     414           2 :                         fileInfo := fileInfo{
     415           2 :                                 FileNum: obj.DiskFileNum,
     416           2 :                         }
     417           2 :                         if size, err := d.objProvider.Size(obj); err == nil {
     418           2 :                                 fileInfo.FileSize = uint64(size)
     419           2 :                         }
     420           2 :                         obsoleteTables = append(obsoleteTables, tableInfo{
     421           2 :                                 fileInfo: fileInfo,
     422           2 :                                 isLocal:  !obj.IsRemote(),
     423           2 :                         })
     424             : 
     425           0 :                 default:
     426             :                         // Ignore object types we don't know about.
     427             :                 }
     428             :         }
     429             : 
     430           2 :         d.mu.versions.obsoleteTables = mergeTableInfos(d.mu.versions.obsoleteTables, obsoleteTables)
     431           2 :         d.mu.versions.updateObsoleteTableMetricsLocked()
     432           2 :         d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
     433           2 :         d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
     434             : }
     435             : 
     436             : // disableFileDeletions disables file deletions and then waits for any
     437             : // in-progress deletion to finish. The caller is required to call
     438             : // enableFileDeletions in order to enable file deletions again. It is ok for
     439             : // multiple callers to disable file deletions simultaneously, though they must
     440             : // all invoke enableFileDeletions in order for file deletions to be re-enabled
     441             : // (there is an internal reference count on file deletion disablement).
     442             : //
     443             : // d.mu must be held when calling this method.
     444           2 : func (d *DB) disableFileDeletions() {
     445           2 :         d.mu.disableFileDeletions++
     446           2 :         d.mu.Unlock()
     447           2 :         defer d.mu.Lock()
     448           2 :         d.cleanupManager.Wait()
     449           2 : }
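
disableFileDeletions is called with d.mu held and temporarily releases it around the blocking cleanupManager.Wait call; the deferred Lock restores the caller's locking invariant before the method returns. A minimal standalone sketch of that Unlock-then-defer-Lock idiom (hypothetical types, illustrative only):

        package main

        import (
                "fmt"
                "sync"
        )

        type db struct {
                mu sync.Mutex
        }

        // waitWithoutLock must be called with d.mu held. It drops the mutex for
        // the slow operation and re-acquires it via the deferred Lock, so the
        // caller still holds d.mu when the method returns.
        func (d *db) waitWithoutLock() {
                d.mu.Unlock()
                defer d.mu.Lock()
                fmt.Println("waiting without holding d.mu")
        }

        func main() {
                d := &db{}
                d.mu.Lock()
                d.waitWithoutLock()
                d.mu.Unlock() // caller still owns the lock after the call
        }
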
     450             : 
     451             : // enableFileDeletions enables previously disabled file deletions. A cleanup job
     452             : // is queued if necessary.
     453             : //
     454             : // d.mu must be held when calling this method.
     455           2 : func (d *DB) enableFileDeletions() {
     456           2 :         if d.mu.disableFileDeletions <= 0 {
     457           1 :                 panic("pebble: file deletion disablement invariant violated")
     458             :         }
     459           2 :         d.mu.disableFileDeletions--
     460           2 :         if d.mu.disableFileDeletions > 0 {
     461           0 :                 return
     462           0 :         }
     463           2 :         d.deleteObsoleteFiles(d.newJobIDLocked())
     464             : }
     465             : 
     466             : type fileInfo = base.FileInfo
     467             : 
     468             : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
     469             : //
      470             : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
     471             : //
     472             : // Does nothing if file deletions are disabled (see disableFileDeletions). A
     473             : // cleanup job will be scheduled when file deletions are re-enabled.
     474           2 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
     475           2 :         if d.mu.disableFileDeletions > 0 {
     476           2 :                 return
     477           2 :         }
     478           2 :         _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
     479           2 : 
     480           2 :         // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
     481           2 :         // log that has not had its contents flushed to an sstable.
     482           2 :         obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
     483           2 :         if err != nil {
     484           0 :                 panic(err)
     485             :         }
     486             : 
     487           2 :         obsoleteTables := append([]tableInfo(nil), d.mu.versions.obsoleteTables...)
     488           2 :         d.mu.versions.obsoleteTables = nil
     489           2 : 
     490           2 :         for _, tbl := range obsoleteTables {
     491           2 :                 delete(d.mu.versions.zombieTables, tbl.FileNum)
     492           2 :         }
     493             : 
      494             : // Sort the manifests because we want to delete some contiguous prefix
     495             :         // of the older manifests.
     496           2 :         slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
     497           2 :                 return cmp.Compare(a.FileNum, b.FileNum)
     498           2 :         })
     499             : 
     500           2 :         var obsoleteManifests []fileInfo
     501           2 :         manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
     502           2 :         if manifestsToDelete > 0 {
     503           2 :                 obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
     504           2 :                 d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
     505           2 :                 if len(d.mu.versions.obsoleteManifests) == 0 {
     506           0 :                         d.mu.versions.obsoleteManifests = nil
     507           0 :                 }
     508             :         }
     509             : 
     510           2 :         obsoleteOptions := d.mu.versions.obsoleteOptions
     511           2 :         d.mu.versions.obsoleteOptions = nil
     512           2 : 
     513           2 :         // Release d.mu while preparing the cleanup job and possibly waiting.
     514           2 :         // Note the unusual order: Unlock and then Lock.
     515           2 :         d.mu.Unlock()
     516           2 :         defer d.mu.Lock()
     517           2 : 
     518           2 :         filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
     519           2 :         for _, f := range obsoleteLogs {
     520           2 :                 filesToDelete = append(filesToDelete, obsoleteFile{fileType: fileTypeLog, logFile: f})
     521           2 :         }
     522             :         // We sort to make the order of deletions deterministic, which is nice for
     523             :         // tests.
     524           2 :         slices.SortFunc(obsoleteTables, func(a, b tableInfo) int {
     525           2 :                 return cmp.Compare(a.FileNum, b.FileNum)
     526           2 :         })
     527           2 :         for _, f := range obsoleteTables {
     528           2 :                 d.tableCache.evict(f.FileNum)
     529           2 :                 filesToDelete = append(filesToDelete, obsoleteFile{
     530           2 :                         fileType: fileTypeTable,
     531           2 :                         nonLogFile: deletableFile{
     532           2 :                                 dir:      d.dirname,
     533           2 :                                 fileNum:  f.FileNum,
     534           2 :                                 fileSize: f.FileSize,
     535           2 :                                 isLocal:  f.isLocal,
     536           2 :                         },
     537           2 :                 })
     538           2 :         }
     539           2 :         files := [2]struct {
     540           2 :                 fileType fileType
     541           2 :                 obsolete []fileInfo
     542           2 :         }{
     543           2 :                 {fileTypeManifest, obsoleteManifests},
     544           2 :                 {fileTypeOptions, obsoleteOptions},
     545           2 :         }
     546           2 :         for _, f := range files {
     547           2 :                 // We sort to make the order of deletions deterministic, which is nice for
     548           2 :                 // tests.
     549           2 :                 slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
     550           2 :                         return cmp.Compare(a.FileNum, b.FileNum)
     551           2 :                 })
     552           2 :                 for _, fi := range f.obsolete {
     553           2 :                         dir := d.dirname
     554           2 :                         filesToDelete = append(filesToDelete, obsoleteFile{
     555           2 :                                 fileType: f.fileType,
     556           2 :                                 nonLogFile: deletableFile{
     557           2 :                                         dir:      dir,
     558           2 :                                         fileNum:  fi.FileNum,
     559           2 :                                         fileSize: fi.FileSize,
     560           2 :                                         isLocal:  true,
     561           2 :                                 },
     562           2 :                         })
     563           2 :                 }
     564             :         }
     565           2 :         if len(filesToDelete) > 0 {
     566           2 :                 d.cleanupManager.EnqueueJob(jobID, filesToDelete)
     567           2 :         }
     568           2 :         if d.opts.private.testingAlwaysWaitForCleanup {
     569           1 :                 d.cleanupManager.Wait()
     570           1 :         }
     571             : }
     572             : 
     573           2 : func (d *DB) maybeScheduleObsoleteTableDeletion() {
     574           2 :         d.mu.Lock()
     575           2 :         defer d.mu.Unlock()
     576           2 :         d.maybeScheduleObsoleteTableDeletionLocked()
     577           2 : }
     578             : 
     579           2 : func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
     580           2 :         if len(d.mu.versions.obsoleteTables) > 0 {
     581           2 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
     582           2 :         }
     583             : }
     584             : 
     585           2 : func merge(a, b []fileInfo) []fileInfo {
     586           2 :         if len(b) == 0 {
     587           2 :                 return a
     588           2 :         }
     589             : 
     590           2 :         a = append(a, b...)
     591           2 :         slices.SortFunc(a, func(a, b fileInfo) int {
     592           2 :                 return cmp.Compare(a.FileNum, b.FileNum)
     593           2 :         })
     594           2 :         return slices.CompactFunc(a, func(a, b fileInfo) bool {
     595           2 :                 return a.FileNum == b.FileNum
     596           2 :         })
     597             : }
     598             : 
     599           2 : func mergeTableInfos(a, b []tableInfo) []tableInfo {
     600           2 :         if len(b) == 0 {
     601           2 :                 return a
     602           2 :         }
     603             : 
     604           2 :         a = append(a, b...)
     605           2 :         slices.SortFunc(a, func(a, b tableInfo) int {
     606           2 :                 return cmp.Compare(a.FileNum, b.FileNum)
     607           2 :         })
     608           2 :         return slices.CompactFunc(a, func(a, b tableInfo) bool {
     609           2 :                 return a.FileNum == b.FileNum
     610           2 :         })
     611             : }

Generated by: LCOV version 1.14