Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "runtime/pprof"
11 : "slices"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/crlib/crtime"
16 : "github.com/cockroachdb/errors/oserror"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/objstorage"
19 : "github.com/cockroachdb/pebble/vfs"
20 : "github.com/cockroachdb/pebble/wal"
21 : "github.com/cockroachdb/tokenbucket"
22 : )
23 :
24 : // Cleaner exports the base.Cleaner type.
25 : type Cleaner = base.Cleaner
26 :
27 : // DeleteCleaner exports the base.DeleteCleaner type.
28 : type DeleteCleaner = base.DeleteCleaner
29 :
30 : // ArchiveCleaner exports the base.ArchiveCleaner type.
31 : type ArchiveCleaner = base.ArchiveCleaner
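These aliases exist so callers can supply a cleanup policy via Options.Cleaner. As a hedged sketch (assuming only the Clean(fs, fileType, path) error shape visible at the call sites in this file, and that ArchiveCleaner/DeleteCleaner are usable as empty struct values), a cleaner that archives WALs for later debugging and deletes everything else might look like:

```go
// archiveWALCleaner is a hypothetical Cleaner: it moves obsolete WALs into
// the archive directory and removes all other obsolete files outright.
type archiveWALCleaner struct{}

func (archiveWALCleaner) Clean(fs vfs.FS, fileType base.FileType, path string) error {
	if fileType == base.FileTypeLog {
		return ArchiveCleaner{}.Clean(fs, fileType, path)
	}
	return DeleteCleaner{}.Clean(fs, fileType, path)
}
```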
32 :
33 : type cleanupManager struct {
34 : opts *Options
35 : objProvider objstorage.Provider
36 : onTableDeleteFn func(fileSize uint64, isLocal bool)
37 : deletePacer *deletionPacer
38 :
39 : // jobsCh is used as the cleanup job queue.
40 : jobsCh chan *cleanupJob
41 : // waitGroup is used to wait for the background goroutine to exit.
42 : waitGroup sync.WaitGroup
43 :
44 : mu struct {
45 : sync.Mutex
46 : // totalJobs is the total number of enqueued jobs (completed or in progress).
47 : totalJobs int
48 : completedJobs int
49 : completedJobsCond sync.Cond
50 : jobsQueueWarningIssued bool
51 : }
52 : }
53 :
54 : // We can queue this many jobs before we have to block EnqueueJob.
55 : const jobsQueueDepth = 1000
56 :
57 : // deletableFile is used for non-log files.
58 : type deletableFile struct {
59 : dir string
60 : fileNum base.DiskFileNum
61 : fileSize uint64
62 : isLocal bool
63 : }
64 :
65 : // obsoleteFile holds information about a file that needs to be deleted soon.
66 : type obsoleteFile struct {
67 : fileType base.FileType
68 : // nonLogFile is populated when fileType != base.FileTypeLog.
69 : nonLogFile deletableFile
70 : // logFile is populated when fileType == base.FileTypeLog.
71 : logFile wal.DeletableLog
72 : }
73 :
74 : type cleanupJob struct {
75 : jobID JobID
76 : obsoleteFiles []obsoleteFile
77 : }
78 :
79 : // openCleanupManager creates a cleanupManager and starts its background goroutine.
80 : // The cleanupManager must be Close()d.
81 : func openCleanupManager(
82 : opts *Options,
83 : objProvider objstorage.Provider,
84 : onTableDeleteFn func(fileSize uint64, isLocal bool),
85 : getDeletePacerInfo func() deletionPacerInfo,
86 2 : ) *cleanupManager {
87 2 : cm := &cleanupManager{
88 2 : opts: opts,
89 2 : objProvider: objProvider,
90 2 : onTableDeleteFn: onTableDeleteFn,
91 2 : deletePacer: newDeletionPacer(crtime.NowMono(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
92 2 : jobsCh: make(chan *cleanupJob, jobsQueueDepth),
93 2 : }
94 2 : cm.mu.completedJobsCond.L = &cm.mu.Mutex
95 2 : cm.waitGroup.Add(1)
96 2 :
97 2 : go func() {
98 2 : pprof.Do(context.Background(), gcLabels, func(context.Context) {
99 2 : cm.mainLoop()
100 2 : })
101 : }()
102 :
103 2 : return cm
104 : }
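A hedged sketch of the manager's lifecycle, using only methods defined in this file (arguments are illustrative):

```go
cm := openCleanupManager(opts, objProvider, onTableDeleteFn, getDeletePacerInfo)
cm.EnqueueJob(jobID, obsoleteFiles) // blocks only once jobsQueueDepth jobs are pending
cm.Wait()                           // waits for everything queued so far to be deleted
cm.Close()                          // drains remaining jobs without pacing, then stops the goroutine
```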
105 :
106 : // Close stops the background goroutine, waiting until all queued jobs are completed.
107 : // Delete pacing is disabled for the remaining jobs.
108 2 : func (cm *cleanupManager) Close() {
109 2 : close(cm.jobsCh)
110 2 : cm.waitGroup.Wait()
111 2 : }
112 :
113 : // EnqueueJob adds a cleanup job to the manager's queue.
114 2 : func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
115 2 : job := &cleanupJob{
116 2 : jobID: jobID,
117 2 : obsoleteFiles: obsoleteFiles,
118 2 : }
119 2 :
120 2 : // Report deleted bytes to the pacer, which can use this data to potentially
121 2 : // increase the deletion rate to keep up. We want to do this at enqueue time
122 2 : // rather than when we get to the job, otherwise the reported bytes will be
123 2 : // subject to the throttling rate, which defeats the purpose.
124 2 : var pacingBytes uint64
125 2 : for _, of := range obsoleteFiles {
126 2 : if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) {
127 2 : pacingBytes += of.nonLogFile.fileSize
128 2 : }
129 : }
130 2 : if pacingBytes > 0 {
131 2 : cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
132 2 : }
133 :
134 2 : cm.mu.Lock()
135 2 : cm.mu.totalJobs++
136 2 : cm.maybeLogLocked()
137 2 : cm.mu.Unlock()
138 2 :
139 2 : cm.jobsCh <- job
140 : }
141 :
142 : // Wait until the completion of all jobs that were already queued.
143 : //
144 : // Does not wait for jobs that are enqueued during the call.
145 : //
146 : // Note that DB.mu should not be held while calling this method; the background
147 : // goroutine needs to acquire DB.mu to update deleted table metrics.
148 2 : func (cm *cleanupManager) Wait() {
149 2 : cm.mu.Lock()
150 2 : defer cm.mu.Unlock()
151 2 : n := cm.mu.totalJobs
152 2 : for cm.mu.completedJobs < n {
153 2 : cm.mu.completedJobsCond.Wait()
154 2 : }
155 : }
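Because Wait snapshots totalJobs up front, jobs enqueued after the call begins do not extend the wait. A minimal, self-contained sketch of this counter-plus-condition-variable pattern (the jobTracker type is hypothetical, not part of Pebble):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// jobTracker distills the totalJobs/completedJobs bookkeeping used above:
// wait only blocks until the jobs enqueued *before* the call complete.
type jobTracker struct {
	mu        sync.Mutex
	cond      sync.Cond
	total     int
	completed int
}

func newJobTracker() *jobTracker {
	t := &jobTracker{}
	t.cond.L = &t.mu
	return t
}

func (t *jobTracker) enqueue() { t.mu.Lock(); t.total++; t.mu.Unlock() }

func (t *jobTracker) complete() {
	t.mu.Lock()
	t.completed++
	t.cond.Broadcast()
	t.mu.Unlock()
}

func (t *jobTracker) wait() {
	t.mu.Lock()
	defer t.mu.Unlock()
	n := t.total // snapshot: jobs enqueued later don't extend this wait
	for t.completed < n {
		t.cond.Wait()
	}
}

func main() {
	t := newJobTracker()
	t.enqueue()
	go func() { time.Sleep(10 * time.Millisecond); t.complete() }()
	t.wait()
	fmt.Println("queued job completed")
}
```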
156 :
157 : // mainLoop runs the manager's background goroutine.
158 2 : func (cm *cleanupManager) mainLoop() {
159 2 : defer cm.waitGroup.Done()
160 2 :
161 2 : var tb tokenbucket.TokenBucket
162 2 : // Use a token bucket with 1 token / second refill rate and 1 token burst.
163 2 : tb.Init(1.0, 1.0)
164 2 : for job := range cm.jobsCh {
165 2 : for _, of := range job.obsoleteFiles {
166 2 : switch of.fileType {
167 2 : case base.FileTypeTable:
168 2 : cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
169 2 : cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal)
170 2 : cm.deleteObsoleteObject(of.fileType, job.jobID, of.nonLogFile.fileNum)
171 1 : case base.FileTypeBlob:
172 1 : cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
173 1 : cm.deleteObsoleteObject(of.fileType, job.jobID, of.nonLogFile.fileNum)
174 2 : case base.FileTypeLog:
175 2 : cm.deleteObsoleteFile(of.logFile.FS, base.FileTypeLog, job.jobID, of.logFile.Path,
176 2 : base.DiskFileNum(of.logFile.NumWAL))
177 2 : default:
178 2 : path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
179 2 : cm.deleteObsoleteFile(
180 2 : cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum)
181 : }
182 : }
183 2 : cm.mu.Lock()
184 2 : cm.mu.completedJobs++
185 2 : cm.mu.completedJobsCond.Broadcast()
186 2 : cm.maybeLogLocked()
187 2 : cm.mu.Unlock()
188 : }
189 : }
190 :
191 : // fileNumIfSST is read iff fileType is base.FileTypeTable or base.FileTypeBlob.
192 2 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
193 2 : if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
194 2 : return false
195 2 : }
196 2 : meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
197 2 : if err != nil {
198 2 : // The object was already removed from the provider; we won't actually
199 2 : // delete anything, so we don't need to pace.
200 2 : return false
201 2 : }
202 : // Don't throttle deletion of remote objects.
203 2 : return !meta.IsRemote()
204 : }
205 :
206 : // maybePace sleeps before deleting an object if appropriate. It is always
207 : // called from the background goroutine.
208 : func (cm *cleanupManager) maybePace(
209 : tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
210 2 : ) {
211 2 : if !cm.needsPacing(fileType, fileNum) {
212 2 : return
213 2 : }
214 :
215 2 : tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), fileSize)
216 2 : if tokens == 0.0 {
217 2 : // The token bucket might be in debt; it could make us wait even for 0
218 2 : // tokens. We don't want that if the pacer decided throttling should be
219 2 : // disabled.
220 2 : return
221 2 : }
222 : // Wait for tokens. We use a token bucket instead of sleeping outright because
223 : // the token bucket accumulates up to one second of unused tokens.
224 2 : for {
225 2 : ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
226 2 : if ok {
227 2 : break
228 : }
229 1 : time.Sleep(d)
230 : }
231 : }
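A hedged distillation of the pacing loop, using only the tokenbucket calls that appear in this file (Init, TryToFulfill):

```go
import (
	"time"

	"github.com/cockroachdb/tokenbucket"
)

// paceBy blocks until the bucket can supply `tokens`, mirroring the loop in
// maybePace. With mainLoop's tb.Init(1.0, 1.0) (1 token/sec refill, 1-token
// burst), up to one second of idle time is banked and spent on later files.
func paceBy(tb *tokenbucket.TokenBucket, tokens float64) {
	for {
		ok, wait := tb.TryToFulfill(tokenbucket.Tokens(tokens))
		if ok {
			return
		}
		time.Sleep(wait)
	}
}
```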
232 :
233 : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
234 : func (cm *cleanupManager) deleteObsoleteFile(
235 : fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
236 2 : ) {
237 2 : // TODO(peter): need to handle this error, probably by re-adding the
238 2 : // file that couldn't be deleted to one of the obsolete file slices.
239 2 : err := cm.opts.Cleaner.Clean(fs, fileType, path)
240 2 : if oserror.IsNotExist(err) {
241 2 : return
242 2 : }
243 :
244 2 : switch fileType {
245 2 : case base.FileTypeLog:
246 2 : cm.opts.EventListener.WALDeleted(WALDeleteInfo{
247 2 : JobID: int(jobID),
248 2 : Path: path,
249 2 : FileNum: fileNum,
250 2 : Err: err,
251 2 : })
252 2 : case base.FileTypeManifest:
253 2 : cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
254 2 : JobID: int(jobID),
255 2 : Path: path,
256 2 : FileNum: fileNum,
257 2 : Err: err,
258 2 : })
259 0 : case base.FileTypeTable, base.FileTypeBlob:
260 0 : panic("invalid deletion of object file")
261 : }
262 : }
263 :
264 : func (cm *cleanupManager) deleteObsoleteObject(
265 : fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
266 2 : ) {
267 2 : if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
268 0 : panic("not an object")
269 : }
270 :
271 2 : var path string
272 2 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
273 2 : if err != nil {
274 2 : path = "<nil>"
275 2 : } else {
276 2 : path = cm.objProvider.Path(meta)
277 2 : err = cm.objProvider.Remove(fileType, fileNum)
278 2 : }
279 2 : if cm.objProvider.IsNotExistError(err) {
280 2 : return
281 2 : }
282 :
283 2 : switch fileType {
284 2 : case base.FileTypeTable:
285 2 : cm.opts.EventListener.TableDeleted(TableDeleteInfo{
286 2 : JobID: int(jobID),
287 2 : Path: path,
288 2 : FileNum: fileNum,
289 2 : Err: err,
290 2 : })
291 : // TODO(jackson): Add BlobFileDeleted event.
292 : }
293 : }
294 :
295 : // maybeLogLocked issues a warning log when the job queue becomes more than 75%
296 : // full, and another log when the queue drains back below 10% full.
297 : //
298 : // Must be called with cm.mu locked.
299 2 : func (cm *cleanupManager) maybeLogLocked() {
300 2 : const highThreshold = jobsQueueDepth * 3 / 4
301 2 : const lowThreshold = jobsQueueDepth / 10
302 2 :
303 2 : jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
304 2 :
305 2 : if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
306 0 : cm.mu.jobsQueueWarningIssued = true
307 0 : cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
308 0 : }
309 :
310 2 : if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
311 0 : cm.mu.jobsQueueWarningIssued = false
312 0 : cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
313 0 : }
314 : }
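The 75%/10% pair forms hysteresis: once the warning fires it cannot fire again until the queue drains below the low threshold, so the log does not flap while the queue hovers near either boundary. A hedged sketch of the state transition (hypothetical helper):

```go
// nextWarnState returns the new warning flag given the queue length;
// each transition corresponds to exactly one log line in maybeLogLocked.
func nextWarnState(warned bool, queued, high, low int) bool {
	if !warned && queued > high {
		return true // "cleanup falling behind" logged once
	}
	if warned && queued < low {
		return false // "cleanup back to normal" logged once
	}
	return warned
}
```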
315 :
316 2 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
317 2 : var pacerInfo deletionPacerInfo
318 2 : // Call GetDiskUsage after every file deletion. This may seem inefficient,
319 2 : // but in practice this was observed to take constant time, regardless of
320 2 : // volume size used, at least on Linux with ext4 and ZFS. All invocations
321 2 : // take 10 microseconds or less.
322 2 : pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
323 2 : d.mu.Lock()
324 2 : pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
325 2 : pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
326 2 : d.mu.Unlock()
327 2 : return pacerInfo
328 2 : }
329 :
330 : // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
331 2 : func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
332 2 : d.mu.Lock()
333 2 : d.mu.versions.metrics.Table.ObsoleteCount--
334 2 : d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
335 2 : if isLocal {
336 2 : d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
337 2 : }
338 2 : d.mu.Unlock()
339 : }
340 :
341 : // scanObsoleteFiles scans the filesystem for files that are no longer needed
342 : // and adds those to the internal lists of obsolete files. Note that the files
343 : // are not actually deleted by this method. A subsequent call to
344 : // deleteObsoleteFiles must be performed. Must not be called concurrently
345 : // with compactions and flushes. db.mu must be held when calling this function.
346 2 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
347 2 : // Disable automatic compactions temporarily to prevent concurrent compactions
348 2 : // and flushes from interfering. The original value is restored on completion.
349 2 : disabledPrev := d.opts.DisableAutomaticCompactions
350 2 : defer func() {
351 2 : d.opts.DisableAutomaticCompactions = disabledPrev
352 2 : }()
353 2 : d.opts.DisableAutomaticCompactions = true
354 2 :
355 2 : // Wait for any ongoing compaction to complete before continuing.
356 2 : for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
357 2 : d.mu.compact.cond.Wait()
358 2 : }
359 :
360 2 : liveFileNums := make(map[base.DiskFileNum]struct{})
361 2 : d.mu.versions.addLiveFileNums(liveFileNums)
362 2 : // Protect files that are only referred to by the ingestedFlushable from
363 2 : // being deleted. These are added to the flushable queue on WAL replay and
364 2 : // handle their own obsoletion/deletion. We exclude them from this obsolete
365 2 : // file scan to avoid double-deleting them.
366 2 : for _, f := range flushableIngests {
367 2 : for _, file := range f.files {
368 2 : liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
369 2 : }
370 : }
371 :
372 2 : manifestFileNum := d.mu.versions.manifestFileNum
373 2 :
374 2 : var obsoleteTables []objectInfo
375 2 : var obsoleteBlobs []objectInfo
376 2 : var obsoleteManifests []fileInfo
377 2 : var obsoleteOptions []fileInfo
378 2 :
379 2 : for _, filename := range list {
380 2 : fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
381 2 : if !ok {
382 2 : continue
383 : }
384 2 : switch fileType {
385 2 : case base.FileTypeManifest:
386 2 : if diskFileNum >= manifestFileNum {
387 2 : continue
388 : }
389 2 : fi := fileInfo{FileNum: diskFileNum}
390 2 : if stat, err := d.opts.FS.Stat(filename); err == nil {
391 1 : fi.FileSize = uint64(stat.Size())
392 1 : }
393 2 : obsoleteManifests = append(obsoleteManifests, fi)
394 2 : case base.FileTypeOptions:
395 2 : if diskFileNum >= d.optionsFileNum {
396 2 : continue
397 : }
398 2 : fi := fileInfo{FileNum: diskFileNum}
399 2 : if stat, err := d.opts.FS.Stat(filename); err == nil {
400 1 : fi.FileSize = uint64(stat.Size())
401 1 : }
402 2 : obsoleteOptions = append(obsoleteOptions, fi)
403 2 : case base.FileTypeTable, base.FileTypeBlob:
404 : // Objects are handled through the objstorage provider below.
405 2 : default:
406 : // Don't delete files we don't know about.
407 : }
408 : }
409 :
410 2 : objects := d.objProvider.List()
411 2 : for _, obj := range objects {
412 2 : if _, ok := liveFileNums[obj.DiskFileNum]; ok {
413 2 : continue
414 : }
415 2 : makeObjectInfo := func() objectInfo {
416 2 : fileInfo := fileInfo{FileNum: obj.DiskFileNum}
417 2 : if size, err := d.objProvider.Size(obj); err == nil {
418 2 : fileInfo.FileSize = uint64(size)
419 2 : }
420 2 : return objectInfo{
421 2 : fileInfo: fileInfo,
422 2 : isLocal: !obj.IsRemote(),
423 2 : }
424 : }
425 :
426 2 : switch obj.FileType {
427 2 : case base.FileTypeTable:
428 2 : obsoleteTables = append(obsoleteTables, makeObjectInfo())
429 1 : case base.FileTypeBlob:
430 1 : obsoleteBlobs = append(obsoleteBlobs, makeObjectInfo())
431 0 : default:
432 : // Ignore object types we don't know about.
433 : }
434 : }
435 :
436 2 : d.mu.versions.obsoleteTables = mergeObjectInfos(d.mu.versions.obsoleteTables, obsoleteTables)
437 2 : d.mu.versions.obsoleteBlobs = mergeObjectInfos(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
438 2 : d.mu.versions.updateObsoleteTableMetricsLocked()
439 2 : d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
440 2 : d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
441 : }
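The scan relies on base.ParseFilename to classify directory entries; a hedged illustration of that step (fs is a vfs.FS, file names are made up):

```go
for _, name := range []string{"MANIFEST-000007", "000123.sst", "junk.txt"} {
	fileType, num, ok := base.ParseFilename(fs, name)
	if !ok {
		continue // unknown files are never considered for deletion
	}
	fmt.Printf("%s -> type=%v num=%s\n", name, fileType, num)
}
```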
442 :
443 : // disableFileDeletions disables file deletions and then waits for any
444 : // in-progress deletion to finish. The caller is required to call
445 : // enableFileDeletions in order to enable file deletions again. It is ok for
446 : // multiple callers to disable file deletions simultaneously, though they must
447 : // all invoke enableFileDeletions in order for file deletions to be re-enabled
448 : // (there is an internal reference count on file deletion disablement).
449 : //
450 : // d.mu must be held when calling this method.
451 2 : func (d *DB) disableFileDeletions() {
452 2 : d.mu.disableFileDeletions++
453 2 : d.mu.Unlock()
454 2 : defer d.mu.Lock()
455 2 : d.cleanupManager.Wait()
456 2 : }
457 :
458 : // enableFileDeletions enables previously disabled file deletions. A cleanup job
459 : // is queued if necessary.
460 : //
461 : // d.mu must be held when calling this method.
462 2 : func (d *DB) enableFileDeletions() {
463 2 : if d.mu.disableFileDeletions <= 0 {
464 1 : panic("pebble: file deletion disablement invariant violated")
465 : }
466 2 : d.mu.disableFileDeletions--
467 2 : if d.mu.disableFileDeletions > 0 {
468 0 : return
469 0 : }
470 2 : d.deleteObsoleteFiles(d.newJobIDLocked())
471 : }
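A hedged usage sketch of the reference-counted contract (both calls require d.mu; disableFileDeletions drops and re-takes it internally while waiting):

```go
// Suspend deletions while copying live files (e.g., for a checkpoint).
d.mu.Lock()
d.disableFileDeletions() // bumps the count, waits out in-flight deletions
d.mu.Unlock()

// ... hard-link or copy live files without racing against deletion ...

d.mu.Lock()
d.enableFileDeletions() // once the count drops to zero, schedules a cleanup job
d.mu.Unlock()
```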
472 :
473 : type fileInfo = base.FileInfo
474 :
475 : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
476 : //
477 : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
478 : //
479 : // Does nothing if file deletions are disabled (see disableFileDeletions). A
480 : // cleanup job will be scheduled when file deletions are re-enabled.
481 2 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
482 2 : if d.mu.disableFileDeletions > 0 {
483 2 : return
484 2 : }
485 2 : _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
486 2 :
487 2 : // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
488 2 : // log that has not had its contents flushed to an sstable.
489 2 : obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
490 2 : if err != nil {
491 0 : panic(err)
492 : }
493 :
494 2 : obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
495 2 : d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
496 2 : obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
497 2 : d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
498 2 :
499 2 : for _, tbl := range obsoleteTables {
500 2 : delete(d.mu.versions.zombieTables, tbl.FileNum)
501 2 : }
502 :
503 : // Sort the manifests because we want to delete a contiguous prefix of
504 : // the older manifests.
505 2 : slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
506 2 : return cmp.Compare(a.FileNum, b.FileNum)
507 2 : })
508 :
509 2 : var obsoleteManifests []fileInfo
510 2 : manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
511 2 : if manifestsToDelete > 0 {
512 2 : obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
513 2 : d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
514 2 : if len(d.mu.versions.obsoleteManifests) == 0 {
515 0 : d.mu.versions.obsoleteManifests = nil
516 0 : }
517 : }
518 :
519 2 : obsoleteOptions := d.mu.versions.obsoleteOptions
520 2 : d.mu.versions.obsoleteOptions = nil
521 2 :
522 2 : // Release d.mu while preparing the cleanup job and possibly waiting.
523 2 : // Note the unusual order: Unlock and then Lock.
524 2 : d.mu.Unlock()
525 2 : defer d.mu.Lock()
526 2 :
527 2 : filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
528 2 : for _, f := range obsoleteLogs {
529 2 : filesToDelete = append(filesToDelete, obsoleteFile{fileType: base.FileTypeLog, logFile: f})
530 2 : }
531 : // We sort to make the order of deletions deterministic, which is nice for
532 : // tests.
533 2 : slices.SortFunc(obsoleteTables, func(a, b objectInfo) int {
534 2 : return cmp.Compare(a.FileNum, b.FileNum)
535 2 : })
536 2 : slices.SortFunc(obsoleteBlobs, func(a, b objectInfo) int {
537 1 : return cmp.Compare(a.FileNum, b.FileNum)
538 1 : })
539 2 : for _, f := range obsoleteTables {
540 2 : d.fileCache.evict(f.FileNum)
541 2 : filesToDelete = append(filesToDelete, obsoleteFile{
542 2 : fileType: base.FileTypeTable,
543 2 : nonLogFile: deletableFile{
544 2 : dir: d.dirname,
545 2 : fileNum: f.FileNum,
546 2 : fileSize: f.FileSize,
547 2 : isLocal: f.isLocal,
548 2 : },
549 2 : })
550 2 : }
551 2 : for _, f := range obsoleteBlobs {
552 1 : d.fileCache.evict(f.FileNum)
553 1 : filesToDelete = append(filesToDelete, obsoleteFile{
554 1 : fileType: base.FileTypeBlob,
555 1 : nonLogFile: deletableFile{
556 1 : dir: d.dirname,
557 1 : fileNum: f.FileNum,
558 1 : fileSize: f.FileSize,
559 1 : isLocal: f.isLocal,
560 1 : },
561 1 : })
562 1 : }
563 :
564 2 : files := [2]struct {
565 2 : fileType base.FileType
566 2 : obsolete []fileInfo
567 2 : }{
568 2 : {base.FileTypeManifest, obsoleteManifests},
569 2 : {base.FileTypeOptions, obsoleteOptions},
570 2 : }
571 2 : for _, f := range files {
572 2 : // We sort to make the order of deletions deterministic, which is nice for
573 2 : // tests.
574 2 : slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
575 2 : return cmp.Compare(a.FileNum, b.FileNum)
576 2 : })
577 2 : for _, fi := range f.obsolete {
578 2 : dir := d.dirname
579 2 : filesToDelete = append(filesToDelete, obsoleteFile{
580 2 : fileType: f.fileType,
581 2 : nonLogFile: deletableFile{
582 2 : dir: dir,
583 2 : fileNum: fi.FileNum,
584 2 : fileSize: fi.FileSize,
585 2 : isLocal: true,
586 2 : },
587 2 : })
588 2 : }
589 : }
590 2 : if len(filesToDelete) > 0 {
591 2 : d.cleanupManager.EnqueueJob(jobID, filesToDelete)
592 2 : }
593 2 : if d.opts.private.testingAlwaysWaitForCleanup {
594 1 : d.cleanupManager.Wait()
595 1 : }
596 : }
597 :
598 2 : func (d *DB) maybeScheduleObsoleteTableDeletion() {
599 2 : d.mu.Lock()
600 2 : defer d.mu.Unlock()
601 2 : d.maybeScheduleObsoleteTableDeletionLocked()
602 2 : }
603 :
604 2 : func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
605 2 : if len(d.mu.versions.obsoleteTables) > 0 {
606 2 : d.deleteObsoleteFiles(d.newJobIDLocked())
607 2 : }
608 : }
609 :
610 2 : func merge(a, b []fileInfo) []fileInfo {
611 2 : if len(b) == 0 {
612 2 : return a
613 2 : }
614 :
615 2 : a = append(a, b...)
616 2 : slices.SortFunc(a, func(a, b fileInfo) int {
617 2 : return cmp.Compare(a.FileNum, b.FileNum)
618 2 : })
619 2 : return slices.CompactFunc(a, func(a, b fileInfo) bool {
620 2 : return a.FileNum == b.FileNum
621 2 : })
622 : }
623 :
624 2 : func mergeObjectInfos(a, b []objectInfo) []objectInfo {
625 2 : if len(b) == 0 {
626 2 : return a
627 2 : }
628 :
629 2 : a = append(a, b...)
630 2 : slices.SortFunc(a, func(a, b objectInfo) int {
631 2 : return cmp.Compare(a.FileNum, b.FileNum)
632 2 : })
633 2 : return slices.CompactFunc(a, func(a, b objectInfo) bool {
634 2 : return a.FileNum == b.FileNum
635 2 : })
636 : }
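Both helpers share the same sort-then-compact shape: concatenate, order by FileNum, and drop adjacent duplicates. A small worked example (values are hypothetical):

```go
a := []fileInfo{{FileNum: 7}, {FileNum: 3}}
b := []fileInfo{{FileNum: 3}, {FileNum: 9}}
merged := merge(a, b)
// merged now holds FileNums 3, 7, 9: sorted, with the duplicate 3 collapsed.
```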