LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - remote.go (source / functions) Hit Total Coverage
Test: 2023-11-29 08:16Z ce7560a8 - meta test only.lcov Lines: 187 258 72.5 %
Date: 2023-11-29 08:16:51 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "runtime"
      11             :         "sync"
      12             :         "sync/atomic"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/internal/invariants"
      17             :         "github.com/cockroachdb/pebble/objstorage"
      18             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
      19             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      20             :         "github.com/cockroachdb/pebble/objstorage/remote"
      21             :         "github.com/cockroachdb/redact"
      22             : )
      23             : 
// remoteSubsystem contains the provider fields related to remote storage.
// All fields remain unset if remote storage is not configured.
type remoteSubsystem struct {
	// catalog is the remote object catalog. remoteInit opens it and
	// sharedClose closes it (resetting the field to nil).
	catalog *remoteobjcat.Catalog
	// catalogSyncMutex is used to correctly serialize two sharedSync operations.
	// It must be acquired before the provider mutex.
	catalogSyncMutex sync.Mutex

	// cache is the local cache for remote object data. remoteInit only opens
	// it when a positive cache size is configured, so it may be nil.
	cache *sharedcache.Cache

	// shared contains the fields relevant to shared objects, i.e. objects that
	// are created by Pebble and potentially shared between Pebble instances.
	shared struct {
		// initialized guards access to the creatorID field. initShared stores
		// true only after writing creatorID (inside initOnce), so creatorID may
		// be read once initialized.Load() returns true.
		initialized atomic.Bool
		creatorID   objstorage.CreatorID
		initOnce    sync.Once

		// checkRefsOnOpen controls whether we check the ref marker file when opening
		// an object. Normally this is true when invariants are enabled (but the provider
		// test tweaks this field).
		checkRefsOnOpen bool
	}
}
      48             : 
      49             : // remoteInit initializes the remote object subsystem (if configured) and finds
      50             : // any remote objects.
      51           1 : func (p *provider) remoteInit() error {
      52           1 :         if p.st.Remote.StorageFactory == nil {
      53           1 :                 return nil
      54           1 :         }
      55           1 :         catalog, contents, err := remoteobjcat.Open(p.st.FS, p.st.FSDirName)
      56           1 :         if err != nil {
      57           0 :                 return errors.Wrapf(err, "pebble: could not open remote object catalog")
      58           0 :         }
      59           1 :         p.remote.catalog = catalog
      60           1 :         p.remote.shared.checkRefsOnOpen = invariants.Enabled
      61           1 : 
      62           1 :         // The creator ID may or may not be initialized yet.
      63           1 :         if contents.CreatorID.IsSet() {
      64           1 :                 p.remote.initShared(contents.CreatorID)
      65           1 :                 p.st.Logger.Infof("remote storage configured; creatorID = %s", contents.CreatorID)
      66           1 :         } else {
      67           1 :                 p.st.Logger.Infof("remote storage configured; no creatorID yet")
      68           1 :         }
      69             : 
      70           1 :         if p.st.Remote.CacheSizeBytes > 0 {
      71           1 :                 const defaultBlockSize = 32 * 1024
      72           1 :                 blockSize := p.st.Remote.CacheBlockSize
      73           1 :                 if blockSize == 0 {
      74           1 :                         blockSize = defaultBlockSize
      75           1 :                 }
      76             : 
      77           1 :                 const defaultShardingBlockSize = 1024 * 1024
      78           1 :                 shardingBlockSize := p.st.Remote.ShardingBlockSize
      79           1 :                 if shardingBlockSize == 0 {
      80           1 :                         shardingBlockSize = defaultShardingBlockSize
      81           1 :                 }
      82             : 
      83           1 :                 numShards := p.st.Remote.CacheShardCount
      84           1 :                 if numShards == 0 {
      85           1 :                         numShards = 2 * runtime.GOMAXPROCS(0)
      86           1 :                 }
      87             : 
      88           1 :                 p.remote.cache, err = sharedcache.Open(
      89           1 :                         p.st.FS, p.st.Logger, p.st.FSDirName, blockSize, shardingBlockSize, p.st.Remote.CacheSizeBytes, numShards)
      90           1 :                 if err != nil {
      91           0 :                         return errors.Wrapf(err, "pebble: could not open remote object cache")
      92           0 :                 }
      93             :         }
      94             : 
      95           1 :         for _, meta := range contents.Objects {
      96           1 :                 o := objstorage.ObjectMetadata{
      97           1 :                         DiskFileNum: meta.FileNum,
      98           1 :                         FileType:    meta.FileType,
      99           1 :                 }
     100           1 :                 o.Remote.CreatorID = meta.CreatorID
     101           1 :                 o.Remote.CreatorFileNum = meta.CreatorFileNum
     102           1 :                 o.Remote.CleanupMethod = meta.CleanupMethod
     103           1 :                 o.Remote.Locator = meta.Locator
     104           1 :                 o.Remote.CustomObjectName = meta.CustomObjectName
     105           1 :                 o.Remote.Storage, err = p.ensureStorageLocked(o.Remote.Locator)
     106           1 :                 if err != nil {
     107           0 :                         return errors.Wrapf(err, "creating remote.Storage object for locator '%s'", o.Remote.Locator)
     108           0 :                 }
     109           1 :                 if invariants.Enabled {
     110           1 :                         o.AssertValid()
     111           1 :                 }
     112           1 :                 p.mu.knownObjects[o.DiskFileNum] = o
     113             :         }
     114           1 :         return nil
     115             : }
     116             : 
     117             : // initShared initializes the creator ID, allowing use of shared objects.
     118           1 : func (ss *remoteSubsystem) initShared(creatorID objstorage.CreatorID) {
     119           1 :         ss.shared.initOnce.Do(func() {
     120           1 :                 ss.shared.creatorID = creatorID
     121           1 :                 ss.shared.initialized.Store(true)
     122           1 :         })
     123             : }
     124             : 
     125           1 : func (p *provider) sharedClose() error {
     126           1 :         if p.st.Remote.StorageFactory == nil {
     127           1 :                 return nil
     128           1 :         }
     129           1 :         var err error
     130           1 :         if p.remote.cache != nil {
     131           1 :                 err = p.remote.cache.Close()
     132           1 :                 p.remote.cache = nil
     133           1 :         }
     134           1 :         if p.remote.catalog != nil {
     135           1 :                 err = firstError(err, p.remote.catalog.Close())
     136           1 :                 p.remote.catalog = nil
     137           1 :         }
     138           1 :         return err
     139             : }
     140             : 
     141             : // SetCreatorID is part of the objstorage.Provider interface.
     142           1 : func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error {
     143           1 :         if p.st.Remote.StorageFactory == nil {
     144           0 :                 return errors.AssertionFailedf("attempt to set CreatorID but remote storage not enabled")
     145           0 :         }
     146             :         // Note: this call is a cheap no-op if the creator ID was already set. This
     147             :         // call also checks if we are trying to change the ID.
     148           1 :         if err := p.remote.catalog.SetCreatorID(creatorID); err != nil {
     149           0 :                 return err
     150           0 :         }
     151           1 :         if !p.remote.shared.initialized.Load() {
     152           1 :                 p.st.Logger.Infof("remote storage creatorID set to %s", creatorID)
     153           1 :                 p.remote.initShared(creatorID)
     154           1 :         }
     155           1 :         return nil
     156             : }
     157             : 
     158             : // IsSharedForeign is part of the objstorage.Provider interface.
     159           1 : func (p *provider) IsSharedForeign(meta objstorage.ObjectMetadata) bool {
     160           1 :         if !p.remote.shared.initialized.Load() {
     161           1 :                 return false
     162           1 :         }
     163           1 :         return meta.IsShared() && (meta.Remote.CreatorID != p.remote.shared.creatorID)
     164             : }
     165             : 
     166           1 : func (p *provider) remoteCheckInitialized() error {
     167           1 :         if p.st.Remote.StorageFactory == nil {
     168           0 :                 return errors.Errorf("remote object support not configured")
     169           0 :         }
     170           1 :         return nil
     171             : }
     172             : 
     173           1 : func (p *provider) sharedCheckInitialized() error {
     174           1 :         if err := p.remoteCheckInitialized(); err != nil {
     175           0 :                 return err
     176           0 :         }
     177           1 :         if !p.remote.shared.initialized.Load() {
     178           0 :                 return errors.Errorf("remote object support not available: remote creator ID not yet set")
     179           0 :         }
     180           1 :         return nil
     181             : }
     182             : 
// sharedSync applies the catalog operations queued in p.mu.remote.catalogBatch
// to the remote object catalog.
//
// Steps:
//  1. Take the pending batch: copy it and reset the queued one, under p.mu.
//  2. Apply the copy to the catalog without holding p.mu.
//  3. If applying fails, put the batch back, followed by any operations
//     queued in the meantime, so that the next sync retries it.
//
// The ApplyBatch error is returned unchanged.
func (p *provider) sharedSync() error {
	// Serialize parallel sync operations. Note that ApplyBatch is already
	// serialized internally, but we want to make sure they get called with
	// batches in the right order.
	p.remote.catalogSyncMutex.Lock()
	defer p.remote.catalogSyncMutex.Unlock()

	// Copy the pending operations and clear the queued batch while holding
	// p.mu (acquired after catalogSyncMutex, per the lock ordering rule).
	batch := func() remoteobjcat.Batch {
		p.mu.Lock()
		defer p.mu.Unlock()
		res := p.mu.remote.catalogBatch.Copy()
		p.mu.remote.catalogBatch.Reset()
		return res
	}()

	// Nothing was queued; there is nothing to persist.
	if batch.IsEmpty() {
		return nil
	}

	if err := p.remote.catalog.ApplyBatch(batch); err != nil {
		// Put back the batch (for the next Sync), appending any operations that
		// happened in the meantime.
		p.mu.Lock()
		defer p.mu.Unlock()
		batch.Append(p.mu.remote.catalogBatch)
		p.mu.remote.catalogBatch = batch
		return err
	}

	return nil
}
     214             : 
     215           1 : func (p *provider) remotePath(meta objstorage.ObjectMetadata) string {
     216           1 :         if meta.Remote.Locator != "" {
     217           0 :                 return fmt.Sprintf("remote-%s://%s", meta.Remote.Locator, remoteObjectName(meta))
     218           0 :         }
     219           1 :         return "remote://" + remoteObjectName(meta)
     220             : }
     221             : 
     222             : // sharedCreateRef creates a reference marker object.
     223           1 : func (p *provider) sharedCreateRef(meta objstorage.ObjectMetadata) error {
     224           1 :         if err := p.sharedCheckInitialized(); err != nil {
     225           0 :                 return err
     226           0 :         }
     227           1 :         if meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
     228           0 :                 return nil
     229           0 :         }
     230           1 :         refName := p.sharedObjectRefName(meta)
     231           1 :         writer, err := meta.Remote.Storage.CreateObject(refName)
     232           1 :         if err == nil {
     233           1 :                 // The object is empty, just close the writer.
     234           1 :                 err = writer.Close()
     235           1 :         }
     236           1 :         if err != nil {
     237           0 :                 return errors.Wrapf(err, "creating marker object %q", errors.Safe(refName))
     238           0 :         }
     239           1 :         return nil
     240             : }
     241             : 
     242             : func (p *provider) sharedCreate(
     243             :         _ context.Context,
     244             :         fileType base.FileType,
     245             :         fileNum base.DiskFileNum,
     246             :         locator remote.Locator,
     247             :         opts objstorage.CreateOptions,
     248           1 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
     249           1 :         if err := p.sharedCheckInitialized(); err != nil {
     250           0 :                 return nil, objstorage.ObjectMetadata{}, err
     251           0 :         }
     252           1 :         storage, err := p.ensureStorage(locator)
     253           1 :         if err != nil {
     254           0 :                 return nil, objstorage.ObjectMetadata{}, err
     255           0 :         }
     256           1 :         meta := objstorage.ObjectMetadata{
     257           1 :                 DiskFileNum: fileNum,
     258           1 :                 FileType:    fileType,
     259           1 :         }
     260           1 :         meta.Remote.CreatorID = p.remote.shared.creatorID
     261           1 :         meta.Remote.CreatorFileNum = fileNum
     262           1 :         meta.Remote.CleanupMethod = opts.SharedCleanupMethod
     263           1 :         meta.Remote.Locator = locator
     264           1 :         meta.Remote.Storage = storage
     265           1 : 
     266           1 :         objName := remoteObjectName(meta)
     267           1 :         writer, err := storage.CreateObject(objName)
     268           1 :         if err != nil {
     269           0 :                 return nil, objstorage.ObjectMetadata{}, errors.Wrapf(err, "creating object %q", errors.Safe(objName))
     270           0 :         }
     271           1 :         return &sharedWritable{
     272           1 :                 p:             p,
     273           1 :                 meta:          meta,
     274           1 :                 storageWriter: writer,
     275           1 :         }, meta, nil
     276             : }
     277             : 
     278             : func (p *provider) remoteOpenForReading(
     279             :         ctx context.Context, meta objstorage.ObjectMetadata, opts objstorage.OpenOptions,
     280           1 : ) (objstorage.Readable, error) {
     281           1 :         if err := p.remoteCheckInitialized(); err != nil {
     282           0 :                 return nil, err
     283           0 :         }
     284             :         // Verify we have a reference on this object; for performance reasons, we only
     285             :         // do this in testing scenarios.
     286           1 :         if p.remote.shared.checkRefsOnOpen && meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
     287           1 :                 if err := p.sharedCheckInitialized(); err != nil {
     288           0 :                         return nil, err
     289           0 :                 }
     290           1 :                 refName := p.sharedObjectRefName(meta)
     291           1 :                 if _, err := meta.Remote.Storage.Size(refName); err != nil {
     292           0 :                         if meta.Remote.Storage.IsNotExistError(err) {
     293           0 :                                 if opts.MustExist {
     294           0 :                                         p.st.Logger.Fatalf("marker object %q does not exist", errors.Safe(refName))
     295           0 :                                         // TODO(radu): maybe list references for the object.
     296           0 :                                 }
     297           0 :                                 return nil, errors.Errorf("marker object %q does not exist", errors.Safe(refName))
     298             :                         }
     299           0 :                         return nil, errors.Wrapf(err, "checking marker object %q", errors.Safe(refName))
     300             :                 }
     301             :         }
     302           1 :         objName := remoteObjectName(meta)
     303           1 :         reader, size, err := meta.Remote.Storage.ReadObject(ctx, objName)
     304           1 :         if err != nil {
     305           0 :                 if opts.MustExist && meta.Remote.Storage.IsNotExistError(err) {
     306           0 :                         p.st.Logger.Fatalf("object %q does not exist", redact.SafeString(objName))
     307           0 :                         // TODO(radu): maybe list references for the object.
     308           0 :                 }
     309           0 :                 return nil, err
     310             :         }
     311           1 :         return p.newRemoteReadable(reader, size, meta.DiskFileNum), nil
     312             : }
     313             : 
     314           1 : func (p *provider) remoteSize(meta objstorage.ObjectMetadata) (int64, error) {
     315           1 :         if err := p.remoteCheckInitialized(); err != nil {
     316           0 :                 return 0, err
     317           0 :         }
     318           1 :         objName := remoteObjectName(meta)
     319           1 :         return meta.Remote.Storage.Size(objName)
     320             : }
     321             : 
     322             : // sharedUnref implements object "removal" with the remote backend. The ref
     323             : // marker object is removed and the backing object is removed only if there are
     324             : // no other ref markers.
     325           1 : func (p *provider) sharedUnref(meta objstorage.ObjectMetadata) error {
     326           1 :         if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
     327           0 :                 // Never delete objects in this mode.
     328           0 :                 return nil
     329           0 :         }
     330           1 :         if p.isProtected(meta.DiskFileNum) {
     331           0 :                 // TODO(radu): we need a mechanism to unref the object when it becomes
     332           0 :                 // unprotected.
     333           0 :                 return nil
     334           0 :         }
     335             : 
     336           1 :         refName := p.sharedObjectRefName(meta)
     337           1 :         // Tolerate a not-exists error.
     338           1 :         if err := meta.Remote.Storage.Delete(refName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
     339           0 :                 return err
     340           0 :         }
     341           1 :         otherRefs, err := meta.Remote.Storage.List(sharedObjectRefPrefix(meta), "" /* delimiter */)
     342           1 :         if err != nil {
     343           0 :                 return err
     344           0 :         }
     345           1 :         if len(otherRefs) == 0 {
     346           1 :                 objName := remoteObjectName(meta)
     347           1 :                 if err := meta.Remote.Storage.Delete(objName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
     348           0 :                         return err
     349           0 :                 }
     350             :         }
     351           1 :         return nil
     352             : }
     353             : 
     354             : // ensureStorageLocked populates the remote.Storage object for the given
     355             : // locator, if necessary. p.mu must be held.
     356           1 : func (p *provider) ensureStorageLocked(locator remote.Locator) (remote.Storage, error) {
     357           1 :         if p.mu.remote.storageObjects == nil {
     358           1 :                 p.mu.remote.storageObjects = make(map[remote.Locator]remote.Storage)
     359           1 :         }
     360           1 :         if res, ok := p.mu.remote.storageObjects[locator]; ok {
     361           1 :                 return res, nil
     362           1 :         }
     363           1 :         res, err := p.st.Remote.StorageFactory.CreateStorage(locator)
     364           1 :         if err != nil {
     365           0 :                 return nil, err
     366           0 :         }
     367             : 
     368           1 :         p.mu.remote.storageObjects[locator] = res
     369           1 :         return res, nil
     370             : }
     371             : 
     372             : // ensureStorage populates the remote.Storage object for the given locator, if necessary.
     373           1 : func (p *provider) ensureStorage(locator remote.Locator) (remote.Storage, error) {
     374           1 :         p.mu.Lock()
     375           1 :         defer p.mu.Unlock()
     376           1 :         return p.ensureStorageLocked(locator)
     377           1 : }

Generated by: LCOV version 1.14