Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "io"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/objstorage"
15 : "github.com/cockroachdb/pebble/objstorage/remote"
16 : )
17 :
// Tags used in the encoded form of a remote object backing. The encoding is a
// sequence of entries, each starting with a tag written as a uvarint and
// followed by tag-specific data (see encodeRemoteObjectBacking and
// decodeRemoteObjectBacking). These values are part of the encoded format.
const (
	// tagCreatorID is followed by the creator ID (uvarint).
	tagCreatorID = 1
	// tagCreatorFileNum is followed by the creator file number (uvarint).
	tagCreatorFileNum = 2
	// tagCleanupMethod is followed by the cleanup method (uvarint).
	tagCleanupMethod = 3
	// tagRefCheckID encodes the information for a ref marker that needs to be
	// checked when attaching this object to another provider. This is set to the
	// creator ID and FileNum for the provider that encodes the backing, and
	// allows the "target" provider to check that the "source" provider kept its
	// reference on the object alive.
	tagRefCheckID = 4
	// tagLocator encodes the remote.Locator; if absent the locator is "". It is
	// followed by the locator string length and the locator string.
	tagLocator = 5
	// tagCustomObjectName encodes a custom object name (if present). It is
	// followed by the custom name string length and the string.
	tagCustomObjectName = 6

	// Any new tags that don't have the tagNotSafeToIgnoreMask bit set must be
	// followed by the length of the data (so they can be skipped).

	// Any new tags that have the tagNotSafeToIgnoreMask bit set cause errors if
	// they are encountered by earlier code that doesn't know the tag.
	tagNotSafeToIgnoreMask = 64
)
42 :
43 : func (p *provider) encodeRemoteObjectBacking(
44 : meta *objstorage.ObjectMetadata,
45 1 : ) (objstorage.RemoteObjectBacking, error) {
46 1 : if !meta.IsRemote() {
47 0 : return nil, base.AssertionFailedf("object %s not on remote storage", meta.DiskFileNum)
48 0 : }
49 :
50 1 : buf := make([]byte, 0, binary.MaxVarintLen64*4)
51 1 : buf = binary.AppendUvarint(buf, tagCreatorID)
52 1 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CreatorID))
53 1 : // TODO(radu): encode file type as well?
54 1 : buf = binary.AppendUvarint(buf, tagCreatorFileNum)
55 1 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CreatorFileNum))
56 1 : buf = binary.AppendUvarint(buf, tagCleanupMethod)
57 1 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CleanupMethod))
58 1 : if meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
59 1 : buf = binary.AppendUvarint(buf, tagRefCheckID)
60 1 : buf = binary.AppendUvarint(buf, uint64(p.remote.shared.creatorID))
61 1 : buf = binary.AppendUvarint(buf, uint64(meta.DiskFileNum))
62 1 : }
63 1 : if meta.Remote.Locator != "" {
64 1 : buf = binary.AppendUvarint(buf, tagLocator)
65 1 : buf = encodeString(buf, string(meta.Remote.Locator))
66 1 : }
67 1 : if meta.Remote.CustomObjectName != "" {
68 1 : buf = binary.AppendUvarint(buf, tagCustomObjectName)
69 1 : buf = encodeString(buf, meta.Remote.CustomObjectName)
70 1 : }
71 1 : return buf, nil
72 : }
73 :
// remoteObjectBackingHandle is the handle returned by
// provider.RemoteObjectBacking. It holds the encoded backing until Close is
// called.
type remoteObjectBackingHandle struct {
	// backing is the encoded backing returned by Get; Close sets it to nil.
	backing objstorage.RemoteObjectBacking
	// fileNum is the object that was passed to protectObject when the handle
	// was created; Close passes it to unprotectObject.
	fileNum base.DiskFileNum
	// p is the provider that created the handle.
	p *provider
}
79 :
80 1 : func (s *remoteObjectBackingHandle) Get() (objstorage.RemoteObjectBacking, error) {
81 1 : if s.backing == nil {
82 0 : return nil, errors.Errorf("RemoteObjectBackingHandle.Get() called after Close()")
83 0 : }
84 1 : return s.backing, nil
85 : }
86 :
87 0 : func (s *remoteObjectBackingHandle) Close() {
88 0 : if s.backing != nil {
89 0 : s.backing = nil
90 0 : s.p.unprotectObject(s.fileNum)
91 0 : }
92 : }
93 :
// Compile-time check that *remoteObjectBackingHandle implements
// objstorage.RemoteObjectBackingHandle.
var _ objstorage.RemoteObjectBackingHandle = (*remoteObjectBackingHandle)(nil)
95 :
96 : // RemoteObjectBacking is part of the objstorage.Provider interface.
97 : func (p *provider) RemoteObjectBacking(
98 : meta *objstorage.ObjectMetadata,
99 1 : ) (objstorage.RemoteObjectBackingHandle, error) {
100 1 : backing, err := p.encodeRemoteObjectBacking(meta)
101 1 : if err != nil {
102 0 : return nil, err
103 0 : }
104 1 : p.protectObject(meta.DiskFileNum)
105 1 : return &remoteObjectBackingHandle{
106 1 : backing: backing,
107 1 : fileNum: meta.DiskFileNum,
108 1 : p: p,
109 1 : }, nil
110 : }
111 :
112 : // CreateExternalObjectBacking is part of the objstorage.Provider interface.
113 : func (p *provider) CreateExternalObjectBacking(
114 : locator remote.Locator, objName string,
115 1 : ) (objstorage.RemoteObjectBacking, error) {
116 1 : var meta objstorage.ObjectMetadata
117 1 : meta.Remote.Locator = locator
118 1 : meta.Remote.CustomObjectName = objName
119 1 : meta.Remote.CleanupMethod = objstorage.SharedNoCleanup
120 1 : return p.encodeRemoteObjectBacking(&meta)
121 1 : }
122 :
// decodedBacking is the result of decodeRemoteObjectBacking.
type decodedBacking struct {
	// meta is the decoded object metadata. decodeRemoteObjectBacking does not
	// set meta.Remote.Storage; AttachRemoteObjects fills it in.
	meta objstorage.ObjectMetadata
	// refToCheck is set only when meta.Remote.CleanupMethod is
	// objstorage.SharedRefTracking. It identifies the reference marker (creator
	// ID and file number) that is checked when the object is attached.
	refToCheck struct {
		creatorID objstorage.CreatorID
		fileNum   base.DiskFileNum
	}
}
131 :
132 : // decodeRemoteObjectBacking decodes the remote object metadata.
133 : //
134 : // Note that the meta.Remote.Storage field is not set.
135 : func decodeRemoteObjectBacking(
136 : fileType base.FileType, fileNum base.DiskFileNum, buf objstorage.RemoteObjectBacking,
137 1 : ) (decodedBacking, error) {
138 1 : var creatorID, creatorFileNum, cleanupMethod, refCheckCreatorID, refCheckFileNum uint64
139 1 : var locator, customObjName string
140 1 : br := bytes.NewReader(buf)
141 1 : for {
142 1 : tag, err := binary.ReadUvarint(br)
143 1 : if err == io.EOF {
144 1 : break
145 : }
146 1 : if err != nil {
147 0 : return decodedBacking{}, err
148 0 : }
149 1 : switch tag {
150 1 : case tagCreatorID:
151 1 : creatorID, err = binary.ReadUvarint(br)
152 :
153 1 : case tagCreatorFileNum:
154 1 : creatorFileNum, err = binary.ReadUvarint(br)
155 :
156 1 : case tagCleanupMethod:
157 1 : cleanupMethod, err = binary.ReadUvarint(br)
158 :
159 1 : case tagRefCheckID:
160 1 : refCheckCreatorID, err = binary.ReadUvarint(br)
161 1 : if err == nil {
162 1 : refCheckFileNum, err = binary.ReadUvarint(br)
163 1 : }
164 :
165 1 : case tagLocator:
166 1 : locator, err = decodeString(br)
167 :
168 1 : case tagCustomObjectName:
169 1 : customObjName, err = decodeString(br)
170 :
171 0 : default:
172 0 : // Ignore unknown tags, unless they're not safe to ignore.
173 0 : if tag&tagNotSafeToIgnoreMask != 0 {
174 0 : return decodedBacking{}, errors.Newf("unknown tag %d", tag)
175 0 : }
176 0 : var dataLen uint64
177 0 : dataLen, err = binary.ReadUvarint(br)
178 0 : if err == nil {
179 0 : _, err = br.Seek(int64(dataLen), io.SeekCurrent)
180 0 : }
181 : }
182 1 : if err != nil {
183 0 : return decodedBacking{}, err
184 0 : }
185 : }
186 1 : if customObjName == "" {
187 1 : if creatorID == 0 {
188 0 : return decodedBacking{}, errors.Newf("remote object backing missing creator ID")
189 0 : }
190 1 : if creatorFileNum == 0 {
191 0 : return decodedBacking{}, errors.Newf("remote object backing missing creator file num")
192 0 : }
193 : }
194 1 : var res decodedBacking
195 1 : res.meta.DiskFileNum = fileNum
196 1 : res.meta.FileType = fileType
197 1 : res.meta.Remote.CreatorID = objstorage.CreatorID(creatorID)
198 1 : res.meta.Remote.CreatorFileNum = base.DiskFileNum(creatorFileNum)
199 1 : res.meta.Remote.CleanupMethod = objstorage.SharedCleanupMethod(cleanupMethod)
200 1 : if res.meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
201 1 : if refCheckCreatorID == 0 || refCheckFileNum == 0 {
202 0 : return decodedBacking{}, errors.Newf("remote object backing missing ref to check")
203 0 : }
204 1 : res.refToCheck.creatorID = objstorage.CreatorID(refCheckCreatorID)
205 1 : res.refToCheck.fileNum = base.DiskFileNum(refCheckFileNum)
206 : }
207 1 : res.meta.Remote.Locator = remote.Locator(locator)
208 1 : res.meta.Remote.CustomObjectName = customObjName
209 1 : return res, nil
210 : }
211 :
// encodeString appends s to buf as a uvarint byte length followed by the raw
// bytes of s, and returns the extended slice.
func encodeString(buf []byte, s string) []byte {
	return append(binary.AppendUvarint(buf, uint64(len(s))), s...)
}
217 :
// decodeString reads a string written by encodeString: a uvarint byte length
// followed by that many bytes.
//
// A zero length yields "" without reading further. An error from reading the
// length prefix is returned unchanged.
//
// The length comes from the input and is not trusted. When br reports how many
// bytes remain (it has a Len() int method, as *bytes.Reader does), a length
// larger than that is rejected up front with io.ErrUnexpectedEOF. Otherwise the
// result is grown as bytes arrive, so a corrupt length can neither panic in
// make nor force a huge allocation. Running out of input in the middle of the
// string is reported as io.ErrUnexpectedEOF.
func decodeString(br io.ByteReader) (string, error) {
	length, err := binary.ReadUvarint(br)
	if err != nil || length == 0 {
		return "", err
	}
	if lr, ok := br.(interface{ Len() int }); ok && length > uint64(lr.Len()) {
		return "", io.ErrUnexpectedEOF
	}

	// Cap the up-front allocation; append grows the buffer for longer strings
	// only as data is actually read.
	const maxPrealloc = 1 << 10
	capHint := length
	if capHint > maxPrealloc {
		capHint = maxPrealloc
	}
	buf := make([]byte, 0, capHint)
	for uint64(len(buf)) < length {
		b, err := br.ReadByte()
		if err != nil {
			if err == io.EOF {
				// The length prefix promised more bytes than are available.
				err = io.ErrUnexpectedEOF
			}
			return "", err
		}
		buf = append(buf, b)
	}
	return string(buf), nil
}
232 :
233 : // AttachRemoteObjects is part of the objstorage.Provider interface.
234 : func (p *provider) AttachRemoteObjects(
235 : objs []objstorage.RemoteObjectToAttach,
236 1 : ) ([]objstorage.ObjectMetadata, error) {
237 1 : decoded := make([]decodedBacking, len(objs))
238 1 : for i, o := range objs {
239 1 : var err error
240 1 : decoded[i], err = decodeRemoteObjectBacking(o.FileType, o.FileNum, o.Backing)
241 1 : if err != nil {
242 0 : return nil, err
243 0 : }
244 1 : decoded[i].meta.Remote.Storage, err = p.ensureStorage(decoded[i].meta.Remote.Locator)
245 1 : if err != nil {
246 0 : return nil, err
247 0 : }
248 : }
249 :
250 : // Create the reference marker objects.
251 : // TODO(radu): parallelize this.
252 1 : for _, d := range decoded {
253 1 : if d.meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
254 1 : continue
255 : }
256 1 : if err := p.sharedCreateRef(d.meta); err != nil {
257 0 : // TODO(radu): clean up references previously created in this loop.
258 0 : return nil, err
259 0 : }
260 : // Check the "origin"'s reference.
261 1 : refName := sharedObjectRefName(d.meta, d.refToCheck.creatorID, d.refToCheck.fileNum)
262 1 : if _, err := d.meta.Remote.Storage.Size(refName); err != nil {
263 0 : _ = p.sharedUnref(d.meta)
264 0 : // TODO(radu): clean up references previously created in this loop.
265 0 : if d.meta.Remote.Storage.IsNotExistError(err) {
266 0 : return nil, errors.Errorf("origin marker object %q does not exist;"+
267 0 : " object probably removed from the provider which created the backing", refName)
268 0 : }
269 0 : return nil, errors.Wrapf(err, "checking origin's marker object %s", refName)
270 : }
271 : }
272 :
273 1 : metas := make([]objstorage.ObjectMetadata, len(decoded))
274 1 : for i, d := range decoded {
275 1 : metas[i] = d.meta
276 1 : }
277 :
278 1 : p.mu.Lock()
279 1 : defer p.mu.Unlock()
280 1 : for i := range metas {
281 1 : p.addMetadataLocked(metas[i])
282 1 : }
283 1 : return metas, nil
284 : }
|