LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - provider.go (source / functions) Hit Total Coverage
Test: 2024-03-22 08:16Z e5c9f635 - tests + meta.lcov Lines: 254 306 83.0 %
Date: 2024-03-22 08:17:53 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "context"
      10             :         "io"
      11             :         "os"
      12             :         "slices"
      13             :         "sync"
      14             : 
      15             :         "github.com/cockroachdb/errors"
      16             :         "github.com/cockroachdb/errors/oserror"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/invariants"
      19             :         "github.com/cockroachdb/pebble/objstorage"
      20             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      21             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
      22             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      23             :         "github.com/cockroachdb/pebble/objstorage/remote"
      24             :         "github.com/cockroachdb/pebble/vfs"
      25             : )
      26             : 
      27             : // provider is the implementation of objstorage.Provider.
      28             : type provider struct {
      29             :         st Settings
      30             : 
      31             :         fsDir vfs.File
      32             : 
      33             :         tracer *objiotracing.Tracer
      34             : 
      35             :         remote remoteSubsystem
      36             : 
      37             :         mu struct {
      38             :                 sync.RWMutex
      39             : 
      40             :                 remote remoteLockedState
      41             : 
      42             :                 // localObjectsChanged is set if non-remote objects were created or deleted
      43             :                 // but Sync was not yet called.
      44             :                 localObjectsChanged bool
      45             : 
      46             :                 // knownObjects maintains information about objects that are known to the provider.
      47             :                 // It is initialized with the list of files in the manifest when we open a DB.
      48             :                 knownObjects map[base.DiskFileNum]objstorage.ObjectMetadata
      49             : 
      50             :                 // protectedObjects are objects that cannot be unreferenced because they
      51             :                 // have outstanding SharedObjectBackingHandles. The value is a count of outstanding handles
      52             :                 protectedObjects map[base.DiskFileNum]int
      53             :         }
      54             : }
      55             : 
      56             : var _ objstorage.Provider = (*provider)(nil)
      57             : 
      58             : // Settings that must be specified when creating the provider.
      59             : type Settings struct {
      60             :         Logger base.Logger
      61             : 
      62             :         // Local filesystem configuration.
      63             :         FS        vfs.FS
      64             :         FSDirName string
      65             : 
      66             :         // FSDirInitialListing is a listing of FSDirName at the time of calling Open.
      67             :         //
      68             :         // This is an optional optimization to avoid double listing on Open when the
      69             :         // higher layer already has a listing. When nil, we obtain the listing on
      70             :         // Open.
      71             :         FSDirInitialListing []string
      72             : 
      73             :         // Cleaner cleans obsolete files from the local filesystem.
      74             :         //
      75             :         // The default cleaner uses the DeleteCleaner.
      76             :         FSCleaner base.Cleaner
      77             : 
      78             :         // NoSyncOnClose decides whether the implementation will enforce a
      79             :         // close-time synchronization (e.g., fdatasync() or sync_file_range())
      80             :         // on files it writes to. Setting this to true removes the guarantee for a
      81             :         // sync on close. Some implementations can still issue a non-blocking sync.
      82             :         NoSyncOnClose bool
      83             : 
      84             :         // BytesPerSync enables periodic syncing of files in order to smooth out
      85             :         // writes to disk. This option does not provide any persistence guarantee, but
      86             :         // is used to avoid latency spikes if the OS automatically decides to write
      87             :         // out a large chunk of dirty filesystem buffers.
      88             :         BytesPerSync int
      89             : 
      90             :         // Fields here are set only if the provider is to support remote objects
      91             :         // (experimental).
      92             :         Remote struct {
      93             :                 StorageFactory remote.StorageFactory
      94             : 
      95             :                 // If CreateOnShared is non-zero, sstables are created on remote storage using
      96             :                 // the CreateOnSharedLocator (when the PreferSharedStorage create option is
      97             :                 // true).
      98             :                 CreateOnShared        remote.CreateOnSharedStrategy
      99             :                 CreateOnSharedLocator remote.Locator
     100             : 
     101             :                 // CacheSizeBytes is the size of the on-disk block cache for objects
     102             :                 // on remote storage. If it is 0, no cache is used.
     103             :                 CacheSizeBytes int64
     104             : 
     105             :                 // CacheBlockSize is the block size of the cache; if 0, the default of 32KB is used.
     106             :                 CacheBlockSize int
     107             : 
     108             :                 // ShardingBlockSize is the size of a shard block. The cache is split into contiguous
     109             :                 // ShardingBlockSize units. The units are distributed across multiple independent shards
     110             :                 // of the cache, via a hash(offset) modulo num shards operation. The cache replacement
     111             :                 // policies operate at the level of shard, not whole cache. This is done to reduce lock
     112             :                 // contention.
     113             :                 //
     114             :                 // If ShardingBlockSize is 0, the default of 1 MB is used.
     115             :                 ShardingBlockSize int64
     116             : 
     117             :                 // The number of independent shards the cache leverages. Each shard is the same size,
     118             :                 // and a hash of filenum & offset map a read to a certain shard. If set to 0,
     119             :                 // 2*runtime.GOMAXPROCS is used as the shard count.
     120             :                 CacheShardCount int
     121             : 
     122             :                 // TODO(radu): allow the cache to live on another FS/location (e.g. to use
     123             :                 // instance-local SSD).
     124             :         }
     125             : }
     126             : 
     127             : // DefaultSettings initializes default settings (with no remote storage),
     128             : // suitable for tests and tools.
     129           1 : func DefaultSettings(fs vfs.FS, dirName string) Settings {
     130           1 :         return Settings{
     131           1 :                 Logger:        base.DefaultLogger,
     132           1 :                 FS:            fs,
     133           1 :                 FSDirName:     dirName,
     134           1 :                 FSCleaner:     base.DeleteCleaner{},
     135           1 :                 NoSyncOnClose: false,
     136           1 :                 BytesPerSync:  512 * 1024, // 512KB
     137           1 :         }
     138           1 : }
     139             : 
     140             : // Open creates the provider.
     141           2 : func Open(settings Settings) (objstorage.Provider, error) {
     142           2 :         // Note: we can't just `return open(settings)` because in an error case we
     143           2 :         // would return (*provider)(nil) which is not objstorage.Provider(nil).
     144           2 :         p, err := open(settings)
     145           2 :         if err != nil {
     146           1 :                 return nil, err
     147           1 :         }
     148           2 :         return p, nil
     149             : }
     150             : 
     151           2 : func open(settings Settings) (p *provider, _ error) {
     152           2 :         fsDir, err := settings.FS.OpenDir(settings.FSDirName)
     153           2 :         if err != nil {
     154           1 :                 return nil, err
     155           1 :         }
     156             : 
     157           2 :         defer func() {
     158           2 :                 if p == nil {
     159           0 :                         fsDir.Close()
     160           0 :                 }
     161             :         }()
     162             : 
     163           2 :         p = &provider{
     164           2 :                 st:    settings,
     165           2 :                 fsDir: fsDir,
     166           2 :         }
     167           2 :         p.mu.knownObjects = make(map[base.DiskFileNum]objstorage.ObjectMetadata)
     168           2 :         p.mu.protectedObjects = make(map[base.DiskFileNum]int)
     169           2 : 
     170           2 :         if objiotracing.Enabled {
     171           0 :                 p.tracer = objiotracing.Open(settings.FS, settings.FSDirName)
     172           0 :         }
     173             : 
     174             :         // Add local FS objects.
     175           2 :         if err := p.vfsInit(); err != nil {
     176           0 :                 return nil, err
     177           0 :         }
     178             : 
     179             :         // Initialize remote subsystem (if configured) and add remote objects.
     180           2 :         if err := p.remoteInit(); err != nil {
     181           0 :                 return nil, err
     182           0 :         }
     183             : 
     184           2 :         return p, nil
     185             : }
     186             : 
     187             : // Close is part of the objstorage.Provider interface.
     188           2 : func (p *provider) Close() error {
     189           2 :         err := p.sharedClose()
     190           2 :         if p.fsDir != nil {
     191           2 :                 err = firstError(err, p.fsDir.Close())
     192           2 :                 p.fsDir = nil
     193           2 :         }
     194           2 :         if objiotracing.Enabled {
     195           0 :                 if p.tracer != nil {
     196           0 :                         p.tracer.Close()
     197           0 :                         p.tracer = nil
     198           0 :                 }
     199             :         }
     200           2 :         return err
     201             : }
     202             : 
     203             : // OpenForReading opens an existing object.
     204             : func (p *provider) OpenForReading(
     205             :         ctx context.Context,
     206             :         fileType base.FileType,
     207             :         fileNum base.DiskFileNum,
     208             :         opts objstorage.OpenOptions,
     209           2 : ) (objstorage.Readable, error) {
     210           2 :         meta, err := p.Lookup(fileType, fileNum)
     211           2 :         if err != nil {
     212           1 :                 if opts.MustExist {
     213           0 :                         p.st.Logger.Fatalf("%v", err)
     214           0 :                 }
     215           1 :                 return nil, err
     216             :         }
     217             : 
     218           2 :         var r objstorage.Readable
     219           2 :         if !meta.IsRemote() {
     220           2 :                 r, err = p.vfsOpenForReading(ctx, fileType, fileNum, opts)
     221           2 :         } else {
     222           2 :                 r, err = p.remoteOpenForReading(ctx, meta, opts)
     223           2 :                 if err != nil && p.isNotExistError(meta, err) {
     224           1 :                         // Wrap the error so that IsNotExistError functions properly.
     225           1 :                         err = errors.Mark(err, os.ErrNotExist)
     226           1 :                 }
     227             :         }
     228           2 :         if err != nil {
     229           1 :                 return nil, err
     230           1 :         }
     231           2 :         if objiotracing.Enabled {
     232           0 :                 r = p.tracer.WrapReadable(ctx, r, fileNum)
     233           0 :         }
     234           2 :         return r, nil
     235             : }
     236             : 
     237             : // Create creates a new object and opens it for writing.
     238             : //
     239             : // The object is not guaranteed to be durable (accessible in case of crashes)
     240             : // until Sync is called.
     241             : func (p *provider) Create(
     242             :         ctx context.Context,
     243             :         fileType base.FileType,
     244             :         fileNum base.DiskFileNum,
     245             :         opts objstorage.CreateOptions,
     246           2 : ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
     247           2 :         if opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone {
     248           2 :                 w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts)
     249           2 :         } else {
     250           2 :                 w, meta, err = p.vfsCreate(ctx, fileType, fileNum)
     251           2 :         }
     252           2 :         if err != nil {
     253           1 :                 err = errors.Wrapf(err, "creating object %s", fileNum)
     254           1 :                 return nil, objstorage.ObjectMetadata{}, err
     255           1 :         }
     256           2 :         p.addMetadata(meta)
     257           2 :         if objiotracing.Enabled {
     258           0 :                 w = p.tracer.WrapWritable(ctx, w, fileNum)
     259           0 :         }
     260           2 :         return w, meta, nil
     261             : }
     262             : 
     263             : // Remove removes an object.
     264             : //
     265             : // Note that if the object is remote, the object is only (conceptually) removed
     266             : // from this provider. If other providers have references on the remote object,
     267             : // it will not be removed.
     268             : //
     269             : // The object is not guaranteed to be durably removed until Sync is called.
     270           2 : func (p *provider) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
     271           2 :         meta, err := p.Lookup(fileType, fileNum)
     272           2 :         if err != nil {
     273           1 :                 return err
     274           1 :         }
     275             : 
     276           2 :         if !meta.IsRemote() {
     277           2 :                 err = p.vfsRemove(fileType, fileNum)
     278           2 :         } else {
     279           2 :                 // TODO(radu): implement remote object removal (i.e. deref).
     280           2 :                 err = p.sharedUnref(meta)
     281           2 :                 if err != nil && p.isNotExistError(meta, err) {
     282           0 :                         // Wrap the error so that IsNotExistError functions properly.
     283           0 :                         err = errors.Mark(err, os.ErrNotExist)
     284           0 :                 }
     285             :         }
     286           2 :         if err != nil && !p.IsNotExistError(err) {
     287           1 :                 // We want to be able to retry a Remove, so we keep the object in our list.
     288           1 :                 // TODO(radu): we should mark the object as "zombie" and not allow any other
     289           1 :                 // operations.
     290           1 :                 return errors.Wrapf(err, "removing object %s", fileNum)
     291           1 :         }
     292             : 
     293           2 :         p.removeMetadata(fileNum)
     294           2 :         return err
     295             : }
     296             : 
     297           1 : func (p *provider) isNotExistError(meta objstorage.ObjectMetadata, err error) bool {
     298           1 :         if meta.Remote.Storage != nil {
     299           1 :                 return meta.Remote.Storage.IsNotExistError(err)
     300           1 :         }
     301           0 :         return oserror.IsNotExist(err)
     302             : }
     303             : 
     304             : // IsNotExistError is part of the objstorage.Provider interface.
     305           2 : func (p *provider) IsNotExistError(err error) bool {
     306           2 :         // We use errors.Mark(err, os.ErrNotExist) for not-exist errors coming from
     307           2 :         // remote.Storage.
     308           2 :         return oserror.IsNotExist(err)
     309           2 : }
     310             : 
     311             : // Sync flushes the metadata from creation or removal of objects since the last Sync.
     312           2 : func (p *provider) Sync() error {
     313           2 :         if err := p.vfsSync(); err != nil {
     314           1 :                 return err
     315           1 :         }
     316           2 :         if err := p.sharedSync(); err != nil {
     317           0 :                 return err
     318           0 :         }
     319           2 :         return nil
     320             : }
     321             : 
     322             : // LinkOrCopyFromLocal creates a new object that is either a copy of a given
     323             : // local file or a hard link (if the new object is created on the same FS, and
     324             : // if the FS supports it).
     325             : //
     326             : // The object is not guaranteed to be durable (accessible in case of crashes)
     327             : // until Sync is called.
     328             : func (p *provider) LinkOrCopyFromLocal(
     329             :         ctx context.Context,
     330             :         srcFS vfs.FS,
     331             :         srcFilePath string,
     332             :         dstFileType base.FileType,
     333             :         dstFileNum base.DiskFileNum,
     334             :         opts objstorage.CreateOptions,
     335           2 : ) (objstorage.ObjectMetadata, error) {
     336           2 :         shared := opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone
     337           2 :         if !shared && srcFS == p.st.FS {
     338           2 :                 // Wrap the normal filesystem with one which wraps newly created files with
     339           2 :                 // vfs.NewSyncingFile.
     340           2 :                 fs := vfs.NewSyncingFS(p.st.FS, vfs.SyncingFileOptions{
     341           2 :                         NoSyncOnClose: p.st.NoSyncOnClose,
     342           2 :                         BytesPerSync:  p.st.BytesPerSync,
     343           2 :                 })
     344           2 :                 dstPath := p.vfsPath(dstFileType, dstFileNum)
     345           2 :                 if err := vfs.LinkOrCopy(fs, srcFilePath, dstPath); err != nil {
     346           1 :                         return objstorage.ObjectMetadata{}, err
     347           1 :                 }
     348             : 
     349           2 :                 meta := objstorage.ObjectMetadata{
     350           2 :                         DiskFileNum: dstFileNum,
     351           2 :                         FileType:    dstFileType,
     352           2 :                 }
     353           2 :                 p.addMetadata(meta)
     354           2 :                 return meta, nil
     355             :         }
     356             :         // Create the object and copy the data.
     357           2 :         w, meta, err := p.Create(ctx, dstFileType, dstFileNum, opts)
     358           2 :         if err != nil {
     359           0 :                 return objstorage.ObjectMetadata{}, err
     360           0 :         }
     361           2 :         f, err := srcFS.Open(srcFilePath, vfs.SequentialReadsOption)
     362           2 :         if err != nil {
     363           0 :                 return objstorage.ObjectMetadata{}, err
     364           0 :         }
     365           2 :         defer f.Close()
     366           2 :         buf := make([]byte, 64*1024)
     367           2 :         for {
     368           2 :                 n, readErr := f.Read(buf)
     369           2 :                 if readErr != nil && readErr != io.EOF {
     370           0 :                         w.Abort()
     371           0 :                         return objstorage.ObjectMetadata{}, readErr
     372           0 :                 }
     373             : 
     374           2 :                 if n > 0 {
     375           2 :                         if err := w.Write(buf[:n]); err != nil {
     376           0 :                                 w.Abort()
     377           0 :                                 return objstorage.ObjectMetadata{}, err
     378           0 :                         }
     379             :                 }
     380             : 
     381           2 :                 if readErr == io.EOF {
     382           2 :                         break
     383             :                 }
     384             :         }
     385           2 :         if err := w.Finish(); err != nil {
     386           0 :                 return objstorage.ObjectMetadata{}, err
     387           0 :         }
     388           2 :         return meta, nil
     389             : }
     390             : 
     391             : // Lookup is part of the objstorage.Provider interface.
     392             : func (p *provider) Lookup(
     393             :         fileType base.FileType, fileNum base.DiskFileNum,
     394           2 : ) (objstorage.ObjectMetadata, error) {
     395           2 :         p.mu.RLock()
     396           2 :         defer p.mu.RUnlock()
     397           2 :         meta, ok := p.mu.knownObjects[fileNum]
     398           2 :         if !ok {
     399           1 :                 return objstorage.ObjectMetadata{}, errors.Wrapf(
     400           1 :                         os.ErrNotExist,
     401           1 :                         "file %s (type %d) unknown to the objstorage provider",
     402           1 :                         fileNum, errors.Safe(fileType),
     403           1 :                 )
     404           1 :         }
     405           2 :         if meta.FileType != fileType {
     406           0 :                 return objstorage.ObjectMetadata{}, errors.AssertionFailedf(
     407           0 :                         "file %s type mismatch (known type %d, expected type %d)",
     408           0 :                         fileNum, errors.Safe(meta.FileType), errors.Safe(fileType),
     409           0 :                 )
     410           0 :         }
     411           2 :         return meta, nil
     412             : }
     413             : 
     414             : // Path is part of the objstorage.Provider interface.
     415           2 : func (p *provider) Path(meta objstorage.ObjectMetadata) string {
     416           2 :         if !meta.IsRemote() {
     417           2 :                 return p.vfsPath(meta.FileType, meta.DiskFileNum)
     418           2 :         }
     419           2 :         return p.remotePath(meta)
     420             : }
     421             : 
     422             : // Size returns the size of the object.
     423           2 : func (p *provider) Size(meta objstorage.ObjectMetadata) (int64, error) {
     424           2 :         if !meta.IsRemote() {
     425           2 :                 return p.vfsSize(meta.FileType, meta.DiskFileNum)
     426           2 :         }
     427           2 :         return p.remoteSize(meta)
     428             : }
     429             : 
     430             : // List is part of the objstorage.Provider interface.
     431           2 : func (p *provider) List() []objstorage.ObjectMetadata {
     432           2 :         p.mu.RLock()
     433           2 :         defer p.mu.RUnlock()
     434           2 :         res := make([]objstorage.ObjectMetadata, 0, len(p.mu.knownObjects))
     435           2 :         for _, meta := range p.mu.knownObjects {
     436           2 :                 res = append(res, meta)
     437           2 :         }
     438           2 :         slices.SortFunc(res, func(a, b objstorage.ObjectMetadata) int {
     439           2 :                 return cmp.Compare(a.DiskFileNum, b.DiskFileNum)
     440           2 :         })
     441           2 :         return res
     442             : }
     443             : 
     444             : // Metrics is part of the objstorage.Provider interface.
     445           2 : func (p *provider) Metrics() sharedcache.Metrics {
     446           2 :         if p.remote.cache != nil {
     447           2 :                 return p.remote.cache.Metrics()
     448           2 :         }
     449           2 :         return sharedcache.Metrics{}
     450             : }
     451             : 
     452             : // CheckpointState is part of the objstorage.Provider interface.
     453             : func (p *provider) CheckpointState(
     454             :         fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum,
     455           1 : ) error {
     456           1 :         p.mu.Lock()
     457           1 :         defer p.mu.Unlock()
     458           1 :         for i := range fileNums {
     459           1 :                 if _, ok := p.mu.knownObjects[fileNums[i]]; !ok {
     460           0 :                         return errors.Wrapf(
     461           0 :                                 os.ErrNotExist,
     462           0 :                                 "file %s (type %d) unknown to the objstorage provider",
     463           0 :                                 fileNums[i], errors.Safe(fileType),
     464           0 :                         )
     465           0 :                 }
     466             :                 // Prevent this object from deletion, at least for the life of this instance.
     467           1 :                 p.mu.protectedObjects[fileNums[i]] = p.mu.protectedObjects[fileNums[i]] + 1
     468             :         }
     469             : 
     470           1 :         if p.remote.catalog != nil {
     471           1 :                 return p.remote.catalog.Checkpoint(fs, dir)
     472           1 :         }
     473           0 :         return nil
     474             : }
     475             : 
     476           2 : func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
     477           2 :         p.mu.Lock()
     478           2 :         defer p.mu.Unlock()
     479           2 :         p.addMetadataLocked(meta)
     480           2 : }
     481             : 
     482           2 : func (p *provider) addMetadataLocked(meta objstorage.ObjectMetadata) {
     483           2 :         if invariants.Enabled {
     484           2 :                 meta.AssertValid()
     485           2 :         }
     486           2 :         p.mu.knownObjects[meta.DiskFileNum] = meta
     487           2 :         if meta.IsRemote() {
     488           2 :                 p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
     489           2 :                         FileNum:          meta.DiskFileNum,
     490           2 :                         FileType:         meta.FileType,
     491           2 :                         CreatorID:        meta.Remote.CreatorID,
     492           2 :                         CreatorFileNum:   meta.Remote.CreatorFileNum,
     493           2 :                         Locator:          meta.Remote.Locator,
     494           2 :                         CleanupMethod:    meta.Remote.CleanupMethod,
     495           2 :                         CustomObjectName: meta.Remote.CustomObjectName,
     496           2 :                 })
     497           2 :                 if meta.IsExternal() {
     498           2 :                         p.mu.remote.addExternalObject(meta)
     499           2 :                 }
     500           2 :         } else {
     501           2 :                 p.mu.localObjectsChanged = true
     502           2 :         }
     503             : }
     504             : 
     505           2 : func (p *provider) removeMetadata(fileNum base.DiskFileNum) {
     506           2 :         p.mu.Lock()
     507           2 :         defer p.mu.Unlock()
     508           2 : 
     509           2 :         meta, ok := p.mu.knownObjects[fileNum]
     510           2 :         if !ok {
     511           0 :                 return
     512           0 :         }
     513           2 :         delete(p.mu.knownObjects, fileNum)
     514           2 :         if meta.IsExternal() {
     515           2 :                 p.mu.remote.removeExternalObject(meta)
     516           2 :         }
     517           2 :         if meta.IsRemote() {
     518           2 :                 p.mu.remote.catalogBatch.DeleteObject(fileNum)
     519           2 :         } else {
     520           2 :                 p.mu.localObjectsChanged = true
     521           2 :         }
     522             : }
     523             : 
     524             : // protectObject prevents the unreferencing of a remote object until
     525             : // unprotectObject is called.
     526           2 : func (p *provider) protectObject(fileNum base.DiskFileNum) {
     527           2 :         p.mu.Lock()
     528           2 :         defer p.mu.Unlock()
     529           2 :         p.mu.protectedObjects[fileNum] = p.mu.protectedObjects[fileNum] + 1
     530           2 : }
     531             : 
     532           1 : func (p *provider) unprotectObject(fileNum base.DiskFileNum) {
     533           1 :         p.mu.Lock()
     534           1 :         defer p.mu.Unlock()
     535           1 :         v := p.mu.protectedObjects[fileNum]
     536           1 :         if invariants.Enabled && v == 0 {
     537           0 :                 panic("invalid protection count")
     538             :         }
     539           1 :         if v > 1 {
     540           0 :                 p.mu.protectedObjects[fileNum] = v - 1
     541           1 :         } else {
     542           1 :                 delete(p.mu.protectedObjects, fileNum)
     543           1 :                 // TODO(radu): check if the object is still in knownObject; if not, unref it
     544           1 :                 // now.
     545           1 :         }
     546             : }
     547             : 
     548           2 : func (p *provider) isProtected(fileNum base.DiskFileNum) bool {
     549           2 :         p.mu.Lock()
     550           2 :         defer p.mu.Unlock()
     551           2 :         return p.mu.protectedObjects[fileNum] > 0
     552           2 : }

Generated by: LCOV version 1.14