LCOV - code coverage report
Current view: top level - pebble - pacer.go (source / functions) Hit Total Coverage
Test: 2023-10-12 08:18Z ede31f1a - tests + meta.lcov Lines: 81 81 100.0 %
Date: 2023-10-12 08:19:59 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "sync"
       9             :         "time"
      10             : )
      11             : 
// deletionPacerInfo contains any info from the db necessary to make deletion
// pacing decisions (to limit background IO usage so that it does not contend
// with foreground traffic).
//
// A value of this type is produced by the getInfo callback of deletionPacer
// and consumed by PacingDelay.
type deletionPacerInfo struct {
	// freeBytes is compared against deletionPacer.freeSpaceThreshold; when it
	// is at or below the threshold, PacingDelay raises the deletion rate.
	freeBytes uint64
	// obsoleteBytes is the numerator of the obsolete-to-live ratio that
	// PacingDelay compares against deletionPacer.obsoleteBytesMaxRatio.
	obsoleteBytes uint64
	// liveBytes is the denominator of that ratio. PacingDelay returns no delay
	// at all when it is 0, since the ratio cannot be computed.
	liveBytes uint64
}
      20             : 
// deletionPacer rate limits deletions of obsolete files. This is necessary to
// prevent overloading the disk with too many deletions too quickly after a
// large compaction, or an iterator close. On some SSDs, disk performance can be
// negatively impacted if too many blocks are deleted very quickly, so this
// mechanism helps mitigate that.
type deletionPacer struct {
	// If there are less than freeSpaceThreshold bytes of free space on
	// disk, increase the pace of deletions such that we delete enough bytes to
	// get back to the threshold within the freeSpaceTimeframe.
	freeSpaceThreshold uint64
	freeSpaceTimeframe time.Duration

	// If the ratio of obsolete bytes to live bytes is greater than
	// obsoleteBytesMaxRatio, increase the pace of deletions such that we delete
	// enough bytes to get back to the threshold within the obsoleteBytesTimeframe.
	obsoleteBytesMaxRatio  float64
	obsoleteBytesTimeframe time.Duration

	// mu guards the deletion history; ReportDeletion and PacingDelay both take
	// the lock before touching it.
	mu struct {
		sync.Mutex

		// history keeps track of recent deletion history; it is used to increase
		// the deletion rate to match the pace of deletions.
		history history
	}

	// targetByteDeletionRate is the baseline deletion rate in bytes/sec used
	// when no heuristic asks for a faster pace. A value of 0 disables pacing.
	targetByteDeletionRate int64

	// getInfo is invoked by PacingDelay to obtain the current free, obsolete
	// and live byte counts that drive the rate-increase heuristics.
	getInfo func() deletionPacerInfo
}
      51             : 
// deletePacerHistory is the length of the window over which recent deletions
// are tracked. It is the timeframe handed to history.Init by newDeletionPacer
// and the divisor PacingDelay uses to turn the recent deletion sum into a
// bytes/sec rate.
const deletePacerHistory = 5 * time.Minute
      53             : 
      54             : // newDeletionPacer instantiates a new deletionPacer for use when deleting
      55             : // obsolete files.
      56             : //
      57             : // targetByteDeletionRate is the rate (in bytes/sec) at which we want to
      58             : // normally limit deletes (when we are not falling behind or running out of
      59             : // space). A value of 0.0 disables pacing.
      60             : func newDeletionPacer(
      61             :         now time.Time, targetByteDeletionRate int64, getInfo func() deletionPacerInfo,
      62           2 : ) *deletionPacer {
      63           2 :         d := &deletionPacer{
      64           2 :                 freeSpaceThreshold: 16 << 30, // 16 GB
      65           2 :                 freeSpaceTimeframe: 10 * time.Second,
      66           2 : 
      67           2 :                 obsoleteBytesMaxRatio:  0.20,
      68           2 :                 obsoleteBytesTimeframe: 5 * time.Minute,
      69           2 : 
      70           2 :                 targetByteDeletionRate: targetByteDeletionRate,
      71           2 :                 getInfo:                getInfo,
      72           2 :         }
      73           2 :         d.mu.history.Init(now, deletePacerHistory)
      74           2 :         return d
      75           2 : }
      76             : 
      77             : // ReportDeletion is used to report a deletion to the pacer. The pacer uses it
      78             : // to keep track of the recent rate of deletions and potentially increase the
      79             : // deletion rate accordingly.
      80             : //
      81             : // ReportDeletion is thread-safe.
      82           2 : func (p *deletionPacer) ReportDeletion(now time.Time, bytesToDelete uint64) {
      83           2 :         p.mu.Lock()
      84           2 :         defer p.mu.Unlock()
      85           2 :         p.mu.history.Add(now, int64(bytesToDelete))
      86           2 : }
      87             : 
      88             : // PacingDelay returns the recommended pacing wait time (in seconds) for
      89             : // deleting the given number of bytes.
      90             : //
      91             : // PacingDelay is thread-safe.
      92           2 : func (p *deletionPacer) PacingDelay(now time.Time, bytesToDelete uint64) (waitSeconds float64) {
      93           2 :         if p.targetByteDeletionRate == 0 {
      94           2 :                 // Pacing disabled.
      95           2 :                 return 0.0
      96           2 :         }
      97             : 
      98           2 :         baseRate := float64(p.targetByteDeletionRate)
      99           2 :         // If recent deletion rate is more than our target, use that so that we don't
     100           2 :         // fall behind.
     101           2 :         historicRate := func() float64 {
     102           2 :                 p.mu.Lock()
     103           2 :                 defer p.mu.Unlock()
     104           2 :                 return float64(p.mu.history.Sum(now)) / deletePacerHistory.Seconds()
     105           2 :         }()
     106           2 :         if historicRate > baseRate {
     107           1 :                 baseRate = historicRate
     108           1 :         }
     109             : 
     110             :         // Apply heuristics to increase the deletion rate.
     111           2 :         var extraRate float64
     112           2 :         info := p.getInfo()
     113           2 :         if info.freeBytes <= p.freeSpaceThreshold {
     114           1 :                 // Increase the rate so that we can free up enough bytes within the timeframe.
     115           1 :                 extraRate = float64(p.freeSpaceThreshold-info.freeBytes) / p.freeSpaceTimeframe.Seconds()
     116           1 :         }
     117           2 :         if info.liveBytes == 0 {
     118           2 :                 // We don't know the obsolete bytes ratio. Disable pacing altogether.
     119           2 :                 return 0.0
     120           2 :         }
     121           2 :         obsoleteBytesRatio := float64(info.obsoleteBytes) / float64(info.liveBytes)
     122           2 :         if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio {
     123           2 :                 // Increase the rate so that we can free up enough bytes within the timeframe.
     124           2 :                 r := (obsoleteBytesRatio - p.obsoleteBytesMaxRatio) * float64(info.liveBytes) / p.obsoleteBytesTimeframe.Seconds()
     125           2 :                 if extraRate < r {
     126           2 :                         extraRate = r
     127           2 :                 }
     128             :         }
     129             : 
     130           2 :         return float64(bytesToDelete) / (baseRate + extraRate)
     131             : }
     132             : 
     133             : // history is a helper used to keep track of the recent history of a set of
     134             : // data points (in our case deleted bytes), at limited granularity.
     135             : // Specifically, we split the desired timeframe into 100 "epochs" and all times
     136             : // are effectively rounded down to the nearest epoch boundary.
     137             : type history struct {
     138             :         epochDuration time.Duration
     139             :         startTime     time.Time
     140             :         // currEpoch is the epoch of the most recent operation.
     141             :         currEpoch int64
     142             :         // val contains the recent epoch values.
     143             :         // val[currEpoch % historyEpochs] is the current epoch.
     144             :         // val[(currEpoch + 1) % historyEpochs] is the oldest epoch.
     145             :         val [historyEpochs]int64
     146             :         // sum is always equal to the sum of values in val.
     147             :         sum int64
     148             : }
     149             : 
     150             : const historyEpochs = 100
     151             : 
     152             : // Init the history helper to keep track of data over the given number of
     153             : // seconds.
     154           2 : func (h *history) Init(now time.Time, timeframe time.Duration) {
     155           2 :         *h = history{
     156           2 :                 epochDuration: timeframe / time.Duration(historyEpochs),
     157           2 :                 startTime:     now,
     158           2 :                 currEpoch:     0,
     159           2 :                 sum:           0,
     160           2 :         }
     161           2 : }
     162             : 
     163             : // Add adds a value for the current time.
     164           2 : func (h *history) Add(now time.Time, val int64) {
     165           2 :         h.advance(now)
     166           2 :         h.val[h.currEpoch%historyEpochs] += val
     167           2 :         h.sum += val
     168           2 : }
     169             : 
     170             : // Sum returns the sum of recent values. The result is approximate in that the
     171             : // cut-off time is within 1% of the exact one.
     172           2 : func (h *history) Sum(now time.Time) int64 {
     173           2 :         h.advance(now)
     174           2 :         return h.sum
     175           2 : }
     176             : 
     177           2 : func (h *history) epoch(t time.Time) int64 {
     178           2 :         return int64(t.Sub(h.startTime) / h.epochDuration)
     179           2 : }
     180             : 
     181             : // advance advances the time to the given time.
     182           2 : func (h *history) advance(now time.Time) {
     183           2 :         epoch := h.epoch(now)
     184           2 :         for h.currEpoch < epoch {
     185           2 :                 h.currEpoch++
     186           2 :                 // Forget the data for the oldest epoch.
     187           2 :                 h.sum -= h.val[h.currEpoch%historyEpochs]
     188           2 :                 h.val[h.currEpoch%historyEpochs] = 0
     189           2 :         }
     190             : }

Generated by: LCOV version 1.14