LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider/remoteobjcat - catalog.go (source / functions) Hit Total Coverage
Test: 2023-12-14 08:16Z 288bf0fb - tests only.lcov Lines: 170 225 75.6 %
Date: 2023-12-14 08:16:45 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package remoteobjcat
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "fmt"
      10             :         "io"
      11             :         "slices"
      12             :         "sync"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/objstorage"
      17             :         "github.com/cockroachdb/pebble/objstorage/remote"
      18             :         "github.com/cockroachdb/pebble/record"
      19             :         "github.com/cockroachdb/pebble/vfs"
      20             :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      21             : )
      22             : 
// Catalog is used to manage the on-disk remote object catalog.
//
// The catalog file is a log of records, where each record is an encoded
// VersionEdit.
type Catalog struct {
	// fs is the filesystem used to open, create and remove catalog files.
	fs      vfs.FS
	// dirname is the directory that holds the catalog files and the marker.
	dirname string
	// mu embeds the mutex that SetCreatorID and ApplyBatch hold while they
	// read or modify the fields below.
	mu      struct {
		sync.Mutex

		// creatorID is the creator ID loaded from the catalog file or set via
		// SetCreatorID.
		creatorID objstorage.CreatorID
		// objects is the current set of objects in the catalog, keyed by FileNum.
		objects   map[base.DiskFileNum]RemoteObjectMetadata

		// marker is obtained from atomicfs.LocateMarker in Open and is moved to
		// each newly created catalog file.
		marker *atomicfs.Marker

		// catalogFile and catalogRecWriter are the open catalog file and the
		// record writer on top of it. Both are nil until the first write creates
		// a file, and are reset to nil by closeCatalogFile.
		catalogFile      vfs.File
		catalogRecWriter *record.Writer

		// rotationHelper is told about every written edit and is consulted to
		// decide whether to rotate once the file reaches rotateFileSize.
		rotationHelper record.RotationHelper

		// catalogFilename is the filename of catalogFile when catalogFile != nil, otherwise
		// it is the filename of the last catalog file.
		catalogFilename string
	}
}
      48             : 
// RemoteObjectMetadata encapsulates the data stored in the catalog file for each object.
type RemoteObjectMetadata struct {
	// FileNum is the identifier for the object within the context of a single DB
	// instance. It is the key under which the Catalog indexes the object.
	FileNum base.DiskFileNum
	// FileType is the type of the object. Only certain FileTypes are possible.
	FileType base.FileType
	// CreatorID identifies the DB instance that originally created the object.
	CreatorID objstorage.CreatorID
	// CreatorFileNum is the identifier for the object within the context of the
	// DB instance that originally created the object.
	CreatorFileNum base.DiskFileNum
	// CleanupMethod indicates the method for cleaning up unused shared objects.
	CleanupMethod objstorage.SharedCleanupMethod
	// Locator identifies a remote.Storage implementation.
	Locator remote.Locator
	// CustomObjectName (if it is set) overrides the object name that is normally
	// derived from the CreatorID and CreatorFileNum.
	CustomObjectName string
}
      69             : 
const (
	// catalogFilenameBase is the prefix of every catalog filename; see
	// makeCatalogFilename.
	catalogFilenameBase = "REMOTE-OBJ-CATALOG"
	// catalogMarkerName is the marker name passed to atomicfs.LocateMarker.
	catalogMarkerName   = "remote-obj-catalog"

	// We create a new file when the size exceeds 1MB (and some other conditions
	// hold; see record.RotationHelper).
	rotateFileSize = 1024 * 1024 // 1MB
)
      78             : 
// CatalogContents contains the remote objects in the catalog.
type CatalogContents struct {
	// CreatorID, if it is set.
	CreatorID objstorage.CreatorID
	// Objects lists the metadata of every object in the catalog. Open returns
	// it sorted by FileNum.
	Objects   []RemoteObjectMetadata
}
      85             : 
      86             : // Open creates a Catalog and loads any existing catalog file, returning the
      87             : // creator ID (if it is set) and the contents.
      88           1 : func Open(fs vfs.FS, dirname string) (*Catalog, CatalogContents, error) {
      89           1 :         c := &Catalog{
      90           1 :                 fs:      fs,
      91           1 :                 dirname: dirname,
      92           1 :         }
      93           1 :         c.mu.objects = make(map[base.DiskFileNum]RemoteObjectMetadata)
      94           1 : 
      95           1 :         var err error
      96           1 :         c.mu.marker, c.mu.catalogFilename, err = atomicfs.LocateMarker(fs, dirname, catalogMarkerName)
      97           1 :         if err != nil {
      98           0 :                 return nil, CatalogContents{}, err
      99           0 :         }
     100             :         // If the filename is empty, there is no existing catalog.
     101           1 :         if c.mu.catalogFilename != "" {
     102           1 :                 if err := c.loadFromCatalogFile(c.mu.catalogFilename); err != nil {
     103           0 :                         return nil, CatalogContents{}, err
     104           0 :                 }
     105           1 :                 if err := c.mu.marker.RemoveObsolete(); err != nil {
     106           0 :                         return nil, CatalogContents{}, err
     107           0 :                 }
     108             :                 // TODO(radu): remove obsolete catalog files.
     109             :         }
     110           1 :         res := CatalogContents{
     111           1 :                 CreatorID: c.mu.creatorID,
     112           1 :                 Objects:   make([]RemoteObjectMetadata, 0, len(c.mu.objects)),
     113           1 :         }
     114           1 :         for _, meta := range c.mu.objects {
     115           1 :                 res.Objects = append(res.Objects, meta)
     116           1 :         }
     117             :         // Sort the objects so the function is deterministic.
     118           1 :         slices.SortFunc(res.Objects, func(a, b RemoteObjectMetadata) int {
     119           1 :                 return cmp.Compare(a.FileNum, b.FileNum)
     120           1 :         })
     121           1 :         return c, res, nil
     122             : }
     123             : 
     124             : // SetCreatorID sets the creator ID. If it is already set, it must match.
     125           1 : func (c *Catalog) SetCreatorID(id objstorage.CreatorID) error {
     126           1 :         if !id.IsSet() {
     127           0 :                 return errors.AssertionFailedf("attempt to unset CreatorID")
     128           0 :         }
     129             : 
     130           1 :         c.mu.Lock()
     131           1 :         defer c.mu.Unlock()
     132           1 : 
     133           1 :         if c.mu.creatorID.IsSet() {
     134           1 :                 if c.mu.creatorID != id {
     135           1 :                         return errors.AssertionFailedf("attempt to change CreatorID from %s to %s", c.mu.creatorID, id)
     136           1 :                 }
     137           1 :                 return nil
     138             :         }
     139             : 
     140           1 :         ve := VersionEdit{CreatorID: id}
     141           1 :         if err := c.writeToCatalogFileLocked(&ve); err != nil {
     142           0 :                 return errors.Wrapf(err, "pebble: could not write to remote object catalog")
     143           0 :         }
     144           1 :         c.mu.creatorID = id
     145           1 :         return nil
     146             : }
     147             : 
     148             : // Close any open files.
     149           1 : func (c *Catalog) Close() error {
     150           1 :         return c.closeCatalogFile()
     151           1 : }
     152             : 
     153           1 : func (c *Catalog) closeCatalogFile() error {
     154           1 :         if c.mu.catalogFile == nil {
     155           1 :                 return nil
     156           1 :         }
     157           1 :         err1 := c.mu.catalogRecWriter.Close()
     158           1 :         err2 := c.mu.catalogFile.Close()
     159           1 :         c.mu.catalogRecWriter = nil
     160           1 :         c.mu.catalogFile = nil
     161           1 :         if err1 != nil {
     162           0 :                 return err1
     163           0 :         }
     164           1 :         return err2
     165             : }
     166             : 
// Batch is used to perform multiple object additions/deletions at once.
type Batch struct {
	// ve accumulates the additions (NewObjects) and removals (DeletedObjects)
	// that Catalog.ApplyBatch writes to the catalog as a single record.
	ve VersionEdit
}
     171             : 
     172             : // AddObject adds a new object to the batch.
     173             : //
     174             : // The given FileNum must be new - it must not match that of any object that was
     175             : // ever in the catalog.
     176           1 : func (b *Batch) AddObject(meta RemoteObjectMetadata) {
     177           1 :         b.ve.NewObjects = append(b.ve.NewObjects, meta)
     178           1 : }
     179             : 
     180             : // DeleteObject adds an object removal to the batch.
     181           1 : func (b *Batch) DeleteObject(fileNum base.DiskFileNum) {
     182           1 :         b.ve.DeletedObjects = append(b.ve.DeletedObjects, fileNum)
     183           1 : }
     184             : 
     185             : // Reset clears the batch.
     186           1 : func (b *Batch) Reset() {
     187           1 :         b.ve.NewObjects = b.ve.NewObjects[:0]
     188           1 :         b.ve.DeletedObjects = b.ve.DeletedObjects[:0]
     189           1 : }
     190             : 
     191             : // IsEmpty returns true if the batch is empty.
     192           1 : func (b *Batch) IsEmpty() bool {
     193           1 :         return len(b.ve.NewObjects) == 0 && len(b.ve.DeletedObjects) == 0
     194           1 : }
     195             : 
     196             : // Copy returns a copy of the Batch.
     197           1 : func (b *Batch) Copy() Batch {
     198           1 :         var res Batch
     199           1 :         if len(b.ve.NewObjects) > 0 {
     200           1 :                 res.ve.NewObjects = make([]RemoteObjectMetadata, len(b.ve.NewObjects))
     201           1 :                 copy(res.ve.NewObjects, b.ve.NewObjects)
     202           1 :         }
     203           1 :         if len(b.ve.DeletedObjects) > 0 {
     204           1 :                 res.ve.DeletedObjects = make([]base.DiskFileNum, len(b.ve.DeletedObjects))
     205           1 :                 copy(res.ve.DeletedObjects, b.ve.DeletedObjects)
     206           1 :         }
     207           1 :         return res
     208             : }
     209             : 
     210             : // Append merges two batches.
     211           0 : func (b *Batch) Append(other Batch) {
     212           0 :         b.ve.NewObjects = append(b.ve.NewObjects, other.ve.NewObjects...)
     213           0 :         b.ve.DeletedObjects = append(b.ve.DeletedObjects, other.ve.DeletedObjects...)
     214           0 : }
     215             : 
     216             : // ApplyBatch applies a batch of updates; returns after the change is stably
     217             : // recorded on storage.
     218           1 : func (c *Catalog) ApplyBatch(b Batch) error {
     219           1 :         c.mu.Lock()
     220           1 :         defer c.mu.Unlock()
     221           1 : 
     222           1 :         // Sanity checks.
     223           1 :         toAdd := make(map[base.DiskFileNum]struct{}, len(b.ve.NewObjects))
     224           1 :         exists := func(n base.DiskFileNum) bool {
     225           1 :                 _, ok := c.mu.objects[n]
     226           1 :                 if !ok {
     227           1 :                         _, ok = toAdd[n]
     228           1 :                 }
     229           1 :                 return ok
     230             :         }
     231           1 :         for _, meta := range b.ve.NewObjects {
     232           1 :                 if exists(meta.FileNum) {
     233           1 :                         return errors.AssertionFailedf("adding existing object %s", meta.FileNum)
     234           1 :                 }
     235           1 :                 toAdd[meta.FileNum] = struct{}{}
     236             :         }
     237           1 :         for _, n := range b.ve.DeletedObjects {
     238           1 :                 if !exists(n) {
     239           1 :                         return errors.AssertionFailedf("deleting non-existent object %s", n)
     240           1 :                 }
     241             :         }
     242             : 
     243           1 :         if err := c.writeToCatalogFileLocked(&b.ve); err != nil {
     244           0 :                 return errors.Wrapf(err, "pebble: could not write to remote object catalog")
     245           0 :         }
     246             : 
     247             :         // Add new objects before deleting any objects. This allows for cases where
     248             :         // the same batch adds and deletes an object.
     249           1 :         for _, meta := range b.ve.NewObjects {
     250           1 :                 c.mu.objects[meta.FileNum] = meta
     251           1 :         }
     252           1 :         for _, n := range b.ve.DeletedObjects {
     253           1 :                 delete(c.mu.objects, n)
     254           1 :         }
     255             : 
     256           1 :         return nil
     257             : }
     258             : 
     259           1 : func (c *Catalog) loadFromCatalogFile(filename string) error {
     260           1 :         catalogPath := c.fs.PathJoin(c.dirname, filename)
     261           1 :         f, err := c.fs.Open(catalogPath)
     262           1 :         if err != nil {
     263           0 :                 return errors.Wrapf(
     264           0 :                         err, "pebble: could not open remote object catalog file %q for DB %q",
     265           0 :                         errors.Safe(filename), c.dirname,
     266           0 :                 )
     267           0 :         }
     268           1 :         defer f.Close()
     269           1 :         rr := record.NewReader(f, 0 /* logNum */)
     270           1 :         for {
     271           1 :                 r, err := rr.Next()
     272           1 :                 if err == io.EOF || record.IsInvalidRecord(err) {
     273           1 :                         break
     274             :                 }
     275           1 :                 if err != nil {
     276           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     277           0 :                                 errors.Safe(filename))
     278           0 :                 }
     279           1 :                 var ve VersionEdit
     280           1 :                 if err := ve.Decode(r); err != nil {
     281           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     282           0 :                                 errors.Safe(filename))
     283           0 :                 }
     284             :                 // Apply the version edit to the current state.
     285           1 :                 if err := ve.Apply(&c.mu.creatorID, c.mu.objects); err != nil {
     286           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     287           0 :                                 errors.Safe(filename))
     288           0 :                 }
     289             :         }
     290           1 :         return nil
     291             : }
     292             : 
     293             : // writeToCatalogFileLocked writes a VersionEdit to the catalog file.
     294             : // Creates a new file if this is the first write.
     295           1 : func (c *Catalog) writeToCatalogFileLocked(ve *VersionEdit) error {
     296           1 :         c.mu.rotationHelper.AddRecord(int64(len(ve.NewObjects) + len(ve.DeletedObjects)))
     297           1 :         snapshotSize := int64(len(c.mu.objects))
     298           1 : 
     299           1 :         var shouldRotate bool
     300           1 :         if c.mu.catalogFile == nil {
     301           1 :                 shouldRotate = true
     302           1 :         } else if c.mu.catalogRecWriter.Size() >= rotateFileSize {
     303           1 :                 shouldRotate = c.mu.rotationHelper.ShouldRotate(snapshotSize)
     304           1 :         }
     305             : 
     306           1 :         if shouldRotate {
     307           1 :                 if c.mu.catalogFile != nil {
     308           1 :                         if err := c.closeCatalogFile(); err != nil {
     309           0 :                                 return err
     310           0 :                         }
     311             :                 }
     312           1 :                 if err := c.createNewCatalogFileLocked(); err != nil {
     313           0 :                         return err
     314           0 :                 }
     315           1 :                 c.mu.rotationHelper.Rotate(snapshotSize)
     316             :         }
     317           1 :         return writeRecord(ve, c.mu.catalogFile, c.mu.catalogRecWriter)
     318             : }
     319             : 
     320           1 : func makeCatalogFilename(iter uint64) string {
     321           1 :         return fmt.Sprintf("%s-%06d", catalogFilenameBase, iter)
     322           1 : }
     323             : 
     324             : // createNewCatalogFileLocked creates a new catalog file, populates it with the
     325             : // current catalog and sets c.mu.catalogFile and c.mu.catalogRecWriter.
     326           1 : func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
     327           1 :         if c.mu.catalogFile != nil {
     328           0 :                 return errors.AssertionFailedf("catalogFile already open")
     329           0 :         }
     330           1 :         filename := makeCatalogFilename(c.mu.marker.NextIter())
     331           1 :         filepath := c.fs.PathJoin(c.dirname, filename)
     332           1 :         file, err := c.fs.Create(filepath)
     333           1 :         if err != nil {
     334           0 :                 return err
     335           0 :         }
     336           1 :         recWriter := record.NewWriter(file)
     337           1 :         err = func() error {
     338           1 :                 // Create a VersionEdit that gets us from an empty catalog to the current state.
     339           1 :                 var ve VersionEdit
     340           1 :                 ve.CreatorID = c.mu.creatorID
     341           1 :                 ve.NewObjects = make([]RemoteObjectMetadata, 0, len(c.mu.objects))
     342           1 :                 for _, meta := range c.mu.objects {
     343           1 :                         ve.NewObjects = append(ve.NewObjects, meta)
     344           1 :                 }
     345           1 :                 if err := writeRecord(&ve, file, recWriter); err != nil {
     346           0 :                         return err
     347           0 :                 }
     348             : 
     349             :                 // Move the marker to the new filename. Move handles syncing the data
     350             :                 // directory as well.
     351           1 :                 if err := c.mu.marker.Move(filename); err != nil {
     352           0 :                         return errors.Wrap(err, "moving marker")
     353           0 :                 }
     354             : 
     355           1 :                 return nil
     356             :         }()
     357             : 
     358           1 :         if err != nil {
     359           0 :                 _ = recWriter.Close()
     360           0 :                 _ = file.Close()
     361           0 :                 _ = c.fs.Remove(filepath)
     362           0 :                 return err
     363           0 :         }
     364             : 
     365             :         // Remove any previous file (ignoring any error).
     366           1 :         if c.mu.catalogFilename != "" {
     367           1 :                 _ = c.fs.Remove(c.fs.PathJoin(c.dirname, c.mu.catalogFilename))
     368           1 :         }
     369             : 
     370           1 :         c.mu.catalogFile = file
     371           1 :         c.mu.catalogRecWriter = recWriter
     372           1 :         c.mu.catalogFilename = filename
     373           1 :         return nil
     374             : }
     375             : 
     376           1 : func writeRecord(ve *VersionEdit, file vfs.File, recWriter *record.Writer) error {
     377           1 :         w, err := recWriter.Next()
     378           1 :         if err != nil {
     379           0 :                 return err
     380           0 :         }
     381           1 :         if err := ve.Encode(w); err != nil {
     382           0 :                 return err
     383           0 :         }
     384           1 :         if err := recWriter.Flush(); err != nil {
     385           0 :                 return err
     386           0 :         }
     387           1 :         return file.Sync()
     388             : }

Generated by: LCOV version 1.14