Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "runtime/pprof"
11 : "slices"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/crlib/crtime"
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/errors/oserror"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/vfs"
22 : "github.com/cockroachdb/pebble/wal"
23 : "github.com/cockroachdb/tokenbucket"
24 : )
25 :
26 : // Cleaner exports the base.Cleaner type.
27 : type Cleaner = base.Cleaner
28 :
29 : // DeleteCleaner exports the base.DeleteCleaner type.
30 : type DeleteCleaner = base.DeleteCleaner
31 :
32 : // ArchiveCleaner exports the base.ArchiveCleaner type.
33 : type ArchiveCleaner = base.ArchiveCleaner
34 :
35 : type cleanupManager struct {
36 : opts *Options
37 : objProvider objstorage.Provider
38 : deletePacer *deletionPacer
39 :
40 : // jobsCh is used as the cleanup job queue.
41 : jobsCh chan *cleanupJob
42 : // waitGroup is used to wait for the background goroutine to exit.
43 : waitGroup sync.WaitGroup
44 :
45 : mu struct {
46 : sync.Mutex
47 : // totalJobs is the total number of enqueued jobs (completed or in progress).
48 : totalJobs int
49 : completedStats obsoleteObjectStats
50 : completedJobs int
51 : completedJobsCond sync.Cond
52 : jobsQueueWarningIssued bool
53 : }
54 : }
55 :
56 : // CompletedStats returns the stats summarizing objects deleted. The returned
57 : // stats increase monotonically over the lifetime of the DB.
58 2 : func (m *cleanupManager) CompletedStats() obsoleteObjectStats {
59 2 : m.mu.Lock()
60 2 : defer m.mu.Unlock()
61 2 : return m.mu.completedStats
62 2 : }
63 :
64 : // jobsQueueDepth is the number of jobs that can be queued before EnqueueJob blocks.
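: //
: // A sketch of the assumed mechanism (jobsCh is a buffered channel created in
: // openCleanupManager):
: //
: //	cm.jobsCh <- job // blocks only once jobsQueueDepth jobs are in flight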
65 : const jobsQueueDepth = 1000
66 :
67 : // obsoleteFile holds information about a file that needs to be deleted soon.
68 : type obsoleteFile struct {
69 : fileType base.FileType
70 : fs vfs.FS
71 : path string
72 : fileNum base.DiskFileNum
73 : fileSize uint64 // approx for log files
74 : isLocal bool
75 : }
76 :
77 2 : func (of *obsoleteFile) needsPacing() bool {
78 2 : // We only need to pace local objects--sstables and blob files.
79 2 : return of.isLocal && (of.fileType == base.FileTypeTable || of.fileType == base.FileTypeBlob)
80 2 : }
81 :
82 : type cleanupJob struct {
83 : jobID JobID
84 : obsoleteFiles []obsoleteFile
85 : stats obsoleteObjectStats
86 : }
87 :
88 : // openCleanupManager creates a cleanupManager and starts its background goroutine.
89 : // The cleanupManager must be Close()d.
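: //
: // A minimal usage sketch (names from this file; not a verbatim call site):
: //
: //	cm := openCleanupManager(opts, provider, getPacerInfo)
: //	defer cm.Close()
: //	cm.EnqueueJob(jobID, files, stats)
: //	cm.Wait() // block until the job above completes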
90 : func openCleanupManager(
91 : opts *Options, objProvider objstorage.Provider, getDeletePacerInfo func() deletionPacerInfo,
92 2 : ) *cleanupManager {
93 2 : cm := &cleanupManager{
94 2 : opts: opts,
95 2 : objProvider: objProvider,
96 2 : deletePacer: newDeletionPacer(
97 2 : crtime.NowMono(),
98 2 : opts.FreeSpaceThresholdBytes,
99 2 : int64(opts.TargetByteDeletionRate),
100 2 : opts.FreeSpaceTimeframe,
101 2 : opts.ObsoleteBytesMaxRatio,
102 2 : opts.ObsoleteBytesTimeframe,
103 2 : getDeletePacerInfo,
104 2 : ),
105 2 : jobsCh: make(chan *cleanupJob, jobsQueueDepth),
106 2 : }
107 2 : cm.mu.completedJobsCond.L = &cm.mu.Mutex
108 2 : cm.waitGroup.Add(1)
109 2 :
110 2 : go func() {
111 2 : pprof.Do(context.Background(), gcLabels, func(context.Context) {
112 2 : cm.mainLoop()
113 2 : })
114 : }()
115 :
116 2 : return cm
117 : }
118 :
119 : // Close stops the background goroutine, waiting until all queued jobs are completed.
120 : // Delete pacing is disabled for the remaining jobs.
121 2 : func (cm *cleanupManager) Close() {
122 2 : close(cm.jobsCh)
123 2 : cm.waitGroup.Wait()
124 2 : }
125 :
126 : // EnqueueJob adds a cleanup job to the manager's queue, blocking if the queue is full.
127 : func (cm *cleanupManager) EnqueueJob(
128 : jobID JobID, obsoleteFiles []obsoleteFile, stats obsoleteObjectStats,
129 2 : ) {
130 2 : job := &cleanupJob{
131 2 : jobID: jobID,
132 2 : obsoleteFiles: obsoleteFiles,
133 2 : stats: stats,
134 2 : }
135 2 :
136 2 : // Report deleted bytes to the pacer, which can use this data to potentially
137 2 : // increase the deletion rate to keep up. We want to do this at enqueue time
138 2 : // rather than when we get to the job; otherwise the reported bytes would be
139 2 : // subject to the throttling rate, which defeats the purpose.
140 2 : var pacingBytes uint64
141 2 : for _, of := range obsoleteFiles {
142 2 : if of.needsPacing() {
143 2 : pacingBytes += of.fileSize
144 2 : }
145 : }
146 2 : if pacingBytes > 0 {
147 2 : cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
148 2 : }
149 :
150 2 : cm.mu.Lock()
151 2 : cm.mu.totalJobs++
152 2 : cm.maybeLogLocked()
153 2 : cm.mu.Unlock()
154 2 :
155 2 : cm.jobsCh <- job
156 : }
157 :
158 : // Wait blocks until the completion of all jobs that were already queued.
159 : //
160 : // Does not wait for jobs that are enqueued during the call.
161 : //
162 : // Note that DB.mu should not be held while calling this method; the background
163 : // goroutine needs to acquire DB.mu to update deleted table metrics.
164 2 : func (cm *cleanupManager) Wait() {
165 2 : cm.mu.Lock()
166 2 : defer cm.mu.Unlock()
167 2 : n := cm.mu.totalJobs
168 2 : for cm.mu.completedJobs < n {
169 2 : cm.mu.completedJobsCond.Wait()
170 2 : }
171 : }
172 :
173 : // mainLoop runs the manager's background goroutine.
174 2 : func (cm *cleanupManager) mainLoop() {
175 2 : defer cm.waitGroup.Done()
176 2 :
177 2 : var tb tokenbucket.TokenBucket
178 2 : // Use a token bucket with 1 token / second refill rate and 1 token burst.
179 2 : tb.Init(1.0, 1.0)
180 2 : for job := range cm.jobsCh {
181 2 : for _, of := range job.obsoleteFiles {
182 2 : switch of.fileType {
183 2 : case base.FileTypeTable:
184 2 : cm.maybePace(&tb, &of)
185 2 : cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
186 2 : case base.FileTypeBlob:
187 2 : cm.maybePace(&tb, &of)
188 2 : cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
189 2 : default:
190 2 : cm.deleteObsoleteFile(of.fs, of.fileType, job.jobID, of.path, of.fileNum)
191 : }
192 : }
193 2 : cm.mu.Lock()
194 2 : cm.mu.completedJobs++
195 2 : cm.mu.completedStats.Add(job.stats)
196 2 : cm.mu.completedJobsCond.Broadcast()
197 2 : cm.maybeLogLocked()
198 2 : cm.mu.Unlock()
199 : }
200 : }
201 :
202 : // maybePace sleeps before deleting an object if appropriate. It is always
203 : // called from the background goroutine.
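: //
: // Worked example (assuming the 1 token/sec, 1-token-burst bucket configured in
: // mainLoop): if PacingDelay returns 2.5 tokens, the TryToFulfill loop sleeps
: // for roughly 2.5 seconds in total, minus any burst the bucket has accumulated.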
204 2 : func (cm *cleanupManager) maybePace(tb *tokenbucket.TokenBucket, of *obsoleteFile) {
205 2 : if !of.needsPacing() {
206 2 : return
207 2 : }
208 :
209 2 : tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), of.fileSize)
210 2 : if tokens == 0.0 {
211 2 : // The token bucket might be in debt; it could make us wait even for 0
212 2 : // tokens. We don't want that if the pacer decided throttling should be
213 2 : // disabled.
214 2 : return
215 2 : }
216 : // Wait for tokens. We use a token bucket instead of sleeping outright because
217 : // the token bucket accumulates up to one second of unused tokens.
218 2 : for {
219 2 : ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
220 2 : if ok {
221 2 : break
222 : }
223 1 : time.Sleep(d)
224 : }
225 : }
226 :
227 : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
228 : func (cm *cleanupManager) deleteObsoleteFile(
229 : fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
230 2 : ) {
231 2 : // TODO(peter): need to handle this error, probably by re-adding the
232 2 : // file that couldn't be deleted to one of the obsolete file slices.
233 2 : err := cm.opts.Cleaner.Clean(fs, fileType, path)
234 2 : if oserror.IsNotExist(err) {
235 2 : return
236 2 : }
237 :
238 2 : switch fileType {
239 2 : case base.FileTypeLog:
240 2 : cm.opts.EventListener.WALDeleted(WALDeleteInfo{
241 2 : JobID: int(jobID),
242 2 : Path: path,
243 2 : FileNum: fileNum,
244 2 : Err: err,
245 2 : })
246 2 : case base.FileTypeManifest:
247 2 : cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
248 2 : JobID: int(jobID),
249 2 : Path: path,
250 2 : FileNum: fileNum,
251 2 : Err: err,
252 2 : })
253 0 : case base.FileTypeTable, base.FileTypeBlob:
254 0 : panic("invalid deletion of object file")
255 : }
256 : }
257 :
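: // deleteObsoleteObject removes an sstable or blob file through the objstorage
: // provider and notifies the corresponding event listener.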
258 : func (cm *cleanupManager) deleteObsoleteObject(
259 : fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
260 2 : ) {
261 2 : if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
262 0 : panic("not an object")
263 : }
264 :
265 2 : var path string
266 2 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
267 2 : if err != nil {
268 2 : path = "<nil>"
269 2 : } else {
270 2 : path = cm.objProvider.Path(meta)
271 2 : err = cm.objProvider.Remove(fileType, fileNum)
272 2 : }
273 2 : if cm.objProvider.IsNotExistError(err) {
274 2 : return
275 2 : }
276 :
277 2 : switch fileType {
278 2 : case base.FileTypeTable:
279 2 : cm.opts.EventListener.TableDeleted(TableDeleteInfo{
280 2 : JobID: int(jobID),
281 2 : Path: path,
282 2 : FileNum: fileNum,
283 2 : Err: err,
284 2 : })
285 2 : case base.FileTypeBlob:
286 2 : cm.opts.EventListener.BlobFileDeleted(BlobFileDeleteInfo{
287 2 : JobID: int(jobID),
288 2 : Path: path,
289 2 : FileNum: fileNum,
290 2 : Err: err,
291 2 : })
292 : }
293 : }
294 :
295 : // maybeLogLocked issues a warning log when the job queue grows more than 75%
296 : // full, and a followup log once the queue drains back below 10% full.
297 : //
298 : // Must be called with cm.mu locked.
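: //
: // With jobsQueueDepth = 1000, the warning fires above 750 queued jobs and the
: // all-clear fires once the queue drops below 100.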
299 2 : func (cm *cleanupManager) maybeLogLocked() {
300 2 : const highThreshold = jobsQueueDepth * 3 / 4
301 2 : const lowThreshold = jobsQueueDepth / 10
302 2 :
303 2 : jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
304 2 :
305 2 : if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
306 0 : cm.mu.jobsQueueWarningIssued = true
307 0 : cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
308 0 : }
309 :
310 2 : if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
311 0 : cm.mu.jobsQueueWarningIssued = false
312 0 : cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
313 0 : }
314 : }
315 :
316 2 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
317 2 : var pacerInfo deletionPacerInfo
318 2 : // Call GetDiskUsage after every file deletion. This may seem inefficient,
319 2 : // but in practice it was observed to take constant time, regardless of the
320 2 : // volume size in use, at least on Linux with ext4 and zfs. All invocations
321 2 : // take 10 microseconds or less.
322 2 : pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
323 2 : d.mu.Lock()
324 2 : pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
325 2 : total := d.mu.versions.metrics.Total()
326 2 : d.mu.Unlock()
327 2 : pacerInfo.liveBytes = uint64(total.AggregateSize())
328 2 : return pacerInfo
329 2 : }
330 :
331 : // scanObsoleteFiles scans the filesystem for files that are no longer needed
332 : // and adds those to the internal lists of obsolete files. Note that the files
333 : // are not actually deleted by this method; a subsequent call to
334 : // deleteObsoleteFiles must be performed. It must not be called concurrently
335 : // with compactions and flushes. db.mu must be held when calling this function.
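: //
: // Assumed call pattern during DB open (hypothetical, for illustration only):
: //
: //	ls, _ := d.opts.FS.List(d.dirname)
: //	d.scanObsoleteFiles(ls, flushableIngests)
: //	d.deleteObsoleteFiles(d.newJobIDLocked())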
336 2 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
337 2 : // Disable automatic compactions temporarily to avoid concurrent compactions /
338 2 : // flushes from interfering. The original value is restored on completion.
339 2 : disabledPrev := d.opts.DisableAutomaticCompactions
340 2 : defer func() {
341 2 : d.opts.DisableAutomaticCompactions = disabledPrev
342 2 : }()
343 2 : d.opts.DisableAutomaticCompactions = true
344 2 :
345 2 : // Wait for any ongoing compaction to complete before continuing.
346 2 : for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
347 2 : d.mu.compact.cond.Wait()
348 2 : }
349 :
350 2 : liveFileNums := make(map[base.DiskFileNum]struct{})
351 2 : d.mu.versions.addLiveFileNums(liveFileNums)
352 2 : // Protect against files which are only referred to by the ingestedFlushable
353 2 : // from being deleted. These are added to the flushable queue on WAL replay
354 2 : // and handle their own obsoletion/deletion. We exclude them from this obsolete
355 2 : // file scan to avoid double-deleting these files.
356 2 : for _, f := range flushableIngests {
357 2 : for _, file := range f.files {
358 2 : liveFileNums[file.TableBacking.DiskFileNum] = struct{}{}
359 2 : }
360 : }
361 :
362 2 : manifestFileNum := d.mu.versions.manifestFileNum
363 2 :
364 2 : var obsoleteTables []obsoleteFile
365 2 : var obsoleteBlobs []obsoleteFile
366 2 : var obsoleteOptions []obsoleteFile
367 2 : var obsoleteManifests []obsoleteFile
368 2 :
369 2 : for _, filename := range list {
370 2 : fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
371 2 : if !ok {
372 2 : continue
373 : }
374 2 : makeObsoleteFile := func() obsoleteFile {
375 2 : of := obsoleteFile{
376 2 : fileType: fileType,
377 2 : fs: d.opts.FS,
378 2 : path: d.opts.FS.PathJoin(d.dirname, filename),
379 2 : fileNum: diskFileNum,
380 2 : isLocal: true,
381 2 : }
382 2 : if stat, err := d.opts.FS.Stat(filename); err == nil {
383 1 : of.fileSize = uint64(stat.Size())
384 1 : }
385 2 : return of
386 : }
387 2 : switch fileType {
388 2 : case base.FileTypeManifest:
389 2 : if diskFileNum >= manifestFileNum {
390 2 : continue
391 : }
392 2 : obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
393 2 : case base.FileTypeOptions:
394 2 : if diskFileNum >= d.optionsFileNum {
395 2 : continue
396 : }
397 2 : obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
398 2 : case base.FileTypeTable, base.FileTypeBlob:
399 : // Objects are handled through the objstorage provider below.
400 2 : default:
401 : // Don't delete files we don't know about.
402 : }
403 : }
404 :
405 2 : objects := d.objProvider.List()
406 2 : for _, obj := range objects {
407 2 : if _, ok := liveFileNums[obj.DiskFileNum]; ok {
408 2 : continue
409 : }
410 2 : if obj.FileType != base.FileTypeTable && obj.FileType != base.FileTypeBlob {
411 0 : // Ignore object types we don't know about.
412 0 : continue
413 : }
414 2 : of := obsoleteFile{
415 2 : fileType: obj.FileType,
416 2 : fs: d.opts.FS,
417 2 : path: base.MakeFilepath(d.opts.FS, d.dirname, obj.FileType, obj.DiskFileNum),
418 2 : fileNum: obj.DiskFileNum,
419 2 : isLocal: true,
420 2 : }
421 2 : if size, err := d.objProvider.Size(obj); err == nil {
422 2 : of.fileSize = uint64(size)
423 2 : }
424 2 : if obj.FileType == base.FileTypeTable {
425 2 : obsoleteTables = append(obsoleteTables, of)
426 2 : } else {
427 2 : obsoleteBlobs = append(obsoleteBlobs, of)
428 2 : }
429 : }
430 :
431 2 : d.mu.versions.obsoleteTables = mergeObsoleteFiles(d.mu.versions.obsoleteTables, obsoleteTables)
432 2 : d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
433 2 : d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
434 2 : d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
435 2 : d.mu.versions.updateObsoleteObjectMetricsLocked()
436 : }
437 :
438 : // disableFileDeletions disables file deletions and then waits for any
439 : // in-progress deletion to finish. The caller is required to call
440 : // enableFileDeletions in order to enable file deletions again. It is ok for
441 : // multiple callers to disable file deletions simultaneously, though they must
442 : // all invoke enableFileDeletions in order for file deletions to be re-enabled
443 : // (there is an internal reference count on file deletion disablement).
444 : //
445 : // d.mu must be held when calling this method.
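: //
: // A hypothetical call pattern:
: //
: //	d.mu.Lock()
: //	d.disableFileDeletions()
: //	... // no new cleanup jobs are enqueued while deletions are disabled
: //	d.enableFileDeletions() // queues a catch-up job once the count reaches 0
: //	d.mu.Unlock()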
446 2 : func (d *DB) disableFileDeletions() {
447 2 : d.mu.fileDeletions.disableCount++
448 2 : d.mu.Unlock()
449 2 : defer d.mu.Lock()
450 2 : d.cleanupManager.Wait()
451 2 : }
452 :
453 : // enableFileDeletions enables previously disabled file deletions. A cleanup job
454 : // is queued if necessary.
455 : //
456 : // d.mu must be held when calling this method.
457 2 : func (d *DB) enableFileDeletions() {
458 2 : if d.mu.fileDeletions.disableCount <= 0 {
459 1 : panic("pebble: file deletion disablement invariant violated")
460 : }
461 2 : d.mu.fileDeletions.disableCount--
462 2 : if d.mu.fileDeletions.disableCount > 0 {
463 0 : return
464 0 : }
465 2 : d.deleteObsoleteFiles(d.newJobIDLocked())
466 : }
467 :
468 : type fileInfo = base.FileInfo
469 :
470 : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
471 : //
472 : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
473 : //
474 : // Does nothing if file deletions are disabled (see disableFileDeletions). A
475 : // cleanup job will be scheduled when file deletions are re-enabled.
476 2 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
477 2 : if d.mu.fileDeletions.disableCount > 0 {
478 2 : return
479 2 : }
480 2 : _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
481 2 :
482 2 : // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
483 2 : // log that has not had its contents flushed to an sstable.
484 2 : obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
485 2 : if err != nil {
486 0 : panic(err)
487 : }
488 :
489 2 : obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
490 2 : d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
491 2 : obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
492 2 : d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
493 2 :
494 2 : // Ensure everything is already sorted. We want determinism for testing, and
495 2 : // we need the manifests to be sorted because we want to delete some
496 2 : // contiguous prefix of the older manifests.
497 2 : if invariants.Enabled {
498 2 : switch {
499 0 : case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
500 0 : d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
501 0 : case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
502 0 : d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
503 0 : case !slices.IsSortedFunc(obsoleteTables, cmpObsoleteFileNumbers):
504 0 : d.opts.Logger.Fatalf("obsoleteTables is not sorted")
505 0 : case !slices.IsSortedFunc(obsoleteBlobs, cmpObsoleteFileNumbers):
506 0 : d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
507 : }
508 : }
509 :
510 2 : var obsoleteManifests []obsoleteFile
511 2 : manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
512 2 : if manifestsToDelete > 0 {
513 2 : obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
514 2 : d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
515 2 : if len(d.mu.versions.obsoleteManifests) == 0 {
516 0 : d.mu.versions.obsoleteManifests = nil
517 0 : }
518 : }
519 :
520 2 : obsoleteOptions := d.mu.versions.obsoleteOptions
521 2 : d.mu.versions.obsoleteOptions = nil
522 2 :
523 2 : // Compute the stats for the files being queued for deletion and add them to
524 2 : // the running total. These stats will be used during DB.Metrics() to
525 2 : // calculate the count and size of pending obsolete files by diffing these
526 2 : // stats and the stats reported by the cleanup manager.
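: // A sketch of the assumed computation (CompletedStats and Sub are defined in
: // this file):
: //
: //	pending := d.mu.fileDeletions.queuedStats
: //	pending.Sub(d.cleanupManager.CompletedStats())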
527 2 : var objectStats obsoleteObjectStats
528 2 : objectStats.tablesAll, objectStats.tablesLocal = calculateObsoleteObjectStats(obsoleteTables)
529 2 : objectStats.blobFilesAll, objectStats.blobFilesLocal = calculateObsoleteObjectStats(obsoleteBlobs)
530 2 : d.mu.fileDeletions.queuedStats.Add(objectStats)
531 2 : d.mu.versions.updateObsoleteObjectMetricsLocked()
532 2 :
533 2 : // Release d.mu while preparing the cleanup job and possibly waiting.
534 2 : // Note the unusual order: Unlock and then Lock.
535 2 : d.mu.Unlock()
536 2 : defer d.mu.Lock()
537 2 :
538 2 : n := len(obsoleteLogs) + len(obsoleteTables) + len(obsoleteBlobs) + len(obsoleteManifests) + len(obsoleteOptions)
539 2 : filesToDelete := make([]obsoleteFile, 0, n)
540 2 : filesToDelete = append(filesToDelete, obsoleteManifests...)
541 2 : filesToDelete = append(filesToDelete, obsoleteOptions...)
542 2 : filesToDelete = append(filesToDelete, obsoleteTables...)
543 2 : filesToDelete = append(filesToDelete, obsoleteBlobs...)
544 2 : for _, f := range obsoleteLogs {
545 2 : filesToDelete = append(filesToDelete, obsoleteFile{
546 2 : fileType: base.FileTypeLog,
547 2 : fs: f.FS,
548 2 : path: f.Path,
549 2 : fileNum: base.DiskFileNum(f.NumWAL),
550 2 : fileSize: f.ApproxFileSize,
551 2 : isLocal: true,
552 2 : })
553 2 : }
554 2 : for _, f := range obsoleteTables {
555 2 : d.fileCache.Evict(f.fileNum, base.FileTypeTable)
556 2 : }
557 2 : for _, f := range obsoleteBlobs {
558 2 : d.fileCache.Evict(f.fileNum, base.FileTypeBlob)
559 2 : }
560 2 : if len(filesToDelete) > 0 {
561 2 : d.cleanupManager.EnqueueJob(jobID, filesToDelete, objectStats)
562 2 : }
563 2 : if d.opts.private.testingAlwaysWaitForCleanup {
564 1 : d.cleanupManager.Wait()
565 1 : }
566 : }
567 :
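: // maybeScheduleObsoleteObjectDeletion enqueues a cleanup job if any obsolete
: // tables or blob files are waiting to be deleted.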
568 2 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
569 2 : d.mu.Lock()
570 2 : defer d.mu.Unlock()
571 2 : if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
572 2 : d.deleteObsoleteFiles(d.newJobIDLocked())
573 2 : }
574 : }
575 :
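: // mergeObsoleteFiles merges two lists of obsolete files into a single list
: // sorted by file number, dropping duplicate file numbers. Sketch of the
: // effect (file numbers only):
: //
: //	mergeObsoleteFiles([5, 9], [5, 7]) -> [5, 7, 9]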
576 2 : func mergeObsoleteFiles(a, b []obsoleteFile) []obsoleteFile {
577 2 : if len(b) == 0 {
578 2 : return a
579 2 : }
580 :
581 2 : a = append(a, b...)
582 2 : slices.SortFunc(a, cmpObsoleteFileNumbers)
583 2 : return slices.CompactFunc(a, func(a, b obsoleteFile) bool {
584 2 : return a.fileNum == b.fileNum
585 2 : })
586 : }
587 :
588 2 : func cmpObsoleteFileNumbers(a, b obsoleteFile) int {
589 2 : return cmp.Compare(a.fileNum, b.fileNum)
590 2 : }
591 :
592 : // objectInfo describes an object in object storage (either an sstable or a blob
593 : // file).
594 : type objectInfo struct {
595 : fileInfo
596 : isLocal bool
597 : }
598 :
599 2 : func (o objectInfo) asObsoleteFile(fs vfs.FS, fileType base.FileType, dirname string) obsoleteFile {
600 2 : return obsoleteFile{
601 2 : fileType: fileType,
602 2 : fs: fs,
603 2 : path: base.MakeFilepath(fs, dirname, fileType, o.FileNum),
604 2 : fileNum: o.FileNum,
605 2 : fileSize: o.FileSize,
606 2 : isLocal: o.isLocal,
607 2 : }
608 2 : }
609 :
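: // calculateObsoleteObjectStats tallies the count and total size of the given
: // files, both across all files and restricted to local ones.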
610 2 : func calculateObsoleteObjectStats(files []obsoleteFile) (total, local countAndSize) {
611 2 : for _, of := range files {
612 2 : if of.isLocal {
613 2 : local.count++
614 2 : local.size += of.fileSize
615 2 : }
616 2 : total.count++
617 2 : total.size += of.fileSize
618 : }
619 2 : return total, local
620 : }
621 :
622 : type obsoleteObjectStats struct {
623 : tablesLocal countAndSize
624 : tablesAll countAndSize
625 : blobFilesLocal countAndSize
626 : blobFilesAll countAndSize
627 : }
628 :
629 2 : func (s *obsoleteObjectStats) Add(other obsoleteObjectStats) {
630 2 : s.tablesLocal.Add(other.tablesLocal)
631 2 : s.tablesAll.Add(other.tablesAll)
632 2 : s.blobFilesLocal.Add(other.blobFilesLocal)
633 2 : s.blobFilesAll.Add(other.blobFilesAll)
634 2 : }
635 :
636 2 : func (s *obsoleteObjectStats) Sub(other obsoleteObjectStats) {
637 2 : s.tablesLocal.Sub(other.tablesLocal)
638 2 : s.tablesAll.Sub(other.tablesAll)
639 2 : s.blobFilesLocal.Sub(other.blobFilesLocal)
640 2 : s.blobFilesAll.Sub(other.blobFilesAll)
641 2 : }
642 :
643 : type countAndSize struct {
644 : count uint64
645 : size uint64
646 : }
647 :
648 2 : func (c *countAndSize) Add(other countAndSize) {
649 2 : c.count += other.count
650 2 : c.size += other.size
651 2 : }
652 :
653 2 : func (c *countAndSize) Sub(other countAndSize) {
654 2 : c.count = invariants.SafeSub(c.count, other.count)
655 2 : c.size = invariants.SafeSub(c.size, other.size)
656 2 : }
657 :
658 2 : func makeZombieObjects() zombieObjects {
659 2 : return zombieObjects{
660 2 : objs: make(map[base.DiskFileNum]objectInfo),
661 2 : }
662 2 : }
663 :
664 : // zombieObjects tracks a set of objects that are no longer required by the most
665 : // recent version of the LSM, but may still need to be accessed by an open
666 : // iterator. Such objects are 'dead,' but cannot be deleted until iterators that
667 : // may access them are closed.
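: //
: // A minimal lifecycle sketch (names from this file):
: //
: //	z := makeZombieObjects()
: //	z.Add(obj)                     // obj dropped from the latest version
: //	info := z.Extract(obj.FileNum) // last reader closed; obj is deletable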
668 : type zombieObjects struct {
669 : objs map[base.DiskFileNum]objectInfo
670 : totalSize uint64
671 : localSize uint64
672 : localCount uint64
673 : }
674 :
675 : // Add adds an object to the set of zombie objects.
676 2 : func (z *zombieObjects) Add(obj objectInfo) {
677 2 : if _, ok := z.objs[obj.FileNum]; ok {
678 0 : panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
679 : }
680 2 : z.objs[obj.FileNum] = obj
681 2 : z.totalSize += obj.FileSize
682 2 : if obj.isLocal {
683 2 : z.localSize += obj.FileSize
684 2 : z.localCount++
685 2 : }
686 : }
687 :
688 : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
689 1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
690 1 : z.Add(objectInfo{
691 1 : fileInfo: fileInfo{
692 1 : FileNum: meta.DiskFileNum,
693 1 : FileSize: size,
694 1 : },
695 1 : isLocal: !meta.IsRemote(),
696 1 : })
697 1 : }
698 :
699 : // Count returns the number of zombie objects.
700 2 : func (z *zombieObjects) Count() int {
701 2 : return len(z.objs)
702 2 : }
703 :
704 : // Extract removes an object from the set of zombie objects, returning the
705 : // object that was removed.
706 2 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
707 2 : obj, ok := z.objs[fileNum]
708 2 : if !ok {
709 0 : panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
710 : }
711 2 : delete(z.objs, fileNum)
712 2 :
713 2 : // Detect underflow in case we have a bug that causes an object's size to be
714 2 : // mutated.
715 2 : if z.totalSize < obj.FileSize {
716 0 : panic(errors.AssertionFailedf("zombie object %s size %d is greater than total size %d", fileNum, obj.FileSize, z.totalSize))
717 : }
718 2 : if obj.isLocal && z.localSize < obj.FileSize {
719 0 : panic(errors.AssertionFailedf("zombie object %s size %d is greater than local size %d", fileNum, obj.FileSize, z.localSize))
720 : }
721 :
722 2 : z.totalSize -= obj.FileSize
723 2 : if obj.isLocal {
724 2 : z.localSize -= obj.FileSize
725 2 : z.localCount--
726 2 : }
727 2 : return obj
728 : }
729 :
730 : // TotalSize returns the size of all objects in the set.
731 2 : func (z *zombieObjects) TotalSize() uint64 {
732 2 : return z.totalSize
733 2 : }
734 :
735 : // LocalStats returns the count and size of all local objects in the set.
736 2 : func (z *zombieObjects) LocalStats() (count uint64, size uint64) {
737 2 : return z.localCount, z.localSize
738 2 : }