LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - remote.go (source / functions) Hit Total Coverage
Test: 2023-09-12 08:17Z 1efa535d - tests + meta.lcov Lines: 200 258 77.5 %
Date: 2023-09-12 08:18:51 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "runtime"
      11             :         "sync"
      12             :         "sync/atomic"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/internal/invariants"
      17             :         "github.com/cockroachdb/pebble/objstorage"
      18             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
      19             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      20             :         "github.com/cockroachdb/pebble/objstorage/remote"
      21             : )
      22             : 
      23             : // remoteSubsystem contains the provider fields related to remote storage.
      24             : // All fields remain unset if remote storage is not configured.
      25             : type remoteSubsystem struct {
      26             :         catalog *remoteobjcat.Catalog
      27             :         // catalogSyncMutex is used to correctly serialize two sharedSync operations.
      28             :         // It must be acquired before the provider mutex.
      29             :         catalogSyncMutex sync.Mutex
      30             : 
      31             :         cache *sharedcache.Cache
      32             : 
      33             :         // shared contains the fields relevant to shared objects, i.e. objects that
      34             :         // are created by Pebble and potentially shared between Pebble instances.
      35             :         shared struct {
      36             :                 // initialized guards access to the creatorID field.
      37             :                 initialized atomic.Bool
      38             :                 creatorID   objstorage.CreatorID
      39             :                 initOnce    sync.Once
      40             : 
      41             :                 // checkRefsOnOpen controls whether we check the ref marker file when opening
      42             :                 // an object. Normally this is true when invariants are enabled (but the provider
      43             :                 // test tweaks this field).
      44             :                 checkRefsOnOpen bool
      45             :         }
      46             : }
      47             : 
      48             : // remoteInit initializes the remote object subsystem (if configured) and finds
      49             : // any remote objects.
      50           2 : func (p *provider) remoteInit() error {
      51           2 :         if p.st.Remote.StorageFactory == nil {
      52           2 :                 return nil
      53           2 :         }
      54           2 :         catalog, contents, err := remoteobjcat.Open(p.st.FS, p.st.FSDirName)
      55           2 :         if err != nil {
      56           0 :                 return errors.Wrapf(err, "pebble: could not open remote object catalog")
      57           0 :         }
      58           2 :         p.remote.catalog = catalog
      59           2 :         p.remote.shared.checkRefsOnOpen = invariants.Enabled
      60           2 : 
      61           2 :         // The creator ID may or may not be initialized yet.
      62           2 :         if contents.CreatorID.IsSet() {
      63           2 :                 p.remote.initShared(contents.CreatorID)
      64           2 :                 p.st.Logger.Infof("remote storage configured; creatorID = %s", contents.CreatorID)
      65           2 :         } else {
      66           2 :                 p.st.Logger.Infof("remote storage configured; no creatorID yet")
      67           2 :         }
      68             : 
      69           2 :         if p.st.Remote.CacheSizeBytes > 0 {
      70           1 :                 const defaultBlockSize = 32 * 1024
      71           1 :                 blockSize := p.st.Remote.CacheBlockSize
      72           1 :                 if blockSize == 0 {
      73           1 :                         blockSize = defaultBlockSize
      74           1 :                 }
      75             : 
      76           1 :                 const defaultShardingBlockSize = 1024 * 1024
      77           1 :                 shardingBlockSize := p.st.Remote.ShardingBlockSize
      78           1 :                 if shardingBlockSize == 0 {
      79           1 :                         shardingBlockSize = defaultShardingBlockSize
      80           1 :                 }
      81             : 
      82           1 :                 numShards := p.st.Remote.CacheShardCount
      83           1 :                 if numShards == 0 {
      84           1 :                         numShards = 2 * runtime.GOMAXPROCS(0)
      85           1 :                 }
      86             : 
      87           1 :                 p.remote.cache, err = sharedcache.Open(
      88           1 :                         p.st.FS, p.st.Logger, p.st.FSDirName, blockSize, shardingBlockSize, p.st.Remote.CacheSizeBytes, numShards)
      89           1 :                 if err != nil {
      90           0 :                         return errors.Wrapf(err, "pebble: could not open remote object cache")
      91           0 :                 }
      92             :         }
      93             : 
      94           2 :         for _, meta := range contents.Objects {
      95           2 :                 o := objstorage.ObjectMetadata{
      96           2 :                         DiskFileNum: meta.FileNum,
      97           2 :                         FileType:    meta.FileType,
      98           2 :                 }
      99           2 :                 o.Remote.CreatorID = meta.CreatorID
     100           2 :                 o.Remote.CreatorFileNum = meta.CreatorFileNum
     101           2 :                 o.Remote.CleanupMethod = meta.CleanupMethod
     102           2 :                 o.Remote.Locator = meta.Locator
     103           2 :                 o.Remote.CustomObjectName = meta.CustomObjectName
     104           2 :                 o.Remote.Storage, err = p.ensureStorageLocked(o.Remote.Locator)
     105           2 :                 if err != nil {
     106           0 :                         return errors.Wrapf(err, "creating remote.Storage object for locator '%s'", o.Remote.Locator)
     107           0 :                 }
     108           2 :                 if invariants.Enabled {
     109           0 :                         o.AssertValid()
     110           0 :                 }
     111           2 :                 p.mu.knownObjects[o.DiskFileNum] = o
     112             :         }
     113           2 :         return nil
     114             : }
     115             : 
     116             : // initShared initializes the creator ID, allowing use of shared objects.
     117           2 : func (ss *remoteSubsystem) initShared(creatorID objstorage.CreatorID) {
     118           2 :         ss.shared.initOnce.Do(func() {
     119           2 :                 ss.shared.creatorID = creatorID
     120           2 :                 ss.shared.initialized.Store(true)
     121           2 :         })
     122             : }
     123             : 
     124           2 : func (p *provider) sharedClose() error {
     125           2 :         if p.st.Remote.StorageFactory == nil {
     126           2 :                 return nil
     127           2 :         }
     128           2 :         var err error
     129           2 :         if p.remote.cache != nil {
     130           1 :                 err = p.remote.cache.Close()
     131           1 :                 p.remote.cache = nil
     132           1 :         }
     133           2 :         if p.remote.catalog != nil {
     134           2 :                 err = firstError(err, p.remote.catalog.Close())
     135           2 :                 p.remote.catalog = nil
     136           2 :         }
     137           2 :         return err
     138             : }
     139             : 
     140             : // SetCreatorID is part of the objstorage.Provider interface.
     141           2 : func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error {
     142           2 :         if p.st.Remote.StorageFactory == nil {
     143           0 :                 return errors.AssertionFailedf("attempt to set CreatorID but remote storage not enabled")
     144           0 :         }
     145             :         // Note: this call is a cheap no-op if the creator ID was already set. This
     146             :         // call also checks if we are trying to change the ID.
     147           2 :         if err := p.remote.catalog.SetCreatorID(creatorID); err != nil {
     148           0 :                 return err
     149           0 :         }
     150           2 :         if !p.remote.shared.initialized.Load() {
     151           2 :                 p.st.Logger.Infof("remote storage creatorID set to %s", creatorID)
     152           2 :                 p.remote.initShared(creatorID)
     153           2 :         }
     154           2 :         return nil
     155             : }
     156             : 
     157             : // IsSharedForeign is part of the objstorage.Provider interface.
     158           2 : func (p *provider) IsSharedForeign(meta objstorage.ObjectMetadata) bool {
     159           2 :         if !p.remote.shared.initialized.Load() {
     160           2 :                 return false
     161           2 :         }
     162           2 :         return meta.IsShared() && (meta.Remote.CreatorID != p.remote.shared.creatorID)
     163             : }
     164             : 
     165           2 : func (p *provider) remoteCheckInitialized() error {
     166           2 :         if p.st.Remote.StorageFactory == nil {
     167           0 :                 return errors.Errorf("remote object support not configured")
     168           0 :         }
     169           2 :         return nil
     170             : }
     171             : 
     172           2 : func (p *provider) sharedCheckInitialized() error {
     173           2 :         if err := p.remoteCheckInitialized(); err != nil {
     174           0 :                 return err
     175           0 :         }
     176           2 :         if !p.remote.shared.initialized.Load() {
     177           0 :                 return errors.Errorf("remote object support not available: remote creator ID not yet set")
     178           0 :         }
     179           2 :         return nil
     180             : }
     181             : 
     182           2 : func (p *provider) sharedSync() error {
     183           2 :         // Serialize parallel sync operations. Note that ApplyBatch is already
     184           2 :         // serialized internally, but we want to make sure they get called with
     185           2 :         // batches in the right order.
     186           2 :         p.remote.catalogSyncMutex.Lock()
     187           2 :         defer p.remote.catalogSyncMutex.Unlock()
     188           2 : 
     189           2 :         batch := func() remoteobjcat.Batch {
     190           2 :                 p.mu.Lock()
     191           2 :                 defer p.mu.Unlock()
     192           2 :                 res := p.mu.remote.catalogBatch.Copy()
     193           2 :                 p.mu.remote.catalogBatch.Reset()
     194           2 :                 return res
     195           2 :         }()
     196             : 
     197           2 :         if batch.IsEmpty() {
     198           2 :                 return nil
     199           2 :         }
     200             : 
     201           2 :         if err := p.remote.catalog.ApplyBatch(batch); err != nil {
     202           0 :                 // Put back the batch (for the next Sync), appending any operations that
     203           0 :                 // happened in the meantime.
     204           0 :                 p.mu.Lock()
     205           0 :                 defer p.mu.Unlock()
     206           0 :                 batch.Append(p.mu.remote.catalogBatch)
     207           0 :                 p.mu.remote.catalogBatch = batch
     208           0 :                 return err
     209           0 :         }
     210             : 
     211           2 :         return nil
     212             : }
     213             : 
     214           2 : func (p *provider) remotePath(meta objstorage.ObjectMetadata) string {
     215           2 :         if meta.Remote.Locator != "" {
     216           1 :                 return fmt.Sprintf("remote-%s://%s", meta.Remote.Locator, remoteObjectName(meta))
     217           1 :         }
     218           2 :         return "remote://" + remoteObjectName(meta)
     219             : }
     220             : 
     221             : // sharedCreateRef creates a reference marker object.
     222           2 : func (p *provider) sharedCreateRef(meta objstorage.ObjectMetadata) error {
     223           2 :         if err := p.sharedCheckInitialized(); err != nil {
     224           0 :                 return err
     225           0 :         }
     226           2 :         if meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
     227           1 :                 return nil
     228           1 :         }
     229           2 :         refName := p.sharedObjectRefName(meta)
     230           2 :         writer, err := meta.Remote.Storage.CreateObject(refName)
     231           2 :         if err == nil {
     232           2 :                 // The object is empty, just close the writer.
     233           2 :                 err = writer.Close()
     234           2 :         }
     235           2 :         if err != nil {
     236           0 :                 return errors.Wrapf(err, "creating marker object %q", refName)
     237           0 :         }
     238           2 :         return nil
     239             : }
     240             : 
     241             : func (p *provider) sharedCreate(
     242             :         _ context.Context,
     243             :         fileType base.FileType,
     244             :         fileNum base.DiskFileNum,
     245             :         locator remote.Locator,
     246             :         opts objstorage.CreateOptions,
     247           2 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
     248           2 :         if err := p.sharedCheckInitialized(); err != nil {
     249           0 :                 return nil, objstorage.ObjectMetadata{}, err
     250           0 :         }
     251           2 :         storage, err := p.ensureStorage(locator)
     252           2 :         if err != nil {
     253           0 :                 return nil, objstorage.ObjectMetadata{}, err
     254           0 :         }
     255           2 :         meta := objstorage.ObjectMetadata{
     256           2 :                 DiskFileNum: fileNum,
     257           2 :                 FileType:    fileType,
     258           2 :         }
     259           2 :         meta.Remote.CreatorID = p.remote.shared.creatorID
     260           2 :         meta.Remote.CreatorFileNum = fileNum
     261           2 :         meta.Remote.CleanupMethod = opts.SharedCleanupMethod
     262           2 :         meta.Remote.Locator = locator
     263           2 :         meta.Remote.Storage = storage
     264           2 : 
     265           2 :         objName := remoteObjectName(meta)
     266           2 :         writer, err := storage.CreateObject(objName)
     267           2 :         if err != nil {
     268           0 :                 return nil, objstorage.ObjectMetadata{}, errors.Wrapf(err, "creating object %q", objName)
     269           0 :         }
     270           2 :         return &sharedWritable{
     271           2 :                 p:             p,
     272           2 :                 meta:          meta,
     273           2 :                 storageWriter: writer,
     274           2 :         }, meta, nil
     275             : }
     276             : 
     277             : func (p *provider) remoteOpenForReading(
     278             :         ctx context.Context, meta objstorage.ObjectMetadata, opts objstorage.OpenOptions,
     279           2 : ) (objstorage.Readable, error) {
     280           2 :         if err := p.remoteCheckInitialized(); err != nil {
     281           0 :                 return nil, err
     282           0 :         }
     283             :         // Verify we have a reference on this object; for performance reasons, we only
     284             :         // do this in testing scenarios.
     285           2 :         if p.remote.shared.checkRefsOnOpen && meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
     286           1 :                 if err := p.sharedCheckInitialized(); err != nil {
     287           0 :                         return nil, err
     288           0 :                 }
     289           1 :                 refName := p.sharedObjectRefName(meta)
     290           1 :                 if _, err := meta.Remote.Storage.Size(refName); err != nil {
     291           0 :                         if meta.Remote.Storage.IsNotExistError(err) {
     292           0 :                                 if opts.MustExist {
     293           0 :                                         p.st.Logger.Fatalf("marker object %q does not exist", refName)
     294           0 :                                         // TODO(radu): maybe list references for the object.
     295           0 :                                 }
     296           0 :                                 return nil, errors.Errorf("marker object %q does not exist", refName)
     297             :                         }
     298           0 :                         return nil, errors.Wrapf(err, "checking marker object %q", refName)
     299             :                 }
     300             :         }
     301           2 :         objName := remoteObjectName(meta)
     302           2 :         reader, size, err := meta.Remote.Storage.ReadObject(ctx, objName)
     303           2 :         if err != nil {
     304           1 :                 if opts.MustExist && meta.Remote.Storage.IsNotExistError(err) {
     305           0 :                         p.st.Logger.Fatalf("object %q does not exist", objName)
     306           0 :                         // TODO(radu): maybe list references for the object.
     307           0 :                 }
     308           1 :                 return nil, err
     309             :         }
     310           2 :         return p.newRemoteReadable(reader, size, meta.DiskFileNum), nil
     311             : }
     312             : 
     313           2 : func (p *provider) remoteSize(meta objstorage.ObjectMetadata) (int64, error) {
     314           2 :         if err := p.remoteCheckInitialized(); err != nil {
     315           0 :                 return 0, err
     316           0 :         }
     317           2 :         objName := remoteObjectName(meta)
     318           2 :         return meta.Remote.Storage.Size(objName)
     319             : }
     320             : 
     321             : // sharedUnref implements object "removal" with the remote backend. The ref
     322             : // marker object is removed and the backing object is removed only if there are
     323             : // no other ref markers.
     324           2 : func (p *provider) sharedUnref(meta objstorage.ObjectMetadata) error {
     325           2 :         if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
     326           1 :                 // Never delete objects in this mode.
     327           1 :                 return nil
     328           1 :         }
     329           2 :         if p.isProtected(meta.DiskFileNum) {
     330           1 :                 // TODO(radu): we need a mechanism to unref the object when it becomes
     331           1 :                 // unprotected.
     332           1 :                 return nil
     333           1 :         }
     334             : 
     335           2 :         refName := p.sharedObjectRefName(meta)
     336           2 :         // Tolerate a not-exists error.
     337           2 :         if err := meta.Remote.Storage.Delete(refName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
     338           0 :                 return err
     339           0 :         }
     340           2 :         otherRefs, err := meta.Remote.Storage.List(sharedObjectRefPrefix(meta), "" /* delimiter */)
     341           2 :         if err != nil {
     342           0 :                 return err
     343           0 :         }
     344           2 :         if len(otherRefs) == 0 {
     345           2 :                 objName := remoteObjectName(meta)
     346           2 :                 if err := meta.Remote.Storage.Delete(objName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
     347           0 :                         return err
     348           0 :                 }
     349             :         }
     350           2 :         return nil
     351             : }
     352             : 
     353             : // ensureStorageLocked populates the remote.Storage object for the given
     354             : // locator, if necessary. p.mu must be held.
     355           2 : func (p *provider) ensureStorageLocked(locator remote.Locator) (remote.Storage, error) {
     356           2 :         if p.mu.remote.storageObjects == nil {
     357           2 :                 p.mu.remote.storageObjects = make(map[remote.Locator]remote.Storage)
     358           2 :         }
     359           2 :         if res, ok := p.mu.remote.storageObjects[locator]; ok {
     360           2 :                 return res, nil
     361           2 :         }
     362           2 :         res, err := p.st.Remote.StorageFactory.CreateStorage(locator)
     363           2 :         if err != nil {
     364           1 :                 return nil, err
     365           1 :         }
     366             : 
     367           2 :         p.mu.remote.storageObjects[locator] = res
     368           2 :         return res, nil
     369             : }
     370             : 
     371             : // ensureStorage populates the remote.Storage object for the given locator, if necessary.
     372           2 : func (p *provider) ensureStorage(locator remote.Locator) (remote.Storage, error) {
     373           2 :         p.mu.Lock()
     374           2 :         defer p.mu.Unlock()
     375           2 :         return p.ensureStorageLocked(locator)
     376           2 : }

Generated by: LCOV version 1.14