Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 : "io"
10 : "os"
11 : "sort"
12 : "sync"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/errors/oserror"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/invariants"
18 : "github.com/cockroachdb/pebble/objstorage"
19 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
20 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
21 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
22 : "github.com/cockroachdb/pebble/objstorage/remote"
23 : "github.com/cockroachdb/pebble/vfs"
24 : )
25 :
// provider is the implementation of objstorage.Provider.
type provider struct {
	// st holds the settings the provider was opened with.
	st Settings

	// fsDir is the open handle for the local directory st.FSDirName. It is
	// released (and set to nil) by Close.
	fsDir vfs.File

	// tracer is populated during open only when objiotracing.Enabled is true.
	tracer *objiotracing.Tracer

	// remote holds the state of the remote-object subsystem (including the
	// optional cache consulted by Metrics).
	remote remoteSubsystem

	// mu groups the mutable state guarded by the embedded RWMutex.
	mu struct {
		sync.RWMutex

		remote struct {
			// catalogBatch accumulates remote object creations and deletions until
			// Sync is called.
			catalogBatch remoteobjcat.Batch

			// storageObjects maps a locator to a remote.Storage.
			// NOTE(review): presumably a per-locator cache of storage handles; it
			// is populated outside this section of the file — confirm.
			storageObjects map[remote.Locator]remote.Storage
		}

		// localObjectsChanged is set if non-remote objects were created or deleted
		// but Sync was not yet called.
		localObjectsChanged bool

		// knownObjects maintains information about objects that are known to the provider.
		// It is initialized with the list of files in the manifest when we open a DB.
		knownObjects map[base.DiskFileNum]objstorage.ObjectMetadata

		// protectedObjects are objects that cannot be unreferenced because they
		// have outstanding SharedObjectBackingHandles. The value is a count of
		// outstanding handles; entries are deleted when the count drops to zero.
		protectedObjects map[base.DiskFileNum]int
	}
}
60 :
// Compile-time assertion that *provider implements objstorage.Provider.
var _ objstorage.Provider = (*provider)(nil)
62 :
// Settings that must be specified when creating the provider.
type Settings struct {
	// Logger is used by the provider for reporting; OpenForReading calls its
	// Fatalf when an object that must exist is unknown.
	Logger base.Logger

	// Local filesystem configuration. FSDirName is the directory within FS
	// that the provider opens on Open.
	FS        vfs.FS
	FSDirName string

	// FSDirInitialListing is a listing of FSDirName at the time of calling Open.
	//
	// This is an optional optimization to avoid double listing on Open when the
	// higher layer already has a listing. When nil, we obtain the listing on
	// Open.
	FSDirInitialListing []string

	// FSCleaner cleans obsolete files from the local filesystem.
	//
	// DefaultSettings sets this to base.DeleteCleaner.
	FSCleaner base.Cleaner

	// NoSyncOnClose decides whether the implementation will enforce a
	// close-time synchronization (e.g., fdatasync() or sync_file_range())
	// on files it writes to. Setting this to true removes the guarantee for a
	// sync on close. Some implementations can still issue a non-blocking sync.
	NoSyncOnClose bool

	// BytesPerSync enables periodic syncing of files in order to smooth out
	// writes to disk. This option does not provide any persistence guarantee, but
	// is used to avoid latency spikes if the OS automatically decides to write
	// out a large chunk of dirty filesystem buffers.
	BytesPerSync int

	// Fields here are set only if the provider is to support remote objects
	// (experimental).
	Remote struct {
		// StorageFactory produces the remote.Storage implementations used for
		// remote objects.
		StorageFactory remote.StorageFactory

		// If CreateOnShared is true, sstables are created on remote storage using
		// the CreateOnSharedLocator (when the PreferSharedStorage create option is
		// true).
		CreateOnShared        bool
		CreateOnSharedLocator remote.Locator

		// CacheSizeBytes is the size of the on-disk block cache for objects
		// on remote storage. If it is 0, no cache is used.
		CacheSizeBytes int64

		// CacheBlockSize is the block size of the cache; if 0, the default of 32KB is used.
		CacheBlockSize int

		// ShardingBlockSize is the size of a shard block. The cache is split into contiguous
		// ShardingBlockSize units. The units are distributed across multiple independent shards
		// of the cache, via a hash(offset) modulo num shards operation. The cache replacement
		// policies operate at the level of shard, not whole cache. This is done to reduce lock
		// contention.
		//
		// If ShardingBlockSize is 0, the default of 1 MB is used.
		ShardingBlockSize int64

		// CacheShardCount is the number of independent shards the cache leverages.
		// Each shard is the same size, and a hash of filenum & offset maps a read
		// to a certain shard. If set to 0, 2*runtime.GOMAXPROCS is used as the
		// shard count.
		CacheShardCount int

		// TODO(radu): allow the cache to live on another FS/location (e.g. to use
		// instance-local SSD).
	}
}
131 :
132 : // DefaultSettings initializes default settings (with no remote storage),
133 : // suitable for tests and tools.
134 0 : func DefaultSettings(fs vfs.FS, dirName string) Settings {
135 0 : return Settings{
136 0 : Logger: base.DefaultLogger,
137 0 : FS: fs,
138 0 : FSDirName: dirName,
139 0 : FSCleaner: base.DeleteCleaner{},
140 0 : NoSyncOnClose: false,
141 0 : BytesPerSync: 512 * 1024, // 512KB
142 0 : }
143 0 : }
144 :
145 : // Open creates the provider.
146 1 : func Open(settings Settings) (objstorage.Provider, error) {
147 1 : // Note: we can't just `return open(settings)` because in an error case we
148 1 : // would return (*provider)(nil) which is not objstorage.Provider(nil).
149 1 : p, err := open(settings)
150 1 : if err != nil {
151 0 : return nil, err
152 0 : }
153 1 : return p, nil
154 : }
155 :
156 1 : func open(settings Settings) (p *provider, _ error) {
157 1 : fsDir, err := settings.FS.OpenDir(settings.FSDirName)
158 1 : if err != nil {
159 0 : return nil, err
160 0 : }
161 :
162 1 : defer func() {
163 1 : if p == nil {
164 0 : fsDir.Close()
165 0 : }
166 : }()
167 :
168 1 : p = &provider{
169 1 : st: settings,
170 1 : fsDir: fsDir,
171 1 : }
172 1 : p.mu.knownObjects = make(map[base.DiskFileNum]objstorage.ObjectMetadata)
173 1 : p.mu.protectedObjects = make(map[base.DiskFileNum]int)
174 1 :
175 1 : if objiotracing.Enabled {
176 0 : p.tracer = objiotracing.Open(settings.FS, settings.FSDirName)
177 0 : }
178 :
179 : // Add local FS objects.
180 1 : if err := p.vfsInit(); err != nil {
181 0 : return nil, err
182 0 : }
183 :
184 : // Initialize remote subsystem (if configured) and add remote objects.
185 1 : if err := p.remoteInit(); err != nil {
186 0 : return nil, err
187 0 : }
188 :
189 1 : return p, nil
190 : }
191 :
192 : // Close is part of the objstorage.Provider interface.
193 1 : func (p *provider) Close() error {
194 1 : err := p.sharedClose()
195 1 : if p.fsDir != nil {
196 1 : err = firstError(err, p.fsDir.Close())
197 1 : p.fsDir = nil
198 1 : }
199 1 : if objiotracing.Enabled {
200 0 : if p.tracer != nil {
201 0 : p.tracer.Close()
202 0 : p.tracer = nil
203 0 : }
204 : }
205 1 : return err
206 : }
207 :
208 : // OpenForReading opens an existing object.
209 : func (p *provider) OpenForReading(
210 : ctx context.Context,
211 : fileType base.FileType,
212 : fileNum base.DiskFileNum,
213 : opts objstorage.OpenOptions,
214 1 : ) (objstorage.Readable, error) {
215 1 : meta, err := p.Lookup(fileType, fileNum)
216 1 : if err != nil {
217 0 : if opts.MustExist {
218 0 : p.st.Logger.Fatalf("%v", err)
219 0 : }
220 0 : return nil, err
221 : }
222 :
223 1 : var r objstorage.Readable
224 1 : if !meta.IsRemote() {
225 1 : r, err = p.vfsOpenForReading(ctx, fileType, fileNum, opts)
226 1 : } else {
227 1 : r, err = p.remoteOpenForReading(ctx, meta, opts)
228 1 : if err != nil && p.isNotExistError(meta, err) {
229 0 : // Wrap the error so that IsNotExistError functions properly.
230 0 : err = errors.Mark(err, os.ErrNotExist)
231 0 : }
232 : }
233 1 : if err != nil {
234 0 : return nil, err
235 0 : }
236 1 : if objiotracing.Enabled {
237 0 : r = p.tracer.WrapReadable(ctx, r, fileNum)
238 0 : }
239 1 : return r, nil
240 : }
241 :
242 : // Create creates a new object and opens it for writing.
243 : //
244 : // The object is not guaranteed to be durable (accessible in case of crashes)
245 : // until Sync is called.
246 : func (p *provider) Create(
247 : ctx context.Context,
248 : fileType base.FileType,
249 : fileNum base.DiskFileNum,
250 : opts objstorage.CreateOptions,
251 1 : ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
252 1 : if opts.PreferSharedStorage && p.st.Remote.CreateOnShared {
253 1 : w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts)
254 1 : } else {
255 1 : w, meta, err = p.vfsCreate(ctx, fileType, fileNum)
256 1 : }
257 1 : if err != nil {
258 0 : err = errors.Wrapf(err, "creating object %s", errors.Safe(fileNum))
259 0 : return nil, objstorage.ObjectMetadata{}, err
260 0 : }
261 1 : p.addMetadata(meta)
262 1 : if objiotracing.Enabled {
263 0 : w = p.tracer.WrapWritable(ctx, w, fileNum)
264 0 : }
265 1 : return w, meta, nil
266 : }
267 :
268 : // Remove removes an object.
269 : //
270 : // Note that if the object is remote, the object is only (conceptually) removed
271 : // from this provider. If other providers have references on the remote object,
272 : // it will not be removed.
273 : //
274 : // The object is not guaranteed to be durably removed until Sync is called.
275 1 : func (p *provider) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
276 1 : meta, err := p.Lookup(fileType, fileNum)
277 1 : if err != nil {
278 0 : return err
279 0 : }
280 :
281 1 : if !meta.IsRemote() {
282 1 : err = p.vfsRemove(fileType, fileNum)
283 1 : } else {
284 1 : // TODO(radu): implement remote object removal (i.e. deref).
285 1 : err = p.sharedUnref(meta)
286 1 : if err != nil && p.isNotExistError(meta, err) {
287 0 : // Wrap the error so that IsNotExistError functions properly.
288 0 : err = errors.Mark(err, os.ErrNotExist)
289 0 : }
290 : }
291 1 : if err != nil && !p.IsNotExistError(err) {
292 0 : // We want to be able to retry a Remove, so we keep the object in our list.
293 0 : // TODO(radu): we should mark the object as "zombie" and not allow any other
294 0 : // operations.
295 0 : return errors.Wrapf(err, "removing object %s", errors.Safe(fileNum))
296 0 : }
297 :
298 1 : p.removeMetadata(fileNum)
299 1 : return err
300 : }
301 :
302 0 : func (p *provider) isNotExistError(meta objstorage.ObjectMetadata, err error) bool {
303 0 : if meta.Remote.Storage != nil {
304 0 : return meta.Remote.Storage.IsNotExistError(err)
305 0 : }
306 0 : return oserror.IsNotExist(err)
307 : }
308 :
309 : // IsNotExistError is part of the objstorage.Provider interface.
310 1 : func (p *provider) IsNotExistError(err error) bool {
311 1 : // We use errors.Mark(err, os.ErrNotExist) for not-exist errors coming from
312 1 : // remote.Storage.
313 1 : return oserror.IsNotExist(err)
314 1 : }
315 :
316 : // Sync flushes the metadata from creation or removal of objects since the last Sync.
317 1 : func (p *provider) Sync() error {
318 1 : if err := p.vfsSync(); err != nil {
319 0 : return err
320 0 : }
321 1 : if err := p.sharedSync(); err != nil {
322 0 : return err
323 0 : }
324 1 : return nil
325 : }
326 :
327 : // LinkOrCopyFromLocal creates a new object that is either a copy of a given
328 : // local file or a hard link (if the new object is created on the same FS, and
329 : // if the FS supports it).
330 : //
331 : // The object is not guaranteed to be durable (accessible in case of crashes)
332 : // until Sync is called.
333 : func (p *provider) LinkOrCopyFromLocal(
334 : ctx context.Context,
335 : srcFS vfs.FS,
336 : srcFilePath string,
337 : dstFileType base.FileType,
338 : dstFileNum base.DiskFileNum,
339 : opts objstorage.CreateOptions,
340 1 : ) (objstorage.ObjectMetadata, error) {
341 1 : shared := opts.PreferSharedStorage && p.st.Remote.CreateOnShared
342 1 : if !shared && srcFS == p.st.FS {
343 1 : // Wrap the normal filesystem with one which wraps newly created files with
344 1 : // vfs.NewSyncingFile.
345 1 : fs := vfs.NewSyncingFS(p.st.FS, vfs.SyncingFileOptions{
346 1 : NoSyncOnClose: p.st.NoSyncOnClose,
347 1 : BytesPerSync: p.st.BytesPerSync,
348 1 : })
349 1 : dstPath := p.vfsPath(dstFileType, dstFileNum)
350 1 : if err := vfs.LinkOrCopy(fs, srcFilePath, dstPath); err != nil {
351 0 : return objstorage.ObjectMetadata{}, err
352 0 : }
353 :
354 1 : meta := objstorage.ObjectMetadata{
355 1 : DiskFileNum: dstFileNum,
356 1 : FileType: dstFileType,
357 1 : }
358 1 : p.addMetadata(meta)
359 1 : return meta, nil
360 : }
361 : // Create the object and copy the data.
362 1 : w, meta, err := p.Create(ctx, dstFileType, dstFileNum, opts)
363 1 : if err != nil {
364 0 : return objstorage.ObjectMetadata{}, err
365 0 : }
366 1 : f, err := srcFS.Open(srcFilePath, vfs.SequentialReadsOption)
367 1 : if err != nil {
368 0 : return objstorage.ObjectMetadata{}, err
369 0 : }
370 1 : defer f.Close()
371 1 : buf := make([]byte, 64*1024)
372 1 : for {
373 1 : n, readErr := f.Read(buf)
374 1 : if readErr != nil && readErr != io.EOF {
375 0 : w.Abort()
376 0 : return objstorage.ObjectMetadata{}, readErr
377 0 : }
378 :
379 1 : if n > 0 {
380 1 : if err := w.Write(buf[:n]); err != nil {
381 0 : w.Abort()
382 0 : return objstorage.ObjectMetadata{}, err
383 0 : }
384 : }
385 :
386 1 : if readErr == io.EOF {
387 1 : break
388 : }
389 : }
390 1 : if err := w.Finish(); err != nil {
391 0 : return objstorage.ObjectMetadata{}, err
392 0 : }
393 1 : return meta, nil
394 : }
395 :
396 : // Lookup is part of the objstorage.Provider interface.
397 : func (p *provider) Lookup(
398 : fileType base.FileType, fileNum base.DiskFileNum,
399 1 : ) (objstorage.ObjectMetadata, error) {
400 1 : p.mu.RLock()
401 1 : defer p.mu.RUnlock()
402 1 : meta, ok := p.mu.knownObjects[fileNum]
403 1 : if !ok {
404 0 : return objstorage.ObjectMetadata{}, errors.Wrapf(
405 0 : os.ErrNotExist,
406 0 : "file %s (type %d) unknown to the objstorage provider",
407 0 : errors.Safe(fileNum), errors.Safe(fileType),
408 0 : )
409 0 : }
410 1 : if meta.FileType != fileType {
411 0 : return objstorage.ObjectMetadata{}, errors.AssertionFailedf(
412 0 : "file %s type mismatch (known type %d, expected type %d)",
413 0 : errors.Safe(fileNum), errors.Safe(meta.FileType), errors.Safe(fileType),
414 0 : )
415 0 : }
416 1 : return meta, nil
417 : }
418 :
419 : // Path is part of the objstorage.Provider interface.
420 1 : func (p *provider) Path(meta objstorage.ObjectMetadata) string {
421 1 : if !meta.IsRemote() {
422 1 : return p.vfsPath(meta.FileType, meta.DiskFileNum)
423 1 : }
424 1 : return p.remotePath(meta)
425 : }
426 :
427 : // Size returns the size of the object.
428 1 : func (p *provider) Size(meta objstorage.ObjectMetadata) (int64, error) {
429 1 : if !meta.IsRemote() {
430 1 : return p.vfsSize(meta.FileType, meta.DiskFileNum)
431 1 : }
432 1 : return p.remoteSize(meta)
433 : }
434 :
435 : // List is part of the objstorage.Provider interface.
436 1 : func (p *provider) List() []objstorage.ObjectMetadata {
437 1 : p.mu.RLock()
438 1 : defer p.mu.RUnlock()
439 1 : res := make([]objstorage.ObjectMetadata, 0, len(p.mu.knownObjects))
440 1 : for _, meta := range p.mu.knownObjects {
441 1 : res = append(res, meta)
442 1 : }
443 1 : sort.Slice(res, func(i, j int) bool {
444 1 : return res[i].DiskFileNum.FileNum() < res[j].DiskFileNum.FileNum()
445 1 : })
446 1 : return res
447 : }
448 :
449 : // Metrics is part of the objstorage.Provider interface.
450 1 : func (p *provider) Metrics() sharedcache.Metrics {
451 1 : if p.remote.cache != nil {
452 1 : return p.remote.cache.Metrics()
453 1 : }
454 1 : return sharedcache.Metrics{}
455 : }
456 :
457 1 : func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
458 1 : if invariants.Enabled {
459 1 : meta.AssertValid()
460 1 : }
461 1 : p.mu.Lock()
462 1 : defer p.mu.Unlock()
463 1 : p.mu.knownObjects[meta.DiskFileNum] = meta
464 1 : if meta.IsRemote() {
465 1 : p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
466 1 : FileNum: meta.DiskFileNum,
467 1 : FileType: meta.FileType,
468 1 : CreatorID: meta.Remote.CreatorID,
469 1 : CreatorFileNum: meta.Remote.CreatorFileNum,
470 1 : Locator: meta.Remote.Locator,
471 1 : CleanupMethod: meta.Remote.CleanupMethod,
472 1 : })
473 1 : } else {
474 1 : p.mu.localObjectsChanged = true
475 1 : }
476 : }
477 :
478 1 : func (p *provider) removeMetadata(fileNum base.DiskFileNum) {
479 1 : p.mu.Lock()
480 1 : defer p.mu.Unlock()
481 1 :
482 1 : meta, ok := p.mu.knownObjects[fileNum]
483 1 : if !ok {
484 0 : return
485 0 : }
486 1 : delete(p.mu.knownObjects, fileNum)
487 1 : if meta.IsRemote() {
488 1 : p.mu.remote.catalogBatch.DeleteObject(fileNum)
489 1 : } else {
490 1 : p.mu.localObjectsChanged = true
491 1 : }
492 : }
493 :
494 : // protectObject prevents the unreferencing of a remote object until
495 : // unprotectObject is called.
496 0 : func (p *provider) protectObject(fileNum base.DiskFileNum) {
497 0 : p.mu.Lock()
498 0 : defer p.mu.Unlock()
499 0 : p.mu.protectedObjects[fileNum] = p.mu.protectedObjects[fileNum] + 1
500 0 : }
501 :
502 0 : func (p *provider) unprotectObject(fileNum base.DiskFileNum) {
503 0 : p.mu.Lock()
504 0 : defer p.mu.Unlock()
505 0 : v := p.mu.protectedObjects[fileNum]
506 0 : if invariants.Enabled && v == 0 {
507 0 : panic("invalid protection count")
508 : }
509 0 : if v > 1 {
510 0 : p.mu.protectedObjects[fileNum] = v - 1
511 0 : } else {
512 0 : delete(p.mu.protectedObjects, fileNum)
513 0 : // TODO(radu): check if the object is still in knownObject; if not, unref it
514 0 : // now.
515 0 : }
516 : }
517 :
518 1 : func (p *provider) isProtected(fileNum base.DiskFileNum) bool {
519 1 : p.mu.Lock()
520 1 : defer p.mu.Unlock()
521 1 : return p.mu.protectedObjects[fileNum] > 0
522 1 : }
|