Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "runtime/pprof"
11 : "slices"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/crlib/crtime"
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/errors/oserror"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/vfs"
22 : "github.com/cockroachdb/pebble/wal"
23 : "github.com/cockroachdb/tokenbucket"
24 : )
25 :
26 : // Cleaner exports the base.Cleaner type.
27 : type Cleaner = base.Cleaner
28 :
29 : // DeleteCleaner exports the base.DeleteCleaner type.
30 : type DeleteCleaner = base.DeleteCleaner
31 :
32 : // ArchiveCleaner exports the base.ArchiveCleaner type.
33 : type ArchiveCleaner = base.ArchiveCleaner
34 :
35 : type cleanupManager struct {
36 : opts *Options
37 : objProvider objstorage.Provider
38 : deletePacer *deletionPacer
39 :
40 : // jobsCh is used as the cleanup job queue.
41 : jobsCh chan *cleanupJob
42 : // waitGroup is used to wait for the background goroutine to exit.
43 : waitGroup sync.WaitGroup
44 : // stopCh is closed when the cleanup manager is closed; this disables pacing for any remaining jobs.
45 : stopCh chan struct{}
46 :
47 : mu struct {
48 : sync.Mutex
49 : // totalJobs is the total number of enqueued jobs (completed or in progress).
50 : totalJobs int
51 : completedStats obsoleteObjectStats
52 : completedJobs int
53 : completedJobsCond sync.Cond
54 : jobsQueueWarningIssued bool
55 : }
56 : }
57 :
58 : // CompletedStats returns the stats summarizing objects deleted. The returned
59 : // stats increase monotonically over the lifetime of the DB.
60 1 : func (m *cleanupManager) CompletedStats() obsoleteObjectStats {
61 1 : m.mu.Lock()
62 1 : defer m.mu.Unlock()
63 1 : return m.mu.completedStats
64 1 : }
65 :
66 : // We can queue this many jobs before we have to block EnqueueJob.
67 : const jobsQueueDepth = 1000
68 :
69 : // obsoleteFile holds information about a file that needs to be deleted soon.
70 : type obsoleteFile struct {
71 : fileType base.FileType
72 : fs vfs.FS
73 : path string
74 : fileNum base.DiskFileNum
75 : fileSize uint64 // approx for log files
76 : isLocal bool
77 : }
78 :
79 1 : func (of *obsoleteFile) needsPacing() bool {
80 1 : // We only need to pace local objects--sstables and blob files.
81 1 : return of.isLocal && (of.fileType == base.FileTypeTable || of.fileType == base.FileTypeBlob)
82 1 : }
83 :
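// cleanupJob describes a single batch of obsolete files queued for deletion,
// along with the stats for the objects it contains.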
84 : type cleanupJob struct {
85 : jobID JobID
86 : obsoleteFiles []obsoleteFile
87 : stats obsoleteObjectStats
88 : }
89 :
90 : // openCleanupManager creates a cleanupManager and starts its background goroutine.
91 : // The cleanupManager must be Close()d.
92 : func openCleanupManager(
93 : opts *Options, objProvider objstorage.Provider, getDeletePacerInfo func() deletionPacerInfo,
94 1 : ) *cleanupManager {
95 1 : cm := &cleanupManager{
96 1 : opts: opts,
97 1 : objProvider: objProvider,
98 1 : deletePacer: newDeletionPacer(
99 1 : crtime.NowMono(),
100 1 : opts.FreeSpaceThresholdBytes,
101 1 : opts.TargetByteDeletionRate,
102 1 : opts.FreeSpaceTimeframe,
103 1 : opts.ObsoleteBytesMaxRatio,
104 1 : opts.ObsoleteBytesTimeframe,
105 1 : getDeletePacerInfo,
106 1 : ),
107 1 : jobsCh: make(chan *cleanupJob, jobsQueueDepth),
108 1 : stopCh: make(chan struct{}),
109 1 : }
110 1 : cm.mu.completedJobsCond.L = &cm.mu.Mutex
111 1 : cm.waitGroup.Add(1)
112 1 :
113 1 : go func() {
114 1 : pprof.Do(context.Background(), gcLabels, func(context.Context) {
115 1 : cm.mainLoop()
116 1 : })
117 : }()
118 :
119 1 : return cm
120 : }
121 :
122 : // Close stops the background goroutine, waiting until all queued jobs are completed.
123 : // Delete pacing is disabled for the remaining jobs.
124 1 : func (cm *cleanupManager) Close() {
125 1 : close(cm.jobsCh)
126 1 : close(cm.stopCh)
127 1 : cm.waitGroup.Wait()
128 1 : }
129 :
130 : // EnqueueJob adds a cleanup job to the manager's queue.
131 : func (cm *cleanupManager) EnqueueJob(
132 : jobID JobID, obsoleteFiles []obsoleteFile, stats obsoleteObjectStats,
133 1 : ) {
134 1 : job := &cleanupJob{
135 1 : jobID: jobID,
136 1 : obsoleteFiles: obsoleteFiles,
137 1 : stats: stats,
138 1 : }
139 1 :
140 1 : // Report deleted bytes to the pacer, which can use this data to potentially
141 1 : // increase the deletion rate to keep up. We want to do this at enqueue time
142 1 : // rather than when we get to the job, otherwise the reported bytes will be
143 1 : // subject to the throttling rate which defeats the purpose.
144 1 : var pacingBytes uint64
145 1 : for _, of := range obsoleteFiles {
146 1 : if of.needsPacing() {
147 1 : pacingBytes += of.fileSize
148 1 : }
149 : }
150 1 : if pacingBytes > 0 {
151 1 : cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
152 1 : }
153 :
154 1 : cm.mu.Lock()
155 1 : cm.mu.totalJobs++
156 1 : cm.maybeLogLocked()
157 1 : cm.mu.Unlock()
158 1 :
159 1 : cm.jobsCh <- job
160 : }
161 :
162 : // Wait until the completion of all jobs that were already queued.
163 : //
164 : // Does not wait for jobs that are enqueued during the call.
165 : //
166 : // Note that DB.mu should not be held while calling this method; the background
167 : // goroutine needs to acquire DB.mu to update deleted table metrics.
168 1 : func (cm *cleanupManager) Wait() {
169 1 : cm.mu.Lock()
170 1 : defer cm.mu.Unlock()
171 1 : n := cm.mu.totalJobs
172 1 : for cm.mu.completedJobs < n {
173 1 : cm.mu.completedJobsCond.Wait()
174 1 : }
175 : }
176 :
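// Taken together, the methods above form the manager's external interface. A
// rough usage sketch (hypothetical, not verbatim from the DB code; getPacerInfo
// stands in for a callback such as DB.getDeletionPacerInfo, and the other
// arguments are placeholders):
//
//	cm := openCleanupManager(opts, objProvider, getPacerInfo)
//	cm.EnqueueJob(jobID, obsoleteFiles, stats) // hand a batch of files to the background goroutine
//	cm.Wait()                                  // block until previously queued jobs are done
//	cm.Close()                                 // drain remaining jobs without pacing and stop
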
177 : // mainLoop runs the manager's background goroutine.
178 1 : func (cm *cleanupManager) mainLoop() {
179 1 : defer cm.waitGroup.Done()
180 1 :
181 1 : paceTimer := time.NewTimer(time.Duration(0))
182 1 : defer paceTimer.Stop()
183 1 :
184 1 : var tb tokenbucket.TokenBucket
185 1 : // Use a token bucket with 1 token / second refill rate and 1 token burst.
186 1 : tb.Init(1.0, 1.0)
187 1 : for job := range cm.jobsCh {
188 1 : cm.deleteObsoleteFilesInJob(job, &tb, paceTimer)
189 1 : cm.mu.Lock()
190 1 : cm.mu.completedJobs++
191 1 : cm.mu.completedStats.Add(job.stats)
192 1 : cm.mu.completedJobsCond.Broadcast()
193 1 : cm.maybeLogLocked()
194 1 : cm.mu.Unlock()
195 1 : }
196 : }
197 :
198 : // deleteObsoleteFilesInJob deletes all obsolete files in the given job. If tb
199 : // and paceTimer are provided, files that need pacing will be throttled
200 : // according to the deletion rate. If tb is nil, files are deleted immediately
201 : // without pacing (used when the cleanup manager is being closed).
202 : func (cm *cleanupManager) deleteObsoleteFilesInJob(
203 : job *cleanupJob, tb *tokenbucket.TokenBucket, paceTimer *time.Timer,
204 1 : ) {
205 1 : for _, of := range job.obsoleteFiles {
206 1 : switch of.fileType {
207 1 : case base.FileTypeTable, base.FileTypeBlob:
208 1 : if tb != nil {
209 1 : cm.maybePace(tb, &of, paceTimer)
210 1 : }
211 1 : cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
212 1 : default:
213 1 : cm.deleteObsoleteFile(of.fs, of.fileType, job.jobID, of.path, of.fileNum)
214 : }
215 : }
216 : }
217 :
218 : // maybePace sleeps before deleting an object if appropriate. It is always
219 : // called from the background goroutine.
220 : func (cm *cleanupManager) maybePace(
221 : tb *tokenbucket.TokenBucket, of *obsoleteFile, paceTimer *time.Timer,
222 1 : ) {
223 1 : if !of.needsPacing() {
224 1 : return
225 1 : }
226 :
227 1 : tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), of.fileSize)
228 1 : if tokens == 0.0 {
229 1 : // The token bucket might be in debt; it could make us wait even for 0
230 1 : // tokens. We don't want that if the pacer decided throttling should be
231 1 : // disabled.
232 1 : return
233 1 : }
234 : // Wait for tokens. We use a token bucket instead of sleeping outright because
235 : // the token bucket accumulates up to one second of unused tokens.
236 1 : for {
237 1 : ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
238 1 : if ok {
239 1 : break
240 : }
241 1 : paceTimer.Reset(d)
242 1 : select {
243 0 : case <-paceTimer.C:
244 1 : case <-cm.stopCh:
245 1 : // The cleanup manager is being closed. Delete without pacing.
246 1 : return
247 : }
248 : }
249 : }
250 :
251 : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
252 : func (cm *cleanupManager) deleteObsoleteFile(
253 : fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
254 1 : ) {
255 1 : // TODO(peter): need to handle this error, probably by re-adding the
256 1 : // file that couldn't be deleted to one of the obsolete file slices.
257 1 : err := cm.opts.Cleaner.Clean(fs, fileType, path)
258 1 : if oserror.IsNotExist(err) {
259 0 : return
260 0 : }
261 :
262 1 : switch fileType {
263 1 : case base.FileTypeLog:
264 1 : cm.opts.EventListener.WALDeleted(WALDeleteInfo{
265 1 : JobID: int(jobID),
266 1 : Path: path,
267 1 : FileNum: fileNum,
268 1 : Err: err,
269 1 : })
270 1 : case base.FileTypeManifest:
271 1 : cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
272 1 : JobID: int(jobID),
273 1 : Path: path,
274 1 : FileNum: fileNum,
275 1 : Err: err,
276 1 : })
277 0 : case base.FileTypeTable, base.FileTypeBlob:
278 0 : panic("invalid deletion of object file")
279 : }
280 : }
281 :
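// deleteObsoleteObject removes an obsolete sstable or blob file through the
// objstorage provider and notifies the corresponding event listener. Objects
// that no longer exist are ignored.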
282 : func (cm *cleanupManager) deleteObsoleteObject(
283 : fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
284 1 : ) {
285 1 : if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
286 0 : panic("not an object")
287 : }
288 :
289 1 : var path string
290 1 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
291 1 : if err != nil {
292 1 : path = "<nil>"
293 1 : } else {
294 1 : path = cm.objProvider.Path(meta)
295 1 : err = cm.objProvider.Remove(fileType, fileNum)
296 1 : }
297 1 : if cm.objProvider.IsNotExistError(err) {
298 1 : return
299 1 : }
300 :
301 1 : switch fileType {
302 1 : case base.FileTypeTable:
303 1 : cm.opts.EventListener.TableDeleted(TableDeleteInfo{
304 1 : JobID: int(jobID),
305 1 : Path: path,
306 1 : FileNum: fileNum,
307 1 : Err: err,
308 1 : })
309 1 : case base.FileTypeBlob:
310 1 : cm.opts.EventListener.BlobFileDeleted(BlobFileDeleteInfo{
311 1 : JobID: int(jobID),
312 1 : Path: path,
313 1 : FileNum: fileNum,
314 1 : Err: err,
315 1 : })
316 : }
317 : }
318 :
319 : // maybeLogLocked issues a warning log when the job queue becomes more than 75%
320 : // full, and a follow-up log once it drains back below 10% full.
321 : //
322 : // Must be called with cm.mu locked.
323 1 : func (cm *cleanupManager) maybeLogLocked() {
324 1 : const highThreshold = jobsQueueDepth * 3 / 4
325 1 : const lowThreshold = jobsQueueDepth / 10
326 1 :
327 1 : jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
328 1 :
329 1 : if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
330 0 : cm.mu.jobsQueueWarningIssued = true
331 0 : cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
332 0 : }
333 :
334 1 : if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
335 0 : cm.mu.jobsQueueWarningIssued = false
336 0 : cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
337 0 : }
338 : }
339 :
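// getDeletionPacerInfo returns a snapshot of the free, obsolete, and live byte
// counts that the deletion pacer uses to decide how much to throttle deletions.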
340 1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
341 1 : var pacerInfo deletionPacerInfo
342 1 : // Call GetDiskUsage after every file deletion. This may seem inefficient,
343 1 : // but in practice this was observed to take constant time, regardless of
344 1 : // volume size used, at least on linux with ext4 and zfs. All invocations
345 1 : // take 10 microseconds or less.
346 1 : pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
347 1 : d.mu.Lock()
348 1 : pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
349 1 : total := d.mu.versions.metrics.Total()
350 1 : d.mu.Unlock()
351 1 : pacerInfo.liveBytes = uint64(total.AggregateSize())
352 1 : return pacerInfo
353 1 : }
354 :
355 : // scanObsoleteFiles scans the filesystem for files that are no longer needed
356 : // and adds those to the internal lists of obsolete files. Note that the files
357 : // are not actually deleted by this method. A subsequent call to
358 : // deleteObsoleteFiles must be performed. Must not be called concurrently
359 : // with compactions and flushes. d.mu must be held when calling this function.
360 1 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
361 1 : // Disable automatic compactions temporarily to prevent concurrent compactions
362 1 : // and flushes from interfering. The original value is restored on completion.
363 1 : disabledPrev := d.opts.DisableAutomaticCompactions
364 1 : defer func() {
365 1 : d.opts.DisableAutomaticCompactions = disabledPrev
366 1 : }()
367 1 : d.opts.DisableAutomaticCompactions = true
368 1 :
369 1 : // Wait for any ongoing compactions, downloads, and flushes to complete before continuing.
370 1 : for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
371 1 : d.mu.compact.cond.Wait()
372 1 : }
373 :
374 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
375 1 : d.mu.versions.addLiveFileNums(liveFileNums)
376 1 : // Protect files that are referred to only by an ingestedFlushable from being
377 1 : // deleted. These are added to the flushable queue on WAL replay and handle
378 1 : // their own obsoletion/deletion. We exclude them from this obsolete file scan
379 1 : // to avoid double-deleting these files.
380 1 : for _, f := range flushableIngests {
381 1 : for _, file := range f.files {
382 1 : liveFileNums[file.TableBacking.DiskFileNum] = struct{}{}
383 1 : }
384 : }
385 :
386 1 : manifestFileNum := d.mu.versions.manifestFileNum
387 1 :
388 1 : var obsoleteTables []obsoleteFile
389 1 : var obsoleteBlobs []obsoleteFile
390 1 : var obsoleteOptions []obsoleteFile
391 1 : var obsoleteManifests []obsoleteFile
392 1 :
393 1 : for _, filename := range list {
394 1 : fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
395 1 : if !ok {
396 1 : continue
397 : }
398 1 : makeObsoleteFile := func() obsoleteFile {
399 1 : of := obsoleteFile{
400 1 : fileType: fileType,
401 1 : fs: d.opts.FS,
402 1 : path: d.opts.FS.PathJoin(d.dirname, filename),
403 1 : fileNum: diskFileNum,
404 1 : isLocal: true,
405 1 : }
406 1 : if stat, err := d.opts.FS.Stat(filename); err == nil {
407 1 : of.fileSize = uint64(stat.Size())
408 1 : }
409 1 : return of
410 : }
411 1 : switch fileType {
412 1 : case base.FileTypeManifest:
413 1 : if diskFileNum >= manifestFileNum {
414 1 : continue
415 : }
416 1 : obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
417 1 : case base.FileTypeOptions:
418 1 : if diskFileNum >= d.optionsFileNum {
419 1 : continue
420 : }
421 1 : obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
422 1 : case base.FileTypeTable, base.FileTypeBlob:
423 : // Objects are handled through the objstorage provider below.
424 1 : default:
425 : // Don't delete files we don't know about.
426 : }
427 : }
428 :
429 1 : objects := d.objProvider.List()
430 1 : for _, obj := range objects {
431 1 : if _, ok := liveFileNums[obj.DiskFileNum]; ok {
432 1 : continue
433 : }
434 1 : if obj.FileType != base.FileTypeTable && obj.FileType != base.FileTypeBlob {
435 0 : // Ignore object types we don't know about.
436 0 : continue
437 : }
438 1 : of := obsoleteFile{
439 1 : fileType: obj.FileType,
440 1 : fs: d.opts.FS,
441 1 : path: base.MakeFilepath(d.opts.FS, d.dirname, obj.FileType, obj.DiskFileNum),
442 1 : fileNum: obj.DiskFileNum,
443 1 : isLocal: true,
444 1 : }
445 1 : if size, err := d.objProvider.Size(obj); err == nil {
446 1 : of.fileSize = uint64(size)
447 1 : }
448 1 : if obj.FileType == base.FileTypeTable {
449 1 : obsoleteTables = append(obsoleteTables, of)
450 1 : } else {
451 1 : obsoleteBlobs = append(obsoleteBlobs, of)
452 1 : }
453 : }
454 :
455 1 : d.mu.versions.obsoleteTables = mergeObsoleteFiles(d.mu.versions.obsoleteTables, obsoleteTables)
456 1 : d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
457 1 : d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
458 1 : d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
459 1 : d.mu.versions.updateObsoleteObjectMetricsLocked()
460 : }
461 :
462 : // disableFileDeletions disables file deletions and then waits for any
463 : // in-progress deletion to finish. The caller is required to call
464 : // enableFileDeletions in order to enable file deletions again. It is ok for
465 : // multiple callers to disable file deletions simultaneously, though they must
466 : // all invoke enableFileDeletions in order for file deletions to be re-enabled
467 : // (there is an internal reference count on file deletion disablement).
468 : //
469 : // d.mu must be held when calling this method.
470 1 : func (d *DB) disableFileDeletions() {
471 1 : d.mu.fileDeletions.disableCount++
472 1 : d.mu.Unlock()
473 1 : defer d.mu.Lock()
474 1 : d.cleanupManager.Wait()
475 1 : }
476 :
477 : // enableFileDeletions enables previously disabled file deletions. A cleanup job
478 : // is queued if necessary.
479 : //
480 : // d.mu must be held when calling this method.
481 1 : func (d *DB) enableFileDeletions() {
482 1 : if d.mu.fileDeletions.disableCount <= 0 {
483 1 : panic("pebble: file deletion disablement invariant violated")
484 : }
485 1 : d.mu.fileDeletions.disableCount--
486 1 : if d.mu.fileDeletions.disableCount > 0 {
487 0 : return
488 0 : }
489 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
490 : }
491 :
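// fileInfo is a local alias for base.FileInfo.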
492 : type fileInfo = base.FileInfo
493 :
494 : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
495 : //
496 : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
497 : //
498 : // Does nothing if file deletions are disabled (see disableFileDeletions). A
499 : // cleanup job will be scheduled when file deletions are re-enabled.
500 1 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
501 1 : if d.mu.fileDeletions.disableCount > 0 {
502 1 : return
503 1 : }
504 1 : _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
505 1 :
506 1 : // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
507 1 : // log that has not had its contents flushed to an sstable.
508 1 : obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
509 1 : if err != nil {
510 0 : panic(err)
511 : }
512 :
513 1 : obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
514 1 : d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
515 1 : obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
516 1 : d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
517 1 :
518 1 : // Ensure everything is already sorted. We want determinism for testing, and
519 1 : // we need the manifests to be sorted because we want to delete some
520 1 : // contiguous prefix of the older manifests.
521 1 : if invariants.Enabled {
522 1 : switch {
523 0 : case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
524 0 : d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
525 0 : case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
526 0 : d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
527 0 : case !slices.IsSortedFunc(obsoleteTables, cmpObsoleteFileNumbers):
528 0 : d.opts.Logger.Fatalf("obsoleteTables is not sorted")
529 0 : case !slices.IsSortedFunc(obsoleteBlobs, cmpObsoleteFileNumbers):
530 0 : d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
531 : }
532 : }
533 :
534 1 : var obsoleteManifests []obsoleteFile
535 1 : manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
536 1 : if manifestsToDelete > 0 {
537 1 : obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
538 1 : d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
539 1 : if len(d.mu.versions.obsoleteManifests) == 0 {
540 0 : d.mu.versions.obsoleteManifests = nil
541 0 : }
542 : }
543 :
544 1 : obsoleteOptions := d.mu.versions.obsoleteOptions
545 1 : d.mu.versions.obsoleteOptions = nil
546 1 :
547 1 : // Compute the stats for the files being queued for deletion and add them to
548 1 : // the running total. These stats will be used during DB.Metrics() to
549 1 : // calculate the count and size of pending obsolete files by diffing these
550 1 : // stats and the stats reported by the cleanup manager.
551 1 : var objectStats obsoleteObjectStats
552 1 : objectStats.tablesAll, objectStats.tablesLocal = calculateObsoleteObjectStats(obsoleteTables)
553 1 : objectStats.blobFilesAll, objectStats.blobFilesLocal = calculateObsoleteObjectStats(obsoleteBlobs)
554 1 : d.mu.fileDeletions.queuedStats.Add(objectStats)
555 1 : d.mu.versions.updateObsoleteObjectMetricsLocked()
556 1 :
557 1 : // Release d.mu while preparing the cleanup job and possibly waiting.
558 1 : // Note the unusual order: Unlock and then Lock.
559 1 : d.mu.Unlock()
560 1 : defer d.mu.Lock()
561 1 :
562 1 : n := len(obsoleteLogs) + len(obsoleteTables) + len(obsoleteBlobs) + len(obsoleteManifests) + len(obsoleteOptions)
563 1 : filesToDelete := make([]obsoleteFile, 0, n)
564 1 : filesToDelete = append(filesToDelete, obsoleteManifests...)
565 1 : filesToDelete = append(filesToDelete, obsoleteOptions...)
566 1 : filesToDelete = append(filesToDelete, obsoleteTables...)
567 1 : filesToDelete = append(filesToDelete, obsoleteBlobs...)
568 1 : for _, f := range obsoleteLogs {
569 1 : filesToDelete = append(filesToDelete, obsoleteFile{
570 1 : fileType: base.FileTypeLog,
571 1 : fs: f.FS,
572 1 : path: f.Path,
573 1 : fileNum: base.DiskFileNum(f.NumWAL),
574 1 : fileSize: f.ApproxFileSize,
575 1 : isLocal: true,
576 1 : })
577 1 : }
578 1 : for _, f := range obsoleteTables {
579 1 : d.fileCache.Evict(f.fileNum, base.FileTypeTable)
580 1 : }
581 1 : for _, f := range obsoleteBlobs {
582 1 : d.fileCache.Evict(f.fileNum, base.FileTypeBlob)
583 1 : }
584 1 : if len(filesToDelete) > 0 {
585 1 : d.cleanupManager.EnqueueJob(jobID, filesToDelete, objectStats)
586 1 : }
587 1 : if d.opts.private.testingAlwaysWaitForCleanup {
588 1 : d.cleanupManager.Wait()
589 1 : }
590 : }
591 :
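// maybeScheduleObsoleteObjectDeletion enqueues a cleanup job if there are any
// obsolete tables or blob files waiting to be deleted.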
592 1 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
593 1 : d.mu.Lock()
594 1 : defer d.mu.Unlock()
595 1 : if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
596 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
597 1 : }
598 : }
599 :
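// mergeObsoleteFiles appends b to a, sorts the result by file number, and
// removes entries with duplicate file numbers.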
600 1 : func mergeObsoleteFiles(a, b []obsoleteFile) []obsoleteFile {
601 1 : if len(b) == 0 {
602 1 : return a
603 1 : }
604 :
605 1 : a = append(a, b...)
606 1 : slices.SortFunc(a, cmpObsoleteFileNumbers)
607 1 : return slices.CompactFunc(a, func(a, b obsoleteFile) bool {
608 1 : return a.fileNum == b.fileNum
609 1 : })
610 : }
611 :
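// cmpObsoleteFileNumbers orders obsolete files by their disk file number.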
612 1 : func cmpObsoleteFileNumbers(a, b obsoleteFile) int {
613 1 : return cmp.Compare(a.fileNum, b.fileNum)
614 1 : }
615 :
616 : // objectInfo describes an object in object storage (either an sstable or a blob
617 : // file).
618 : type objectInfo struct {
619 : fileInfo
620 : isLocal bool
621 : }
622 :
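// asObsoleteFile converts the objectInfo into an obsoleteFile whose path is
// rooted at dirname.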
623 1 : func (o objectInfo) asObsoleteFile(fs vfs.FS, fileType base.FileType, dirname string) obsoleteFile {
624 1 : return obsoleteFile{
625 1 : fileType: fileType,
626 1 : fs: fs,
627 1 : path: base.MakeFilepath(fs, dirname, fileType, o.FileNum),
628 1 : fileNum: o.FileNum,
629 1 : fileSize: o.FileSize,
630 1 : isLocal: o.isLocal,
631 1 : }
632 1 : }
633 :
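// calculateObsoleteObjectStats returns the count and cumulative size of the
// given obsolete files, both in total and for local files only.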
634 1 : func calculateObsoleteObjectStats(files []obsoleteFile) (total, local countAndSize) {
635 1 : for _, of := range files {
636 1 : if of.isLocal {
637 1 : local.count++
638 1 : local.size += of.fileSize
639 1 : }
640 1 : total.count++
641 1 : total.size += of.fileSize
642 : }
643 1 : return total, local
644 : }
645 :
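// obsoleteObjectStats tracks the count and cumulative size of obsolete sstables
// and blob files, both in total and for local files only.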
646 : type obsoleteObjectStats struct {
647 : tablesLocal countAndSize
648 : tablesAll countAndSize
649 : blobFilesLocal countAndSize
650 : blobFilesAll countAndSize
651 : }
652 :
653 1 : func (s *obsoleteObjectStats) Add(other obsoleteObjectStats) {
654 1 : s.tablesLocal.Add(other.tablesLocal)
655 1 : s.tablesAll.Add(other.tablesAll)
656 1 : s.blobFilesLocal.Add(other.blobFilesLocal)
657 1 : s.blobFilesAll.Add(other.blobFilesAll)
658 1 : }
659 :
660 1 : func (s *obsoleteObjectStats) Sub(other obsoleteObjectStats) {
661 1 : s.tablesLocal.Sub(other.tablesLocal)
662 1 : s.tablesAll.Sub(other.tablesAll)
663 1 : s.blobFilesLocal.Sub(other.blobFilesLocal)
664 1 : s.blobFilesAll.Sub(other.blobFilesAll)
665 1 : }
666 :
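// countAndSize holds a number of files and their cumulative size in bytes.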
667 : type countAndSize struct {
668 : count uint64
669 : size uint64
670 : }
671 :
672 1 : func (c *countAndSize) Add(other countAndSize) {
673 1 : c.count += other.count
674 1 : c.size += other.size
675 1 : }
676 :
677 1 : func (c *countAndSize) Sub(other countAndSize) {
678 1 : c.count = invariants.SafeSub(c.count, other.count)
679 1 : c.size = invariants.SafeSub(c.size, other.size)
680 1 : }
681 :
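// makeZombieObjects returns an empty zombieObjects set.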
682 1 : func makeZombieObjects() zombieObjects {
683 1 : return zombieObjects{
684 1 : objs: make(map[base.DiskFileNum]objectInfo),
685 1 : }
686 1 : }
687 :
688 : // zombieObjects tracks a set of objects that are no longer required by the most
689 : // recent version of the LSM, but may still need to be accessed by an open
690 : // iterator. Such objects are 'dead,' but cannot be deleted until iterators that
691 : // may access them are closed.
692 : type zombieObjects struct {
693 : objs map[base.DiskFileNum]objectInfo
694 : totalSize uint64
695 : localSize uint64
696 : localCount uint64
697 : }
698 :
699 : // Add adds an object to the set of zombie objects.
700 1 : func (z *zombieObjects) Add(obj objectInfo) {
701 1 : if _, ok := z.objs[obj.FileNum]; ok {
702 0 : panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
703 : }
704 1 : z.objs[obj.FileNum] = obj
705 1 : z.totalSize += obj.FileSize
706 1 : if obj.isLocal {
707 1 : z.localSize += obj.FileSize
708 1 : z.localCount++
709 1 : }
710 : }
711 :
712 : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
713 1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
714 1 : z.Add(objectInfo{
715 1 : fileInfo: fileInfo{
716 1 : FileNum: meta.DiskFileNum,
717 1 : FileSize: size,
718 1 : },
719 1 : isLocal: !meta.IsRemote(),
720 1 : })
721 1 : }
722 :
723 : // Count returns the number of zombie objects.
724 1 : func (z *zombieObjects) Count() int {
725 1 : return len(z.objs)
726 1 : }
727 :
728 : // Extract removes an object from the set of zombie objects, returning the
729 : // object that was removed.
730 1 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
731 1 : obj, ok := z.objs[fileNum]
732 1 : if !ok {
733 0 : panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
734 : }
735 1 : delete(z.objs, fileNum)
736 1 :
737 1 : // Detect underflow in case we have a bug that causes an object's size to be
738 1 : // mutated.
739 1 : if z.totalSize < obj.FileSize {
740 0 : panic(errors.AssertionFailedf("zombie object %s size %d is greater than total size %d", fileNum, obj.FileSize, z.totalSize))
741 : }
742 1 : if obj.isLocal && z.localSize < obj.FileSize {
743 0 : panic(errors.AssertionFailedf("zombie object %s size %d is greater than local size %d", fileNum, obj.FileSize, z.localSize))
744 : }
745 :
746 1 : z.totalSize -= obj.FileSize
747 1 : if obj.isLocal {
748 1 : z.localSize -= obj.FileSize
749 1 : z.localCount--
750 1 : }
751 1 : return obj
752 : }
753 :
754 : // TotalSize returns the size of all objects in the set.
755 1 : func (z *zombieObjects) TotalSize() uint64 {
756 1 : return z.totalSize
757 1 : }
758 :
759 : // LocalStats returns the count and size of all local objects in the set.
760 1 : func (z *zombieObjects) LocalStats() (count uint64, size uint64) {
761 1 : return z.localCount, z.localSize
762 1 : }
|