LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - remote.go (source / functions) Hit Total Coverage
Test: 2024-10-30 08:17Z 405b4e12 - tests only.lcov Lines: 208 286 72.7 %
Date: 2024-10-30 08:18:59 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "runtime"
      11             :         "slices"
      12             :         "sync"
      13             :         "sync/atomic"
      14             : 
      15             :         "github.com/cockroachdb/errors"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/internal/invariants"
      18             :         "github.com/cockroachdb/pebble/objstorage"
      19             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
      20             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      21             :         "github.com/cockroachdb/pebble/objstorage/remote"
      22             :         "github.com/cockroachdb/redact"
      23             : )
      24             : 
// remoteSubsystem contains the provider fields related to remote storage.
// All fields remain unset if remote storage is not configured.
type remoteSubsystem struct {
	// catalog records remote objects and the creator ID. It is opened in
	// remoteInit and closed (and reset to nil) in sharedClose.
	catalog *remoteobjcat.Catalog
	// catalogSyncMutex is used to correctly serialize two sharedSync operations.
	// It must be acquired before the provider mutex.
	catalogSyncMutex sync.Mutex

	// cache is the optional block cache for remote objects; it stays nil unless
	// a positive Remote.CacheSizeBytes is configured (see remoteInit).
	cache *sharedcache.Cache

	// shared contains the fields relevant to shared objects, i.e. objects that
	// are created by Pebble and potentially shared between Pebble instances.
	shared struct {
		// initialized guards access to the creatorID field: creatorID may only
		// be read after initialized is observed as true. Both are written
		// exactly once, by initShared, under initOnce.
		initialized atomic.Bool
		creatorID   objstorage.CreatorID
		initOnce    sync.Once

		// checkRefsOnOpen controls whether we check the ref marker file when opening
		// an object. Normally this is true when invariants are enabled (but the provider
		// test tweaks this field).
		checkRefsOnOpen bool
	}
}
      49             : 
// remoteLockedState holds the remote-storage state that lives under the
// provider mutex (it is accessed as p.mu.remote while p.mu is held).
type remoteLockedState struct {
	// catalogBatch accumulates remote object creations and deletions until
	// Sync is called.
	catalogBatch remoteobjcat.Batch

	// storageObjects caches one remote.Storage per locator. It is created and
	// populated lazily by ensureStorageLocked.
	storageObjects map[remote.Locator]remote.Storage

	// externalObjects maps an external object (locator + custom object name)
	// to the local file numbers that refer to it. An entry is removed once its
	// list becomes empty (see removeExternalObject).
	externalObjects map[remote.ObjectKey][]base.DiskFileNum
}
      59             : 
      60           1 : func (rs *remoteLockedState) addExternalObject(meta objstorage.ObjectMetadata) {
      61           1 :         if rs.externalObjects == nil {
      62           1 :                 rs.externalObjects = make(map[remote.ObjectKey][]base.DiskFileNum)
      63           1 :         }
      64           1 :         key := remote.MakeObjectKey(meta.Remote.Locator, meta.Remote.CustomObjectName)
      65           1 :         rs.externalObjects[key] = append(rs.externalObjects[key], meta.DiskFileNum)
      66             : }
      67             : 
      68           1 : func (rs *remoteLockedState) removeExternalObject(meta objstorage.ObjectMetadata) {
      69           1 :         key := remote.MakeObjectKey(meta.Remote.Locator, meta.Remote.CustomObjectName)
      70           1 :         newSlice := slices.DeleteFunc(rs.externalObjects[key], func(n base.DiskFileNum) bool {
      71           1 :                 return n == meta.DiskFileNum
      72           1 :         })
      73           1 :         if len(newSlice) == 0 {
      74           1 :                 delete(rs.externalObjects, key)
      75           1 :         } else {
      76           1 :                 rs.externalObjects[key] = newSlice
      77           1 :         }
      78             : }
      79             : 
      80             : func (rs *remoteLockedState) getExternalObjects(
      81             :         locator remote.Locator, objName string,
      82           1 : ) []base.DiskFileNum {
      83           1 :         key := remote.MakeObjectKey(locator, objName)
      84           1 :         return rs.externalObjects[key]
      85           1 : }
      86             : 
      87             : // remoteInit initializes the remote object subsystem (if configured) and finds
      88             : // any remote objects.
      89           1 : func (p *provider) remoteInit() error {
      90           1 :         if p.st.Remote.StorageFactory == nil {
      91           1 :                 return nil
      92           1 :         }
      93           1 :         catalog, contents, err := remoteobjcat.Open(p.st.FS, p.st.FSDirName)
      94           1 :         if err != nil {
      95           0 :                 return errors.Wrapf(err, "pebble: could not open remote object catalog")
      96           0 :         }
      97           1 :         p.remote.catalog = catalog
      98           1 :         p.remote.shared.checkRefsOnOpen = invariants.Enabled
      99           1 : 
     100           1 :         // The creator ID may or may not be initialized yet.
     101           1 :         if contents.CreatorID.IsSet() {
     102           1 :                 p.remote.initShared(contents.CreatorID)
     103           1 :                 p.st.Logger.Infof("remote storage configured; creatorID = %s", contents.CreatorID)
     104           1 :         } else {
     105           1 :                 p.st.Logger.Infof("remote storage configured; no creatorID yet")
     106           1 :         }
     107             : 
     108           1 :         if p.st.Remote.CacheSizeBytes > 0 {
     109           0 :                 const defaultBlockSize = 32 * 1024
     110           0 :                 blockSize := p.st.Remote.CacheBlockSize
     111           0 :                 if blockSize == 0 {
     112           0 :                         blockSize = defaultBlockSize
     113           0 :                 }
     114             : 
     115           0 :                 const defaultShardingBlockSize = 1024 * 1024
     116           0 :                 shardingBlockSize := p.st.Remote.ShardingBlockSize
     117           0 :                 if shardingBlockSize == 0 {
     118           0 :                         shardingBlockSize = defaultShardingBlockSize
     119           0 :                 }
     120             : 
     121           0 :                 numShards := p.st.Remote.CacheShardCount
     122           0 :                 if numShards == 0 {
     123           0 :                         numShards = 2 * runtime.GOMAXPROCS(0)
     124           0 :                 }
     125             : 
     126           0 :                 p.remote.cache, err = sharedcache.Open(
     127           0 :                         p.st.FS, p.st.Logger, p.st.FSDirName, blockSize, shardingBlockSize, p.st.Remote.CacheSizeBytes, numShards)
     128           0 :                 if err != nil {
     129           0 :                         return errors.Wrapf(err, "pebble: could not open remote object cache")
     130           0 :                 }
     131             :         }
     132             : 
     133           1 :         for _, meta := range contents.Objects {
     134           1 :                 o := objstorage.ObjectMetadata{
     135           1 :                         DiskFileNum: meta.FileNum,
     136           1 :                         FileType:    meta.FileType,
     137           1 :                 }
     138           1 :                 o.Remote.CreatorID = meta.CreatorID
     139           1 :                 o.Remote.CreatorFileNum = meta.CreatorFileNum
     140           1 :                 o.Remote.CleanupMethod = meta.CleanupMethod
     141           1 :                 o.Remote.Locator = meta.Locator
     142           1 :                 o.Remote.CustomObjectName = meta.CustomObjectName
     143           1 :                 o.Remote.Storage, err = p.ensureStorageLocked(o.Remote.Locator)
     144           1 :                 if err != nil {
     145           0 :                         return errors.Wrapf(err, "creating remote.Storage object for locator '%s'", o.Remote.Locator)
     146           0 :                 }
     147           1 :                 if invariants.Enabled {
     148           1 :                         o.AssertValid()
     149           1 :                 }
     150           1 :                 p.mu.knownObjects[o.DiskFileNum] = o
     151           1 :                 if o.IsExternal() {
     152           1 :                         p.mu.remote.addExternalObject(o)
     153           1 :                 }
     154             :         }
     155           1 :         return nil
     156             : }
     157             : 
     158             : // initShared initializes the creator ID, allowing use of shared objects.
     159           1 : func (ss *remoteSubsystem) initShared(creatorID objstorage.CreatorID) {
     160           1 :         ss.shared.initOnce.Do(func() {
     161           1 :                 ss.shared.creatorID = creatorID
     162           1 :                 ss.shared.initialized.Store(true)
     163           1 :         })
     164             : }
     165             : 
     166           1 : func (p *provider) sharedClose() error {
     167           1 :         if p.st.Remote.StorageFactory == nil {
     168           1 :                 return nil
     169           1 :         }
     170           1 :         err := p.sharedSync()
     171           1 :         if p.remote.cache != nil {
     172           0 :                 err = firstError(err, p.remote.cache.Close())
     173           0 :                 p.remote.cache = nil
     174           0 :         }
     175           1 :         if p.remote.catalog != nil {
     176           1 :                 err = firstError(err, p.remote.catalog.Close())
     177           1 :                 p.remote.catalog = nil
     178           1 :         }
     179           1 :         return err
     180             : }
     181             : 
     182             : // SetCreatorID is part of the objstorage.Provider interface.
     183           1 : func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error {
     184           1 :         if p.st.Remote.StorageFactory == nil {
     185           0 :                 return base.AssertionFailedf("attempt to set CreatorID but remote storage not enabled")
     186           0 :         }
     187             :         // Note: this call is a cheap no-op if the creator ID was already set. This
     188             :         // call also checks if we are trying to change the ID.
     189           1 :         if err := p.remote.catalog.SetCreatorID(creatorID); err != nil {
     190           0 :                 return err
     191           0 :         }
     192           1 :         if !p.remote.shared.initialized.Load() {
     193           1 :                 p.st.Logger.Infof("remote storage creatorID set to %s", creatorID)
     194           1 :                 p.remote.initShared(creatorID)
     195           1 :         }
     196           1 :         return nil
     197             : }
     198             : 
     199             : // IsSharedForeign is part of the objstorage.Provider interface.
     200           1 : func (p *provider) IsSharedForeign(meta objstorage.ObjectMetadata) bool {
     201           1 :         if !p.remote.shared.initialized.Load() {
     202           0 :                 return false
     203           0 :         }
     204           1 :         return meta.IsShared() && (meta.Remote.CreatorID != p.remote.shared.creatorID)
     205             : }
     206             : 
     207           1 : func (p *provider) remoteCheckInitialized() error {
     208           1 :         if p.st.Remote.StorageFactory == nil {
     209           0 :                 return errors.Errorf("remote object support not configured")
     210           0 :         }
     211           1 :         return nil
     212             : }
     213             : 
     214           1 : func (p *provider) sharedCheckInitialized() error {
     215           1 :         if err := p.remoteCheckInitialized(); err != nil {
     216           0 :                 return err
     217           0 :         }
     218           1 :         if !p.remote.shared.initialized.Load() {
     219           0 :                 return errors.Errorf("remote object support not available: remote creator ID not yet set")
     220           0 :         }
     221           1 :         return nil
     222             : }
     223             : 
     224           1 : func (p *provider) sharedSync() error {
     225           1 :         // Serialize parallel sync operations. Note that ApplyBatch is already
     226           1 :         // serialized internally, but we want to make sure they get called with
     227           1 :         // batches in the right order.
     228           1 :         p.remote.catalogSyncMutex.Lock()
     229           1 :         defer p.remote.catalogSyncMutex.Unlock()
     230           1 : 
     231           1 :         batch := func() remoteobjcat.Batch {
     232           1 :                 p.mu.Lock()
     233           1 :                 defer p.mu.Unlock()
     234           1 :                 res := p.mu.remote.catalogBatch.Copy()
     235           1 :                 p.mu.remote.catalogBatch.Reset()
     236           1 :                 return res
     237           1 :         }()
     238             : 
     239           1 :         if batch.IsEmpty() {
     240           1 :                 return nil
     241           1 :         }
     242             : 
     243           1 :         if err := p.remote.catalog.ApplyBatch(batch); err != nil {
     244           0 :                 // Put back the batch (for the next Sync), appending any operations that
     245           0 :                 // happened in the meantime.
     246           0 :                 p.mu.Lock()
     247           0 :                 defer p.mu.Unlock()
     248           0 :                 batch.Append(p.mu.remote.catalogBatch)
     249           0 :                 p.mu.remote.catalogBatch = batch
     250           0 :                 return err
     251           0 :         }
     252             : 
     253           1 :         return nil
     254             : }
     255             : 
     256           1 : func (p *provider) remotePath(meta objstorage.ObjectMetadata) string {
     257           1 :         if meta.Remote.Locator != "" {
     258           1 :                 return fmt.Sprintf("remote-%s://%s", meta.Remote.Locator, remoteObjectName(meta))
     259           1 :         }
     260           1 :         return "remote://" + remoteObjectName(meta)
     261             : }
     262             : 
     263             : // sharedCreateRef creates a reference marker object.
     264           1 : func (p *provider) sharedCreateRef(meta objstorage.ObjectMetadata) error {
     265           1 :         if err := p.sharedCheckInitialized(); err != nil {
     266           0 :                 return err
     267           0 :         }
     268           1 :         if meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
     269           1 :                 return nil
     270           1 :         }
     271           1 :         refName := p.sharedObjectRefName(meta)
     272           1 :         writer, err := meta.Remote.Storage.CreateObject(refName)
     273           1 :         if err == nil {
     274           1 :                 // The object is empty, just close the writer.
     275           1 :                 err = writer.Close()
     276           1 :         }
     277           1 :         if err != nil {
     278           0 :                 return errors.Wrapf(err, "creating marker object %q", errors.Safe(refName))
     279           0 :         }
     280           1 :         return nil
     281             : }
     282             : 
     283             : func (p *provider) sharedCreate(
     284             :         _ context.Context,
     285             :         fileType base.FileType,
     286             :         fileNum base.DiskFileNum,
     287             :         locator remote.Locator,
     288             :         opts objstorage.CreateOptions,
     289           1 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
     290           1 :         if err := p.sharedCheckInitialized(); err != nil {
     291           0 :                 return nil, objstorage.ObjectMetadata{}, err
     292           0 :         }
     293           1 :         storage, err := p.ensureStorage(locator)
     294           1 :         if err != nil {
     295           0 :                 return nil, objstorage.ObjectMetadata{}, err
     296           0 :         }
     297           1 :         meta := objstorage.ObjectMetadata{
     298           1 :                 DiskFileNum: fileNum,
     299           1 :                 FileType:    fileType,
     300           1 :         }
     301           1 :         meta.Remote.CreatorID = p.remote.shared.creatorID
     302           1 :         meta.Remote.CreatorFileNum = fileNum
     303           1 :         meta.Remote.CleanupMethod = opts.SharedCleanupMethod
     304           1 :         meta.Remote.Locator = locator
     305           1 :         meta.Remote.Storage = storage
     306           1 : 
     307           1 :         objName := remoteObjectName(meta)
     308           1 :         writer, err := storage.CreateObject(objName)
     309           1 :         if err != nil {
     310           0 :                 return nil, objstorage.ObjectMetadata{}, errors.Wrapf(err, "creating object %q", errors.Safe(objName))
     311           0 :         }
     312           1 :         return &sharedWritable{
     313           1 :                 p:             p,
     314           1 :                 meta:          meta,
     315           1 :                 storageWriter: writer,
     316           1 :         }, meta, nil
     317             : }
     318             : 
     319             : func (p *provider) remoteOpenForReading(
     320             :         ctx context.Context, meta objstorage.ObjectMetadata, opts objstorage.OpenOptions,
     321           1 : ) (objstorage.Readable, error) {
     322           1 :         if err := p.remoteCheckInitialized(); err != nil {
     323           0 :                 return nil, err
     324           0 :         }
     325             :         // Verify we have a reference on this object; for performance reasons, we only
     326             :         // do this in testing scenarios.
     327           1 :         if p.remote.shared.checkRefsOnOpen && meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
     328           1 :                 if err := p.sharedCheckInitialized(); err != nil {
     329           0 :                         return nil, err
     330           0 :                 }
     331           1 :                 refName := p.sharedObjectRefName(meta)
     332           1 :                 if _, err := meta.Remote.Storage.Size(refName); err != nil {
     333           0 :                         if meta.Remote.Storage.IsNotExistError(err) {
     334           0 :                                 if opts.MustExist {
     335           0 :                                         p.st.Logger.Fatalf("marker object %q does not exist", errors.Safe(refName))
     336           0 :                                         // TODO(radu): maybe list references for the object.
     337           0 :                                 }
     338           0 :                                 return nil, errors.Errorf("marker object %q does not exist", errors.Safe(refName))
     339             :                         }
     340           0 :                         return nil, errors.Wrapf(err, "checking marker object %q", errors.Safe(refName))
     341             :                 }
     342             :         }
     343           1 :         objName := remoteObjectName(meta)
     344           1 :         reader, size, err := meta.Remote.Storage.ReadObject(ctx, objName)
     345           1 :         if err != nil {
     346           1 :                 if opts.MustExist && meta.Remote.Storage.IsNotExistError(err) {
     347           0 :                         p.st.Logger.Fatalf("object %q does not exist", redact.SafeString(objName))
     348           0 :                         // TODO(radu): maybe list references for the object.
     349           0 :                 }
     350           1 :                 return nil, err
     351             :         }
     352           1 :         return p.newRemoteReadable(reader, size, meta.DiskFileNum), nil
     353             : }
     354             : 
     355           1 : func (p *provider) remoteSize(meta objstorage.ObjectMetadata) (int64, error) {
     356           1 :         if err := p.remoteCheckInitialized(); err != nil {
     357           0 :                 return 0, err
     358           0 :         }
     359           1 :         objName := remoteObjectName(meta)
     360           1 :         return meta.Remote.Storage.Size(objName)
     361             : }
     362             : 
     363             : // sharedUnref implements object "removal" with the remote backend. The ref
     364             : // marker object is removed and the backing object is removed only if there are
     365             : // no other ref markers.
     366           1 : func (p *provider) sharedUnref(meta objstorage.ObjectMetadata) error {
     367           1 :         if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
     368           1 :                 // Never delete objects in this mode.
     369           1 :                 return nil
     370           1 :         }
     371           1 :         if p.isProtected(meta.DiskFileNum) {
     372           1 :                 // TODO(radu): we need a mechanism to unref the object when it becomes
     373           1 :                 // unprotected.
     374           1 :                 return nil
     375           1 :         }
     376             : 
     377           1 :         refName := p.sharedObjectRefName(meta)
     378           1 :         // Tolerate a not-exists error.
     379           1 :         if err := meta.Remote.Storage.Delete(refName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
     380           0 :                 return err
     381           0 :         }
     382           1 :         otherRefs, err := meta.Remote.Storage.List(sharedObjectRefPrefix(meta), "" /* delimiter */)
     383           1 :         if err != nil {
     384           0 :                 return err
     385           0 :         }
     386           1 :         if len(otherRefs) == 0 {
     387           1 :                 objName := remoteObjectName(meta)
     388           1 :                 if err := meta.Remote.Storage.Delete(objName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
     389           0 :                         return err
     390           0 :                 }
     391             :         }
     392           1 :         return nil
     393             : }
     394             : 
     395             : // ensureStorageLocked populates the remote.Storage object for the given
     396             : // locator, if necessary. p.mu must be held.
     397           1 : func (p *provider) ensureStorageLocked(locator remote.Locator) (remote.Storage, error) {
     398           1 :         if p.mu.remote.storageObjects == nil {
     399           1 :                 p.mu.remote.storageObjects = make(map[remote.Locator]remote.Storage)
     400           1 :         }
     401           1 :         if res, ok := p.mu.remote.storageObjects[locator]; ok {
     402           1 :                 return res, nil
     403           1 :         }
     404           1 :         res, err := p.st.Remote.StorageFactory.CreateStorage(locator)
     405           1 :         if err != nil {
     406           1 :                 return nil, err
     407           1 :         }
     408             : 
     409           1 :         p.mu.remote.storageObjects[locator] = res
     410           1 :         return res, nil
     411             : }
     412             : 
     413             : // ensureStorage populates the remote.Storage object for the given locator, if necessary.
     414           1 : func (p *provider) ensureStorage(locator remote.Locator) (remote.Storage, error) {
     415           1 :         p.mu.Lock()
     416           1 :         defer p.mu.Unlock()
     417           1 :         return p.ensureStorageLocked(locator)
     418           1 : }
     419             : 
     420             : // GetExternalObjects is part of the Provider interface.
     421           1 : func (p *provider) GetExternalObjects(locator remote.Locator, objName string) []base.DiskFileNum {
     422           1 :         p.mu.Lock()
     423           1 :         defer p.mu.Unlock()
     424           1 :         return slices.Clone(p.mu.remote.getExternalObjects(locator, objName))
     425           1 : }

Generated by: LCOV version 1.14