LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - provider.go (source / functions) Hit Total Coverage
Test: 2024-04-25 08:15Z 16950aa3 - tests + meta.lcov Lines: 259 311 83.3 %
Date: 2024-04-25 08:16:48 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "context"
      10             :         "io"
      11             :         "os"
      12             :         "slices"
      13             :         "sync"
      14             : 
      15             :         "github.com/cockroachdb/errors"
      16             :         "github.com/cockroachdb/errors/oserror"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/invariants"
      19             :         "github.com/cockroachdb/pebble/objstorage"
      20             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      21             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
      22             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      23             :         "github.com/cockroachdb/pebble/objstorage/remote"
      24             :         "github.com/cockroachdb/pebble/vfs"
      25             : )
      26             : 
      27             : // provider is the implementation of objstorage.Provider.
      28             : type provider struct {
      29             :         st Settings
      30             : 
      31             :         fsDir vfs.File
      32             : 
      33             :         tracer *objiotracing.Tracer
      34             : 
      35             :         remote remoteSubsystem
      36             : 
      37             :         mu struct {
      38             :                 sync.RWMutex
      39             : 
      40             :                 remote remoteLockedState
      41             : 
      42             :                 // localObjectsChanged is set if non-remote objects were created or deleted
      43             :                 // but Sync was not yet called.
      44             :                 localObjectsChanged bool
      45             : 
      46             :                 // knownObjects maintains information about objects that are known to the provider.
      47             :                 // It is initialized with the list of files in the manifest when we open a DB.
      48             :                 knownObjects map[base.DiskFileNum]objstorage.ObjectMetadata
      49             : 
      50             :                 // protectedObjects are objects that cannot be unreferenced because they
      51             :                 // have outstanding SharedObjectBackingHandles. The value is a count of outstanding handles
      52             :                 protectedObjects map[base.DiskFileNum]int
      53             :         }
      54             : }
      55             : 
      56             : var _ objstorage.Provider = (*provider)(nil)
      57             : 
      58             : // Settings that must be specified when creating the provider.
      59             : type Settings struct {
      60             :         Logger base.Logger
      61             : 
      62             :         // Local filesystem configuration.
      63             :         FS        vfs.FS
      64             :         FSDirName string
      65             : 
      66             :         // FSDirInitialListing is a listing of FSDirName at the time of calling Open.
      67             :         //
      68             :         // This is an optional optimization to avoid double listing on Open when the
      69             :         // higher layer already has a listing. When nil, we obtain the listing on
      70             :         // Open.
      71             :         FSDirInitialListing []string
      72             : 
      73             :         // Cleaner cleans obsolete files from the local filesystem.
      74             :         //
      75             :         // The default cleaner uses the DeleteCleaner.
      76             :         FSCleaner base.Cleaner
      77             : 
      78             :         // NoSyncOnClose decides whether the implementation will enforce a
      79             :         // close-time synchronization (e.g., fdatasync() or sync_file_range())
      80             :         // on files it writes to. Setting this to true removes the guarantee for a
      81             :         // sync on close. Some implementations can still issue a non-blocking sync.
      82             :         NoSyncOnClose bool
      83             : 
      84             :         // BytesPerSync enables periodic syncing of files in order to smooth out
      85             :         // writes to disk. This option does not provide any persistence guarantee, but
      86             :         // is used to avoid latency spikes if the OS automatically decides to write
      87             :         // out a large chunk of dirty filesystem buffers.
      88             :         BytesPerSync int
      89             : 
      90             :         // Fields here are set only if the provider is to support remote objects
      91             :         // (experimental).
      92             :         Remote struct {
      93             :                 StorageFactory remote.StorageFactory
      94             : 
      95             :                 // If CreateOnShared is non-zero, sstables are created on remote storage using
      96             :                 // the CreateOnSharedLocator (when the PreferSharedStorage create option is
      97             :                 // true).
      98             :                 CreateOnShared        remote.CreateOnSharedStrategy
      99             :                 CreateOnSharedLocator remote.Locator
     100             : 
     101             :                 // CacheSizeBytes is the size of the on-disk block cache for objects
     102             :                 // on remote storage. If it is 0, no cache is used.
     103             :                 CacheSizeBytes int64
     104             : 
     105             :                 // CacheBlockSize is the block size of the cache; if 0, the default of 32KB is used.
     106             :                 CacheBlockSize int
     107             : 
     108             :                 // ShardingBlockSize is the size of a shard block. The cache is split into contiguous
     109             :                 // ShardingBlockSize units. The units are distributed across multiple independent shards
     110             :                 // of the cache, via a hash(offset) modulo num shards operation. The cache replacement
     111             :                 // policies operate at the level of shard, not whole cache. This is done to reduce lock
     112             :                 // contention.
     113             :                 //
     114             :                 // If ShardingBlockSize is 0, the default of 1 MB is used.
     115             :                 ShardingBlockSize int64
     116             : 
     117             :                 // The number of independent shards the cache leverages. Each shard is the same size,
     118             :                 // and a hash of filenum & offset map a read to a certain shard. If set to 0,
     119             :                 // 2*runtime.GOMAXPROCS is used as the shard count.
     120             :                 CacheShardCount int
     121             : 
     122             :                 // TODO(radu): allow the cache to live on another FS/location (e.g. to use
     123             :                 // instance-local SSD).
     124             :         }
     125             : }
     126             : 
     127             : // DefaultSettings initializes default settings (with no remote storage),
     128             : // suitable for tests and tools.
     129           1 : func DefaultSettings(fs vfs.FS, dirName string) Settings {
     130           1 :         return Settings{
     131           1 :                 Logger:        base.DefaultLogger,
     132           1 :                 FS:            fs,
     133           1 :                 FSDirName:     dirName,
     134           1 :                 FSCleaner:     base.DeleteCleaner{},
     135           1 :                 NoSyncOnClose: false,
     136           1 :                 BytesPerSync:  512 * 1024, // 512KB
     137           1 :         }
     138           1 : }
     139             : 
     140             : // Open creates the provider.
     141           2 : func Open(settings Settings) (objstorage.Provider, error) {
     142           2 :         // Note: we can't just `return open(settings)` because in an error case we
     143           2 :         // would return (*provider)(nil) which is not objstorage.Provider(nil).
     144           2 :         p, err := open(settings)
     145           2 :         if err != nil {
     146           1 :                 return nil, err
     147           1 :         }
     148           2 :         return p, nil
     149             : }
     150             : 
     151           2 : func open(settings Settings) (p *provider, _ error) {
     152           2 :         fsDir, err := settings.FS.OpenDir(settings.FSDirName)
     153           2 :         if err != nil {
     154           1 :                 return nil, err
     155           1 :         }
     156             : 
     157           2 :         defer func() {
     158           2 :                 if p == nil {
     159           0 :                         fsDir.Close()
     160           0 :                 }
     161             :         }()
     162             : 
     163           2 :         p = &provider{
     164           2 :                 st:    settings,
     165           2 :                 fsDir: fsDir,
     166           2 :         }
     167           2 :         p.mu.knownObjects = make(map[base.DiskFileNum]objstorage.ObjectMetadata)
     168           2 :         p.mu.protectedObjects = make(map[base.DiskFileNum]int)
     169           2 : 
     170           2 :         if objiotracing.Enabled {
     171           0 :                 p.tracer = objiotracing.Open(settings.FS, settings.FSDirName)
     172           0 :         }
     173             : 
     174             :         // Add local FS objects.
     175           2 :         if err := p.vfsInit(); err != nil {
     176           0 :                 return nil, err
     177           0 :         }
     178             : 
     179             :         // Initialize remote subsystem (if configured) and add remote objects.
     180           2 :         if err := p.remoteInit(); err != nil {
     181           0 :                 return nil, err
     182           0 :         }
     183             : 
     184           2 :         return p, nil
     185             : }
     186             : 
     187             : // Close is part of the objstorage.Provider interface.
     188           2 : func (p *provider) Close() error {
     189           2 :         err := p.sharedClose()
     190           2 :         if p.fsDir != nil {
     191           2 :                 err = firstError(err, p.fsDir.Close())
     192           2 :                 p.fsDir = nil
     193           2 :         }
     194           2 :         if objiotracing.Enabled {
     195           0 :                 if p.tracer != nil {
     196           0 :                         p.tracer.Close()
     197           0 :                         p.tracer = nil
     198           0 :                 }
     199             :         }
     200           2 :         return err
     201             : }
     202             : 
     203             : // OpenForReading opens an existing object.
     204             : func (p *provider) OpenForReading(
     205             :         ctx context.Context,
     206             :         fileType base.FileType,
     207             :         fileNum base.DiskFileNum,
     208             :         opts objstorage.OpenOptions,
     209           2 : ) (objstorage.Readable, error) {
     210           2 :         meta, err := p.Lookup(fileType, fileNum)
     211           2 :         if err != nil {
     212           1 :                 if opts.MustExist {
     213           0 :                         p.st.Logger.Fatalf("%v", err)
     214           0 :                 }
     215           1 :                 return nil, err
     216             :         }
     217             : 
     218           2 :         var r objstorage.Readable
     219           2 :         if !meta.IsRemote() {
     220           2 :                 r, err = p.vfsOpenForReading(ctx, fileType, fileNum, opts)
     221           2 :         } else {
     222           2 :                 r, err = p.remoteOpenForReading(ctx, meta, opts)
     223           2 :                 if err != nil && p.isNotExistError(meta, err) {
     224           1 :                         // Wrap the error so that IsNotExistError functions properly.
     225           1 :                         err = errors.Mark(err, os.ErrNotExist)
     226           1 :                 }
     227             :         }
     228           2 :         if err != nil {
     229           1 :                 return nil, err
     230           1 :         }
     231           2 :         if objiotracing.Enabled {
     232           0 :                 r = p.tracer.WrapReadable(ctx, r, fileNum)
     233           0 :         }
     234           2 :         return r, nil
     235             : }
     236             : 
     237             : // Create creates a new object and opens it for writing.
     238             : //
     239             : // The object is not guaranteed to be durable (accessible in case of crashes)
     240             : // until Sync is called.
     241             : func (p *provider) Create(
     242             :         ctx context.Context,
     243             :         fileType base.FileType,
     244             :         fileNum base.DiskFileNum,
     245             :         opts objstorage.CreateOptions,
     246           2 : ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
     247           2 :         if opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone {
     248           2 :                 w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts)
     249           2 :         } else {
     250           2 :                 var category vfs.DiskWriteCategory
     251           2 :                 if opts.WriteCategory != "" {
     252           2 :                         category = opts.WriteCategory
     253           2 :                 } else {
     254           2 :                         category = vfs.WriteCategoryUnspecified
     255           2 :                 }
     256           2 :                 w, meta, err = p.vfsCreate(ctx, fileType, fileNum, category)
     257             :         }
     258           2 :         if err != nil {
     259           1 :                 err = errors.Wrapf(err, "creating object %s", fileNum)
     260           1 :                 return nil, objstorage.ObjectMetadata{}, err
     261           1 :         }
     262           2 :         p.addMetadata(meta)
     263           2 :         if objiotracing.Enabled {
     264           0 :                 w = p.tracer.WrapWritable(ctx, w, fileNum)
     265           0 :         }
     266           2 :         return w, meta, nil
     267             : }
     268             : 
     269             : // Remove removes an object.
     270             : //
     271             : // Note that if the object is remote, the object is only (conceptually) removed
     272             : // from this provider. If other providers have references on the remote object,
     273             : // it will not be removed.
     274             : //
     275             : // The object is not guaranteed to be durably removed until Sync is called.
     276           2 : func (p *provider) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
     277           2 :         meta, err := p.Lookup(fileType, fileNum)
     278           2 :         if err != nil {
     279           1 :                 return err
     280           1 :         }
     281             : 
     282           2 :         if !meta.IsRemote() {
     283           2 :                 err = p.vfsRemove(fileType, fileNum)
     284           2 :         } else {
     285           2 :                 // TODO(radu): implement remote object removal (i.e. deref).
     286           2 :                 err = p.sharedUnref(meta)
     287           2 :                 if err != nil && p.isNotExistError(meta, err) {
     288           0 :                         // Wrap the error so that IsNotExistError functions properly.
     289           0 :                         err = errors.Mark(err, os.ErrNotExist)
     290           0 :                 }
     291             :         }
     292           2 :         if err != nil && !p.IsNotExistError(err) {
     293           1 :                 // We want to be able to retry a Remove, so we keep the object in our list.
     294           1 :                 // TODO(radu): we should mark the object as "zombie" and not allow any other
     295           1 :                 // operations.
     296           1 :                 return errors.Wrapf(err, "removing object %s", fileNum)
     297           1 :         }
     298             : 
     299           2 :         p.removeMetadata(fileNum)
     300           2 :         return err
     301             : }
     302             : 
     303           1 : func (p *provider) isNotExistError(meta objstorage.ObjectMetadata, err error) bool {
     304           1 :         if meta.Remote.Storage != nil {
     305           1 :                 return meta.Remote.Storage.IsNotExistError(err)
     306           1 :         }
     307           0 :         return oserror.IsNotExist(err)
     308             : }
     309             : 
     310             : // IsNotExistError is part of the objstorage.Provider interface.
     311           2 : func (p *provider) IsNotExistError(err error) bool {
     312           2 :         // We use errors.Mark(err, os.ErrNotExist) for not-exist errors coming from
     313           2 :         // remote.Storage.
     314           2 :         return oserror.IsNotExist(err)
     315           2 : }
     316             : 
     317             : // Sync flushes the metadata from creation or removal of objects since the last Sync.
     318           2 : func (p *provider) Sync() error {
     319           2 :         if err := p.vfsSync(); err != nil {
     320           1 :                 return err
     321           1 :         }
     322           2 :         if err := p.sharedSync(); err != nil {
     323           0 :                 return err
     324           0 :         }
     325           2 :         return nil
     326             : }
     327             : 
     328             : // LinkOrCopyFromLocal creates a new object that is either a copy of a given
     329             : // local file or a hard link (if the new object is created on the same FS, and
     330             : // if the FS supports it).
     331             : //
     332             : // The object is not guaranteed to be durable (accessible in case of crashes)
     333             : // until Sync is called.
     334             : func (p *provider) LinkOrCopyFromLocal(
     335             :         ctx context.Context,
     336             :         srcFS vfs.FS,
     337             :         srcFilePath string,
     338             :         dstFileType base.FileType,
     339             :         dstFileNum base.DiskFileNum,
     340             :         opts objstorage.CreateOptions,
     341           2 : ) (objstorage.ObjectMetadata, error) {
     342           2 :         shared := opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone
     343           2 :         if !shared && srcFS == p.st.FS {
     344           2 :                 // Wrap the normal filesystem with one which wraps newly created files with
     345           2 :                 // vfs.NewSyncingFile.
     346           2 :                 fs := vfs.NewSyncingFS(p.st.FS, vfs.SyncingFileOptions{
     347           2 :                         NoSyncOnClose: p.st.NoSyncOnClose,
     348           2 :                         BytesPerSync:  p.st.BytesPerSync,
     349           2 :                 })
     350           2 :                 dstPath := p.vfsPath(dstFileType, dstFileNum)
     351           2 :                 if err := vfs.LinkOrCopy(fs, srcFilePath, dstPath); err != nil {
     352           1 :                         return objstorage.ObjectMetadata{}, err
     353           1 :                 }
     354             : 
     355           2 :                 meta := objstorage.ObjectMetadata{
     356           2 :                         DiskFileNum: dstFileNum,
     357           2 :                         FileType:    dstFileType,
     358           2 :                 }
     359           2 :                 p.addMetadata(meta)
     360           2 :                 return meta, nil
     361             :         }
     362             :         // Create the object and copy the data.
     363           2 :         w, meta, err := p.Create(ctx, dstFileType, dstFileNum, opts)
     364           2 :         if err != nil {
     365           0 :                 return objstorage.ObjectMetadata{}, err
     366           0 :         }
     367           2 :         f, err := srcFS.Open(srcFilePath, vfs.SequentialReadsOption)
     368           2 :         if err != nil {
     369           0 :                 return objstorage.ObjectMetadata{}, err
     370           0 :         }
     371           2 :         defer f.Close()
     372           2 :         buf := make([]byte, 64*1024)
     373           2 :         for {
     374           2 :                 n, readErr := f.Read(buf)
     375           2 :                 if readErr != nil && readErr != io.EOF {
     376           0 :                         w.Abort()
     377           0 :                         return objstorage.ObjectMetadata{}, readErr
     378           0 :                 }
     379             : 
     380           2 :                 if n > 0 {
     381           2 :                         if err := w.Write(buf[:n]); err != nil {
     382           0 :                                 w.Abort()
     383           0 :                                 return objstorage.ObjectMetadata{}, err
     384           0 :                         }
     385             :                 }
     386             : 
     387           2 :                 if readErr == io.EOF {
     388           2 :                         break
     389             :                 }
     390             :         }
     391           2 :         if err := w.Finish(); err != nil {
     392           0 :                 return objstorage.ObjectMetadata{}, err
     393           0 :         }
     394           2 :         return meta, nil
     395             : }
     396             : 
     397             : // Lookup is part of the objstorage.Provider interface.
     398             : func (p *provider) Lookup(
     399             :         fileType base.FileType, fileNum base.DiskFileNum,
     400           2 : ) (objstorage.ObjectMetadata, error) {
     401           2 :         p.mu.RLock()
     402           2 :         defer p.mu.RUnlock()
     403           2 :         meta, ok := p.mu.knownObjects[fileNum]
     404           2 :         if !ok {
     405           2 :                 return objstorage.ObjectMetadata{}, errors.Wrapf(
     406           2 :                         os.ErrNotExist,
     407           2 :                         "file %s (type %d) unknown to the objstorage provider",
     408           2 :                         fileNum, errors.Safe(fileType),
     409           2 :                 )
     410           2 :         }
     411           2 :         if meta.FileType != fileType {
     412           0 :                 return objstorage.ObjectMetadata{}, base.AssertionFailedf(
     413           0 :                         "file %s type mismatch (known type %d, expected type %d)",
     414           0 :                         fileNum, errors.Safe(meta.FileType), errors.Safe(fileType),
     415           0 :                 )
     416           0 :         }
     417           2 :         return meta, nil
     418             : }
     419             : 
     420             : // Path is part of the objstorage.Provider interface.
     421           2 : func (p *provider) Path(meta objstorage.ObjectMetadata) string {
     422           2 :         if !meta.IsRemote() {
     423           2 :                 return p.vfsPath(meta.FileType, meta.DiskFileNum)
     424           2 :         }
     425           2 :         return p.remotePath(meta)
     426             : }
     427             : 
     428             : // Size returns the size of the object.
     429           2 : func (p *provider) Size(meta objstorage.ObjectMetadata) (int64, error) {
     430           2 :         if !meta.IsRemote() {
     431           2 :                 return p.vfsSize(meta.FileType, meta.DiskFileNum)
     432           2 :         }
     433           2 :         return p.remoteSize(meta)
     434             : }
     435             : 
     436             : // List is part of the objstorage.Provider interface.
     437           2 : func (p *provider) List() []objstorage.ObjectMetadata {
     438           2 :         p.mu.RLock()
     439           2 :         defer p.mu.RUnlock()
     440           2 :         res := make([]objstorage.ObjectMetadata, 0, len(p.mu.knownObjects))
     441           2 :         for _, meta := range p.mu.knownObjects {
     442           2 :                 res = append(res, meta)
     443           2 :         }
     444           2 :         slices.SortFunc(res, func(a, b objstorage.ObjectMetadata) int {
     445           2 :                 return cmp.Compare(a.DiskFileNum, b.DiskFileNum)
     446           2 :         })
     447           2 :         return res
     448             : }
     449             : 
     450             : // Metrics is part of the objstorage.Provider interface.
     451           2 : func (p *provider) Metrics() sharedcache.Metrics {
     452           2 :         if p.remote.cache != nil {
     453           1 :                 return p.remote.cache.Metrics()
     454           1 :         }
     455           2 :         return sharedcache.Metrics{}
     456             : }
     457             : 
     458             : // CheckpointState is part of the objstorage.Provider interface.
     459             : func (p *provider) CheckpointState(
     460             :         fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum,
     461           1 : ) error {
     462           1 :         p.mu.Lock()
     463           1 :         defer p.mu.Unlock()
     464           1 :         for i := range fileNums {
     465           1 :                 if _, ok := p.mu.knownObjects[fileNums[i]]; !ok {
     466           0 :                         return errors.Wrapf(
     467           0 :                                 os.ErrNotExist,
     468           0 :                                 "file %s (type %d) unknown to the objstorage provider",
     469           0 :                                 fileNums[i], errors.Safe(fileType),
     470           0 :                         )
     471           0 :                 }
     472             :                 // Prevent this object from deletion, at least for the life of this instance.
     473           1 :                 p.mu.protectedObjects[fileNums[i]] = p.mu.protectedObjects[fileNums[i]] + 1
     474             :         }
     475             : 
     476           1 :         if p.remote.catalog != nil {
     477           1 :                 return p.remote.catalog.Checkpoint(fs, dir)
     478           1 :         }
     479           0 :         return nil
     480             : }
     481             : 
     482           2 : func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
     483           2 :         p.mu.Lock()
     484           2 :         defer p.mu.Unlock()
     485           2 :         p.addMetadataLocked(meta)
     486           2 : }
     487             : 
     488           2 : func (p *provider) addMetadataLocked(meta objstorage.ObjectMetadata) {
     489           2 :         if invariants.Enabled {
     490           2 :                 meta.AssertValid()
     491           2 :         }
     492           2 :         p.mu.knownObjects[meta.DiskFileNum] = meta
     493           2 :         if meta.IsRemote() {
     494           2 :                 p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
     495           2 :                         FileNum:          meta.DiskFileNum,
     496           2 :                         FileType:         meta.FileType,
     497           2 :                         CreatorID:        meta.Remote.CreatorID,
     498           2 :                         CreatorFileNum:   meta.Remote.CreatorFileNum,
     499           2 :                         Locator:          meta.Remote.Locator,
     500           2 :                         CleanupMethod:    meta.Remote.CleanupMethod,
     501           2 :                         CustomObjectName: meta.Remote.CustomObjectName,
     502           2 :                 })
     503           2 :                 if meta.IsExternal() {
     504           2 :                         p.mu.remote.addExternalObject(meta)
     505           2 :                 }
     506           2 :         } else {
     507           2 :                 p.mu.localObjectsChanged = true
     508           2 :         }
     509             : }
     510             : 
     511           2 : func (p *provider) removeMetadata(fileNum base.DiskFileNum) {
     512           2 :         p.mu.Lock()
     513           2 :         defer p.mu.Unlock()
     514           2 : 
     515           2 :         meta, ok := p.mu.knownObjects[fileNum]
     516           2 :         if !ok {
     517           0 :                 return
     518           0 :         }
     519           2 :         delete(p.mu.knownObjects, fileNum)
     520           2 :         if meta.IsExternal() {
     521           2 :                 p.mu.remote.removeExternalObject(meta)
     522           2 :         }
     523           2 :         if meta.IsRemote() {
     524           2 :                 p.mu.remote.catalogBatch.DeleteObject(fileNum)
     525           2 :         } else {
     526           2 :                 p.mu.localObjectsChanged = true
     527           2 :         }
     528             : }
     529             : 
     530             : // protectObject prevents the unreferencing of a remote object until
     531             : // unprotectObject is called.
     532           2 : func (p *provider) protectObject(fileNum base.DiskFileNum) {
     533           2 :         p.mu.Lock()
     534           2 :         defer p.mu.Unlock()
     535           2 :         p.mu.protectedObjects[fileNum] = p.mu.protectedObjects[fileNum] + 1
     536           2 : }
     537             : 
     538           1 : func (p *provider) unprotectObject(fileNum base.DiskFileNum) {
     539           1 :         p.mu.Lock()
     540           1 :         defer p.mu.Unlock()
     541           1 :         v := p.mu.protectedObjects[fileNum]
     542           1 :         if invariants.Enabled && v == 0 {
     543           0 :                 panic("invalid protection count")
     544             :         }
     545           1 :         if v > 1 {
     546           0 :                 p.mu.protectedObjects[fileNum] = v - 1
     547           1 :         } else {
     548           1 :                 delete(p.mu.protectedObjects, fileNum)
     549           1 :                 // TODO(radu): check if the object is still in knownObject; if not, unref it
     550           1 :                 // now.
     551           1 :         }
     552             : }
     553             : 
     554           2 : func (p *provider) isProtected(fileNum base.DiskFileNum) bool {
     555           2 :         p.mu.Lock()
     556           2 :         defer p.mu.Unlock()
     557           2 :         return p.mu.protectedObjects[fileNum] > 0
     558           2 : }

Generated by: LCOV version 1.14