LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider/remoteobjcat - catalog.go (source / functions) Hit Total Coverage
Test: 2024-05-18 08:15Z 6195a2cb - tests + meta.lcov Lines: 187 248 75.4 %
Date: 2024-05-18 08:16:58 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package remoteobjcat
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "fmt"
      10             :         "io"
      11             :         "path/filepath"
      12             :         "slices"
      13             :         "sync"
      14             : 
      15             :         "github.com/cockroachdb/errors"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/objstorage"
      18             :         "github.com/cockroachdb/pebble/objstorage/remote"
      19             :         "github.com/cockroachdb/pebble/record"
      20             :         "github.com/cockroachdb/pebble/vfs"
      21             :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      22             : )
      23             : 
      24             : // Catalog is used to manage the on-disk remote object catalog.
      25             : //
      26             : // The catalog file is a log of records, where each record is an encoded
      27             : // VersionEdit. The current file is identified by a marker named catalogMarkerName in the same directory.
      28             : type Catalog struct {
      29             :         fs      vfs.FS // filesystem that holds the catalog files and the marker
      30             :         dirname string // directory on fs in which catalog files are created and looked up
      31             :         mu      struct { // the embedded Mutex is taken by SetCreatorID, ApplyBatch and Checkpoint
      32             :                 sync.Mutex
      33             : 
      34             :                 creatorID objstorage.CreatorID // unset until a loaded catalog file or SetCreatorID provides it
      35             :                 objects   map[base.DiskFileNum]RemoteObjectMetadata // objects currently in the catalog, keyed by their FileNum
      36             : 
      37             :                 marker *atomicfs.Marker // obtained from atomicfs.LocateMarker in Open; moved to each newly created catalog file
      38             : 
      39             :                 catalogFile      vfs.File // nil until the first write; reset to nil by closeCatalogFile
      40             :                 catalogRecWriter *record.Writer // record writer over catalogFile; set and cleared together with it
      41             : 
      42             :                 rotationHelper record.RotationHelper // consulted by writeToCatalogFileLocked once the file reaches rotateFileSize
      43             : 
      44             :                 // catalogFilename is the filename of catalogFile when catalogFile != nil, otherwise
      45             :                 // it is the filename of the last catalog file (empty if there has never been one).
      46             :                 catalogFilename string
      47             :         }
      48             : }
      49             : 
      50             : // RemoteObjectMetadata encapsulates the data stored in the catalog file for each object. It is also the value type of the Catalog's in-memory object map and the element type of CatalogContents.Objects.
      51             : type RemoteObjectMetadata struct {
      52             :         // FileNum is the identifier for the object within the context of a single DB
      53             :         // instance.
      54             :         FileNum base.DiskFileNum
      55             :         // FileType is the type of the object. Only certain FileTypes are possible.
      56             :         FileType base.FileType
      57             :         // CreatorID identifies the DB instance that originally created the object.
      58             :         CreatorID objstorage.CreatorID
      59             :         // CreatorFileNum is the identifier for the object within the context of the
      60             :         // DB instance that originally created the object.
      61             :         CreatorFileNum base.DiskFileNum
      62             :         // CleanupMethod indicates the method for cleaning up unused shared objects.
      63             :         CleanupMethod objstorage.SharedCleanupMethod
      64             :         // Locator identifies a remote.Storage implementation.
      65             :         Locator remote.Locator
      66             :         // CustomObjectName (if it is set) overrides the object name that is normally
      67             :         // derived from the CreatorID and CreatorFileNum.
      68             :         CustomObjectName string
      69             : }
      70             : 
      71             : const (
      72             :         catalogFilenameBase = "REMOTE-OBJ-CATALOG" // filename prefix; makeCatalogFilename appends "-" and a zero-padded iteration number
      73             :         catalogMarkerName   = "remote-obj-catalog" // marker name passed to atomicfs.LocateMarker
      74             : 
      75             :         // We create a new file when the size exceeds 1MB (and some other conditions
      76             :         // hold; see record.RotationHelper).
      77             :         rotateFileSize = 1024 * 1024 // 1MB
      78             : )
      79             : 
      80             : // CatalogContents contains the remote objects in the catalog, as returned by Open.
      81             : type CatalogContents struct {
      82             :         // CreatorID, if it is set.
      83             :         CreatorID objstorage.CreatorID
      84             :         Objects   []RemoteObjectMetadata // Open returns these sorted by FileNum
      85             : }
      86             : 
      87             : // Open creates a Catalog and loads any existing catalog file, returning the
      88             : // creator ID (if it is set) and the contents, with objects sorted by FileNum. Open does not create or open a catalog file for writing; that happens on the first write.
      89           2 : func Open(fs vfs.FS, dirname string) (*Catalog, CatalogContents, error) {
      90           2 :         c := &Catalog{
      91           2 :                 fs:      fs,
      92           2 :                 dirname: dirname,
      93           2 :         }
      94           2 :         c.mu.objects = make(map[base.DiskFileNum]RemoteObjectMetadata)
      95           2 : 
      96           2 :         var err error
      97           2 :         c.mu.marker, c.mu.catalogFilename, err = atomicfs.LocateMarker(fs, dirname, catalogMarkerName)
      98           2 :         if err != nil {
      99           0 :                 return nil, CatalogContents{}, err
     100           0 :         }
     101             :         // If the filename is empty, there is no existing catalog.
     102           2 :         if c.mu.catalogFilename != "" {
     103           2 :                 if err := c.loadFromCatalogFile(c.mu.catalogFilename); err != nil {
     104           0 :                         return nil, CatalogContents{}, err // NOTE(review): c.mu.marker is not closed on this error path
     105           0 :                 }
     106           2 :                 if err := c.mu.marker.RemoveObsolete(); err != nil {
     107           0 :                         return nil, CatalogContents{}, err // NOTE(review): c.mu.marker is not closed on this error path either
     108           0 :                 }
     109             :                 // TODO(radu): remove obsolete catalog files.
     110             :         }
     111           2 :         res := CatalogContents{
     112           2 :                 CreatorID: c.mu.creatorID,
     113           2 :                 Objects:   make([]RemoteObjectMetadata, 0, len(c.mu.objects)),
     114           2 :         }
     115           2 :         for _, meta := range c.mu.objects {
     116           2 :                 res.Objects = append(res.Objects, meta)
     117           2 :         }
     118             :         // Sort the objects so the function is deterministic (map iteration order is not).
     119           2 :         slices.SortFunc(res.Objects, func(a, b RemoteObjectMetadata) int {
     120           2 :                 return cmp.Compare(a.FileNum, b.FileNum)
     121           2 :         })
     122           2 :         return c, res, nil
     123             : }
     124             : 
     125             : // SetCreatorID sets the creator ID. If it is already set, it must match. An unset id is rejected. A new ID is written to the catalog file before the in-memory value is updated.
     126           2 : func (c *Catalog) SetCreatorID(id objstorage.CreatorID) error {
     127           2 :         if !id.IsSet() {
     128           0 :                 return base.AssertionFailedf("attempt to unset CreatorID")
     129           0 :         }
     130             : 
     131           2 :         c.mu.Lock()
     132           2 :         defer c.mu.Unlock()
     133           2 : 
     134           2 :         if c.mu.creatorID.IsSet() {
     135           1 :                 if c.mu.creatorID != id {
     136           1 :                         return base.AssertionFailedf("attempt to change CreatorID from %s to %s", c.mu.creatorID, id)
     137           1 :                 }
     138           1 :                 return nil // already set to the same id: nothing is written
     139             :         }
     140             : 
     141           2 :         ve := VersionEdit{CreatorID: id}
     142           2 :         if err := c.writeToCatalogFileLocked(&ve); err != nil {
     143           0 :                 return errors.Wrapf(err, "pebble: could not write to remote object catalog")
     144           0 :         }
     145           2 :         c.mu.creatorID = id // only updated once the write has succeeded
     146           2 :         return nil
     147             : }
     148             : 
     149             : // Close closes the marker (if one is held) and the current catalog file (if one is open), returning the combined error. It does not lock c.mu.
     150           2 : func (c *Catalog) Close() error {
     151           2 :         var err error
     152           2 :         if c.mu.marker != nil {
     153           2 :                 err = c.mu.marker.Close()
     154           2 :                 c.mu.marker = nil // cleared so a later Close skips the marker
     155           2 :         }
     156           2 :         return errors.CombineErrors(err, c.closeCatalogFile())
     157             : }
     158             : 
     159           2 : func (c *Catalog) closeCatalogFile() error { // closes the record writer, then the catalog file, and clears both fields; no-op when no file is open
     160           2 :         if c.mu.catalogFile == nil {
     161           2 :                 return nil
     162           2 :         }
     163           2 :         err1 := c.mu.catalogRecWriter.Close()
     164           2 :         err2 := c.mu.catalogFile.Close() // attempted even if closing the record writer failed
     165           2 :         c.mu.catalogRecWriter = nil
     166           2 :         c.mu.catalogFile = nil
     167           2 :         if err1 != nil {
     168           0 :                 return err1 // NOTE(review): err2 is dropped when both closes fail
     169           0 :         }
     170           2 :         return err2
     171             : }
     172             : 
     173             : // Batch is used to perform multiple object additions/deletions at once. The zero value is an empty batch.
     174             : type Batch struct {
     175             :         ve VersionEdit // Batch's methods only touch ve.NewObjects and ve.DeletedObjects
     176             : }
     177             : 
     178             : // AddObject adds a new object to the batch.
     179             : //
     180             : // The given FileNum must be new - it must not match that of any object that was
     181             : // ever in the catalog. AddObject itself performs no validation; Catalog.ApplyBatch rejects a FileNum that is currently in the catalog or that appears twice among the batch's additions.
     182           2 : func (b *Batch) AddObject(meta RemoteObjectMetadata) {
     183           2 :         b.ve.NewObjects = append(b.ve.NewObjects, meta)
     184           2 : }
     185             : 
     186             : // DeleteObject adds an object removal to the batch. No validation happens here; Catalog.ApplyBatch rejects the removal of an object that is neither in the catalog nor added by the same batch.
     187           2 : func (b *Batch) DeleteObject(fileNum base.DiskFileNum) {
     188           2 :         b.ve.DeletedObjects = append(b.ve.DeletedObjects, fileNum)
     189           2 : }
     190             : 
     191             : // Reset clears the batch. Both slices are truncated to length zero in place, so their backing arrays are kept and reused by later additions.
     192           2 : func (b *Batch) Reset() {
     193           2 :         b.ve.NewObjects = b.ve.NewObjects[:0]
     194           2 :         b.ve.DeletedObjects = b.ve.DeletedObjects[:0]
     195           2 : }
     196             : 
     197             : // IsEmpty returns true if the batch contains no additions and no deletions.
     198           2 : func (b *Batch) IsEmpty() bool {
     199           2 :         return len(b.ve.NewObjects) == 0 && len(b.ve.DeletedObjects) == 0
     200           2 : }
     201             : 
     202             : // Copy returns a copy of the Batch whose slices are freshly allocated, so the result does not share backing arrays with b. Empty slices are left nil in the copy.
     203           2 : func (b *Batch) Copy() Batch {
     204           2 :         var res Batch
     205           2 :         if len(b.ve.NewObjects) > 0 {
     206           2 :                 res.ve.NewObjects = make([]RemoteObjectMetadata, len(b.ve.NewObjects))
     207           2 :                 copy(res.ve.NewObjects, b.ve.NewObjects)
     208           2 :         }
     209           2 :         if len(b.ve.DeletedObjects) > 0 {
     210           2 :                 res.ve.DeletedObjects = make([]base.DiskFileNum, len(b.ve.DeletedObjects))
     211           2 :                 copy(res.ve.DeletedObjects, b.ve.DeletedObjects)
     212           2 :         }
     213           2 :         return res
     214             : }
     215             : 
     216             : // Append merges two batches: the additions and deletions of other are appended to those of b, after b's existing entries.
     217           0 : func (b *Batch) Append(other Batch) {
     218           0 :         b.ve.NewObjects = append(b.ve.NewObjects, other.ve.NewObjects...)
     219           0 :         b.ve.DeletedObjects = append(b.ve.DeletedObjects, other.ve.DeletedObjects...)
     220           0 : }
     221             : 
     222             : // ApplyBatch applies a batch of updates; returns after the change is stably
     223             : // recorded on storage. The batch is validated first; if validation or the write fails, the in-memory object map is left unchanged.
     224           2 : func (c *Catalog) ApplyBatch(b Batch) error {
     225           2 :         c.mu.Lock()
     226           2 :         defer c.mu.Unlock()
     227           2 : 
     228           2 :         // Sanity checks: additions must not already exist, deletions must refer to an existing object.
     229           2 :         toAdd := make(map[base.DiskFileNum]struct{}, len(b.ve.NewObjects))
     230           2 :         exists := func(n base.DiskFileNum) bool { // true if n is in the catalog or was added earlier in this batch
     231           2 :                 _, ok := c.mu.objects[n]
     232           2 :                 if !ok {
     233           2 :                         _, ok = toAdd[n]
     234           2 :                 }
     235           2 :                 return ok
     236             :         }
     237           2 :         for _, meta := range b.ve.NewObjects {
     238           2 :                 if exists(meta.FileNum) {
     239           1 :                         return base.AssertionFailedf("adding existing object %s", meta.FileNum)
     240           1 :                 }
     241           2 :                 toAdd[meta.FileNum] = struct{}{}
     242             :         }
     243           2 :         for _, n := range b.ve.DeletedObjects {
     244           2 :                 if !exists(n) {
     245           1 :                         return base.AssertionFailedf("deleting non-existent object %s", n)
     246           1 :                 }
     247             :         }
     248             : 
     249           2 :         if err := c.writeToCatalogFileLocked(&b.ve); err != nil {
     250           0 :                 return errors.Wrapf(err, "pebble: could not write to remote object catalog")
     251           0 :         }
     252             : 
     253             :         // Add new objects before deleting any objects. This allows for cases where
     254             :         // the same batch adds and deletes an object.
     255           2 :         for _, meta := range b.ve.NewObjects {
     256           2 :                 c.mu.objects[meta.FileNum] = meta
     257           2 :         }
     258           2 :         for _, n := range b.ve.DeletedObjects {
     259           2 :                 delete(c.mu.objects, n)
     260           2 :         }
     261             : 
     262           2 :         return nil
     263             : }
     264             : 
     265           2 : func (c *Catalog) loadFromCatalogFile(filename string) error { // replays every record of the named file in c.dirname into c.mu.creatorID and c.mu.objects
     266           2 :         catalogPath := c.fs.PathJoin(c.dirname, filename)
     267           2 :         f, err := c.fs.Open(catalogPath)
     268           2 :         if err != nil {
     269           0 :                 return errors.Wrapf(
     270           0 :                         err, "pebble: could not open remote object catalog file %q for DB %q",
     271           0 :                         errors.Safe(filename), c.dirname,
     272           0 :                 )
     273           0 :         }
     274           2 :         defer f.Close()
     275           2 :         rr := record.NewReader(f, 0 /* logNum */)
     276           2 :         for {
     277           2 :                 r, err := rr.Next()
     278           2 :                 if err == io.EOF || record.IsInvalidRecord(err) { // end of file or an invalid record: stop replaying, keep what was applied so far, and return nil
     279           2 :                         break
     280             :                 }
     281           2 :                 if err != nil {
     282           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     283           0 :                                 errors.Safe(filename))
     284           0 :                 }
     285           2 :                 var ve VersionEdit
     286           2 :                 if err := ve.Decode(r); err != nil {
     287           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     288           0 :                                 errors.Safe(filename))
     289           0 :                 }
     290             :                 // Apply the version edit to the current state (creator ID and object map).
     291           2 :                 if err := ve.Apply(&c.mu.creatorID, c.mu.objects); err != nil {
     292           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     293           0 :                                 errors.Safe(filename))
     294           0 :                 }
     295             :         }
     296           2 :         return nil
     297             : }
     298             : 
     299             : // writeToCatalogFileLocked writes a VersionEdit to the catalog file.
     300             : // Creates a new file if this is the first write, or if the current file has reached rotateFileSize and the rotation helper agrees. The visible callers (SetCreatorID, ApplyBatch) hold c.mu.
     301           2 : func (c *Catalog) writeToCatalogFileLocked(ve *VersionEdit) error {
     302           2 :         c.mu.rotationHelper.AddRecord(int64(len(ve.NewObjects) + len(ve.DeletedObjects)))
     303           2 :         snapshotSize := int64(len(c.mu.objects)) // a new file starts with one entry per current object
     304           2 : 
     305           2 :         var shouldRotate bool
     306           2 :         if c.mu.catalogFile == nil {
     307           2 :                 shouldRotate = true // no file open yet: one must be created before writing
     308           2 :         } else if c.mu.catalogRecWriter.Size() >= rotateFileSize {
     309           1 :                 shouldRotate = c.mu.rotationHelper.ShouldRotate(snapshotSize)
     310           1 :         }
     311             : 
     312           2 :         if shouldRotate {
     313           2 :                 if c.mu.catalogFile != nil {
     314           1 :                         if err := c.closeCatalogFile(); err != nil {
     315           0 :                                 return err
     316           0 :                         }
     317             :                 }
     318           2 :                 if err := c.createNewCatalogFileLocked(); err != nil {
     319           0 :                         return err
     320           0 :                 }
     321           2 :                 c.mu.rotationHelper.Rotate(snapshotSize)
     322             :         }
     323           2 :         return writeRecord(ve, c.mu.catalogFile, c.mu.catalogRecWriter) // after a rotation this lands in the new file, following its snapshot record
     324             : }
     325             : 
     326           2 : func makeCatalogFilename(iter uint64) string { // e.g. iter 7 yields "REMOTE-OBJ-CATALOG-000007"
     327           2 :         return fmt.Sprintf("%s-%06d", catalogFilenameBase, iter)
     328           2 : }
     329             : 
     330             : // createNewCatalogFileLocked creates a new catalog file, populates it with the
     331             : // current catalog and sets c.mu.catalogFile and c.mu.catalogRecWriter. On failure the new file is closed and removed, and c.mu.catalogFile, c.mu.catalogRecWriter and c.mu.catalogFilename are left untouched.
     332           2 : func (c *Catalog) createNewCatalogFileLocked() (outErr error) { // NOTE(review): the named result outErr is never referenced in the body
     333           2 :         if c.mu.catalogFile != nil {
     334           0 :                 return base.AssertionFailedf("catalogFile already open")
     335           0 :         }
     336           2 :         filename := makeCatalogFilename(c.mu.marker.NextIter())
     337           2 :         filepath := c.fs.PathJoin(c.dirname, filename) // NOTE(review): this local shadows the imported path/filepath package for the rest of the function
     338           2 :         file, err := c.fs.Create(filepath, "pebble-manifest")
     339           2 :         if err != nil {
     340           0 :                 return err
     341           0 :         }
     342           2 :         recWriter := record.NewWriter(file)
     343           2 :         err = func() error {
     344           2 :                 // Create a VersionEdit that gets us from an empty catalog to the current state.
     345           2 :                 var ve VersionEdit
     346           2 :                 ve.CreatorID = c.mu.creatorID
     347           2 :                 ve.NewObjects = make([]RemoteObjectMetadata, 0, len(c.mu.objects))
     348           2 :                 for _, meta := range c.mu.objects {
     349           2 :                         ve.NewObjects = append(ve.NewObjects, meta)
     350           2 :                 }
     351           2 :                 if err := writeRecord(&ve, file, recWriter); err != nil {
     352           0 :                         return err
     353           0 :                 }
     354             : 
     355             :                 // Move the marker to the new filename. Move handles syncing the data
     356             :                 // directory as well.
     357           2 :                 if err := c.mu.marker.Move(filename); err != nil {
     358           0 :                         return errors.Wrap(err, "moving marker")
     359           0 :                 }
     360             : 
     361           2 :                 return nil
     362             :         }()
     363             : 
     364           2 :         if err != nil {
     365           0 :                 _ = recWriter.Close() // best-effort cleanup; the original error is the one returned
     366           0 :                 _ = file.Close()
     367           0 :                 _ = c.fs.Remove(filepath)
     368           0 :                 return err
     369           0 :         }
     370             : 
     371             :         // Remove any previous file (ignoring any error). The marker already points at the new file at this point.
     372           2 :         if c.mu.catalogFilename != "" {
     373           2 :                 _ = c.fs.Remove(c.fs.PathJoin(c.dirname, c.mu.catalogFilename))
     374           2 :         }
     375             : 
     376           2 :         c.mu.catalogFile = file
     377           2 :         c.mu.catalogRecWriter = recWriter
     378           2 :         c.mu.catalogFilename = filename
     379           2 :         return nil
     380             : }
     381             : 
     382             : // Checkpoint copies the current catalog file into dir on the given fs, under the same filename, and moves a catalog marker in dir to point at the copy. It holds c.mu for the duration.
     383           1 : func (c *Catalog) Checkpoint(fs vfs.FS, dir string) error {
     384           1 :         c.mu.Lock()
     385           1 :         defer c.mu.Unlock()
     386           1 : 
     387           1 :         // NB: Every write to recWriter is flushed. We don't need to worry about
     388           1 :         // this new file descriptor not getting all the saved catalog entries.
     389           1 :         existingCatalogFilepath := filepath.Join(c.dirname, c.mu.catalogFilename) // NOTE(review): uses filepath.Join, while the rest of the file builds paths with c.fs.PathJoin
     390           1 :         destPath := filepath.Join(dir, c.mu.catalogFilename) // NOTE(review): c.mu.catalogFilename is empty if no catalog file was ever located or created; not checked here - confirm callers rule it out
     391           1 :         if err := vfs.CopyAcrossFS(c.fs, existingCatalogFilepath, fs, destPath); err != nil {
     392           0 :                 return err
     393           0 :         }
     394           1 :         catalogMarker, _, err := atomicfs.LocateMarker(fs, dir, catalogMarkerName)
     395           1 :         if err != nil {
     396           0 :                 return err
     397           0 :         }
     398           1 :         if err := catalogMarker.Move(c.mu.catalogFilename); err != nil {
     399           0 :                 return err // NOTE(review): catalogMarker is not closed on this error path
     400           0 :         }
     401           1 :         return catalogMarker.Close()
     402             : }
     403             : 
     404           2 : func writeRecord(ve *VersionEdit, file vfs.File, recWriter *record.Writer) error { // encodes ve as the next record of recWriter, flushes the writer, then syncs file
     405           2 :         w, err := recWriter.Next()
     406           2 :         if err != nil {
     407           0 :                 return err
     408           0 :         }
     409           2 :         if err := ve.Encode(w); err != nil {
     410           0 :                 return err
     411           0 :         }
     412           2 :         if err := recWriter.Flush(); err != nil {
     413           0 :                 return err
     414           0 :         }
     415           2 :         return file.Sync() // the record is synced to storage before the function returns
     416             : }

Generated by: LCOV version 1.14