package challenge
import (
"net/url"
"strings"
)
// hasPort reports whether s includes a port. s is expected to have the form
// "host", "host:port", or "[ipv6::address]:port"; a port is present when the
// last ':' appears after the last ']' (a missing ']' counts as position -1).
//
// FROM: https://golang.org/src/net/http/http.go
func hasPort(s string) bool {
	lastColon := strings.LastIndex(s, ":")
	lastBracket := strings.LastIndex(s, "]")
	return lastColon > lastBracket
}
// portMap maps a URL scheme to the port that canonicalAddr appends when the
// host carries no explicit port.
//
// FROM: http://golang.org/src/net/http/transport.go
var portMap = map[string]string{
	"http":  "80",
	"https": "443",
}
// canonicalAddr returns url.Host, guaranteed to end in a ":port" suffix.
// A host that already names a port is returned untouched; otherwise the port
// registered in portMap for url.Scheme is appended (an unknown scheme yields
// an empty port, i.e. "host:").
//
// FROM: http://golang.org/src/net/http/transport.go
func canonicalAddr(url *url.URL) string {
	if host := url.Host; hasPort(host) {
		return host
	}
	return url.Host + ":" + portMap[url.Scheme]
}
package challenge
import (
"fmt"
"net/http"
"net/url"
"strings"
"sync"
)
// Octet types from RFC 2616.
//
// octetType is a bit set describing how a single byte is classified.
type octetType byte

// octetTypes is indexed by byte value and holds that byte's classification.
// It is populated by init.
var octetTypes [256]octetType

const (
	// isToken is set for bytes that may appear in a token: any CHAR that is
	// neither a CTL nor a separator.
	isToken octetType = 1 << iota
	// isSpace is set for space, horizontal tab, carriage return and linefeed.
	isSpace
)
func init() {
// OCTET = <any 8-bit sequence of data>
// CHAR = <any US-ASCII character (octets 0 - 127)>
// CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
// CR = <US-ASCII CR, carriage return (13)>
// LF = <US-ASCII LF, linefeed (10)>
// SP = <US-ASCII SP, space (32)>
// HT = <US-ASCII HT, horizontal-tab (9)>
// <"> = <US-ASCII double-quote mark (34)>
// CRLF = CR LF
// LWS = [CRLF] 1*( SP | HT )
// TEXT = <any OCTET except CTLs, but including LWS>
// separators = "(" | ")" | "<" | ">" | "@" | "," | ";" | ":" | "\" | <">
// | "/" | "[" | "]" | "?" | "=" | "{" | "}" | SP | HT
// token = 1*<any CHAR except CTLs or separators>
// qdtext = <any TEXT except <">>
for c := 0; c < 256; c++ {
var t octetType
isCtl := c <= 31 || c == 127
isChar := 0 <= c && c <= 127
isSeparator := strings.ContainsRune(" \t\"(),/:;<=>?@[]\\{}", rune(c))
if strings.ContainsRune(" \t\r\n", rune(c)) {
t |= isSpace
}
if isChar && !isCtl && !isSeparator {
t |= isToken
}
octetTypes[c] = t
}
}
// Challenge carries information from a WWW-Authenticate response header.
// See RFC 2617.
type Challenge struct {
	// Scheme is the auth-scheme according to RFC 2617. Challenges produced
	// by parseValueAndParams carry it lower-cased.
	Scheme string
	// Parameters are the auth-params according to RFC 2617. Challenges
	// produced by parseValueAndParams carry lower-cased keys.
	Parameters map[string]string
}
// Manager manages the challenges for endpoints.
// The challenges are pulled out of HTTP responses. Only
// responses which expect challenges should be added to
// the manager, since a non-unauthorized request will be
// viewed as not requiring challenges.
type Manager interface {
	// GetChallenges returns the challenges for the given
	// endpoint URL.
	GetChallenges(endpoint url.URL) ([]Challenge, error)
	// AddResponse adds the response to the challenge
	// manager. The challenges will be parsed out of
	// the WWW-Authenticate headers and recorded for the
	// URL which produced the response. If the
	// response was authorized, any challenges for the
	// endpoint will be cleared.
	AddResponse(resp *http.Response) error
}
// NewSimpleManager returns a Manager that only maps endpoints to
// challenges based on the responses which have been added to the
// manager. The simple manager makes no attempt to perform requests
// on the endpoints or to cache the responses in a backend.
func NewSimpleManager() Manager {
	m := new(simpleManager)
	m.Challenges = map[string][]Challenge{}
	return m
}
// simpleManager is the in-memory Manager returned by NewSimpleManager.
type simpleManager struct {
	// RWMutex guards Challenges.
	sync.RWMutex
	// Challenges maps a normalized endpoint URL string to the challenges
	// taken from the last response added for that endpoint.
	Challenges map[string][]Challenge
}
// normalizeURL rewrites endpoint.Host in place so that equivalent endpoints
// share one map key: the host is lower-cased and then given an explicit port
// via canonicalAddr.
func normalizeURL(endpoint *url.URL) {
	lowered := strings.ToLower(endpoint.Host)
	// canonicalAddr reads endpoint.Host, so store the lower-cased host first.
	endpoint.Host = lowered
	endpoint.Host = canonicalAddr(endpoint)
}
// GetChallenges returns the challenges recorded for endpoint, or nil when
// none have been recorded. The returned error is always nil.
//
// Only the scheme, host and path of endpoint take part in the lookup. That
// mirrors how AddResponse builds its keys, so an endpoint that additionally
// carries a query string, fragment or user info still finds its challenges.
func (m *simpleManager) GetChallenges(endpoint url.URL) ([]Challenge, error) {
	key := url.URL{
		Path:   endpoint.Path,
		Host:   endpoint.Host,
		Scheme: endpoint.Scheme,
	}
	normalizeURL(&key)

	m.RLock()
	defer m.RUnlock()
	return m.Challenges[key.String()], nil
}
// AddResponse records the challenges carried by resp for the endpoint that
// produced it. The endpoint key is built from the scheme, host and path of
// resp.Request.URL after normalization. A response that is not a 401 stores
// a nil challenge list, which clears anything recorded earlier.
//
// An error is returned, instead of panicking, when resp, its Request, or the
// request URL is nil.
func (m *simpleManager) AddResponse(resp *http.Response) error {
	if resp == nil {
		return fmt.Errorf("missing response")
	}
	if resp.Request == nil {
		return fmt.Errorf("missing request reference")
	}
	if resp.Request.URL == nil {
		return fmt.Errorf("missing request URL")
	}
	challenges := ResponseChallenges(resp)

	urlCopy := url.URL{
		Path:   resp.Request.URL.Path,
		Host:   resp.Request.URL.Host,
		Scheme: resp.Request.URL.Scheme,
	}
	normalizeURL(&urlCopy)

	m.Lock()
	defer m.Unlock()
	m.Challenges[urlCopy.String()] = challenges
	return nil
}
// ResponseChallenges returns the authorization challenges carried by the
// given HTTP response. The WWW-Authenticate headers are only inspected when
// the status code is 401; any other status yields nil.
func ResponseChallenges(resp *http.Response) []Challenge {
	if resp.StatusCode != http.StatusUnauthorized {
		return nil
	}
	return parseAuthHeader(resp.Header)
}
// parseAuthHeader turns every WWW-Authenticate header value into a Challenge.
// Values from which no scheme can be parsed are skipped. The result is never
// nil, even when no challenge is found.
func parseAuthHeader(header http.Header) []Challenge {
	found := make([]Challenge, 0)
	for _, raw := range header.Values("WWW-Authenticate") {
		scheme, params := parseValueAndParams(raw)
		if scheme == "" {
			continue
		}
		found = append(found, Challenge{Scheme: scheme, Parameters: params})
	}
	return found
}
// parseValueAndParams splits one header value of the form
//
//	scheme key1=value1, key2="value 2"
//
// into the lower-cased scheme and a map of lower-cased parameter keys to
// their values. Parsing stops at the first malformed parameter (missing key,
// missing '=', or empty value); whatever was parsed up to that point is
// returned. params is never nil, and value is "" when the header does not
// start with a token.
func parseValueAndParams(header string) (value string, params map[string]string) {
	params = map[string]string{}

	scheme, rest := expectToken(header)
	if scheme == "" {
		return "", params
	}
	value = strings.ToLower(scheme)

	// Prepend a comma so the first parameter is handled like all the others.
	rest = "," + skipSpace(rest)
	for len(rest) > 0 && rest[0] == ',' {
		key, afterKey := expectToken(skipSpace(rest[1:]))
		if key == "" || !strings.HasPrefix(afterKey, "=") {
			return value, params
		}
		val, afterVal := expectTokenOrQuoted(afterKey[1:])
		if val == "" {
			return value, params
		}
		params[strings.ToLower(key)] = val
		rest = skipSpace(afterVal)
	}
	return value, params
}
// skipSpace returns s without its leading run of bytes classified as isSpace
// in octetTypes.
func skipSpace(s string) (rest string) {
	for i := 0; i < len(s); i++ {
		if octetTypes[s[i]]&isSpace == 0 {
			return s[i:]
		}
	}
	// Every byte was whitespace (or s was empty).
	return ""
}
// expectToken splits s into its leading run of token bytes (as classified by
// octetTypes) and the remainder. token is "" when s does not start with a
// token byte.
func expectToken(s string) (token, rest string) {
	end := len(s)
	for i := 0; i < len(s); i++ {
		if octetTypes[s[i]]&isToken == 0 {
			end = i
			break
		}
	}
	return s[:end], s[end:]
}
// expectTokenOrQuoted reads either a token or a double-quoted string from the
// start of s and returns it together with the unconsumed remainder.
//
// Inside a quoted string a backslash escapes the byte that follows it; the
// surrounding quotes and the escaping backslashes are not part of value.
// An unterminated quoted string yields ("", "").
func expectTokenOrQuoted(s string) (value string, rest string) {
	if !strings.HasPrefix(s, "\"") {
		return expectToken(s)
	}
	body := s[1:]
	for i := 0; i < len(body); i++ {
		c := body[i]
		if c == '"' {
			// No escape seen: the value is a plain substring.
			return body[:i], body[i+1:]
		}
		if c != '\\' {
			continue
		}
		// First escape found: switch to building an unescaped copy. The
		// backslash itself is dropped, so len(body)-1 bytes always suffice.
		unescaped := make([]byte, 0, len(body)-1)
		unescaped = append(unescaped, body[:i]...)
		escaped := true
		for k := i + 1; k < len(body); k++ {
			ch := body[k]
			if escaped {
				escaped = false
				unescaped = append(unescaped, ch)
				continue
			}
			switch ch {
			case '\\':
				escaped = true
			case '"':
				return string(unescaped), body[k+1:]
			default:
				unescaped = append(unescaped, ch)
			}
		}
		return "", ""
	}
	return "", ""
}
package errcode
import (
"encoding/json"
"fmt"
"strings"
)
// ErrorCoder is the base interface for ErrorCode and Error, allowing
// users of each to just call ErrorCode to get the real ID of each.
type ErrorCoder interface {
	// ErrorCode returns the error code identifying the value.
	ErrorCode() ErrorCode
}
// ErrorCode represents the error type. The errors are serialized via strings
// and the integer format may change and should *never* be exported.
type ErrorCode int

// Compile-time check that ErrorCode satisfies the error interface.
var _ error = ErrorCode(0)
// ErrorCode just returns itself, which makes ErrorCode satisfy ErrorCoder.
func (ec ErrorCode) ErrorCode() ErrorCode {
	return ec
}
// Error returns the code's identifier in lower case with underscores
// replaced by spaces (e.g. "NAME_UNKNOWN" becomes "name unknown").
func (ec ErrorCode) Error() string {
	// NOTE(stevvooe): Cannot use message here since it may have unpopulated args.
	spaced := strings.Replace(ec.String(), "_", " ", -1)
	return strings.ToLower(spaced)
}
// Descriptor returns the descriptor registered for the error code. A code
// that was never registered resolves to the descriptor of ErrorCodeUnknown.
func (ec ErrorCode) Descriptor() ErrorDescriptor {
	if d, known := errorCodeToDescriptors[ec]; known {
		return d
	}
	return ErrorCodeUnknown.Descriptor()
}
// String returns the canonical identifier for this error code, i.e. the
// Value of its descriptor.
func (ec ErrorCode) String() string {
	desc := ec.Descriptor()
	return desc.Value
}
// Message returns the human-readable error message for this error code, i.e.
// the Message of its descriptor.
func (ec ErrorCode) Message() string {
	desc := ec.Descriptor()
	return desc.Message
}
// MarshalText encodes the receiver as its canonical string identifier.
// The returned error is always nil.
func (ec ErrorCode) MarshalText() (text []byte, err error) {
	id := ec.String()
	return []byte(id), nil
}
// UnmarshalText decodes the form generated by MarshalText. An identifier
// that is not registered decodes to the code of ErrorCodeUnknown. The
// returned error is always nil.
func (ec *ErrorCode) UnmarshalText(text []byte) error {
	if desc, known := idToDescriptors[string(text)]; known {
		*ec = desc.Code
		return nil
	}
	*ec = ErrorCodeUnknown.Descriptor().Code
	return nil
}
// WithMessage creates a new Error for this code whose Message is the given
// text instead of the code's registered message.
func (ec ErrorCode) WithMessage(message string) Error {
	var e Error
	e.Code = ec
	e.Message = message
	return e
}
// WithDetail creates a new Error for this code, carrying the code's
// registered message and the given detail.
func (ec ErrorCode) WithDetail(detail interface{}) Error {
	base := Error{Code: ec, Message: ec.Message()}
	return base.WithDetail(detail)
}
// WithArgs creates a new Error for this code whose Message is the code's
// registered message formatted with args.
func (ec ErrorCode) WithArgs(args ...interface{}) Error {
	base := Error{Code: ec, Message: ec.Message()}
	return base.WithArgs(args...)
}
// Error provides a wrapper around ErrorCode with extra Details provided.
type Error struct {
	// Code identifies the kind of error.
	Code ErrorCode `json:"code"`
	// Message is the human-readable text for this occurrence.
	Message string `json:"message"`
	// Detail is optional extra information; it is omitted from JSON when empty.
	Detail interface{} `json:"detail,omitempty"`
	// TODO(duglin): See if we need an "args" property so we can do the
	// variable substitution right before showing the message to the user
}

// Compile-time check that Error satisfies the error interface.
var _ error = Error{}
// ErrorCode returns the Code of this Error, which makes Error satisfy
// ErrorCoder.
func (e Error) ErrorCode() ErrorCode {
	return e.Code
}
// Error returns a human readable representation of the error in the form
// "<code text>: <message>".
func (e Error) Error() string {
	return e.Code.Error() + ": " + e.Message
}
// WithDetail returns a copy of the current Error with Detail replaced by the
// given value. The receiver is left unchanged.
func (e Error) WithDetail(detail interface{}) Error {
	// e is a value receiver, so this assignment only touches the local copy.
	e.Detail = detail
	return e
}
// WithArgs returns a copy of the current Error whose Message is the code's
// registered message formatted with args as substitution variables. Note the
// format string is the code's message, not the receiver's current Message.
func (e Error) WithArgs(args ...interface{}) Error {
	// e is a value receiver, so this assignment only touches the local copy.
	e.Message = fmt.Sprintf(e.Code.Message(), args...)
	return e
}
// ErrorDescriptor provides relevant information about a given error code.
type ErrorDescriptor struct {
	// Code is the error code that this descriptor describes. It is assigned
	// by register.
	Code ErrorCode
	// Value provides a unique, string key, often capitalized with
	// underscores, to identify the error code. This value is used as the
	// keyed value when serializing api errors.
	Value string
	// Message is a short, human readable description of the error condition
	// included in API responses.
	Message string
	// Description provides a complete account of the error's purpose, suitable
	// for use in documentation.
	Description string
	// HTTPStatusCode provides the http status code that is associated with
	// this error condition.
	HTTPStatusCode int
}
// ParseErrorCode looks up the error code registered under the given string
// identifier. `ErrorCodeUnknown` is returned if the identifier is not known.
func ParseErrorCode(value string) ErrorCode {
	if ed, found := idToDescriptors[value]; found {
		return ed.Code
	}
	return ErrorCodeUnknown
}
// Errors provides the envelope for multiple errors and a few sugar methods
// for use within the application.
type Errors []error

// Compile-time check that Errors satisfies the error interface.
var _ error = Errors{}
// Error renders the collection as a single string:
//   - no errors: "<nil>"
//   - one error: that error's message
//   - several:   "errors:\n" followed by one message per line
//
// The multi-error form is assembled with a strings.Builder so the cost stays
// linear in the total message length.
func (errs Errors) Error() string {
	switch len(errs) {
	case 0:
		return "<nil>"
	case 1:
		return errs[0].Error()
	}

	var b strings.Builder
	b.WriteString("errors:\n")
	for _, err := range errs {
		b.WriteString(err.Error())
		b.WriteByte('\n')
	}
	return b.String()
}
// Len returns the current number of errors in the collection.
func (errs Errors) Len() int {
	return len(errs)
}
// MarshalJSON converts the slice of error, ErrorCode or Error values into a
// slice of Error and serializes it under an "errors" key.
//
// Conversion rules per element:
//   - ErrorCode: expanded to an Error with the code's message and no detail
//   - Error:     used as is
//   - otherwise: wrapped as ErrorCodeUnknown with the element as detail
func (errs Errors) MarshalJSON() ([]byte, error) {
	type envelope struct {
		Errors []Error `json:"errors,omitempty"`
	}

	var out envelope
	for _, item := range errs {
		var converted Error
		switch typed := item.(type) {
		case ErrorCode:
			converted = typed.WithDetail(nil)
		case Error:
			converted = typed
		default:
			converted = ErrorCodeUnknown.WithDetail(item)
		}

		// If the Error struct was set up without a Message (meaning it is "")
		// then grab it from the ErrorCode.
		if converted.Message == "" {
			converted.Message = converted.Code.Message()
		}

		// If the detail is itself an error, extract its message; otherwise
		// json.Marshal will not serialize it at all.
		// https://github.com/golang/go/issues/10748
		if detailErr, isErr := converted.Detail.(error); isErr {
			converted.Detail = detailErr.Error()
		}

		out.Errors = append(out.Errors, converted)
	}
	return json.Marshal(out)
}
// UnmarshalJSON deserializes an "errors" envelope of Error values and stores
// the result in errs as a mix of ErrorCode and Error values.
//
// An entry without detail whose message is empty or equal to the code's
// registered message is collapsed to a bare ErrorCode; every other entry is
// kept as a full Error.
func (errs *Errors) UnmarshalJSON(data []byte) error {
	var envelope struct {
		Errors []Error
	}
	if err := json.Unmarshal(data, &envelope); err != nil {
		return err
	}

	var decoded Errors
	for _, e := range envelope.Errors {
		defaultMessage := e.Message == "" || e.Message == e.Code.Message()
		if e.Detail != nil || !defaultMessage {
			// Entries with detail or a custom message stay full Errors.
			decoded = append(decoded, e)
			continue
		}
		// Nothing beyond the code itself: the ErrorCode is enough.
		decoded = append(decoded, e.Code)
	}

	*errs = decoded
	return nil
}
package errcode
import (
"encoding/json"
"net/http"
)
// ServeJSON attempts to serve err in a JSON envelope. It sets the
// content-type header to 'application/json', writes a status code and then
// encodes the error. It handles ErrorCoder and Errors values and wraps
// anything that is not already an Errors in one.
//
// The status code is taken from the descriptor of the error (for Errors, of
// its first element when that element is an ErrorCoder); when no status can
// be determined, 500 is used. The returned error is the one reported by the
// JSON encoder.
func ServeJSON(w http.ResponseWriter, err error) error {
	w.Header().Set("Content-Type", "application/json")

	status := 0
	switch typed := err.(type) {
	case Errors:
		if len(typed) > 0 {
			if coder, ok := typed[0].(ErrorCoder); ok {
				status = coder.ErrorCode().Descriptor().HTTPStatusCode
			}
		}
	case ErrorCoder:
		status = typed.ErrorCode().Descriptor().HTTPStatusCode
		err = Errors{err} // create an envelope.
	default:
		// We just have an unhandled error type, so just place in an envelope
		// and move along.
		err = Errors{err}
	}

	if status == 0 {
		status = http.StatusInternalServerError
	}
	w.WriteHeader(status)

	return json.NewEncoder(w).Encode(err)
}
package errcode
import (
"fmt"
"net/http"
"sort"
"sync"
)
// Registries of known error descriptors. All three are filled by register.
var (
	// errorCodeToDescriptors indexes descriptors by their numeric code.
	errorCodeToDescriptors = map[ErrorCode]ErrorDescriptor{}
	// idToDescriptors indexes descriptors by their string Value.
	idToDescriptors = map[string]ErrorDescriptor{}
	// groupToDescriptors lists the descriptors registered under each group name.
	groupToDescriptors = map[string][]ErrorDescriptor{}
)
// Generic error codes, all registered under the "errcode" group.
var (
	// ErrorCodeUnknown is a generic error that can be used as a last
	// resort if there is no situation-specific error message that can be used
	ErrorCodeUnknown = register("errcode", ErrorDescriptor{
		Value:   "UNKNOWN",
		Message: "unknown error",
		Description: `Generic error returned when the error does not have an
API classification.`,
		HTTPStatusCode: http.StatusInternalServerError,
	})
	// ErrorCodeUnsupported is returned when an operation is not supported.
	ErrorCodeUnsupported = register("errcode", ErrorDescriptor{
		Value:   "UNSUPPORTED",
		Message: "The operation is unsupported.",
		Description: `The operation was unsupported due to a missing
implementation or invalid set of parameters.`,
		HTTPStatusCode: http.StatusMethodNotAllowed,
	})
	// ErrorCodeUnauthorized is returned if a request requires
	// authentication.
	ErrorCodeUnauthorized = register("errcode", ErrorDescriptor{
		Value:   "UNAUTHORIZED",
		Message: "authentication required",
		Description: `The access controller was unable to authenticate
the client. Often this will be accompanied by a
Www-Authenticate HTTP response header indicating how to
authenticate.`,
		HTTPStatusCode: http.StatusUnauthorized,
	})
	// ErrorCodeDenied is returned if a client does not have sufficient
	// permission to perform an action.
	ErrorCodeDenied = register("errcode", ErrorDescriptor{
		Value:   "DENIED",
		Message: "requested access to the resource is denied",
		Description: `The access controller denied access for the
operation on a resource.`,
		HTTPStatusCode: http.StatusForbidden,
	})
	// ErrorCodeUnavailable provides a common error to report unavailability
	// of a service or endpoint.
	ErrorCodeUnavailable = register("errcode", ErrorDescriptor{
		Value:          "UNAVAILABLE",
		Message:        "service unavailable",
		Description:    "Returned when a service is not available",
		HTTPStatusCode: http.StatusServiceUnavailable,
	})
	// ErrorCodeTooManyRequests is returned if a client attempts too many
	// times to contact a service endpoint.
	ErrorCodeTooManyRequests = register("errcode", ErrorDescriptor{
		Value:   "TOOMANYREQUESTS",
		Message: "too many requests",
		Description: `Returned when a client attempts to contact a
service too many times`,
		HTTPStatusCode: http.StatusTooManyRequests,
	})
)
// errGroup is the group name under which the registry API error codes below
// are registered.
const errGroup = "registry.api.v2"

// Registry API error codes, all registered under errGroup.
var (
	// ErrorCodeDigestInvalid is returned when uploading a blob if the
	// provided digest does not match the blob contents.
	ErrorCodeDigestInvalid = register(errGroup, ErrorDescriptor{
		Value:   "DIGEST_INVALID",
		Message: "provided digest did not match uploaded content",
		Description: `When a blob is uploaded, the registry will check that
the content matches the digest provided by the client. The error may
include a detail structure with the key "digest", including the
invalid digest string. This error may also be returned when a manifest
includes an invalid layer digest.`,
		HTTPStatusCode: http.StatusBadRequest,
	})
	// ErrorCodeSizeInvalid is returned when uploading a blob if the provided
	// length does not match the content length.
	ErrorCodeSizeInvalid = register(errGroup, ErrorDescriptor{
		Value:   "SIZE_INVALID",
		Message: "provided length did not match content length",
		Description: `When a layer is uploaded, the provided size will be
checked against the uploaded content. If they do not match, this error
will be returned.`,
		HTTPStatusCode: http.StatusBadRequest,
	})
	// ErrorCodeRangeInvalid is returned when uploading a blob if the provided
	// content range is invalid.
	ErrorCodeRangeInvalid = register(errGroup, ErrorDescriptor{
		Value:   "RANGE_INVALID",
		Message: "invalid content range",
		Description: `When a layer is uploaded, the provided range is checked
against the uploaded chunk. This error is returned if the range is
out of order.`,
		HTTPStatusCode: http.StatusRequestedRangeNotSatisfiable,
	})
	// ErrorCodeNameInvalid is returned when the name in the manifest does not
	// match the provided name.
	ErrorCodeNameInvalid = register(errGroup, ErrorDescriptor{
		Value:   "NAME_INVALID",
		Message: "invalid repository name",
		Description: `Invalid repository name encountered either during
manifest validation or any API operation.`,
		HTTPStatusCode: http.StatusBadRequest,
	})
	// ErrorCodeTagInvalid is returned when the tag in the manifest does not
	// match the provided tag.
	ErrorCodeTagInvalid = register(errGroup, ErrorDescriptor{
		Value:   "TAG_INVALID",
		Message: "manifest tag did not match URI",
		Description: `During a manifest upload, if the tag in the manifest
does not match the uri tag, this error will be returned.`,
		HTTPStatusCode: http.StatusBadRequest,
	})
	// ErrorCodeNameUnknown when the repository name is not known.
	ErrorCodeNameUnknown = register(errGroup, ErrorDescriptor{
		Value:   "NAME_UNKNOWN",
		Message: "repository name not known to registry",
		Description: `This is returned if the name used during an operation is
unknown to the registry.`,
		HTTPStatusCode: http.StatusNotFound,
	})
	// ErrorCodeManifestUnknown returned when image manifest is unknown.
	ErrorCodeManifestUnknown = register(errGroup, ErrorDescriptor{
		Value:   "MANIFEST_UNKNOWN",
		Message: "manifest unknown",
		Description: `This error is returned when the manifest, identified by
name and tag is unknown to the repository.`,
		HTTPStatusCode: http.StatusNotFound,
	})
	// ErrorCodeManifestInvalid returned when an image manifest is invalid,
	// typically during a PUT operation. This error encompasses all errors
	// encountered during manifest validation that aren't signature errors.
	ErrorCodeManifestInvalid = register(errGroup, ErrorDescriptor{
		Value:   "MANIFEST_INVALID",
		Message: "manifest invalid",
		Description: `During upload, manifests undergo several checks ensuring
validity. If those checks fail, this error may be returned, unless a
more specific error is included. The detail will contain information
the failed validation.`,
		HTTPStatusCode: http.StatusBadRequest,
	})
	// ErrorCodeManifestUnverified is returned when the manifest fails
	// signature verification.
	ErrorCodeManifestUnverified = register(errGroup, ErrorDescriptor{
		Value:   "MANIFEST_UNVERIFIED",
		Message: "manifest failed signature verification",
		Description: `During manifest upload, if the manifest fails signature
verification, this error will be returned.`,
		HTTPStatusCode: http.StatusBadRequest,
	})
	// ErrorCodeManifestBlobUnknown is returned when a manifest blob is
	// unknown to the registry.
	ErrorCodeManifestBlobUnknown = register(errGroup, ErrorDescriptor{
		Value:   "MANIFEST_BLOB_UNKNOWN",
		Message: "blob unknown to registry",
		Description: `This error may be returned when a manifest blob is
unknown to the registry.`,
		HTTPStatusCode: http.StatusBadRequest,
	})
	// ErrorCodeBlobUnknown is returned when a blob is unknown to the
	// registry. This can happen when the manifest references a nonexistent
	// layer or the result is not found by a blob fetch.
	ErrorCodeBlobUnknown = register(errGroup, ErrorDescriptor{
		Value:   "BLOB_UNKNOWN",
		Message: "blob unknown to registry",
		Description: `This error may be returned when a blob is unknown to the
registry in a specified repository. This can be returned with a
standard get or if a manifest references an unknown layer during
upload.`,
		HTTPStatusCode: http.StatusNotFound,
	})
	// ErrorCodeBlobUploadUnknown is returned when an upload is unknown.
	ErrorCodeBlobUploadUnknown = register(errGroup, ErrorDescriptor{
		Value:   "BLOB_UPLOAD_UNKNOWN",
		Message: "blob upload unknown to registry",
		Description: `If a blob upload has been cancelled or was never
started, this error code may be returned.`,
		HTTPStatusCode: http.StatusNotFound,
	})
	// ErrorCodeBlobUploadInvalid is returned when an upload is invalid.
	ErrorCodeBlobUploadInvalid = register(errGroup, ErrorDescriptor{
		Value:   "BLOB_UPLOAD_INVALID",
		Message: "blob upload invalid",
		Description: `The blob upload encountered an error and can no
longer proceed.`,
		HTTPStatusCode: http.StatusNotFound,
	})
	// ErrorCodePaginationNumberInvalid is returned when the `n` parameter is
	// not an integer, or `n` is negative.
	ErrorCodePaginationNumberInvalid = register(errGroup, ErrorDescriptor{
		Value:   "PAGINATION_NUMBER_INVALID",
		Message: "invalid number of results requested",
		Description: `Returned when the "n" parameter (number of results
to return) is not an integer, "n" is negative or "n" is bigger than
the maximum allowed.`,
		HTTPStatusCode: http.StatusBadRequest,
	})
)
var (
	// nextCode is the numeric code handed to the next registered descriptor;
	// register increments it after each successful registration.
	nextCode = 1000
	// registerLock serializes calls to register.
	registerLock sync.Mutex
)
// Register will make the passed-in error known to the environment and
// return a new ErrorCode. It is the exported entry point to register and
// panics under the same conditions.
func Register(group string, descriptor ErrorDescriptor) ErrorCode {
	return register(group, descriptor)
}
// register makes the passed-in descriptor known to the environment under the
// given group and returns the ErrorCode newly assigned to it.
//
// It panics when the descriptor's Value, or the code about to be assigned,
// has already been registered.
func register(group string, descriptor ErrorDescriptor) ErrorCode {
	registerLock.Lock()
	defer registerLock.Unlock()

	code := ErrorCode(nextCode)
	descriptor.Code = code

	if _, dup := idToDescriptors[descriptor.Value]; dup {
		panic(fmt.Sprintf("ErrorValue %q is already registered", descriptor.Value))
	}
	if _, dup := errorCodeToDescriptors[code]; dup {
		panic(fmt.Sprintf("ErrorCode %v is already registered", code))
	}

	groupToDescriptors[group] = append(groupToDescriptors[group], descriptor)
	errorCodeToDescriptors[code] = descriptor
	idToDescriptors[descriptor.Value] = descriptor

	nextCode++
	return code
}
// byValue implements sort.Interface, ordering descriptors by ascending Value.
type byValue []ErrorDescriptor

func (a byValue) Len() int           { return len(a) }
func (a byValue) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byValue) Less(i, j int) bool { return a[i].Value < a[j].Value }
// GetGroupNames returns the names of all registered error groups in sorted
// order. The result is never nil.
func GetGroupNames() []string {
	names := make([]string, 0, len(groupToDescriptors))
	for name := range groupToDescriptors {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
// GetErrorCodeGroup returns the descriptors registered under the named group,
// sorted by Value. An unknown group yields nil.
//
// The result is a fresh copy: sorting happens on the copy rather than on the
// registry's own slice, so a read never reorders shared state and callers may
// modify the returned slice freely.
func GetErrorCodeGroup(name string) []ErrorDescriptor {
	registered := groupToDescriptors[name]
	if len(registered) == 0 {
		return nil
	}
	sorted := make([]ErrorDescriptor, len(registered))
	copy(sorted, registered)
	sort.Sort(byValue(sorted))
	return sorted
}
// GetErrorAllDescriptors returns every registered ErrorDescriptor,
// irrespective of the group it belongs to, sorted by Value. The result is
// never nil.
func GetErrorAllDescriptors() []ErrorDescriptor {
	groups := GetGroupNames()
	all := make([]ErrorDescriptor, 0)
	for _, name := range groups {
		members := GetErrorCodeGroup(name)
		all = append(all, members...)
	}
	sort.Sort(byValue(all))
	return all
}
package handlers
import (
"context"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"expvar"
"fmt"
"math"
"math/big"
"net"
"net/http"
"net/url"
"os"
"regexp"
"runtime"
"strconv"
"strings"
"time"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/configuration"
"github.com/distribution/distribution/v3/health"
"github.com/distribution/distribution/v3/health/checks"
"github.com/distribution/distribution/v3/internal/dcontext"
prometheus "github.com/distribution/distribution/v3/metrics"
"github.com/distribution/distribution/v3/notifications"
"github.com/distribution/distribution/v3/registry/api/errcode"
v2 "github.com/distribution/distribution/v3/registry/api/v2"
"github.com/distribution/distribution/v3/registry/auth"
registrymiddleware "github.com/distribution/distribution/v3/registry/middleware/registry"
repositorymiddleware "github.com/distribution/distribution/v3/registry/middleware/repository"
"github.com/distribution/distribution/v3/registry/proxy"
"github.com/distribution/distribution/v3/registry/storage"
memorycache "github.com/distribution/distribution/v3/registry/storage/cache/memory"
rediscache "github.com/distribution/distribution/v3/registry/storage/cache/redis"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
storagemiddleware "github.com/distribution/distribution/v3/registry/storage/driver/middleware"
"github.com/distribution/distribution/v3/version"
"github.com/distribution/reference"
events "github.com/docker/go-events"
"github.com/docker/go-metrics"
"github.com/gorilla/mux"
"github.com/redis/go-redis/extra/redisotel/v9"
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
)
// randomSecretSize is the number of random bytes to generate if no secret
// was specified.
const randomSecretSize = 32

// defaultCheckInterval is the default time in between health checks.
const defaultCheckInterval = 10 * time.Second
// App is a global registry application object. Shared resources can be placed
// on this object that will be accessible from all requests. Any writable
// fields should be protected.
type App struct {
	// Context is the context handed to NewApp; embedding it lets an *App be
	// passed wherever a context.Context is expected.
	context.Context

	// Config is the configuration the app was created from.
	Config *configuration.Configuration

	router           *mux.Router                    // main application router, configured with dispatchers
	driver           storagedriver.StorageDriver    // driver maintains the app global storage driver instance.
	registry         distribution.Namespace         // registry is the primary registry backend for the app instance.
	repoRemover      distribution.RepositoryRemover // repoRemover provides ability to delete repos
	accessController auth.AccessController          // main access controller for application

	// httpHost is a parsed representation of the http.host parameter from
	// the configuration. Only the Scheme and Host fields are used.
	httpHost url.URL

	// events contains notification related configuration.
	events struct {
		sink   events.Sink
		source notifications.SourceRecord
	}

	// redis is the Redis client. NewApp requires it to be non-nil when the
	// "redis" blob descriptor cache is selected.
	redis redis.UniversalClient

	// isCache is true if this registry is configured as a pull through cache
	isCache bool

	// readOnly is true if the registry is in a read-only maintenance mode
	readOnly bool
}
// NewApp takes a configuration and returns a configured app, ready to serve
// requests. The app only implements ServeHTTP and can be wrapped in other
// handlers accordingly.
func NewApp(ctx context.Context, config *configuration.Configuration) *App {
app := &App{
Config: config,
Context: ctx,
router: v2.RouterWithPrefix(config.HTTP.Prefix),
isCache: config.Proxy.RemoteURL != "",
}
// Register the handler dispatchers.
app.register(v2.RouteNameBase, func(ctx *Context, r *http.Request) http.Handler {
return http.HandlerFunc(apiBase)
})
app.register(v2.RouteNameManifest, manifestDispatcher)
app.register(v2.RouteNameCatalog, catalogDispatcher)
app.register(v2.RouteNameTags, tagsDispatcher)
app.register(v2.RouteNameBlob, blobDispatcher)
app.register(v2.RouteNameBlobUpload, blobUploadDispatcher)
app.register(v2.RouteNameBlobUploadChunk, blobUploadDispatcher)
// override the storage driver's UA string for registry outbound HTTP requests
storageParams := config.Storage.Parameters()
if storageParams == nil {
storageParams = make(configuration.Parameters)
}
if storageParams["useragent"] == "" {
storageParams["useragent"] = fmt.Sprintf("distribution/%s %s", version.Version(), runtime.Version())
}
var err error
app.driver, err = factory.Create(app, config.Storage.Type(), storageParams)
if err != nil {
// TODO(stevvooe): Move the creation of a service into a protected
// method, where this is created lazily. Its status can be queried via
// a health check.
panic(err)
}
purgeConfig := uploadPurgeDefaultConfig()
if mc, ok := config.Storage["maintenance"]; ok {
if v, ok := mc["uploadpurging"]; ok {
purgeConfig, ok = v.(map[interface{}]interface{})
if !ok {
panic("uploadpurging config key must contain additional keys")
}
}
if v, ok := mc["readonly"]; ok {
readOnly, ok := v.(map[interface{}]interface{})
if !ok {
panic("readonly config key must contain additional keys")
}
if readOnlyEnabled, ok := readOnly["enabled"]; ok {
app.readOnly, ok = readOnlyEnabled.(bool)
if !ok {
panic("readonly's enabled config key must have a boolean value")
}
}
}
}
startUploadPurger(app, app.driver, dcontext.GetLogger(app), purgeConfig)
app.driver, err = applyStorageMiddleware(app, app.driver, config.Middleware["storage"])
if err != nil {
panic(err)
}
// Do not configure HTTP secret for a proxy registry as HTTP secret
// is only used for blob uploads and a proxy registry does not support blob uploads.
if !app.isCache {
app.configureSecret(config)
}
app.configureEvents(config)
app.configureRedis(config)
app.configureLogHook(config)
options := registrymiddleware.GetRegistryOptions()
if config.HTTP.Host != "" {
u, err := url.Parse(config.HTTP.Host)
if err != nil {
panic(fmt.Sprintf(`could not parse http "host" parameter: %v`, err))
}
app.httpHost = *u
}
if app.isCache {
options = append(options, storage.DisableDigestResumption)
}
// configure deletion
if d, ok := config.Storage["delete"]; ok {
e, ok := d["enabled"]
if ok {
if deleteEnabled, ok := e.(bool); ok && deleteEnabled {
options = append(options, storage.EnableDelete)
}
}
}
// configure tag lookup concurrency limit
if p := config.Storage.TagParameters(); p != nil {
l, ok := p["concurrencylimit"]
if ok {
limit, ok := l.(int)
if !ok {
panic("tag lookup concurrency limit config key must have a integer value")
}
if limit < 0 {
panic("tag lookup concurrency limit should be a non-negative integer value")
}
options = append(options, storage.TagLookupConcurrencyLimit(limit))
}
}
// configure redirects
var redirectDisabled bool
if redirectConfig, ok := config.Storage["redirect"]; ok {
v := redirectConfig["disable"]
switch v := v.(type) {
case bool:
redirectDisabled = v
default:
panic(fmt.Sprintf("invalid type for redirect config: %#v", redirectConfig))
}
}
if redirectDisabled {
dcontext.GetLogger(app).Infof("backend redirection disabled")
} else {
options = append(options, storage.EnableRedirect)
}
if !config.Validation.Enabled {
config.Validation.Enabled = !config.Validation.Disabled
}
// configure validation
if config.Validation.Enabled {
if len(config.Validation.Manifests.URLs.Allow) == 0 && len(config.Validation.Manifests.URLs.Deny) == 0 {
// If Allow and Deny are empty, allow nothing.
options = append(options, storage.ManifestURLsAllowRegexp(regexp.MustCompile("^$")))
} else {
if len(config.Validation.Manifests.URLs.Allow) > 0 {
for i, s := range config.Validation.Manifests.URLs.Allow {
// Validate via compilation.
if _, err := regexp.Compile(s); err != nil {
panic(fmt.Sprintf("validation.manifests.urls.allow: %s", err))
}
// Wrap with non-capturing group.
config.Validation.Manifests.URLs.Allow[i] = fmt.Sprintf("(?:%s)", s)
}
re := regexp.MustCompile(strings.Join(config.Validation.Manifests.URLs.Allow, "|"))
options = append(options, storage.ManifestURLsAllowRegexp(re))
}
if len(config.Validation.Manifests.URLs.Deny) > 0 {
for i, s := range config.Validation.Manifests.URLs.Deny {
// Validate via compilation.
if _, err := regexp.Compile(s); err != nil {
panic(fmt.Sprintf("validation.manifests.urls.deny: %s", err))
}
// Wrap with non-capturing group.
config.Validation.Manifests.URLs.Deny[i] = fmt.Sprintf("(?:%s)", s)
}
re := regexp.MustCompile(strings.Join(config.Validation.Manifests.URLs.Deny, "|"))
options = append(options, storage.ManifestURLsDenyRegexp(re))
}
}
switch config.Validation.Manifests.Indexes.Platforms {
case "list":
options = append(options, storage.EnableValidateImageIndexImagesExist)
for _, platform := range config.Validation.Manifests.Indexes.PlatformList {
options = append(options, storage.AddValidateImageIndexImagesExistPlatform(platform.Architecture, platform.OS))
}
fallthrough
case "none":
dcontext.GetLogger(app).Warn("Image index completeness validation has been disabled, which is an experimental option because other container tooling might expect all image indexes to be complete")
case "all":
fallthrough
default:
options = append(options, storage.EnableValidateImageIndexImagesExist)
}
}
// configure storage caches
if cc, ok := config.Storage["cache"]; ok {
v, ok := cc["blobdescriptor"]
if !ok {
// Backwards compatible: "layerinfo" == "blobdescriptor"
v = cc["layerinfo"]
}
switch v {
case "redis":
if app.redis == nil {
panic("redis configuration required to use for layerinfo cache")
}
if _, ok := cc["blobdescriptorsize"]; ok {
dcontext.GetLogger(app).Warnf("blobdescriptorsize parameter is not supported with redis cache")
}
cacheProvider := rediscache.NewRedisBlobDescriptorCacheProvider(app.redis)
localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
app.registry, err = storage.NewRegistry(app, app.driver, localOptions...)
if err != nil {
panic("could not create registry: " + err.Error())
}
dcontext.GetLogger(app).Infof("using redis blob descriptor cache")
case "inmemory":
blobDescriptorSize := memorycache.DefaultSize
configuredSize, ok := cc["blobdescriptorsize"]
if ok {
// Since Parameters is not strongly typed, render to a string and convert back
blobDescriptorSize, err = strconv.Atoi(fmt.Sprint(configuredSize))
if err != nil {
panic(fmt.Sprintf("invalid blobdescriptorsize value %s: %s", configuredSize, err))
}
}
cacheProvider := memorycache.NewInMemoryBlobDescriptorCacheProvider(blobDescriptorSize)
localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
app.registry, err = storage.NewRegistry(app, app.driver, localOptions...)
if err != nil {
panic("could not create registry: " + err.Error())
}
dcontext.GetLogger(app).Infof("using inmemory blob descriptor cache")
default:
if v != "" {
dcontext.GetLogger(app).Warnf("unknown cache type %q, caching disabled", config.Storage["cache"])
}
}
}
if app.registry == nil {
// configure the registry if no cache section is available.
app.registry, err = storage.NewRegistry(app.Context, app.driver, options...)
if err != nil {
panic("could not create registry: " + err.Error())
}
}
app.registry, err = applyRegistryMiddleware(app, app.registry, app.driver, config.Middleware["registry"])
if err != nil {
panic(err)
}
authType := config.Auth.Type()
if authType != "" && !strings.EqualFold(authType, "none") {
accessController, err := auth.GetAccessController(config.Auth.Type(), config.Auth.Parameters())
if err != nil {
panic(fmt.Sprintf("unable to configure authorization (%s): %v", authType, err))
}
app.accessController = accessController
dcontext.GetLogger(app).Debugf("configured %q access controller", authType)
}
// configure as a pull through cache
if config.Proxy.RemoteURL != "" {
app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, config.Proxy)
if err != nil {
panic(err.Error())
}
app.isCache = true
dcontext.GetLogger(app).Info("Registry configured as a proxy cache to ", config.Proxy.RemoteURL)
}
var ok bool
app.repoRemover, ok = app.registry.(distribution.RepositoryRemover)
if !ok {
dcontext.GetLogger(app).Warnf("Registry does not implement RepositoryRemover. Will not be able to delete repos and tags")
}
return app
}
// RegisterHealthChecks registers the health checks enabled in the app
// configuration (storage driver, file, HTTP and TCP checkers) with a health
// registry and starts one polling goroutine per check. At most one registry
// may be passed: with none, health.DefaultRegistry is used; with more than
// one, the method panics.
//
// This is an awful hack to defer health check registration control to
// callers. It should only ever be called once per registry process, typically
// in a main function. The correct way would be to register health checks
// outside of app, since multiple apps may exist in the same process. Because
// the configuration and app are tightly coupled, implementing this properly
// will require a refactor. This method may panic if called twice in the same
// process.
func (app *App) RegisterHealthChecks(healthRegistries ...*health.Registry) {
	if len(healthRegistries) > 1 {
		panic("RegisterHealthChecks called with more than one registry")
	}

	target := health.DefaultRegistry
	if len(healthRegistries) == 1 {
		target = healthRegistries[0]
	}

	// Storage driver check: stat the root path of the backend.
	if app.Config.Health.StorageDriver.Enabled {
		every := app.Config.Health.StorageDriver.Interval
		if every == 0 {
			every = defaultCheckInterval
		}

		statRoot := health.CheckFunc(func(ctx context.Context) error {
			_, statErr := app.driver.Stat(ctx, "/") // "/" should always exist
			if statErr == nil {
				return nil
			}
			if _, notFound := statErr.(storagedriver.PathNotFoundError); notFound {
				// The backend is responding, the path just doesn't exist.
				return nil
			}
			dcontext.GetLogger(ctx).Errorf("storage driver health check: %v", statErr)
			return statErr
		})

		driverUpdater := health.NewThresholdStatusUpdater(app.Config.Health.StorageDriver.Threshold)
		target.Register("storagedriver_"+app.Config.Storage.Type(), driverUpdater)
		go health.Poll(app, driverUpdater, statRoot, every)
	}

	// File checkers.
	for _, fc := range app.Config.Health.FileCheckers {
		every := fc.Interval
		if every == 0 {
			every = defaultCheckInterval
		}
		dcontext.GetLogger(app).Infof("configuring file health check path=%s, interval=%d", fc.File, every/time.Second)
		fileUpdater := health.NewStatusUpdater()
		target.Register(fc.File, fileUpdater)
		go health.Poll(app, fileUpdater, checks.FileChecker(fc.File), every)
	}

	// HTTP checkers; an unset expected status code defaults to 200.
	for _, hc := range app.Config.Health.HTTPCheckers {
		every := hc.Interval
		if every == 0 {
			every = defaultCheckInterval
		}

		wantStatus := hc.StatusCode
		if wantStatus == 0 {
			wantStatus = 200
		}

		httpCheck := checks.HTTPChecker(hc.URI, wantStatus, hc.Timeout, hc.Headers)
		dcontext.GetLogger(app).Infof("configuring HTTP health check uri=%s, interval=%d, threshold=%d", hc.URI, every/time.Second, hc.Threshold)
		httpUpdater := health.NewThresholdStatusUpdater(hc.Threshold)
		target.Register(hc.URI, httpUpdater)
		go health.Poll(app, httpUpdater, httpCheck, every)
	}

	// TCP checkers.
	for _, tc := range app.Config.Health.TCPCheckers {
		every := tc.Interval
		if every == 0 {
			every = defaultCheckInterval
		}

		tcpCheck := checks.TCPChecker(tc.Addr, tc.Timeout)
		dcontext.GetLogger(app).Infof("configuring TCP health check addr=%s, interval=%d, threshold=%d", tc.Addr, every/time.Second, tc.Threshold)
		tcpUpdater := health.NewThresholdStatusUpdater(tc.Threshold)
		target.Register(tc.Addr, tcpUpdater)
		go health.Poll(app, tcpUpdater, tcpCheck, every)
	}
}
// Shutdown closes the underlying registry when it implements proxy.Closer and
// returns the result of that Close call. For any other registry it is a no-op
// and returns nil.
func (app *App) Shutdown() error {
	closer, closable := app.registry.(proxy.Closer)
	if !closable {
		return nil
	}
	return closer.Close()
}
// register attaches a handler to the named route. The dispatch function is
// wrapped by app.dispatcher, so it runs behind the application filters and
// receives a context constructed at request time. When Prometheus debugging
// is enabled in the configuration, the handler is additionally wrapped with
// an instrumented handler.
func (app *App) register(routeName string, dispatch dispatchFunc) {
	routeHandler := app.dispatcher(dispatch)

	// Chain the handler with the prometheus instrumented handler. Dashes in
	// the route name are replaced with underscores for the metric name.
	if app.Config.HTTP.Debug.Prometheus.Enabled {
		ns := metrics.NewNamespace(prometheus.NamespacePrefix, "http", nil)
		metricName := strings.Replace(routeName, "-", "_", -1)
		routeMetrics := ns.NewDefaultHttpMetrics(metricName)
		metrics.Register(ns)
		routeHandler = metrics.InstrumentHandler(routeMetrics, routeHandler)
	}

	// TODO(stevvooe): This odd dispatcher/route registration is by-product of
	// some limitations in the gorilla/mux router. We are using it to keep
	// routing consistent between the client and server, but we may want to
	// replace it with manual routing and structure-based dispatch for better
	// control over the request execution.
	route := app.router.GetRoute(routeName)
	route.Handler(routeHandler)
}
// configureEvents prepares the event sink for action: it builds one sink per
// enabled notification endpoint, fans them out through a broadcaster and
// records the source (address and instance id) attached to emitted events.
func (app *App) configureEvents(configuration *configuration.Configuration) {
	// Configure all of the endpoint sinks.
	// NOTE(milosgajdos): we are disabling the linter here as
	// if an endpoint is disabled we continue with the evaluation
	// of the next one so we do not know the exact size the slice
	// should have at the time the iteration starts
	// nolint:prealloc
	var sinks []events.Sink
	for _, ep := range configuration.Notifications.Endpoints {
		if ep.Disabled {
			dcontext.GetLogger(app).Infof("endpoint %s disabled, skipping", ep.Name)
			continue
		}

		dcontext.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", ep.Name, ep.URL, ep.Timeout, ep.Headers)
		epConfig := notifications.EndpointConfig{
			Timeout:           ep.Timeout,
			Threshold:         ep.Threshold,
			Backoff:           ep.Backoff,
			Headers:           ep.Headers,
			IgnoredMediaTypes: ep.IgnoredMediaTypes,
			Ignore:            ep.Ignore,
		}
		sinks = append(sinks, notifications.NewEndpoint(ep.Name, ep.URL, epConfig))
	}

	// NOTE(stevvooe): Moving to a new queuing implementation is as easy as
	// replacing broadcaster with a rabbitmq implementation. It's recommended
	// that the registry instances also act as the workers to keep deployment
	// simple.
	app.events.sink = events.NewBroadcaster(sinks...)

	// Populate the registry event source. The configured listen address is
	// the fallback; when the OS hostname is available it is used instead,
	// combined with the port picked off the configured address if that
	// address parses as host:port.
	sourceAddr := configuration.HTTP.Addr
	if hostname, err := os.Hostname(); err == nil {
		sourceAddr = hostname
		if _, port, splitErr := net.SplitHostPort(configuration.HTTP.Addr); splitErr == nil {
			sourceAddr = net.JoinHostPort(hostname, port)
		}
	}

	app.events.source = notifications.SourceRecord{
		Addr:       sourceAddr,
		InstanceID: dcontext.GetStringValue(app, "instance.id"),
	}
}
// configureRedis creates the app's redis client from cfg.Redis and publishes
// pool statistics through expvar under registry/redis. It does nothing except
// log when no redis addresses are configured. A failure to load the TLS key
// pair panics; a failure to read or parse a client CA is logged and leaves
// redis unconfigured.
func (app *App) configureRedis(cfg *configuration.Configuration) {
	if len(cfg.Redis.Options.Addrs) == 0 {
		dcontext.GetLogger(app).Infof("redis not configured")
		return
	}

	// Copy the configured options field by field into the client options.
	src := cfg.Redis.Options
	opts := redis.UniversalOptions{
		Addrs:                 src.Addrs,
		ClientName:            src.ClientName,
		DB:                    src.DB,
		Protocol:              src.Protocol,
		Username:              src.Username,
		Password:              src.Password,
		SentinelUsername:      src.SentinelUsername,
		SentinelPassword:      src.SentinelPassword,
		MaxRetries:            src.MaxRetries,
		MinRetryBackoff:       src.MinRetryBackoff,
		MaxRetryBackoff:       src.MaxRetryBackoff,
		DialTimeout:           src.DialTimeout,
		ReadTimeout:           src.ReadTimeout,
		WriteTimeout:          src.WriteTimeout,
		ContextTimeoutEnabled: src.ContextTimeoutEnabled,
		PoolFIFO:              src.PoolFIFO,
		PoolSize:              src.PoolSize,
		PoolTimeout:           src.PoolTimeout,
		MinIdleConns:          src.MinIdleConns,
		MaxIdleConns:          src.MaxIdleConns,
		MaxActiveConns:        src.MaxActiveConns,
		ConnMaxIdleTime:       src.ConnMaxIdleTime,
		ConnMaxLifetime:       src.ConnMaxLifetime,
		MaxRedirects:          src.MaxRedirects,
		ReadOnly:              src.ReadOnly,
		RouteByLatency:        src.RouteByLatency,
		RouteRandomly:         src.RouteRandomly,
		MasterName:            src.MasterName,
		DisableIdentity:       src.DisableIdentity,
		IdentitySuffix:        src.IdentitySuffix,
		UnstableResp3:         src.UnstableResp3,
	}

	// redis TLS config: enabled as soon as either a certificate or a key is
	// configured.
	if cfg.Redis.TLS.Certificate != "" || cfg.Redis.TLS.Key != "" {
		keyPair, err := tls.LoadX509KeyPair(cfg.Redis.TLS.Certificate, cfg.Redis.TLS.Key)
		if err != nil {
			panic(err)
		}
		tlsConf := &tls.Config{Certificates: []tls.Certificate{keyPair}}

		if len(cfg.Redis.TLS.ClientCAs) != 0 {
			caPool := x509.NewCertPool()
			for _, caPath := range cfg.Redis.TLS.ClientCAs {
				pemBytes, readErr := os.ReadFile(caPath)
				if readErr != nil {
					dcontext.GetLogger(app).Errorf("failed reading redis client CA: %v", readErr)
					return
				}
				if !caPool.AppendCertsFromPEM(pemBytes) {
					dcontext.GetLogger(app).Error("could not add CA to pool")
					return
				}
			}
			tlsConf.ClientAuth = tls.RequireAndVerifyClientCert
			tlsConf.ClientCAs = caPool
		}
		opts.TLSConfig = tlsConf
	}

	app.redis = app.createPool(opts)

	// Enable metrics instrumentation.
	if err := redisotel.InstrumentMetrics(app.redis); err != nil {
		dcontext.GetLogger(app).Errorf("failed to instrument metrics on redis: %v", err)
	}

	// setup expvar: reuse the "registry" map when it already exists.
	registryVar := expvar.Get("registry")
	if registryVar == nil {
		registryVar = expvar.NewMap("registry")
	}

	registryVar.(*expvar.Map).Set("redis", expvar.Func(func() interface{} {
		poolStats := app.redis.PoolStats()
		return map[string]interface{}{
			"Config": cfg,
			"Active": poolStats.TotalConns - poolStats.IdleConns,
		}
	}))
}
// createPool builds a universal redis client from cfg. It installs an
// OnConnect hook that pings the server on each new connection and returns the
// ping's error, replacing any OnConnect already set on cfg.
func (app *App) createPool(cfg redis.UniversalOptions) redis.UniversalClient {
	pingOnConnect := func(ctx context.Context, cn *redis.Conn) error {
		return cn.Ping(ctx).Err()
	}
	cfg.OnConnect = pingOnConnect
	return redis.NewUniversalClient(&cfg)
}
// configureLogHook prepares logging hook parameters. For every enabled hook
// of type "mail" in the log configuration, a mail hook is added to the
// logrus logger behind the app's logger. Hooks of any other type are ignored,
// as is the whole configuration when the app logger is not a *logrus.Entry.
func (app *App) configureLogHook(configuration *configuration.Configuration) {
	entry, isLogrus := dcontext.GetLogger(app).(*logrus.Entry)
	if !isLogrus {
		// somehow, we are not using logrus
		return
	}

	target := entry.Logger
	for _, hookCfg := range configuration.Log.Hooks {
		if hookCfg.Disabled {
			continue
		}
		if hookCfg.Type != "mail" {
			// Only mail hooks are supported here.
			continue
		}

		mailHook := &logHook{
			LevelsParam: hookCfg.Levels,
			Mail: &mailer{
				Addr:     hookCfg.MailOptions.SMTP.Addr,
				Username: hookCfg.MailOptions.SMTP.Username,
				Password: hookCfg.MailOptions.SMTP.Password,
				Insecure: hookCfg.MailOptions.SMTP.Insecure,
				From:     hookCfg.MailOptions.From,
				To:       hookCfg.MailOptions.To,
			},
		}
		target.Hooks.Add(mailHook)
	}
}
// configureSecret fills in configuration.HTTP.Secret with randomSecretSize
// random bytes when the configuration does not provide one, and logs a
// warning about it. It panics if the random bytes cannot be generated. A
// configured secret is left untouched.
func (app *App) configureSecret(configuration *configuration.Configuration) {
	if configuration.HTTP.Secret != "" {
		return
	}

	var generated [randomSecretSize]byte
	_, err := rand.Read(generated[:])
	if err != nil {
		panic(fmt.Sprintf("could not generate random bytes for HTTP secret: %v", err))
	}

	configuration.HTTP.Secret = string(generated[:])
	dcontext.GetLogger(app).Warn("No HTTP secret provided - generated random secret. This may cause problems with uploads if multiple registries are behind a load-balancer. To provide a shared secret, fill in http.secret in the configuration file or set the REGISTRY_HTTP_SECRET environment variable.")
}
// ServeHTTP decorates the request context with the request, the response
// writer and a request logger, adds the Docker-Distribution-API-Version
// header to the response, and hands the request to the app router.
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Prepare the context with our own little decorations. Note that the
	// response writer is replaced by the one returned alongside the context.
	reqCtx := r.Context()
	reqCtx = dcontext.WithRequest(reqCtx, r)
	reqCtx, w = dcontext.WithResponseWriter(reqCtx, w)
	requestLogger := dcontext.GetRequestLogger(reqCtx)
	reqCtx = dcontext.WithLogger(reqCtx, requestLogger)
	r = r.WithContext(reqCtx)

	// Set a header with the Docker Distribution API Version for all responses.
	respHeader := w.Header()
	respHeader.Add("Docker-Distribution-API-Version", "registry/2.0")

	app.router.ServeHTTP(w, r)
}
// dispatchFunc takes a context and request and returns a constructed handler
// for the route. The dispatcher will use this to dynamically create request
// specific handlers for each endpoint without creating a new router for each
// request.
//
// dispatcher calls it once per request, after authorization and (for routes
// that require a name) repository setup, and immediately serves the request
// with the returned handler.
type dispatchFunc func(ctx *Context, r *http.Request) http.Handler
// TODO(stevvooe): dispatchers should probably have some validation error
// chain with proper error reporting.
// dispatcher returns a handler that constructs a request specific context and
// handler, using the dispatch factory function.
//
// Per request, the returned handler:
//  1. adds the headers configured in app.Config.HTTP.Headers to the response;
//  2. builds the request Context and authorizes the request;
//  3. for routes that require a name, parses the repository name, resolves the
//     repository, decorates it with the event bridge and applies the
//     configured repository middleware;
//  4. serves the request with the handler returned by dispatch.
//
// Errors accumulated in context.Errors are served as JSON and logged by a
// deferred function when the handler returns.
func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Configured response headers are added before anything else runs.
for headerName, headerValues := range app.Config.HTTP.Headers {
for _, value := range headerValues {
w.Header().Add(headerName, value)
}
}
// NOTE: the local name shadows the standard library context package for
// the rest of this closure.
context := app.context(w, r)
defer func() {
// Automated error response handling here. Handlers may return their
// own errors if they need different behavior (such as range errors
// for layer upload).
if context.Errors.Len() > 0 {
_ = errcode.ServeJSON(w, context.Errors)
app.logError(context, context.Errors)
} else if status, ok := context.Value("http.response.status").(int); ok && status >= 200 && status <= 399 {
dcontext.GetResponseLogger(context).Infof("response completed")
}
}()
// On authorization failure nothing further is written here; the response,
// if any, is produced inside app.authorized.
if err := app.authorized(w, r, context); err != nil {
dcontext.GetLogger(context).Warnf("error authorizing context: %v", err)
return
}
// Add username to request logging
context.Context = dcontext.WithLogger(context.Context, dcontext.GetLogger(context.Context, userNameKey))
// sync up context on the request.
r = r.WithContext(context)
if app.nameRequired(r) {
nameRef, err := reference.WithName(getName(context))
if err != nil {
dcontext.GetLogger(context).Errorf("error parsing reference from context: %v", err)
context.Errors = append(context.Errors, distribution.ErrRepositoryNameInvalid{
Name: getName(context),
Reason: err,
})
if err := errcode.ServeJSON(w, context.Errors); err != nil {
dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
}
return
}
repository, err := app.registry.Repository(context, nameRef)
if err != nil {
dcontext.GetLogger(context).Errorf("error resolving repository: %v", err)
// Only the error types listed below are recorded; the switch has no
// default case, so any other error type leaves context.Errors unchanged.
switch err := err.(type) {
case distribution.ErrRepositoryUnknown:
context.Errors = append(context.Errors, errcode.ErrorCodeNameUnknown.WithDetail(err))
case distribution.ErrRepositoryNameInvalid:
context.Errors = append(context.Errors, errcode.ErrorCodeNameInvalid.WithDetail(err))
case errcode.Error:
context.Errors = append(context.Errors, err)
}
if err := errcode.ServeJSON(w, context.Errors); err != nil {
dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
}
return
}
// assign and decorate the authorized repository with an event bridge.
context.Repository, context.RepositoryRemover = notifications.Listen(
repository,
context.App.repoRemover,
app.eventBridge(context, r))
context.Repository, err = applyRepoMiddleware(app, context.Repository, app.Config.Middleware["repository"])
if err != nil {
dcontext.GetLogger(context).Errorf("error initializing repository middleware: %v", err)
context.Errors = append(context.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
if err := errcode.ServeJSON(w, context.Errors); err != nil {
dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
}
return
}
}
// Build the request specific handler and serve the request with it.
dispatch(context, r).ServeHTTP(w, r)
})
}
// errCodeKey, errMessageKey and errDetailKey are the context value keys under
// which logError stores the code, message and detail of an error. Each
// String method returns the dotted name of the key ("err.code",
// "err.message", "err.detail").

// errCodeKey is the context key for an error's code.
type errCodeKey struct{}
// String returns "err.code".
func (errCodeKey) String() string { return "err.code" }
// errMessageKey is the context key for an error's message.
type errMessageKey struct{}
// String returns "err.message".
func (errMessageKey) String() string { return "err.message" }
// errDetailKey is the context key for an error's detail.
type errDetailKey struct{}
// String returns "err.detail".
func (errDetailKey) String() string { return "err.detail" }
// logError writes one "response completed with error" log entry per error in
// errors. For each entry the error's code and message (and, for
// errcode.Error values, its detail) are stored in a derived context under
// errCodeKey, errMessageKey and errDetailKey and handed to the logger as
// fields. Plain Go errors are reported with errcode.ErrorCodeUnknown.
func (app *App) logError(ctx context.Context, errors errcode.Errors) {
	for _, item := range errors {
		var logCtx context.Context

		switch typed := item.(type) {
		case errcode.Error:
			logCtx = context.WithValue(ctx, errCodeKey{}, typed.Code)
			logCtx = context.WithValue(logCtx, errMessageKey{}, typed.Message)
			logCtx = context.WithValue(logCtx, errDetailKey{}, typed.Detail)
		case errcode.ErrorCode:
			logCtx = context.WithValue(ctx, errCodeKey{}, typed)
			logCtx = context.WithValue(logCtx, errMessageKey{}, typed.Message())
		default:
			// just normal go 'error'
			logCtx = context.WithValue(ctx, errCodeKey{}, errcode.ErrorCodeUnknown)
			logCtx = context.WithValue(logCtx, errMessageKey{}, typed.Error())
		}

		fieldLogger := dcontext.GetLogger(logCtx, errCodeKey{}, errMessageKey{}, errDetailKey{})
		logCtx = dcontext.WithLogger(logCtx, fieldLogger)
		dcontext.GetResponseLogger(logCtx).Errorf("response completed with error")
	}
}
// context constructs the context object for the application. This should only
// be called once per request. The returned Context carries the request's
// route variables, a logger annotated with them, and a URL builder.
func (app *App) context(w http.ResponseWriter, r *http.Request) *Context {
	ctx := r.Context()
	ctx = dcontext.WithVars(ctx, r)
	varsLogger := dcontext.GetLogger(ctx, "vars.name", "vars.reference", "vars.digest", "vars.uuid")
	ctx = dcontext.WithLogger(ctx, varsLogger)

	reqContext := &Context{App: app, Context: ctx}

	// Without a fully configured host (scheme and host), URLs are derived
	// from the request itself.
	if app.httpHost.Scheme == "" || app.httpHost.Host == "" {
		reqContext.urlBuilder = v2.NewURLBuilderFromRequest(r, app.Config.HTTP.RelativeURLs)
		return reqContext
	}

	// A "host" item in the configuration takes precedence over
	// X-Forwarded-Proto and X-Forwarded-Host headers, and the
	// hostname in the request.
	reqContext.urlBuilder = v2.NewURLBuilder(&app.httpHost, false)
	return reqContext
}
// authorized checks if the request can proceed with access to the requested
// repository. If it succeeds, the context may access the requested
// repository. An error will be returned if access is not available.
//
// When no access controller is configured every request is allowed. Otherwise
// the access records required for the request are computed from the HTTP
// method and repository name (plus pull access on the "from" repository for
// cross-repository mounts, or the catalog record on the catalog route) and
// passed to the access controller. On success the granted user and resources
// are stored on context.Context.
//
// On failure the response is written here for the missing-name case, for
// auth challenges (401 with WWW-Authenticate headers) and for other
// controller errors (400 with no body).
// NOTE(review): when the controller returns a nil grant with a nil error, an
// error is returned but nothing is written to w in this function — confirm
// the resulting response is what callers expect.
func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Context) error {
dcontext.GetLogger(context).Debug("authorizing request")
repo := getName(context)
if app.accessController == nil {
return nil // access controller is not enabled.
}
var accessRecords []auth.Access
if repo != "" {
accessRecords = appendAccessRecords(accessRecords, r.Method, repo)
if fromRepo := r.FormValue("from"); fromRepo != "" {
// mounting a blob from one repository to another requires pull (GET)
// access to the source repository.
accessRecords = appendAccessRecords(accessRecords, http.MethodGet, fromRepo)
}
} else {
// Only allow the name not to be set on the base route.
if app.nameRequired(r) {
// For this to be properly secured, repo must always be set for a
// resource that may make a modification. The only condition under
// which name is not set and we still allow access is when the
// base route is accessed. This section prevents us from making
// that mistake elsewhere in the code, allowing any operation to
// proceed.
if err := errcode.ServeJSON(w, errcode.ErrorCodeUnauthorized); err != nil {
dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
}
return fmt.Errorf("forbidden: no repository name")
}
accessRecords = appendCatalogAccessRecord(accessRecords, r)
}
// The controller sees the request with the app's request context attached.
grant, err := app.accessController.Authorized(r.WithContext(context.Context), accessRecords...)
if err != nil {
switch err := err.(type) {
case auth.Challenge:
// Add the appropriate WWW-Auth header
err.SetHeaders(r, w)
if err := errcode.ServeJSON(w, errcode.ErrorCodeUnauthorized.WithDetail(accessRecords)); err != nil {
dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
}
default:
// This condition is a potential security problem either in
// the configuration or whatever is backing the access
// controller. Just return a bad request with no information
// to avoid exposure. The request should not proceed.
dcontext.GetLogger(context).Errorf("error checking authorization: %v", err)
w.WriteHeader(http.StatusBadRequest)
}
return err
}
// A nil grant without an error is treated as a failure; see the review note
// in the function comment about the response in this case.
if grant == nil {
return fmt.Errorf("access controller returned neither an access grant nor an error")
}
ctx := withUser(context.Context, grant.User)
ctx = withResources(ctx, grant.Resources)
dcontext.GetLogger(ctx, userNameKey).Info("authorized request")
// TODO(stevvooe): This pattern needs to be cleaned up a bit. One context
// should be replaced by another, rather than replacing the context on a
// mutable object.
context.Context = ctx
return nil
}
// eventBridge returns a bridge for the current request, configured with the
// correct actor and source. The actor is the request's user name, the source
// and sink come from the app's event configuration, and the request record is
// keyed by the request id found in ctx.
func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener {
	who := notifications.ActorRecord{Name: getUserName(ctx, r)}
	requestRecord := notifications.NewRequestRecord(dcontext.GetRequestID(ctx), r)

	return notifications.NewBridge(
		ctx.urlBuilder,
		app.events.source,
		who,
		requestRecord,
		app.events.sink,
		app.Config.Notifications.EventConfig.IncludeReferences,
	)
}
// nameRequired returns true if the route requires a name. Only the base and
// catalog routes do not; a request with no matched route is treated as
// requiring one.
func (app *App) nameRequired(r *http.Request) bool {
	matched := mux.CurrentRoute(r)
	if matched == nil {
		return true
	}

	switch matched.GetName() {
	case v2.RouteNameBase, v2.RouteNameCatalog:
		return false
	default:
		return true
	}
}
// apiBase implements a simple yes-man for doing overall checks against the
// api. This can support auth roundtrips to support docker login. It answers
// with an empty JSON object and matching Content-Type and Content-Length
// headers; no status is set explicitly, so the default 200 OK applies.
func apiBase(w http.ResponseWriter, r *http.Request) {
	const emptyJSON = "{}"

	// Provide a simple /v2/ 200 OK response with empty json response.
	header := w.Header()
	header.Set("Content-Type", "application/json")
	header.Set("Content-Length", fmt.Sprint(len(emptyJSON)))

	fmt.Fprint(w, emptyJSON)
}
// appendAccessRecords checks the method and adds the appropriate Access
// records for the repository repo to the records list:
//
//   - GET, HEAD: pull
//   - POST, PUT, PATCH: pull and push
//   - DELETE: delete
//
// Any other method leaves records unchanged.
func appendAccessRecords(records []auth.Access, method string, repo string) []auth.Access {
	target := auth.Resource{
		Type: "repository",
		Name: repo,
	}

	var required []auth.Access
	switch method {
	case http.MethodGet, http.MethodHead:
		required = []auth.Access{
			{Resource: target, Action: "pull"},
		}
	case http.MethodPost, http.MethodPut, http.MethodPatch:
		required = []auth.Access{
			{Resource: target, Action: "pull"},
			{Resource: target, Action: "push"},
		}
	case http.MethodDelete:
		required = []auth.Access{
			{Resource: target, Action: "delete"},
		}
	default:
		return records
	}

	return append(records, required...)
}
// appendCatalogAccessRecord adds the access record for the catalog
// (resource registry/catalog, action "*") when the request's current route is
// the catalog route. For any other route, or when the request has no matched
// route, accessRecords is returned unchanged.
func appendCatalogAccessRecord(accessRecords []auth.Access, r *http.Request) []auth.Access {
	route := mux.CurrentRoute(r)
	if route == nil {
		// No route matched this request; guard against dereferencing nil,
		// mirroring the check in nameRequired.
		return accessRecords
	}

	if route.GetName() != v2.RouteNameCatalog {
		return accessRecords
	}

	return append(accessRecords, auth.Access{
		Resource: auth.Resource{
			Type: "registry",
			Name: "catalog",
		},
		Action: "*",
	})
}
// applyRegistryMiddleware wraps a registry instance with the configured
// middlewares, applied in the order they are listed. Each middleware receives
// the registry produced by the previous one. It returns the outermost wrapped
// registry, or a nil registry and an error naming the middleware that failed;
// the underlying error is wrapped so callers can inspect it with errors.Is
// and errors.As.
func applyRegistryMiddleware(ctx context.Context, registry distribution.Namespace, driver storagedriver.StorageDriver, middlewares []configuration.Middleware) (distribution.Namespace, error) {
	for _, mw := range middlewares {
		rmw, err := registrymiddleware.Get(ctx, mw.Name, mw.Options, registry, driver)
		if err != nil {
			return nil, fmt.Errorf("unable to configure registry middleware (%s): %w", mw.Name, err)
		}
		registry = rmw
	}
	return registry, nil
}
// applyRepoMiddleware wraps a repository with the configured middlewares,
// applied in the order they are listed. The first middleware that fails
// aborts the chain: a nil repository is returned together with that
// middleware's error, unmodified.
func applyRepoMiddleware(ctx context.Context, repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) {
	wrapped := repository
	for i := range middlewares {
		next, err := repositorymiddleware.Get(ctx, middlewares[i].Name, middlewares[i].Options, wrapped)
		if err != nil {
			return nil, err
		}
		wrapped = next
	}
	return wrapped, nil
}
// applyStorageMiddleware wraps a storage driver with the configured
// middlewares, applied in the order they are listed. Each middleware receives
// the driver produced by the previous one. It returns the outermost wrapped
// driver, or a nil driver and an error naming the middleware that failed; the
// underlying error is wrapped so callers can inspect it with errors.Is and
// errors.As.
func applyStorageMiddleware(ctx context.Context, driver storagedriver.StorageDriver, middlewares []configuration.Middleware) (storagedriver.StorageDriver, error) {
	for _, mw := range middlewares {
		smw, err := storagemiddleware.Get(ctx, mw.Name, mw.Options, driver)
		if err != nil {
			return nil, fmt.Errorf("unable to configure storage middleware (%s): %w", mw.Name, err)
		}
		driver = smw
	}
	return driver, nil
}
// uploadPurgeDefaultConfig provides a default configuration for upload
// purging to be used in the absence of configuration in the
// configuration file: purging enabled, uploads older than 168h removed,
// checked every 24h, dry-run off. A fresh map is returned on every call.
func uploadPurgeDefaultConfig() map[interface{}]interface{} {
	return map[interface{}]interface{}{
		"enabled":  true,
		"age":      "168h",
		"interval": "24h",
		"dryrun":   false,
	}
}
// badPurgeUploadConfig aborts with a panic describing why the upload purge
// configuration could not be parsed. It never returns.
func badPurgeUploadConfig(reason string) {
	msg := fmt.Sprintf("Unable to parse upload purge configuration: %s", reason)
	panic(msg)
}
// startUploadPurger schedules a goroutine which will periodically
// check upload directories for old files and delete them.
//
// config must contain "age" and "interval" as duration strings and "dryrun"
// as a bool; a missing or malformed entry panics via badPurgeUploadConfig.
// Nothing is started when config["enabled"] is false.
//
// The goroutine first waits a random jitter of 0-59 minutes, then purges
// uploads older than the configured age once per interval. It exits as soon
// as ctx is done, so cancelling ctx stops the purger instead of leaving the
// goroutine sleeping forever.
func startUploadPurger(ctx context.Context, storageDriver storagedriver.StorageDriver, log dcontext.Logger, config map[interface{}]interface{}) {
	if config["enabled"] == false {
		return
	}

	// badPurgeUploadConfig panics, so each check below acts as a guard.
	purgeAge, ok := config["age"]
	if !ok {
		badPurgeUploadConfig("age missing")
	}
	ageStr, ok := purgeAge.(string)
	if !ok {
		badPurgeUploadConfig("age is not a string")
	}
	purgeAgeDuration, err := time.ParseDuration(ageStr)
	if err != nil {
		badPurgeUploadConfig(fmt.Sprintf("Cannot parse duration: %s", err.Error()))
	}

	interval, ok := config["interval"]
	if !ok {
		badPurgeUploadConfig("interval missing")
	}
	intervalStr, ok := interval.(string)
	if !ok {
		badPurgeUploadConfig("interval is not a string")
	}
	intervalDuration, err := time.ParseDuration(intervalStr)
	if err != nil {
		badPurgeUploadConfig(fmt.Sprintf("Cannot parse interval: %s", err.Error()))
	}

	dryRun, ok := config["dryrun"]
	if !ok {
		badPurgeUploadConfig("dryrun missing")
	}
	dryRunBool, ok := dryRun.(bool)
	if !ok {
		badPurgeUploadConfig("cannot parse dryrun")
	}

	// wait sleeps for d and reports whether the full duration elapsed; it
	// returns false early when ctx is done.
	wait := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	go func() {
		randInt, err := rand.Int(rand.Reader, new(big.Int).SetInt64(math.MaxInt64))
		if err != nil {
			log.Infof("Failed to generate random jitter: %v", err)
			// sleep 30min for failure case
			randInt = big.NewInt(30)
		}
		jitter := time.Duration(randInt.Int64()%60) * time.Minute
		log.Infof("Starting upload purge in %s", jitter)
		if !wait(jitter) {
			return
		}

		for {
			// PurgeUploads takes "actually delete" as its last argument,
			// hence the negated dry-run flag.
			storage.PurgeUploads(ctx, storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool)
			log.Infof("Starting upload purge in %s", intervalDuration)
			if !wait(intervalDuration) {
				return
			}
		}
	}()
}
package handlers
import (
"net/http"
)
// basicAuth returns the username and password carried by the request's HTTP
// Basic Authorization header, as reported by (*http.Request).BasicAuth. ok is
// false when the request does not carry usable basic credentials.
func basicAuth(r *http.Request) (username, password string, ok bool) {
	username, password, ok = r.BasicAuth()
	return username, password, ok
}
package handlers
import (
"net/http"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/api/errcode"
"github.com/gorilla/handlers"
"github.com/opencontainers/go-digest"
)
// blobDispatcher uses the request context to build a blobHandler. When the
// digest cannot be obtained from the context, the returned handler only
// records a digest-invalid error on ctx.Errors. Otherwise GET and HEAD are
// served by GetBlob and, unless the registry is read-only, DELETE by
// DeleteBlob.
func blobDispatcher(ctx *Context, r *http.Request) http.Handler {
	dgst, err := getDigest(ctx)
	if err != nil {
		// Every digest failure, including a missing digest, is reported the
		// same way.
		return http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
			ctx.Errors = append(ctx.Errors, errcode.ErrorCodeDigestInvalid.WithDetail(err))
		})
	}

	bh := &blobHandler{Context: ctx, Digest: dgst}

	byMethod := handlers.MethodHandler{}
	byMethod[http.MethodGet] = http.HandlerFunc(bh.GetBlob)
	byMethod[http.MethodHead] = http.HandlerFunc(bh.GetBlob)
	if !ctx.readOnly {
		byMethod[http.MethodDelete] = http.HandlerFunc(bh.DeleteBlob)
	}
	return byMethod
}
// blobHandler serves http blob requests. It embeds the request *Context and
// carries the digest of the blob the request refers to, as obtained by
// blobDispatcher from that context.
type blobHandler struct {
*Context
// Digest identifies the blob to fetch or delete.
Digest digest.Digest
}
// GetBlob fetches the binary data from backend storage and returns it in the
// response. The blob is looked up first: an unknown blob is recorded on
// bh.Errors as blob-unknown, any other lookup or serving failure as an
// unknown error.
func (bh *blobHandler) GetBlob(w http.ResponseWriter, r *http.Request) {
	dcontext.GetLogger(bh).Debug("GetBlob")

	store := bh.Repository.Blobs(bh)
	desc, statErr := store.Stat(bh, bh.Digest)
	if statErr != nil {
		switch statErr {
		case distribution.ErrBlobUnknown:
			bh.Errors = append(bh.Errors, errcode.ErrorCodeBlobUnknown.WithDetail(bh.Digest))
		default:
			bh.Errors = append(bh.Errors, errcode.ErrorCodeUnknown.WithDetail(statErr))
		}
		return
	}

	// Serve using the digest reported by the store's descriptor.
	serveErr := store.ServeBlob(bh, w, r, desc.Digest)
	if serveErr != nil {
		dcontext.GetLogger(bh).Debugf("unexpected error getting blob HTTP handler: %v", serveErr)
		bh.Errors = append(bh.Errors, errcode.ErrorCodeUnknown.WithDetail(serveErr))
	}
}
// DeleteBlob deletes a layer blob. On success it answers 202 Accepted with a
// zero Content-Length. On failure nothing is written here; the error is
// recorded on bh.Errors instead (unsupported, blob-unknown, or the raw error,
// which is also logged).
func (bh *blobHandler) DeleteBlob(w http.ResponseWriter, r *http.Request) {
	dcontext.GetLogger(bh).Debug("DeleteBlob")

	store := bh.Repository.Blobs(bh)
	switch delErr := store.Delete(bh, bh.Digest); delErr {
	case nil:
		w.Header().Set("Content-Length", "0")
		w.WriteHeader(http.StatusAccepted)
	case distribution.ErrUnsupported:
		bh.Errors = append(bh.Errors, errcode.ErrorCodeUnsupported)
	case distribution.ErrBlobUnknown:
		bh.Errors = append(bh.Errors, errcode.ErrorCodeBlobUnknown)
	default:
		bh.Errors = append(bh.Errors, delErr)
		dcontext.GetLogger(bh).Errorf("Unknown error deleting blob: %s", delErr.Error())
	}
}
package handlers
import (
"fmt"
"net/http"
"net/url"
"strconv"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/api/errcode"
"github.com/distribution/distribution/v3/registry/storage"
"github.com/distribution/reference"
"github.com/gorilla/handlers"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
)
// blobUploadDispatcher builds the method-routing handler for blob upload
// requests in the given request context. Mutating methods are only registered
// when the registry is not read-only. For a request that names an upload UUID
// with a method other than GET/HEAD, the upload is resumed first; if resuming
// fails, the failure handler is returned instead, otherwise the routed handler
// is wrapped so the resumed upload is closed after the request.
func blobUploadDispatcher(ctx *Context, r *http.Request) http.Handler {
	buh := &blobUploadHandler{Context: ctx, UUID: getUploadUUID(ctx)}

	routes := handlers.MethodHandler{}
	routes[http.MethodGet] = http.HandlerFunc(buh.GetUploadStatus)
	routes[http.MethodHead] = http.HandlerFunc(buh.GetUploadStatus)
	if !ctx.readOnly {
		routes[http.MethodPost] = http.HandlerFunc(buh.StartBlobUpload)
		routes[http.MethodPatch] = http.HandlerFunc(buh.PatchBlobData)
		routes[http.MethodPut] = http.HandlerFunc(buh.PutBlobUploadComplete)
		routes[http.MethodDelete] = http.HandlerFunc(buh.CancelBlobUpload)
	}

	// Requests without an upload id, and status probes, need no resumed writer.
	statusProbe := r.Method == http.MethodGet || r.Method == http.MethodHead
	if buh.UUID == "" || statusProbe {
		return routes
	}

	if failure := buh.ResumeBlobUpload(ctx, r); failure != nil {
		return failure
	}
	return closeResources(routes, buh.Upload)
}
// blobUploadHandler handles the http blob upload process.
type blobUploadHandler struct {
*Context
// UUID identifies the upload instance for the current request. Using UUID
// to key blob writers since this implementation uses UUIDs.
UUID string
// Upload is the blob writer for the upload in progress. The PATCH, PUT and
// DELETE handlers report an unknown upload when it is nil.
Upload distribution.BlobWriter
// State is the upload state that is signed into, and recovered from, the
// "_state" URL parameter.
State blobUploadState
}
// StartBlobUpload begins the blob upload process and allocates a server-side
// blob writer session. When both the "from" and "mount" form values are
// present, a cross-repository mount is attempted; if the blob store reports
// the blob as mounted, the blob-created headers are written instead of
// starting an upload. Otherwise the upload location headers are set and
// 202 Accepted is returned.
func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) {
	var createOpts []distribution.BlobCreateOption

	source, wanted := r.FormValue("from"), r.FormValue("mount")
	if wanted != "" && source != "" {
		// A mount request that cannot be turned into an option is ignored,
		// and the request proceeds as a regular upload.
		if mountOpt, mountErr := buh.createBlobMountOption(source, wanted); mountErr == nil && mountOpt != nil {
			createOpts = append(createOpts, mountOpt)
		}
	}

	upload, createErr := buh.Repository.Blobs(buh).Create(buh, createOpts...)
	if createErr != nil {
		switch mounted := createErr.(type) {
		case distribution.ErrBlobMounted:
			if hdrErr := buh.writeBlobCreatedHeaders(w, mounted.Descriptor); hdrErr != nil {
				buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(hdrErr))
			}
		default:
			if createErr == distribution.ErrUnsupported {
				buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported)
			} else {
				buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(createErr))
			}
		}
		return
	}

	buh.Upload = upload
	if respErr := buh.blobUploadResponse(w, r); respErr != nil {
		buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(respErr))
		return
	}

	// buh.UUID is empty for a fresh upload, so publish the writer's own id.
	w.Header().Set("Docker-Upload-UUID", buh.Upload.ID())
	w.WriteHeader(http.StatusAccepted)
}
// GetUploadStatus reports the progress of the upload identified by buh.UUID.
// If no writer is attached yet, the upload is resumed from the blob store.
// On success the upload headers are set and 204 No Content is returned.
func (buh *blobUploadHandler) GetUploadStatus(w http.ResponseWriter, r *http.Request) {
	if buh.Upload == nil {
		resumed, resumeErr := buh.Repository.Blobs(buh).Resume(buh, buh.UUID)
		switch {
		case resumeErr == nil:
			buh.Upload = resumed
		case resumeErr == distribution.ErrBlobUploadUnknown:
			buh.Errors = append(buh.Errors, errcode.ErrorCodeBlobUploadUnknown.WithDetail(resumeErr))
			return
		default:
			buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(resumeErr))
			return
		}
	}

	if respErr := buh.blobUploadResponse(w, r); respErr != nil {
		buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(respErr))
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// PatchBlobData appends the request body to the upload in progress. The
// Content-Type, if given, must be application/octet-stream. When both
// Content-Range and Content-Length are supplied, the range must start at the
// current upload size and its length must match Content-Length. On success
// the upload headers are set and 202 Accepted is returned.
func (buh *blobUploadHandler) PatchBlobData(w http.ResponseWriter, r *http.Request) {
	if buh.Upload == nil {
		buh.Errors = append(buh.Errors, errcode.ErrorCodeBlobUploadUnknown)
		return
	}

	switch contentType := r.Header.Get("Content-Type"); contentType {
	case "", "application/octet-stream":
		// Acceptable: absent or the expected binary type.
	default:
		buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(fmt.Errorf("bad Content-Type")))
		// TODO(dmcgowan): encode error
		return
	}

	rangeHdr := r.Header.Get("Content-Range")
	lengthHdr := r.Header.Get("Content-Length")
	if rangeHdr != "" && lengthHdr != "" {
		first, last, rangeErr := parseContentRange(rangeHdr)
		if rangeErr != nil {
			buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(rangeErr.Error()))
			return
		}
		// The chunk must be well-formed and continue exactly where the
		// upload currently ends.
		if first > last || first != buh.Upload.Size() {
			buh.Errors = append(buh.Errors, errcode.ErrorCodeRangeInvalid)
			return
		}

		declared, lenErr := strconv.ParseInt(lengthHdr, 10, 64)
		if lenErr != nil {
			buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(lenErr.Error()))
			return
		}
		// The range is inclusive on both ends.
		if expected := last - first + 1; declared != expected {
			buh.Errors = append(buh.Errors, errcode.ErrorCodeSizeInvalid)
			return
		}
	}

	if copyErr := copyFullPayload(buh, w, r, buh.Upload, -1, "blob PATCH"); copyErr != nil {
		buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(copyErr.Error()))
		return
	}

	if respErr := buh.blobUploadResponse(w, r); respErr != nil {
		buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(respErr))
		return
	}
	w.WriteHeader(http.StatusAccepted)
}
// PutBlobUploadComplete handles the final request of a blob upload. The
// request may carry the remaining blob data or none at all; whatever is sent
// is appended to the upload, which is then committed against the digest from
// the "digest" form value. On success the blob-created headers (including
// 201 Created) are written. If the commit fails, the error is recorded and
// the upload is cancelled to clean up backend data.
func (buh *blobUploadHandler) PutBlobUploadComplete(w http.ResponseWriter, r *http.Request) {
	if buh.Upload == nil {
		buh.Errors = append(buh.Errors, errcode.ErrorCodeBlobUploadUnknown)
		return
	}
	defer buh.Upload.Close()

	rawDigest := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters!
	if rawDigest == "" {
		// Missing digest: report it, but leave the upload intact for a retry.
		buh.Errors = append(buh.Errors, errcode.ErrorCodeDigestInvalid.WithDetail("digest missing"))
		return
	}

	dgst, parseErr := digest.Parse(rawDigest)
	if parseErr != nil {
		// Malformed digest: report it, but leave the upload intact for a retry.
		buh.Errors = append(buh.Errors, errcode.ErrorCodeDigestInvalid.WithDetail("digest parsing failed"))
		return
	}

	if copyErr := copyFullPayload(buh, w, r, buh.Upload, -1, "blob PUT"); copyErr != nil {
		buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(copyErr.Error()))
		return
	}

	desc, commitErr := buh.Upload.Commit(buh, v1.Descriptor{
		Digest: dgst,
		// TODO(stevvooe): This isn't wildly important yet, but we should
		// really set the mediatype. For now, we can let the backend take care
		// of this.
	})
	if commitErr == nil {
		if hdrErr := buh.writeBlobCreatedHeaders(w, desc); hdrErr != nil {
			buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(hdrErr))
		}
		return
	}

	// Translate the commit failure into an API error.
	switch typed := commitErr.(type) {
	case distribution.ErrBlobInvalidDigest:
		buh.Errors = append(buh.Errors, errcode.ErrorCodeDigestInvalid.WithDetail(typed))
	case errcode.Error:
		buh.Errors = append(buh.Errors, typed)
	default:
		switch {
		case commitErr == distribution.ErrAccessDenied:
			buh.Errors = append(buh.Errors, errcode.ErrorCodeDenied)
		case commitErr == distribution.ErrUnsupported:
			buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported)
		case commitErr == distribution.ErrBlobInvalidLength, commitErr == distribution.ErrBlobDigestUnsupported:
			buh.Errors = append(buh.Errors, errcode.ErrorCodeBlobUploadInvalid.WithDetail(commitErr))
		default:
			dcontext.GetLogger(buh).Errorf("unknown error completing upload: %v", commitErr)
			buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(commitErr))
		}
	}

	// Clean up the backend blob data; if that fails too, all we can do is
	// observe and report.
	if cancelErr := buh.Upload.Cancel(buh); cancelErr != nil {
		dcontext.GetLogger(buh).Errorf("error canceling upload after error: %v", cancelErr)
	}
}
// CancelBlobUpload cancels an in-progress upload of a blob. On success it
// replies 204 No Content. If the upload is unknown or cancelling fails, the
// error is recorded on buh.Errors and the response is not started, so the
// error can be rendered by the caller (see the contract on Context.Errors).
func (buh *blobUploadHandler) CancelBlobUpload(w http.ResponseWriter, r *http.Request) {
	if buh.Upload == nil {
		buh.Errors = append(buh.Errors, errcode.ErrorCodeBlobUploadUnknown)
		return
	}
	defer buh.Upload.Close()

	w.Header().Set("Docker-Upload-UUID", buh.UUID)
	if err := buh.Upload.Cancel(buh); err != nil {
		dcontext.GetLogger(buh).Errorf("error encountered canceling upload: %v", err)
		buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
		// Do not write a success status once an error has been recorded.
		return
	}

	w.WriteHeader(http.StatusNoContent)
}
// ResumeBlobUpload validates the signed "_state" token of the request and
// re-attaches the blob writer for buh.UUID. It returns nil when the upload
// was resumed and buh.Upload/buh.State are populated. Otherwise it returns a
// handler that records the appropriate error on buh.Errors when served.
//
// The token must verify against the configured HTTP secret, and its
// repository name, UUID and offset must match the current request and the
// stored upload.
func (buh *blobUploadHandler) ResumeBlobUpload(ctx *Context, r *http.Request) http.Handler {
	state, stateErr := hmacKey(ctx.Config.HTTP.Secret).unpackUploadState(r.FormValue("_state"))
	if stateErr != nil {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			dcontext.GetLogger(ctx).Infof("error resolving upload: %v", stateErr)
			buh.Errors = append(buh.Errors, errcode.ErrorCodeBlobUploadInvalid.WithDetail(stateErr))
		})
	}
	buh.State = state

	// For the mismatch cases below there is no underlying error value, so a
	// descriptive detail is attached instead of a nil error.
	if state.Name != ctx.Repository.Named().Name() {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			dcontext.GetLogger(ctx).Infof("mismatched repository name in upload state: %q != %q", state.Name, buh.Repository.Named().Name())
			buh.Errors = append(buh.Errors, errcode.ErrorCodeBlobUploadInvalid.WithDetail("mismatched repository name in upload state"))
		})
	}

	if state.UUID != buh.UUID {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			dcontext.GetLogger(ctx).Infof("mismatched uuid in upload state: %q != %q", state.UUID, buh.UUID)
			buh.Errors = append(buh.Errors, errcode.ErrorCodeBlobUploadInvalid.WithDetail("mismatched uuid in upload state"))
		})
	}

	blobs := ctx.Repository.Blobs(buh)
	upload, resumeErr := blobs.Resume(buh, buh.UUID)
	if resumeErr != nil {
		dcontext.GetLogger(ctx).Errorf("error resolving upload: %v", resumeErr)
		if resumeErr == distribution.ErrBlobUploadUnknown {
			return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				buh.Errors = append(buh.Errors, errcode.ErrorCodeBlobUploadUnknown.WithDetail(resumeErr))
			})
		}

		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(resumeErr))
		})
	}
	buh.Upload = upload

	// The stored upload must be exactly as long as the token claims.
	if size := upload.Size(); size != buh.State.Offset {
		dcontext.GetLogger(ctx).Errorf("upload resumed at wrong offset: %d != %d", size, buh.State.Offset)
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			buh.Errors = append(buh.Errors, errcode.ErrorCodeRangeInvalid.WithDetail("upload resumed at wrong offset"))
		})
	}
	return nil
}
// blobUploadResponse sets the standard response headers for blob upload and
// chunk requests: Docker-Upload-UUID, Location (carrying a freshly signed
// "_state" token), Content-Length and Range. It refreshes buh.State from the
// current upload and closes the upload writer before reading its size. The
// response status is left to the caller.
func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.Request) error {
	// TODO(stevvooe): Need a better way to manage the upload state automatically.
	buh.State.Name = buh.Repository.Named().Name()
	buh.State.UUID = buh.Upload.ID()
	buh.Upload.Close()
	buh.State.Offset = buh.Upload.Size()
	buh.State.StartedAt = buh.Upload.StartedAt()

	token, err := hmacKey(buh.Config.HTTP.Secret).packUploadState(buh.State)
	if err != nil {
		dcontext.GetLogger(buh).Infof("error building upload state token: %s", err)
		return err
	}

	query := url.Values{}
	query["_state"] = []string{token}
	uploadURL, err := buh.urlBuilder.BuildBlobUploadChunkURL(buh.Repository.Named(), buh.Upload.ID(), query)
	if err != nil {
		dcontext.GetLogger(buh).Infof("error building upload url: %s", err)
		return err
	}

	// Range reports the index of the last byte received; an empty upload
	// is reported as "0-0".
	lastByte := buh.Upload.Size()
	if lastByte > 0 {
		lastByte--
	}

	hdr := w.Header()
	hdr.Set("Docker-Upload-UUID", buh.UUID)
	hdr.Set("Location", uploadURL)
	hdr.Set("Content-Length", "0")
	hdr.Set("Range", fmt.Sprintf("0-%d", lastByte))
	return nil
}
// createBlobMountOption builds the blob-create option that requests mounting
// the blob with digest mountDigest from repository fromRepo. It returns an
// error if the digest or repository name cannot be parsed, or if the two
// cannot be combined into a canonical reference.
func (buh *blobUploadHandler) createBlobMountOption(fromRepo, mountDigest string) (distribution.BlobCreateOption, error) {
	dgst, err := digest.Parse(mountDigest)
	if err != nil {
		return nil, err
	}

	source, err := reference.WithName(fromRepo)
	if err != nil {
		return nil, err
	}

	target, err := reference.WithDigest(source, dgst)
	if err != nil {
		return nil, err
	}

	mountOpt := storage.WithMountFrom(target)
	return mountOpt, nil
}
// writeBlobCreatedHeaders writes the standard response for a newly created
// blob: the canonical blob URL in Location, a zero Content-Length, the blob
// digest in Docker-Content-Digest, and a 201 Created status. Nothing is
// written if the blob URL cannot be built.
func (buh *blobUploadHandler) writeBlobCreatedHeaders(w http.ResponseWriter, desc v1.Descriptor) error {
	canonical, err := reference.WithDigest(buh.Repository.Named(), desc.Digest)
	if err != nil {
		return err
	}

	location, err := buh.urlBuilder.BuildBlobURL(canonical)
	if err != nil {
		return err
	}

	hdr := w.Header()
	hdr.Set("Location", location)
	hdr.Set("Content-Length", "0")
	hdr.Set("Docker-Content-Digest", desc.Digest.String())
	w.WriteHeader(http.StatusCreated)
	return nil
}
package handlers
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"github.com/distribution/distribution/v3/registry/api/errcode"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/gorilla/handlers"
)
// defaultReturnedEntries is the number of catalog entries requested from the
// registry when the client does not supply the "n" query parameter.
const defaultReturnedEntries = 100
// catalogDispatcher builds the handler for catalog requests in the given
// request context. Only GET is routed.
func catalogDispatcher(ctx *Context, r *http.Request) http.Handler {
	ch := &catalogHandler{Context: ctx}

	routes := handlers.MethodHandler{}
	routes[http.MethodGet] = http.HandlerFunc(ch.GetCatalog)
	return routes
}
// catalogHandler serves repository catalog requests using the embedded
// request Context.
type catalogHandler struct {
*Context
}
// catalogAPIResponse is the JSON body written by GetCatalog.
type catalogAPIResponse struct {
// Repositories holds the repository names returned for this page.
Repositories []string `json:"repositories"`
}
// GetCatalog writes one page of repository names as JSON.
//
// The "last" query parameter names the entry after which listing resumes and
// "n" sets the page size (defaultReturnedEntries when absent). A value of
// "n" that is not a non-negative integer, or that exceeds the configured
// maximum, is rejected with a pagination error. When more entries may follow,
// a Link header pointing at the next page is added.
func (ch *catalogHandler) GetCatalog(w http.ResponseWriter, r *http.Request) {
	moreEntries := true

	q := r.URL.Query()
	lastEntry := q.Get("last")

	entries := defaultReturnedEntries
	maximumConfiguredEntries := ch.App.Config.Catalog.MaxEntries

	// parse n, if n is negative abort with an error
	if n := q.Get("n"); n != "" {
		parsedMax, err := strconv.Atoi(n)
		if err != nil || parsedMax < 0 {
			ch.Errors = append(ch.Errors, errcode.ErrorCodePaginationNumberInvalid.WithDetail(map[string]string{"n": n}))
			return
		}

		// if a client requests more than it's allowed to receive
		if parsedMax > maximumConfiguredEntries {
			ch.Errors = append(ch.Errors, errcode.ErrorCodePaginationNumberInvalid.WithDetail(map[string]int{"n": parsedMax}))
			return
		}
		entries = parsedMax
	}

	// then enforce entries to be between 0 & maximumConfiguredEntries
	// max(0, min(entries, maximumConfiguredEntries))
	if entries < 0 || entries > maximumConfiguredEntries {
		entries = maximumConfiguredEntries
	}

	repos := make([]string, entries)
	filled := 0

	// entries is now >= 0 and <= maximumConfiguredEntries
	if entries == 0 {
		moreEntries = false
	} else {
		returnedRepositories, err := ch.App.registry.Repositories(ch.Context, repos, lastEntry)
		if err != nil {
			_, pathNotFound := err.(driver.PathNotFoundError)
			if err != io.EOF && !pathNotFound {
				ch.Errors = append(ch.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
				return
			}
			// err is either io.EOF or a PathNotFoundError: the listing is
			// exhausted (or there is nothing to list).
			moreEntries = false
		}
		filled = returnedRepositories
	}

	w.Header().Set("Content-Type", "application/json")

	// Add a link header if there are more entries to retrieve. The filled
	// check guards repos[filled-1] against an empty page returned without
	// an error.
	if moreEntries && filled > 0 {
		lastEntry = repos[filled-1]
		urlStr, err := createLinkEntry(r.URL.String(), entries, lastEntry)
		if err != nil {
			ch.Errors = append(ch.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
			return
		}
		w.Header().Set("Link", urlStr)
	}

	enc := json.NewEncoder(w)
	if err := enc.Encode(catalogAPIResponse{
		Repositories: repos[0:filled],
	}); err != nil {
		ch.Errors = append(ch.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
		return
	}
}
// createLinkEntry derives the value of a "next page" Link header from the
// URL of the current request. The query string is replaced by "n" and "last"
// parameters built from maxEntries and lastEntry, and any fragment is
// dropped. It returns an error if origURL cannot be parsed.
func createLinkEntry(origURL string, maxEntries int, lastEntry string) (string, error) {
	next, err := url.Parse(origURL)
	if err != nil {
		return "", err
	}

	next.Fragment = ""
	next.RawQuery = url.Values{
		"n":    {strconv.Itoa(maxEntries)},
		"last": {lastEntry},
	}.Encode()

	return fmt.Sprintf("<%s>; rel=\"next\"", next.String()), nil
}
package handlers
import (
"context"
"fmt"
"net/http"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/api/errcode"
v2 "github.com/distribution/distribution/v3/registry/api/v2"
"github.com/distribution/distribution/v3/registry/auth"
"github.com/opencontainers/go-digest"
)
// Context should contain the request specific context for use across
// handlers. Resources that don't need to be shared across handlers should not
// be on this object.
type Context struct {
// App points to the application structure that created this context.
*App
context.Context
// Repository is the repository for the current request. All requests
// should be scoped to a single repository. This field may be nil.
Repository distribution.Repository
// RepositoryRemover provides method to delete a repository
RepositoryRemover distribution.RepositoryRemover
// Errors is a collection of errors encountered during the request to be
// returned to the client API. If errors are added to the collection, the
// handler *must not* start the response via http.ResponseWriter.
Errors errcode.Errors
// urlBuilder builds the URLs that handlers place in response headers
// (for example blob and upload locations).
urlBuilder *v2.URLBuilder
// TODO(stevvooe): The goal is to completely factor this context and
// dispatching out of the web application. Ideally, we should lean on
// context.Context for injection of these resources.
}
// Value resolves key through the embedded context.Context, so that lookups on
// a *Context are always routed to the request context.
func (ctx *Context) Value(key interface{}) interface{} {
	inner := ctx.Context
	return inner.Value(key)
}
// getName returns the string stored in ctx under "vars.name".
func getName(ctx context.Context) (name string) {
	name = dcontext.GetStringValue(ctx, "vars.name")
	return name
}
// getReference returns the string stored in ctx under "vars.reference".
func getReference(ctx context.Context) (reference string) {
	reference = dcontext.GetStringValue(ctx, "vars.reference")
	return reference
}
// errDigestNotAvailable is returned by getDigest when the context carries no
// "vars.digest" value.
var errDigestNotAvailable = fmt.Errorf("digest not available in context")
// getDigest reads the "vars.digest" string from ctx and parses it. It returns
// errDigestNotAvailable when the value is empty and the parse error when the
// value is not a valid digest; both cases are logged.
func getDigest(ctx context.Context) (dgst digest.Digest, err error) {
	raw := dcontext.GetStringValue(ctx, "vars.digest")
	if raw == "" {
		dcontext.GetLogger(ctx).Errorf("digest not available")
		return "", errDigestNotAvailable
	}

	parsed, parseErr := digest.Parse(raw)
	if parseErr != nil {
		dcontext.GetLogger(ctx).Errorf("error parsing digest=%q: %v", raw, parseErr)
		return "", parseErr
	}
	return parsed, nil
}
// getUploadUUID returns the string stored in ctx under "vars.uuid".
func getUploadUUID(ctx context.Context) (uuid string) {
	uuid = dcontext.GetStringValue(ctx, "vars.uuid")
	return uuid
}
// Context keys answered by userInfoContext.Value.
const (
// userKey is used to get the user object from
// a user context
userKey = "auth.user"
// userNameKey is used to get the user name from
// a user context
userNameKey = "auth.user.name"
)
// getUserName resolves a username for the request. The name stored in the
// context under userNameKey wins; when that is empty, the user of the
// request's basic-auth credentials is used. If neither yields a name, the
// empty string is returned.
func getUserName(ctx context.Context, r *http.Request) string {
	if fromCtx := dcontext.GetStringValue(ctx, userNameKey); fromCtx != "" {
		return fromCtx
	}

	// Fallback to request user with basic auth
	if fromAuth, _, ok := basicAuth(r); ok {
		return fromAuth
	}
	return ""
}
// withUser wraps ctx so that lookups of userKey and userNameKey resolve to
// the given authorized user.
func withUser(ctx context.Context, user auth.UserInfo) context.Context {
	wrapped := userInfoContext{user: user}
	wrapped.Context = ctx
	return wrapped
}
// userInfoContext is a context.Context that additionally carries the
// authorized user, exposed through its Value method.
type userInfoContext struct {
context.Context
// user is the value returned for userKey; its Name is returned for
// userNameKey.
user auth.UserInfo
}
// Value returns the stored user for userKey and the user's name for
// userNameKey; every other key is delegated to the wrapped context.
func (uic userInfoContext) Value(key interface{}) interface{} {
	if key == userKey {
		return uic.user
	}
	if key == userNameKey {
		return uic.user.Name
	}
	return uic.Context.Value(key)
}
// withResources wraps ctx so that the given authorized resources can be
// retrieved from it later via authorizedResources.
func withResources(ctx context.Context, resources []auth.Resource) context.Context {
	wrapped := resourceContext{resources: resources}
	wrapped.Context = ctx
	return wrapped
}
// resourceContext is a context.Context that additionally carries the list of
// authorized resources, exposed through its Value method.
type resourceContext struct {
context.Context
// resources is the value returned for the resourceKey{} key.
resources []auth.Resource
}
// resourceKey is the context key type under which resourceContext exposes
// its resources.
type resourceKey struct{}
// Value returns the stored resources for the resourceKey{} key and delegates
// every other key to the wrapped context.
func (rc resourceContext) Value(key interface{}) interface{} {
	if key != (resourceKey{}) {
		return rc.Context.Value(key)
	}
	return rc.resources
}
// authorizedResources returns the resources that were attached to ctx for
// this request, or nil if ctx carries none.
func authorizedResources(ctx context.Context) []auth.Resource {
	granted, ok := ctx.Value(resourceKey{}).([]auth.Resource)
	if !ok {
		return nil
	}
	return granted
}
package handlers
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"github.com/distribution/distribution/v3/internal/dcontext"
)
// closeResources wraps handler so that every closer is closed once the
// handler has finished serving a request. Closers are closed in reverse
// order of how they were passed.
func closeResources(handler http.Handler, closers ...io.Closer) http.Handler {
	serve := func(w http.ResponseWriter, r *http.Request) {
		// One defer per closer: each Close runs even if another one panics.
		for i := range closers {
			defer closers[i].Close()
		}
		handler.ServeHTTP(w, r)
	}
	return http.HandlerFunc(serve)
}
// copyFullPayload streams the body of r into destWriter. When limit is
// greater than zero, the body is capped at limit bytes.
//
// If the copy failed or delivered fewer bytes than the request's positive
// Content-Length, and the request context is already done, the situation is
// treated as a client disconnect: status 499 is set for the benefit of the
// logger, the event is logged, and a "client disconnected" error is returned.
// Any other copy error is logged and returned as is.
func copyFullPayload(ctx context.Context, responseWriter http.ResponseWriter, r *http.Request, destWriter io.Writer, limit int64, action string) error {
	// Channel that is closed when the client goes away.
	gone := r.Context().Done()

	payload := r.Body
	if limit > 0 {
		payload = http.MaxBytesReader(responseWriter, payload, limit)
	}

	// Read in the data, if any.
	copied, copyErr := io.Copy(destWriter, payload)

	truncated := r.ContentLength > 0 && copied < r.ContentLength
	if gone != nil && (copyErr != nil || truncated) {
		// Less content than expected arrived. If the client disconnected,
		// report that instead of a generic error to keep the logs cleaner.
		select {
		case <-gone:
			// "499 Client Closed Request": the connection is already
			// closed, but this lets the logger record 499 instead of 0.
			responseWriter.WriteHeader(499)

			dcontext.GetLoggerWithFields(ctx, map[interface{}]interface{}{
				"error":         copyErr,
				"copied":        copied,
				"contentLength": r.ContentLength,
			}, "error", "copied", "contentLength").Error("client disconnected during " + action)
			return errors.New("client disconnected")
		default:
		}
	}

	if copyErr == nil {
		return nil
	}
	dcontext.GetLogger(ctx).Errorf("unknown error reading request payload: %v", copyErr)
	return copyErr
}
// parseContentRange parses a "<start>-<end>" content range into its two
// 64-bit bounds. The input is split at the first "-". On any failure it
// returns -1 for both bounds together with the error.
func parseContentRange(cr string) (start int64, end int64, err error) {
	lo, hi, found := strings.Cut(cr, "-")
	if !found {
		return -1, -1, fmt.Errorf("invalid content range format, %s", cr)
	}

	var bounds [2]int64
	for i, text := range [2]string{lo, hi} {
		value, parseErr := strconv.ParseInt(text, 10, 64)
		if parseErr != nil {
			return -1, -1, parseErr
		}
		bounds[i] = value
	}
	return bounds[0], bounds[1], nil
}
package handlers
import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"time"
)
// blobUploadState captures the serializable state of a blob upload. It is
// JSON-encoded and signed to form the upload state token.
type blobUploadState struct {
	// Name is the primary repository under which the blob will be linked.
	Name string

	// UUID identifies the upload.
	UUID string

	// Offset contains the current progress of the upload.
	Offset int64

	// StartedAt is the original start time of the upload.
	StartedAt time.Time
}

// hmacKey is a secret used to sign and verify upload state tokens.
type hmacKey string

// errInvalidSecret is returned when a token is too short to contain a MAC or
// when its MAC does not verify against the secret.
var errInvalidSecret = fmt.Errorf("invalid secret")

// unpackUploadState decodes a URL-safe base64 token, verifies its leading
// HMAC-SHA256 signature with the secret, and unmarshals the remaining JSON
// payload into a blobUploadState.
func (secret hmacKey) unpackUploadState(token string) (blobUploadState, error) {
	var state blobUploadState

	raw, err := base64.URLEncoding.DecodeString(token)
	if err != nil {
		return state, err
	}

	signer := hmac.New(sha256.New, []byte(secret))
	macLen := signer.Size()
	if len(raw) < macLen {
		return state, errInvalidSecret
	}

	// Token layout: MAC bytes first, JSON payload after.
	signature, payload := raw[:macLen], raw[macLen:]
	signer.Write(payload)
	if expected := signer.Sum(nil); !hmac.Equal(expected, signature) {
		return state, errInvalidSecret
	}

	err = json.Unmarshal(payload, &state)
	return state, err
}

// packUploadState serializes lus to JSON, prefixes it with its HMAC-SHA256
// signature computed with the secret, and encodes the result as URL-safe
// base64. The resulting token can be used to share data with minimized risk
// of external tampering.
func (secret hmacKey) packUploadState(lus blobUploadState) (string, error) {
	payload, err := json.Marshal(lus)
	if err != nil {
		return "", err
	}

	signer := hmac.New(sha256.New, []byte(secret))
	signer.Write(payload)

	signed := signer.Sum(nil)
	signed = append(signed, payload...)
	return base64.URLEncoding.EncodeToString(signed), nil
}
package handlers
import (
"bytes"
"errors"
"fmt"
"strings"
"text/template"
"github.com/sirupsen/logrus"
)
// logHook is a logrus hook that forwards matching log entries by mail.
type logHook struct {
// LevelsParam lists the names of the log levels the hook fires on; Levels
// parses them with logrus.ParseLevel.
LevelsParam []string
// Mail is the mailer used by Fire to send the entry.
Mail *mailer
}
// Fire renders the log entry (its message followed by its data fields) as a
// plain-text mail body and sends it through hook.Mail. The subject is built
// from the entry level, the host part of the mail address, and the message.
// It fails if the mail address has no "host:" prefix.
func (hook *logHook) Fire(entry *logrus.Entry) error {
	host, _, found := strings.Cut(hook.Mail.Addr, ":")
	if host == "" || !found {
		return errors.New("invalid Mail Address")
	}
	subject := fmt.Sprintf("[%s] %s: %s", entry.Level, host, entry.Message)

	const bodyTemplate = `
{{.Message}}
{{range $key, $value := .Data}}
{{$key}}: {{$value}}
{{end}}
`
	tmpl := template.Must(template.New("mail body").Parse(bodyTemplate))

	var rendered bytes.Buffer
	if err := tmpl.Execute(&rendered, entry); err != nil {
		return err
	}
	return hook.Mail.sendMail(subject, rendered.String())
}
// Levels returns the log levels this hook fires on, parsed from
// hook.LevelsParam. Names that logrus cannot parse are skipped; previously
// the parse error was discarded and such a name silently registered the hook
// for the zero Level value (panic).
func (hook *logHook) Levels() []logrus.Level {
	levels := make([]logrus.Level, 0, len(hook.LevelsParam))
	for _, name := range hook.LevelsParam {
		lv, err := logrus.ParseLevel(name)
		if err != nil {
			// Not a valid level name; do not register anything for it.
			continue
		}
		levels = append(levels, lv)
	}
	return levels
}
package handlers
import (
"errors"
"net/smtp"
"strings"
)
// mailer provides fields of email configuration for sending.
type mailer struct {
// Addr is the "host:port" address of the SMTP server; Username and
// Password are used for PLAIN authentication and From is the sender.
Addr, Username, Password, From string
// Insecure is not read by sendMail.
Insecure bool
// To lists the recipient addresses.
To []string
}
// sendMail sends a plain-text mail with the given subject and message to all
// configured recipients, authenticating with PLAIN auth against the host part
// of mail.Addr. It returns an error if mail.Addr is not of the form
// "host:port" or if the SMTP exchange fails.
func (mail *mailer) sendMail(subject, message string) error {
	addr := strings.Split(mail.Addr, ":")
	if len(addr) != 2 {
		return errors.New("invalid Mail Address")
	}
	host := addr[0]

	// The subject is placed in a header line: strip CR/LF so that a
	// multi-line value cannot break the header block or inject headers.
	subject = strings.NewReplacer("\r", " ", "\n", " ").Replace(subject)

	// Address lists in the To header are comma-separated (RFC 5322).
	msg := []byte("To:" + strings.Join(mail.To, ", ") +
		"\r\nFrom: " + mail.From +
		"\r\nSubject: " + subject +
		"\r\nContent-Type: text/plain\r\n\r\n" +
		message)

	auth := smtp.PlainAuth(
		"",
		mail.Username,
		mail.Password,
		host,
	)
	return smtp.SendMail(
		mail.Addr,
		auth,
		mail.From,
		mail.To,
		msg,
	)
}
package handlers
import (
"bytes"
"fmt"
"mime"
"net/http"
"strings"
"sync"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/manifest/manifestlist"
"github.com/distribution/distribution/v3/manifest/ocischema"
"github.com/distribution/distribution/v3/manifest/schema2"
"github.com/distribution/distribution/v3/registry/api/errcode"
"github.com/distribution/distribution/v3/registry/storage"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
"github.com/gorilla/handlers"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
)
const (
// defaultArch and defaultOS select the platform whose image manifest
// GetManifest serves when a manifest list is requested by tag and the
// client does not accept manifest lists.
defaultArch = "amd64"
defaultOS = "linux"
// maxManifestBodySize is the byte limit (4 MiB) passed to copyFullPayload
// when reading a manifest PUT body.
maxManifestBodySize = 4 * 1024 * 1024
// NOTE(review): imageClass is not used in the visible part of the file;
// confirm its consumer before changing it.
imageClass = "image"
)
// storageType enumerates the manifest formats GetManifest distinguishes. The
// values index the per-request "supports" array built from the Accept
// headers.
type storageType int
const (
manifestSchema2 storageType = iota // 0
manifestlistSchema // 1
ociSchema // 2
ociImageIndexSchema // 3
// numStorageTypes is the number of formats above; it sizes the
// "supports" array.
numStorageTypes // 4
)
// manifestDispatcher builds the method-routing handler for manifest requests
// in the given request context. The request's reference is treated as a
// digest when it parses as one and as a tag otherwise. PUT and DELETE are
// only routed when the registry is not read-only.
func manifestDispatcher(ctx *Context, r *http.Request) http.Handler {
	mh := &manifestHandler{Context: ctx}

	ref := getReference(ctx)
	if dgst, err := digest.Parse(ref); err == nil {
		mh.Digest = dgst
	} else {
		// We just have a tag
		mh.Tag = ref
	}

	routes := handlers.MethodHandler{}
	routes[http.MethodGet] = http.HandlerFunc(mh.GetManifest)
	routes[http.MethodHead] = http.HandlerFunc(mh.GetManifest)
	if !ctx.readOnly {
		routes[http.MethodPut] = http.HandlerFunc(mh.PutManifest)
		routes[http.MethodDelete] = http.HandlerFunc(mh.DeleteManifest)
	}
	return routes
}
// manifestHandler handles http operations on image manifests.
type manifestHandler struct {
*Context
// One of tag or digest gets set, depending on what is present in context.
// GetManifest additionally fills in Digest after resolving a Tag.
Tag string
Digest digest.Digest
}
// GetManifest fetches the image manifest from the storage backend, if it
// exists, and writes it to the response.
//
// A tag reference is first resolved to a digest. If the request's
// If-None-Match header matches that digest, 304 Not Modified is returned.
// OCI manifests and OCI indexes are only served to clients whose Accept
// headers list the corresponding media type. A manifest list requested by tag
// from a client that does not accept manifest lists is replaced by the image
// manifest for the default platform (defaultOS/defaultArch). HEAD requests
// receive the headers only. Failures are recorded on imh.Errors.
func (imh *manifestHandler) GetManifest(w http.ResponseWriter, r *http.Request) {
	dcontext.GetLogger(imh).Debug("GetImageManifest")
	manifests, err := imh.Repository.Manifests(imh)
	if err != nil {
		imh.Errors = append(imh.Errors, err)
		return
	}

	var supports [numStorageTypes]bool

	// this parsing of Accept headers is not quite as full-featured as godoc.org's parser, but we don't care about "q=" values
	// https://github.com/golang/gddo/blob/e91d4165076d7474d20abda83f92d15c7ebc3e81/httputil/header/header.go#L165-L202
	for _, acceptHeader := range r.Header["Accept"] {
		// r.Header[...] is a slice in case the request contains the same header more than once
		// if the header isn't set, we'll get the zero value, which "range" will handle gracefully

		// we need to split each header value on "," to get the full list of "Accept" values (per RFC 2616)
		// https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.1
		for _, mediaType := range strings.Split(acceptHeader, ",") {
			if mediaType, _, err = mime.ParseMediaType(mediaType); err != nil {
				// Unparsable entries are ignored.
				continue
			}

			if mediaType == schema2.MediaTypeManifest {
				supports[manifestSchema2] = true
			}
			if mediaType == manifestlist.MediaTypeManifestList {
				supports[manifestlistSchema] = true
			}
			if mediaType == v1.MediaTypeImageManifest {
				supports[ociSchema] = true
			}
			if mediaType == v1.MediaTypeImageIndex {
				supports[ociImageIndexSchema] = true
			}
		}
	}

	if imh.Tag != "" {
		tags := imh.Repository.Tags(imh)
		desc, err := tags.Get(imh, imh.Tag)
		if err != nil {
			if _, ok := err.(distribution.ErrTagUnknown); ok {
				imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestUnknown.WithDetail(err))
			} else {
				imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
			}
			return
		}
		imh.Digest = desc.Digest
	}

	if etagMatch(r, imh.Digest.String()) {
		w.WriteHeader(http.StatusNotModified)
		return
	}

	var options []distribution.ManifestServiceOption
	if imh.Tag != "" {
		options = append(options, distribution.WithTag(imh.Tag))
	}
	manifest, err := manifests.Get(imh, imh.Digest, options...)
	if err != nil {
		if _, ok := err.(distribution.ErrManifestUnknownRevision); ok {
			imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestUnknown.WithDetail(err))
		} else {
			imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
		}
		return
	}

	// determine the type of the returned manifest
	manifestType := manifestSchema2
	manifestList, isManifestList := manifest.(*manifestlist.DeserializedManifestList)
	if _, isOCImanifest := manifest.(*ocischema.DeserializedManifest); isOCImanifest {
		manifestType = ociSchema
	} else if isManifestList {
		if manifestList.MediaType == manifestlist.MediaTypeManifestList {
			manifestType = manifestlistSchema
		} else if manifestList.MediaType == v1.MediaTypeImageIndex {
			manifestType = ociImageIndexSchema
		}
	}

	if manifestType == ociSchema && !supports[ociSchema] {
		imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestUnknown.WithMessage("OCI manifest found, but accept header does not support OCI manifests"))
		return
	}
	if manifestType == ociImageIndexSchema && !supports[ociImageIndexSchema] {
		imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestUnknown.WithMessage("OCI index found, but accept header does not support OCI indexes"))
		return
	}

	if imh.Tag != "" && manifestType == manifestlistSchema && !supports[manifestlistSchema] {
		// The client cannot handle a manifest list: serve the image
		// manifest of the default platform instead.
		dcontext.GetLogger(imh).Infof("rewriting manifest list %s in schema1 format to support old client", imh.Digest.String())

		// Find the image manifest corresponding to the default
		// platform
		var manifestDigest digest.Digest
		for _, manifestDescriptor := range manifestList.Manifests {
			if manifestDescriptor.Platform.Architecture == defaultArch && manifestDescriptor.Platform.OS == defaultOS {
				manifestDigest = manifestDescriptor.Digest
				break
			}
		}

		if manifestDigest == "" {
			imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestUnknown)
			return
		}

		manifest, err = manifests.Get(imh, manifestDigest)
		if err != nil {
			if _, ok := err.(distribution.ErrManifestUnknownRevision); ok {
				imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestUnknown.WithDetail(err))
			} else {
				imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
			}
			return
		}

		if _, isSchema2 := manifest.(*schema2.DeserializedManifest); isSchema2 && !supports[manifestSchema2] {
			imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestInvalid.WithMessage("Schema 2 manifest not supported by client"))
			return
		}
		imh.Digest = manifestDigest
	}

	ct, p, err := manifest.Payload()
	if err != nil {
		// Report the failure instead of silently returning an empty
		// success response.
		imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
		return
	}

	w.Header().Set("Content-Type", ct)
	w.Header().Set("Content-Length", fmt.Sprint(len(p)))
	w.Header().Set("Docker-Content-Digest", imh.Digest.String())
	w.Header().Set("Etag", fmt.Sprintf(`"%s"`, imh.Digest))

	if r.Method == http.MethodHead {
		return
	}

	if _, err := w.Write(p); err != nil {
		w.WriteHeader(http.StatusInternalServerError)
	}
}
// etagMatch reports whether any If-None-Match header value on r is equal to
// etag, either verbatim or wrapped in double quotes.
func etagMatch(r *http.Request, etag string) bool {
	// Clients may send the entity tag quoted or unquoted; accept both forms.
	quoted := fmt.Sprintf(`"%s"`, etag)
	for _, candidate := range r.Header["If-None-Match"] {
		switch candidate {
		case etag, quoted:
			return true
		}
	}
	return false
}
// PutManifest validates and stores a manifest in the registry.
//
// The request body is read into memory, unmarshalled according to the
// Content-Type header, checked against the digest or tag the handler was
// routed with, checked against the repository class policy, and then stored.
// When a tag is set the manifest is tagged as well. On success the response
// carries Location and Docker-Content-Digest headers and a 201 status.
// Failures are appended to imh.Errors and the function returns without
// writing a status itself.
func (imh *manifestHandler) PutManifest(w http.ResponseWriter, r *http.Request) {
dcontext.GetLogger(imh).Debug("PutImageManifest")
manifests, err := imh.Repository.Manifests(imh)
if err != nil {
imh.Errors = append(imh.Errors, err)
return
}
// Buffer the whole payload; maxManifestBodySize is handed to copyFullPayload,
// presumably as the upper bound on the body size (verify in copyFullPayload).
var jsonBuf bytes.Buffer
if err := copyFullPayload(imh, w, r, &jsonBuf, maxManifestBodySize, "image manifest PUT"); err != nil {
// copyFullPayload reports the error if necessary
imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestInvalid.WithDetail(err.Error()))
return
}
mediaType := r.Header.Get("Content-Type")
manifest, desc, err := distribution.UnmarshalManifest(mediaType, jsonBuf.Bytes())
if err != nil {
imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestInvalid.WithDetail(err))
return
}
// A request addressed by digest must carry a payload with exactly that
// digest; a request addressed by tag adopts the payload's digest.
if imh.Digest != "" {
if desc.Digest != imh.Digest {
dcontext.GetLogger(imh).Errorf("payload digest does not match: %q != %q", desc.Digest, imh.Digest)
imh.Errors = append(imh.Errors, errcode.ErrorCodeDigestInvalid)
return
}
} else if imh.Tag != "" {
imh.Digest = desc.Digest
} else {
imh.Errors = append(imh.Errors, errcode.ErrorCodeTagInvalid.WithDetail("no tag or digest specified"))
return
}
// The media type is only used here to pick a debug log message.
isAnOCIManifest := mediaType == v1.MediaTypeImageManifest || mediaType == v1.MediaTypeImageIndex
if isAnOCIManifest {
dcontext.GetLogger(imh).Debug("Putting an OCI Manifest!")
} else {
dcontext.GetLogger(imh).Debug("Putting a Docker Manifest!")
}
var options []distribution.ManifestServiceOption
if imh.Tag != "" {
options = append(options, distribution.WithTag(imh.Tag))
}
// Reject the manifest before storing it if its class is not permitted.
if err := imh.applyResourcePolicy(manifest); err != nil {
imh.Errors = append(imh.Errors, err)
return
}
_, err = manifests.Put(imh, manifest, options...)
if err != nil {
// TODO(stevvooe): These error handling switches really need to be
// handled by an app global mapper.
if err == distribution.ErrUnsupported {
imh.Errors = append(imh.Errors, errcode.ErrorCodeUnsupported)
return
}
if err == distribution.ErrAccessDenied {
imh.Errors = append(imh.Errors, errcode.ErrorCodeDenied)
return
}
switch err := err.(type) {
case distribution.ErrManifestVerification:
// A verification failure is a collection of errors; translate each one.
for _, verificationError := range err {
switch verificationError := verificationError.(type) {
case distribution.ErrManifestBlobUnknown:
imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestBlobUnknown.WithDetail(verificationError.Digest))
case distribution.ErrManifestNameInvalid:
// NOTE(review): the detail is the whole verification error (err), not
// the individual verificationError — confirm this is intended.
imh.Errors = append(imh.Errors, errcode.ErrorCodeNameInvalid.WithDetail(err))
case distribution.ErrManifestUnverified:
imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestUnverified)
default:
if verificationError == digest.ErrDigestInvalidFormat {
imh.Errors = append(imh.Errors, errcode.ErrorCodeDigestInvalid)
} else {
// NOTE(review): this appends two entries (the generic code and the
// raw error) rather than one code with a detail — confirm intent.
imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown, verificationError)
}
}
}
case errcode.Error:
imh.Errors = append(imh.Errors, err)
default:
imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
}
return
}
// Tag this manifest
if imh.Tag != "" {
tags := imh.Repository.Tags(imh)
err = tags.Tag(imh, imh.Tag, desc)
if err != nil {
imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
}
}
// Construct a canonical url for the uploaded manifest.
ref, err := reference.WithDigest(imh.Repository.Named(), imh.Digest)
if err != nil {
imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
}
location, err := imh.urlBuilder.BuildManifestURL(ref)
if err != nil {
// NOTE(stevvooe): Given the behavior above, this absurdly unlikely to
// happen. We'll log the error here but proceed as if it worked. Worst
// case, we set an empty location header.
dcontext.GetLogger(imh).Errorf("error building manifest url from digest: %v", err)
}
w.Header().Set("Location", location)
w.Header().Set("Docker-Content-Digest", imh.Digest.String())
w.WriteHeader(http.StatusCreated)
dcontext.GetLogger(imh).Debug("Succeeded in putting manifest!")
}
// applyResourcePolicy verifies that the class of manifest is permitted by the
// registry's repository policy and by the resources authorized for the
// request.
//
// It returns nil when no class restrictions are configured, when the manifest
// type carries no class, or when the class is both allowed by configuration
// and not contradicted by an authorized resource for this repository.
// Otherwise it returns an errcode.ErrorCodeDenied error.
func (imh *manifestHandler) applyResourcePolicy(manifest distribution.Manifest) error {
	permitted := imh.App.Config.Policy.Repository.Classes
	if len(permitted) == 0 {
		return nil
	}

	// Derive the resource class from the manifest's config media type.
	var class string
	switch m := manifest.(type) {
	case *schema2.DeserializedManifest:
		if m.Config.MediaType == schema2.MediaTypeImageConfig {
			class = imageClass
		} else if m.Config.MediaType == schema2.MediaTypePluginConfig {
			class = "plugin"
		} else {
			return errcode.ErrorCodeDenied.WithMessage("unknown manifest class for " + m.Config.MediaType)
		}
	case *ocischema.DeserializedManifest:
		if m.Config.MediaType != v1.MediaTypeImageConfig {
			return errcode.ErrorCodeDenied.WithMessage("unknown manifest class for " + m.Config.MediaType)
		}
		class = imageClass
	}

	// Manifest types without a class are not subject to the policy.
	if class == "" {
		return nil
	}

	// Check to see if class is allowed in registry
	classPermitted := false
	for i := range permitted {
		if permitted[i] == class {
			classPermitted = true
			break
		}
	}
	if !classPermitted {
		return errcode.ErrorCodeDenied.WithMessage(fmt.Sprintf("registry does not allow %s manifest", class))
	}

	resources := authorizedResources(imh)
	repoName := imh.Repository.Named().Name()

	sawRepo := false
	for _, res := range resources {
		if res.Name != repoName {
			continue
		}
		// An authorized resource without a class is treated as an image.
		if res.Class == "" {
			res.Class = imageClass
		}
		if res.Class == class {
			return nil
		}
		sawRepo = true
	}

	// resource was found but no matching class was found
	if sawRepo {
		return errcode.ErrorCodeDenied.WithMessage(fmt.Sprintf("repository not authorized for %s manifest", class))
	}

	return nil
}
// DeleteManifest removes the manifest with the given digest or the tag with the given name from the registry.
//
// When the handler was routed with a tag, only that tag is removed. Otherwise
// the manifest identified by imh.Digest is deleted and every tag that
// references it is untagged concurrently. Failures are recorded in imh.Errors;
// on success the response status is 202 Accepted.
func (imh *manifestHandler) DeleteManifest(w http.ResponseWriter, r *http.Request) {
	dcontext.GetLogger(imh).Debug("DeleteImageManifest")

	if imh.App.isCache {
		imh.Errors = append(imh.Errors, errcode.ErrorCodeUnsupported)
		return
	}

	if imh.Tag != "" {
		dcontext.GetLogger(imh).Debug("DeleteImageTag")
		tagService := imh.Repository.Tags(imh.Context)
		if err := tagService.Untag(imh.Context, imh.Tag); err != nil {
			switch err.(type) {
			case distribution.ErrTagUnknown, driver.PathNotFoundError:
				imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestUnknown.WithDetail(err))
			default:
				imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
			}
			return
		}
		w.WriteHeader(http.StatusAccepted)
		return
	}

	manifests, err := imh.Repository.Manifests(imh)
	if err != nil {
		imh.Errors = append(imh.Errors, err)
		return
	}

	err = manifests.Delete(imh, imh.Digest)
	if err != nil {
		switch err {
		// Go switch cases do not fall through, so both digest errors must be
		// listed in one case; otherwise an unsupported digest would be
		// silently ignored and the handler would carry on as if the delete
		// had succeeded.
		case digest.ErrDigestUnsupported, digest.ErrDigestInvalidFormat:
			imh.Errors = append(imh.Errors, errcode.ErrorCodeDigestInvalid)
		case distribution.ErrBlobUnknown:
			imh.Errors = append(imh.Errors, errcode.ErrorCodeManifestUnknown)
		case distribution.ErrUnsupported:
			imh.Errors = append(imh.Errors, errcode.ErrorCodeUnsupported)
		default:
			imh.Errors = append(imh.Errors, errcode.ErrorCodeUnknown)
		}
		return
	}

	tagService := imh.Repository.Tags(imh)
	referencedTags, err := tagService.Lookup(imh, v1.Descriptor{Digest: imh.Digest})
	if err != nil {
		imh.Errors = append(imh.Errors, err)
		return
	}

	var (
		errs []error
		mu   sync.Mutex // guards errs, which is appended to from several goroutines
	)
	g := errgroup.Group{}
	g.SetLimit(storage.DefaultConcurrencyLimit)
	for _, tag := range referencedTags {
		tag := tag // per-iteration copy for the closure (needed before Go 1.22)
		g.Go(func() error {
			if err := tagService.Untag(imh, tag); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
			// Always return nil so that one failure does not hide the others.
			return nil
		})
	}
	_ = g.Wait() // imh will record all errors, so ignore the error of Wait()
	imh.Errors = errs

	w.WriteHeader(http.StatusAccepted)
}
package handlers
import (
"encoding/json"
"net/http"
"sort"
"strconv"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/registry/api/errcode"
"github.com/gorilla/handlers"
)
// tagsDispatcher builds the handler for the tags API endpoint. Only GET is
// routed; it is served by tagsHandler.GetTags bound to ctx.
func tagsDispatcher(ctx *Context, r *http.Request) http.Handler {
	th := &tagsHandler{Context: ctx}

	routes := handlers.MethodHandler{}
	routes[http.MethodGet] = http.HandlerFunc(th.GetTags)
	return routes
}
// tagsHandler handles requests for lists of tags under a repository name.
// It embeds the request *Context, whose Repository and Errors fields are
// used by GetTags.
type tagsHandler struct {
*Context
}
// tagsAPIResponse is the JSON body written by GetTags: the repository name
// and the (possibly paginated) list of its tags.
type tagsAPIResponse struct {
Name string `json:"name"`
Tags []string `json:"tags"`
}
// GetTags returns a json list of tags for a specific image name.
//
// Two optional query parameters control pagination:
//   - last: only tags that come after this value are returned.
//   - n:    at most this many tags are returned; it must be a non-negative
//     integer. When more tags remain, a Link header pointing at the next page
//     is set.
//
// Failures are appended to th.Errors.
func (th *tagsHandler) GetTags(w http.ResponseWriter, r *http.Request) {
	tagService := th.Repository.Tags(th)
	tags, err := tagService.All(th)
	if err != nil {
		switch err := err.(type) {
		case distribution.ErrRepositoryUnknown:
			th.Errors = append(th.Errors, errcode.ErrorCodeNameUnknown.WithDetail(map[string]string{"name": th.Repository.Named().Name()}))
		case errcode.Error:
			th.Errors = append(th.Errors, err)
		default:
			th.Errors = append(th.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
		}
		return
	}

	// do pagination if requested
	q := r.URL.Query()

	// get entries after latest, if any specified
	// NOTE(review): the binary search assumes tags is sorted ascending —
	// confirm that tagService.All guarantees this.
	if lastEntry := q.Get("last"); lastEntry != "" {
		// sort.SearchStrings returns the position where lastEntry is, or
		// would be inserted. Only step past that position when it actually
		// holds lastEntry; if lastEntry is absent, the element there already
		// sorts after it and must be kept.
		start := sort.SearchStrings(tags, lastEntry)
		if start < len(tags) && tags[start] == lastEntry {
			start++
		}

		if start >= len(tags) {
			// Nothing follows lastEntry. Use an empty, non-nil slice so the
			// JSON body contains [] rather than null.
			tags = []string{}
		} else {
			tags = tags[start:]
		}
	}

	// if no error, means that the user requested `n` entries
	if n := q.Get("n"); n != "" {
		maxEntries, err := strconv.Atoi(n)
		if err != nil || maxEntries < 0 {
			th.Errors = append(th.Errors, errcode.ErrorCodePaginationNumberInvalid.WithDetail(map[string]string{"n": n}))
			return
		}

		// if there is requested more than or
		// equal to the amount of tags we have,
		// then set the request to equal `len(tags)`.
		// the reason for the `=`, is so the else
		// clause will only activate if there
		// are tags left the user needs.
		if maxEntries >= len(tags) {
			maxEntries = len(tags)
		} else if maxEntries > 0 {
			// defined in `catalog.go`
			urlStr, err := createLinkEntry(r.URL.String(), maxEntries, tags[maxEntries-1])
			if err != nil {
				th.Errors = append(th.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
				return
			}
			w.Header().Set("Link", urlStr)
		}

		tags = tags[:maxEntries]
	}

	w.Header().Set("Content-Type", "application/json")

	enc := json.NewEncoder(w)
	if err := enc.Encode(tagsAPIResponse{
		Name: th.Repository.Named().Name(),
		Tags: tags,
	}); err != nil {
		th.Errors = append(th.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
		return
	}
}
package registry
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
logstash "github.com/bshuster-repo/logrus-logstash-hook"
"github.com/docker/go-metrics"
gorhandlers "github.com/gorilla/handlers"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"github.com/distribution/distribution/v3/configuration"
"github.com/distribution/distribution/v3/health"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/handlers"
"github.com/distribution/distribution/v3/registry/listener"
"github.com/distribution/distribution/v3/tracing"
"github.com/distribution/distribution/v3/version"
)
// cipherSuites maps TLS cipher suite names, as they may be given in the
// configuration, to the corresponding crypto/tls constants
// (https://golang.org/pkg/crypto/tls/#pkg-constants). getCipherSuites uses it
// to translate configured names and rejects any name not listed here.
var cipherSuites = map[string]uint16{
// TLS 1.0 - 1.2 cipher suites
"TLS_RSA_WITH_3DES_EDE_CBC_SHA": tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
"TLS_RSA_WITH_AES_128_CBC_SHA": tls.TLS_RSA_WITH_AES_128_CBC_SHA,
"TLS_RSA_WITH_AES_256_CBC_SHA": tls.TLS_RSA_WITH_AES_256_CBC_SHA,
"TLS_RSA_WITH_AES_128_GCM_SHA256": tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
"TLS_RSA_WITH_AES_256_GCM_SHA384": tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA": tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
"TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA": tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
"TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA": tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA": tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA": tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256": tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256": tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384": tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384": tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256": tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256": tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
// TLS 1.3 cipher suites
"TLS_AES_128_GCM_SHA256": tls.TLS_AES_128_GCM_SHA256,
"TLS_AES_256_GCM_SHA384": tls.TLS_AES_256_GCM_SHA384,
"TLS_CHACHA20_POLY1305_SHA256": tls.TLS_CHACHA20_POLY1305_SHA256,
}
// defaultCipherSuites is the list of cipher suites returned by
// getCipherSuites when the configuration does not name any.
var defaultCipherSuites = []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_AES_128_GCM_SHA256,
tls.TLS_CHACHA20_POLY1305_SHA256,
tls.TLS_AES_256_GCM_SHA384,
}
// defaultTLSVersionStr is the minimum TLS version key (see tlsVersions) that
// ListenAndServe applies when http.tls.minimumtls is left empty.
const defaultTLSVersionStr = "tls1.2"
// tlsVersions maps user-specified values to tls version constants.
// A configured value that is not a key of this map is rejected by
// ListenAndServe.
var tlsVersions = map[string]uint16{
"tls1.2": tls.VersionTLS12,
"tls1.3": tls.VersionTLS13,
}
// tlsClientAuth maps user-specified values to TLS Client Authentication constants.
// A configured value that is not a key of this map is rejected by
// ListenAndServe.
var tlsClientAuth = map[string]tls.ClientAuthType{
configuration.ClientAuthRequestClientCert: tls.RequestClientCert,
configuration.ClientAuthRequireAnyClientCert: tls.RequireAnyClientCert,
configuration.ClientAuthVerifyClientCertIfGiven: tls.VerifyClientCertIfGiven,
configuration.ClientAuthRequireAndVerifyClientCert: tls.RequireAndVerifyClientCert,
}
// defaultLogFormatter is the default formatter to use for logs.
// configureLogging falls back to it when no formatter is configured.
const defaultLogFormatter = "text"
// HandlerFunc defines an http middleware: given the registry configuration
// and a handler, it returns the handler that should be used in its place.
type HandlerFunc func(config *configuration.Configuration, handler http.Handler) http.Handler
// handlerMiddlewares holds the middlewares added through RegisterHandler, in
// registration order. NewRegistry applies them in that order.
var handlerMiddlewares []HandlerFunc
// RegisterHandler is used to register http middlewares to the registry service
// It appends to a package-level slice without any locking.
func RegisterHandler(handlerFunc HandlerFunc) {
handlerMiddlewares = append(handlerMiddlewares, handlerFunc)
}
// ServeCmd is a cobra command for running the registry.
//
// It resolves the configuration from the command arguments (or the
// environment, see resolveConfiguration), builds a Registry, starts the
// optional debug server and then serves until ListenAndServe returns.
// A configuration error prints usage and exits with status 1; any later
// failure is logged with logrus.Fatalln.
var ServeCmd = &cobra.Command{
Use: "serve <config>",
Short: "`serve` stores and distributes Docker images",
Long: "`serve` stores and distributes Docker images.",
Run: func(cmd *cobra.Command, args []string) {
// setup context
ctx := dcontext.WithVersion(dcontext.Background(), version.Version())
config, err := resolveConfiguration(args)
if err != nil {
fmt.Fprintf(os.Stderr, "configuration error: %v\n", err)
// nolint:errcheck
cmd.Usage()
os.Exit(1)
}
registry, err := NewRegistry(ctx, config)
if err != nil {
logrus.Fatalln(err)
}
configureDebugServer(config)
if err = registry.ListenAndServe(); err != nil {
logrus.Fatalln(err)
}
},
}
// A Registry represents a complete instance of the registry.
//
// TODO(aaronl): It might make sense for Registry to become an interface.
type Registry struct {
// config is the configuration the registry was created with.
config *configuration.Configuration
// app is the registry application; it is the innermost HTTP handler.
app *handlers.App
// server serves the wrapped handler chain built in NewRegistry.
server *http.Server
// quit receives the OS signals that trigger a graceful shutdown in
// ListenAndServe.
quit chan os.Signal
}
// NewRegistry creates a new registry from a context and configuration struct.
//
// It configures logging, creates the application and wraps it in a chain of
// HTTP handlers. Each wrapper encloses the previous one, so the handler added
// last (otelHandler) is the outermost. It returns an error if logging or
// OpenTelemetry initialization fails.
func NewRegistry(ctx context.Context, config *configuration.Configuration) (*Registry, error) {
var err error
ctx, err = configureLogging(ctx, config)
if err != nil {
return nil, fmt.Errorf("error configuring logger: %v", err)
}
app := handlers.NewApp(ctx, config)
// TODO(aaronl): The global scope of the health checks means NewRegistry
// can only be called once per process.
app.RegisterHealthChecks()
// Build the handler chain from the inside out, starting with the app.
var handler http.Handler = app
handler = alive("/", handler)
handler = health.Handler(handler)
handler = panicHandler(handler)
// Access logging goes to stdout unless disabled in the configuration.
if !config.Log.AccessLog.Disabled {
handler = gorhandlers.CombinedLoggingHandler(os.Stdout, handler)
}
// Apply middlewares registered through RegisterHandler, in registration order.
for _, applyHandlerMiddleware := range handlerMiddlewares {
handler = applyHandlerMiddleware(config, handler)
}
err = tracing.InitOpenTelemetry(app.Context)
if err != nil {
return nil, fmt.Errorf("error during open telemetry initialization: %v", err)
}
if config.HTTP.H2C.Enabled {
handler = h2c.NewHandler(handler, &http2.Server{})
}
handler = otelHandler(handler)
server := &http.Server{
Handler: handler,
}
return &Registry{
app: app,
config: config,
server: server,
// Buffered so that a signal delivered before the receiver is ready is not lost.
quit: make(chan os.Signal, 1),
}, nil
}
// otelHandler wraps next with OpenTelemetry HTTP instrumentation. Spans are
// named "<method> <url path>" of the request being served.
func otelHandler(next http.Handler) http.Handler {
	spanName := func(_ string, req *http.Request) string {
		return req.Method + " " + req.URL.Path
	}
	return otelhttp.NewHandler(next, "", otelhttp.WithSpanNameFormatter(spanName))
}
// getCipherSuites converts cipher suite names into their crypto/tls
// constants, preserving order. An empty list yields defaultCipherSuites. A
// name that is not a key of cipherSuites results in an error.
func getCipherSuites(names []string) ([]uint16, error) {
	if len(names) == 0 {
		return defaultCipherSuites, nil
	}

	ids := make([]uint16, 0, len(names))
	for _, name := range names {
		id, known := cipherSuites[name]
		if !known {
			return nil, fmt.Errorf("unknown TLS cipher suite '%s' specified for http.tls.cipherSuites", name)
		}
		ids = append(ids, id)
	}
	return ids, nil
}
// getCipherSuiteNames converts cipher suite ids into their names as reported
// by tls.CipherSuiteName, preserving order. An empty list yields nil.
func getCipherSuiteNames(ids []uint16) []string {
	if len(ids) == 0 {
		return nil
	}

	names := make([]string, 0, len(ids))
	for _, id := range ids {
		names = append(names, tls.CipherSuiteName(id))
	}
	return names
}
// setDirectoryURL returns an ACME client pointed at directoryurl, or nil when
// no directory URL is provided.
func setDirectoryURL(directoryurl string) *acme.Client {
	if directoryurl == "" {
		return nil
	}
	return &acme.Client{DirectoryURL: directoryurl}
}
// ListenAndServe runs the registry's HTTP server.
//
// It opens the configured listener and, when a certificate or a Let's Encrypt
// cache file is configured, wraps it in TLS (minimum version, cipher suites,
// optional client certificate verification). If no drain timeout is
// configured it serves until the server stops. Otherwise it also waits for
// SIGINT/SIGTERM and, on receipt, shuts down gracefully within the drain
// timeout.
//
// It returns configuration errors found while setting up TLS, the error from
// serving, or the error from Shutdown.
func (registry *Registry) ListenAndServe() error {
	config := registry.config

	ln, err := listener.NewListener(config.HTTP.Net, config.HTTP.Addr)
	if err != nil {
		return err
	}

	if config.HTTP.TLS.Certificate != "" || config.HTTP.TLS.LetsEncrypt.CacheFile != "" {
		if config.HTTP.TLS.MinimumTLS == "" {
			config.HTTP.TLS.MinimumTLS = defaultTLSVersionStr
		}
		tlsMinVersion, ok := tlsVersions[config.HTTP.TLS.MinimumTLS]
		if !ok {
			return fmt.Errorf("unknown minimum TLS level '%s' specified for http.tls.minimumtls", config.HTTP.TLS.MinimumTLS)
		}
		dcontext.GetLogger(registry.app).Infof("restricting TLS version to %s or higher", config.HTTP.TLS.MinimumTLS)

		var tlsCipherSuites []uint16
		// configuring cipher suites are no longer supported after the tls1.3.
		// (https://go.dev/blog/tls-cipher-suites)
		if tlsMinVersion > tls.VersionTLS12 {
			dcontext.GetLogger(registry.app).Warnf("restricting TLS cipher suites to empty. Because configuring cipher suites is no longer supported in %s", config.HTTP.TLS.MinimumTLS)
		} else {
			tlsCipherSuites, err = getCipherSuites(config.HTTP.TLS.CipherSuites)
			if err != nil {
				return err
			}
			dcontext.GetLogger(registry.app).Infof("restricting TLS cipher suites to: %s", strings.Join(getCipherSuiteNames(tlsCipherSuites), ","))
		}

		tlsConf := &tls.Config{
			ClientAuth:   tls.NoClientCert,
			NextProtos:   nextProtos(config),
			MinVersion:   tlsMinVersion,
			CipherSuites: tlsCipherSuites,
		}

		if config.HTTP.TLS.LetsEncrypt.CacheFile != "" {
			if config.HTTP.TLS.Certificate != "" {
				return fmt.Errorf("cannot specify both certificate and Let's Encrypt")
			}
			m := &autocert.Manager{
				HostPolicy: autocert.HostWhitelist(config.HTTP.TLS.LetsEncrypt.Hosts...),
				Cache:      autocert.DirCache(config.HTTP.TLS.LetsEncrypt.CacheFile),
				Email:      config.HTTP.TLS.LetsEncrypt.Email,
				Prompt:     autocert.AcceptTOS,
				Client:     setDirectoryURL(config.HTTP.TLS.LetsEncrypt.DirectoryURL),
			}
			tlsConf.GetCertificate = m.GetCertificate
			tlsConf.NextProtos = append(tlsConf.NextProtos, acme.ALPNProto)
		} else {
			tlsConf.Certificates = make([]tls.Certificate, 1)
			tlsConf.Certificates[0], err = tls.LoadX509KeyPair(config.HTTP.TLS.Certificate, config.HTTP.TLS.Key)
			if err != nil {
				return err
			}
		}

		if len(config.HTTP.TLS.ClientCAs) != 0 {
			pool := x509.NewCertPool()

			for _, ca := range config.HTTP.TLS.ClientCAs {
				caPem, err := os.ReadFile(ca)
				if err != nil {
					return err
				}

				if ok := pool.AppendCertsFromPEM(caPem); !ok {
					return fmt.Errorf("could not add CA to pool")
				}
			}

			for _, subj := range pool.Subjects() { //nolint:staticcheck // FIXME(thaJeztah): ignore SA1019: ac.(*accessController).rootCerts.Subjects has been deprecated since Go 1.18: if s was returned by SystemCertPool, Subjects will not include the system roots. (staticcheck)
				dcontext.GetLogger(registry.app).Debugf("CA Subject: %s", string(subj))
			}

			// With client CAs configured, default to the strictest mode
			// unless the configuration names another one.
			if config.HTTP.TLS.ClientAuth != "" {
				tlsClientAuthMod, ok := tlsClientAuth[string(config.HTTP.TLS.ClientAuth)]
				if !ok {
					return fmt.Errorf("unknown client auth mod '%s' specified for http.tls.clientauth", config.HTTP.TLS.ClientAuth)
				}
				tlsConf.ClientAuth = tlsClientAuthMod
			} else {
				tlsConf.ClientAuth = tls.RequireAndVerifyClientCert
			}
			tlsConf.ClientCAs = pool
		}

		ln = tls.NewListener(ln, tlsConf)
		dcontext.GetLogger(registry.app).Infof("listening on %v, tls", ln.Addr())
	} else {
		dcontext.GetLogger(registry.app).Infof("listening on %v", ln.Addr())
	}

	if config.HTTP.DrainTimeout == 0 {
		return registry.server.Serve(ln)
	}

	// setup channel to get notified on SIGTERM signal
	signal.Notify(registry.quit, os.Interrupt, syscall.SIGTERM)

	// Buffered with capacity one: when the quit branch below is taken nobody
	// receives from serveErr any more, and an unbuffered send would block the
	// serving goroutine forever once Serve returns after Shutdown.
	serveErr := make(chan error, 1)

	// Start serving in goroutine and listen for stop signal in main thread
	go func() {
		serveErr <- registry.server.Serve(ln)
	}()

	select {
	case err := <-serveErr:
		return err
	case <-registry.quit:
		dcontext.GetLogger(registry.app).Info("stopping server gracefully. Draining connections for ", config.HTTP.DrainTimeout)
		// shutdown the server with a grace period of configured timeout
		c, cancel := context.WithTimeout(context.Background(), config.HTTP.DrainTimeout)
		defer cancel()
		return registry.Shutdown(c)
	}
}
// Shutdown stops the registry's HTTP server using ctx and then shuts down the
// application object. The application is shut down even when stopping the
// server fails; if the application shutdown fails too, both errors are
// joined.
func (registry *Registry) Shutdown(ctx context.Context) error {
	srvErr := registry.server.Shutdown(ctx)
	appErr := registry.app.Shutdown()
	if appErr == nil {
		return srvErr
	}
	return errors.Join(srvErr, appErr)
}
// configureDebugServer starts the debug HTTP server in a background goroutine
// when a debug address is configured, and then sets up the Prometheus
// endpoint. The debug server uses the default serve mux; a listen failure is
// fatal. Nothing happens when no debug address is configured.
func configureDebugServer(config *configuration.Configuration) {
	addr := config.HTTP.Debug.Addr
	if addr == "" {
		return
	}

	go func() {
		logrus.Infof("debug server listening %v", addr)
		err := http.ListenAndServe(addr, nil)
		if err != nil {
			logrus.Fatalf("error listening on debug interface: %v", err)
		}
	}()

	configurePrometheus(config)
}
// configurePrometheus registers the metrics handler on the default serve mux
// when Prometheus is enabled in the debug configuration. The path defaults to
// "/metrics" when none is configured.
func configurePrometheus(config *configuration.Configuration) {
	if !config.HTTP.Debug.Prometheus.Enabled {
		return
	}

	route := config.HTTP.Debug.Prometheus.Path
	if route == "" {
		route = "/metrics"
	}

	logrus.Info("providing prometheus metrics on ", route)
	http.Handle(route, metrics.Handler())
}
// configureLogging applies the logging configuration to logrus (level, caller
// reporting, formatter) and returns a context carrying a logger. When static
// log fields are configured they are attached to the context and its logger.
// The resulting logger is also installed as the default logger.
//
// An unsupported formatter name yields an error together with the unmodified
// context.
func configureLogging(ctx context.Context, config *configuration.Configuration) (context.Context, error) {
	logrus.SetLevel(logLevel(config.Log.Level))
	logrus.SetReportCaller(config.Log.ReportCaller)

	formatter := config.Log.Formatter
	if formatter == "" {
		formatter = defaultLogFormatter
	}

	var chosen logrus.Formatter
	switch formatter {
	case "json":
		chosen = &logrus.JSONFormatter{
			TimestampFormat:   time.RFC3339Nano,
			DisableHTMLEscape: true,
		}
	case "text":
		chosen = &logrus.TextFormatter{
			TimestampFormat: time.RFC3339Nano,
		}
	case "logstash":
		chosen = &logstash.LogstashFormatter{
			Formatter: &logrus.JSONFormatter{TimestampFormat: time.RFC3339Nano},
		}
	default:
		return ctx, fmt.Errorf("unsupported logging formatter: %q", formatter)
	}
	logrus.SetFormatter(chosen)

	logrus.Debugf("using %q logging formatter", formatter)

	if staticFields := config.Log.Fields; len(staticFields) > 0 {
		// build up the static fields, if present.
		keys := make([]interface{}, 0, len(staticFields))
		for name := range staticFields {
			keys = append(keys, name)
		}

		ctx = dcontext.WithValues(ctx, staticFields)
		ctx = dcontext.WithLogger(ctx, dcontext.GetLogger(ctx, keys...))
	}

	dcontext.SetDefaultLogger(dcontext.GetLogger(ctx))
	return ctx, nil
}
// logLevel parses the configured log level. If it cannot be parsed, a warning
// is logged and logrus.InfoLevel is returned.
func logLevel(level configuration.Loglevel) logrus.Level {
	parsed, err := logrus.ParseLevel(string(level))
	if err == nil {
		return parsed
	}

	fallback := logrus.InfoLevel
	logrus.Warnf("error parsing level %q: %v, using %q ", level, err, fallback)
	return fallback
}
// panicHandler wraps handler so that a panic raised while serving a request
// is recovered and reported through logrus.Panic, which passes the message to
// the log hooks configured in config.yml.
func panicHandler(handler http.Handler) http.Handler {
	guarded := func(w http.ResponseWriter, r *http.Request) {
		defer func() {
			recovered := recover()
			if recovered == nil {
				return
			}
			logrus.Panic(fmt.Sprintf("%v", recovered))
		}()
		handler.ServeHTTP(w, r)
	}
	return http.HandlerFunc(guarded)
}
// alive wraps handler with a route that always answers with HTTP 200 when the
// request path equals path. Any other request is passed to handler. A 200
// only guarantees that the server is up; wrap with other handlers (such as
// health.Handler) for greater effect.
func alive(path string, handler http.Handler) http.Handler {
	probe := func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != path {
			handler.ServeHTTP(w, r)
			return
		}
		w.Header().Set("Cache-Control", "no-cache")
		w.WriteHeader(http.StatusOK)
	}
	return http.HandlerFunc(probe)
}
// resolveConfiguration locates and parses the registry configuration file.
// The path is taken from the first command argument if there is one,
// otherwise from the REGISTRY_CONFIGURATION_PATH environment variable. It
// returns an error when no path is available, the file cannot be opened, or
// parsing fails.
func resolveConfiguration(args []string) (*configuration.Configuration, error) {
	var configurationPath string
	switch {
	case len(args) > 0:
		configurationPath = args[0]
	default:
		configurationPath = os.Getenv("REGISTRY_CONFIGURATION_PATH")
	}

	if configurationPath == "" {
		return nil, fmt.Errorf("configuration path unspecified")
	}

	file, openErr := os.Open(configurationPath)
	if openErr != nil {
		return nil, openErr
	}
	defer file.Close()

	parsed, parseErr := configuration.Parse(file)
	if parseErr != nil {
		return nil, fmt.Errorf("error parsing %s: %v", configurationPath, parseErr)
	}
	return parsed, nil
}
// nextProtos returns the ALPN protocol list for the TLS configuration:
// HTTP/1.1 only when HTTP/2 is disabled, otherwise HTTP/2 followed by
// HTTP/1.1.
func nextProtos(config *configuration.Configuration) []string {
	if config.HTTP.HTTP2.Disabled {
		return []string{"http/1.1"}
	}
	return []string{"h2", "http/1.1"}
}
package registry
import (
"fmt"
"os"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/storage"
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
"github.com/distribution/distribution/v3/version"
"github.com/spf13/cobra"
)
// showVersion is set by the root command's --version/-v flag.
var showVersion bool
// init wires the serve and garbage-collect subcommands into RootCmd and
// declares the command-line flags of GCCmd and RootCmd.
func init() {
RootCmd.AddCommand(ServeCmd)
RootCmd.AddCommand(GCCmd)
GCCmd.Flags().BoolVarP(&dryRun, "dry-run", "d", false, "do everything except remove the blobs")
GCCmd.Flags().BoolVarP(&removeUntagged, "delete-untagged", "m", false, "delete manifests that are not currently referenced via tag")
GCCmd.Flags().BoolVarP(&quiet, "quiet", "q", false, "silence output")
RootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "show the version and exit")
}
// RootCmd is the main command for the 'registry' binary.
// Run without a subcommand it prints the version when the version flag is
// set, and the usage text otherwise.
var RootCmd = &cobra.Command{
Use: "registry",
Short: "`registry`",
Long: "`registry`",
Run: func(cmd *cobra.Command, args []string) {
if showVersion {
version.PrintVersion()
return
}
// nolint:errcheck
cmd.Usage()
},
}
// Flag values for the garbage-collect subcommand; they are bound to
// command-line flags in init and passed to storage.MarkAndSweep by GCCmd.
var (
// dryRun is bound to --dry-run/-d.
dryRun bool
// removeUntagged is bound to --delete-untagged/-m.
removeUntagged bool
// quiet is bound to --quiet/-q.
quiet bool
)
// GCCmd is the cobra command that corresponds to the garbage-collect subcommand
//
// It resolves the configuration, configures logging, constructs the storage
// driver and a storage registry on top of it, and runs storage.MarkAndSweep
// with the options taken from the command-line flags. Any failure is reported
// on stderr and the process exits with status 1.
var GCCmd = &cobra.Command{
	Use:   "garbage-collect <config>",
	Short: "`garbage-collect` deletes layers not referenced by any manifests",
	Long:  "`garbage-collect` deletes layers not referenced by any manifests",
	Run: func(cmd *cobra.Command, args []string) {
		config, err := resolveConfiguration(args)
		if err != nil {
			fmt.Fprintf(os.Stderr, "configuration error: %v\n", err)
			// nolint:errcheck
			cmd.Usage()
			os.Exit(1)
		}

		// Every message below ends with a newline, like the one above, so the
		// shell prompt does not end up on the same line as the error.
		ctx := dcontext.Background()
		ctx, err = configureLogging(ctx, config)
		if err != nil {
			fmt.Fprintf(os.Stderr, "unable to configure logging with config: %s\n", err)
			os.Exit(1)
		}

		driver, err := factory.Create(ctx, config.Storage.Type(), config.Storage.Parameters())
		if err != nil {
			fmt.Fprintf(os.Stderr, "failed to construct %s driver: %v\n", config.Storage.Type(), err)
			os.Exit(1)
		}

		registry, err := storage.NewRegistry(ctx, driver)
		if err != nil {
			fmt.Fprintf(os.Stderr, "failed to construct registry: %v\n", err)
			os.Exit(1)
		}

		err = storage.MarkAndSweep(ctx, driver, registry, storage.GCOpts{
			DryRun:         dryRun,
			RemoveUntagged: removeUntagged,
			Quiet:          quiet,
		})
		if err != nil {
			fmt.Fprintf(os.Stderr, "failed to garbage collect: %v\n", err)
			os.Exit(1)
		}
	},
}
package storage
import (
"context"
"fmt"
"net/http"
"time"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/opencontainers/go-digest"
)
// TODO(stevvooe): This should be configurable in the future.
const blobCacheControlMaxAge = 365 * 24 * time.Hour
// blobServer simply serves blobs from a driver instance using a path function
// to identify paths and a descriptor service to fill in metadata.
type blobServer struct {
// driver is the storage backend blobs are read from and, when redirects
// are enabled, asked for a redirect URL.
driver driver.StorageDriver
// statter resolves a digest to the descriptor of the blob being served.
statter distribution.BlobStatter
// pathFn maps a blob digest to its path in the storage driver.
pathFn func(dgst digest.Digest) (string, error)
redirect bool // allows disabling RedirectURL redirects
}
// ServeBlob writes the blob identified by dgst to w.
//
// The blob is looked up through the statter and its storage path through
// pathFn. When redirects are enabled and the driver provides a redirect URL,
// the client is sent there with a 307. Otherwise the content is read from the
// driver and served with http.ServeContent, after setting the ETag and
// Cache-Control headers and filling in Docker-Content-Digest, Content-Type
// and Content-Length where they are not already set.
func (bs *blobServer) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
	desc, err := bs.statter.Stat(ctx, dgst)
	if err != nil {
		return err
	}

	blobPath, err := bs.pathFn(desc.Digest)
	if err != nil {
		return err
	}

	if bs.redirect {
		target, redirErr := bs.driver.RedirectURL(r, blobPath)
		switch {
		case redirErr != nil:
			return redirErr
		case target != "":
			// Redirect to storage URL.
			http.Redirect(w, r, target, http.StatusTemporaryRedirect)
			return nil
		}
		// Fallback to serving the content directly.
	}

	reader, err := newFileReader(ctx, bs.driver, blobPath, desc.Size)
	if err != nil {
		return err
	}
	defer reader.Close()

	// setIfUnset leaves a header alone when an earlier handler already set it.
	setIfUnset := func(key, value string) {
		if w.Header().Get(key) == "" {
			w.Header().Set(key, value)
		}
	}

	w.Header().Set("ETag", fmt.Sprintf(`"%s"`, desc.Digest)) // If-None-Match handled by ServeContent
	w.Header().Set("Cache-Control", fmt.Sprintf("max-age=%.f", blobCacheControlMaxAge.Seconds()))
	setIfUnset("Docker-Content-Digest", desc.Digest.String())
	setIfUnset("Content-Type", desc.MediaType)
	setIfUnset("Content-Length", fmt.Sprint(desc.Size))

	http.ServeContent(w, r, desc.Digest.String(), time.Time{}, reader)
	return nil
}
package storage
import (
"context"
"io"
"path"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
)
// blobStore implements the read side of the blob store interface over a
// driver without enforcing per-repository membership. This object is
// intentionally a leaky abstraction, providing utility methods that support
// creating and traversing backend links.
type blobStore struct {
// driver is the storage backend holding blob data and link files.
driver driver.StorageDriver
// statter is used to look up blob descriptors before opening or writing.
statter distribution.BlobStatter
}
// Compile-time check that *blobStore satisfies distribution.BlobProvider.
var _ distribution.BlobProvider = &blobStore{}
// Get implements the BlobProvider.Get call. It returns the full content of
// the blob identified by dgst. A driver PathNotFoundError is translated into
// distribution.ErrBlobUnknown; any other error is returned unchanged.
func (bs *blobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
	blobPath, err := bs.path(dgst)
	if err != nil {
		return nil, err
	}

	content, err := getContent(ctx, bs.driver, blobPath)
	if err == nil {
		return content, nil
	}

	if _, notFound := err.(driver.PathNotFoundError); notFound {
		return nil, distribution.ErrBlobUnknown
	}
	return nil, err
}
// Open returns a seekable reader over the blob identified by dgst. The blob
// is stat'ed first, so a failing Stat is returned without touching the driver
// path; the reader is created with the size reported by the statter.
func (bs *blobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) {
	desc, err := bs.statter.Stat(ctx, dgst)
	if err != nil {
		return nil, err
	}

	blobPath, pathErr := bs.path(desc.Digest)
	if pathErr != nil {
		return nil, pathErr
	}

	return newFileReader(ctx, bs.driver, blobPath, desc.Size)
}
// Put stores the content p in the blob store, calculating the digest. If the
// content is already present, only the existing descriptor is returned. This
// should only be used for small objects, such as manifests. It is implemented
// as a convenience for other Put implementations.
//
// The mediaType argument is not recorded; the returned descriptor always
// carries "application/octet-stream".
func (bs *blobStore) Put(ctx context.Context, mediaType string, p []byte) (v1.Descriptor, error) {
	dgst := digest.FromBytes(p)

	existing, err := bs.statter.Stat(ctx, dgst)
	switch {
	case err == nil:
		// content already present
		return existing, nil
	case err != distribution.ErrBlobUnknown:
		// real error, return it
		dcontext.GetLogger(ctx).Errorf("blobStore: error stating content (%v): %v", dgst, err)
		return v1.Descriptor{}, err
	}

	target, err := bs.path(dgst)
	if err != nil {
		return v1.Descriptor{}, err
	}

	// TODO(stevvooe): Write out mediatype here, as well.
	desc := v1.Descriptor{
		Size: int64(len(p)),
		// NOTE(stevvooe): The central blob store firewalls media types from
		// other users. The caller should look this up and override the value
		// for the specific repository.
		MediaType: "application/octet-stream",
		Digest:    dgst,
	}

	// The descriptor is returned alongside whatever error PutContent yields.
	return desc, bs.driver.PutContent(ctx, target, p)
}
// Enumerate walks the blob tree of the driver and calls ingester with the
// digest of every regular file named "data". Directories and files with any
// other name are ignored. An error from digest parsing or from ingester is
// handed back to the walk.
func (bs *blobStore) Enumerate(ctx context.Context, ingester func(dgst digest.Digest) error) error {
	blobsRoot, err := pathFor(blobsPathSpec{})
	if err != nil {
		return err
	}

	visit := func(fileInfo driver.FileInfo) error {
		// skip directories
		if fileInfo.IsDir() {
			return nil
		}

		// we only want to parse paths that end with /data
		blobPath := fileInfo.Path()
		if _, name := path.Split(blobPath); name != "data" {
			return nil
		}

		dgst, parseErr := digestFromPath(blobPath)
		if parseErr != nil {
			return parseErr
		}
		return ingester(dgst)
	}

	return bs.driver.Walk(ctx, blobsRoot, visit)
}
// path returns the canonical path for the blob identified by digest. The blob
// may or may not exist. On failure the returned path is always empty.
func (bs *blobStore) path(dgst digest.Digest) (string, error) {
	spec := blobDataPathSpec{digest: dgst}

	blobPath, err := pathFor(spec)
	if err == nil {
		return blobPath, nil
	}
	return "", err
}
// link links the path to the provided digest by writing the digest into the
// target file. Caller must ensure that the blob actually exists.
func (bs *blobStore) link(ctx context.Context, path string, dgst digest.Digest) error {
	// A "link" file holds nothing but the exact string form of the digest,
	// as specified by the digest package.
	payload := []byte(dgst)
	return bs.driver.PutContent(ctx, path, payload)
}
// readlink returns the linked digest at path. The file content is read from
// the driver and parsed as a digest; a read or parse failure yields an empty
// digest together with the error.
func (bs *blobStore) readlink(ctx context.Context, path string) (digest.Digest, error) {
	raw, err := bs.driver.GetContent(ctx, path)
	if err != nil {
		return "", err
	}

	target, parseErr := digest.Parse(string(raw))
	if parseErr != nil {
		return "", parseErr
	}
	return target, nil
}
// blobStatter answers descriptor lookups by stat'ing blob data paths directly
// on the storage driver.
type blobStatter struct {
// driver is the storage backend that is stat'ed.
driver driver.StorageDriver
}
// Compile-time check that *blobStatter satisfies distribution.BlobDescriptorService.
var _ distribution.BlobDescriptorService = &blobStatter{}
// Stat implements BlobStatter.Stat by returning the descriptor for the blob
// in the main blob store. If this method returns successfully, there is
// strong guarantee that the blob exists and is available.
//
// A missing path, or a path that turns out to be a directory, is reported as
// distribution.ErrBlobUnknown. Other driver errors are returned unchanged.
func (bs *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (v1.Descriptor, error) {
	blobPath, err := pathFor(blobDataPathSpec{digest: dgst})
	if err != nil {
		return v1.Descriptor{}, err
	}

	fi, err := bs.driver.Stat(ctx, blobPath)
	if err != nil {
		if _, missing := err.(driver.PathNotFoundError); missing {
			return v1.Descriptor{}, distribution.ErrBlobUnknown
		}
		return v1.Descriptor{}, err
	}

	if fi.IsDir() {
		// NOTE(stevvooe): This represents a corruption situation. Somehow, we
		// calculated a blob path and then detected a directory. We log the
		// error and then error on the side of not knowing about the blob.
		dcontext.GetLogger(ctx).Warnf("blob path should not be a directory: %q", blobPath)
		return v1.Descriptor{}, distribution.ErrBlobUnknown
	}

	// TODO(stevvooe): Add method to resolve the mediatype. We can store and
	// cache a "global" media type for the blob, even if a specific repo has a
	// mediatype that overrides the main one.
	desc := v1.Descriptor{
		Size: fi.Size(),
		// NOTE(stevvooe): The central blob store firewalls media types from
		// other users. The caller should look this up and override the value
		// for the specific repository.
		MediaType: "application/octet-stream",
		Digest:    dgst,
	}
	return desc, nil
}
// Clear is not supported by blobStatter; it always returns
// distribution.ErrUnsupported and ignores its arguments.
func (bs *blobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
return distribution.ErrUnsupported
}
// SetDescriptor is not supported by blobStatter; it always returns
// distribution.ErrUnsupported and ignores its arguments.
func (bs *blobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc v1.Descriptor) error {
return distribution.ErrUnsupported
}
package storage
import (
"context"
"errors"
"fmt"
"io"
"path"
"time"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)
// errResumableDigestNotAvailable is returned by the resumable digest helpers
// when the hash state cannot be saved or restored. Callers in this file treat
// it as a non-fatal condition.
var errResumableDigestNotAvailable = errors.New("resumable digest not available")
const (
// digestSha256Empty is the canonical sha256 digest of empty data
digestSha256Empty = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
)
// blobWriter is used to control the various aspects of resumable
// blob upload.
type blobWriter struct {
// ctx is the context used by Reader when talking to the driver.
ctx context.Context
// blobStore is the repository-scoped store the upload is committed into.
blobStore *linkedBlobStore
// id is the upload identifier reported by ID.
id string
// startedAt is the time reported by StartedAt.
startedAt time.Time
// digester hashes the uploaded content as it is written.
digester digest.Digester
written int64 // track the write to digester
// fileWriter is the driver writer the upload data is streamed into.
fileWriter storagedriver.FileWriter
// driver is the storage backend holding the upload data and hash states.
driver storagedriver.StorageDriver
// path is the driver path of the in-progress upload data.
path string
// resumableDigestEnabled gates saving and restoring of hash state.
resumableDigestEnabled bool
// committed is set once Commit has succeeded; Close refuses to run afterwards.
committed bool
}
// Compile-time check that *blobWriter satisfies distribution.BlobWriter.
var _ distribution.BlobWriter = &blobWriter{}
// ID returns the identifier for this upload.
func (bw *blobWriter) ID() string {
return bw.id
}
// StartedAt returns the start time recorded for this upload.
func (bw *blobWriter) StartedAt() time.Time {
return bw.startedAt
}
// Commit marks the upload as completed, returning a valid descriptor. The
// final size and digest are checked against the first descriptor provided.
//
// The steps run strictly in order: commit the underlying file writer, close
// the writer, validate the content, move it to its content-addressed
// location, link it into the repository, remove the upload resources and
// finally record the descriptor. Any failing step aborts the commit and an
// empty descriptor is returned with the error.
func (bw *blobWriter) Commit(ctx context.Context, desc v1.Descriptor) (v1.Descriptor, error) {
dcontext.GetLogger(ctx).Debug("(*blobWriter).Commit")
if err := bw.fileWriter.Commit(ctx); err != nil {
return v1.Descriptor{}, err
}
// NOTE: the error returned by Close is not checked here.
bw.Close()
// The caller-supplied size is replaced by the size reported by the file
// writer before validation.
desc.Size = bw.Size()
canonical, err := bw.validateBlob(ctx, desc)
if err != nil {
return v1.Descriptor{}, err
}
if err := bw.moveBlob(ctx, canonical); err != nil {
return v1.Descriptor{}, err
}
// Link using both the canonical descriptor and the digest the caller
// originally provided.
if err := bw.blobStore.linkBlob(ctx, canonical, desc.Digest); err != nil {
return v1.Descriptor{}, err
}
if err := bw.removeResources(ctx); err != nil {
return v1.Descriptor{}, err
}
err = bw.blobStore.blobAccessController.SetDescriptor(ctx, canonical.Digest, canonical)
if err != nil {
return v1.Descriptor{}, err
}
// Only mark the writer committed once every step above has succeeded.
bw.committed = true
return canonical, nil
}
// Cancel the blob upload process, releasing any resources associated with
// the writer and canceling the operation.
//
// A failure to cancel the file writer aborts the call. A failure to close the
// blob writer is only logged; the upload resources are removed regardless.
func (bw *blobWriter) Cancel(ctx context.Context) error {
	dcontext.GetLogger(ctx).Debug("(*blobWriter).Cancel")

	cancelErr := bw.fileWriter.Cancel(ctx)
	if cancelErr != nil {
		return cancelErr
	}

	closeErr := bw.Close()
	if closeErr != nil {
		dcontext.GetLogger(ctx).Errorf("error closing blobwriter: %s", closeErr)
	}

	return bw.removeResources(ctx)
}
// Size returns the size reported by the underlying file writer.
func (bw *blobWriter) Size() int64 {
return bw.fileWriter.Size()
}
// Write appends p to the upload and feeds the same bytes to the digester.
//
// The data goes to the file writer first; if that fails nothing is hashed and
// 0 is returned. Otherwise the count and error reported are those of the
// digester write, and bw.written is advanced by that count.
func (bw *blobWriter) Write(p []byte) (int, error) {
	// Ensure that the current write offset matches how many bytes have been
	// written to the digester. If not, we need to update the digest state to
	// match the current write position. An unavailable resumable digest is
	// tolerated.
	switch err := bw.resumeDigest(bw.blobStore.ctx); err {
	case nil, errResumableDigestNotAvailable:
	default:
		return 0, err
	}

	if _, err := bw.fileWriter.Write(p); err != nil {
		return 0, err
	}

	hashed, err := bw.digester.Hash().Write(p)
	bw.written += int64(hashed)
	return hashed, err
}
// ReadFrom streams r into the upload, hashing the data as it goes. It returns
// the number of bytes copied into the digester together with any copy error,
// and advances bw.written by that amount.
func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
	// Ensure that the current write offset matches how many bytes have been
	// written to the digester. If not, we need to update the digest state to
	// match the current write position. An unavailable resumable digest is
	// tolerated.
	resumeErr := bw.resumeDigest(bw.blobStore.ctx)
	if resumeErr != nil && resumeErr != errResumableDigestNotAvailable {
		return 0, resumeErr
	}

	// Using a TeeReader instead of MultiWriter ensures Copy returns
	// the amount written to the digester as well as ensuring that we
	// write to the fileWriter first
	n, err = io.Copy(bw.digester.Hash(), io.TeeReader(r, bw.fileWriter))
	bw.written += n
	return n, err
}
// Close stores the current hash state (when resumable digests are available)
// and closes the underlying file writer. Calling Close after a successful
// Commit is an error. A failure to store the hash state, other than the
// digest simply not being resumable, is returned without closing the file
// writer.
func (bw *blobWriter) Close() error {
	if bw.committed {
		return errors.New("blobwriter close after commit")
	}

	switch err := bw.storeHashState(bw.blobStore.ctx); err {
	case nil, errResumableDigestNotAvailable:
		return bw.fileWriter.Close()
	default:
		return err
	}
}
// validateBlob checks the data against the digest, returning an error if it
// does not match. The canonical descriptor is returned.
//
// desc.Digest must be non-empty, otherwise distribution.ErrBlobInvalidDigest
// is returned. A positive desc.Size must equal the size of the uploaded file
// (distribution.ErrBlobInvalidLength otherwise); a zero or negative size is
// replaced by the actual size. On success the returned descriptor carries the
// canonical digest and, if none was given, the media type
// "application/octet-stream".
func (bw *blobWriter) validateBlob(ctx context.Context, desc v1.Descriptor) (v1.Descriptor, error) {
	var (
		verified, fullHash bool
		canonical          digest.Digest
	)

	if desc.Digest == "" {
		// if no descriptors are provided, we have nothing to validate
		// against. We don't really want to support this for the registry.
		return v1.Descriptor{}, distribution.ErrBlobInvalidDigest{
			Reason: fmt.Errorf("cannot validate against empty digest"),
		}
	}

	var size int64

	// Stat the on disk file
	if fi, err := bw.driver.Stat(ctx, bw.path); err != nil {
		switch err := err.(type) {
		case storagedriver.PathNotFoundError:
			// NOTE(stevvooe): We really don't care if the file is
			// not actually present for the reader. We now assume
			// that the desc length is zero.
			desc.Size = 0
		default:
			// Any other error we want propagated up the stack.
			return v1.Descriptor{}, err
		}
	} else {
		if fi.IsDir() {
			return v1.Descriptor{}, fmt.Errorf("unexpected directory at upload location %q", bw.path)
		}

		size = fi.Size()
	}

	if desc.Size > 0 {
		if desc.Size != size {
			return v1.Descriptor{}, distribution.ErrBlobInvalidLength
		}
	} else {
		// if provided 0 or negative length, we can assume caller doesn't know or
		// care about length.
		desc.Size = size
	}

	// TODO(stevvooe): This section is very meandering. Need to be broken down
	// to be a lot more clear.

	if err := bw.resumeDigest(ctx); err == nil {
		canonical = bw.digester.Digest()

		if canonical.Algorithm() == desc.Digest.Algorithm() {
			// Common case: client and server prefer the same canonical digest
			// algorithm - currently SHA256.
			verified = desc.Digest == canonical
		} else {
			// The client wants to use a different digest algorithm. They'll just
			// have to be patient and wait for us to download and re-hash the
			// uploaded content using that digest algorithm.
			fullHash = true
		}
	} else if err == errResumableDigestNotAvailable {
		// Not using resumable digests, so we need to hash the entire layer.
		fullHash = true
	} else {
		return v1.Descriptor{}, err
	}

	if fullHash {
		// a fantastic optimization: if the written data and the size are
		// the same, we don't need to read the data from the backend. This is
		// because we've written the entire file in the lifecycle of the
		// current instance.
		if bw.written == size && digest.Canonical == desc.Digest.Algorithm() {
			canonical = bw.digester.Digest()
			verified = desc.Digest == canonical
		}

		// If the check based on size fails, we fall back to the slowest of
		// paths. We may be able to make the size-based check a stronger
		// guarantee, so this may be defensive.
		if !verified {
			digester := digest.Canonical.Digester()
			verifier := desc.Digest.Verifier()

			// Read the file from the backend driver and validate it.
			fr, err := newFileReader(ctx, bw.driver, bw.path, desc.Size)
			if err != nil {
				return v1.Descriptor{}, err
			}
			defer fr.Close()

			// Everything read for the verifier is also fed to the canonical
			// digester, so one pass yields both results.
			tr := io.TeeReader(fr, digester.Hash())

			if _, err := io.Copy(verifier, tr); err != nil {
				return v1.Descriptor{}, err
			}

			canonical = digester.Digest()
			verified = verifier.Verified()
		}
	}

	if !verified {
		// The message previously read "does match", which stated the
		// opposite of the failure being reported.
		dcontext.GetLoggerWithFields(ctx,
			map[interface{}]interface{}{
				"canonical": canonical,
				"provided":  desc.Digest,
			}, "canonical", "provided").
			Errorf("canonical digest does not match provided digest")
		return v1.Descriptor{}, distribution.ErrBlobInvalidDigest{
			Digest: desc.Digest,
			Reason: fmt.Errorf("content does not match digest"),
		}
	}

	// update desc with canonical hash
	desc.Digest = canonical

	if desc.MediaType == "" {
		desc.MediaType = "application/octet-stream"
	}

	return desc, nil
}
// moveBlob moves the data into its final, hash-qualified destination,
// identified by dgst. The layer should be validated before commencing the
// move.
//
// If the destination already exists nothing is moved and nil is returned. If
// the upload file is missing, an empty blob is written only when desc.Digest
// is the digest of empty content; otherwise a warning is logged and the
// subsequent Move is left to fail.
func (bw *blobWriter) moveBlob(ctx context.Context, desc v1.Descriptor) error {
blobPath, err := pathFor(blobDataPathSpec{
digest: desc.Digest,
})
if err != nil {
return err
}
// Check for existence
if _, err := bw.blobStore.driver.Stat(ctx, blobPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // ensure that it doesn't exist.
default:
return err
}
} else {
// If the path exists, we can assume that the content has already
// been uploaded, since the blob storage is content-addressable.
// While it may be corrupted, detection of such corruption belongs
// elsewhere.
return nil
}
// If no data was received, we may not actually have a file on disk. Check
// the size here and write a zero-length file to blobPath if this is the
// case. For the most part, this should only ever happen with zero-length
// blobs.
if _, err := bw.blobStore.driver.Stat(ctx, bw.path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// HACK(stevvooe): This is slightly dangerous: if we verify above,
// get a hash, then the underlying file is deleted, we risk moving
// a zero-length blob into a nonzero-length blob location. To
// prevent this horrid thing, we employ the hack of only allowing
// this to happen for the digest of an empty blob.
if desc.Digest == digestSha256Empty {
return bw.blobStore.driver.PutContent(ctx, blobPath, []byte{})
}
// We let this fail during the move below.
logrus.
WithField("upload.id", bw.ID()).
WithField("digest", desc.Digest).Warnf("attempted to move zero-length content with non-zero digest")
default:
return err // unrelated error
}
}
// TODO(stevvooe): We should also write the mediatype when executing this move.
return bw.blobStore.driver.Move(ctx, bw.path, blobPath)
}
// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (bw *blobWriter) removeResources(ctx context.Context) error {
	dataPath, err := pathFor(uploadDataPathSpec{
		name: bw.blobStore.repository.Named().Name(),
		id:   bw.id,
	})
	if err != nil {
		return err
	}

	// Resolve and delete the containing directory, which should include any
	// upload related files.
	uploadDir := path.Dir(dataPath)

	err = bw.blobStore.driver.Delete(ctx, uploadDir)
	if err == nil {
		return nil
	}
	if _, gone := err.(storagedriver.PathNotFoundError); gone {
		// already gone!
		return nil
	}

	// This should be uncommon enough such that returning an error
	// should be okay. At this point, the upload should be mostly
	// complete, but perhaps the backend became unaccessible.
	dcontext.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", uploadDir, err)
	return err
}
// Reader returns a reader over the upload data, starting at offset 0.
//
// The upload path is stat'ed up to five times, sleeping one second after each
// "path not found" result; any other stat error is returned immediately. Once
// the stat succeeds, or the attempts are exhausted, the driver reader is
// opened and its result returned.
func (bw *blobWriter) Reader() (io.ReadCloser, error) {
	// todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4
	for attempt := 1; attempt <= 5; attempt++ {
		_, statErr := bw.driver.Stat(bw.ctx, bw.path)
		if statErr == nil {
			break
		}

		if _, missing := statErr.(storagedriver.PathNotFoundError); !missing {
			return nil, statErr
		}

		dcontext.GetLogger(bw.ctx).Debugf("Nothing found on try %d, sleeping...", attempt)
		time.Sleep(1 * time.Second)
	}

	rc, err := bw.driver.Reader(bw.ctx, bw.path, 0)
	if err != nil {
		return nil, err
	}
	return rc, nil
}
//go:build !noresumabledigest
// +build !noresumabledigest
package storage
import (
"context"
"encoding"
"fmt"
"hash"
"path"
"strconv"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/sirupsen/logrus"
)
// resumeDigest attempts to restore the state of the internal hash function
// by loading the most recent saved hash state equal to the current size of the blob.
//
// It returns nil when the digester state corresponds to the current size of
// the file writer, errResumableDigestNotAvailable when resumable digests are
// disabled, unsupported by the hash, or no stored state covers the current
// offset, and any other error encountered while listing or loading stored
// states.
//
// bw.written always reflects the number of bytes represented by the hasher
// state: whenever the hasher is reset, bw.written is reset to zero as well.
func (bw *blobWriter) resumeDigest(ctx context.Context) error {
	if !bw.resumableDigestEnabled {
		return errResumableDigestNotAvailable
	}

	h, ok := bw.digester.Hash().(encoding.BinaryUnmarshaler)
	if !ok {
		return errResumableDigestNotAvailable
	}

	offset := bw.fileWriter.Size()
	if offset == bw.written {
		// State of digester is already at the requested offset.
		return nil
	}

	// List hash states from storage backend.
	var hashStateMatch hashStateEntry
	hashStates, err := bw.getStoredHashStates(ctx)
	if err != nil {
		return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
	}

	// Find a stored hashState with offset equal to the requested offset.
	for _, hashState := range hashStates {
		if hashState.offset == offset {
			hashStateMatch = hashState
			break // Found an exact offset match.
		}
	}

	if hashStateMatch.offset == 0 {
		// No need to load any state, just reset the hasher. The hasher now
		// represents zero bytes, so the written counter must follow;
		// otherwise a stale counter could make the gap check below (and the
		// size shortcut in validateBlob) trust a digest of the wrong data.
		h.(hash.Hash).Reset()
		bw.written = 0
	} else {
		storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
		if err != nil {
			return err
		}

		if err = h.UnmarshalBinary(storedState); err != nil {
			return err
		}
		bw.written = hashStateMatch.offset
	}

	// Mind the gap.
	if gapLen := offset - bw.written; gapLen > 0 {
		return errResumableDigestNotAvailable
	}

	return nil
}
// hashStateEntry describes one stored hash state of an upload.
type hashStateEntry struct {
// offset is the byte offset parsed from the last element of path.
offset int64
// path is the driver path of the stored hash state.
path string
}
// getStoredHashStates returns a slice of hashStateEntries for this upload.
//
// A PathNotFoundError from the driver listing is treated as "no entries". A
// path whose last element cannot be parsed as an offset is logged and still
// included, with whatever offset value the parser returned.
func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
	listSpec := uploadHashStatePathSpec{
		name: bw.blobStore.repository.Named().String(),
		id:   bw.id,
		alg:  bw.digester.Digest().Algorithm(),
		list: true,
	}

	prefix, err := pathFor(listSpec)
	if err != nil {
		return nil, err
	}

	statePaths, err := bw.blobStore.driver.List(ctx, prefix)
	switch err.(type) {
	case nil:
	case storagedriver.PathNotFoundError:
		// Treat PathNotFoundError as no entries.
		statePaths = nil
	default:
		return nil, err
	}

	entries := make([]hashStateEntry, 0, len(statePaths))
	for i := range statePaths {
		statePath := statePaths[i]

		// The suffix should be the offset.
		offset, parseErr := strconv.ParseInt(path.Base(statePath), 0, 64)
		if parseErr != nil {
			logrus.Errorf("unable to parse offset from upload state path %q: %s", statePath, parseErr)
		}

		entries = append(entries, hashStateEntry{offset: offset, path: statePath})
	}

	return entries, nil
}
// storeHashState serializes the current state of the digester's hash and
// stores it in the driver under a path keyed by the number of bytes written.
// It returns errResumableDigestNotAvailable when resumable digests are
// disabled or the hash cannot be marshaled to binary form.
func (bw *blobWriter) storeHashState(ctx context.Context) error {
	if !bw.resumableDigestEnabled {
		return errResumableDigestNotAvailable
	}

	marshaler, ok := bw.digester.Hash().(encoding.BinaryMarshaler)
	if !ok {
		return errResumableDigestNotAvailable
	}

	state, err := marshaler.MarshalBinary()
	if err != nil {
		return err
	}

	stateSpec := uploadHashStatePathSpec{
		name:   bw.blobStore.repository.Named().String(),
		id:     bw.id,
		alg:    bw.digester.Digest().Algorithm(),
		offset: bw.written,
	}
	statePath, err := pathFor(stateSpec)
	if err != nil {
		return err
	}

	return bw.driver.PutContent(ctx, statePath, state)
}
package storage
import (
"context"
"errors"
"io"
"path"
"strings"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
)
// Repositories fills repos with a list, or partial list, of the repositories
// in the registry that sort after last, and returns how many entries were
// written. Because it's a quite expensive operation, it should only be used
// when building up an initial set of repositories.
//
// A nil error means repos was filled and more repositories may exist; io.EOF
// means the walk finished before repos was filled. Passing an empty repos
// slice is an error.
func (reg *registry) Repositories(ctx context.Context, repos []string, last string) (int, error) {
	if len(repos) == 0 {
		return 0, errors.New("Attempted to list 0 repositories")
	}

	root, err := pathFor(repositoriesRootPathSpec{})
	if err != nil {
		return 0, err
	}

	startAfter := ""
	if last != "" {
		startAfter, err = pathFor(manifestsPathSpec{name: last})
		if err != nil {
			return 0, err
		}
	}

	var (
		foundRepos   int
		filledBuffer bool
	)

	// collect stores one repository name in the next free slot of repos.
	collect := func(repoPath string) error {
		repos[foundRepos] = repoPath
		foundRepos++
		return nil
	}

	visit := func(fileInfo driver.FileInfo) error {
		if walkErr := handleRepository(fileInfo, root, last, collect); walkErr != nil {
			return walkErr
		}

		// if we've filled our slice, no need to walk any further
		if foundRepos == len(repos) {
			filledBuffer = true
			return driver.ErrFilledBuffer
		}
		return nil
	}

	err = reg.blobStore.driver.Walk(ctx, root, visit, driver.WithStartAfterHint(startAfter))

	switch {
	case err != nil:
		return foundRepos, err
	case filledBuffer:
		// There are potentially more repositories to list
		return foundRepos, nil
	default:
		// We didn't fill the buffer, so that's the end of the list of repos
		return foundRepos, io.EOF
	}
}
// Enumerate applies ingester to each repository found by walking the
// repositories root. No lower bound is applied to the repository names.
func (reg *registry) Enumerate(ctx context.Context, ingester func(string) error) error {
	root, err := pathFor(repositoriesRootPathSpec{})
	if err != nil {
		return err
	}

	visit := func(fileInfo driver.FileInfo) error {
		return handleRepository(fileInfo, root, "", ingester)
	}
	return reg.blobStore.driver.Walk(ctx, root, visit)
}
// Remove removes a repository from storage by deleting its directory, located
// at the repositories root joined with the repository name.
func (reg *registry) Remove(ctx context.Context, name reference.Named) error {
	root, err := pathFor(repositoriesRootPathSpec{})
	if err != nil {
		return err
	}

	return reg.driver.Delete(ctx, path.Join(root, name.Name()))
}
// lessPath reports whether path a sorts before path b.
//
// A component-wise comparison is done, rather than the lexical comparison of
// strings.
func lessPath(a, b string) bool {
	// Component-wise ordering falls out of a byte-wise comparison once the
	// separator is made to sort before every other byte.
	const separator, lowest = '/', '\x00'
	return compareReplaceInline(a, b, separator, lowest) < 0
}
// compareReplaceInline compares s1 and s2 byte by byte, like
// runtime.cmpstring, except that every occurrence of old in either string is
// treated as new for the purpose of the comparison. It returns -1, 0 or +1.
// When one string is a prefix of the other (after substitution), the shorter
// one sorts first.
func compareReplaceInline(s1, s2 string, old, new byte) int {
	// TODO(stevvooe): We are missing an optimization when the s1 and s2 have
	// the exact same slice header. It will make the code unsafe but can
	// provide some extra performance.

	// substitute maps a byte to the value it is compared as.
	substitute := func(c byte) byte {
		if c == old {
			return new
		}
		return c
	}

	common := len(s1)
	if len(s2) < common {
		common = len(s2)
	}

	for i := 0; i < common; i++ {
		left, right := substitute(s1[i]), substitute(s2[i])
		switch {
		case left < right:
			return -1
		case left > right:
			return +1
		}
	}

	// Equal over the common prefix: length decides.
	switch {
	case len(s1) < len(s2):
		return -1
	case len(s1) > len(s2):
		return +1
	default:
		return 0
	}
}
// handleRepository calls function fn with a repository path if fileInfo
// has a path of a repository under root and that it is lexicographically
// after last. Otherwise, it will return ErrSkipDir or ErrFilledBuffer.
// These should be used with Walk to do handling with repositories in a
// storage.
//
// A path ending in "_manifests" marks a repository; any other path whose last
// element starts with "_" is skipped. The path of fileInfo is expected to be
// strictly longer than root.
func handleRepository(fileInfo driver.FileInfo, root, last string, fn func(repoPath string) error) error {
	// lop the base path (and its trailing separator) off
	repo := fileInfo.Path()[len(root)+1:]

	_, base := path.Split(repo)
	switch {
	case base == "_manifests":
		repo = strings.TrimSuffix(repo, "/_manifests")
		if lessPath(last, repo) {
			if err := fn(repo); err != nil {
				return err
			}
		}
		return driver.ErrSkipDir
	case strings.HasPrefix(base, "_"):
		return driver.ErrSkipDir
	}

	return nil
}
package filesystem
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"path"
"time"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/base"
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
"github.com/google/uuid"
)
const (
// driverName is the name this driver is registered under and reports
// from Name.
driverName = "filesystem"
// defaultRootDirectory is used when no "rootdirectory" parameter is given.
defaultRootDirectory = "/var/lib/registry"
// defaultMaxThreads is the default passed to base.GetLimitFromParameter
// for the "maxthreads" parameter.
defaultMaxThreads = uint64(100)
// minThreads is the minimum value for the maxthreads configuration
// parameter. If the driver's parameters are less than this we set
// the parameters to minThreads
minThreads = uint64(25)
)
// DriverParameters represents all configuration options available for the
// filesystem driver
type DriverParameters struct {
// RootDirectory is the directory all driver paths are joined onto.
RootDirectory string
// MaxThreads is the limit handed to base.NewRegulator by New.
MaxThreads uint64
}
// init registers the filesystem driver factory under driverName.
func init() {
factory.Register(driverName, &filesystemDriverFactory{})
}
// filesystemDriverFactory implements the factory.StorageDriverFactory interface
type filesystemDriverFactory struct{}
// Create builds a filesystem driver from the given parameters map by
// delegating to FromParameters. The context is not used.
func (factory *filesystemDriverFactory) Create(ctx context.Context, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(parameters)
}
// driver is the unexported filesystem implementation that Driver wraps.
type driver struct {
// rootDirectory is the directory every sub-path is joined onto (see fullPath).
rootDirectory string
}
// baseEmbed embeds base.Base; it is in turn embedded by Driver.
type baseEmbed struct {
base.Base
}
// Driver is a storagedriver.StorageDriver implementation backed by a local
// filesystem. All provided paths will be subpaths of the RootDirectory.
type Driver struct {
// baseEmbed carries the base.Base wrapper built in New.
baseEmbed
}
// FromParameters constructs a new Driver with a given parameters map
// Optional Parameters:
// - rootdirectory
// - maxthreads
//
// If the parameters cannot be interpreted, a nil Driver is returned together
// with the error.
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
	params, err := fromParametersImpl(parameters)
	if params == nil || err != nil {
		return nil, err
	}

	drv := New(*params)
	return drv, nil
}
// fromParametersImpl converts a raw parameters map into DriverParameters.
//
// With a nil map the defaults are used for both fields. Otherwise
// "rootdirectory", when present, is stringified with fmt.Sprint, and
// "maxthreads" is resolved through base.GetLimitFromParameter with minThreads
// and defaultMaxThreads; a failure there is reported as a "maxthreads config
// error".
func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) {
	rootDirectory := defaultRootDirectory
	maxThreads := defaultMaxThreads

	if parameters != nil {
		if rootDir, ok := parameters["rootdirectory"]; ok {
			rootDirectory = fmt.Sprint(rootDir)
		}

		limit, err := base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads)
		if err != nil {
			return nil, fmt.Errorf("maxthreads config error: %s", err.Error())
		}
		maxThreads = limit
	}

	return &DriverParameters{
		RootDirectory: rootDirectory,
		MaxThreads:    maxThreads,
	}, nil
}
// New constructs a new Driver rooted at params.RootDirectory. The underlying
// filesystem driver is wrapped in a regulator created with params.MaxThreads.
func New(params DriverParameters) *Driver {
	regulated := base.NewRegulator(
		&driver{rootDirectory: params.RootDirectory},
		params.MaxThreads,
	)

	embedded := baseEmbed{
		Base: base.Base{StorageDriver: regulated},
	}
	return &Driver{baseEmbed: embedded}
}
// Implement the storagedriver.StorageDriver interface

// Name returns the name of this driver, driverName.
func (d *driver) Name() string {
return driverName
}
// GetContent retrieves the content stored at "path" as a []byte. The file is
// opened through Reader at offset 0 and read to the end.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
	rc, err := d.Reader(ctx, path, 0)
	if err != nil {
		return nil, err
	}
	defer rc.Close()

	data, readErr := io.ReadAll(rc)
	if readErr != nil {
		return nil, readErr
	}
	return data, nil
}
// PutContent stores the []byte content at a location designated by "path".
//
// The content is first written to a uniquely named temporary file next to
// the target ("<subPath>.<uuid>.tmp"), committed, and then moved over the
// target. On a write or move failure the temporary file is cleaned up and the
// cleanup error, if any, is joined with the original error.
func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error {
tempPath := fmt.Sprintf("%s.%s.tmp", subPath, uuid.NewString())
// Write to a temporary file to prevent partial writes.
writer, err := d.Writer(ctx, tempPath, false)
if err != nil {
return err
}
// The writer is closed on every return path; the result of this deferred
// Close is not inspected.
defer writer.Close()
_, err = io.Copy(writer, bytes.NewReader(contents))
if err != nil {
if cErr := writer.Cancel(ctx); cErr != nil {
return errors.Join(err, cErr)
}
// Attempt to clean up the temporary file on error.
dErr := d.Delete(ctx, tempPath)
return errors.Join(err, dErr)
}
if err := writer.Commit(ctx); err != nil {
return err
}
// Atomically replace the target file with the temporary file.
if err := d.Move(ctx, tempPath, subPath); err != nil {
// Clean up the temporary file if rename fails.
dErr := d.Delete(ctx, tempPath)
return errors.Join(err, dErr)
}
return nil
}
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset. A missing file is reported as PathNotFoundError; a seek
// that lands short of offset is reported as InvalidOffsetError. The file is
// closed before any seek-related error is returned.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
	file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0o644)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, storagedriver.PathNotFoundError{Path: path}
		}
		return nil, err
	}

	seekPos, err := file.Seek(offset, io.SeekStart)
	switch {
	case err != nil:
		file.Close()
		return nil, err
	case seekPos < offset:
		file.Close()
		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
	}

	return file, nil
}
// Writer returns a FileWriter for the file at subPath, creating the file and
// its parent directories as needed. With append set, writing continues at the
// current end of the file and the writer's initial size is that end offset;
// otherwise the file is truncated and the writer starts at size 0.
func (d *driver) Writer(ctx context.Context, subPath string, append bool) (storagedriver.FileWriter, error) {
	fullPath := d.fullPath(subPath)

	if err := os.MkdirAll(path.Dir(fullPath), 0o777); err != nil {
		return nil, err
	}

	fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0o666)
	if err != nil {
		return nil, err
	}

	if append {
		end, seekErr := fp.Seek(0, io.SeekEnd)
		if seekErr != nil {
			fp.Close()
			return nil, seekErr
		}
		return newFileWriter(fp, end), nil
	}

	if truncErr := fp.Truncate(0); truncErr != nil {
		fp.Close()
		return nil, truncErr
	}
	return newFileWriter(fp, 0), nil
}
// Stat retrieves the FileInfo for the given path, as reported by os.Stat on
// the corresponding location under the root directory. A missing path is
// reported as PathNotFoundError. The returned FileInfo carries subPath, not
// the absolute path.
func (d *driver) Stat(ctx context.Context, subPath string) (storagedriver.FileInfo, error) {
	fi, err := os.Stat(d.fullPath(subPath))
	switch {
	case err == nil:
		return fileInfo{path: subPath, FileInfo: fi}, nil
	case os.IsNotExist(err):
		return nil, storagedriver.PathNotFoundError{Path: subPath}
	default:
		return nil, err
	}
}
// List returns a list of the objects that are direct descendants of the given
// path. Each entry is subPath joined with the entry name. A missing directory
// is reported as PathNotFoundError.
func (d *driver) List(ctx context.Context, subPath string) ([]string, error) {
	dir, err := os.Open(d.fullPath(subPath))
	if err != nil {
		if os.IsNotExist(err) {
			return nil, storagedriver.PathNotFoundError{Path: subPath}
		}
		return nil, err
	}
	defer dir.Close()

	names, err := dir.Readdirnames(0)
	if err != nil {
		return nil, err
	}

	keys := make([]string, len(names))
	for i, name := range names {
		keys[i] = path.Join(subPath, name)
	}
	return keys, nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object. A missing source is reported as PathNotFoundError. The parent
// directories of the destination are created before the rename.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
	source, dest := d.fullPath(sourcePath), d.fullPath(destPath)

	if _, statErr := os.Stat(source); os.IsNotExist(statErr) {
		return storagedriver.PathNotFoundError{Path: sourcePath}
	}

	if mkErr := os.MkdirAll(path.Dir(dest), 0o777); mkErr != nil {
		return mkErr
	}

	return os.Rename(source, dest)
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
// A path that does not exist is reported as PathNotFoundError; any other stat
// failure is returned unchanged.
func (d *driver) Delete(ctx context.Context, subPath string) error {
	fullPath := d.fullPath(subPath)

	if _, err := os.Stat(fullPath); err != nil {
		if os.IsNotExist(err) {
			return storagedriver.PathNotFoundError{Path: subPath}
		}
		return err
	}

	return os.RemoveAll(fullPath)
}
// RedirectURL returns a URL which may be used to retrieve the content stored at the given path.
// The filesystem driver never provides one: it always returns an empty URL
// and a nil error, ignoring both arguments.
func (d *driver) RedirectURL(*http.Request, string) (string, error) {
return "", nil
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file and directory.
// The traversal itself is delegated to storagedriver.WalkFallback, with any
// options passed through unchanged.
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
return storagedriver.WalkFallback(ctx, d, path, f, options...)
}
// fullPath returns the absolute path of a key within the Driver's storage,
// computed as path.Join of the root directory and subPath.
func (d *driver) fullPath(subPath string) string {
return path.Join(d.rootDirectory, subPath)
}
// fileInfo pairs an os.FileInfo with the path it describes so that it can
// satisfy storagedriver.FileInfo.
type fileInfo struct {
os.FileInfo
path string
}
// Compile-time check that fileInfo implements storagedriver.FileInfo.
var _ storagedriver.FileInfo = fileInfo{}
// Path provides the full path of the target of this file info, i.e. the
// value stored in the path field.
func (fi fileInfo) Path() string {
return fi.path
}
// Size reports the length of the file in bytes, which is also the offset at
// which an append would start. Directories always report 0.
func (fi fileInfo) Size() int64 {
	if !fi.IsDir() {
		return fi.FileInfo.Size()
	}
	return 0
}
// ModTime returns the modification time for the file. For backends that
// don't have a modification time, the creation time should be returned.
// Here the value comes straight from the embedded os.FileInfo.
func (fi fileInfo) ModTime() time.Time {
return fi.FileInfo.ModTime()
}
// IsDir returns true if the path is a directory, as reported by the
// embedded os.FileInfo.
func (fi fileInfo) IsDir() bool {
return fi.FileInfo.IsDir()
}
// fileWriter writes to an *os.File through a bufio.Writer and tracks the
// writer's lifecycle.
type fileWriter struct {
// file is the destination file; bw wraps it.
file *os.File
// size starts at the value given to newFileWriter and grows by the number
// of bytes accepted by Write.
size int64
bw *bufio.Writer
// closed, committed and cancelled are set by Close, Commit and Cancel
// respectively and are checked by the other methods to reject misuse.
closed bool
committed bool
cancelled bool
}
// newFileWriter wraps file in a buffered writer. size is the starting value
// of the byte counter reported by Size.
func newFileWriter(file *os.File, size int64) *fileWriter {
	fw := new(fileWriter)
	fw.file = file
	fw.size = size
	fw.bw = bufio.NewWriter(file)
	return fw
}
// Write buffers p for the underlying file and adds the number of accepted
// bytes to the running size. It fails once the writer has been closed,
// committed or cancelled.
func (fw *fileWriter) Write(p []byte) (int, error) {
	switch {
	case fw.closed:
		return 0, fmt.Errorf("already closed")
	case fw.committed:
		return 0, fmt.Errorf("already committed")
	case fw.cancelled:
		return 0, fmt.Errorf("already cancelled")
	}

	written, err := fw.bw.Write(p)
	fw.size += int64(written)
	return written, err
}
// Size returns the writer's byte counter: the initial size plus every byte
// accepted by Write so far (including bytes still sitting in the buffer).
func (fw *fileWriter) Size() int64 {
return fw.size
}
// Close flushes buffered data, syncs the file and closes it, stopping at the
// first step that fails. The writer is only marked closed when all three
// steps succeed; closing twice is an error.
func (fw *fileWriter) Close() error {
	if fw.closed {
		return fmt.Errorf("already closed")
	}

	// The steps run strictly in this order: flush, sync, close.
	steps := []func() error{fw.bw.Flush, fw.file.Sync, fw.file.Close}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}

	fw.closed = true
	return nil
}
// Cancel abandons the write: it marks the writer cancelled, closes the file
// (ignoring any close error) and removes it from disk. Cancelling an already
// closed writer is an error.
func (fw *fileWriter) Cancel(ctx context.Context) error {
	if fw.closed {
		return fmt.Errorf("already closed")
	}

	fw.cancelled = true
	// Best effort: the file is about to be deleted, so a close error is dropped.
	_ = fw.file.Close()
	name := fw.file.Name()
	return os.Remove(name)
}
// Commit flushes buffered data and syncs the file, then marks the writer
// committed. It is rejected once the writer has been closed, committed or
// cancelled.
func (fw *fileWriter) Commit(ctx context.Context) error {
	switch {
	case fw.closed:
		return fmt.Errorf("already closed")
	case fw.committed:
		return fmt.Errorf("already committed")
	case fw.cancelled:
		return fmt.Errorf("already cancelled")
	}

	flushErr := fw.bw.Flush()
	if flushErr != nil {
		return flushErr
	}
	syncErr := fw.file.Sync()
	if syncErr != nil {
		return syncErr
	}

	fw.committed = true
	return nil
}
package inmemory
import (
"context"
"fmt"
"io"
"net/http"
"sync"
"time"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/base"
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
)
// driverName is the name under which this driver is registered with the
// factory and which Name reports.
const driverName = "inmemory"
// init registers the in-memory driver factory under driverName.
func init() {
factory.Register(driverName, &inMemoryDriverFactory{})
}
// inMemoryDriverFactory implements the factory.StorageDriverFactory interface.
type inMemoryDriverFactory struct{}
// Create returns a fresh driver from New; ctx and parameters are not
// consulted.
func (factory *inMemoryDriverFactory) Create(ctx context.Context, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return New(), nil
}
// driver is the unexported in-memory implementation: a directory tree
// hanging off root. The driver and writer methods in this file take mutex
// before touching that tree.
type driver struct {
root *dir
mutex sync.RWMutex
}
// baseEmbed allows us to hide the Base embed: Driver embeds this unexported
// wrapper rather than base.Base directly.
type baseEmbed struct {
base.Base
}
// Driver is a storagedriver.StorageDriver implementation backed by a local map.
// Intended solely for example and testing purposes.
type Driver struct {
baseEmbed // embedded, hidden base driver.
}
// Compile-time check that *Driver implements storagedriver.StorageDriver.
var _ storagedriver.StorageDriver = &Driver{}
// New builds a Driver whose tree consists of a single empty root directory
// ("/") stamped with the current time.
func New() *Driver {
	root := &dir{common: common{p: "/", mod: time.Now()}}
	backend := &driver{root: root}
	return &Driver{
		baseEmbed: baseEmbed{Base: base.Base{StorageDriver: backend}},
	}
}
// Implement the storagedriver.StorageDriver interface.

// Name returns the registered driver name (driverName).
func (d *driver) Name() string {
return driverName
}
// GetContent returns the full content stored at path. The read lock is held
// for the whole read.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
	d.mutex.RLock()
	defer d.mutex.RUnlock()

	src, openErr := d.reader(ctx, path, 0)
	if openErr != nil {
		return nil, openErr
	}
	defer src.Close()

	return io.ReadAll(src)
}
// PutContent replaces whatever is stored at p with contents, creating the
// file (and missing parent directories) when needed. Any failure to obtain
// the file is reported as "not a file".
func (d *driver) PutContent(ctx context.Context, p string, contents []byte) error {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	target, mkErr := d.root.mkfile(normalize(p))
	if mkErr != nil {
		// TODO(stevvooe): Again, we need to clarify when this is not a
		// directory in StorageDriver API.
		return fmt.Errorf("not a file")
	}

	target.truncate()
	_, writeErr := target.WriteAt(contents, 0)
	return writeErr
}
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
// It takes the read lock and delegates to reader; the lock is released when
// Reader returns, before the caller consumes the stream.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
d.mutex.RLock()
defer d.mutex.RUnlock()
return d.reader(ctx, path, offset)
}
// reader is the lock-free core of Reader/GetContent; callers must already
// hold the mutex. It rejects negative offsets, unknown paths and
// directories, and otherwise returns a reader positioned at offset.
func (d *driver) reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
	if offset < 0 {
		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
	}

	want := normalize(path)
	hit := d.root.find(want)
	switch {
	case hit.path() != want:
		// find returned the closest existing ancestor, not the node itself.
		return nil, storagedriver.PathNotFoundError{Path: path}
	case hit.isdir():
		return nil, fmt.Errorf("%q is a directory", path)
	}

	section := hit.(*file).sectionReader(offset)
	return io.NopCloser(section), nil
}
// Writer opens (or creates) the file at path and returns a FileWriter for
// it. Unless append is set, existing content is discarded first. Any failure
// to obtain the file is reported as "not a file".
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	target, mkErr := d.root.mkfile(normalize(path))
	if mkErr != nil {
		return nil, fmt.Errorf("not a file")
	}

	if overwrite := !append; overwrite {
		target.truncate()
	}
	return d.newWriter(target), nil
}
// Stat describes the node at path: whether it is a directory, its
// modification time and, for files, the length of its data. Unknown paths
// yield storagedriver.PathNotFoundError.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
	d.mutex.RLock()
	defer d.mutex.RUnlock()

	want := normalize(path)
	hit := d.root.find(want)
	if hit.path() != want {
		return nil, storagedriver.PathNotFoundError{Path: path}
	}

	var info storagedriver.FileInfoFields
	info.Path = path
	info.IsDir = hit.isdir()
	info.ModTime = hit.modtime()
	if !info.IsDir {
		info.Size = int64(len(hit.(*file).data))
	}

	return storagedriver.FileInfoInternal{FileInfoFields: info}, nil
}
// List returns the sorted paths of the direct children of the directory at
// path.
//
// A path that does not exist yields storagedriver.PathNotFoundError; a path
// whose closest existing node is a file yields a plain "not a directory"
// error.
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
	d.mutex.RLock()
	defer d.mutex.RUnlock()

	normalized := normalize(path)

	found := d.root.find(normalized)
	if !found.isdir() {
		return nil, fmt.Errorf("not a directory") // TODO(stevvooe): Need error type for this...
	}

	// normalized is an absolute path, so it must be resolved from the root.
	// Resolving it from the already-located directory re-interprets the
	// absolute path relative to that directory, which mis-resolves whenever
	// the directory has a child named like its own first path component.
	entries, err := d.root.list(normalized)
	if err != nil {
		switch err {
		case errNotExists:
			return nil, storagedriver.PathNotFoundError{Path: path}
		case errIsNotDir:
			return nil, fmt.Errorf("not a directory")
		default:
			return nil, err
		}
	}

	return entries, nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
//
// When the source does not exist the returned storagedriver.PathNotFoundError
// names sourcePath: errNotExists is only produced by the source lookup in
// dir.move, so the source is the path that is missing. Other errors from the
// tree are returned unchanged.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	normalizedSrc, normalizedDst := normalize(sourcePath), normalize(destPath)

	err := d.root.move(normalizedSrc, normalizedDst)
	switch err {
	case errNotExists:
		return storagedriver.PathNotFoundError{Path: sourcePath}
	default:
		return err
	}
}
// Delete removes the node at path from the tree; for a directory this drops
// its whole subtree. A missing path yields storagedriver.PathNotFoundError,
// any other tree error is returned unchanged.
func (d *driver) Delete(ctx context.Context, path string) error {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	delErr := d.root.delete(normalize(path))
	if delErr == errNotExists {
		return storagedriver.PathNotFoundError{Path: path}
	}
	return delErr
}
// RedirectURL returns a URL which may be used to retrieve the content stored at the given path.
// This implementation ignores both arguments and always returns an empty
// URL together with a nil error.
func (d *driver) RedirectURL(*http.Request, string) (string, error) {
return "", nil
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file and directory.
// The traversal is delegated to storagedriver.WalkFallback, which receives
// this driver together with the caller's options.
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
return storagedriver.WalkFallback(ctx, d, path, f, options...)
}
// writer accumulates written bytes in memory and appends them to the target
// file when flushed by Close or Commit.
type writer struct {
// d is the owning driver; its mutex guards buffer and file access.
d *driver
// f is the file that flush appends to.
f *file
// buffer holds bytes accepted by Write but not yet flushed; buffSize is
// the number of valid bytes in it.
buffer []byte
buffSize int
// closed, committed and cancelled record which lifecycle call has
// happened and are checked to reject misuse.
closed bool
committed bool
cancelled bool
}
// newWriter returns a FileWriter bound to this driver and to f, with an
// empty buffer.
func (d *driver) newWriter(f *file) storagedriver.FileWriter {
return &writer{
d: d,
f: f,
}
}
// Write appends p to the in-memory buffer; nothing reaches the file until
// the writer is flushed. It fails once the writer has been closed, committed
// or cancelled.
func (w *writer) Write(p []byte) (int, error) {
	switch {
	case w.closed:
		return 0, fmt.Errorf("already closed")
	case w.committed:
		return 0, fmt.Errorf("already committed")
	case w.cancelled:
		return 0, fmt.Errorf("already cancelled")
	}

	w.d.mutex.Lock()
	defer w.d.mutex.Unlock()

	// end is the buffer length once p has been appended.
	end := w.buffSize + len(p)
	if cap(w.buffer) < end {
		grown := make([]byte, len(w.buffer), end)
		copy(grown, w.buffer)
		w.buffer = grown
	}
	w.buffer = w.buffer[:end]

	copied := copy(w.buffer[w.buffSize:end], p)
	w.buffSize += copied
	return copied, nil
}
// Size returns the current length of the target file's data, read under the
// driver's read lock. Bytes still held in the writer's buffer are not
// included.
func (w *writer) Size() int64 {
w.d.mutex.RLock()
defer w.d.mutex.RUnlock()
return int64(len(w.f.data))
}
// Close marks the writer closed and flushes any buffered bytes to the file.
// The closed flag is set before the flush is attempted; closing twice is an
// error.
func (w *writer) Close() error {
	if w.closed {
		return fmt.Errorf("already closed")
	}
	w.closed = true
	return w.flush()
}
// Cancel marks the writer cancelled and deletes the target file from the
// tree. It is rejected after Close or Commit.
func (w *writer) Cancel(ctx context.Context) error {
	switch {
	case w.closed:
		return fmt.Errorf("already closed")
	case w.committed:
		return fmt.Errorf("already committed")
	}

	w.cancelled = true

	w.d.mutex.Lock()
	defer w.d.mutex.Unlock()
	target := w.f.path()
	return w.d.root.delete(target)
}
// Commit marks the writer committed and flushes buffered bytes to the file.
// The committed flag is set before the flush is attempted. It is rejected
// once the writer has been closed, committed or cancelled.
func (w *writer) Commit(ctx context.Context) error {
	switch {
	case w.closed:
		return fmt.Errorf("already closed")
	case w.committed:
		return fmt.Errorf("already committed")
	case w.cancelled:
		return fmt.Errorf("already cancelled")
	}

	w.committed = true
	return w.flush()
}
// flush appends the buffered bytes to the end of the target file under the
// driver's write lock and, on success, resets the buffer.
func (w *writer) flush() error {
	w.d.mutex.Lock()
	defer w.d.mutex.Unlock()

	appendAt := int64(len(w.f.data))
	_, err := w.f.WriteAt(w.buffer, appendAt)
	if err != nil {
		return err
	}

	w.buffer, w.buffSize = []byte{}, 0
	return nil
}
package inmemory
import (
"fmt"
"io"
"path"
"sort"
"strings"
"time"
)
// Sentinel errors returned by the dir operations in this file. Callers
// compare against them by identity.
var (
errExists = fmt.Errorf("exists") // a child with that name is already present
errNotExists = fmt.Errorf("notexists") // the requested path is not in the tree
errIsNotDir = fmt.Errorf("notdir") // a directory was required but something else was found
errIsDir = fmt.Errorf("isdir") // a file was required but a directory was found
)
// node is the common view of tree entries; *dir and *file both implement it.
type node interface {
// name is the final component of path.
name() string
// path is the node's full path within the tree.
path() string
// isdir reports whether the node is a *dir.
isdir() bool
// modtime is the node's recorded modification time.
modtime() time.Time
}
// dir is the central type for the memory-based storagedriver. All operations
// are dispatched from a root dir.
type dir struct {
common
// TODO(stevvooe): Use sorted slice + search.
// children maps a child's name to its node; it is allocated lazily by add.
children map[string]node
}
// Compile-time check that *dir implements node.
var _ node = &dir{}
// isdir always reports true for a dir.
func (d *dir) isdir() bool {
return true
}
// add stores n among d's children under n.name(), replacing any entry with
// the same name, and refreshes d's modification time.
func (d *dir) add(n node) {
	if d.children == nil {
		// The map is created on first use.
		d.children = map[string]node{}
	}
	key := n.name()
	d.children[key] = n
	d.mod = time.Now()
}
// find searches for the node, given path q in dir. If the node is found, it
// will be returned. If the node is not found, the closest existing parent is
// returned instead. If the node is found, the returned (node).path() will
// match q.
//
// q is interpreted relative to d; leading and trailing slashes are ignored,
// so an empty q (or "/") resolves to d itself. When an intermediate
// component turns out to be a file, that file is returned.
func (d *dir) find(q string) node {
	q = strings.Trim(q, "/")
	if q == "" {
		return d
	}

	// Split off the first path component. When q has no further separator,
	// rest is empty, so the recursion below stops at the child itself rather
	// than looking up the same component a second time inside it.
	component, rest, _ := strings.Cut(q, "/")

	child, ok := d.children[component]
	if !ok {
		// Node was not found. Return the current node as the closest parent.
		return d
	}

	if child.isdir() {
		// traverse down!
		return child.(*dir).find(rest)
	}

	return child
}
// list returns the sorted paths of the children of the directory at p.
// It yields errNotExists when p does not resolve exactly and errIsNotDir
// when p names a file.
func (d *dir) list(p string) ([]string, error) {
	target := d.find(p)
	switch {
	case target.path() != p:
		return nil, errNotExists
	case !target.isdir():
		return nil, errIsNotDir
	}

	// isdir() is only true for *dir, so the assertion cannot fail here.
	entries := target.(*dir).children
	paths := make([]string, len(entries))
	i := 0
	for _, entry := range entries {
		paths[i] = entry.path()
		i++
	}
	sort.Strings(paths)
	return paths, nil
}
// mkfile is open-or-create: it returns the file at p if one exists, fails
// with errIsDir if p is a directory, and otherwise creates an empty file at
// p, creating missing parent directories along the way.
func (d *dir) mkfile(p string) (*file, error) {
	if existing := d.find(p); existing.path() == p {
		if existing.isdir() {
			return nil, errIsDir
		}
		return existing.(*file), nil
	}

	parentPath, base := path.Split(p)

	// Make any non-existent directories.
	parent, err := d.mkdirs(parentPath)
	if err != nil {
		return nil, err
	}

	created := &file{
		common: common{
			p:   path.Join(parent.path(), base),
			mod: time.Now(),
		},
	}
	parent.add(created)
	return created, nil
}
// mkdirs ensures that the directory p exists, creating every missing
// component below the closest existing ancestor, and returns the final
// directory. It fails with errIsNotDir when the lookup lands on a file.
func (d *dir) mkdirs(p string) (*dir, error) {
	p = normalize(p)

	nearest := d.find(p)
	if !nearest.isdir() {
		// Found something there
		return nil, errIsNotDir
	}

	cur := nearest.(*dir)
	if nearest.path() == p {
		return cur, nil
	}

	// missing is the part of p below the closest existing ancestor.
	missing := strings.Trim(strings.TrimPrefix(p, nearest.path()), "/")
	if missing == "" {
		return cur, nil
	}

	for _, component := range strings.Split(missing, "/") {
		next, err := cur.mkdir(component)
		if err != nil {
			// This should actually never happen, since there are no children.
			return nil, err
		}
		cur = next
	}
	return cur, nil
}
// mkdir creates an empty directory called name directly under d. An empty
// name is rejected and an existing child of that name yields errExists.
func (d *dir) mkdir(name string) (*dir, error) {
	if name == "" {
		return nil, fmt.Errorf("invalid dirname")
	}
	if _, taken := d.children[name]; taken {
		return nil, errExists
	}

	created := &dir{common: common{p: path.Join(d.path(), name), mod: time.Now()}}
	d.add(created)
	d.mod = time.Now()
	return created, nil
}
// move re-parents the node at src so that it lives at dst. The destination's
// parent directories are created first; only then is the source looked up.
// Only the moved node's own path is rewritten. It yields errNotExists when
// the source (or its parent) is missing.
func (d *dir) move(src, dst string) error {
	dstParentPath, _ := path.Split(dst)
	dstParent, err := d.mkdirs(dstParentPath)
	if err != nil {
		return err
	}

	srcParentPath, srcName := path.Split(src)
	srcParentNode := d.find(srcParentPath)
	if normalize(srcParentNode.path()) != normalize(srcParentPath) {
		return errNotExists
	}

	srcParent, isDir := srcParentNode.(*dir)
	if !isDir {
		return errIsNotDir // paranoid.
	}

	moved, present := srcParent.children[srcName]
	if !present {
		return errNotExists
	}
	delete(srcParent.children, srcName)

	// Rewrite the node's recorded path before attaching it, since add keys
	// the child by its name.
	switch n := moved.(type) {
	case *dir:
		n.p = dst
	case *file:
		n.p = dst
	}

	dstParent.add(moved)
	return nil
}
// delete detaches the node at p from its parent directory. It yields
// errNotExists when the parent or the node itself is missing and errIsNotDir
// when the parent is not a directory.
func (d *dir) delete(p string) error {
	parentPath, base := path.Split(p)

	holder := d.find(parentPath)
	if normalize(holder.path()) != normalize(parentPath) {
		return errNotExists
	}

	parent, isDir := holder.(*dir)
	if !isDir {
		return errIsNotDir
	}

	if _, present := parent.children[base]; !present {
		return errNotExists
	}
	delete(parent.children, base)
	return nil
}
// String renders the directory's path and children map for debugging.
func (d *dir) String() string {
return fmt.Sprintf("&dir{path: %v, children: %v}", d.p, d.children)
}
// file stores actual data in the fs tree. It acts like an open, seekable file
// where operations are conducted through ReadAt and WriteAt. Use it with
// SectionReader for the best effect.
type file struct {
common
// data is the file's content; its length is the file size.
data []byte
}
// Compile-time check that *file implements node.
var _ node = &file{}
// isdir always reports false for a file.
func (f *file) isdir() bool {
return false
}
// truncate empties the file by reslicing data to length zero; the backing
// array (and therefore its capacity) is kept.
func (f *file) truncate() {
f.data = f.data[:0]
}
// sectionReader returns a reader over the file's data starting at offset and
// spanning len(data)-offset bytes, as measured at the time of the call.
func (f *file) sectionReader(offset int64) io.Reader {
return io.NewSectionReader(f, offset, int64(len(f.data))-offset)
}
// ReadAt copies file data starting at offset into p and returns the number
// of bytes copied, following the io.ReaderAt contract:
//
//   - a negative offset is rejected with an error instead of panicking on the
//     slice expression;
//   - an offset at or beyond the end of the data returns 0, io.EOF;
//   - a read that fills less than len(p) because the data ran out returns the
//     bytes read together with io.EOF.
func (f *file) ReadAt(p []byte, offset int64) (n int, err error) {
	if offset < 0 {
		return 0, fmt.Errorf("negative offset")
	}
	if offset >= int64(len(f.data)) {
		return 0, io.EOF
	}

	n = copy(p, f.data[offset:])
	if n < len(p) {
		// io.ReaderAt requires a non-nil error whenever n < len(p).
		return n, io.EOF
	}
	return n, nil
}
// reallocExponent is the growth factor applied to a slice's capacity when it
// has to be reallocated (see WriteAt). The value roughly follows the behavior
// of Go built-in append function.
const reallocExponent = 1.25
// WriteAt copies p into the file at offset and returns the number of bytes
// copied. The file's length becomes exactly offset+len(p) afterwards, and
// its modification time is refreshed.
func (f *file) WriteAt(p []byte, offset int64) (n int, err error) {
	end := offset + int64(len(p))

	if end > int64(cap(f.data)) {
		// Grow geometrically so repeated appends stay amortized linear, but
		// never allocate less than what this write needs.
		grownCap := int64(float64(cap(f.data)) * reallocExponent)
		if grownCap < end {
			grownCap = end
		}
		bigger := make([]byte, len(f.data), grownCap)
		copy(bigger, f.data)
		f.data = bigger
	}

	f.mod = time.Now()
	f.data = f.data[:end]
	n = copy(f.data[offset:end], p)
	return n, nil
}
// String renders the file's quoted path for debugging.
func (f *file) String() string {
return fmt.Sprintf("&file{path: %q}", f.p)
}
// common provides shared fields and methods for node implementations.
type common struct {
// p is the node's full path; mod is its recorded modification time.
p string
mod time.Time
}
// name returns the final element of the node's path as produced by
// path.Split, i.e. everything after the last slash.
func (c *common) name() string {
_, name := path.Split(c.p)
return name
}
// path returns the node's full path.
func (c *common) path() string {
return c.p
}
// modtime returns the node's recorded modification time.
func (c *common) modtime() time.Time {
return c.mod
}
// normalize returns p with all leading and trailing slashes removed and a
// single leading slash added. Slashes inside the path are left untouched, and
// an empty or all-slash input becomes "/".
func normalize(p string) string {
	trimmed := strings.TrimRight(strings.TrimLeft(p, "/"), "/")
	return "/" + trimmed
}
// Package s3 provides a storagedriver.StorageDriver implementation to
// store blobs in Amazon S3 cloud storage.
//
// This package leverages the official aws client library for interfacing with
// S3.
//
// Because S3 is a key, value store the Stat call does not support last modification
// time for directories (directories are an abstraction for key, value stores)
//
// Keep in mind that S3 guarantees only read-after-write consistency for new
// objects, but no read-after-update or list-after-write consistency.
package s3
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"math"
"net/http"
"path/filepath"
"slices"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/distribution/distribution/v3/internal/dcontext"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/base"
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
)
// driverName is the name reported by Name and one of the two names under
// which the driver is registered in init.
const driverName = "s3aws"
// minChunkSize defines the minimum multipart upload chunk size
// S3 API requires multipart upload chunks to be at least 5MB
const minChunkSize = 5 * 1024 * 1024
// defaultChunkSize is the chunk size used when the chunksize parameter is
// not set: twice the minimum.
const defaultChunkSize = 2 * minChunkSize
const (
// defaultMultipartCopyChunkSize defines the default chunk size for all
// but the last Upload Part - Copy operation of a multipart copy.
// Empirically, 32 MB is optimal.
defaultMultipartCopyChunkSize = 32 * 1024 * 1024
// defaultMultipartCopyMaxConcurrency defines the default maximum number
// of concurrent Upload Part - Copy operations for a multipart copy.
defaultMultipartCopyMaxConcurrency = 100
// defaultMultipartCopyThresholdSize defines the default object size
// above which multipart copy will be used. (PUT Object - Copy is used
// for objects at or below this size.) Empirically, 32 MB is optimal.
defaultMultipartCopyThresholdSize = 32 * 1024 * 1024
)
// listMax is the largest amount of objects you can request from S3 in a list call
const listMax = 1000
// noStorageClass defines the value to be used if storage class is not supported by the S3 endpoint
const noStorageClass = "NONE"
// s3StorageClasses lists all compatible (instant retrieval) S3 storage classes.
// It is also printed in the error returned for an invalid storageclass
// parameter.
var s3StorageClasses = []string{
noStorageClass,
s3.StorageClassStandard,
s3.StorageClassReducedRedundancy,
s3.StorageClassStandardIa,
s3.StorageClassOnezoneIa,
s3.StorageClassIntelligentTiering,
s3.StorageClassOutposts,
s3.StorageClassGlacierIr,
}
// validRegions is the set of known s3 region identifiers; it is filled in
// by init from the SDK's default partitions.
var validRegions = map[string]struct{}{}
// validObjectACLs is the set of known s3 object ACLs; it is filled in by init.
var validObjectACLs = map[string]struct{}{}
// DriverParameters encapsulates all of the driver parameters after all
// values have been set (parsed, defaulted and validated). FromParameters
// builds one from the untyped parameters map and passes it to New.
type DriverParameters struct {
// AccessKey, SecretKey and SessionToken form static credentials; New only
// installs them when both AccessKey and SecretKey are non-empty.
AccessKey string
SecretKey string
Bucket string
Region string
// RegionEndpoint, when non-empty, overrides the endpoint used by the client.
RegionEndpoint string
ForcePathStyle bool
Encrypt bool
KeyID string
// Secure false disables SSL; SkipVerify true disables TLS certificate
// verification.
Secure bool
SkipVerify bool
// V4Auth false switches the client to v2 signing (see New).
V4Auth bool
ChunkSize int
MultipartCopyChunkSize int64
MultipartCopyMaxConcurrency int64
MultipartCopyThresholdSize int64
RootDirectory string
StorageClass string
// UserAgent, when non-empty, is appended to the client's user agent.
UserAgent string
ObjectACL string
SessionToken string
UseDualStack bool
Accelerate bool
LogLevel aws.LogLevelType
}
// init fills the validRegions and validObjectACLs lookup sets and registers
// the driver factory under both "s3" and driverName.
func init() {
	for _, partition := range endpoints.DefaultPartitions() {
		for region := range partition.Regions() {
			validRegions[region] = struct{}{}
		}
	}

	knownACLs := []string{
		s3.ObjectCannedACLPrivate,
		s3.ObjectCannedACLPublicRead,
		s3.ObjectCannedACLPublicReadWrite,
		s3.ObjectCannedACLAuthenticatedRead,
		s3.ObjectCannedACLAwsExecRead,
		s3.ObjectCannedACLBucketOwnerRead,
		s3.ObjectCannedACLBucketOwnerFullControl,
	}
	for _, acl := range knownACLs {
		validObjectACLs[acl] = struct{}{}
	}

	// Register this as the default s3 driver in addition to s3aws
	for _, name := range []string{"s3", driverName} {
		factory.Register(name, &s3DriverFactory{})
	}
}
// s3DriverFactory implements the factory.StorageDriverFactory interface
type s3DriverFactory struct{}
// Create builds a driver from the untyped parameters map by delegating to
// FromParameters.
func (factory *s3DriverFactory) Create(ctx context.Context, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(ctx, parameters)
}
// Compile-time check that *driver implements storagedriver.StorageDriver.
var _ storagedriver.StorageDriver = &driver{}
// driver is the unexported S3 implementation. Apart from S3 and pool, its
// fields are copied by New from the DriverParameters fields of the same name.
type driver struct {
// S3 is the client used for every request.
S3 *s3.S3
Bucket string
ChunkSize int
Encrypt bool
KeyID string
MultipartCopyChunkSize int64
MultipartCopyMaxConcurrency int64
MultipartCopyThresholdSize int64
RootDirectory string
StorageClass string
ObjectACL string
// pool hands out *bytes.Buffer values (see New).
pool *sync.Pool
}
// baseEmbed wraps base.Base so that Driver can embed it through an
// unexported type.
type baseEmbed struct {
base.Base
}
// Driver is a storagedriver.StorageDriver implementation backed by Amazon S3
// Objects are stored at absolute keys in the provided bucket.
type Driver struct {
// baseEmbed carries the base.Base that wraps the unexported driver.
baseEmbed
}
// FromParameters constructs a new Driver from an untyped parameters map. It
// parses, defaults and validates every value, then delegates to New.
//
// Required parameters:
//   - bucket
//   - region (only when regionendpoint is not set)
//
// Optional parameters (default in parentheses):
//   - accesskey, secretkey (""); leaving them empty is valid, e.g. when the
//     instance credentials are used
//   - regionendpoint (""), rootdirectory (""), keyid (""), useragent ("")
//   - forcepathstyle, encrypt, skipverify, usedualstack, accelerate (false)
//   - secure, v4auth (true)
//   - chunksize (defaultChunkSize)
//   - multipartcopychunksize, multipartcopymaxconcurrency,
//     multipartcopythresholdsize (the defaultMultipartCopy* constants)
//   - storageclass (s3.StorageClassStandard); matched case-insensitively
//     against s3StorageClasses
//   - objectacl (s3.ObjectCannedACLPrivate); must be in validObjectACLs
//   - loglevel (logging off)
//
// Boolean parameters accept either a bool or a string understood by
// strconv.ParseBool. An error is returned for any missing required value or
// any value of the wrong type or outside its allowed range.
func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Driver, error) {
	// Providing no values for these is valid in case the user is authenticating
	// with an IAM on an ec2 instance (in which case the instance credentials will
	// be summoned when GetAuth is called)
	accessKey := parameters["accesskey"]
	if accessKey == nil {
		accessKey = ""
	}

	secretKey := parameters["secretkey"]
	if secretKey == nil {
		secretKey = ""
	}

	regionEndpoint := parameters["regionendpoint"]
	if regionEndpoint == nil {
		regionEndpoint = ""
	}

	forcePathStyleBool := false
	forcePathStyle := parameters["forcepathstyle"]
	switch forcePathStyle := forcePathStyle.(type) {
	case string:
		b, err := strconv.ParseBool(forcePathStyle)
		if err != nil {
			return nil, fmt.Errorf("the forcePathStyle parameter should be a boolean")
		}
		forcePathStyleBool = b
	case bool:
		forcePathStyleBool = forcePathStyle
	case nil:
		// do nothing
	default:
		return nil, fmt.Errorf("the forcePathStyle parameter should be a boolean")
	}

	regionName := parameters["region"]
	region := fmt.Sprint(regionName)

	// Don't check the region value if a custom endpoint is provided.
	if regionEndpoint == "" {
		if regionName == nil || region == "" {
			return nil, fmt.Errorf("no region parameter provided")
		}
		if _, ok := validRegions[region]; !ok {
			return nil, fmt.Errorf("invalid region provided: %v", region)
		}
	}

	bucket := parameters["bucket"]
	if bucket == nil || fmt.Sprint(bucket) == "" {
		return nil, fmt.Errorf("no bucket parameter provided")
	}

	encryptBool := false
	encrypt := parameters["encrypt"]
	switch encrypt := encrypt.(type) {
	case string:
		b, err := strconv.ParseBool(encrypt)
		if err != nil {
			return nil, fmt.Errorf("the encrypt parameter should be a boolean")
		}
		encryptBool = b
	case bool:
		encryptBool = encrypt
	case nil:
		// do nothing
	default:
		return nil, fmt.Errorf("the encrypt parameter should be a boolean")
	}

	secureBool := true
	secure := parameters["secure"]
	switch secure := secure.(type) {
	case string:
		b, err := strconv.ParseBool(secure)
		if err != nil {
			return nil, fmt.Errorf("the secure parameter should be a boolean")
		}
		secureBool = b
	case bool:
		secureBool = secure
	case nil:
		// do nothing
	default:
		return nil, fmt.Errorf("the secure parameter should be a boolean")
	}

	skipVerifyBool := false
	skipVerify := parameters["skipverify"]
	switch skipVerify := skipVerify.(type) {
	case string:
		b, err := strconv.ParseBool(skipVerify)
		if err != nil {
			return nil, fmt.Errorf("the skipVerify parameter should be a boolean")
		}
		skipVerifyBool = b
	case bool:
		skipVerifyBool = skipVerify
	case nil:
		// do nothing
	default:
		return nil, fmt.Errorf("the skipVerify parameter should be a boolean")
	}

	v4Bool := true
	v4auth := parameters["v4auth"]
	switch v4auth := v4auth.(type) {
	case string:
		b, err := strconv.ParseBool(v4auth)
		if err != nil {
			return nil, fmt.Errorf("the v4auth parameter should be a boolean")
		}
		v4Bool = b
	case bool:
		v4Bool = v4auth
	case nil:
		// do nothing
	default:
		return nil, fmt.Errorf("the v4auth parameter should be a boolean")
	}

	keyID := parameters["keyid"]
	if keyID == nil {
		keyID = ""
	}

	chunkSize, err := getParameterAsInteger(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize)
	if err != nil {
		return nil, err
	}

	multipartCopyChunkSize, err := getParameterAsInteger[int64](parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize)
	if err != nil {
		return nil, err
	}

	multipartCopyMaxConcurrency, err := getParameterAsInteger[int64](parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64)
	if err != nil {
		return nil, err
	}

	multipartCopyThresholdSize, err := getParameterAsInteger[int64](parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize)
	if err != nil {
		return nil, err
	}

	rootDirectory := parameters["rootdirectory"]
	if rootDirectory == nil {
		rootDirectory = ""
	}

	storageClass := s3.StorageClassStandard
	storageClassParam := parameters["storageclass"]
	if storageClassParam != nil {
		storageClassString, ok := storageClassParam.(string)
		if !ok {
			return nil, fmt.Errorf(
				"the storageclass parameter must be one of %v, %v invalid",
				s3StorageClasses,
				storageClassParam,
			)
		}
		// All valid storage class parameters are UPPERCASE, so be a bit more flexible here
		storageClassString = strings.ToUpper(storageClassString)
		// Validate against the same list that the error message prints, so
		// the accepted values and the reported values cannot drift apart.
		if !slices.Contains(s3StorageClasses, storageClassString) {
			return nil, fmt.Errorf(
				"the storageclass parameter must be one of %v, %v invalid",
				s3StorageClasses,
				storageClassParam,
			)
		}
		storageClass = storageClassString
	}

	userAgent := parameters["useragent"]
	if userAgent == nil {
		userAgent = ""
	}

	objectACL := s3.ObjectCannedACLPrivate
	objectACLParam := parameters["objectacl"]
	if objectACLParam != nil {
		objectACLString, ok := objectACLParam.(string)
		if !ok {
			return nil, fmt.Errorf("invalid value for objectacl parameter: %v", objectACLParam)
		}

		if _, ok = validObjectACLs[objectACLString]; !ok {
			return nil, fmt.Errorf("invalid value for objectacl parameter: %v", objectACLParam)
		}
		objectACL = objectACLString
	}

	useDualStackBool := false
	useDualStack := parameters["usedualstack"]
	switch useDualStack := useDualStack.(type) {
	case string:
		b, err := strconv.ParseBool(useDualStack)
		if err != nil {
			return nil, fmt.Errorf("the useDualStack parameter should be a boolean")
		}
		useDualStackBool = b
	case bool:
		useDualStackBool = useDualStack
	case nil:
		// do nothing
	default:
		return nil, fmt.Errorf("the useDualStack parameter should be a boolean")
	}

	// The session token is not read from the parameters map; it is always
	// passed on empty.
	sessionToken := ""

	accelerateBool := false
	accelerate := parameters["accelerate"]
	switch accelerate := accelerate.(type) {
	case string:
		b, err := strconv.ParseBool(accelerate)
		if err != nil {
			return nil, fmt.Errorf("the accelerate parameter should be a boolean")
		}
		accelerateBool = b
	case bool:
		accelerateBool = accelerate
	case nil:
		// do nothing
	default:
		return nil, fmt.Errorf("the accelerate parameter should be a boolean")
	}

	params := DriverParameters{
		AccessKey:                   fmt.Sprint(accessKey),
		SecretKey:                   fmt.Sprint(secretKey),
		Bucket:                      fmt.Sprint(bucket),
		Region:                      region,
		RegionEndpoint:              fmt.Sprint(regionEndpoint),
		ForcePathStyle:              forcePathStyleBool,
		Encrypt:                     encryptBool,
		KeyID:                       fmt.Sprint(keyID),
		Secure:                      secureBool,
		SkipVerify:                  skipVerifyBool,
		V4Auth:                      v4Bool,
		ChunkSize:                   chunkSize,
		MultipartCopyChunkSize:      multipartCopyChunkSize,
		MultipartCopyMaxConcurrency: multipartCopyMaxConcurrency,
		MultipartCopyThresholdSize:  multipartCopyThresholdSize,
		RootDirectory:               fmt.Sprint(rootDirectory),
		StorageClass:                storageClass,
		UserAgent:                   fmt.Sprint(userAgent),
		ObjectACL:                   objectACL,
		SessionToken:                fmt.Sprint(sessionToken),
		UseDualStack:                useDualStackBool,
		Accelerate:                  accelerateBool,
		LogLevel:                    getS3LogLevelFromParam(parameters["loglevel"]),
	}

	return New(ctx, params)
}
// getS3LogLevelFromParam maps the loglevel parameter to an aws.LogLevelType.
//
// The value is matched case-insensitively against the known level names.
// A nil value, an unknown name, or a value that is not a string (for
// instance a config loader that decoded a bare `off` as a boolean) all
// select aws.LogOff; a non-string value used to panic on the type assertion.
func getS3LogLevelFromParam(param interface{}) aws.LogLevelType {
	logLevelParam, ok := param.(string)
	if !ok {
		return aws.LogOff
	}

	switch strings.ToLower(logLevelParam) {
	case "off":
		return aws.LogOff
	case "debug":
		return aws.LogDebug
	case "debugwithsigning":
		return aws.LogDebugWithSigning
	case "debugwithhttpbody":
		return aws.LogDebugWithHTTPBody
	case "debugwithrequestretries":
		return aws.LogDebugWithRequestRetries
	case "debugwithrequesterrors":
		return aws.LogDebugWithRequestErrors
	case "debugwitheventstreambody":
		return aws.LogDebugWithEventStreamBody
	default:
		return aws.LogOff
	}
}
// integer is the constraint satisfied by every signed or unsigned integer
// type (including named types built on them).
type integer interface{ signed | unsigned }

// signed matches the signed integer kinds.
type signed interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64
}

// unsigned matches the unsigned integer kinds.
type unsigned interface {
	~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

// getParameterAsInteger reads parameters[name] as a T. A missing (nil) entry
// falls back to defaultValue; anything else is formatted to text and scanned
// as a decimal integer. The resulting value must lie within [min, max],
// otherwise an error is returned.
func getParameterAsInteger[T integer](parameters map[string]any, name string, defaultValue, min, max T) (T, error) {
	value := defaultValue

	if raw := parameters[name]; raw != nil {
		text := fmt.Sprint(raw)
		if _, scanErr := fmt.Sscanf(text, "%d", &value); scanErr != nil {
			return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, raw)
		}
	}

	inRange := value >= min && value <= max
	if !inRange {
		return 0, fmt.Errorf("the %s %#v parameter should be a number between %d and %d (inclusive)", name, value, min, max)
	}
	return value, nil
}
// New builds a Driver from fully resolved parameters: it assembles the AWS
// configuration, opens a session, creates the S3 client and wraps the
// resulting driver in base.Base.
//
// Disabling v4 authentication is rejected when no custom endpoint is set or
// when the endpoint contains "s3.amazonaws.com".
func New(ctx context.Context, params DriverParameters) (*Driver, error) {
	targetsAmazon := params.RegionEndpoint == "" ||
		strings.Contains(params.RegionEndpoint, "s3.amazonaws.com")
	if !params.V4Auth && targetsAmazon {
		return nil, fmt.Errorf("on Amazon S3 this storage driver can only be used with v4 authentication")
	}

	awsConfig := aws.NewConfig().WithLogLevel(params.LogLevel)

	// Static credentials are only installed when both halves are present.
	if params.AccessKey != "" && params.SecretKey != "" {
		awsConfig.WithCredentials(credentials.NewStaticCredentials(
			params.AccessKey,
			params.SecretKey,
			params.SessionToken,
		))
	}

	if params.RegionEndpoint != "" {
		awsConfig.WithEndpoint(params.RegionEndpoint)
	}

	awsConfig.
		WithS3ForcePathStyle(params.ForcePathStyle).
		WithS3UseAccelerate(params.Accelerate).
		WithRegion(params.Region).
		WithDisableSSL(!params.Secure)

	if params.UseDualStack {
		awsConfig.UseDualStackEndpoint = endpoints.DualStackEndpointStateEnabled
	}

	if params.SkipVerify {
		// Clone the default transport so only this client skips verification.
		insecure := http.DefaultTransport.(*http.Transport).Clone()
		insecure.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
		awsConfig.WithHTTPClient(&http.Client{Transport: insecure})
	}

	sess, err := session.NewSession(awsConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create new session with aws config: %v", err)
	}

	if params.UserAgent != "" {
		sess.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler(params.UserAgent))
	}

	client := s3.New(sess)

	// enable S3 compatible signature v2 signing instead
	if !params.V4Auth {
		setv2Handlers(client)
	}

	// TODO Currently multipart uploads have no timestamps, so aborting the
	// bucket's pending multipart uploads here would be unwise if a new
	// s3driver is initiated while another one is running on the same bucket.
	// The cleanup that was sketched for this spot is kept for reference:
	// multis, _, err := bucket.ListMulti("", "")
	// if err != nil {
	// 	return nil, err
	// }
	// for _, multi := range multis {
	// 	err := multi.Abort()
	// 	//TODO appropriate to do this error checking?
	// 	if err != nil {
	// 		return nil, err
	// 	}
	// }

	bufferPool := &sync.Pool{
		New: func() any { return &bytes.Buffer{} },
	}

	backend := &driver{
		S3:                          client,
		Bucket:                      params.Bucket,
		ChunkSize:                   params.ChunkSize,
		Encrypt:                     params.Encrypt,
		KeyID:                       params.KeyID,
		MultipartCopyChunkSize:      params.MultipartCopyChunkSize,
		MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
		MultipartCopyThresholdSize:  params.MultipartCopyThresholdSize,
		RootDirectory:               params.RootDirectory,
		StorageClass:                params.StorageClass,
		ObjectACL:                   params.ObjectACL,
		pool:                        bufferPool,
	}

	return &Driver{
		baseEmbed: baseEmbed{Base: base.Base{StorageDriver: backend}},
	}, nil
}
// Implement the storagedriver.StorageDriver interface

// Name returns driverName, the package-level identifier of this storage
// driver.
func (d *driver) Name() string {
return driverName
}
// GetContent retrieves the content stored at "path" as a []byte.
//
// The object is read from offset 0 through d.Reader and buffered entirely in
// memory. Errors from opening the reader or from reading the body are
// returned as-is.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
	reader, err := d.Reader(ctx, path, 0)
	if err != nil {
		return nil, err
	}
	// The reader is backed by the S3 response body; close it so the
	// underlying connection is released once the content has been read.
	defer reader.Close()

	return io.ReadAll(reader)
}
// PutContent stores the []byte content at a location designated by "path".
//
// The object is written with a single PutObject request carrying the
// driver's configured content type, ACL, encryption settings and storage
// class. The resulting error is passed through parseError.
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
	input := &s3.PutObjectInput{
		Bucket:               aws.String(d.Bucket),
		Key:                  aws.String(d.s3Path(path)),
		ContentType:          d.getContentType(),
		ACL:                  d.getACL(),
		ServerSideEncryption: d.getEncryptionMode(),
		SSEKMSKeyId:          d.getSSEKMSKeyID(),
		StorageClass:         d.getStorageClass(),
		Body:                 bytes.NewReader(contents),
	}

	_, putErr := d.S3.PutObjectWithContext(ctx, input)
	return parseError(path, putErr)
}
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
//
// The object is requested with an open-ended "bytes=<offset>-" range. When S3
// rejects the range with an "InvalidRange" error an empty reader is returned
// instead of an error. All other errors go through parseError. The caller is
// responsible for closing the returned reader.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
	resp, err := d.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(d.Bucket),
		Key:    aws.String(d.s3Path(path)),
		Range:  aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"),
	})
	if err != nil {
		// errors.As (rather than a bare type assertion) also recognises the
		// AWS error when it has been wrapped, matching how Stat inspects errors.
		var s3Err awserr.Error
		if errors.As(err, &s3Err) && s3Err.Code() == "InvalidRange" {
			return io.NopCloser(bytes.NewReader(nil)), nil
		}
		return nil, parseError(path, err)
	}
	return resp.Body, nil
}
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
// It only allows appending to paths with zero size committed content,
// in which the existing content is overridden with the new content.
// It returns storagedriver.Error when appending to paths
// with non-zero committed content.
//
// With appendMode false a fresh multipart upload is always started. With
// appendMode true the in-progress multipart uploads under the key are listed
// and the first upload whose key matches exactly is resumed with all of its
// already uploaded parts. If no upload is listed at all, the committed object
// is inspected via Stat and a new upload is started only when it is empty.
// If uploads are listed but none matches the key, PathNotFoundError is
// returned.
func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (storagedriver.FileWriter, error) {
	key := d.s3Path(path)
	if !appendMode {
		// TODO (brianbland): cancel other uploads at this path
		resp, err := d.S3.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
			Bucket:               aws.String(d.Bucket),
			Key:                  aws.String(key),
			ContentType:          d.getContentType(),
			ACL:                  d.getACL(),
			ServerSideEncryption: d.getEncryptionMode(),
			SSEKMSKeyId:          d.getSSEKMSKeyID(),
			StorageClass:         d.getStorageClass(),
		})
		if err != nil {
			return nil, err
		}
		return d.newWriter(ctx, key, *resp.UploadId, nil), nil
	}

	listMultipartUploadsInput := &s3.ListMultipartUploadsInput{
		Bucket: aws.String(d.Bucket),
		Prefix: aws.String(key),
	}
	for {
		resp, err := d.S3.ListMultipartUploadsWithContext(ctx, listMultipartUploadsInput)
		if err != nil {
			return nil, parseError(path, err)
		}

		// resp.Uploads can only be empty on the first call
		// if there were no more results to return after the first call, resp.IsTruncated would have been false
		// and the loop would be exited without recalling ListMultipartUploads
		if len(resp.Uploads) == 0 {
			fi, err := d.Stat(ctx, path)
			if err != nil {
				return nil, parseError(path, err)
			}

			if fi.Size() == 0 {
				resp, err := d.S3.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
					Bucket:               aws.String(d.Bucket),
					Key:                  aws.String(key),
					ContentType:          d.getContentType(),
					ACL:                  d.getACL(),
					ServerSideEncryption: d.getEncryptionMode(),
					SSEKMSKeyId:          d.getSSEKMSKeyID(),
					StorageClass:         d.getStorageClass(),
				})
				if err != nil {
					return nil, err
				}
				return d.newWriter(ctx, key, *resp.UploadId, nil), nil
			}

			// NOTE(review): this branch is reached when the committed object
			// is NOT empty, so the wording of the message below looks
			// inverted; it is kept unchanged in case callers match on it.
			return nil, storagedriver.Error{
				DriverName: driverName,
				Detail:     fmt.Errorf("append to zero-size path %s unsupported", path),
			}
		}

		var allParts []*s3.Part
		for _, multi := range resp.Uploads {
			// Prefix listing also returns uploads for longer keys sharing the
			// prefix; only an exact key match may be resumed.
			if key != *multi.Key {
				continue
			}

			partsList, err := d.S3.ListPartsWithContext(ctx, &s3.ListPartsInput{
				Bucket:   aws.String(d.Bucket),
				Key:      aws.String(key),
				UploadId: multi.UploadId,
			})
			if err != nil {
				return nil, parseError(path, err)
			}
			allParts = append(allParts, partsList.Parts...)

			// aws.BoolValue treats a missing IsTruncated as false instead of
			// dereferencing a nil pointer.
			for aws.BoolValue(partsList.IsTruncated) {
				partsList, err = d.S3.ListPartsWithContext(ctx, &s3.ListPartsInput{
					Bucket:           aws.String(d.Bucket),
					Key:              aws.String(key),
					UploadId:         multi.UploadId,
					PartNumberMarker: partsList.NextPartNumberMarker,
				})
				if err != nil {
					return nil, parseError(path, err)
				}
				allParts = append(allParts, partsList.Parts...)
			}
			return d.newWriter(ctx, key, *multi.UploadId, allParts), nil
		}

		// Advance to the next page. S3 ignores upload-id-marker unless
		// key-marker is supplied as well, so both markers must be carried
		// over; otherwise the same page would be listed again.
		listMultipartUploadsInput.KeyMarker = resp.NextKeyMarker
		listMultipartUploadsInput.UploadIdMarker = resp.NextUploadIdMarker

		// from the s3 api docs, IsTruncated "specifies whether (true) or not (false) all of the results were returned"
		// if everything has been returned, break
		if resp.IsTruncated == nil || !*resp.IsTruncated {
			break
		}
	}
	return nil, storagedriver.PathNotFoundError{Path: path}
}
// statHead looks up "path" with a HeadObject request and reports it as a
// regular file using the returned content length and last-modified time.
// Errors from S3 are returned unmodified so the caller can inspect them.
func (d *driver) statHead(ctx context.Context, path string) (*storagedriver.FileInfoFields, error) {
	input := &s3.HeadObjectInput{
		Bucket: aws.String(d.Bucket),
		Key:    aws.String(d.s3Path(path)),
	}

	head, err := d.S3.HeadObjectWithContext(ctx, input)
	if err != nil {
		return nil, err
	}

	fields := storagedriver.FileInfoFields{
		Path:    path,
		IsDir:   false,
		Size:    *head.ContentLength,
		ModTime: *head.LastModified,
	}
	return &fields, nil
}
// statList looks up "path" by listing at most one object under its key
// prefix. If the single listed key equals the requested key the path is
// reported as a file with that object's size and modification time; if the
// key differs, or exactly one common prefix is returned, the path is
// reported as a directory. Otherwise PathNotFoundError is returned.
func (d *driver) statList(ctx context.Context, path string) (*storagedriver.FileInfoFields, error) {
	s3Path := d.s3Path(path)

	listing, err := d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:  aws.String(d.Bucket),
		Prefix:  aws.String(s3Path),
		MaxKeys: aws.Int64(1),
	})
	if err != nil {
		return nil, err
	}

	asDirectory := func() *storagedriver.FileInfoFields {
		return &storagedriver.FileInfoFields{
			Path:  path,
			IsDir: true,
		}
	}

	switch {
	case len(listing.Contents) == 1:
		object := listing.Contents[0]
		if *object.Key != s3Path {
			// Something lives below the prefix, but not at the key itself.
			return asDirectory(), nil
		}
		return &storagedriver.FileInfoFields{
			Path:    path,
			Size:    *object.Size,
			ModTime: *object.LastModified,
		}, nil
	case len(listing.CommonPrefixes) == 1:
		return asDirectory(), nil
	}

	return nil, storagedriver.PathNotFoundError{Path: path}
}
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
//
// A HeadObject lookup is attempted first. If it fails with an AWS error the
// lookup falls back to a listing based stat; any other kind of error is
// returned directly.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
	fields, headErr := d.statHead(ctx, path)
	if headErr == nil {
		return storagedriver.FileInfoInternal{FileInfoFields: *fields}, nil
	}

	// For non-AWS errors, return the error directly
	var awsErr awserr.Error
	if !errors.As(headErr, &awsErr) {
		return nil, headErr
	}

	// For AWS errors, we fail over to ListObjects:
	// Though the official docs https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_Errors
	// are slightly outdated, the HeadObject actually returns NotFound error
	// if querying a key which doesn't exist or a key which has nested keys
	// and Forbidden if IAM/ACL permissions do not allow Head but allow List.
	fields, listErr := d.statList(ctx, path)
	if listErr != nil {
		return nil, parseError(path, listErr)
	}
	return storagedriver.FileInfoInternal{FileInfoFields: *fields}, nil
}
// List returns a list of the objects that are direct descendants of the given path.
//
// The listing is performed with "/" as delimiter and follows continuation
// tokens until S3 reports that the result is no longer truncated. Files are
// returned first, followed by directories (common prefixes with their
// trailing "/" removed). For any path other than "/", an empty result is
// reported as PathNotFoundError because S3 has no real directories.
func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
	path := opath
	// strings.HasSuffix is safe for an empty path, where indexing the last
	// byte would panic.
	if path != "/" && !strings.HasSuffix(path, "/") {
		path += "/"
	}

	// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
	// In those cases, there is no root prefix to replace and we must actually add a "/" to all
	// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
	rootKey := d.s3Path("")
	prefix := ""
	if rootKey == "" {
		prefix = "/"
	}

	listInput := &s3.ListObjectsV2Input{
		Bucket:    aws.String(d.Bucket),
		Prefix:    aws.String(d.s3Path(path)),
		Delimiter: aws.String("/"),
		MaxKeys:   aws.Int64(listMax),
	}

	files := []string{}
	directories := []string{}

	for {
		resp, err := d.S3.ListObjectsV2WithContext(ctx, listInput)
		if err != nil {
			return nil, parseError(opath, err)
		}

		for _, key := range resp.Contents {
			files = append(files, strings.Replace(*key.Key, rootKey, prefix, 1))
		}

		for _, commonPrefix := range resp.CommonPrefixes {
			// Common prefixes end with the delimiter; drop it so that
			// directories are reported without a trailing slash.
			dir := strings.TrimSuffix(*commonPrefix.Prefix, "/")
			directories = append(directories, strings.Replace(dir, rootKey, prefix, 1))
		}

		if resp.IsTruncated == nil || !*resp.IsTruncated {
			break
		}

		// Request the next page of the same listing.
		listInput.ContinuationToken = resp.NextContinuationToken
	}

	if opath != "/" {
		if len(files) == 0 && len(directories) == 0 {
			// Treat empty response as missing directory, since we don't actually
			// have directories in s3.
			return nil, storagedriver.PathNotFoundError{Path: opath}
		}
	}

	return append(files, directories...), nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
//
// S3 offers no native move, so the object is copied first and the source is
// deleted only if the copy succeeded.
func (d *driver) Move(ctx context.Context, sourcePath, destPath string) error {
	err := d.copy(ctx, sourcePath, destPath)
	if err == nil {
		err = d.Delete(ctx, sourcePath)
	}
	return err
}
// copy copies an object stored at sourcePath to destPath.
//
// Objects no larger than d.MultipartCopyThresholdSize are copied with a
// single CopyObject request. Larger objects are copied with a multipart
// upload whose parts are d.MultipartCopyChunkSize bytes each (the last part
// may be shorter), with at most d.MultipartCopyMaxConcurrency part copies in
// flight at once.
//
// If any part copy or the final completion fails, the remaining part copies
// are cancelled and the multipart upload is aborted so that no orphaned parts
// are left behind. The first error is returned, joined with the abort error
// if the abort fails as well.
func (d *driver) copy(ctx context.Context, sourcePath, destPath string) error {
	// S3 can copy objects up to 5 GB in size with a single PUT Object - Copy
	// operation. For larger objects, the multipart upload API must be used.
	//
	// Empirically, multipart copy is fastest with 32 MB parts and is faster
	// than PUT Object - Copy for objects larger than 32 MB.

	fileInfo, err := d.Stat(ctx, sourcePath)
	if err != nil {
		return parseError(sourcePath, err)
	}

	if fileInfo.Size() <= d.MultipartCopyThresholdSize {
		_, err := d.S3.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
			Bucket:               aws.String(d.Bucket),
			Key:                  aws.String(d.s3Path(destPath)),
			ContentType:          d.getContentType(),
			ACL:                  d.getACL(),
			ServerSideEncryption: d.getEncryptionMode(),
			SSEKMSKeyId:          d.getSSEKMSKeyID(),
			StorageClass:         d.getStorageClass(),
			CopySource:           aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
		})
		if err != nil {
			return parseError(sourcePath, err)
		}
		return nil
	}

	createResp, err := d.S3.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
		Bucket:               aws.String(d.Bucket),
		Key:                  aws.String(d.s3Path(destPath)),
		ContentType:          d.getContentType(),
		ACL:                  d.getACL(),
		SSEKMSKeyId:          d.getSSEKMSKeyID(),
		ServerSideEncryption: d.getEncryptionMode(),
		StorageClass:         d.getStorageClass(),
	})
	if err != nil {
		return err
	}

	// Round up so that a trailing partial chunk gets its own part.
	numParts := (fileInfo.Size() + d.MultipartCopyChunkSize - 1) / d.MultipartCopyChunkSize
	completedParts := make([]*s3.CompletedPart, numParts)
	// errChan is buffered for every part so that no worker ever blocks on
	// reporting its result.
	errChan := make(chan error, numParts)
	limiter := make(chan struct{}, d.MultipartCopyMaxConcurrency)

	// copyCtx is cancelled on the first failure so that pending and in-flight
	// part copies stop early instead of running to completion.
	copyCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for i := range completedParts {
		i := int64(i)
		go func() {
			limiter <- struct{}{}
			defer func() { <-limiter }()

			firstByte := i * d.MultipartCopyChunkSize
			lastByte := firstByte + d.MultipartCopyChunkSize - 1
			if lastByte >= fileInfo.Size() {
				lastByte = fileInfo.Size() - 1
			}
			uploadResp, err := d.S3.UploadPartCopyWithContext(copyCtx, &s3.UploadPartCopyInput{
				Bucket:          aws.String(d.Bucket),
				CopySource:      aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
				Key:             aws.String(d.s3Path(destPath)),
				PartNumber:      aws.Int64(i + 1),
				UploadId:        createResp.UploadId,
				CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", firstByte, lastByte)),
			})
			if err == nil {
				// Each worker writes only its own slot, so no locking is needed.
				completedParts[i] = &s3.CompletedPart{
					ETag:       uploadResp.CopyPartResult.ETag,
					PartNumber: aws.Int64(i + 1),
				}
			}
			errChan <- err
		}()
	}

	// Wait for every worker; remember only the first failure, since later
	// ones are typically just the consequence of the cancellation.
	var copyErr error
	for range completedParts {
		if err := <-errChan; err != nil && copyErr == nil {
			copyErr = err
			cancel()
		}
	}

	if copyErr == nil {
		_, copyErr = d.S3.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
			Bucket:          aws.String(d.Bucket),
			Key:             aws.String(d.s3Path(destPath)),
			UploadId:        createResp.UploadId,
			MultipartUpload: &s3.CompletedMultipartUpload{Parts: completedParts},
		})
		if copyErr == nil {
			return nil
		}
	}

	// The copy failed: abort the upload so the already copied parts are
	// discarded.
	if _, abortErr := d.S3.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(d.Bucket),
		Key:      aws.String(d.s3Path(destPath)),
		UploadId: createResp.UploadId,
	}); abortErr != nil {
		return errors.Join(copyErr, abortErr)
	}
	return copyErr
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
// We must be careful since S3 does not guarantee read after delete consistency
//
// Objects are listed by key prefix page by page; each page's matching keys
// are removed with one DeleteObjects request before the next page is
// fetched. A listing error, or an empty first page, is reported as
// PathNotFoundError. Per-object failures reported by DeleteObjects are
// returned together as storagedriver.Errors.
func (d *driver) Delete(ctx context.Context, path string) error {
// s3Objects is reused across pages; it is truncated, not reallocated.
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
s3Path := d.s3Path(path)
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(s3Path),
}
for {
// list all the objects
resp, err := d.S3.ListObjectsV2WithContext(ctx, listObjectsInput)
// resp.Contents can only be empty on the first call
// if there were no more results to return after the first call, resp.IsTruncated would have been false
// and the loop would exit without recalling ListObjects
// NOTE(review): any listing error (not only a missing path) is mapped to
// PathNotFoundError here, which hides the underlying cause.
if err != nil || len(resp.Contents) == 0 {
return storagedriver.PathNotFoundError{Path: path}
}
for _, key := range resp.Contents {
// Skip if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' {
continue
}
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
Key: key.Key,
})
}
// Delete objects only if the list is not empty, otherwise S3 API returns a cryptic error
if len(s3Objects) > 0 {
// NOTE: according to AWS docs https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
// by default the response returns up to 1,000 key names. The response _might_ contain fewer keys but it will never contain more.
// 1,000 keys is coincidentally (?) also the max number of keys that can be deleted in a single Delete operation, so we'll just smack
// Delete here straight away and reset the object slice when successful.
resp, err := d.S3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{
Objects: s3Objects,
Quiet: aws.Bool(false),
},
})
if err != nil {
return err
}
if len(resp.Errors) > 0 {
// NOTE: AWS SDK s3.Error does not implement error interface which
// is pretty intensely sad, so we have to do away with this for now.
errs := make([]error, 0, len(resp.Errors))
for _, err := range resp.Errors {
errs = append(errs, errors.New(err.String()))
}
return storagedriver.Errors{
DriverName: driverName,
Errs: errs,
}
}
}
// NOTE: we don't want to reallocate
// the slice so we simply "reset" it
s3Objects = s3Objects[:0]
// resp.Contents must have at least one element or we would have returned not found
// Resume the next listing after the last key seen on this page.
listObjectsInput.StartAfter = resp.Contents[len(resp.Contents)-1].Key
// from the s3 api docs, IsTruncated "specifies whether (true) or not (false) all of the results were returned"
// if everything has been returned, break
if resp.IsTruncated == nil || !*resp.IsTruncated {
break
}
}
return nil
}
// RedirectURL returns a URL which may be used to retrieve the content stored at the given path.
//
// Only GET and HEAD requests are supported: for those a presigned S3 request
// URL valid for 20 minutes is returned. For every other HTTP method an empty
// URL and a nil error are returned.
func (d *driver) RedirectURL(r *http.Request, path string) (string, error) {
	const expiresIn = 20 * time.Minute

	method := r.Method
	if method != http.MethodGet && method != http.MethodHead {
		return "", nil
	}

	bucket := aws.String(d.Bucket)
	key := aws.String(d.s3Path(path))

	var req *request.Request
	if method == http.MethodGet {
		req, _ = d.S3.GetObjectRequest(&s3.GetObjectInput{
			Bucket: bucket,
			Key:    key,
		})
	} else {
		req, _ = d.S3.HeadObjectRequest(&s3.HeadObjectInput{
			Bucket: bucket,
			Key:    key,
		})
	}

	return req.Presign(expiresIn)
}
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
//
// The supplied option functions are applied to a fresh WalkOptions value;
// its StartAfterHint is forwarded to the underlying walk.
func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
	var walkOptions storagedriver.WalkOptions
	for _, apply := range options {
		apply(&walkOptions)
	}

	var objectCount int64
	return d.doWalk(ctx, &objectCount, from, walkOptions.StartAfterHint, f)
}
// doWalk lists every object under "from" (without a delimiter, i.e.
// recursively) and calls f for each file and for each directory inferred from
// the object keys.
//
// objectCount is incremented once per invocation of f. startAfter is
// translated to an S3 key and passed as the StartAfter listing parameter.
// If f returns storagedriver.ErrSkipDir, later entries whose path starts with
// the skipped entry's path are not passed to f. If f returns
// storagedriver.ErrFilledBuffer the walk stops without reporting an error.
// Any other error from f stops the walk and is returned; otherwise an error
// from the listing itself is returned.
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from, startAfter string, f storagedriver.WalkFn) error {
var (
retError error
// the most recent directory walked for de-duping
prevDir string
// the most recent skip directory to avoid walking over undesirable files
prevSkipDir string
)
prevDir = from
path := from
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
// When the driver has no root prefix, keys lack a leading "/" which has to
// be added back to form driver paths.
prefix := ""
if d.s3Path("") == "" {
prefix = "/"
}
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
MaxKeys: aws.Int64(listMax),
StartAfter: aws.String(d.s3Path(startAfter)),
}
ctx, done := dcontext.WithTrace(parentCtx)
defer done("s3aws.ListObjectsV2PagesWithContext(%s)", listObjectsInput)
// When the "delimiter" argument is omitted, the S3 list API will list all objects in the bucket
// recursively, omitting directory paths. Objects are listed in sorted, depth-first order so we
// can infer all the directories by comparing each object path to the last one we saw.
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html
// With files returned in sorted depth-first order, directories are inferred in the same order.
// ErrSkipDir is handled by explicitly skipping over any files under the skipped directory. This may be sub-optimal
// for extreme edge cases but for the general use case in a registry, this is orders of magnitude
// faster than a more explicit recursive implementation.
listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {
// First pass: turn the page into an ordered list of directory and file entries.
walkInfos := make([]storagedriver.FileInfoInternal, 0, len(objects.Contents))
for _, file := range objects.Contents {
filePath := strings.Replace(*file.Key, d.s3Path(""), prefix, 1)
// get a list of all inferred directories between the previous directory and this file
dirs := directoryDiff(prevDir, filePath)
for _, dir := range dirs {
walkInfos = append(walkInfos, storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: true,
Path: dir,
},
})
prevDir = dir
}
// in some cases the _uploads dir might be empty. when this happens, it would
// be appended twice to the walkInfos slice, once as [...]/_uploads and
// once more erroneously as [...]/_uploads/. the easiest way to avoid this is
// to skip appending filePath to walkInfos if it ends in "/". the loop through
// dirs will already have handled it in that case, so it's safe to continue this
// loop.
if strings.HasSuffix(filePath, "/") {
continue
}
walkInfos = append(walkInfos, storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: false,
Size: *file.Size,
ModTime: *file.LastModified,
Path: filePath,
},
})
}
// Second pass: hand the entries to f, honouring skip and stop requests.
// Returning false from this callback stops the pagination.
for _, walkInfo := range walkInfos {
// skip any results under the last skip directory
// NOTE(review): this is a plain string-prefix test, so a sibling such as
// "/a/bc" also matches a skipped "/a/b" — confirm whether that is intended.
if prevSkipDir != "" && strings.HasPrefix(walkInfo.Path(), prevSkipDir) {
continue
}
err := f(walkInfo)
*objectCount++
if err != nil {
if err == storagedriver.ErrSkipDir {
prevSkipDir = walkInfo.Path()
continue
}
if err == storagedriver.ErrFilledBuffer {
return false
}
retError = err
return false
}
}
return true
})
// An error returned by f takes precedence over a listing error.
if retError != nil {
return retError
}
if listObjectErr != nil {
return listObjectErr
}
return nil
}
// directoryDiff finds all directories that are not in common between
// the previous and current paths in sorted order.
//
// It walks up from current one parent at a time, collecting every parent
// until it reaches the root, prev itself, or an ancestor of prev, and returns
// the collected directories ordered from shallowest to deepest. It returns
// nil when either argument is empty.
//
// Paths are expected to be absolute ("/"-rooted). A relative current path is
// tolerated: the walk stops once the parent collapses to ".".
//
// # Examples
//
//	directoryDiff("/path/to/folder", "/path/to/folder/folder/file")
//	// => [ "/path/to/folder/folder" ]
//
//	directoryDiff("/path/to/folder/folder1", "/path/to/folder/folder2/file")
//	// => [ "/path/to/folder/folder2" ]
//
//	directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/file")
//	// => [ "/path/to/folder/folder2" ]
//
//	directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/folder1/file")
//	// => [ "/path/to/folder/folder2", "/path/to/folder/folder2/folder1" ]
//
//	directoryDiff("/", "/path/to/folder/folder/file")
//	// => [ "/path", "/path/to", "/path/to/folder", "/path/to/folder/folder" ]
func directoryDiff(prev, current string) []string {
	var paths []string

	if prev == "" || current == "" {
		return paths
	}

	parent := current
	for {
		parent = filepath.Dir(parent)
		// filepath.Dir settles on "/" for absolute paths and on "." for
		// relative ones; both must end the walk, otherwise a relative path
		// would never terminate.
		if parent == "/" || parent == "." {
			break
		}
		// Stop at prev or at any directory that already contains prev: those
		// have been visited before.
		if parent == prev || strings.HasPrefix(prev+"/", parent+"/") {
			break
		}
		paths = append(paths, parent)
	}

	// Parents were collected deepest-first; callers expect shallowest-first.
	slices.Reverse(paths)
	return paths
}
// s3Path maps a driver path to its S3 object key: the root directory (without
// trailing slashes) is prepended to path and all leading slashes are removed
// from the result.
func (d *driver) s3Path(path string) string {
	root := strings.TrimRight(d.RootDirectory, "/")
	return strings.TrimLeft(root+path, "/")
}
// S3BucketKey returns the s3 bucket key for the given storage driver path.
//
// It panics if the wrapped StorageDriver is not the *driver of this package.
func (d *Driver) S3BucketKey(path string) string {
	inner := d.StorageDriver.(*driver)
	return inner.s3Path(path)
}
// parseError translates an S3 "NoSuchKey" error into a
// storagedriver.PathNotFoundError for path. Every other error, including nil,
// is returned unchanged.
func parseError(path string, err error) error {
	// errors.As also recognises the AWS error when it has been wrapped, and
	// is consistent with how Stat inspects errors.
	var s3Err awserr.Error
	if errors.As(err, &s3Err) && s3Err.Code() == "NoSuchKey" {
		return storagedriver.PathNotFoundError{Path: path}
	}
	return err
}
// getEncryptionMode returns the server-side encryption mode to request:
// nil when encryption is disabled, "AES256" when it is enabled without a key
// ID, and "aws:kms" when a key ID is configured.
func (d *driver) getEncryptionMode() *string {
	switch {
	case !d.Encrypt:
		return nil
	case d.KeyID == "":
		return aws.String("AES256")
	default:
		return aws.String("aws:kms")
	}
}
// getSSEKMSKeyID returns the configured key ID, or nil when none is set.
func (d *driver) getSSEKMSKeyID() *string {
	if d.KeyID == "" {
		return nil
	}
	return aws.String(d.KeyID)
}
// getContentType returns the content type applied to every object written by
// the driver.
func (d *driver) getContentType() *string {
	const contentType = "application/octet-stream"
	return aws.String(contentType)
}
// getACL returns the configured object ACL as a string pointer.
func (d *driver) getACL() *string {
	acl := d.ObjectACL
	return aws.String(acl)
}
// getStorageClass returns the configured storage class, or nil when the
// driver is configured with noStorageClass so that no storage class is sent.
func (d *driver) getStorageClass() *string {
	if d.StorageClass != noStorageClass {
		return aws.String(d.StorageClass)
	}
	return nil
}
// writer uploads parts to S3 in a buffered fashion where the length of each
// part is [writer.driver.ChunkSize], excluding the last part which may be
// smaller than the configured chunk size and never larger. This allows the
// multipart upload to be cleanly resumed in future. This is violated if
// [writer.Close] is called before at least one chunk is written.
type writer struct {
// ctx is used for the S3 calls issued by Write, flush and Commit.
ctx context.Context
// driver provides the S3 client, bucket, chunk size and buffer pool.
driver *driver
// key is the S3 object key being written.
key string
// uploadID identifies the current multipart upload.
uploadID string
// parts lists the parts uploaded so far for uploadID.
parts []*s3.Part
// size is the total number of bytes contained in parts.
size int64
// buf stages written data until it is flushed as a part; it is borrowed
// from the driver's pool and returned by Close.
buf *bytes.Buffer
// closed, committed and cancelled record the terminal states checked by done.
closed bool
committed bool
cancelled bool
}
// newWriter creates a FileWriter for the multipart upload uploadID of key.
// parts are the parts already uploaded for that upload (nil for a fresh
// upload); the writer's initial size is the sum of their sizes. The staging
// buffer is taken from the driver's pool.
func (d *driver) newWriter(ctx context.Context, key, uploadID string, parts []*s3.Part) storagedriver.FileWriter {
	var uploaded int64
	for i := range parts {
		uploaded += *parts[i].Size
	}

	w := &writer{
		ctx:      ctx,
		driver:   d,
		key:      key,
		uploadID: uploadID,
		parts:    parts,
		size:     uploaded,
		buf:      d.pool.Get().(*bytes.Buffer),
	}
	return w
}
// completedParts is a list of completed multipart upload parts that can be
// sorted by ascending part number with sort.Sort.
type completedParts []*s3.CompletedPart
// Len returns the number of parts.
func (a completedParts) Len() int { return len(a) }
// Swap exchanges the parts at positions i and j.
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Less reports whether part i has a smaller part number than part j.
func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
// Write appends p to the staging buffer and uploads full chunks of
// [writer.driver.ChunkSize] bytes as multipart upload parts.
//
// If the most recently uploaded part is smaller than minChunkSize, the
// current multipart upload is completed first and a new one is started for
// the same key. The previously written content is then carried over either by
// reading it back into the buffer (when the whole file is smaller than
// minChunkSize) or by copying the existing object as part 1 of the new upload.
//
// It returns an error if the writer was already closed, committed or
// cancelled, or if any S3 request fails.
func (w *writer) Write(p []byte) (int, error) {
	if err := w.done(); err != nil {
		return 0, err
	}

	// If the last written part is smaller than minChunkSize, we need to make a
	// new multipart upload :sadface:
	if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize {
		completedUploadedParts := make(completedParts, len(w.parts))
		for i, part := range w.parts {
			completedUploadedParts[i] = &s3.CompletedPart{
				ETag:       part.ETag,
				PartNumber: part.PartNumber,
			}
		}

		// Parts must be listed in ascending part-number order on completion.
		sort.Sort(completedUploadedParts)

		_, err := w.driver.S3.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{
			Bucket:   aws.String(w.driver.Bucket),
			Key:      aws.String(w.key),
			UploadId: aws.String(w.uploadID),
			MultipartUpload: &s3.CompletedMultipartUpload{
				Parts: completedUploadedParts,
			},
		})
		if err != nil {
			// Completion failed: abort so that the uploaded parts are discarded.
			if _, aErr := w.driver.S3.AbortMultipartUploadWithContext(w.ctx, &s3.AbortMultipartUploadInput{
				Bucket:   aws.String(w.driver.Bucket),
				Key:      aws.String(w.key),
				UploadId: aws.String(w.uploadID),
			}); aErr != nil {
				return 0, errors.Join(err, aErr)
			}
			return 0, err
		}

		// The KMS key ID is passed along like in every other place that
		// starts a multipart upload; without it a configured key would be
		// dropped for the re-created upload.
		resp, err := w.driver.S3.CreateMultipartUploadWithContext(w.ctx, &s3.CreateMultipartUploadInput{
			Bucket:               aws.String(w.driver.Bucket),
			Key:                  aws.String(w.key),
			ContentType:          w.driver.getContentType(),
			ACL:                  w.driver.getACL(),
			ServerSideEncryption: w.driver.getEncryptionMode(),
			SSEKMSKeyId:          w.driver.getSSEKMSKeyID(),
			StorageClass:         w.driver.getStorageClass(),
		})
		if err != nil {
			return 0, err
		}
		w.uploadID = *resp.UploadId

		// If the entire written file is smaller than minChunkSize, we need to make
		// a new part from scratch :double sad face:
		if w.size < minChunkSize {
			resp, err := w.driver.S3.GetObjectWithContext(w.ctx, &s3.GetObjectInput{
				Bucket: aws.String(w.driver.Bucket),
				Key:    aws.String(w.key),
			})
			if err != nil {
				return 0, err
			}
			defer resp.Body.Close()

			// Start over with an empty part list and re-buffer the existing content.
			w.reset()

			if _, err := io.Copy(w.buf, resp.Body); err != nil {
				return 0, err
			}
		} else {
			// Otherwise we can use the old file as the new first part
			copyPartResp, err := w.driver.S3.UploadPartCopyWithContext(w.ctx, &s3.UploadPartCopyInput{
				Bucket:     aws.String(w.driver.Bucket),
				CopySource: aws.String(w.driver.Bucket + "/" + w.key),
				Key:        aws.String(w.key),
				PartNumber: aws.Int64(1),
				UploadId:   resp.UploadId,
			})
			if err != nil {
				return 0, err
			}
			w.parts = []*s3.Part{{
				ETag:       copyPartResp.CopyPartResult.ETag,
				PartNumber: aws.Int64(1),
				Size:       aws.Int64(w.size),
			}}
		}
	}

	// bytes.Buffer.Write always returns a nil error.
	n, _ := w.buf.Write(p)

	// Upload every complete chunk; a trailing partial chunk stays buffered.
	for w.buf.Len() >= w.driver.ChunkSize {
		if err := w.flush(); err != nil {
			return 0, fmt.Errorf("flush: %w", err)
		}
	}
	return n, nil
}
// Size returns the writer's recorded size: the sum of the sizes of the parts
// uploaded so far. Data still held in the staging buffer is not included.
func (w *writer) Size() int64 {
return w.size
}
// Close flushes any remaining data in the buffer and releases the buffer back
// to the pool.
//
// It returns an error if the writer was already closed, or if the flush
// fails; the buffer is released in either case once the writer is marked
// closed.
//
// NOTE(review): the flush is attempted even after Cancel, i.e. against an
// aborted upload — confirm whether Close should skip it in that case.
func (w *writer) Close() error {
if w.closed {
return fmt.Errorf("already closed")
}
w.closed = true
// Deferred so the buffer is returned to the pool after the flush has used it.
defer w.releaseBuffer()
return w.flush()
}
// reset discards the writer's upload progress: the recorded size and part
// list are cleared and the staging buffer is emptied.
func (w *writer) reset() {
	w.size = 0
	w.parts = nil
	w.buf.Reset()
}
// releaseBuffer empties the staging buffer and hands it back to the driver's
// buffer pool.
func (w *writer) releaseBuffer() {
	buf := w.buf
	buf.Reset()
	w.driver.pool.Put(buf)
}
// Cancel aborts the multipart upload and marks the writer as cancelled.
//
// It fails without contacting S3 if the writer was already closed, committed
// or cancelled. The abort request is issued with the supplied ctx.
func (w *writer) Cancel(ctx context.Context) error {
	if stateErr := w.done(); stateErr != nil {
		return stateErr
	}
	w.cancelled = true

	abortInput := &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(w.driver.Bucket),
		Key:      aws.String(w.key),
		UploadId: aws.String(w.uploadID),
	}
	_, abortErr := w.driver.S3.AbortMultipartUploadWithContext(ctx, abortInput)
	return abortErr
}
// Commit flushes any remaining data in the buffer and completes the multipart
// upload.
//
// It fails early if the writer was already closed, committed or cancelled.
// If completing the upload fails, the upload is aborted and the completion
// error is returned (joined with the abort error if the abort fails too).
// The S3 requests are issued with the writer's own context.
func (w *writer) Commit(ctx context.Context) error {
	if err := w.done(); err != nil {
		return err
	}
	if err := w.flush(); err != nil {
		return err
	}
	w.committed = true

	finished := make(completedParts, 0, len(w.parts))
	for _, uploaded := range w.parts {
		finished = append(finished, &s3.CompletedPart{
			ETag:       uploaded.ETag,
			PartNumber: uploaded.PartNumber,
		})
	}

	// This is an edge case when we are trying to upload an empty file as part of
	// the MultiPart upload. We get a PUT with Content-Length: 0 and sad things happen.
	// The result is we are trying to Complete MultipartUpload with an empty list of
	// completedUploadedParts which will always lead to 400 being returned from S3
	// See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload
	// Solution: we upload the empty i.e. 0 byte part as a single part and then append it
	// to the completedUploadedParts slice used to complete the Multipart upload.
	if len(w.parts) == 0 {
		emptyPart, err := w.driver.S3.UploadPartWithContext(w.ctx, &s3.UploadPartInput{
			Bucket:     aws.String(w.driver.Bucket),
			Key:        aws.String(w.key),
			PartNumber: aws.Int64(1),
			UploadId:   aws.String(w.uploadID),
			Body:       bytes.NewReader(nil),
		})
		if err != nil {
			return err
		}
		finished = append(finished, &s3.CompletedPart{
			ETag:       emptyPart.ETag,
			PartNumber: aws.Int64(1),
		})
	}

	// Parts are submitted in ascending part-number order.
	sort.Sort(finished)

	_, completeErr := w.driver.S3.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(w.driver.Bucket),
		Key:      aws.String(w.key),
		UploadId: aws.String(w.uploadID),
		MultipartUpload: &s3.CompletedMultipartUpload{
			Parts: finished,
		},
	})
	if completeErr == nil {
		return nil
	}

	// Completion failed: abort the upload so its parts are discarded.
	_, abortErr := w.driver.S3.AbortMultipartUploadWithContext(w.ctx, &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(w.driver.Bucket),
		Key:      aws.String(w.key),
		UploadId: aws.String(w.uploadID),
	})
	if abortErr != nil {
		return errors.Join(completeErr, abortErr)
	}
	return completeErr
}
// flush writes at most [w.driver.ChunkSize] of the buffer to S3. flush is only
// called by [writer.Write] if the buffer is full, and always by [writer.Close]
// and [writer.Commit].
//
// An empty buffer is a no-op. On success the uploaded part is appended to
// w.parts and its length is added to w.size.
func (w *writer) flush() error {
	if w.buf.Len() == 0 {
		return nil
	}

	chunk := w.buf.Next(w.driver.ChunkSize)
	chunkLen := int64(len(chunk))
	// Part numbers are 1-based and follow the parts already uploaded.
	number := int64(len(w.parts)) + 1

	uploaded, err := w.driver.S3.UploadPartWithContext(w.ctx, &s3.UploadPartInput{
		Bucket:     aws.String(w.driver.Bucket),
		Key:        aws.String(w.key),
		PartNumber: aws.Int64(number),
		UploadId:   aws.String(w.uploadID),
		Body:       bytes.NewReader(chunk),
	})
	if err != nil {
		return fmt.Errorf("upload part: %w", err)
	}

	w.parts = append(w.parts, &s3.Part{
		ETag:       uploaded.ETag,
		PartNumber: aws.Int64(number),
		Size:       aws.Int64(chunkLen),
	})
	w.size += chunkLen
	return nil
}
// done reports whether the writer has reached a terminal state. It returns a
// descriptive error if the writer was closed, committed or cancelled (checked
// in that order) and nil otherwise.
func (w *writer) done() error {
	if w.closed {
		return fmt.Errorf("already closed")
	}
	if w.committed {
		return fmt.Errorf("already committed")
	}
	if w.cancelled {
		return fmt.Errorf("already cancelled")
	}
	return nil
}
package s3
// Source: https://github.com/pivotal-golang/s3cli
// Copyright (c) 2013 Damien Le Berrigaud and Nick Wade
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
import (
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"net/http"
"net/url"
"sort"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws/corehandlers"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
log "github.com/sirupsen/logrus"
)
// signer holds the state needed to sign a single HTTP request with AWS
// signature version 2.
type signer struct {
// Values that must be populated from the request
// Request is the HTTP request being signed; its headers are modified by Sign.
Request *http.Request
// Time is the timestamp written to the request's date header.
Time time.Time
// Credentials supplies the access key and optional session token.
Credentials *credentials.Credentials
// Query is not set by the Sign handler in this file.
// NOTE(review): presumably filled while signing — confirm.
Query url.Values
// stringToSign and signature presumably hold the signing results — confirm
// against the remainder of (*signer).Sign.
stringToSign string
signature string
}
// s3ParamsToSign is the set of query parameter names that the v2 signer
// collects (together with their values) when signing a request; query
// parameters not listed here are ignored by that step.
var s3ParamsToSign = map[string]bool{
"acl": true,
"location": true,
"logging": true,
"notification": true,
"partNumber": true,
"policy": true,
"requestPayment": true,
"torrent": true,
"uploadId": true,
"uploads": true,
"versionId": true,
"versioning": true,
"versions": true,
"response-content-type": true,
"response-content-language": true,
"response-expires": true,
"response-cache-control": true,
"response-content-disposition": true,
"response-content-encoding": true,
"website": true,
"delete": true,
}
// setv2Handlers will setup v2 signature signing on the S3 driver
//
// It appends a build handler that copies the parsed request path into
// URL.Opaque, then replaces the client's sign handlers with the v2 Sign
// handler followed by the SDK's content-length handler.
func setv2Handlers(svc *s3.S3) {
	setOpaquePath := func(r *request.Request) {
		parsedURL, err := url.Parse(r.HTTPRequest.URL.String())
		if err != nil {
			log.Fatalf("Failed to parse URL: %v", err)
		}
		r.HTTPRequest.URL.Opaque = parsedURL.Path
	}
	svc.Handlers.Build.PushBack(setOpaquePath)

	signHandlers := &svc.Handlers.Sign
	signHandlers.Clear()
	signHandlers.PushBack(Sign)
	signHandlers.PushBackNamed(corehandlers.BuildContentLengthHandler)
}
// Sign requests with signature version 2.
//
// Will sign the requests with the service config's Credentials object
// Signing is skipped if the credentials is the credentials.AnonymousCredentials
// object.
//
// If signing fails, the error is recorded on req.Error so that the request is
// reported as failed instead of being sent without a valid signature.
func Sign(req *request.Request) {
	// If the request does not need to be signed ignore the signing of the
	// request if the AnonymousCredentials object is used.
	if req.Config.Credentials == credentials.AnonymousCredentials {
		return
	}

	v2 := signer{
		Request:     req.HTTPRequest,
		Time:        req.Time,
		Credentials: req.Config.Credentials,
	}

	// Handlers cannot return errors; the SDK picks failures up from req.Error.
	if err := v2.Sign(); err != nil {
		req.Error = err
	}
}
// Sign computes the AWS signature version 2 for v2.Request and attaches it to
// the request.
//
// The string to sign is made of the HTTP method, the Content-MD5 and
// Content-Type header values, a date (the "date" header, blanked when an
// x-amz-date header is present, or the Expires query value for query-string
// authentication), the sorted lower-cased x-amz-* headers, and the URL path
// followed by the sorted sub-resources listed in s3ParamsToSign.
//
// Side effects on the request: the "Host" and "date" headers are set, the
// "x-amz-security-token" header is set when the credentials carry a session
// token, and the signature is stored either in the "Authorization" header or,
// when the URL carries an Expires parameter, in the query string together with
// AWSAccessKeyId.
//
// It returns an error when the credentials cannot be retrieved or the request
// URL cannot be re-parsed.
func (v2 *signer) Sign() error {
	credValue, err := v2.Credentials.Get()
	if err != nil {
		return err
	}
	accessKey := credValue.AccessKeyID

	var (
		md5, ctype, date, xamz string
		xamzDate               bool
		amzLines               []string
		amzKeys                []string
	)

	headers := v2.Request.Header
	// Query returns a freshly parsed copy; changes must be written back to
	// RawQuery explicitly (done below for query-string authentication).
	params := v2.Request.URL.Query()
	parsedURL, err := url.Parse(v2.Request.URL.String())
	if err != nil {
		return err
	}
	host, canonicalPath := parsedURL.Host, parsedURL.Path
	v2.Request.Header["Host"] = []string{host}
	v2.Request.Header["date"] = []string{v2.Time.In(time.UTC).Format(time.RFC1123)}
	if credValue.SessionToken != "" {
		v2.Request.Header["x-amz-security-token"] = []string{credValue.SessionToken}
	}

	// first returns the first header value, tolerating an empty value list.
	first := func(vals []string) string {
		if len(vals) == 0 {
			return ""
		}
		return vals[0]
	}

	amzByKey := make(map[string]string)
	for k, v := range headers {
		k = strings.ToLower(k)
		switch k {
		case "content-md5":
			md5 = first(v)
		case "content-type":
			ctype = first(v)
		case "date":
			// Map iteration order is random: only take the date header when
			// no x-amz-date header has been seen so far.
			if !xamzDate {
				date = first(v)
			}
		default:
			if strings.HasPrefix(k, "x-amz-") {
				amzByKey[k] = k + ":" + strings.Join(v, ",")
				if k == "x-amz-date" {
					// x-amz-date takes precedence; the date line is left empty.
					xamzDate = true
					date = ""
				}
				amzKeys = append(amzKeys, k)
			}
		}
	}
	if len(amzKeys) > 0 {
		sort.Strings(amzKeys)
		for _, h := range amzKeys {
			amzLines = append(amzLines, amzByKey[h])
		}
		xamz = strings.Join(amzLines, "\n") + "\n"
	}

	// Query-string authentication: the Expires value replaces the date.
	expires := false
	if v, ok := params["Expires"]; ok {
		expires = true
		date = first(v)
		params["AWSAccessKeyId"] = []string{accessKey}
	}

	// Collect the sub-resources that are part of the canonical resource.
	var subresources []string
	for k, v := range params {
		if !s3ParamsToSign[k] {
			continue
		}
		for _, vi := range v {
			if vi == "" {
				subresources = append(subresources, k)
			} else {
				subresources = append(subresources, k+"="+vi)
			}
		}
	}
	if len(subresources) > 0 {
		sort.Strings(subresources)
		canonicalPath = canonicalPath + "?" + strings.Join(subresources, "&")
	}

	v2.stringToSign = strings.Join([]string{
		v2.Request.Method,
		md5,
		ctype,
		date,
		xamz + canonicalPath,
	}, "\n")
	hash := hmac.New(sha1.New, []byte(credValue.SecretAccessKey))
	hash.Write([]byte(v2.stringToSign))
	v2.signature = base64.StdEncoding.EncodeToString(hash.Sum(nil))

	if expires {
		params["Signature"] = []string{v2.signature}
		// Persist AWSAccessKeyId and Signature: params is a copy of the
		// query, so without this the request would go out unsigned.
		v2.Request.URL.RawQuery = params.Encode()
	} else {
		headers["Authorization"] = []string{"AWS " + accessKey + ":" + v2.signature}
	}

	log.WithFields(log.Fields{
		"string-to-sign": v2.stringToSign,
		"signature":      v2.signature,
	}).Debugln("request signature")

	return nil
}
package storage
import "fmt"
// pushError appends to errors a new error of the form "<path>: <err>" and
// returns the extended slice.
//
// The original err is wrapped, so callers can still inspect the cause with
// errors.Is / errors.As while the message stays the same.
func pushError(errors []error, path string, err error) []error {
	return append(errors, fmt.Errorf("%s: %w", path, err))
}
package storage
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
)
// TODO(stevvooe): Set an optimal buffer size here. We'll have to
// understand the latency characteristics of the underlying network to
// set this correctly, so we may want to leave it to the driver. For
// out of process drivers, we'll have to optimize this buffer size for
// local communication.
//
// fileReaderBufferSize is the size in bytes (4 MiB) of the bufio.Reader that
// fileReader places in front of the driver's reader.
const fileReaderBufferSize = 4 * 1024 * 1024
// fileReader provides a read seeker interface to files stored in
// storagedriver. Used to implement part of layer interface and will be used
// to implement read side of LayerUpload.
//
// The underlying driver reader is opened lazily on the first Read and is
// dropped again whenever Seek moves the offset.
type fileReader struct {
// driver is the storage backend the file is read from.
driver storagedriver.StorageDriver
// ctx is passed to every driver call made by the reader.
ctx context.Context
// identifying fields
path string
size int64 // size is the total size, must be set.
// mutable fields
rc io.ReadCloser // remote read closer
brd *bufio.Reader // internal buffered io
offset int64 // offset is the current read offset
err error // terminal error, if set, reader is closed
}
// newFileReader builds a fileReader for the remote file at path. The caller
// supplies the size (normally obtained from a stat call); no driver call is
// made here, so the file is optimistically assumed to exist. The returned
// error is always nil.
func newFileReader(ctx context.Context, driver storagedriver.StorageDriver, path string, size int64) (*fileReader, error) {
	fr := new(fileReader)
	fr.ctx = ctx
	fr.driver = driver
	fr.path = path
	fr.size = size
	return fr, nil
}
// Read implements io.Reader. It reads from the (lazily opened) buffered driver
// reader, advances the offset by the number of bytes read, and reports io.EOF
// as soon as the offset reaches the configured size, even when the underlying
// reader has not signalled it yet.
func (fr *fileReader) Read(p []byte) (n int, err error) {
	if fr.err != nil {
		return 0, fr.err
	}

	src, openErr := fr.reader()
	if openErr != nil {
		return 0, openErr
	}

	n, err = src.Read(p)
	fr.offset += int64(n)

	// Simulate an io.EOF error once the file size has been reached.
	atEnd := fr.offset >= fr.size
	if atEnd && err == nil {
		err = io.EOF
	}

	return n, err
}
// Seek implements io.Seeker. It computes the new offset relative to the start,
// the current offset or the configured size, depending on whence. When the
// offset actually changes, the open driver reader is dropped so the next Read
// reopens it at the new position.
//
// It returns the terminal error if the reader is closed, and an error (leaving
// the offset untouched) when whence is not one of io.SeekStart, io.SeekCurrent
// or io.SeekEnd, or when the resulting position would be negative.
func (fr *fileReader) Seek(offset int64, whence int) (int64, error) {
	if fr.err != nil {
		return 0, fr.err
	}

	newOffset := fr.offset
	switch whence {
	case io.SeekCurrent:
		newOffset += offset
	case io.SeekEnd:
		newOffset = fr.size + offset
	case io.SeekStart:
		newOffset = offset
	default:
		// Previously an unknown whence was silently treated as a no-op seek.
		return fr.offset, fmt.Errorf("invalid whence: %d", whence)
	}

	if newOffset < 0 {
		return fr.offset, fmt.Errorf("cannot seek to negative position")
	}

	if fr.offset != newOffset {
		fr.reset()
	}

	// No problems, set the offset.
	fr.offset = newOffset
	return fr.offset, nil
}
// Close implements io.Closer. It marks the reader as closed with a terminal
// "closed" error and releases the underlying reader chain. Closing an already
// closed reader returns the previously stored terminal error.
func (fr *fileReader) Close() error {
	closedErr := fmt.Errorf("fileReader: closed")
	return fr.closeWithErr(closedErr)
}
// reader returns the buffered reader positioned at the current offset,
// opening a driver reader first when none is open.
//
// When the driver reports that the path does not exist, an empty reader is
// returned without being cached, so a later call can still succeed if the file
// turns up in the meantime.
func (fr *fileReader) reader() (io.Reader, error) {
	switch {
	case fr.err != nil:
		return nil, fr.err
	case fr.rc != nil:
		return fr.brd, nil
	}

	// If we don't have a reader, open one up.
	remote, err := fr.driver.Reader(fr.ctx, fr.path, fr.offset)
	if err != nil {
		if _, notFound := err.(storagedriver.PathNotFoundError); notFound {
			// NOTE(stevvooe): If the path is not found, we simply return a
			// reader that returns io.EOF. However, we do not set fr.rc,
			// allowing future attempts at getting a reader to possibly
			// succeed if the file turns up later.
			return io.NopCloser(bytes.NewReader([]byte{})), nil
		}
		return nil, err
	}

	fr.rc = remote

	// Reuse the existing buffer when there is one, otherwise allocate it.
	if fr.brd != nil {
		fr.brd.Reset(fr.rc)
	} else {
		fr.brd = bufio.NewReaderSize(fr.rc, fileReaderBufferSize)
	}

	return fr.brd, nil
}
// reset drops the open driver reader, forcing the next read to open a new
// connection at the current offset. It should be called whenever the offset
// and the reader are about to go out of sync, such as during a seek. It does
// nothing on a closed reader or when no reader is open.
func (fr *fileReader) reset() {
	if fr.err != nil || fr.rc == nil {
		return
	}
	fr.rc.Close()
	fr.rc = nil
}
// closeWithErr records err as the reader's terminal error, closes the open
// driver reader (if any) and drops the buffered reader. If a terminal error is
// already set it is returned unchanged and nothing else happens.
func (fr *fileReader) closeWithErr(err error) error {
	if prior := fr.err; prior != nil {
		return prior
	}

	fr.err = err

	// close and release reader chain
	if rc := fr.rc; rc != nil {
		rc.Close()
	}
	fr.rc, fr.brd = nil, nil

	return fr.err
}
package storage
import (
"context"
"errors"
"fmt"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
)
// emit prints one formatted line to standard output; a newline is appended to
// the given format.
func emit(format string, a ...interface{}) {
	line := format + "\n"
	fmt.Printf(line, a...)
}
// GCOpts contains options for garbage collector
type GCOpts struct {
// DryRun makes MarkAndSweep report what is eligible for deletion without
// removing anything.
DryRun bool
// RemoveUntagged makes MarkAndSweep treat manifests that no tag currently
// points to as deletion candidates.
RemoveUntagged bool
// Quiet suppresses the progress output written through emit.
Quiet bool
}
// ManifestDel contains manifest structure which will be deleted
type ManifestDel struct {
// Name is the name of the repository the manifest belongs to.
Name string
// Digest identifies the manifest to delete.
Digest digest.Digest
// Tags holds all tags of the repository as collected by MarkAndSweep; it is
// handed to the vacuum when the manifest is removed.
Tags []string
}
// MarkAndSweep performs a mark and sweep of registry data.
//
// Mark: every repository is enumerated and each of its manifests is visited.
// The manifest digest and every digest it references are added to the mark
// set. With opts.RemoveUntagged, manifests that no tag points to are instead
// collected as deletion candidates. Layer links of a repository whose digest
// is not in the mark set are collected per repository.
//
// Sweep: candidate manifests that did not end up marked through another
// reference are removed, then every blob of the registry that is not marked,
// then the collected layer links. With opts.DryRun nothing is removed; unless
// opts.Quiet is set the progress is printed through emit.
//
// It returns an error when the registry or one of its services does not
// support enumeration, or when marking or any removal fails.
func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, registry distribution.Namespace, opts GCOpts) error {
	repositoryEnumerator, ok := registry.(distribution.RepositoryEnumerator)
	if !ok {
		return fmt.Errorf("unable to convert Namespace to RepositoryEnumerator")
	}

	// mark
	markSet := make(map[digest.Digest]struct{})
	deleteLayerSet := make(map[string][]digest.Digest)
	manifestArr := make([]ManifestDel, 0)
	err := repositoryEnumerator.Enumerate(ctx, func(repoName string) error {
		if !opts.Quiet {
			// The name is data, not a format string.
			emit("%s", repoName)
		}

		named, err := reference.WithName(repoName)
		if err != nil {
			return fmt.Errorf("failed to parse repo name %s: %v", repoName, err)
		}
		repository, err := registry.Repository(ctx, named)
		if err != nil {
			return fmt.Errorf("failed to construct repository: %v", err)
		}

		manifestService, err := repository.Manifests(ctx)
		if err != nil {
			return fmt.Errorf("failed to construct manifest service: %v", err)
		}

		manifestEnumerator, ok := manifestService.(distribution.ManifestEnumerator)
		if !ok {
			return fmt.Errorf("unable to convert ManifestService into ManifestEnumerator")
		}

		err = manifestEnumerator.Enumerate(ctx, func(dgst digest.Digest) error {
			if opts.RemoveUntagged {
				// fetch all tags where this manifest is the latest one
				tags, err := repository.Tags(ctx).Lookup(ctx, v1.Descriptor{Digest: dgst})
				if err != nil {
					return fmt.Errorf("failed to retrieve tags for digest %v: %v", dgst, err)
				}
				if len(tags) == 0 {
					// fetch all tags from repository
					// all of these tags could contain manifest in history
					// which means that we need check (and delete) those references when deleting manifest
					allTags, err := repository.Tags(ctx).All(ctx)
					if err != nil {
						if _, ok := err.(distribution.ErrRepositoryUnknown); ok {
							if !opts.Quiet {
								emit("manifest tags path of repository %s does not exist", repoName)
							}
							return nil
						}
						return fmt.Errorf("failed to retrieve tags %v", err)
					}
					manifestArr = append(manifestArr, ManifestDel{Name: repoName, Digest: dgst, Tags: allTags})
					return nil
				}
			}
			// Mark the manifest's blob
			if !opts.Quiet {
				emit("%s: marking manifest %s ", repoName, dgst)
			}
			markSet[dgst] = struct{}{}

			return markManifestReferences(dgst, manifestService, ctx, func(d digest.Digest) bool {
				_, marked := markSet[d]
				if !marked {
					markSet[d] = struct{}{}
					if !opts.Quiet {
						emit("%s: marking blob %s", repoName, d)
					}
				}
				return marked
			})
		})
		if err != nil {
			// In certain situations such as unfinished uploads, deleting all
			// tags in S3 or removing the _manifests folder manually, this
			// error may be of type PathNotFound.
			//
			// In these cases we can continue marking other manifests safely.
			if _, ok := err.(driver.PathNotFoundError); !ok {
				return err
			}
		}

		blobService := repository.Blobs(ctx)
		layerEnumerator, ok := blobService.(distribution.ManifestEnumerator)
		if !ok {
			return errors.New("unable to convert BlobService into ManifestEnumerator")
		}

		// Collect the layer links of this repository that nothing marked.
		var deleteLayers []digest.Digest
		err = layerEnumerator.Enumerate(ctx, func(dgst digest.Digest) error {
			if _, ok := markSet[dgst]; !ok {
				deleteLayers = append(deleteLayers, dgst)
			}
			return nil
		})
		if len(deleteLayers) > 0 {
			deleteLayerSet[repoName] = deleteLayers
		}
		return err
	})
	if err != nil {
		return fmt.Errorf("failed to mark: %v", err)
	}

	// A candidate manifest may have been marked later through another
	// manifest; keep only the ones that are still unreferenced.
	manifestArr = unmarkReferencedManifest(manifestArr, markSet, opts.Quiet)

	// sweep
	vacuum := NewVacuum(ctx, storageDriver)
	if !opts.DryRun {
		for _, obj := range manifestArr {
			err = vacuum.RemoveManifest(obj.Name, obj.Digest, obj.Tags)
			if err != nil {
				return fmt.Errorf("failed to delete manifest %s: %v", obj.Digest, err)
			}
		}
	}
	blobService := registry.Blobs()
	deleteSet := make(map[digest.Digest]struct{})
	err = blobService.Enumerate(ctx, func(dgst digest.Digest) error {
		// check if digest is in markSet. If not, delete it!
		if _, ok := markSet[dgst]; !ok {
			deleteSet[dgst] = struct{}{}
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("error enumerating blobs: %v", err)
	}
	if !opts.Quiet {
		emit("\n%d blobs marked, %d blobs and %d manifests eligible for deletion", len(markSet), len(deleteSet), len(manifestArr))
	}
	for dgst := range deleteSet {
		if !opts.Quiet {
			emit("blob eligible for deletion: %s", dgst)
		}
		if opts.DryRun {
			continue
		}
		err = vacuum.RemoveBlob(string(dgst))
		if err != nil {
			return fmt.Errorf("failed to delete blob %s: %v", dgst, err)
		}
	}

	for repo, dgsts := range deleteLayerSet {
		for _, dgst := range dgsts {
			if !opts.Quiet {
				emit("%s: layer link eligible for deletion: %s", repo, dgst)
			}
			if opts.DryRun {
				continue
			}
			err = vacuum.RemoveLayer(repo, dgst)
			if err != nil {
				return fmt.Errorf("failed to delete layer link %s of repo %s: %v", dgst, repo, err)
			}
		}
	}

	// Every failure above returns early, so reaching this point means success.
	return nil
}
// unmarkReferencedManifest returns the entries of manifestArr whose digest is
// not present in markSet, preserving their order. Unless quietOutput is set,
// each retained entry is reported through emit.
func unmarkReferencedManifest(manifestArr []ManifestDel, markSet map[digest.Digest]struct{}, quietOutput bool) []ManifestDel {
	unreferenced := make([]ManifestDel, 0)
	for i := range manifestArr {
		candidate := manifestArr[i]
		if _, marked := markSet[candidate.Digest]; marked {
			continue
		}
		if !quietOutput {
			emit("manifest eligible for deletion: %s", candidate)
		}
		unreferenced = append(unreferenced, candidate)
	}
	return unreferenced
}
// markManifestReferences loads the manifest dgst and hands the digest of each
// of its references to ingester. A reference that ingester reports as already
// seen is skipped; otherwise, if the manifest service says the reference is
// itself a manifest, its references are marked recursively.
//
// It returns an error when a manifest cannot be retrieved.
func markManifestReferences(dgst digest.Digest, manifestService distribution.ManifestService, ctx context.Context, ingester func(digest.Digest) bool) error {
	manifest, err := manifestService.Get(ctx, dgst)
	if err != nil {
		return fmt.Errorf("failed to retrieve manifest for digest %v: %v", dgst, err)
	}

	for _, ref := range manifest.References() {
		// do not visit references if already marked
		alreadyMarked := ingester(ref.Digest)
		if alreadyMarked {
			continue
		}

		// NOTE(review): the error returned by Exists is discarded, so a failed
		// lookup is treated like "not a manifest" - confirm this is intended.
		isManifest, _ := manifestService.Exists(ctx, ref.Digest)
		if !isManifest {
			continue
		}

		if err := markManifestReferences(ref.Digest, manifestService, ctx, ingester); err != nil {
			return err
		}
	}

	return nil
}
package storage
import (
"context"
"errors"
"io"
"github.com/distribution/distribution/v3/registry/storage/driver"
)
// maxBlobGetSize is the largest number of bytes (4 MiB) getContent will read
// from a path; larger content makes the read fail.
const (
maxBlobGetSize = 4 * 1024 * 1024
)
// getContent reads the whole content stored at path p through the driver,
// starting at offset 0. The read is capped at maxBlobGetSize bytes; the reader
// obtained from the driver is closed before returning.
func getContent(ctx context.Context, driver driver.StorageDriver, p string) ([]byte, error) {
	rc, openErr := driver.Reader(ctx, p, 0)
	if openErr != nil {
		return nil, openErr
	}
	defer rc.Close()

	content, err := readAllLimited(rc, maxBlobGetSize)
	return content, err
}
// readAllLimited reads r until EOF and returns the data, failing with an error
// if r yields more than limit bytes.
func readAllLimited(r io.Reader, limit int64) ([]byte, error) {
	return io.ReadAll(limitReader(r, limit))
}
// limitReader wraps r in a reader that allows at most n bytes to be read.
// Unlike io.LimitReader, which reports EOF at the limit, the returned reader
// fails with an error when the underlying data goes past the limit.
func limitReader(r io.Reader, n int64) io.Reader {
	lr := new(limitedReader)
	lr.r, lr.n = r, n
	return lr
}
// limitedReader implements a reader that errors when the limit is exceeded.
//
// Partially cribbed from net/http.MaxBytesReader.
type limitedReader struct {
	r   io.Reader // underlying reader
	n   int64     // max bytes remaining
	err error     // sticky error
}

// Read implements io.Reader. It reads from the underlying reader while at most
// n bytes in total have been delivered. Content of exactly n bytes is read in
// full; as soon as the underlying reader yields a byte beyond the limit, the
// bytes up to the limit are returned together with a "read exceeds limit"
// error. Any error, including the one from the underlying reader, is sticky
// and returned by all subsequent calls.
func (l *limitedReader) Read(p []byte) (n int, err error) {
	if l.err != nil {
		return 0, l.err
	}
	if len(p) == 0 {
		return 0, nil
	}
	// If they asked for a 32KB byte read but only 5 bytes are
	// remaining, no need to read 32KB. 6 bytes will answer the
	// question of the whether we hit the limit or go past it.
	//
	// The comparison is written as len(p)-1 > n rather than len(p) > n+1 so
	// that a limit of math.MaxInt64 cannot overflow into a negative bound.
	if int64(len(p))-1 > l.n {
		p = p[:l.n+1]
	}
	n, err = l.r.Read(p)

	if int64(n) <= l.n {
		l.n -= int64(n)
		l.err = err
		return n, err
	}

	// One byte too many was read: hand out only what fits in the limit.
	n = int(l.n)
	l.n = 0

	l.err = errors.New("storage: read exceeds limit")
	return n, l.err
}
package storage
import (
"context"
"fmt"
"io"
"net/http"
"path"
"time"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
"github.com/google/uuid"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
)
// linkPathFunc describes a function that can resolve a link based on the
// repository name and digest. It returns the storage path of the link, or an
// error when no path can be built for the given arguments.
type linkPathFunc func(name string, dgst digest.Digest) (string, error)
// linkedBlobStore provides a full BlobService that namespaces the blobs to a
// given repository. Effectively, it manages the links in a given repository
// that grant access to the global blob store.
type linkedBlobStore struct {
// The embedded blobStore is the global store the repository links point to.
*blobStore
// registry is used to look up the source repository when mounting a blob.
registry *registry
// blobServer serves blob content over HTTP once access has been checked.
blobServer distribution.BlobServer
// blobAccessController answers Stat/Clear/SetDescriptor for this
// repository; Stat through it doubles as the access check.
blobAccessController distribution.BlobDescriptorService
// repository is the repository whose name scopes links and uploads.
repository distribution.Repository
ctx context.Context // only to be used where context can't come through method args
// deleteEnabled gates Delete; when false Delete reports ErrUnsupported.
deleteEnabled bool
// resumableDigestEnabled is passed on to every blobWriter created here.
resumableDigestEnabled bool
// linkPath allows one to control the repository blob link set to which
// the blob store dispatches. This is required because manifest and layer
// blobs have not yet been fully merged. At some point, this functionality
// should be removed and the blob links folder should be merged.
linkPath linkPathFunc
// linkDirectoryPathSpec locates the root directories in which one might find links
linkDirectoryPathSpec pathSpec
}
// Compile-time check that linkedBlobStore implements distribution.BlobStore.
var _ distribution.BlobStore = &linkedBlobStore{}
// Stat returns the descriptor of dgst as seen from this repository by
// delegating to the blob access controller.
func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (v1.Descriptor, error) {
	desc, err := lbs.blobAccessController.Stat(ctx, dgst)
	return desc, err
}
// Get returns the content of the blob dgst. The repository-scoped Stat acts as
// the access check and resolves the canonical digest that is then fetched from
// the global blob store.
func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
	canonical, statErr := lbs.Stat(ctx, dgst) // access check
	if statErr == nil {
		return lbs.blobStore.Get(ctx, canonical.Digest)
	}
	return nil, statErr
}
// Open returns a seekable reader for the blob dgst. The repository-scoped Stat
// acts as the access check and resolves the canonical digest that is then
// opened in the global blob store.
func (lbs *linkedBlobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) {
	canonical, statErr := lbs.Stat(ctx, dgst) // access check
	if statErr == nil {
		return lbs.blobStore.Open(ctx, canonical.Digest)
	}
	return nil, statErr
}
// ServeBlob writes the blob dgst to w through the configured blob server. The
// repository-scoped Stat acts as the access check; when the resulting
// descriptor carries a media type it is set as the Content-Type header first.
func (lbs *linkedBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
	canonical, statErr := lbs.Stat(ctx, dgst) // access check
	if statErr != nil {
		return statErr
	}

	// Set the repository local content type.
	if mediaType := canonical.MediaType; mediaType != "" {
		w.Header().Set("Content-Type", mediaType)
	}

	return lbs.blobServer.ServeBlob(ctx, w, r, canonical.Digest)
}
// Put stores p in the global blob store, records its descriptor with the blob
// access controller and links the blob into this repository. The descriptor
// returned by the global store is handed back together with the outcome of the
// linking step.
func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (v1.Descriptor, error) {
	// Place the data in the blob store first.
	stored, putErr := lbs.blobStore.Put(ctx, mediaType, p)
	if putErr != nil {
		dcontext.GetLogger(ctx).Errorf("error putting into main store: %v", putErr)
		return v1.Descriptor{}, putErr
	}

	contentDigest := digest.FromBytes(p)
	if err := lbs.blobAccessController.SetDescriptor(ctx, contentDigest, stored); err != nil {
		return v1.Descriptor{}, err
	}

	// TODO(stevvooe): Write out mediatype if incoming differs from what is
	// returned by Put above. Note that we should allow updates for a given
	// repository.

	linkErr := lbs.linkBlob(ctx, stored)
	return stored, linkErr
}
// optionFunc adapts a plain function to an option type with an Apply method.
type optionFunc func(interface{}) error

// Apply calls f with v and returns whatever f returns.
func (f optionFunc) Apply(v interface{}) error {
	err := f(v)
	return err
}
// WithMountFrom returns a BlobCreateOption which designates that the blob
// should be mounted from the given canonical reference. Applying the option to
// anything other than *distribution.CreateOptions yields an error.
func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption {
	apply := func(v interface{}) error {
		createOpts, isCreateOpts := v.(*distribution.CreateOptions)
		if !isCreateOpts {
			return fmt.Errorf("unexpected options type: %T", v)
		}

		createOpts.Mount.From = ref
		createOpts.Mount.ShouldMount = true
		return nil
	}
	return optionFunc(apply)
}
// Create begins a blob write session, returning a handle.
//
// When the options request a mount and the mount succeeds, no upload is
// started: nil is returned together with distribution.ErrBlobMounted. A failed
// mount is not reported; the call falls through to a regular upload, for which
// a fresh id is generated and a "started at" marker is written before the
// writer is created.
func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
	dcontext.GetLogger(ctx).Debug("(*linkedBlobStore).Create")

	var opts distribution.CreateOptions
	for i := range options {
		if err := options[i].Apply(&opts); err != nil {
			return nil, err
		}
	}

	if opts.Mount.ShouldMount {
		if desc, mountErr := lbs.mount(ctx, opts.Mount.From, opts.Mount.From.Digest(), opts.Mount.Stat); mountErr == nil {
			// Mount successful, no need to initiate an upload session
			return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc}
		}
	}

	uploadID := uuid.NewString()
	startedAt := time.Now().UTC()

	dataPath, err := pathFor(uploadDataPathSpec{
		name: lbs.repository.Named().Name(),
		id:   uploadID,
	})
	if err != nil {
		return nil, err
	}

	startedAtPath, err := pathFor(uploadStartedAtPathSpec{
		name: lbs.repository.Named().Name(),
		id:   uploadID,
	})
	if err != nil {
		return nil, err
	}

	// Write a startedat file for this upload
	marker := []byte(startedAt.Format(time.RFC3339))
	if err := lbs.blobStore.driver.PutContent(ctx, startedAtPath, marker); err != nil {
		return nil, err
	}

	return lbs.newBlobUpload(ctx, uploadID, dataPath, startedAt, false)
}
// Resume reopens the upload session id in append mode. The session's start
// time is read back from its "started at" marker; when that marker does not
// exist, distribution.ErrBlobUploadUnknown is returned.
func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
	dcontext.GetLogger(ctx).Debug("(*linkedBlobStore).Resume")

	startedAtPath, err := pathFor(uploadStartedAtPathSpec{
		name: lbs.repository.Named().Name(),
		id:   id,
	})
	if err != nil {
		return nil, err
	}

	rawStartedAt, err := lbs.blobStore.driver.GetContent(ctx, startedAtPath)
	if err != nil {
		if _, missing := err.(driver.PathNotFoundError); missing {
			return nil, distribution.ErrBlobUploadUnknown
		}
		return nil, err
	}

	startedAt, err := time.Parse(time.RFC3339, string(rawStartedAt))
	if err != nil {
		return nil, err
	}

	dataPath, err := pathFor(uploadDataPathSpec{
		name: lbs.repository.Named().Name(),
		id:   id,
	})
	if err != nil {
		return nil, err
	}

	return lbs.newBlobUpload(ctx, id, dataPath, startedAt, true)
}
// Delete removes this repository's link to the blob dgst. It returns
// distribution.ErrUnsupported when deletion is disabled, and the Stat error
// when the blob is not available in this repository.
func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
	if !lbs.deleteEnabled {
		return distribution.ErrUnsupported
	}

	// Ensure the blob is available for deletion
	if _, statErr := lbs.blobAccessController.Stat(ctx, dgst); statErr != nil {
		return statErr
	}

	if clearErr := lbs.blobAccessController.Clear(ctx, dgst); clearErr != nil {
		return clearErr
	}
	return nil
}
// Enumerate walks the link directory of this store and calls ingestor with the
// digest found in every file named "link". Links whose digest Stat reports as
// distribution.ErrBlobUnknown are skipped; any other error, including one
// returned by ingestor, stops the walk.
func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest.Digest) error) error {
	rootPath, err := pathFor(lbs.linkDirectoryPathSpec)
	if err != nil {
		return err
	}

	visit := func(fileInfo driver.FileInfo) error {
		// exit early if directory...
		if fileInfo.IsDir() {
			return nil
		}
		linkFile := fileInfo.Path()

		// check if it's a link
		if _, base := path.Split(linkFile); base != "link" {
			return nil
		}

		// read the digest found in link
		target, err := lbs.blobStore.readlink(ctx, linkFile)
		if err != nil {
			return err
		}

		// ensure this conforms to the linkPathFns
		if _, err := lbs.Stat(ctx, target); err != nil {
			// we expect this error to occur so we move on
			if err == distribution.ErrBlobUnknown {
				return nil
			}
			return err
		}

		if err := ingestor(target); err != nil {
			return err
		}
		return nil
	}

	return lbs.driver.Walk(ctx, rootPath, visit)
}
// mount links the blob dgst into this repository without uploading it. The
// blob's size is taken from sourceStat when provided, otherwise it is looked
// up in sourceRepo. The returned descriptor always carries the generic
// "application/octet-stream" media type.
func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest, sourceStat *v1.Descriptor) (v1.Descriptor, error) {
	var stat v1.Descriptor
	if sourceStat != nil {
		// use the provided blob info
		stat = *sourceStat
	} else {
		// look up the blob info from the sourceRepo if not already provided
		repo, err := lbs.registry.Repository(ctx, sourceRepo)
		if err != nil {
			return v1.Descriptor{}, err
		}
		if stat, err = repo.Blobs(ctx).Stat(ctx, dgst); err != nil {
			return v1.Descriptor{}, err
		}
	}

	// NOTE(stevvooe): The central blob store firewalls media types from
	// other users. The caller should look this up and override the value
	// for the specific repository.
	desc := v1.Descriptor{
		Digest:    dgst,
		MediaType: "application/octet-stream",
		Size:      stat.Size,
	}
	return desc, lbs.linkBlob(ctx, desc)
}
// newBlobUpload opens a driver writer at path (appending when append is true)
// and wraps it in a blobWriter carrying the given upload id and start time.
func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time, append bool) (distribution.BlobWriter, error) {
	writer, err := lbs.driver.Writer(ctx, path, append)
	if err != nil {
		return nil, err
	}

	return &blobWriter{
		ctx:                    ctx,
		blobStore:              lbs,
		id:                     uuid,
		startedAt:              startedAt,
		digester:               digest.Canonical.Digester(),
		fileWriter:             writer,
		driver:                 lbs.driver,
		path:                   path,
		resumableDigestEnabled: lbs.resumableDigestEnabled,
	}, nil
}
// linkBlob writes, for the canonical digest and each alias, a link in this
// repository that points at the canonical digest. A digest that appears more
// than once is linked only once. The first failure aborts the operation.
func (lbs *linkedBlobStore) linkBlob(ctx context.Context, canonical v1.Descriptor, aliases ...digest.Digest) error {
	candidates := append([]digest.Digest{canonical.Digest}, aliases...)

	// TODO(stevvooe): Need to write out mediatype for only canonical hash
	// since we don't care about the aliases. They are generally unused except
	// for tarsum but those versions don't care about mediatype.

	// Don't make duplicate links.
	linked := make(map[digest.Digest]struct{}, len(candidates))

	for _, d := range candidates {
		if _, dup := linked[d]; dup {
			continue
		}
		linked[d] = struct{}{}

		target, err := lbs.linkPath(lbs.repository.Named().Name(), d)
		if err != nil {
			return err
		}

		if linkErr := lbs.blobStore.link(ctx, target, canonical.Digest); linkErr != nil {
			return linkErr
		}
	}

	return nil
}
// linkedBlobStatter resolves blob descriptors for one repository by following
// the repository's link files into the global blob store.
type linkedBlobStatter struct {
// The embedded blobStore is the global store the links point to.
*blobStore
// repository is the repository whose name scopes the link paths.
repository distribution.Repository
// linkPath allows one to control the repository blob link set to which
// the blob store dispatches. This is required because manifest and layer
// blobs have not yet been fully merged. At some point, this functionality
// should be removed and the blob links folder should be merged.
linkPath linkPathFunc
}
// Compile-time check that linkedBlobStatter implements
// distribution.BlobDescriptorService.
var _ distribution.BlobDescriptorService = &linkedBlobStatter{}
// Stat resolves the repository link for dgst and returns the descriptor of the
// blob it points to in the global store. A missing link is reported as
// distribution.ErrBlobUnknown. A warning is logged when the link target
// differs from the requested digest.
func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (v1.Descriptor, error) {
	linkFile, err := lbs.linkPath(lbs.repository.Named().Name(), dgst)
	if err != nil {
		return v1.Descriptor{}, err
	}

	target, err := lbs.blobStore.readlink(ctx, linkFile)
	if err != nil {
		if _, missing := err.(driver.PathNotFoundError); missing {
			return v1.Descriptor{}, distribution.ErrBlobUnknown
		}
		return v1.Descriptor{}, err
	}

	if dgst != target {
		// Track when we are doing cross-digest domain lookups. ie, sha512 to sha256.
		dcontext.GetLogger(ctx).Warnf("looking up blob with canonical target: %v -> %v", dgst, target)
	}

	// TODO(stevvooe): Look up repository local mediatype and replace that on
	// the returned descriptor.

	return lbs.blobStore.statter.Stat(ctx, target)
}
// Clear deletes this repository's link file for dgst through the storage
// driver. The blob in the global store is not touched by this call.
func (lbs *linkedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) (err error) {
	var linkFile string
	if linkFile, err = lbs.linkPath(lbs.repository.Named().Name(), dgst); err != nil {
		return err
	}

	return lbs.blobStore.driver.Delete(ctx, linkFile)
}
// SetDescriptor is a no-op that always returns nil; all of its arguments are
// ignored.
func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc v1.Descriptor) error {
// The canonical descriptor for a blob is set at the commit phase of upload
return nil
}
// blobLinkPath returns the path of the layer link for digest dgst in the
// repository called name.
func blobLinkPath(name string, dgst digest.Digest) (string, error) {
	spec := layerLinkPathSpec{name: name, digest: dgst}
	return pathFor(spec)
}
// manifestRevisionLinkPath returns the path of the manifest revision link for
// digest dgst in the repository called name.
func manifestRevisionLinkPath(name string, dgst digest.Digest) (string, error) {
	spec := manifestRevisionLinkPathSpec{name: name, revision: dgst}
	return pathFor(spec)
}
package storage
import (
"context"
"fmt"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/manifest/manifestlist"
"github.com/distribution/distribution/v3/manifest/ocischema"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
)
// manifestListHandler is a ManifestHandler that covers schema2 manifest lists.
// Its Put method also accepts OCI image indexes.
type manifestListHandler struct {
// repository provides the manifest service used to check that referenced
// manifests exist.
repository distribution.Repository
// blobStore is where manifest list payloads are stored.
blobStore distribution.BlobStore
// ctx is used for logging and for manifest verification.
ctx context.Context
// validateImageIndexes controls whether, and for which platforms, the
// manifests referenced by a list must already exist.
validateImageIndexes validateImageIndexes
}
// Compile-time check that manifestListHandler implements ManifestHandler.
var _ ManifestHandler = &manifestListHandler{}
// Unmarshal decodes content as a manifest list. The ctx and dgst arguments are
// not used by this implementation.
func (ms *manifestListHandler) Unmarshal(ctx context.Context, dgst digest.Digest, content []byte) (distribution.Manifest, error) {
	dcontext.GetLogger(ms.ctx).Debug("(*manifestListHandler).Unmarshal")

	list := new(manifestlist.DeserializedManifestList)
	err := list.UnmarshalJSON(content)
	if err != nil {
		return nil, err
	}

	return list, nil
}
// Put verifies manifestList and stores its payload in the blob store,
// returning the digest of the stored revision. Only manifest lists and OCI
// image indexes with schema version 2 are accepted.
func (ms *manifestListHandler) Put(ctx context.Context, manifestList distribution.Manifest, skipDependencyVerification bool) (digest.Digest, error) {
	dcontext.GetLogger(ms.ctx).Debug("(*manifestListHandler).Put")

	const expectedSchemaVersion = 2

	var schemaVersion int
	switch typed := manifestList.(type) {
	case *manifestlist.DeserializedManifestList:
		schemaVersion = typed.SchemaVersion
	case *ocischema.DeserializedImageIndex:
		schemaVersion = typed.SchemaVersion
	default:
		return "", fmt.Errorf("wrong type put to manifestListHandler: %T", manifestList)
	}

	if schemaVersion != expectedSchemaVersion {
		return "", fmt.Errorf("unrecognized manifest list schema version %d, expected %d", schemaVersion, expectedSchemaVersion)
	}

	// Verification runs with the handler's own context.
	verifyErr := ms.verifyManifest(ms.ctx, manifestList, skipDependencyVerification)
	if verifyErr != nil {
		return "", verifyErr
	}

	mediaType, payload, err := manifestList.Payload()
	if err != nil {
		return "", err
	}

	stored, err := ms.blobStore.Put(ctx, mediaType, payload)
	if err != nil {
		dcontext.GetLogger(ctx).Errorf("error putting payload into blobstore: %v", err)
		return "", err
	}

	return stored.Digest, nil
}
// verifyManifest ensures that the manifest content is valid from the
// perspective of the registry. As a policy, the registry only tries to
// store valid content, leaving trust policies of that content up to
// consumers.
//
// When image index validation is enabled and not skipped, every referenced
// manifest selected by platformMustExist must exist in the repository; the
// problems found are returned together as a
// distribution.ErrManifestVerification.
func (ms *manifestListHandler) verifyManifest(ctx context.Context, mnfst distribution.Manifest, skipDependencyVerification bool) error {
	// Nothing to check unless we are validating the existence of child images.
	if !ms.validateImageIndexes.imagesExist || skipDependencyVerification {
		return nil
	}

	// Get the manifest service we can use to check for the existence of child images
	manifestService, err := ms.repository.Manifests(ctx)
	if err != nil {
		return err
	}

	var errs distribution.ErrManifestVerification
	for _, ref := range mnfst.References() {
		if !ms.platformMustExist(ref) {
			continue
		}

		exists, existsErr := manifestService.Exists(ctx, ref.Digest)
		if existsErr != nil && existsErr != distribution.ErrBlobUnknown {
			errs = append(errs, existsErr)
		}

		if existsErr != nil || !exists {
			// On error here, we always append unknown blob errors.
			errs = append(errs, distribution.ErrManifestBlobUnknown{Digest: ref.Digest})
		}
	}

	// Return an untyped nil on success so callers see a nil error.
	if len(errs) != 0 {
		return errs
	}
	return nil
}
// platformMustExist checks if a descriptor within an index should be validated
// as existing before accepting the manifest into the registry.
//
// It returns true when no platforms are configured (every child image is
// validated) or when the descriptor's architecture and OS match one of the
// configured platforms. A descriptor that carries no platform information
// cannot match a configured platform and yields false.
func (ms *manifestListHandler) platformMustExist(descriptor v1.Descriptor) bool {
	// If there are no image platforms configured to validate, we must check the existence of all child images.
	if len(ms.validateImageIndexes.imagePlatforms) == 0 {
		return true
	}

	// The platform is optional on a descriptor; guard against dereferencing
	// a nil pointer for entries that omit it.
	imagePlatform := descriptor.Platform
	if imagePlatform == nil {
		return false
	}

	// If the platform matches a platform that is configured to validate, we must check the existence.
	for _, platform := range ms.validateImageIndexes.imagePlatforms {
		if imagePlatform.Architecture == platform.architecture &&
			imagePlatform.OS == platform.os {
			return true
		}
	}

	// If the platform doesn't match a platform configured to validate, we don't need to check the existence.
	return false
}
package storage
import (
"context"
"encoding/json"
"fmt"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/manifest/manifestlist"
"github.com/distribution/distribution/v3/manifest/ocischema"
"github.com/distribution/distribution/v3/manifest/schema2"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
)
// A ManifestHandler gets and puts manifests of a particular type.
type ManifestHandler interface {
// Unmarshal unmarshals the manifest from a byte slice.
Unmarshal(ctx context.Context, dgst digest.Digest, content []byte) (distribution.Manifest, error)
// Put creates or updates the given manifest returning the manifest digest.
// skipDependencyVerification is forwarded to the handler so it can skip
// checks on the content the manifest references.
Put(ctx context.Context, manifest distribution.Manifest, skipDependencyVerification bool) (digest.Digest, error)
}
// SkipLayerVerification returns a ManifestServiceOption that allows a manifest
// to be Put before its layers are on the filesystem.
func SkipLayerVerification() distribution.ManifestServiceOption {
	var opt skipLayerOption
	return opt
}
// skipLayerOption is the option value returned by SkipLayerVerification.
type skipLayerOption struct{}

// Apply turns on skipDependencyVerification on m. It fails when m is not a
// *manifestStore.
func (o skipLayerOption) Apply(m distribution.ManifestService) error {
	store, isStore := m.(*manifestStore)
	if !isStore {
		return fmt.Errorf("skip layer verification only valid for manifestStore")
	}

	store.skipDependencyVerification = true
	return nil
}
// manifestStore is the repository-scoped manifest service. It reads manifest
// content from the linked blob store and dispatches to a type-specific
// ManifestHandler.
type manifestStore struct {
// repository is the repository the manifests belong to.
repository *repository
// blobStore holds the manifest payloads, addressed through repository links.
blobStore *linkedBlobStore
// ctx is used for logging and for the existence check in Exists.
ctx context.Context
// skipDependencyVerification is switched on by the SkipLayerVerification
// option.
skipDependencyVerification bool
// One handler per supported manifest format.
schema2Handler ManifestHandler
manifestListHandler ManifestHandler
ocischemaHandler ManifestHandler
ocischemaIndexHandler ManifestHandler
}
// Compile-time check that manifestStore implements distribution.ManifestService.
var _ distribution.ManifestService = &manifestStore{}
// Exists reports whether dgst can be stat'ed in the store's blob store. A
// distribution.ErrBlobUnknown from that lookup is reported as (false, nil);
// any other error is returned unchanged alongside false.
//
// NOTE(review): both the logger and the Stat call use ms.ctx; the ctx
// argument is not consulted.
func (ms *manifestStore) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
	dcontext.GetLogger(ms.ctx).Debug("(*manifestStore).Exists")

	switch _, err := ms.blobStore.Stat(ms.ctx, dgst); err {
	case nil:
		return true, nil
	case distribution.ErrBlobUnknown:
		return false, nil
	default:
		return false, err
	}
}
// Get fetches the manifest stored under dgst and decodes it with the handler
// matching its format.
//
// The raw content is first probed for its schemaVersion and mediaType. Only
// schema version 2 is recognised; within it the mediaType selects the Docker
// schema2 manifest, Docker manifest list, OCI image manifest or OCI image
// index handler. Content without a mediaType is tried as an OCI image index
// first and otherwise decoded as an OCI image manifest.
//
// Errors:
//   - distribution.ErrManifestUnknownRevision when the blob store reports
//     distribution.ErrBlobUnknown for dgst;
//   - distribution.ErrManifestVerification for an unrecognised mediaType;
//   - a plain error for an unrecognised schema version;
//   - any other blob store, JSON or handler error, returned unchanged.
//
// The options argument is not consulted.
func (ms *manifestStore) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
	dcontext.GetLogger(ms.ctx).Debug("(*manifestStore).Get")

	// TODO(stevvooe): Need to check descriptor from above to ensure that the
	// mediatype is as we expect for the manifest store.

	content, err := ms.blobStore.Get(ctx, dgst)
	if err != nil {
		if err == distribution.ErrBlobUnknown {
			return nil, distribution.ErrManifestUnknownRevision{
				Name:     ms.repository.Named().Name(),
				Revision: dgst,
			}
		}

		return nil, err
	}

	// versioned is a minimal representation of a manifest with version and mediatype.
	var versioned struct {
		specs.Versioned

		// MediaType is the media type of this schema.
		MediaType string `json:"mediaType,omitempty"`
	}
	if err = json.Unmarshal(content, &versioned); err != nil {
		return nil, err
	}

	switch versioned.SchemaVersion {
	case 2:
		// This can be an image manifest or a manifest list
		switch versioned.MediaType {
		case schema2.MediaTypeManifest:
			return ms.schema2Handler.Unmarshal(ctx, dgst, content)
		case v1.MediaTypeImageManifest:
			return ms.ocischemaHandler.Unmarshal(ctx, dgst, content)
		case manifestlist.MediaTypeManifestList:
			return ms.manifestListHandler.Unmarshal(ctx, dgst, content)
		case v1.MediaTypeImageIndex:
			return ms.ocischemaIndexHandler.Unmarshal(ctx, dgst, content)
		case "":
			// OCI image or image index - no media type in the content

			// First see if it looks like an image index. The handler returns
			// a nil interface on failure, so the result is only inspected
			// once err is known to be nil, and the assertion uses the
			// comma-ok form; an unchecked assertion on a nil interface
			// panics.
			res, err := ms.ocischemaIndexHandler.Unmarshal(ctx, dgst, content)
			if err == nil {
				if resIndex, ok := res.(*ocischema.DeserializedImageIndex); ok && resIndex.Manifests != nil {
					return resIndex, nil
				}
			}

			// Otherwise, assume it must be an image manifest
			return ms.ocischemaHandler.Unmarshal(ctx, dgst, content)
		default:
			return nil, distribution.ErrManifestVerification{fmt.Errorf("unrecognized manifest content type %s", versioned.MediaType)}
		}
	}

	return nil, fmt.Errorf("unrecognized manifest schema version %d", versioned.SchemaVersion)
}
// Put stores manifest through the handler that matches its concrete type and
// returns whatever digest and error that handler reports. The store's
// skipDependencyVerification flag is forwarded to the handler. A manifest of
// any other type is rejected with an error naming the type. The options
// argument is not consulted.
func (ms *manifestStore) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
	dcontext.GetLogger(ms.ctx).Debug("(*manifestStore).Put")

	var handler ManifestHandler
	switch manifest.(type) {
	case *schema2.DeserializedManifest:
		handler = ms.schema2Handler
	case *ocischema.DeserializedManifest:
		handler = ms.ocischemaHandler
	case *manifestlist.DeserializedManifestList:
		handler = ms.manifestListHandler
	case *ocischema.DeserializedImageIndex:
		handler = ms.ocischemaIndexHandler
	default:
		return "", fmt.Errorf("unrecognized manifest type %T", manifest)
	}

	return handler.Put(ctx, manifest, ms.skipDependencyVerification)
}
// Delete removes the revision of the specified manifest by deleting dgst
// from the store's blob store. The error of that call is returned unchanged.
// The debug line is logged through ms.ctx; the delete itself uses ctx.
func (ms *manifestStore) Delete(ctx context.Context, dgst digest.Digest) error {
dcontext.GetLogger(ms.ctx).Debug("(*manifestStore).Delete")
return ms.blobStore.Delete(ctx, dgst)
}
// Enumerate hands every digest produced by the blob store's Enumerate to
// ingester. An error from ingester is passed back to the blob store as the
// callback's result, and the error of the enumeration as a whole is returned.
func (ms *manifestStore) Enumerate(ctx context.Context, ingester func(digest.Digest) error) error {
	forward := func(d digest.Digest) error {
		return ingester(d)
	}
	return ms.blobStore.Enumerate(ctx, forward)
}
package storage
import (
"context"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/manifest/ocischema"
"github.com/opencontainers/go-digest"
)
// ocischemaIndexHandler is a ManifestHandler that covers the OCI Image Index.
// It embeds *manifestListHandler, so everything it does not define itself
// (including the ctx field used for logging) is promoted from the embedded
// handler.
type ocischemaIndexHandler struct {
	*manifestListHandler
}

// Compile-time check that the type declared here, not just the embedded
// manifestListHandler, satisfies ManifestHandler.
var _ ManifestHandler = &ocischemaIndexHandler{}
// Unmarshal decodes content as an OCI image index. On success the returned
// manifest is a *ocischema.DeserializedImageIndex; on a decoding error the
// manifest is nil and the error is returned unchanged. ctx and dgst are not
// consulted.
func (ms *ocischemaIndexHandler) Unmarshal(ctx context.Context, dgst digest.Digest, content []byte) (distribution.Manifest, error) {
	dcontext.GetLogger(ms.ctx).Debug("(*ociIndexHandler).Unmarshal")

	index := new(ocischema.DeserializedImageIndex)
	err := index.UnmarshalJSON(content)
	if err != nil {
		return nil, err
	}
	return index, nil
}
package storage
import (
"context"
"fmt"
"net/url"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/manifest/ocischema"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
)
// ocischemaManifestHandler is a ManifestHandler that covers ocischema manifests.
type ocischemaManifestHandler struct {
// repository supplies the manifest and blob services that verifyManifest
// checks references against.
repository distribution.Repository
// blobStore receives the manifest payload in Put.
blobStore distribution.BlobStore
// ctx is the handler's own context; it is used for logging and is the
// context Put passes to verifyManifest.
ctx context.Context
// manifestURLs holds the allow/deny expressions applied to layer URLs.
manifestURLs manifestURLs
}
// Compile-time check that *ocischemaManifestHandler implements ManifestHandler.
var _ ManifestHandler = &ocischemaManifestHandler{}
// Unmarshal decodes content as an OCI image manifest. On success the
// returned manifest is a *ocischema.DeserializedManifest; on a decoding
// error the manifest is nil and the error is returned unchanged. ctx and
// dgst are not consulted.
func (ms *ocischemaManifestHandler) Unmarshal(ctx context.Context, dgst digest.Digest, content []byte) (distribution.Manifest, error) {
	dcontext.GetLogger(ms.ctx).Debug("(*ocischemaManifestHandler).Unmarshal")

	decoded := new(ocischema.DeserializedManifest)
	err := decoded.UnmarshalJSON(content)
	if err != nil {
		return nil, err
	}
	return decoded, nil
}
// Put verifies manifest and writes its payload to the handler's blob store,
// returning the digest of the stored blob.
//
// manifest must be a *ocischema.DeserializedManifest; anything else is
// rejected with an error naming the type. Verification runs with ms.ctx,
// while the blob store write uses ctx. Verification, payload and blob store
// errors are returned unchanged; a blob store error is also logged.
func (ms *ocischemaManifestHandler) Put(ctx context.Context, manifest distribution.Manifest, skipDependencyVerification bool) (digest.Digest, error) {
	dcontext.GetLogger(ms.ctx).Debug("(*ocischemaManifestHandler).Put")

	ociManifest, isOCI := manifest.(*ocischema.DeserializedManifest)
	if !isOCI {
		return "", fmt.Errorf("non-ocischema manifest put to ocischemaManifestHandler: %T", manifest)
	}

	err := ms.verifyManifest(ms.ctx, *ociManifest, skipDependencyVerification)
	if err != nil {
		return "", err
	}

	mediaType, body, err := ociManifest.Payload()
	if err != nil {
		return "", err
	}

	stored, err := ms.blobStore.Put(ctx, mediaType, body)
	if err != nil {
		dcontext.GetLogger(ctx).Errorf("error putting payload into blobstore: %v", err)
		return "", err
	}

	return stored.Digest, nil
}
// verifyManifest ensures that the manifest content is valid from the
// perspective of the registry. As a policy, the registry only tries to store
// valid content, leaving trust policies of that content up to consumers.
//
// A schema version other than 2 is rejected immediately with a plain error.
// When skipDependencyVerification is true nothing else is checked. Otherwise
// every descriptor returned by mnfst.References() is checked and all failures
// are collected into a distribution.ErrManifestVerification, which is
// returned if it is non-empty. An error obtaining the repository's manifest
// service is returned as-is.
func (ms *ocischemaManifestHandler) verifyManifest(ctx context.Context, mnfst ocischema.DeserializedManifest, skipDependencyVerification bool) error {
var errs distribution.ErrManifestVerification
if mnfst.Manifest.SchemaVersion != 2 {
return fmt.Errorf("unrecognized manifest schema version %d", mnfst.Manifest.SchemaVersion)
}
if skipDependencyVerification {
return nil
}
manifestService, err := ms.repository.Manifests(ctx)
if err != nil {
return err
}
blobsService := ms.repository.Blobs(ctx)
for _, descriptor := range mnfst.References() {
// This err is scoped to the loop iteration and shadows the outer one; the
// switch below assigns to it, and it is inspected after the switch.
err := descriptor.Digest.Validate()
if err != nil {
// A malformed digest is reported together with an unknown-blob error and
// the descriptor is not examined further.
errs = append(errs, err, distribution.ErrManifestBlobUnknown{Digest: descriptor.Digest})
continue
}
switch descriptor.MediaType {
case v1.MediaTypeImageLayer, v1.MediaTypeImageLayerGzip, v1.MediaTypeImageLayerNonDistributable, v1.MediaTypeImageLayerNonDistributableGzip: //nolint:staticcheck // ignore A1019: v1.MediaTypeImageLayerNonDistributable is deprecated: Non-distributable layers are deprecated, and not recommended for future use.
allow := ms.manifestURLs.allow
deny := ms.manifestURLs.deny
// Every URL must parse, use http or https, carry no fragment, match the
// allow expression (when set) and not match the deny expression (when
// set). The first offending URL sets errInvalidURL and ends the scan.
for _, u := range descriptor.URLs {
var pu *url.URL
pu, err = url.Parse(u)
if err != nil || (pu.Scheme != "http" && pu.Scheme != "https") || pu.Fragment != "" || (allow != nil && !allow.MatchString(u)) || (deny != nil && deny.MatchString(u)) {
err = errInvalidURL
break
}
}
if err == nil {
// check the presence if it is normal layer or
// there is no urls for non-distributable
if len(descriptor.URLs) == 0 ||
(descriptor.MediaType == v1.MediaTypeImageLayer || descriptor.MediaType == v1.MediaTypeImageLayerGzip) {
_, err = blobsService.Stat(ctx, descriptor.Digest)
}
}
case v1.MediaTypeImageManifest:
var exists bool
exists, err = manifestService.Exists(ctx, descriptor.Digest)
if err != nil || !exists {
err = distribution.ErrBlobUnknown // just coerce to unknown.
}
// The manifest-service result is only logged: the fallthrough below runs
// the default case, whose Stat call overwrites err.
if err != nil {
dcontext.GetLogger(ms.ctx).WithError(err).Debugf("failed to ensure exists of %v in manifest service", descriptor.Digest)
}
fallthrough // double check the blob store.
default:
// check the presence
_, err = blobsService.Stat(ctx, descriptor.Digest)
}
if err != nil {
// ErrBlobUnknown itself is not recorded; it is represented by the
// ErrManifestBlobUnknown entry appended below.
if err != distribution.ErrBlobUnknown {
errs = append(errs, err)
}
// On error here, we always append unknown blob errors.
errs = append(errs, distribution.ErrManifestBlobUnknown{Digest: descriptor.Digest})
}
}
if len(errs) != 0 {
return errs
}
return nil
}
package storage
import (
"fmt"
"path"
"strings"
"github.com/opencontainers/go-digest"
)
// Fixed components that pathFor places at the front of every generated path.
const (
storagePathVersion = "v2" // fixed storage layout version
storagePathRoot = "/docker/registry/" // all driver paths have a prefix
// TODO(stevvooe): Get rid of the "storagePathRoot". Initially, we thought
// storage path root would be configurable for all drivers through this
// package. In reality, we've found it simpler to do this on a per driver
// basis.
)
// pathFor maps paths based on "object names" and their ids. The "object
// names" being mapped are internal to the storage system.
//
// The path layout in the storage backend is roughly as follows:
//
//	<root>/v2
//	├── blobs
//	│   └── <algorithm>
//	│       └── <split directory content addressable storage>
//	└── repositories
//	    └── <name>
//	        ├── _layers
//	        │   └── <layer links to blob store>
//	        ├── _manifests
//	        │   ├── revisions
//	        │   │   └── <manifest digest path>
//	        │   │       └── link
//	        │   └── tags
//	        │       └── <tag>
//	        │           ├── current
//	        │           │   └── link
//	        │           └── index
//	        │               └── <algorithm>
//	        │                   └── <hex digest>
//	        │                       └── link
//	        └── _uploads
//	            └── <id>
//	                ├── data
//	                ├── hashstates
//	                │   └── <algorithm>
//	                │       └── <offset>
//	                └── startedat
//
// The storage backend layout is broken up into a content-addressable blob
// store and repositories. The content-addressable blob store holds most data
// throughout the backend, keyed by algorithm and digests of the underlying
// content. Access to the blob store is controlled through links from the
// repository to blobstore.
//
// A repository is made up of layers, manifests and tags. The layers component
// is just a directory of layers which are "linked" into a repository. A layer
// can only be accessed through a qualified repository name if it is linked in
// the repository. Uploads of layers are managed in the uploads directory,
// which is keyed by upload id. When all data for an upload is received, the
// data is moved into the blob store and the upload directory is deleted.
// Abandoned uploads can be garbage collected by reading the startedat file
// and removing uploads that have been active for longer than a certain time.
//
// The third component of the repository directory is the manifests store,
// which is made up of a revision store and tag store. Manifests are stored in
// the blob store and linked into the revision store.
// While the registry can save all revisions of a manifest, no relationship is
// implied as to the ordering of changes to a manifest. The tag store provides
// support for name, tag lookups of manifests, using "current/link" under a
// named tag directory. An index is maintained to support deletions of all
// revisions of a given manifest tag.
//
// We cover the path formats implemented by this path mapper below.
//
// Repositories:
//
// repositoriesRootPathSpec: <root>/v2/repositories
//
// Manifests:
//
// manifestsPathSpec: <root>/v2/repositories/<name>/_manifests
// manifestRevisionsPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/
// manifestRevisionPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/
// manifestRevisionLinkPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/link
//
// Tags:
//
// manifestTagsPathSpec: <root>/v2/repositories/<name>/_manifests/tags/
// manifestTagPathSpec: <root>/v2/repositories/<name>/_manifests/tags/<tag>/
// manifestTagCurrentPathSpec: <root>/v2/repositories/<name>/_manifests/tags/<tag>/current/link
// manifestTagIndexPathSpec: <root>/v2/repositories/<name>/_manifests/tags/<tag>/index/
// manifestTagIndexEntryPathSpec: <root>/v2/repositories/<name>/_manifests/tags/<tag>/index/<algorithm>/<hex digest>/
// manifestTagIndexEntryLinkPathSpec: <root>/v2/repositories/<name>/_manifests/tags/<tag>/index/<algorithm>/<hex digest>/link
//
// Blobs:
//
// layerLinkPathSpec: <root>/v2/repositories/<name>/_layers/<algorithm>/<hex digest>/link
// layersPathSpec: <root>/v2/repositories/<name>/_layers
//
// Uploads:
//
// uploadDataPathSpec: <root>/v2/repositories/<name>/_uploads/<id>/data
// uploadStartedAtPathSpec: <root>/v2/repositories/<name>/_uploads/<id>/startedat
// uploadHashStatePathSpec: <root>/v2/repositories/<name>/_uploads/<id>/hashstates/<algorithm>/<offset>
//
// Blob Store:
//
// blobsPathSpec: <root>/v2/blobs/
// blobPathSpec: <root>/v2/blobs/<algorithm>/<first two hex bytes of digest>/<hex digest>
// blobDataPathSpec: <root>/v2/blobs/<algorithm>/<first two hex bytes of digest>/<hex digest>/data
//
// For more information on the semantic meaning of each path and their
// contents, please see the path spec documentation.
func pathFor(spec pathSpec) (string, error) {
	// The spec-to-path mapping lives in this one type switch rather than
	// behind a method on pathSpec. Keeping the formatting apart from the
	// specs means the same specs could be given to a different mapper that
	// produces a different layout, for instance when migrating between
	// backends: a walker could turn a path of one layout into a spec and have
	// the other layout's mapper render it.

	// underRepos joins elems beneath <root>/<version>/repositories.
	underRepos := func(elems ...string) string {
		return path.Join(append([]string{storagePathRoot, storagePathVersion, "repositories"}, elems...)...)
	}
	// underBlobs joins elems beneath <root>/<version>/blobs.
	underBlobs := func(elems ...string) string {
		return path.Join(append([]string{storagePathRoot, storagePathVersion, "blobs"}, elems...)...)
	}
	// below resolves the path of parent and joins elems beneath it.
	below := func(parent pathSpec, elems ...string) (string, error) {
		root, err := pathFor(parent)
		if err != nil {
			return "", err
		}
		return path.Join(append([]string{root}, elems...)...), nil
	}

	switch v := spec.(type) {
	// Repositories.
	case repositoriesRootPathSpec:
		return underRepos(), nil

	// Manifest revisions.
	case manifestsPathSpec:
		return underRepos(v.name, "_manifests"), nil
	case manifestRevisionsPathSpec:
		return underRepos(v.name, "_manifests", "revisions"), nil
	case manifestRevisionPathSpec:
		components, err := digestPathComponents(v.revision, false)
		if err != nil {
			return "", err
		}
		return underRepos(append([]string{v.name, "_manifests", "revisions"}, components...)...), nil
	case manifestRevisionLinkPathSpec:
		return below(manifestRevisionPathSpec(v), "link")

	// Tags.
	case manifestTagsPathSpec:
		return underRepos(v.name, "_manifests", "tags"), nil
	case manifestTagPathSpec:
		return below(manifestTagsPathSpec{name: v.name}, v.tag)
	case manifestTagCurrentPathSpec:
		return below(manifestTagPathSpec(v), "current", "link")
	case manifestTagIndexPathSpec:
		return below(manifestTagPathSpec(v), "index")
	case manifestTagIndexEntryPathSpec:
		root, err := pathFor(manifestTagIndexPathSpec{
			name: v.name,
			tag:  v.tag,
		})
		if err != nil {
			return "", err
		}
		components, err := digestPathComponents(v.revision, false)
		if err != nil {
			return "", err
		}
		return path.Join(root, path.Join(components...)), nil
	case manifestTagIndexEntryLinkPathSpec:
		return below(manifestTagIndexEntryPathSpec(v), "link")

	// Blob links inside a repository.
	case layersPathSpec:
		return underRepos(v.name, "_layers"), nil
	case layerLinkPathSpec:
		components, err := digestPathComponents(v.digest, false)
		if err != nil {
			return "", err
		}
		// TODO(stevvooe): Right now, all blobs are linked under "_layers". If
		// we have future migrations, we may want to rename this to "_blobs".
		// A migration strategy would simply leave existing items in place and
		// write the new paths, commit a file then delete the old files.
		linkDir := underRepos(append([]string{v.name, "_layers"}, components...)...)
		return path.Join(linkDir, "link"), nil

	// Global blob store.
	case blobsPathSpec:
		return underBlobs(), nil
	case blobPathSpec:
		components, err := digestPathComponents(v.digest, true)
		if err != nil {
			return "", err
		}
		return underBlobs(components...), nil
	case blobDataPathSpec:
		components, err := digestPathComponents(v.digest, true)
		if err != nil {
			return "", err
		}
		return underBlobs(append(components, "data")...), nil

	// Uploads.
	case uploadDataPathSpec:
		return underRepos(v.name, "_uploads", v.id, "data"), nil
	case uploadStartedAtPathSpec:
		return underRepos(v.name, "_uploads", v.id, "startedat"), nil
	case uploadHashStatePathSpec:
		// When listing, the offset element is left empty so that the result
		// is the prefix shared by all offsets.
		offset := ""
		if !v.list {
			offset = fmt.Sprintf("%d", v.offset)
		}
		return underRepos(v.name, "_uploads", v.id, "hashstates", string(v.alg), offset), nil

	default:
		// TODO(sday): This is an internal error. Ensure it doesn't escape (panic?).
		return "", fmt.Errorf("unknown path spec: %#v", v)
	}
}
// pathSpec is a type to mark structs as path specs. There is no
// implementation because we'd like to keep the specs and the mappers
// decoupled. pathFor switches on the concrete type behind this interface.
type pathSpec interface {
// pathSpec is a marker method; the implementations in this file have empty
// bodies.
pathSpec()
}
// manifestsPathSpec describes the path of a repository's manifests
// directory: <root>/v2/repositories/<name>/_manifests.
type manifestsPathSpec struct {
// name is the repository name placed below "repositories".
name string
}
// pathSpec marks manifestsPathSpec as a pathSpec.
func (manifestsPathSpec) pathSpec() {}
// manifestRevisionsPathSpec describes the directory that holds all manifest
// revisions of a repository:
// <root>/v2/repositories/<name>/_manifests/revisions.
type manifestRevisionsPathSpec struct {
// name is the repository name placed below "repositories".
name string
}
// pathSpec marks manifestRevisionsPathSpec as a pathSpec.
func (manifestRevisionsPathSpec) pathSpec() {}
// manifestRevisionPathSpec describes the components of the directory path for
// a manifest revision. pathFor rejects it with an error when revision does
// not validate as a digest.
type manifestRevisionPathSpec struct {
// name is the repository name placed below "repositories".
name string
// revision is the manifest digest; it supplies the <algorithm>/<hex digest>
// elements of the path.
revision digest.Digest
}
// pathSpec marks manifestRevisionPathSpec as a pathSpec.
func (manifestRevisionPathSpec) pathSpec() {}
// manifestRevisionLinkPathSpec describes the path components required to look
// up the data link for a revision of a manifest. If this file is not present,
// the manifest blob is not available in the given repo. The contents of this
// file should just be the digest.
//
// pathFor renders it as the "link" file inside the directory of the
// manifestRevisionPathSpec with the same fields.
type manifestRevisionLinkPathSpec struct {
name string
revision digest.Digest
}
// pathSpec marks manifestRevisionLinkPathSpec as a pathSpec.
func (manifestRevisionLinkPathSpec) pathSpec() {}
// manifestTagsPathSpec describes the path elements required to point to the
// manifest tags directory: <root>/v2/repositories/<name>/_manifests/tags.
type manifestTagsPathSpec struct {
// name is the repository name placed below "repositories".
name string
}
// pathSpec marks manifestTagsPathSpec as a pathSpec.
func (manifestTagsPathSpec) pathSpec() {}
// manifestTagPathSpec describes the path elements required to point to the
// manifest tag links files under a repository. These contain a blob id that
// can be used to look up the data and signatures.
//
// pathFor renders it as the <tag> directory below the repository's tags
// directory.
type manifestTagPathSpec struct {
name string
tag string
}
// pathSpec marks manifestTagPathSpec as a pathSpec.
func (manifestTagPathSpec) pathSpec() {}
// manifestTagCurrentPathSpec describes the link to the current revision for a
// given tag. pathFor renders it as "current/link" below the tag's directory.
type manifestTagCurrentPathSpec struct {
name string
tag string
}
// pathSpec marks manifestTagCurrentPathSpec as a pathSpec.
func (manifestTagCurrentPathSpec) pathSpec() {}
// manifestTagIndexPathSpec describes the path to the index of revisions
// with the given tag. pathFor renders it as "index" below the tag's
// directory.
type manifestTagIndexPathSpec struct {
name string
tag string
}
// pathSpec marks manifestTagIndexPathSpec as a pathSpec.
func (manifestTagIndexPathSpec) pathSpec() {}
// manifestTagIndexEntryPathSpec contains the entries of the index by revision.
// pathFor renders it as <algorithm>/<hex digest> below the tag's index
// directory and fails when revision does not validate as a digest.
type manifestTagIndexEntryPathSpec struct {
name string
tag string
revision digest.Digest
}
// pathSpec marks manifestTagIndexEntryPathSpec as a pathSpec.
func (manifestTagIndexEntryPathSpec) pathSpec() {}
// manifestTagIndexEntryLinkPathSpec describes the link to a revision of a
// manifest with given tag within the index. pathFor renders it as the "link"
// file inside the directory of the manifestTagIndexEntryPathSpec with the
// same fields.
type manifestTagIndexEntryLinkPathSpec struct {
name string
tag string
revision digest.Digest
}
// pathSpec marks manifestTagIndexEntryLinkPathSpec as a pathSpec.
func (manifestTagIndexEntryLinkPathSpec) pathSpec() {}
// layersPathSpec contains the path for the layers inside a repo:
// <root>/v2/repositories/<name>/_layers.
type layersPathSpec struct {
// name is the repository name placed below "repositories".
name string
}
// pathSpec marks layersPathSpec as a pathSpec.
func (layersPathSpec) pathSpec() {}
// layerLinkPathSpec specifies a path for a blob link, which is a file with a
// blob id. The blob link will contain a content addressable blob id reference
// into the blob store. The format of the contents is as follows:
//
// <algorithm>:<hex digest of layer data>
//
// The following example of the file contents is more illustrative:
//
// sha256:96443a84ce518ac22acb2e985eda402b58ac19ce6f91980bde63726a79d80b36
//
// This indicates that there is a blob with the id/digest, calculated via
// sha256 that can be fetched from the blob store.
//
// pathFor renders the spec as <algorithm>/<hex digest>/link below the
// repository's "_layers" directory.
type layerLinkPathSpec struct {
name string
digest digest.Digest
}
// pathSpec marks layerLinkPathSpec as a pathSpec.
func (layerLinkPathSpec) pathSpec() {}
// blobAlgorithmReplacer does some very simple path sanitization for user
// input. Paths should be "safe" before getting this far due to strict digest
// requirements but we can add further path conversion here, if needed.
//
// It rewrites each "+", "." and ";" to "/". digestPathComponents applies it
// to the algorithm part of a digest.
var blobAlgorithmReplacer = strings.NewReplacer(
"+", "/",
".", "/",
";", "/",
)
// blobsPathSpec contains the path for the blobs directory: <root>/v2/blobs.
// It has no fields because the path does not depend on any input.
type blobsPathSpec struct{}
// pathSpec marks blobsPathSpec as a pathSpec.
func (blobsPathSpec) pathSpec() {}
// blobPathSpec contains the path for the registry global blob store. pathFor
// renders it below <root>/v2/blobs using the multilevel digest layout
// (<algorithm>/<first two hex characters>/<hex digest>).
type blobPathSpec struct {
digest digest.Digest
}
// pathSpec marks blobPathSpec as a pathSpec.
func (blobPathSpec) pathSpec() {}
// blobDataPathSpec contains the path for the registry global blob store. For
// now, this contains layer data, exclusively. pathFor renders it as the
// "data" file inside the directory described by blobPathSpec for the same
// digest.
type blobDataPathSpec struct {
digest digest.Digest
}
// pathSpec marks blobDataPathSpec as a pathSpec.
func (blobDataPathSpec) pathSpec() {}
// uploadDataPathSpec defines the path parameters of the data file for
// uploads: <root>/v2/repositories/<name>/_uploads/<id>/data.
type uploadDataPathSpec struct {
name string
id string
}
// pathSpec marks uploadDataPathSpec as a pathSpec.
func (uploadDataPathSpec) pathSpec() {}
// uploadStartedAtPathSpec defines the path parameters for the file that stores the
// start time of an upload. If it is missing, the upload is considered
// unknown. Admittedly, the presence of this file is an ugly hack to make sure
// we have a way to cleanup old or stalled uploads that doesn't rely on driver
// FileInfo behavior. If we come up with a more clever way to do this, we
// should remove this file immediately and rely on the startedAt field from
// the client to enforce time out policies.
type uploadStartedAtPathSpec struct {
name string
id string
}
// pathSpec marks uploadStartedAtPathSpec as a pathSpec.
func (uploadStartedAtPathSpec) pathSpec() {}
// uploadHashStatePathSpec defines the path parameters for the file that stores
// the hash function state of an upload at a specific byte offset. If `list` is
// set, then the path mapper will generate a list prefix for all hash state
// offsets for the upload identified by the name, id, and alg.
type uploadHashStatePathSpec struct {
name string
id string
alg digest.Algorithm
// offset is rendered in decimal as the last path element unless list is set.
offset int64
// list drops the offset element so the path ends at the algorithm directory.
list bool
}
// pathSpec marks uploadHashStatePathSpec as a pathSpec.
func (uploadHashStatePathSpec) pathSpec() {}
// repositoriesRootPathSpec describes the root of all repositories:
// <root>/v2/repositories. It has no fields because the path does not depend
// on any input.
type repositoriesRootPathSpec struct{}
// pathSpec marks repositoriesRootPathSpec as a pathSpec.
func (repositoriesRootPathSpec) pathSpec() {}
// digestPathComponents splits dgst into the path elements used to address it
// in the storage backend. In the plain form these are:
//
//	<algorithm>/<hex digest>
//
// With multilevel set, the first two characters of the hex digest are added
// as an intermediate directory:
//
//	<algorithm>/<first two characters of hex digest>/<hex digest>
//
// The algorithm element is passed through blobAlgorithmReplacer. A digest
// that fails validation yields a nil slice and the validation error.
func digestPathComponents(dgst digest.Digest, multilevel bool) ([]string, error) {
	if err := dgst.Validate(); err != nil {
		return nil, err
	}

	components := []string{blobAlgorithmReplacer.Replace(string(dgst.Algorithm()))}
	encoded := dgst.Encoded()
	if multilevel {
		components = append(components, encoded[:2])
	}

	return append(components, encoded), nil
}
// digestFromPath reconstructs a digest from a blob path laid out in either
// of these forms, with an optional trailing "/data":
//
//	.../<algorithm>/<hex digest>
//	.../<algorithm>/<first two characters of hex digest>/<hex digest>
//
// The final path element is taken as the hex digest. The element before it
// is the algorithm unless it equals the first two characters of the hex
// digest, in which case the algorithm is the element one level further up.
// The reconstructed digest is returned together with the result of
// validating it, so a path that does not describe a well-formed digest
// yields a non-nil error.
func digestFromPath(digestPath string) (digest.Digest, error) {
	digestPath = strings.TrimSuffix(digestPath, "/data")
	dir, hex := path.Split(digestPath)
	dir = path.Dir(dir)
	dir, next := path.Split(dir)

	// next is either the algorithm OR the first two characters in the hex string
	algo := next
	// The length guard keeps a final element shorter than two characters
	// (for example an empty one after a trailing slash) from panicking on the
	// slice expression; such input fails validation below instead.
	if len(hex) >= 2 && next == hex[:2] {
		algo = path.Base(dir)
	}

	dgst := digest.NewDigestFromEncoded(digest.Algorithm(algo), hex)
	return dgst, dgst.Validate()
}
package storage
import (
"context"
"path"
"strings"
"time"
storageDriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
// uploadData stores the location of the temporary files created during a
// layer upload along with the date the upload was started.
type uploadData struct {
	containingDir string
	startedAt     time.Time
}

// newUploadData returns an uploadData with an empty containing directory and
// a start time 10000 hours after the moment of the call.
func newUploadData() uploadData {
	var ud uploadData
	// Default to far in the future so that an upload whose startedat file is
	// missing never looks old.
	ud.startedAt = time.Now().Add(10000 * time.Hour)
	return ud
}
// PurgeUploads removes upload directories whose recorded start time lies
// before olderThan. It returns the directories that were selected and the
// errors met while scanning or deleting.
//
// With actuallyDelete false nothing is removed from driver, but every
// directory that would have been removed is still reported in the first
// return value.
func PurgeUploads(ctx context.Context, driver storageDriver.StorageDriver, olderThan time.Time, actuallyDelete bool) ([]string, []error) {
	logrus.Infof("PurgeUploads starting: olderThan=%s, actuallyDelete=%t", olderThan, actuallyDelete)

	outstanding, errs := getOutstandingUploads(ctx, driver)
	var deleted []string
	for _, upload := range outstanding {
		if !upload.startedAt.Before(olderThan) {
			continue
		}

		logrus.Infof("Upload files in %s have older date (%s) than purge date (%s). Removing upload directory.",
			upload.containingDir, upload.startedAt, olderThan)
		if actuallyDelete {
			if err := driver.Delete(ctx, upload.containingDir); err != nil {
				errs = append(errs, err)
				continue
			}
		}
		deleted = append(deleted, upload.containingDir)
	}

	logrus.Infof("Purge uploads finished. Num deleted=%d, num errors=%d", len(deleted), len(errs))
	return deleted, errs
}
// getOutstandingUploads walks the upload directory, collecting files
// which could be eligible for deletion. The only reliable way to
// classify the age of a file is with the date stored in the startedAt
// file, so gather files by UUID with a date from startedAt.
//
// The walk starts at the repositories root. Directories whose name starts
// with "_" are skipped unless they are "_uploads". Every visited path that
// contains a UUID element contributes to the entry keyed by that UUID: the
// path whose last element is the UUID becomes containingDir, and a file
// named "startedat" supplies startedAt. Errors reading a startedat file and
// an error from the walk itself are collected in the returned slice; the map
// is returned even when errors occurred.
func getOutstandingUploads(ctx context.Context, driver storageDriver.StorageDriver) (map[string]uploadData, []error) {
	var errors []error
	uploads := make(map[string]uploadData)

	inUploadDir := false
	root, err := pathFor(repositoriesRootPathSpec{})
	if err != nil {
		return uploads, append(errors, err)
	}

	err = driver.Walk(ctx, root, func(fileInfo storageDriver.FileInfo) error {
		filePath := fileInfo.Path()
		_, file := path.Split(filePath)
		// HasPrefix rather than file[0]: the final element is empty when the
		// path ends in a slash, and indexing it would panic.
		if strings.HasPrefix(file, "_") {
			// Reserved directory
			inUploadDir = (file == "_uploads")

			if fileInfo.IsDir() && !inUploadDir {
				return storageDriver.ErrSkipDir
			}
		}

		uuid, isContainingDir := uuidFromPath(filePath)
		if uuid == "" {
			// Cannot reliably delete
			return nil
		}
		ud, ok := uploads[uuid]
		if !ok {
			ud = newUploadData()
		}
		if isContainingDir {
			ud.containingDir = filePath
		}
		if file == "startedat" {
			if t, err := readStartedAtFile(ctx, driver, filePath); err == nil {
				ud.startedAt = t
			} else {
				errors = pushError(errors, filePath, err)
			}
		}

		uploads[uuid] = ud
		return nil
	})
	if err != nil {
		errors = pushError(errors, root, err)
	}

	return uploads, errors
}
// uuidFromPath returns the UUID found in path, scanning its "/"-separated
// elements from the last to the first and stopping at the first element that
// parses as a UUID. The boolean is true when that element is the final one,
// meaning path is the directory containing all files of the upload. When no
// element parses, it returns "" and false.
func uuidFromPath(path string) (string, bool) {
	parts := strings.Split(path, "/")
	last := len(parts) - 1
	for idx := last; idx >= 0; idx-- {
		parsed, err := uuid.Parse(parts[idx])
		if err != nil {
			continue
		}
		return parsed.String(), idx == last
	}
	return "", false
}
// readStartedAtFile loads the file at path through driver and parses its
// content as an RFC 3339 timestamp. When the file cannot be read or parsed,
// the error is returned together with the current time.
func readStartedAtFile(ctx context.Context, driver storageDriver.StorageDriver, path string) (time.Time, error) {
	raw, readErr := driver.GetContent(ctx, path)
	if readErr != nil {
		return time.Now(), readErr
	}

	startedAt, parseErr := time.Parse(time.RFC3339, string(raw))
	if parseErr != nil {
		return time.Now(), parseErr
	}
	return startedAt, nil
}
package storage
import (
"context"
"regexp"
"runtime"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/registry/storage/cache"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
)
// DefaultConcurrencyLimit is initialised to runtime.GOMAXPROCS(0), that is,
// the GOMAXPROCS setting in effect when the package is initialised.
var DefaultConcurrencyLimit = runtime.GOMAXPROCS(0)
// registry is the top-level implementation of Registry for use in the storage
// package. All instances should descend from this object.
//
// NewRegistry fills in blobStore, blobServer, statter and driver and sets
// resumableDigestEnabled to true; the remaining fields keep their zero value
// unless a RegistryOption changes them.
type registry struct {
blobStore *blobStore
blobServer *blobServer
statter *blobStatter // global statter service.
// blobDescriptorCacheProvider is set by the BlobDescriptorCacheProvider option.
blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider
// deleteEnabled is set by the EnableDelete option.
deleteEnabled bool
// tagLookupConcurrencyLimit is set by the TagLookupConcurrencyLimit option.
tagLookupConcurrencyLimit int
// resumableDigestEnabled is cleared by the DisableDigestResumption option.
resumableDigestEnabled bool
// blobDescriptorServiceFactory is set by the BlobDescriptorServiceFactory option.
blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory
driver storagedriver.StorageDriver
// Validation
manifestURLs manifestURLs
validateImageIndexes validateImageIndexes
}
// manifestURLs holds regular expressions for controlling manifest URL whitelisting
type manifestURLs struct {
// allow is set by the ManifestURLsAllowRegexp option. The OCI manifest
// handler rejects a layer URL that does not match it when it is non-nil.
allow *regexp.Regexp
// deny is set by the ManifestURLsDenyRegexp option. The OCI manifest handler
// rejects a layer URL that matches it when it is non-nil.
deny *regexp.Regexp
}
// validateImageIndexes holds configuration for validation of image indexes
type validateImageIndexes struct {
// imagesExist can be used to disable checking that platform images exist entirely. Default true.
// NOTE(review): NewRegistry leaves this at the zero value (false) unless
// EnableValidateImageIndexImagesExist is applied; "Default true" presumably
// refers to the configuration layer. Confirm.
imagesExist bool
// imagePlatforms can be used to only validate the existence of images for a set of platforms. The empty array means validate all platforms.
imagePlatforms []platform
}
// platform represents a platform whose image is validated to exist when an
// image index is accepted. Entries are added through
// AddValidateImageIndexImagesExistPlatform.
type platform struct {
architecture string
os string
}
// RegistryOption is the type used for functional options for NewRegistry.
// NewRegistry calls each option with the registry under construction and
// gives up with the option's error if it returns one.
type RegistryOption func(*registry) error
// EnableRedirect is a functional option for NewRegistry. It causes the backend
// blob server to attempt using (StorageDriver).RedirectURL to serve all blobs.
// It sets the redirect flag on the registry's blobServer and always returns
// nil.
func EnableRedirect(registry *registry) error {
registry.blobServer.redirect = true
return nil
}
// TagLookupConcurrencyLimit returns a functional option for NewRegistry that
// stores concurrencyLimit in the registry's tagLookupConcurrencyLimit field.
// The option always returns nil.
func TagLookupConcurrencyLimit(concurrencyLimit int) RegistryOption {
	return func(reg *registry) error {
		reg.tagLookupConcurrencyLimit = concurrencyLimit
		return nil
	}
}
// EnableDelete is a functional option for NewRegistry. It enables deletion on
// the registry by setting its deleteEnabled field, and always returns nil.
func EnableDelete(registry *registry) error {
registry.deleteEnabled = true
return nil
}
// DisableDigestResumption is a functional option for NewRegistry. It should be
// used if the registry is acting as a caching proxy. It clears the
// resumableDigestEnabled field, which NewRegistry sets to true by default,
// and always returns nil.
func DisableDigestResumption(registry *registry) error {
registry.resumableDigestEnabled = false
return nil
}
// ManifestURLsAllowRegexp returns a functional option for NewRegistry that
// installs r as the allow expression of the registry's manifestURLs. The
// option always returns nil.
func ManifestURLsAllowRegexp(r *regexp.Regexp) RegistryOption {
	return func(reg *registry) error {
		reg.manifestURLs.allow = r
		return nil
	}
}
// ManifestURLsDenyRegexp returns a functional option for NewRegistry that
// installs r as the deny expression of the registry's manifestURLs. The
// option always returns nil.
func ManifestURLsDenyRegexp(r *regexp.Regexp) RegistryOption {
	return func(reg *registry) error {
		reg.manifestURLs.deny = r
		return nil
	}
}
// EnableValidateImageIndexImagesExist is a functional option for NewRegistry. It enables
// validation that references exist before an image index is accepted, by
// setting validateImageIndexes.imagesExist. It always returns nil.
func EnableValidateImageIndexImagesExist(registry *registry) error {
registry.validateImageIndexes.imagesExist = true
return nil
}
// AddValidateImageIndexImagesExistPlatform returns a functional option for
// NewRegistry. The option appends the architecture/os pair to the platforms
// checked for existence before an image index is accepted, and always
// returns nil.
func AddValidateImageIndexImagesExistPlatform(architecture string, os string) RegistryOption {
	wanted := platform{architecture: architecture, os: os}
	return func(reg *registry) error {
		cfg := &reg.validateImageIndexes
		cfg.imagePlatforms = append(cfg.imagePlatforms, wanted)
		return nil
	}
}
// BlobDescriptorServiceFactory returns a functional option for NewRegistry
// that installs factory as the registry's BlobDescriptorServiceFactory
// middleware. The option always returns nil.
func BlobDescriptorServiceFactory(factory distribution.BlobDescriptorServiceFactory) RegistryOption {
	return func(reg *registry) error {
		reg.blobDescriptorServiceFactory = factory
		return nil
	}
}
// BlobDescriptorCacheProvider returns a functional option for NewRegistry.
// When the provider is non-nil, the option wraps the registry's statter in a
// cached blob statter, installs that wrapper on both the blob store and the
// blob server, and remembers the provider on the registry. A nil provider
// leaves the registry untouched.
func BlobDescriptorCacheProvider(blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider) RegistryOption {
	// TODO(aaronl): The duplication of statter across several objects is
	// ugly, and prevents us from using interface types in the registry
	// struct. Ideally, blobStore and blobServer should be lazily
	// initialized, and use the current value of
	// blobDescriptorCacheProvider.
	return func(reg *registry) error {
		if blobDescriptorCacheProvider == nil {
			// Nothing to install.
			return nil
		}

		cached := cache.NewCachedBlobStatter(blobDescriptorCacheProvider, reg.statter)
		reg.blobStore.statter = cached
		reg.blobServer.statter = cached
		reg.blobDescriptorCacheProvider = blobDescriptorCacheProvider
		return nil
	}
}
// NewRegistry creates a new registry instance from the provided driver. The
// resulting registry may be shared by multiple goroutines but is cheap to
// allocate. If the Redirect option is specified, the backend blob server will
// attempt to use (StorageDriver).RedirectURL to serve all blobs.
func NewRegistry(ctx context.Context, driver storagedriver.StorageDriver, options ...RegistryOption) (distribution.Namespace, error) {
// create global statter
statter := &blobStatter{
driver: driver,
}
bs := &blobStore{
driver: driver,
statter: statter,
}
registry := ®istry{
blobStore: bs,
blobServer: &blobServer{
driver: driver,
statter: statter,
pathFn: bs.path,
},
statter: statter,
resumableDigestEnabled: true,
driver: driver,
}
for _, option := range options {
if err := option(registry); err != nil {
return nil, err
}
}
return registry, nil
}
// Scope returns the namespace scope for a registry. The registry
// will only serve repositories contained within this scope.
// This implementation always returns distribution.GlobalScope.
func (reg *registry) Scope() distribution.Scope {
return distribution.GlobalScope
}
// Repository returns an instance of the repository tied to the registry.
// Instances should not be shared between goroutines but are cheap to
// allocate. In general, they should be request scoped.
//
// When the registry has a blob descriptor cache provider, the repository is
// given a descriptor cache scoped to canonicalName; an error while creating
// that scoped cache is returned and no repository is produced.
func (reg *registry) Repository(ctx context.Context, canonicalName reference.Named) (distribution.Repository, error) {
	repo := &repository{
		ctx:      ctx,
		registry: reg,
		name:     canonicalName,
	}

	provider := reg.blobDescriptorCacheProvider
	if provider == nil {
		// No cache configured: descriptorCache stays nil.
		return repo, nil
	}

	scoped, err := provider.RepositoryScoped(canonicalName.Name())
	if err != nil {
		return nil, err
	}
	repo.descriptorCache = scoped

	return repo, nil
}
// Blobs returns the registry's blob store as a distribution.BlobEnumerator.
func (reg *registry) Blobs() distribution.BlobEnumerator {
return reg.blobStore
}
// BlobStatter returns the registry's statter field, i.e. the global statter
// that NewRegistry creates from the storage driver.
func (reg *registry) BlobStatter() distribution.BlobStatter {
return reg.statter
}
// repository provides name-scoped access to various services.
type repository struct {
// The owning registry is embedded, so its fields and methods are promoted
// onto repository.
*registry
// ctx is the context handed to (*registry).Repository when this value was
// created.
ctx context.Context
// name is the canonical name of the repository.
name reference.Named
// descriptorCache is the repository-scoped descriptor cache. It is nil when
// the registry has no blob descriptor cache provider configured.
descriptorCache distribution.BlobDescriptorService
}
// Named returns the name of the repository as a reference.Named.
func (repo *repository) Named() reference.Named {
return repo.name
}
// Tags returns a TagService for this repository, backed by the registry's
// blob store. The tag lookup concurrency limit configured on the registry is
// used when it is positive; otherwise DefaultConcurrencyLimit applies.
func (repo *repository) Tags(ctx context.Context) distribution.TagService {
	store := &tagStore{
		repository:       repo,
		blobStore:        repo.registry.blobStore,
		concurrencyLimit: DefaultConcurrencyLimit,
	}

	if configured := repo.tagLookupConcurrencyLimit; configured > 0 {
		store.concurrencyLimit = configured
	}

	return store
}
// Manifests returns an instance of ManifestService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
//
// The returned manifest store shares one manifest-scoped linked blob store
// between all of its handlers. Each supplied option is applied to the store in
// order; the first option error is returned and no service is produced.
func (repo *repository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
	revisionsSpec := manifestRevisionsPathSpec{name: repo.name.Name()}

	// Start from the repository-linked statter, then layer the optional
	// descriptor cache and the optional access-controller middleware on top.
	var accessController distribution.BlobDescriptorService = &linkedBlobStatter{
		blobStore:  repo.blobStore,
		repository: repo,
		linkPath:   manifestRevisionLinkPath,
	}
	if descCache := repo.descriptorCache; descCache != nil {
		accessController = cache.NewCachedBlobStatter(descCache, accessController)
	}
	if factory := repo.registry.blobDescriptorServiceFactory; factory != nil {
		accessController = factory.BlobAccessController(accessController)
	}

	manifestBlobs := &linkedBlobStore{
		ctx:                  ctx,
		blobStore:            repo.blobStore,
		repository:           repo,
		deleteEnabled:        repo.registry.deleteEnabled,
		blobAccessController: accessController,

		// TODO(stevvooe): linkPath limits this blob store to only
		// manifests. This instance cannot be used for blob checks.
		linkPath:              manifestRevisionLinkPath,
		linkDirectoryPathSpec: revisionsSpec,
	}

	// The list handler is shared by the manifest-list and OCI index entries.
	listHandler := &manifestListHandler{
		ctx:                  ctx,
		repository:           repo,
		blobStore:            manifestBlobs,
		validateImageIndexes: repo.validateImageIndexes,
	}

	schema2 := &schema2ManifestHandler{
		ctx:          ctx,
		repository:   repo,
		blobStore:    manifestBlobs,
		manifestURLs: repo.registry.manifestURLs,
	}

	ociManifest := &ocischemaManifestHandler{
		ctx:          ctx,
		repository:   repo,
		blobStore:    manifestBlobs,
		manifestURLs: repo.registry.manifestURLs,
	}

	store := &manifestStore{
		ctx:                   ctx,
		repository:            repo,
		blobStore:             manifestBlobs,
		schema2Handler:        schema2,
		manifestListHandler:   listHandler,
		ocischemaHandler:      ociManifest,
		ocischemaIndexHandler: &ocischemaIndexHandler{manifestListHandler: listHandler},
	}

	// Apply options
	for _, opt := range options {
		if err := opt.Apply(store); err != nil {
			return nil, err
		}
	}

	return store, nil
}
// Blobs returns an instance of the BlobStore. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
//
// The store's access controller is the repository-linked statter for layer
// links, wrapped by the descriptor cache and by the registry's access
// controller middleware when those are configured.
func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
	var accessController distribution.BlobDescriptorService = &linkedBlobStatter{
		blobStore:  repo.blobStore,
		repository: repo,
		linkPath:   blobLinkPath,
	}
	if descCache := repo.descriptorCache; descCache != nil {
		accessController = cache.NewCachedBlobStatter(descCache, accessController)
	}
	if factory := repo.registry.blobDescriptorServiceFactory; factory != nil {
		accessController = factory.BlobAccessController(accessController)
	}

	layers := &linkedBlobStore{
		registry:             repo.registry,
		blobStore:            repo.blobStore,
		blobServer:           repo.blobServer,
		blobAccessController: accessController,
		repository:           repo,
		ctx:                  ctx,

		// TODO(stevvooe): linkPath limits this blob store to only layers.
		// This instance cannot be used for manifest checks.
		linkPath:               blobLinkPath,
		linkDirectoryPathSpec:  layersPathSpec{name: repo.name.Name()},
		deleteEnabled:          repo.registry.deleteEnabled,
		resumableDigestEnabled: repo.resumableDigestEnabled,
	}
	return layers
}
package storage
import (
"context"
"errors"
"fmt"
"net/url"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/manifest/schema2"
"github.com/opencontainers/go-digest"
)
// Sentinel errors used while verifying foreign layers of a schema2 manifest.
var (
// errMissingURL is used when a foreign layer descriptor carries no URLs.
errMissingURL = errors.New("missing URL on layer")
// errInvalidURL is used when a foreign layer URL fails to parse, has a
// scheme other than http/https, has a fragment, or is rejected by the
// configured allow/deny patterns.
errInvalidURL = errors.New("invalid URL on layer")
)
// schema2ManifestHandler is a ManifestHandler that covers schema2 manifests.
type schema2ManifestHandler struct {
// repository supplies the manifest and blob services used to verify
// references during Put.
repository distribution.Repository
// blobStore is where accepted manifest payloads are written.
blobStore distribution.BlobStore
// ctx is the context captured at construction; it is used for logging and
// for manifest verification.
ctx context.Context
// manifestURLs holds the optional allow/deny patterns applied to foreign
// layer URLs.
manifestURLs manifestURLs
}
// Compile-time check that *schema2ManifestHandler satisfies ManifestHandler.
var _ ManifestHandler = &schema2ManifestHandler{}
// Unmarshal decodes content as a schema2 manifest and returns the resulting
// *schema2.DeserializedManifest. The ctx and dgst arguments are not consulted;
// logging goes through the handler's own context.
func (ms *schema2ManifestHandler) Unmarshal(ctx context.Context, dgst digest.Digest, content []byte) (distribution.Manifest, error) {
	dcontext.GetLogger(ms.ctx).Debug("(*schema2ManifestHandler).Unmarshal")

	var decoded schema2.DeserializedManifest
	err := decoded.UnmarshalJSON(content)
	if err != nil {
		return nil, err
	}

	return &decoded, nil
}
// Put verifies a schema2 manifest and stores its payload in the handler's blob
// store, returning the digest of the stored revision. A manifest that is not a
// *schema2.DeserializedManifest is rejected. Verification runs with the
// handler's own context, while the blob store write uses ctx.
func (ms *schema2ManifestHandler) Put(ctx context.Context, manifest distribution.Manifest, skipDependencyVerification bool) (digest.Digest, error) {
	dcontext.GetLogger(ms.ctx).Debug("(*schema2ManifestHandler).Put")

	deserialized, isSchema2 := manifest.(*schema2.DeserializedManifest)
	if !isSchema2 {
		return "", fmt.Errorf("non-schema2 manifest put to schema2ManifestHandler: %T", manifest)
	}

	err := ms.verifyManifest(ms.ctx, *deserialized, skipDependencyVerification)
	if err != nil {
		return "", err
	}

	mediaType, payload, err := deserialized.Payload()
	if err != nil {
		return "", err
	}

	stored, err := ms.blobStore.Put(ctx, mediaType, payload)
	if err != nil {
		dcontext.GetLogger(ctx).Errorf("error putting payload into blobstore: %v", err)
		return "", err
	}

	return stored.Digest, nil
}
// verifyManifest ensures that the manifest content is valid from the
// perspective of the registry. As a policy, the registry only tries to store
// valid content, leaving trust policies of that content up to consumers.
//
// A schema version other than 2 is rejected immediately with a plain error.
// When skipDependencyVerification is true no references are checked and nil
// is returned. Otherwise every descriptor from mnfst.References() is checked
// and all failures are collected into a distribution.ErrManifestVerification,
// which is returned when it is non-empty.
func (ms *schema2ManifestHandler) verifyManifest(ctx context.Context, mnfst schema2.DeserializedManifest, skipDependencyVerification bool) error {
var errs distribution.ErrManifestVerification
if mnfst.Manifest.SchemaVersion != 2 {
return fmt.Errorf("unrecognized manifest schema version %d", mnfst.Manifest.SchemaVersion)
}
if skipDependencyVerification {
return nil
}
manifestService, err := ms.repository.Manifests(ctx)
if err != nil {
return err
}
blobsService := ms.repository.Blobs(ctx)
for _, descriptor := range mnfst.References() {
// A malformed digest is reported both as the validation error itself and
// as an unknown-blob error; no further checks run for this descriptor.
err := descriptor.Digest.Validate()
if err != nil {
errs = append(errs, err, distribution.ErrManifestBlobUnknown{Digest: descriptor.Digest})
continue
}
switch descriptor.MediaType {
case schema2.MediaTypeForeignLayer:
// Clients download this layer from an external URL, so do not check for
// its presence.
if len(descriptor.URLs) == 0 {
err = errMissingURL
}
allow := ms.manifestURLs.allow
deny := ms.manifestURLs.deny
// Every URL must parse, use http or https, carry no fragment, match the
// allow pattern (when set) and not match the deny pattern (when set).
// The first offending URL stops the scan with errInvalidURL.
for _, u := range descriptor.URLs {
var pu *url.URL
pu, err = url.Parse(u)
if err != nil || (pu.Scheme != "http" && pu.Scheme != "https") || pu.Fragment != "" || (allow != nil && !allow.MatchString(u)) || (deny != nil && deny.MatchString(u)) {
err = errInvalidURL
break
}
}
case schema2.MediaTypeManifest:
var exists bool
exists, err = manifestService.Exists(ctx, descriptor.Digest)
if err != nil || !exists {
err = distribution.ErrBlobUnknown // just coerce to unknown.
}
// The outcome of the manifest-service check is only logged here: the
// fallthrough below overwrites err with the blob store Stat result.
if err != nil {
dcontext.GetLogger(ms.ctx).WithError(err).Debugf("failed to ensure exists of %v in manifest service", descriptor.Digest)
}
fallthrough // double check the blob store.
default:
// check its presence
_, err = blobsService.Stat(ctx, descriptor.Digest)
}
if err != nil {
// Errors other than ErrBlobUnknown are preserved verbatim in addition to
// the unknown-blob entry appended below.
if err != distribution.ErrBlobUnknown {
errs = append(errs, err)
}
// On error here, we always append unknown blob errors.
errs = append(errs, distribution.ErrManifestBlobUnknown{Digest: descriptor.Digest})
}
}
// Return an untyped nil on success rather than an empty error slice.
if len(errs) != 0 {
return errs
}
return nil
}
package storage
import (
"context"
"path"
"sort"
"sync"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
"github.com/distribution/distribution/v3"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
)
// Compile-time check that *tagStore satisfies distribution.TagService.
var _ distribution.TagService = &tagStore{}
// tagStore provides methods to manage manifest tags in a backend storage driver.
// This implementation uses the same on-disk layout as the (now deleted) tag
// store. This provides backward compatibility with current registry deployments
// which only makes use of the Digest field of the returned v1.Descriptor
// but does not enable full roundtripping of Descriptor objects
type tagStore struct {
// repository is the repository whose tags are managed.
repository *repository
// blobStore provides link read/write helpers and the storage driver.
blobStore *blobStore
// concurrencyLimit is passed to errgroup.SetLimit in Lookup.
concurrencyLimit int
}
// All returns the names of all tags in the repository, sorted
// lexicographically. A missing tags directory is reported as
// distribution.ErrRepositoryUnknown; any other driver error is returned
// unchanged.
func (ts *tagStore) All(ctx context.Context) ([]string, error) {
	repoName := ts.repository.Named().Name()

	tagsDir, err := pathFor(manifestTagsPathSpec{name: repoName})
	if err != nil {
		return nil, err
	}

	entries, err := ts.blobStore.driver.List(ctx, tagsDir)
	if err != nil {
		if _, notFound := err.(storagedriver.PathNotFoundError); notFound {
			return nil, distribution.ErrRepositoryUnknown{Name: ts.repository.Named().Name()}
		}
		return nil, err
	}

	// The tag name is the final path element of each listed entry.
	tags := make([]string, len(entries))
	for i, entry := range entries {
		_, tags[i] = path.Split(entry)
	}

	// there is no guarantee for the order,
	// therefore sort before return.
	sort.Strings(tags)

	return tags, nil
}
// Tag tags the digest with the given tag, updating the store to point at
// the current tag. The digest must point to a manifest.
//
// The descriptor is first linked into the tag's index, and only then is the
// tag's "current" link overwritten with desc.Digest.
func (ts *tagStore) Tag(ctx context.Context, tag string, desc v1.Descriptor) error {
	spec := manifestTagCurrentPathSpec{
		name: ts.repository.Named().Name(),
		tag:  tag,
	}
	currentPath, err := pathFor(spec)
	if err != nil {
		return err
	}

	// Link into the index
	index := ts.linkedBlobStore(ctx, tag)
	err = index.linkBlob(ctx, desc)
	if err != nil {
		return err
	}

	// Overwrite the current link
	return ts.blobStore.link(ctx, currentPath, desc.Digest)
}
// Get resolves the current revision for the repository name and tag. Only the
// Digest field of the returned descriptor is populated. A missing link is
// reported as distribution.ErrTagUnknown; other errors are returned unchanged.
func (ts *tagStore) Get(ctx context.Context, tag string) (v1.Descriptor, error) {
	spec := manifestTagCurrentPathSpec{
		name: ts.repository.Named().Name(),
		tag:  tag,
	}
	currentPath, err := pathFor(spec)
	if err != nil {
		return v1.Descriptor{}, err
	}

	revision, err := ts.blobStore.readlink(ctx, currentPath)
	if err == nil {
		return v1.Descriptor{Digest: revision}, nil
	}

	if _, missing := err.(storagedriver.PathNotFoundError); missing {
		return v1.Descriptor{}, distribution.ErrTagUnknown{Tag: tag}
	}
	return v1.Descriptor{}, err
}
// Untag removes the tag association by deleting the tag's path from the
// storage driver. The driver's error, if any, is returned as-is.
func (ts *tagStore) Untag(ctx context.Context, tag string) error {
	spec := manifestTagPathSpec{
		name: ts.repository.Named().Name(),
		tag:  tag,
	}

	tagPath, err := pathFor(spec)
	if err != nil {
		return err
	}

	return ts.blobStore.driver.Delete(ctx, tagPath)
}
// linkedBlobStore returns the linkedBlobStore for the named tag, allowing one
// to index manifest blobs by tag name. While the tag store doesn't map
// precisely to the linked blob store, using this ensures the links are
// managed via the same code path.
func (ts *tagStore) linkedBlobStore(ctx context.Context, tag string) *linkedBlobStore {
	// Resolve a digest to its index entry link under this tag.
	indexEntryLink := func(name string, dgst digest.Digest) (string, error) {
		spec := manifestTagIndexEntryLinkPathSpec{
			name:     name,
			tag:      tag,
			revision: dgst,
		}
		return pathFor(spec)
	}

	return &linkedBlobStore{
		blobStore:  ts.blobStore,
		repository: ts.repository,
		ctx:        ctx,
		linkPath:   indexEntryLink,
	}
}
// Lookup recovers a list of tags which refer to this digest. When a manifest is deleted by
// digest, tag entries which point to it need to be recovered to avoid dangling tags.
//
// Every tag's current link is read concurrently, bounded by the store's
// concurrencyLimit. Tags whose link has disappeared in the meantime are
// skipped; any other error cancels the remaining lookups and is returned.
// An unknown repository yields no tags and no error. The order of the
// returned tags is not guaranteed.
func (ts *tagStore) Lookup(ctx context.Context, desc v1.Descriptor) ([]string, error) {
	allTags, err := ts.All(ctx)
	switch err.(type) {
	case nil:
	case distribution.ErrRepositoryUnknown:
		// This tag store has been initialized but not yet populated
	default:
		return nil, err
	}

	g, ctx := errgroup.WithContext(ctx)
	// errgroup interprets a limit of zero as "never start a goroutine", which
	// would make every g.Go call below block forever. Treat the zero value as
	// "no limit" instead of deadlocking.
	if ts.concurrencyLimit != 0 {
		g.SetLimit(ts.concurrencyLimit)
	}

	var (
		tags []string
		mu   sync.Mutex
	)
	for _, tag := range allTags {
		// Stop scheduling work once a lookup has failed or the caller gave up.
		if ctx.Err() != nil {
			break
		}
		tag := tag

		g.Go(func() error {
			tagLinkPathSpec := manifestTagCurrentPathSpec{
				name: ts.repository.Named().Name(),
				tag:  tag,
			}

			// Surface path construction failures instead of reading a link
			// from an empty path.
			tagLinkPath, err := pathFor(tagLinkPathSpec)
			if err != nil {
				return err
			}

			tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)
			if err != nil {
				switch err.(type) {
				case storagedriver.PathNotFoundError:
					// The tag vanished between listing and reading; ignore it.
					return nil
				}
				return err
			}

			if tagDigest == desc.Digest {
				mu.Lock()
				tags = append(tags, tag)
				mu.Unlock()
			}

			return nil
		})
	}

	err = g.Wait()
	if err != nil {
		return nil, err
	}

	return tags, nil
}
// ManifestDigests returns the digests enumerated from the index directory of
// the given tag. An enumeration error is returned with a nil slice.
func (ts *tagStore) ManifestDigests(ctx context.Context, tag string) ([]digest.Digest, error) {
	index := &linkedBlobStore{
		blobStore: ts.blobStore,
		blobAccessController: &linkedBlobStatter{
			blobStore:  ts.blobStore,
			repository: ts.repository,
			linkPath:   manifestRevisionLinkPath,
		},
		repository: ts.repository,
		ctx:        ctx,
		// Links are resolved relative to this tag's index entries.
		linkPath: func(name string, dgst digest.Digest) (string, error) {
			return pathFor(manifestTagIndexEntryLinkPathSpec{
				name:     name,
				tag:      tag,
				revision: dgst,
			})
		},
		linkDirectoryPathSpec: manifestTagIndexPathSpec{
			name: ts.repository.Named().Name(),
			tag:  tag,
		},
	}

	var found []digest.Digest
	collect := func(dgst digest.Digest) error {
		found = append(found, dgst)
		return nil
	}
	if err := index.Enumerate(ctx, collect); err != nil {
		return nil, err
	}

	return found, nil
}
package storage
import (
"context"
"path"
"github.com/distribution/distribution/v3/internal/dcontext"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/opencontainers/go-digest"
)
// vacuum contains functions for cleaning up repositories and blobs
// These functions will only reliably work on strongly consistent
// storage systems.
// https://en.wikipedia.org/wiki/Consistency_model
// NewVacuum creates a new Vacuum that operates on driver. The supplied ctx is
// stored in the Vacuum and used for every later driver call and log message.
func NewVacuum(ctx context.Context, driver driver.StorageDriver) Vacuum {
return Vacuum{
ctx: ctx,
driver: driver,
}
}
// Vacuum removes content from the filesystem
type Vacuum struct {
// driver is the storage backend that content is deleted from.
driver driver.StorageDriver
// ctx is the context captured by NewVacuum; it is passed to driver calls and
// used to obtain the logger.
ctx context.Context
}
// RemoveBlob removes a blob from the filesystem. dgst must parse as a digest;
// a parse failure, a path construction failure or a driver delete failure is
// returned to the caller.
func (v Vacuum) RemoveBlob(dgst string) error {
	parsed, err := digest.Parse(dgst)
	if err != nil {
		return err
	}

	target, err := pathFor(blobPathSpec{digest: parsed})
	if err != nil {
		return err
	}

	dcontext.GetLogger(v.ctx).Infof("Deleting blob: %s", target)

	return v.driver.Delete(v.ctx, target)
}
// RemoveManifest removes a manifest from the filesystem. For each tag in tags
// the tag's index entry for dgst is deleted first; entries that do not exist
// are skipped. The manifest revision itself is deleted last. The first error
// encountered stops the operation and is returned.
func (v Vacuum) RemoveManifest(name string, dgst digest.Digest, tags []string) error {
	// remove a tag manifest reference, in case of not found continue to next one
	for _, tag := range tags {
		entryPath, err := pathFor(manifestTagIndexEntryPathSpec{name: name, revision: dgst, tag: tag})
		if err != nil {
			return err
		}

		if _, statErr := v.driver.Stat(v.ctx, entryPath); statErr != nil {
			if _, notFound := statErr.(driver.PathNotFoundError); notFound {
				continue
			}
			return statErr
		}

		dcontext.GetLogger(v.ctx).Infof("deleting manifest tag reference: %s", entryPath)
		if delErr := v.driver.Delete(v.ctx, entryPath); delErr != nil {
			return delErr
		}
	}

	revisionPath, err := pathFor(manifestRevisionPathSpec{name: name, revision: dgst})
	if err != nil {
		return err
	}

	dcontext.GetLogger(v.ctx).Infof("deleting manifest: %s", revisionPath)
	return v.driver.Delete(v.ctx, revisionPath)
}
// RemoveRepository removes a repository directory from the filesystem. The
// directory is the repositories root joined with repoName; the driver's delete
// error, if any, is returned.
func (v Vacuum) RemoveRepository(repoName string) error {
	root, err := pathFor(repositoriesRootPathSpec{})
	if err != nil {
		return err
	}

	target := path.Join(root, repoName)
	dcontext.GetLogger(v.ctx).Infof("Deleting repo: %s", target)

	return v.driver.Delete(v.ctx, target)
}
// RemoveLayer removes a layer link path from the storage. The link is the one
// for dgst inside repository repoName; the driver's delete error, if any, is
// returned.
func (v Vacuum) RemoveLayer(repoName string, dgst digest.Digest) error {
	linkPath, err := pathFor(layerLinkPathSpec{name: repoName, digest: dgst})
	if err != nil {
		return err
	}

	dcontext.GetLogger(v.ctx).Infof("Deleting layer link path: %s", linkPath)

	return v.driver.Delete(v.ctx, linkPath)
}