// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"cmp"
	"context"
	"runtime/pprof"
	"slices"
	"sync"
	"time"

	"github.com/cockroachdb/errors/oserror"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/wal"
	"github.com/cockroachdb/tokenbucket"
)

// Cleaner exports the base.Cleaner type.
type Cleaner = base.Cleaner

// DeleteCleaner exports the base.DeleteCleaner type.
type DeleteCleaner = base.DeleteCleaner

// ArchiveCleaner exports the base.ArchiveCleaner type.
type ArchiveCleaner = base.ArchiveCleaner

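// cleanupManager deletes obsolete files (sstables, WALs, manifests, OPTIONS
// files) in the background, pacing deletions of local sstables via the
// deletion pacer. It is created by openCleanupManager and must be shut down
// with Close.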
type cleanupManager struct {
	opts            *Options
	objProvider     objstorage.Provider
	onTableDeleteFn func(fileSize uint64, isLocal bool)
	deletePacer     *deletionPacer

	// jobsCh is used as the cleanup job queue.
	jobsCh chan *cleanupJob
	// waitGroup is used to wait for the background goroutine to exit.
	waitGroup sync.WaitGroup

	mu struct {
		sync.Mutex
		// totalJobs is the total number of enqueued jobs (completed or in progress).
		totalJobs              int
		completedJobs          int
		completedJobsCond      sync.Cond
		jobsQueueWarningIssued bool
	}
}

// We can queue this many jobs before we have to block EnqueueJob.
const jobsQueueDepth = 1000

// deletableFile is used for non-log files.
type deletableFile struct {
	dir      string
	fileNum  base.DiskFileNum
	fileSize uint64
	isLocal  bool
}

// obsoleteFile holds information about a file that needs to be deleted soon.
type obsoleteFile struct {
	fileType fileType
	// nonLogFile is populated when fileType != fileTypeLog.
	nonLogFile deletableFile
	// logFile is populated when fileType == fileTypeLog.
	logFile wal.DeletableLog
}

type cleanupJob struct {
	jobID         JobID
	obsoleteFiles []obsoleteFile
}

// openCleanupManager creates a cleanupManager and starts its background goroutine.
// The cleanupManager must be Close()d.
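//
// A typical lifecycle, as a hedged sketch (the real callers live elsewhere in
// this package):
//
//	cm := openCleanupManager(opts, objProvider, onTableDelete, getPacerInfo)
//	cm.EnqueueJob(jobID, obsoleteFiles) // may block if the queue is full
//	cm.Wait()                           // optional: wait for already-queued jobs
//	cm.Close()                          // drains remaining jobs, stops the goroutine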
func openCleanupManager(
	opts *Options,
	objProvider objstorage.Provider,
	onTableDeleteFn func(fileSize uint64, isLocal bool),
	getDeletePacerInfo func() deletionPacerInfo,
) *cleanupManager {
	cm := &cleanupManager{
		opts:            opts,
		objProvider:     objProvider,
		onTableDeleteFn: onTableDeleteFn,
		deletePacer:     newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
		jobsCh:          make(chan *cleanupJob, jobsQueueDepth),
	}
	cm.mu.completedJobsCond.L = &cm.mu.Mutex
	cm.waitGroup.Add(1)

	go func() {
		pprof.Do(context.Background(), gcLabels, func(context.Context) {
			cm.mainLoop()
		})
	}()

	return cm
}

// Close stops the background goroutine, waiting until all queued jobs are completed.
// Delete pacing is disabled for the remaining jobs.
func (cm *cleanupManager) Close() {
	close(cm.jobsCh)
	cm.waitGroup.Wait()
}

// EnqueueJob adds a cleanup job to the manager's queue. It blocks if the
// queue is full (see jobsQueueDepth).
func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
	job := &cleanupJob{
		jobID:         jobID,
		obsoleteFiles: obsoleteFiles,
	}

	// Report deleted bytes to the pacer, which can use this data to potentially
	// increase the deletion rate to keep up. We want to do this at enqueue time
	// rather than when we get to the job, otherwise the reported bytes will be
	// subject to the throttling rate, which defeats the purpose.
	var pacingBytes uint64
	for _, of := range obsoleteFiles {
		if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) {
			pacingBytes += of.nonLogFile.fileSize
		}
	}
	if pacingBytes > 0 {
		cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
	}

	cm.mu.Lock()
	cm.mu.totalJobs++
	cm.maybeLogLocked()
	cm.mu.Unlock()

	cm.jobsCh <- job
}

// Wait blocks until the completion of all jobs that were already queued.
//
// Does not wait for jobs that are enqueued during the call.
//
// Note that DB.mu should not be held while calling this method; the background
// goroutine needs to acquire DB.mu to update deleted table metrics.
func (cm *cleanupManager) Wait() {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	n := cm.mu.totalJobs
	for cm.mu.completedJobs < n {
		cm.mu.completedJobsCond.Wait()
	}
}

// mainLoop runs the manager's background goroutine.
func (cm *cleanupManager) mainLoop() {
	defer cm.waitGroup.Done()

	var tb tokenbucket.TokenBucket
	// Use a token bucket with 1 token / second refill rate and 1 token burst.
	tb.Init(1.0, 1.0)
	for job := range cm.jobsCh {
		for _, of := range job.obsoleteFiles {
			switch of.fileType {
			case fileTypeTable:
				cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
				cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal)
				cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
			case fileTypeLog:
				cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
					base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize)
			default:
				path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
				cm.deleteObsoleteFile(
					cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
			}
		}
		cm.mu.Lock()
		cm.mu.completedJobs++
		cm.mu.completedJobsCond.Broadcast()
		cm.maybeLogLocked()
		cm.mu.Unlock()
	}
}

// needsPacing reports whether deletion of the given file should be paced.
// fileNumIfSST is read iff fileType is fileTypeTable.
func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
	if fileType != fileTypeTable {
		return false
	}
	meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
	if err != nil {
		// The object was already removed from the provider; we won't actually
		// delete anything, so we don't need to pace.
		return false
	}
	// Don't throttle deletion of remote objects.
	return !meta.IsRemote()
}

// maybePace sleeps before deleting an object if appropriate. It is always
// called from the background goroutine.
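//
// As an illustrative sketch (assuming the pacer converts bytes to delay at
// Options.TargetByteDeletionRate): with a rate of 100 MB/s, deleting a 50 MB
// sstable yields ~0.5 tokens from PacingDelay, i.e. roughly half a second of
// budget against the 1 token/sec bucket initialized in mainLoop.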
func (cm *cleanupManager) maybePace(
	tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
) {
	if !cm.needsPacing(fileType, fileNum) {
		return
	}

	tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
	if tokens == 0.0 {
		// The token bucket might be in debt; it could make us wait even for 0
		// tokens. We don't want that if the pacer decided throttling should be
		// disabled.
		return
	}
	// Wait for tokens. We use a token bucket instead of sleeping outright because
	// the token bucket accumulates up to one second of unused tokens.
	for {
		ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
		if ok {
			break
		}
		time.Sleep(d)
	}
}

// deleteObsoleteFile deletes a (non-object) file that is no longer needed.
func (cm *cleanupManager) deleteObsoleteFile(
	fs vfs.FS, fileType fileType, jobID JobID, path string, fileNum base.DiskFileNum, fileSize uint64,
) {
	// TODO(peter): need to handle this error, probably by re-adding the
	// file that couldn't be deleted to one of the obsolete slices map.
	err := cm.opts.Cleaner.Clean(fs, fileType, path)
	if oserror.IsNotExist(err) {
		return
	}

	switch fileType {
	case fileTypeLog:
		cm.opts.EventListener.WALDeleted(WALDeleteInfo{
			JobID:   int(jobID),
			Path:    path,
			FileNum: fileNum,
			Err:     err,
		})
	case fileTypeManifest:
		cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
			JobID:   int(jobID),
			Path:    path,
			FileNum: fileNum,
			Err:     err,
		})
	case fileTypeTable:
		panic("invalid deletion of object file")
	}
}

func (cm *cleanupManager) deleteObsoleteObject(
	fileType fileType, jobID JobID, fileNum base.DiskFileNum,
) {
	if fileType != fileTypeTable {
		panic("not an object")
	}

	var path string
	meta, err := cm.objProvider.Lookup(fileType, fileNum)
	if err != nil {
		path = "<nil>"
	} else {
		path = cm.objProvider.Path(meta)
		err = cm.objProvider.Remove(fileType, fileNum)
	}
	if cm.objProvider.IsNotExistError(err) {
		return
	}

	switch fileType {
	case fileTypeTable:
		cm.opts.EventListener.TableDeleted(TableDeleteInfo{
			JobID:   int(jobID),
			Path:    path,
			FileNum: fileNum,
			Err:     err,
		})
	}
}

// maybeLogLocked issues a warning log when the job queue becomes 75% full, and
// another log when the queue drains back to less than 10% full.
//
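// For example, with jobsQueueDepth = 1000, the warning is issued once the
// queue exceeds 750 jobs and cleared once it drops below 100.
//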
// Must be called with cm.mu locked.
func (cm *cleanupManager) maybeLogLocked() {
	const highThreshold = jobsQueueDepth * 3 / 4
	const lowThreshold = jobsQueueDepth / 10

	jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs

	if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
		cm.mu.jobsQueueWarningIssued = true
		cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
	}

	if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
		cm.mu.jobsQueueWarningIssued = false
		cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
	}
}

func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
	var pacerInfo deletionPacerInfo
	// Call GetDiskUsage after every file deletion. This may seem inefficient,
	// but in practice this was observed to take constant time, regardless of
	// volume size used, at least on linux with ext4 and zfs. All invocations
	// take 10 microseconds or less.
	pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
	d.mu.Lock()
	pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
	pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
	d.mu.Unlock()
	return pacerInfo
}

// onObsoleteTableDelete is called to update metrics when an sstable is deleted.
func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
	d.mu.Lock()
	d.mu.versions.metrics.Table.ObsoleteCount--
	d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
	if isLocal {
		d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
	}
	d.mu.Unlock()
}

// scanObsoleteFiles scans the filesystem for files that are no longer needed
// and adds those to the internal lists of obsolete files. Note that the files
// are not actually deleted by this method. A subsequent call to
// deleteObsoleteFiles must be performed. Must not be called concurrently with
// compactions and flushes. db.mu must be held when calling this function.
func (d *DB) scanObsoleteFiles(list []string) {
	// Disable automatic compactions temporarily to avoid concurrent compactions /
	// flushes from interfering. The original value is restored on completion.
	disabledPrev := d.opts.DisableAutomaticCompactions
	defer func() {
		d.opts.DisableAutomaticCompactions = disabledPrev
	}()
	d.opts.DisableAutomaticCompactions = true

	// Wait for any ongoing compaction to complete before continuing.
	for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
		d.mu.compact.cond.Wait()
	}

	liveFileNums := make(map[base.DiskFileNum]struct{})
	d.mu.versions.addLiveFileNums(liveFileNums)
	// Protect against files which are only referred to by the ingestedFlushable
	// from being deleted. These are added to the flushable queue on WAL replay
	// during read only mode and aren't part of the Version. Note that if
	// !d.opts.ReadOnly, then all flushables of type ingestedFlushable have
	// already been flushed.
	for _, fEntry := range d.mu.mem.queue {
		if f, ok := fEntry.flushable.(*ingestedFlushable); ok {
			for _, file := range f.files {
				liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
			}
		}
	}

	manifestFileNum := d.mu.versions.manifestFileNum

	var obsoleteTables []tableInfo
	var obsoleteManifests []fileInfo
	var obsoleteOptions []fileInfo

	for _, filename := range list {
		fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
		if !ok {
			continue
		}
		switch fileType {
		case fileTypeManifest:
			if diskFileNum >= manifestFileNum {
				continue
			}
			fi := fileInfo{FileNum: diskFileNum}
			if stat, err := d.opts.FS.Stat(filename); err == nil {
				fi.FileSize = uint64(stat.Size())
			}
			obsoleteManifests = append(obsoleteManifests, fi)
		case fileTypeOptions:
			if diskFileNum >= d.optionsFileNum {
				continue
			}
			fi := fileInfo{FileNum: diskFileNum}
			if stat, err := d.opts.FS.Stat(filename); err == nil {
				fi.FileSize = uint64(stat.Size())
			}
			obsoleteOptions = append(obsoleteOptions, fi)
		case fileTypeTable:
			// Objects are handled through the objstorage provider below.
		default:
			// Don't delete files we don't know about.
		}
	}

	objects := d.objProvider.List()
	for _, obj := range objects {
		switch obj.FileType {
		case fileTypeTable:
			if _, ok := liveFileNums[obj.DiskFileNum]; ok {
				continue
			}
			fileInfo := fileInfo{
				FileNum: obj.DiskFileNum,
			}
			if size, err := d.objProvider.Size(obj); err == nil {
				fileInfo.FileSize = uint64(size)
			}
			obsoleteTables = append(obsoleteTables, tableInfo{
				fileInfo: fileInfo,
				isLocal:  !obj.IsRemote(),
			})

		default:
			// Ignore object types we don't know about.
		}
	}

	d.mu.versions.obsoleteTables = mergeTableInfos(d.mu.versions.obsoleteTables, obsoleteTables)
	d.mu.versions.updateObsoleteTableMetricsLocked()
	d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
	d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
}

// disableFileDeletions disables file deletions and then waits for any
// in-progress deletion to finish. The caller is required to call
// enableFileDeletions in order to enable file deletions again. It is ok for
// multiple callers to disable file deletions simultaneously, though they must
// all invoke enableFileDeletions in order for file deletions to be re-enabled
// (there is an internal reference count on file deletion disablement).
//
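// A hypothetical pairing (e.g. while an external tool reads files out of the
// DB directory):
//
//	d.mu.Lock()
//	d.disableFileDeletions()
//	// ... no obsolete files will be deleted here ...
//	d.enableFileDeletions()
//	d.mu.Unlock()
//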
// d.mu must be held when calling this method.
func (d *DB) disableFileDeletions() {
	d.mu.disableFileDeletions++
	d.mu.Unlock()
	defer d.mu.Lock()
	d.cleanupManager.Wait()
}

// enableFileDeletions enables previously disabled file deletions. A cleanup job
// is queued if necessary.
//
// d.mu must be held when calling this method.
func (d *DB) enableFileDeletions() {
	if d.mu.disableFileDeletions <= 0 {
		panic("pebble: file deletion disablement invariant violated")
	}
	d.mu.disableFileDeletions--
	if d.mu.disableFileDeletions > 0 {
		return
	}
	d.deleteObsoleteFiles(d.newJobIDLocked())
}

type fileInfo = base.FileInfo

// deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
//
// d.mu must be held when calling this. The function will release and re-acquire the mutex.
//
// Does nothing if file deletions are disabled (see disableFileDeletions). A
// cleanup job will be scheduled when file deletions are re-enabled.
func (d *DB) deleteObsoleteFiles(jobID JobID) {
	if d.mu.disableFileDeletions > 0 {
		return
	}
	_, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)

	// NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
	// log that has not had its contents flushed to an sstable.
	obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
	if err != nil {
		panic(err)
	}

	obsoleteTables := append([]tableInfo(nil), d.mu.versions.obsoleteTables...)
	d.mu.versions.obsoleteTables = nil

	for _, tbl := range obsoleteTables {
		delete(d.mu.versions.zombieTables, tbl.FileNum)
	}

	// Sort the manifests because we want to delete some contiguous prefix
	// of the older manifests.
	slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})

	var obsoleteManifests []fileInfo
	manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
	if manifestsToDelete > 0 {
		obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
		d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
		if len(d.mu.versions.obsoleteManifests) == 0 {
			d.mu.versions.obsoleteManifests = nil
		}
	}

	obsoleteOptions := d.mu.versions.obsoleteOptions
	d.mu.versions.obsoleteOptions = nil

	// Release d.mu while preparing the cleanup job and possibly waiting.
	// Note the unusual order: Unlock and then Lock.
	d.mu.Unlock()
	defer d.mu.Lock()

	filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
	for _, f := range obsoleteLogs {
		filesToDelete = append(filesToDelete, obsoleteFile{fileType: fileTypeLog, logFile: f})
	}
	// We sort to make the order of deletions deterministic, which is nice for
	// tests.
	slices.SortFunc(obsoleteTables, func(a, b tableInfo) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})
	for _, f := range obsoleteTables {
		d.tableCache.evict(f.FileNum)
		filesToDelete = append(filesToDelete, obsoleteFile{
			fileType: fileTypeTable,
			nonLogFile: deletableFile{
				dir:      d.dirname,
				fileNum:  f.FileNum,
				fileSize: f.FileSize,
				isLocal:  f.isLocal,
			},
		})
	}
	files := [2]struct {
		fileType fileType
		obsolete []fileInfo
	}{
		{fileTypeManifest, obsoleteManifests},
		{fileTypeOptions, obsoleteOptions},
	}
	for _, f := range files {
		// We sort to make the order of deletions deterministic, which is nice for
		// tests.
		slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
			return cmp.Compare(a.FileNum, b.FileNum)
		})
		for _, fi := range f.obsolete {
			dir := d.dirname
			filesToDelete = append(filesToDelete, obsoleteFile{
				fileType: f.fileType,
				nonLogFile: deletableFile{
					dir:      dir,
					fileNum:  fi.FileNum,
					fileSize: fi.FileSize,
					isLocal:  true,
				},
			})
		}
	}
	if len(filesToDelete) > 0 {
		d.cleanupManager.EnqueueJob(jobID, filesToDelete)
	}
	if d.opts.private.testingAlwaysWaitForCleanup {
		d.cleanupManager.Wait()
	}
}

func (d *DB) maybeScheduleObsoleteTableDeletion() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.maybeScheduleObsoleteTableDeletionLocked()
}

func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
	if len(d.mu.versions.obsoleteTables) > 0 {
		d.deleteObsoleteFiles(d.newJobIDLocked())
	}
}

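// merge combines a and b, returning the union sorted by FileNum with
// duplicate file numbers removed. For example, inputs with FileNums {1, 3}
// and {2, 3} yield {1, 2, 3}.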
func merge(a, b []fileInfo) []fileInfo {
	if len(b) == 0 {
		return a
	}

	a = append(a, b...)
	slices.SortFunc(a, func(a, b fileInfo) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})
	return slices.CompactFunc(a, func(a, b fileInfo) bool {
		return a.FileNum == b.FileNum
	})
}

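// mergeTableInfos is the tableInfo analogue of merge: it combines a and b,
// sorted by FileNum, dropping duplicate file numbers.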
func mergeTableInfos(a, b []tableInfo) []tableInfo {
	if len(b) == 0 {
		return a
	}

	a = append(a, b...)
	slices.SortFunc(a, func(a, b tableInfo) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})
	return slices.CompactFunc(a, func(a, b tableInfo) bool {
		return a.FileNum == b.FileNum
	})
}