Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "io"
11 : "os"
12 : "slices"
13 : "sync"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/errors/oserror"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/objstorage"
20 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
21 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
22 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
23 : "github.com/cockroachdb/pebble/objstorage/remote"
24 : "github.com/cockroachdb/pebble/vfs"
25 : )
26 :
27 : // provider is the implementation of objstorage.Provider.
28 : type provider struct {
29 : st Settings // settings the provider was opened with
30 :
31 : fsDir vfs.File // handle on st.FSDirName; opened in open() and released by Close
32 :
33 : tracer *objiotracing.Tracer // only set by open() when objiotracing.Enabled
34 :
35 : remote remoteSubsystem // remote-storage state (e.g. the cache and catalog used by Metrics and CheckpointState)
36 :
37 : mu struct {
38 : sync.RWMutex // held by the methods below whenever they touch the other fields of mu
39 :
40 : remote remoteLockedState // lock-protected remote state (catalog batch, external objects)
41 :
42 : // localObjectsChanged is set when a non-remote object is added to or
43 : // removed from knownObjects and Sync was not yet called.
44 : localObjectsChanged bool
45 :
46 : // knownObjects maintains information about objects that are known to the provider.
47 : // It is initialized with the list of files in the manifest when we open a DB.
48 : knownObjects map[base.DiskFileNum]objstorage.ObjectMetadata
49 :
50 : // protectedObjects are objects that cannot be unreferenced because they
51 : // have outstanding SharedObjectBackingHandles. The value is the count of
52 : protectedObjects map[base.DiskFileNum]int // outstanding handles; entries are deleted when the count drops to zero.
53 : }
54 : }
55 :
56 : var _ objstorage.Provider = (*provider)(nil) // compile-time check that *provider satisfies objstorage.Provider
57 :
58 : // Settings that must be specified when creating the provider.
59 : type Settings struct {
60 : Logger base.Logger // used by the provider for reporting (e.g. Fatalf when a MustExist open fails)
61 :
62 : // Local filesystem configuration.
63 : FS vfs.FS
64 : FSDirName string
65 :
66 : // FSDirInitialListing is a listing of FSDirName at the time of calling Open.
67 : //
68 : // This is an optional optimization to avoid double listing on Open when the
69 : // higher layer already has a listing. When nil, we obtain the listing on
70 : // Open.
71 : FSDirInitialListing []string
72 :
73 : // FSCleaner cleans obsolete files from the local filesystem.
74 : //
75 : // DefaultSettings sets this to base.DeleteCleaner.
76 : FSCleaner base.Cleaner
77 :
78 : // NoSyncOnClose decides whether the implementation will enforce a
79 : // close-time synchronization (e.g., fdatasync() or sync_file_range())
80 : // on files it writes to. Setting this to true removes the guarantee for a
81 : // sync on close. Some implementations can still issue a non-blocking sync.
82 : NoSyncOnClose bool
83 :
84 : // BytesPerSync enables periodic syncing of files in order to smooth out
85 : // writes to disk. This option does not provide any persistence guarantee, but
86 : // is used to avoid latency spikes if the OS automatically decides to write
87 : // out a large chunk of dirty filesystem buffers.
88 : BytesPerSync int
89 :
90 : // Fields here are set only if the provider is to support remote objects
91 : // (experimental).
92 : Remote struct {
93 : StorageFactory remote.StorageFactory
94 :
95 : // If CreateOnShared is not remote.CreateOnSharedNone, objects are created
96 : // on remote storage using the CreateOnSharedLocator (when the
97 : // PreferSharedStorage create option is true).
98 : CreateOnShared remote.CreateOnSharedStrategy
99 : CreateOnSharedLocator remote.Locator
100 :
101 : // CacheSizeBytes is the size of the on-disk block cache for objects
102 : // on remote storage. If it is 0, no cache is used.
103 : CacheSizeBytes int64
104 :
105 : // CacheBlockSize is the block size of the cache; if 0, the default of 32KB is used.
106 : CacheBlockSize int
107 :
108 : // ShardingBlockSize is the size of a shard block. The cache is split into contiguous
109 : // ShardingBlockSize units. The units are distributed across multiple independent shards
110 : // of the cache, via a hash(offset) modulo num shards operation. The cache replacement
111 : // policies operate at the level of a shard, not the whole cache. This is done to reduce
112 : // lock contention.
113 : //
114 : // If ShardingBlockSize is 0, the default of 1 MB is used.
115 : ShardingBlockSize int64
116 :
117 : // CacheShardCount is the number of independent shards the cache uses. Each shard is
118 : // the same size, and a hash of filenum & offset maps a read to a certain shard. If set
119 : // to 0, 2*runtime.GOMAXPROCS is used as the shard count.
120 : CacheShardCount int
121 :
122 : // TODO(radu): allow the cache to live on another FS/location (e.g. to use
123 : // instance-local SSD).
124 : }
125 : }
126 :
127 : // DefaultSettings initializes default settings (with no remote storage),
128 : // suitable for tests and tools.
129 1 : func DefaultSettings(fs vfs.FS, dirName string) Settings {
130 1 : return Settings{
131 1 : Logger: base.DefaultLogger,
132 1 : FS: fs,
133 1 : FSDirName: dirName,
134 1 : FSCleaner: base.DeleteCleaner{},
135 1 : NoSyncOnClose: false,
136 1 : BytesPerSync: 512 * 1024, // 512KB
137 1 : }
138 1 : }
139 :
140 : // Open creates the provider.
141 2 : func Open(settings Settings) (objstorage.Provider, error) {
142 2 : // Note: we can't just `return open(settings)` because in an error case we
143 2 : // would return (*provider)(nil) which is not objstorage.Provider(nil).
144 2 : p, err := open(settings)
145 2 : if err != nil {
146 1 : return nil, err
147 1 : }
148 2 : return p, nil
149 : }
150 :
151 2 : func open(settings Settings) (p *provider, _ error) {
152 2 : fsDir, err := settings.FS.OpenDir(settings.FSDirName)
153 2 : if err != nil {
154 1 : return nil, err
155 1 : }
156 :
157 2 : defer func() {
158 2 : if p == nil {
159 0 : fsDir.Close()
160 0 : }
161 : }()
162 :
163 2 : p = &provider{
164 2 : st: settings,
165 2 : fsDir: fsDir,
166 2 : }
167 2 : p.mu.knownObjects = make(map[base.DiskFileNum]objstorage.ObjectMetadata)
168 2 : p.mu.protectedObjects = make(map[base.DiskFileNum]int)
169 2 :
170 2 : if objiotracing.Enabled {
171 0 : p.tracer = objiotracing.Open(settings.FS, settings.FSDirName)
172 0 : }
173 :
174 : // Add local FS objects.
175 2 : if err := p.vfsInit(); err != nil {
176 0 : return nil, err
177 0 : }
178 :
179 : // Initialize remote subsystem (if configured) and add remote objects.
180 2 : if err := p.remoteInit(); err != nil {
181 0 : return nil, err
182 0 : }
183 :
184 2 : return p, nil
185 : }
186 :
187 : // Close is part of the objstorage.Provider interface.
188 2 : func (p *provider) Close() error {
189 2 : err := p.sharedClose()
190 2 : if p.fsDir != nil {
191 2 : err = firstError(err, p.fsDir.Close())
192 2 : p.fsDir = nil
193 2 : }
194 2 : if objiotracing.Enabled {
195 0 : if p.tracer != nil {
196 0 : p.tracer.Close()
197 0 : p.tracer = nil
198 0 : }
199 : }
200 2 : return err
201 : }
202 :
203 : // OpenForReading opens an existing object.
204 : func (p *provider) OpenForReading(
205 : ctx context.Context,
206 : fileType base.FileType,
207 : fileNum base.DiskFileNum,
208 : opts objstorage.OpenOptions,
209 2 : ) (objstorage.Readable, error) {
210 2 : meta, err := p.Lookup(fileType, fileNum)
211 2 : if err != nil {
212 1 : if opts.MustExist {
213 0 : p.st.Logger.Fatalf("%v", err)
214 0 : }
215 1 : return nil, err
216 : }
217 :
218 2 : var r objstorage.Readable
219 2 : if !meta.IsRemote() {
220 2 : r, err = p.vfsOpenForReading(ctx, fileType, fileNum, opts)
221 2 : } else {
222 2 : r, err = p.remoteOpenForReading(ctx, meta, opts)
223 2 : if err != nil && p.isNotExistError(meta, err) {
224 1 : // Wrap the error so that IsNotExistError functions properly.
225 1 : err = errors.Mark(err, os.ErrNotExist)
226 1 : }
227 : }
228 2 : if err != nil {
229 1 : return nil, err
230 1 : }
231 2 : if objiotracing.Enabled {
232 0 : r = p.tracer.WrapReadable(ctx, r, fileNum)
233 0 : }
234 2 : return r, nil
235 : }
236 :
237 : // Create creates a new object and opens it for writing.
238 : //
239 : // The object is not guaranteed to be durable (accessible in case of crashes)
240 : // until Sync is called.
241 : func (p *provider) Create(
242 : ctx context.Context,
243 : fileType base.FileType,
244 : fileNum base.DiskFileNum,
245 : opts objstorage.CreateOptions,
246 2 : ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
247 2 : if opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone {
248 2 : w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts)
249 2 : } else {
250 2 : w, meta, err = p.vfsCreate(ctx, fileType, fileNum)
251 2 : }
252 2 : if err != nil {
253 1 : err = errors.Wrapf(err, "creating object %s", fileNum)
254 1 : return nil, objstorage.ObjectMetadata{}, err
255 1 : }
256 2 : p.addMetadata(meta)
257 2 : if objiotracing.Enabled {
258 0 : w = p.tracer.WrapWritable(ctx, w, fileNum)
259 0 : }
260 2 : return w, meta, nil
261 : }
262 :
263 : // Remove removes an object.
264 : //
265 : // Note that if the object is remote, the object is only (conceptually) removed
266 : // from this provider. If other providers have references on the remote object,
267 : // it will not be removed.
268 : //
269 : // The object is not guaranteed to be durably removed until Sync is called.
270 2 : func (p *provider) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
271 2 : meta, err := p.Lookup(fileType, fileNum)
272 2 : if err != nil {
273 1 : return err
274 1 : }
275 :
276 2 : if !meta.IsRemote() {
277 2 : err = p.vfsRemove(fileType, fileNum)
278 2 : } else {
279 2 : // TODO(radu): implement remote object removal (i.e. deref).
280 2 : err = p.sharedUnref(meta)
281 2 : if err != nil && p.isNotExistError(meta, err) {
282 0 : // Wrap the error so that IsNotExistError functions properly.
283 0 : err = errors.Mark(err, os.ErrNotExist)
284 0 : }
285 : }
286 2 : if err != nil && !p.IsNotExistError(err) {
287 1 : // We want to be able to retry a Remove, so we keep the object in our list.
288 1 : // TODO(radu): we should mark the object as "zombie" and not allow any other
289 1 : // operations.
290 1 : return errors.Wrapf(err, "removing object %s", fileNum)
291 1 : }
292 :
293 2 : p.removeMetadata(fileNum)
294 2 : return err
295 : }
296 :
297 1 : func (p *provider) isNotExistError(meta objstorage.ObjectMetadata, err error) bool {
298 1 : if meta.Remote.Storage != nil {
299 1 : return meta.Remote.Storage.IsNotExistError(err)
300 1 : }
301 0 : return oserror.IsNotExist(err)
302 : }
303 :
304 : // IsNotExistError is part of the objstorage.Provider interface.
305 2 : func (p *provider) IsNotExistError(err error) bool {
306 2 : // We use errors.Mark(err, os.ErrNotExist) for not-exist errors coming from
307 2 : // remote.Storage.
308 2 : return oserror.IsNotExist(err)
309 2 : }
310 :
311 : // Sync flushes the metadata from creation or removal of objects since the last Sync.
312 2 : func (p *provider) Sync() error {
313 2 : if err := p.vfsSync(); err != nil {
314 1 : return err
315 1 : }
316 2 : if err := p.sharedSync(); err != nil {
317 0 : return err
318 0 : }
319 2 : return nil
320 : }
321 :
322 : // LinkOrCopyFromLocal creates a new object that is either a copy of a given
323 : // local file or a hard link (if the new object is created on the same FS, and
324 : // if the FS supports it).
325 : //
326 : // The object is not guaranteed to be durable (accessible in case of crashes)
327 : // until Sync is called.
328 : func (p *provider) LinkOrCopyFromLocal(
329 : ctx context.Context,
330 : srcFS vfs.FS,
331 : srcFilePath string,
332 : dstFileType base.FileType,
333 : dstFileNum base.DiskFileNum,
334 : opts objstorage.CreateOptions,
335 2 : ) (objstorage.ObjectMetadata, error) {
336 2 : shared := opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone
337 2 : if !shared && srcFS == p.st.FS {
338 2 : // Wrap the normal filesystem with one which wraps newly created files with
339 2 : // vfs.NewSyncingFile.
340 2 : fs := vfs.NewSyncingFS(p.st.FS, vfs.SyncingFileOptions{
341 2 : NoSyncOnClose: p.st.NoSyncOnClose,
342 2 : BytesPerSync: p.st.BytesPerSync,
343 2 : })
344 2 : dstPath := p.vfsPath(dstFileType, dstFileNum)
345 2 : if err := vfs.LinkOrCopy(fs, srcFilePath, dstPath); err != nil {
346 1 : return objstorage.ObjectMetadata{}, err
347 1 : }
348 :
349 2 : meta := objstorage.ObjectMetadata{
350 2 : DiskFileNum: dstFileNum,
351 2 : FileType: dstFileType,
352 2 : }
353 2 : p.addMetadata(meta)
354 2 : return meta, nil
355 : }
356 : // Create the object and copy the data.
357 2 : w, meta, err := p.Create(ctx, dstFileType, dstFileNum, opts)
358 2 : if err != nil {
359 0 : return objstorage.ObjectMetadata{}, err
360 0 : }
361 2 : f, err := srcFS.Open(srcFilePath, vfs.SequentialReadsOption)
362 2 : if err != nil {
363 0 : return objstorage.ObjectMetadata{}, err
364 0 : }
365 2 : defer f.Close()
366 2 : buf := make([]byte, 64*1024)
367 2 : for {
368 2 : n, readErr := f.Read(buf)
369 2 : if readErr != nil && readErr != io.EOF {
370 0 : w.Abort()
371 0 : return objstorage.ObjectMetadata{}, readErr
372 0 : }
373 :
374 2 : if n > 0 {
375 2 : if err := w.Write(buf[:n]); err != nil {
376 0 : w.Abort()
377 0 : return objstorage.ObjectMetadata{}, err
378 0 : }
379 : }
380 :
381 2 : if readErr == io.EOF {
382 2 : break
383 : }
384 : }
385 2 : if err := w.Finish(); err != nil {
386 0 : return objstorage.ObjectMetadata{}, err
387 0 : }
388 2 : return meta, nil
389 : }
390 :
391 : // Lookup is part of the objstorage.Provider interface.
392 : func (p *provider) Lookup(
393 : fileType base.FileType, fileNum base.DiskFileNum,
394 2 : ) (objstorage.ObjectMetadata, error) {
395 2 : p.mu.RLock()
396 2 : defer p.mu.RUnlock()
397 2 : meta, ok := p.mu.knownObjects[fileNum]
398 2 : if !ok {
399 1 : return objstorage.ObjectMetadata{}, errors.Wrapf(
400 1 : os.ErrNotExist,
401 1 : "file %s (type %d) unknown to the objstorage provider",
402 1 : fileNum, errors.Safe(fileType),
403 1 : )
404 1 : }
405 2 : if meta.FileType != fileType {
406 0 : return objstorage.ObjectMetadata{}, errors.AssertionFailedf(
407 0 : "file %s type mismatch (known type %d, expected type %d)",
408 0 : fileNum, errors.Safe(meta.FileType), errors.Safe(fileType),
409 0 : )
410 0 : }
411 2 : return meta, nil
412 : }
413 :
414 : // Path is part of the objstorage.Provider interface.
415 2 : func (p *provider) Path(meta objstorage.ObjectMetadata) string {
416 2 : if !meta.IsRemote() {
417 2 : return p.vfsPath(meta.FileType, meta.DiskFileNum)
418 2 : }
419 2 : return p.remotePath(meta)
420 : }
421 :
422 : // Size returns the size of the object.
423 2 : func (p *provider) Size(meta objstorage.ObjectMetadata) (int64, error) {
424 2 : if !meta.IsRemote() {
425 2 : return p.vfsSize(meta.FileType, meta.DiskFileNum)
426 2 : }
427 2 : return p.remoteSize(meta)
428 : }
429 :
430 : // List is part of the objstorage.Provider interface.
431 2 : func (p *provider) List() []objstorage.ObjectMetadata {
432 2 : p.mu.RLock()
433 2 : defer p.mu.RUnlock()
434 2 : res := make([]objstorage.ObjectMetadata, 0, len(p.mu.knownObjects))
435 2 : for _, meta := range p.mu.knownObjects {
436 2 : res = append(res, meta)
437 2 : }
438 2 : slices.SortFunc(res, func(a, b objstorage.ObjectMetadata) int {
439 2 : return cmp.Compare(a.DiskFileNum, b.DiskFileNum)
440 2 : })
441 2 : return res
442 : }
443 :
444 : // Metrics is part of the objstorage.Provider interface.
445 2 : func (p *provider) Metrics() sharedcache.Metrics {
446 2 : if p.remote.cache != nil {
447 2 : return p.remote.cache.Metrics()
448 2 : }
449 2 : return sharedcache.Metrics{}
450 : }
451 :
452 : // CheckpointState is part of the objstorage.Provider interface.
453 : func (p *provider) CheckpointState(
454 : fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum,
455 1 : ) error {
456 1 : p.mu.Lock()
457 1 : defer p.mu.Unlock()
458 1 : for i := range fileNums {
459 1 : if _, ok := p.mu.knownObjects[fileNums[i]]; !ok {
460 0 : return errors.Wrapf(
461 0 : os.ErrNotExist,
462 0 : "file %s (type %d) unknown to the objstorage provider",
463 0 : fileNums[i], errors.Safe(fileType),
464 0 : )
465 0 : }
466 : // Prevent this object from deletion, at least for the life of this instance.
467 1 : p.mu.protectedObjects[fileNums[i]] = p.mu.protectedObjects[fileNums[i]] + 1
468 : }
469 :
470 1 : if p.remote.catalog != nil {
471 1 : return p.remote.catalog.Checkpoint(fs, dir)
472 1 : }
473 0 : return nil
474 : }
475 :
476 2 : func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
477 2 : p.mu.Lock()
478 2 : defer p.mu.Unlock()
479 2 : p.addMetadataLocked(meta)
480 2 : }
481 :
482 2 : func (p *provider) addMetadataLocked(meta objstorage.ObjectMetadata) {
483 2 : if invariants.Enabled {
484 2 : meta.AssertValid()
485 2 : }
486 2 : p.mu.knownObjects[meta.DiskFileNum] = meta
487 2 : if meta.IsRemote() {
488 2 : p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
489 2 : FileNum: meta.DiskFileNum,
490 2 : FileType: meta.FileType,
491 2 : CreatorID: meta.Remote.CreatorID,
492 2 : CreatorFileNum: meta.Remote.CreatorFileNum,
493 2 : Locator: meta.Remote.Locator,
494 2 : CleanupMethod: meta.Remote.CleanupMethod,
495 2 : CustomObjectName: meta.Remote.CustomObjectName,
496 2 : })
497 2 : if meta.IsExternal() {
498 2 : p.mu.remote.addExternalObject(meta)
499 2 : }
500 2 : } else {
501 2 : p.mu.localObjectsChanged = true
502 2 : }
503 : }
504 :
505 2 : func (p *provider) removeMetadata(fileNum base.DiskFileNum) {
506 2 : p.mu.Lock()
507 2 : defer p.mu.Unlock()
508 2 :
509 2 : meta, ok := p.mu.knownObjects[fileNum]
510 2 : if !ok {
511 0 : return
512 0 : }
513 2 : delete(p.mu.knownObjects, fileNum)
514 2 : if meta.IsExternal() {
515 2 : p.mu.remote.removeExternalObject(meta)
516 2 : }
517 2 : if meta.IsRemote() {
518 2 : p.mu.remote.catalogBatch.DeleteObject(fileNum)
519 2 : } else {
520 2 : p.mu.localObjectsChanged = true
521 2 : }
522 : }
523 :
524 : // protectObject prevents the unreferencing of a remote object until
525 : // unprotectObject is called.
526 2 : func (p *provider) protectObject(fileNum base.DiskFileNum) {
527 2 : p.mu.Lock()
528 2 : defer p.mu.Unlock()
529 2 : p.mu.protectedObjects[fileNum] = p.mu.protectedObjects[fileNum] + 1
530 2 : }
531 :
532 1 : func (p *provider) unprotectObject(fileNum base.DiskFileNum) {
533 1 : p.mu.Lock()
534 1 : defer p.mu.Unlock()
535 1 : v := p.mu.protectedObjects[fileNum]
536 1 : if invariants.Enabled && v == 0 {
537 0 : panic("invalid protection count")
538 : }
539 1 : if v > 1 {
540 0 : p.mu.protectedObjects[fileNum] = v - 1
541 1 : } else {
542 1 : delete(p.mu.protectedObjects, fileNum)
543 1 : // TODO(radu): check if the object is still in knownObject; if not, unref it
544 1 : // now.
545 1 : }
546 : }
547 :
548 2 : func (p *provider) isProtected(fileNum base.DiskFileNum) bool {
549 2 : p.mu.Lock()
550 2 : defer p.mu.Unlock()
551 2 : return p.mu.protectedObjects[fileNum] > 0
552 2 : }
|