//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apparmor
import (
"bytes"
"context"
"fmt"
"os"
"github.com/containerd/containerd/v2/core/containers"
"github.com/containerd/containerd/v2/pkg/oci"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// WithProfile sets the provided AppArmor profile on the spec
func WithProfile(profile string) oci.SpecOpts {
return func(_ context.Context, _ oci.Client, _ *containers.Container, s *specs.Spec) error {
s.Process.ApparmorProfile = profile
return nil
}
}
// WithDefaultProfile will generate a default apparmor profile under the provided name
// for the container. It is only generated if a profile under that name does not exist.
func WithDefaultProfile(name string) oci.SpecOpts {
return func(_ context.Context, _ oci.Client, _ *containers.Container, s *specs.Spec) error {
if err := LoadDefaultProfile(name); err != nil {
return err
}
s.Process.ApparmorProfile = name
return nil
}
}
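// A minimal usage sketch, assuming a containerd client and image prepared
// elsewhere (`client`, `ctx`, and `image` are illustrative names):
//
//	container, err := client.NewContainer(ctx, "example",
//		containerd.WithNewSpec(
//			oci.WithImageConfig(image),
//			apparmor.WithDefaultProfile("containerd-default"),
//		),
//	)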
// LoadDefaultProfile ensures that the default profile is loaded under the given name.
// It returns a nil error if the profile is already loaded.
func LoadDefaultProfile(name string) error {
yes, err := isLoaded(name)
if err != nil {
return err
}
if yes {
return nil
}
p, err := loadData(name)
if err != nil {
return err
}
f, err := os.CreateTemp(os.Getenv("XDG_RUNTIME_DIR"), p.Name)
if err != nil {
return err
}
defer f.Close()
path := f.Name()
defer os.Remove(path)
if err := generate(p, f); err != nil {
return err
}
if err := load(path); err != nil {
return fmt.Errorf("load apparmor profile %s: %w", path, err)
}
return nil
}
// DumpDefaultProfile dumps the default profile with the given name.
func DumpDefaultProfile(name string) (string, error) {
p, err := loadData(name)
if err != nil {
return "", err
}
var buf bytes.Buffer
if err := generate(p, &buf); err != nil {
return "", err
}
return buf.String(), nil
}
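// A small sketch for inspecting the generated profile text without loading
// it (the profile name is illustrative):
//
//	text, err := apparmor.DumpDefaultProfile("containerd-default")
//	if err == nil {
//		fmt.Println(text)
//	}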
//go:build linux
/*
Copyright The docker Authors.
Copyright The Moby Authors.
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apparmor
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"path"
"strings"
"text/template"
"github.com/containerd/log"
)
// NOTE: This code is copied from <github.com/docker/docker/profiles/apparmor>.
// If you plan to make any changes, please make sure they are also sent
// upstream.
const dir = "/etc/apparmor.d"
const defaultTemplate = `
{{range $value := .Imports}}
{{$value}}
{{end}}
profile {{.Name}} flags=(attach_disconnected,mediate_deleted) {
{{range $value := .InnerImports}}
{{$value}}
{{end}}
network,
capability,
file,
umount,
# Host (privileged) processes may send signals to container processes.
signal (receive) peer=unconfined,
# runc may send signals to container processes.
signal (receive) peer=runc,
# crun may send signals to container processes.
signal (receive) peer=crun,
# Manager may send signals to container processes.
signal (receive) peer={{.DaemonProfile}},
# Container processes may send signals amongst themselves.
signal (send,receive) peer={{.Name}},
{{if .RootlessKit}}
# https://github.com/containerd/nerdctl/issues/2730
signal (receive) peer={{.RootlessKit}},
{{end}}
deny @{PROC}/* w, # deny write for all files directly in /proc (not in a subdir)
# deny write to files not in /proc/<number>/** or /proc/sys/**
deny @{PROC}/{[^1-9],[^1-9][^0-9],[^1-9s][^0-9y][^0-9s],[^1-9][^0-9][^0-9][^0-9]*}/** w,
deny @{PROC}/sys/[^k]** w, # deny /proc/sys except /proc/sys/k* (effectively /proc/sys/kernel)
deny @{PROC}/sys/kernel/{?,??,[^s][^h][^m]**} w, # deny everything except shm* in /proc/sys/kernel/
deny @{PROC}/sysrq-trigger rwklx,
deny @{PROC}/mem rwklx,
deny @{PROC}/kmem rwklx,
deny @{PROC}/kcore rwklx,
deny mount,
deny /sys/[^f]*/** wklx,
deny /sys/f[^s]*/** wklx,
deny /sys/fs/[^c]*/** wklx,
deny /sys/fs/c[^g]*/** wklx,
deny /sys/fs/cg[^r]*/** wklx,
deny /sys/firmware/** rwklx,
deny /sys/devices/virtual/powercap/** rwklx,
deny /sys/kernel/security/** rwklx,
# allow processes within the container to trace each other,
# provided all other LSM and yama settings allow it.
ptrace (trace,tracedby,read,readby) peer={{.Name}},
}
`
type data struct {
Name string
Imports []string
InnerImports []string
DaemonProfile string
RootlessKit string
}
func cleanProfileName(profile string) string {
// Normally profiles are suffixed by " (enforce)". AppArmor profiles cannot
// contain spaces so this doesn't restrict daemon profile names.
profile, _, _ = strings.Cut(profile, " ")
if profile == "" {
profile = "unconfined"
}
return profile
}
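// For example:
//
//	cleanProfileName("docker-default (enforce)") // "docker-default"
//	cleanProfileName("")                         // "unconfined"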
func loadData(name string) (*data, error) {
p := data{
Name: name,
}
if macroExists("tunables/global") {
p.Imports = append(p.Imports, "#include <tunables/global>")
} else {
p.Imports = append(p.Imports, "@{PROC}=/proc/")
}
if macroExists("abstractions/base") {
p.InnerImports = append(p.InnerImports, "#include <abstractions/base>")
}
// Figure out the daemon profile.
currentProfile, err := os.ReadFile("/proc/self/attr/current")
if err != nil {
// If we couldn't get the daemon profile, assume we are running
// unconfined which is generally the default.
currentProfile = nil
}
p.DaemonProfile = cleanProfileName(string(currentProfile))
// If we were running in Rootless mode, we could read `/proc/$(cat ${ROOTLESSKIT_STATE_DIR}/child_pid)/exe`,
// but `nerdctl apparmor load` has to be executed as root.
// So, do not check ${ROOTLESSKIT_STATE_DIR} (nor EUID) here.
p.RootlessKit, err = exec.LookPath("rootlesskit")
if err != nil {
log.L.WithError(err).Debug("apparmor: failed to determine the RootlessKit binary path")
p.RootlessKit = ""
}
log.L.Debugf("apparmor: RootlessKit=%q", p.RootlessKit)
return &p, nil
}
func generate(p *data, o io.Writer) error {
t, err := template.New("apparmor_profile").Parse(defaultTemplate)
if err != nil {
return err
}
return t.Execute(o, p)
}
func load(path string) error {
out, err := aaParser("-Kr", path)
if err != nil {
return fmt.Errorf("parser error(%q): %w", strings.TrimSpace(out), err)
}
return nil
}
// macroExists checks if the passed macro exists.
func macroExists(m string) bool {
_, err := os.Stat(path.Join(dir, m))
return err == nil
}
func aaParser(args ...string) (string, error) {
out, err := exec.Command("apparmor_parser", args...).CombinedOutput()
return string(out), err
}
func isLoaded(name string) (bool, error) {
f, err := os.Open("/sys/kernel/security/apparmor/profiles")
if err != nil {
return false, err
}
defer f.Close()
r := bufio.NewReader(f)
for {
p, err := r.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
return false, err
}
if strings.HasPrefix(p, name+" ") {
return true, nil
}
}
return false, nil
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fuzz
import (
"context"
"sync"
"time"
"github.com/containerd/containerd/v2/cmd/containerd/server"
"github.com/containerd/containerd/v2/cmd/containerd/server/config"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/sys"
"github.com/containerd/containerd/v2/version"
"github.com/containerd/log"
)
const (
defaultRoot = "/var/lib/containerd"
defaultState = "/tmp/containerd"
defaultAddress = "/tmp/containerd/containerd.sock"
)
var (
initDaemon sync.Once
)
func startDaemon() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
errC := make(chan error, 1)
go func() {
defer close(errC)
srvconfig := &config.Config{
Version: version.ConfigVersion,
Root: defaultRoot,
State: defaultState,
Debug: config.Debug{
Level: "debug",
},
GRPC: config.GRPCConfig{
Address: defaultAddress,
MaxRecvMsgSize: defaults.DefaultMaxRecvMsgSize,
MaxSendMsgSize: defaults.DefaultMaxSendMsgSize,
},
DisabledPlugins: []string{},
RequiredPlugins: []string{},
}
server, err := server.New(ctx, srvconfig)
if err != nil {
errC <- err
return
}
l, err := sys.GetLocalListener(srvconfig.GRPC.Address, srvconfig.GRPC.UID, srvconfig.GRPC.GID)
if err != nil {
errC <- err
return
}
go func() {
defer l.Close()
if err := server.ServeGRPC(l); err != nil {
log.G(ctx).WithError(err).WithField("address", srvconfig.GRPC.Address).Fatal("serve failure")
}
}()
server.Wait()
}()
var err error
select {
case err = <-errC:
case <-ctx.Done():
err = ctx.Err()
}
if err != nil {
panic(err)
}
}
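// A hypothetical go-fuzz style target sketch showing how initDaemon gates
// the daemon startup so it happens once per process (the target name and
// body are assumptions, not the actual harness):
//
//	func Fuzz(data []byte) int {
//		initDaemon.Do(startDaemon)
//		// exercise the daemon over defaultAddress using data ...
//		return 1
//	}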
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package auth
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"time"
remoteserrors "github.com/containerd/containerd/v2/core/remotes/errors"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/containerd/v2/version"
"github.com/containerd/log"
)
var (
// ErrNoToken is returned if a request is successful but the body does not
// contain an authorization token.
ErrNoToken = errors.New("authorization server did not include a token in the response")
)
// GenerateTokenOptions generates options for fetching a token based on a challenge
func GenerateTokenOptions(ctx context.Context, host, username, secret string, c Challenge) (TokenOptions, error) {
realm, ok := c.Parameters["realm"]
if !ok {
return TokenOptions{}, errors.New("no realm specified for token auth challenge")
}
realmURL, err := url.Parse(realm)
if err != nil {
return TokenOptions{}, fmt.Errorf("invalid token auth challenge realm: %w", err)
}
to := TokenOptions{
Realm: realmURL.String(),
Service: c.Parameters["service"],
Username: username,
Secret: secret,
}
scope, ok := c.Parameters["scope"]
if ok {
to.Scopes = append(to.Scopes, strings.Split(scope, " ")...)
} else {
log.G(ctx).WithField("host", host).Debug("no scope specified for token auth challenge")
}
return to, nil
}
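// For example, a 401 response carrying the challenge
//
//	WWW-Authenticate: Bearer realm="https://auth.example.com/token",service="registry.example.com",scope="repository:foo/bar:pull"
//
// yields TokenOptions with Realm "https://auth.example.com/token",
// Service "registry.example.com", and Scopes ["repository:foo/bar:pull"]
// (hostnames are illustrative).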
// TokenOptions are options for requesting a token
type TokenOptions struct {
Realm string
Service string
Scopes []string
Username string
Secret string
// FetchRefreshToken enables fetching a refresh token (aka "identity token", "offline token") along with the bearer token.
//
// For HTTP GET mode (FetchToken), FetchRefreshToken sets `offline_token=true` in the request.
// https://distribution.github.io/distribution/spec/auth/token/#requesting-a-token
//
// For HTTP POST mode (FetchTokenWithOAuth), FetchRefreshToken sets `access_type=offline` in the request.
// https://distribution.github.io/distribution/spec/auth/oauth/#getting-a-token
FetchRefreshToken bool
}
// OAuthTokenResponse is the response from fetching a token with an OAuth POST request
type OAuthTokenResponse struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresInSeconds int `json:"expires_in"`
IssuedAt time.Time `json:"issued_at"`
Scope string `json:"scope"`
}
// FetchTokenWithOAuth fetches a token using a POST request
func FetchTokenWithOAuth(ctx context.Context, client *http.Client, headers http.Header, clientID string, to TokenOptions) (*OAuthTokenResponse, error) {
c := *client
client = &c
tracing.UpdateHTTPClient(client, tracing.Name("remotes.docker.resolver", "FetchTokenWithOAuth"))
form := url.Values{}
if len(to.Scopes) > 0 {
form.Set("scope", strings.Join(to.Scopes, " "))
}
form.Set("service", to.Service)
form.Set("client_id", clientID)
if to.Username == "" {
form.Set("grant_type", "refresh_token")
form.Set("refresh_token", to.Secret)
} else {
form.Set("grant_type", "password")
form.Set("username", to.Username)
form.Set("password", to.Secret)
}
if to.FetchRefreshToken {
form.Set("access_type", "offline")
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, to.Realm, strings.NewReader(form.Encode()))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")
for k, v := range headers {
req.Header[k] = append(req.Header[k], v...)
}
if len(req.Header.Get("User-Agent")) == 0 {
req.Header.Set("User-Agent", "containerd/"+version.Version)
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, remoteserrors.NewUnexpectedStatusErr(resp)
}
decoder := json.NewDecoder(resp.Body)
var tr OAuthTokenResponse
if err = decoder.Decode(&tr); err != nil {
return nil, fmt.Errorf("unable to decode token response: %w", err)
}
if tr.AccessToken == "" {
return nil, ErrNoToken
}
return &tr, nil
}
// FetchTokenResponse is the response from fetching a token with a GET request
type FetchTokenResponse struct {
Token string `json:"token"`
AccessToken string `json:"access_token"`
ExpiresInSeconds int `json:"expires_in"`
IssuedAt time.Time `json:"issued_at"`
RefreshToken string `json:"refresh_token"`
}
// FetchToken fetches a token using a GET request
func FetchToken(ctx context.Context, client *http.Client, headers http.Header, to TokenOptions) (*FetchTokenResponse, error) {
c := *client
client = &c
tracing.UpdateHTTPClient(client, tracing.Name("remotes.docker.resolver", "FetchToken"))
req, err := http.NewRequestWithContext(ctx, http.MethodGet, to.Realm, nil)
if err != nil {
return nil, err
}
for k, v := range headers {
req.Header[k] = append(req.Header[k], v...)
}
if len(req.Header.Get("User-Agent")) == 0 {
req.Header.Set("User-Agent", "containerd/"+version.Version)
}
reqParams := req.URL.Query()
if to.Service != "" {
reqParams.Add("service", to.Service)
}
for _, scope := range to.Scopes {
reqParams.Add("scope", scope)
}
if to.Secret != "" {
req.SetBasicAuth(to.Username, to.Secret)
}
if to.FetchRefreshToken {
reqParams.Add("offline_token", "true")
}
req.URL.RawQuery = reqParams.Encode()
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, remoteserrors.NewUnexpectedStatusErr(resp)
}
decoder := json.NewDecoder(resp.Body)
var tr FetchTokenResponse
if err = decoder.Decode(&tr); err != nil {
return nil, fmt.Errorf("unable to decode token response: %w", err)
}
// `access_token` is equivalent to `token` and if both are specified
// the choice is undefined. Canonicalize `access_token` by sticking
// things in `token`.
if tr.AccessToken != "" {
tr.Token = tr.AccessToken
}
if tr.Token == "" {
return nil, ErrNoToken
}
return &tr, nil
}
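// A minimal end-to-end sketch tying the helpers in this package together
// (error handling elided; the host and credentials are assumptions):
//
//	challenges := auth.ParseAuthHeader(resp.Header)
//	to, _ := auth.GenerateTokenOptions(ctx, "registry.example.com", username, secret, challenges[0])
//	tr, _ := auth.FetchToken(ctx, http.DefaultClient, nil, to)
//	req.Header.Set("Authorization", "Bearer "+tr.Token)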
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package auth
import (
"net/http"
"sort"
"strings"
)
// AuthenticationScheme defines the scheme of the authentication method
type AuthenticationScheme byte
const (
// BasicAuth is the scheme for Basic HTTP Authentication RFC 7617
BasicAuth AuthenticationScheme = 1 << iota
// DigestAuth is the scheme for HTTP Digest Access Authentication RFC 7616
DigestAuth
// BearerAuth is the scheme for OAuth 2.0 Bearer Tokens RFC 6750
BearerAuth
)
// Challenge carries information from a WWW-Authenticate response header.
// See RFC 2617.
type Challenge struct {
// Scheme is the auth-scheme according to RFC 2617
Scheme AuthenticationScheme
// Parameters are the auth-params according to RFC 2617
Parameters map[string]string
}
type byScheme []Challenge
func (bs byScheme) Len() int { return len(bs) }
func (bs byScheme) Swap(i, j int) { bs[i], bs[j] = bs[j], bs[i] }
// Less sorts in priority order: bearer > digest > basic
func (bs byScheme) Less(i, j int) bool { return bs[i].Scheme > bs[j].Scheme }
// Octet types from RFC 2616.
type octetType byte
var octetTypes [256]octetType
const (
isToken octetType = 1 << iota
isSpace
)
func init() {
// OCTET = <any 8-bit sequence of data>
// CHAR = <any US-ASCII character (octets 0 - 127)>
// CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
// CR = <US-ASCII CR, carriage return (13)>
// LF = <US-ASCII LF, linefeed (10)>
// SP = <US-ASCII SP, space (32)>
// HT = <US-ASCII HT, horizontal-tab (9)>
// <"> = <US-ASCII double-quote mark (34)>
// CRLF = CR LF
// LWS = [CRLF] 1*( SP | HT )
// TEXT = <any OCTET except CTLs, but including LWS>
// separators = "(" | ")" | "<" | ">" | "@" | "," | ";" | ":" | "\" | <">
// | "/" | "[" | "]" | "?" | "=" | "{" | "}" | SP | HT
// token = 1*<any CHAR except CTLs or separators>
// qdtext = <any TEXT except <">>
for c := 0; c < 256; c++ {
var t octetType
isCtl := c <= 31 || c == 127
isChar := 0 <= c && c <= 127
isSeparator := strings.ContainsRune(" \t\"(),/:;<=>?@[]\\{}", rune(c))
if strings.ContainsRune(" \t\r\n", rune(c)) {
t |= isSpace
}
if isChar && !isCtl && !isSeparator {
t |= isToken
}
octetTypes[c] = t
}
}
// ParseAuthHeader parses challenges from the WWW-Authenticate header
func ParseAuthHeader(header http.Header) []Challenge {
challenges := []Challenge{}
for _, h := range header[http.CanonicalHeaderKey("WWW-Authenticate")] {
v, p := parseValueAndParams(h)
var s AuthenticationScheme
switch v {
case "basic":
s = BasicAuth
case "digest":
s = DigestAuth
case "bearer":
s = BearerAuth
default:
continue
}
challenges = append(challenges, Challenge{Scheme: s, Parameters: p})
}
sort.Stable(byScheme(challenges))
return challenges
}
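// For example, given a response with both
//
//	WWW-Authenticate: Basic realm="registry"
//	WWW-Authenticate: Bearer realm="https://auth.example.com/token"
//
// ParseAuthHeader returns the bearer challenge first, since byScheme
// sorts bearer > digest > basic.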
func parseValueAndParams(header string) (value string, params map[string]string) {
params = make(map[string]string)
value, s := expectToken(header)
if value == "" {
return
}
value = strings.ToLower(value)
for {
var pkey string
pkey, s = expectToken(skipSpace(s))
if pkey == "" {
return
}
if !strings.HasPrefix(s, "=") {
return
}
var pvalue string
pvalue, s = expectTokenOrQuoted(s[1:])
pkey = strings.ToLower(pkey)
params[pkey] = pvalue
s = skipSpace(s)
if !strings.HasPrefix(s, ",") {
return
}
s = s[1:]
}
}
func skipSpace(s string) (rest string) {
i := 0
for ; i < len(s); i++ {
if octetTypes[s[i]]&isSpace == 0 {
break
}
}
return s[i:]
}
func expectToken(s string) (token, rest string) {
i := 0
for ; i < len(s); i++ {
if octetTypes[s[i]]&isToken == 0 {
break
}
}
return s[:i], s[i:]
}
func expectTokenOrQuoted(s string) (value string, rest string) {
if !strings.HasPrefix(s, "\"") {
return expectToken(s)
}
s = s[1:]
for i := 0; i < len(s); i++ {
switch s[i] {
case '"':
return s[:i], s[i+1:]
case '\\':
p := make([]byte, len(s)-1)
j := copy(p, s[:i])
escape := true
for i = i + 1; i < len(s); i++ {
b := s[i]
switch {
case escape:
escape = false
p[j] = b
j++
case b == '\\':
escape = true
case b == '"':
return string(p[:j]), s[i+1:]
default:
p[j] = b
j++
}
}
return "", ""
}
}
return "", ""
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/containerd/containerd/v2/core/remotes/docker/auth"
remoteerrors "github.com/containerd/containerd/v2/core/remotes/errors"
"github.com/containerd/errdefs"
"github.com/containerd/log"
)
type dockerAuthorizer struct {
credentials func(string) (string, string, error)
client *http.Client
header http.Header
mu sync.RWMutex
// indexed by host name
handlers map[string]*authHandler
onFetchRefreshToken OnFetchRefreshToken
}
type authorizerConfig struct {
credentials func(string) (string, string, error)
client *http.Client
header http.Header
onFetchRefreshToken OnFetchRefreshToken
}
// AuthorizerOpt configures an authorizer
type AuthorizerOpt func(*authorizerConfig)
// WithAuthClient provides the HTTP client for the authorizer
func WithAuthClient(client *http.Client) AuthorizerOpt {
return func(opt *authorizerConfig) {
opt.client = client
}
}
// WithAuthCreds provides a credential function to the authorizer
func WithAuthCreds(creds func(string) (string, string, error)) AuthorizerOpt {
return func(opt *authorizerConfig) {
opt.credentials = creds
}
}
// WithAuthHeader provides HTTP headers for authorization
//
// We need to merge instead of replacing because headers may be set by
// a per-host hosts.toml and/or by a global header config (e.g., cri.config.headers)
func WithAuthHeader(hdr http.Header) AuthorizerOpt {
return func(opt *authorizerConfig) {
if opt.header == nil {
opt.header = hdr.Clone()
} else {
for k, v := range hdr {
opt.header[k] = append(opt.header[k], v...)
}
}
}
}
// OnFetchRefreshToken is called when a refresh token has been fetched.
type OnFetchRefreshToken func(ctx context.Context, refreshToken string, req *http.Request)
// WithFetchRefreshToken enables fetching "refresh token" (aka "identity token", "offline token").
func WithFetchRefreshToken(f OnFetchRefreshToken) AuthorizerOpt {
return func(opt *authorizerConfig) {
opt.onFetchRefreshToken = f
}
}
// NewDockerAuthorizer creates an authorizer using Docker's registry
// authentication spec.
// See https://distribution.github.io/distribution/spec/auth/
func NewDockerAuthorizer(opts ...AuthorizerOpt) Authorizer {
var ao authorizerConfig
for _, opt := range opts {
opt(&ao)
}
if ao.client == nil {
ao.client = http.DefaultClient
}
return &dockerAuthorizer{
credentials: ao.credentials,
client: ao.client,
header: ao.header,
handlers: make(map[string]*authHandler),
onFetchRefreshToken: ao.onFetchRefreshToken,
}
}
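// A construction sketch (the credential callback is an assumption; callers
// typically plumb per-host credentials from their own configuration):
//
//	authorizer := NewDockerAuthorizer(
//		WithAuthClient(http.DefaultClient),
//		WithAuthCreds(func(host string) (string, string, error) {
//			return "username", "secret", nil
//		}),
//	)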
// Authorize handles the auth request.
func (a *dockerAuthorizer) Authorize(ctx context.Context, req *http.Request) error {
// skip if there is no auth handler
ah := a.getAuthHandler(req.URL.Host)
if ah == nil {
return nil
}
auth, refreshToken, err := ah.authorize(ctx)
if err != nil {
return err
}
req.Header.Set("Authorization", auth)
if refreshToken != "" {
a.mu.RLock()
onFetchRefreshToken := a.onFetchRefreshToken
a.mu.RUnlock()
if onFetchRefreshToken != nil {
onFetchRefreshToken(ctx, refreshToken, req)
}
}
return nil
}
func (a *dockerAuthorizer) getAuthHandler(host string) *authHandler {
a.mu.Lock()
defer a.mu.Unlock()
return a.handlers[host]
}
func (a *dockerAuthorizer) AddResponses(ctx context.Context, responses []*http.Response) error {
last := responses[len(responses)-1]
host := last.Request.URL.Host
a.mu.Lock()
defer a.mu.Unlock()
for _, c := range auth.ParseAuthHeader(last.Header) {
if c.Scheme == auth.BearerAuth {
if retry, err := invalidAuthorization(ctx, c, responses); err != nil {
delete(a.handlers, host)
return err
} else if retry {
delete(a.handlers, host)
}
// Reuse the existing handler.
//
// Assume that one registry will return common challenge
// information, including realm and service, and that the
// resource scope is the only part that differs, which can be
// provided by each request.
if _, ok := a.handlers[host]; ok {
return nil
}
var username, secret string
if a.credentials != nil {
var err error
username, secret, err = a.credentials(host)
if err != nil {
return err
}
}
common, err := auth.GenerateTokenOptions(ctx, host, username, secret, c)
if err != nil {
return err
}
common.FetchRefreshToken = a.onFetchRefreshToken != nil
a.handlers[host] = newAuthHandler(a.client, a.header, c.Scheme, common)
return nil
} else if c.Scheme == auth.BasicAuth && a.credentials != nil {
username, secret, err := a.credentials(host)
if err != nil {
return err
}
if username == "" || secret == "" {
return fmt.Errorf("%w: no basic auth credentials", ErrInvalidAuthorization)
}
a.handlers[host] = newAuthHandler(a.client, a.header, c.Scheme, auth.TokenOptions{
Username: username,
Secret: secret,
})
return nil
}
}
return fmt.Errorf("failed to find supported auth scheme: %w", errdefs.ErrNotImplemented)
}
// authResult is used to limit the token fetch rate: concurrent
// requests for the same scopes wait on a single in-flight fetch.
type authResult struct {
sync.WaitGroup
token string
refreshToken string
expirationTime *time.Time
err error
}
// authHandler handles auth requests for a single registry server.
type authHandler struct {
sync.Mutex
header http.Header
client *http.Client
// only the basic and bearer schemes are supported
scheme auth.AuthenticationScheme
// common contains the common challenge answer
common auth.TokenOptions
// scopedTokens caches tokens indexed by scope, which is used
// in the bearer auth case
scopedTokens map[string]*authResult
}
func newAuthHandler(client *http.Client, hdr http.Header, scheme auth.AuthenticationScheme, opts auth.TokenOptions) *authHandler {
return &authHandler{
header: hdr,
client: client,
scheme: scheme,
common: opts,
scopedTokens: map[string]*authResult{},
}
}
func (ah *authHandler) authorize(ctx context.Context) (string, string, error) {
switch ah.scheme {
case auth.BasicAuth:
return ah.doBasicAuth(ctx)
case auth.BearerAuth:
return ah.doBearerAuth(ctx)
default:
return "", "", fmt.Errorf("failed to find supported auth scheme: %s: %w", string(ah.scheme), errdefs.ErrNotImplemented)
}
}
func (ah *authHandler) doBasicAuth(ctx context.Context) (string, string, error) {
username, secret := ah.common.Username, ah.common.Secret
if username == "" || secret == "" {
return "", "", fmt.Errorf("failed to handle basic auth because missing username or secret")
}
auth := base64.StdEncoding.EncodeToString([]byte(username + ":" + secret))
return fmt.Sprintf("Basic %s", auth), "", nil
}
func (ah *authHandler) doBearerAuth(ctx context.Context) (token, refreshToken string, err error) {
// copy common tokenOptions
to := ah.common
to.Scopes = GetTokenScopes(ctx, to.Scopes)
// Docs: https://distribution.github.io/distribution/spec/auth/scope/
scoped := strings.Join(to.Scopes, " ")
// Keep track of the expiration time of cached bearer tokens so they can be
// refreshed when they expire without a server roundtrip.
var expirationTime *time.Time
ah.Lock()
if r, exist := ah.scopedTokens[scoped]; exist && (r.expirationTime == nil || r.expirationTime.After(time.Now())) {
ah.Unlock()
r.Wait()
return r.token, r.refreshToken, r.err
}
// only one fetch token job
r := new(authResult)
r.Add(1)
ah.scopedTokens[scoped] = r
ah.Unlock()
defer func() {
token = fmt.Sprintf("Bearer %s", token)
r.token, r.refreshToken, r.err, r.expirationTime = token, refreshToken, err, expirationTime
r.Done()
}()
// fetch token for the resource scope
if to.Secret != "" {
defer func() {
if err != nil {
err = fmt.Errorf("failed to fetch oauth token: %w", err)
}
}()
// credential information is provided, use oauth POST endpoint
// TODO: Allow setting client_id
resp, err := auth.FetchTokenWithOAuth(ctx, ah.client, ah.header, "containerd-client", to)
if err != nil {
var errStatus remoteerrors.ErrUnexpectedStatus
if errors.As(err, &errStatus) {
// Registries without support for POST may return 404 for POST /v2/token.
// As of September 2017, GCR is known to return 404.
// As of February 2018, JFrog Artifactory is known to return 401.
// As of January 2022, ACR is known to return 400.
if (errStatus.StatusCode == 405 && to.Username != "") || errStatus.StatusCode == 404 || errStatus.StatusCode == 401 || errStatus.StatusCode == 400 {
resp, err := auth.FetchToken(ctx, ah.client, ah.header, to)
if err != nil {
return "", "", err
}
expirationTime = getExpirationTime(resp.ExpiresInSeconds)
return resp.Token, resp.RefreshToken, nil
}
log.G(ctx).WithFields(log.Fields{
"status": errStatus.Status,
"body": string(errStatus.Body),
}).Debugf("token request failed")
}
return "", "", err
}
expirationTime = getExpirationTime(resp.ExpiresInSeconds)
return resp.AccessToken, resp.RefreshToken, nil
}
// do request anonymously
resp, err := auth.FetchToken(ctx, ah.client, ah.header, to)
if err != nil {
return "", "", fmt.Errorf("failed to fetch anonymous token: %w", err)
}
expirationTime = getExpirationTime(resp.ExpiresInSeconds)
return resp.Token, resp.RefreshToken, nil
}
func getExpirationTime(expiresInSeconds int) *time.Time {
if expiresInSeconds <= 0 {
return nil
}
expirationTime := time.Now().Add(time.Duration(expiresInSeconds) * time.Second)
return &expirationTime
}
func invalidAuthorization(ctx context.Context, c auth.Challenge, responses []*http.Response) (retry bool, _ error) {
errStr := c.Parameters["error"]
if errStr == "" {
return retry, nil
}
n := len(responses)
if n == 1 || (n > 1 && !sameRequest(responses[n-2].Request, responses[n-1].Request)) {
limitedErr := errStr
errLengthLimit := 64
if len(limitedErr) > errLengthLimit {
limitedErr = limitedErr[:errLengthLimit] + "..."
}
log.G(ctx).WithField("error", limitedErr).Debug("authorization error using bearer token, retrying")
return true, nil
}
return retry, fmt.Errorf("server message: %s: %w", errStr, ErrInvalidAuthorization)
}
func sameRequest(r1, r2 *http.Request) bool {
if r1.Method != r2.Method {
return false
}
if *r1.URL != *r2.URL {
return false
}
return true
}
//go:build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"crypto/x509"
"path/filepath"
)
func hostPaths(root, host string) (hosts []string) {
ch := hostDirectory(host)
if ch != host {
hosts = append(hosts, filepath.Join(root, ch))
}
hosts = append(hosts,
filepath.Join(root, host),
filepath.Join(root, "_default"),
)
return
}
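// For example, hostPaths("/etc/containerd/certs.d", "registry.example.com:5000")
// returns, in order:
//
//	/etc/containerd/certs.d/registry.example.com_5000_
//	/etc/containerd/certs.d/registry.example.com:5000
//	/etc/containerd/certs.d/_default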
func rootSystemPool() (*x509.CertPool, error) {
return x509.SystemCertPool()
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package config contains utilities for helping configure the Docker resolver
package config
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/pelletier/go-toml/v2"
tomlu "github.com/pelletier/go-toml/v2/unstable"
"github.com/containerd/containerd/v2/core/remotes/docker"
)
// UpdateClientFunc is a function that lets you amend the http.Client behavior used by registry clients.
type UpdateClientFunc func(client *http.Client) error
type hostConfig struct {
scheme string
host string
path string
capabilities docker.HostCapabilities
caCerts []string
clientPairs [][2]string
skipVerify *bool
dialTimeout *time.Duration
header http.Header
// TODO: Add credential configuration (domain alias, username)
}
// HostOptions is used to configure registry hosts
type HostOptions struct {
HostDir func(string) (string, error)
Credentials func(host string) (string, string, error)
DefaultTLS *tls.Config
DefaultScheme string
// UpdateClient will be called after creating http.Client object, so clients can provide extra configuration
UpdateClient UpdateClientFunc
AuthorizerOpts []docker.AuthorizerOpt
}
// ConfigureHosts creates a registry hosts function from the provided
// host creation options. From the host directory it can read a hosts.toml
// file or certificate files laid out in the Docker-specific layout.
// If a `HostDir` function is not provided, defaults are used.
func ConfigureHosts(ctx context.Context, options HostOptions) docker.RegistryHosts {
return func(host string) ([]docker.RegistryHost, error) {
var hosts []hostConfig
if options.HostDir != nil {
dir, err := options.HostDir(host)
if err != nil && !errdefs.IsNotFound(err) {
return nil, err
}
if dir != "" {
log.G(ctx).WithField("dir", dir).Debug("loading host directory")
hosts, err = loadHostDir(ctx, dir)
if err != nil {
return nil, err
}
}
}
// If hosts was not set, add a default host
// NOTE: Check nil here and not empty, the host may be
// intentionally configured to not have any endpoints
if hosts == nil {
hosts = make([]hostConfig, 1)
}
if len(hosts) > 0 && hosts[len(hosts)-1].host == "" {
if host == "docker.io" {
hosts[len(hosts)-1].scheme = "https"
hosts[len(hosts)-1].host = "registry-1.docker.io"
} else if docker.IsLocalhost(host) {
hosts[len(hosts)-1].host = host
if options.DefaultScheme == "" {
_, port, _ := net.SplitHostPort(host)
if port == "" || port == "443" {
// If port is default or 443, only use https
hosts[len(hosts)-1].scheme = "https"
} else {
// HTTP fallback logic will be used when protocol is ambiguous
hosts[len(hosts)-1].scheme = "http"
}
// When port is 80, protocol is not ambiguous
if port != "80" {
// Skipping TLS verification for localhost
var skipVerify = true
hosts[len(hosts)-1].skipVerify = &skipVerify
}
} else {
hosts[len(hosts)-1].scheme = options.DefaultScheme
}
} else {
hosts[len(hosts)-1].host = host
if options.DefaultScheme != "" {
hosts[len(hosts)-1].scheme = options.DefaultScheme
} else {
hosts[len(hosts)-1].scheme = "https"
}
}
hosts[len(hosts)-1].path = "/v2"
hosts[len(hosts)-1].capabilities = docker.HostCapabilityPull | docker.HostCapabilityResolve | docker.HostCapabilityPush
}
// tlsConfigured indicates that TLS was configured and HTTP endpoints should
// attempt to use the TLS configuration before falling back to HTTP
var tlsConfigured bool
var defaultTLSConfig *tls.Config
if options.DefaultTLS != nil {
tlsConfigured = true
defaultTLSConfig = options.DefaultTLS
} else {
defaultTLSConfig = &tls.Config{}
}
defaultTransport := docker.DefaultHTTPTransport(defaultTLSConfig)
client := &http.Client{
Transport: defaultTransport,
}
if options.UpdateClient != nil {
if err := options.UpdateClient(client); err != nil {
return nil, err
}
}
authOpts := []docker.AuthorizerOpt{docker.WithAuthClient(client)}
if options.Credentials != nil {
authOpts = append(authOpts, docker.WithAuthCreds(options.Credentials))
}
authOpts = append(authOpts, options.AuthorizerOpts...)
authorizer := docker.NewDockerAuthorizer(authOpts...)
rhosts := make([]docker.RegistryHost, len(hosts))
for i, host := range hosts {
// Allow overriding these settings for each host as well
explicitTLSFromHost := host.caCerts != nil || host.clientPairs != nil || host.skipVerify != nil
explicitTLS := tlsConfigured || explicitTLSFromHost
if explicitTLSFromHost || host.dialTimeout != nil || len(host.header) != 0 {
c := *client
if explicitTLSFromHost || host.dialTimeout != nil {
tr := defaultTransport.Clone()
if explicitTLSFromHost {
if err := updateTLSConfigFromHost(tr.TLSClientConfig, &host); err != nil {
return nil, err
}
}
if host.dialTimeout != nil {
tr.DialContext = (&net.Dialer{
Timeout: *host.dialTimeout,
KeepAlive: 30 * time.Second,
FallbackDelay: 300 * time.Millisecond,
}).DialContext
}
c.Transport = tr
}
if options.UpdateClient != nil {
if err := options.UpdateClient(&c); err != nil {
return nil, err
}
}
// redeclare here to allow per-host changes
authOpts := authOpts
if len(host.header) != 0 {
authOpts = append(authOpts, docker.WithAuthHeader(host.header))
}
rhosts[i].Client = &c
rhosts[i].Authorizer = docker.NewDockerAuthorizer(append(authOpts, docker.WithAuthClient(&c))...)
} else {
rhosts[i].Client = client
rhosts[i].Authorizer = authorizer
}
// When TLS has been configured for the operation or host and
// the protocol from the port number is ambiguous, use the
// docker.NewHTTPFallback roundtripper to catch TLS errors and re-attempt the
// request as http. This allows preference for https when configured but
// also catches TLS errors early enough in the request to avoid sending
// the request twice or consuming the request body.
if host.scheme == "http" && explicitTLS {
_, port, _ := net.SplitHostPort(host.host)
if port != "80" {
log.G(ctx).WithField("host", host.host).Info("host will try HTTPS first since it is configured for HTTP with a TLS configuration, consider changing host to HTTPS or removing unused TLS configuration")
host.scheme = "https"
rhosts[i].Client.Transport = docker.NewHTTPFallback(rhosts[i].Client.Transport)
}
}
rhosts[i].Scheme = host.scheme
rhosts[i].Host = host.host
rhosts[i].Path = host.path
rhosts[i].Capabilities = host.capabilities
rhosts[i].Header = host.header
}
return rhosts, nil
}
}
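// A usage sketch, assuming the conventional certs.d root (the resolver
// wiring below is illustrative):
//
//	hosts := ConfigureHosts(ctx, HostOptions{
//		HostDir: HostDirFromRoot("/etc/containerd/certs.d"),
//	})
//	resolver := docker.NewResolver(docker.ResolverOptions{Hosts: hosts})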
func updateTLSConfigFromHost(tlsConfig *tls.Config, host *hostConfig) error {
if host.skipVerify != nil {
tlsConfig.InsecureSkipVerify = *host.skipVerify
}
if host.caCerts != nil {
if tlsConfig.RootCAs == nil {
rootPool, err := rootSystemPool()
if err != nil {
return fmt.Errorf("unable to initialize cert pool: %w", err)
}
tlsConfig.RootCAs = rootPool
}
for _, f := range host.caCerts {
data, err := os.ReadFile(f)
if err != nil {
return fmt.Errorf("unable to read CA cert %q: %w", f, err)
}
if !tlsConfig.RootCAs.AppendCertsFromPEM(data) {
return fmt.Errorf("unable to load CA cert %q", f)
}
}
}
for _, pair := range host.clientPairs {
certPEMBlock, err := os.ReadFile(pair[0])
if err != nil {
return fmt.Errorf("unable to read CERT file %q: %w", pair[0], err)
}
var keyPEMBlock []byte
if pair[1] != "" {
keyPEMBlock, err = os.ReadFile(pair[1])
if err != nil {
return fmt.Errorf("unable to read CERT file %q: %w", pair[1], err)
}
} else {
// Load key block from same PEM file
keyPEMBlock = certPEMBlock
}
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
return fmt.Errorf("failed to load X509 key pair: %w", err)
}
tlsConfig.Certificates = append(tlsConfig.Certificates, cert)
}
return nil
}
// HostDirFromRoot returns a function which finds a host directory
// based at the given root.
func HostDirFromRoot(root string) func(string) (string, error) {
return func(host string) (string, error) {
for _, p := range hostPaths(root, host) {
if _, err := os.Stat(p); err == nil {
return p, nil
} else if !os.IsNotExist(err) {
return "", err
}
}
return "", errdefs.ErrNotFound
}
}
// hostDirectory converts ":port" to "_port_" in directory names
func hostDirectory(host string) string {
idx := strings.LastIndex(host, ":")
if idx > 0 {
return host[:idx] + "_" + host[idx+1:] + "_"
}
return host
}
func loadHostDir(ctx context.Context, hostsDir string) ([]hostConfig, error) {
b, err := os.ReadFile(filepath.Join(hostsDir, "hosts.toml"))
if err != nil && !os.IsNotExist(err) {
return nil, err
}
if len(b) == 0 {
// If hosts.toml does not exist, fallback to checking for
// certificate files based on Docker's certificate file
// pattern (".crt", ".cert", ".key" files)
return loadCertFiles(ctx, hostsDir)
}
hosts, err := parseHostsFile(hostsDir, b)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to decode %s", filepath.Join(hostsDir, "hosts.toml"))
// Fallback to checking certificate files
return loadCertFiles(ctx, hostsDir)
}
return hosts, nil
}
type hostFileConfig struct {
// Capabilities determine what operations a host is
// capable of performing. Allowed values
// - pull
// - resolve
// - push
Capabilities []string `toml:"capabilities"`
// CACert are the public key certificates for TLS
// Accepted types
// - string - Single file with certificate(s)
// - []string - Multiple files with certificates
CACert interface{} `toml:"ca"`
// Client keypair(s) for TLS with client authentication
// Accepted types
// - string - Single file with public and private keys
// - []string - Multiple files with public and private keys
// - [][2]string - Multiple keypairs with public and private keys in separate files
Client interface{} `toml:"client"`
// SkipVerify skips verification of the server's certificate chain
// and host name. This should only be used for testing or in
// combination with other methods of verifying connections.
SkipVerify *bool `toml:"skip_verify"`
// Header are additional headers to send to the server
Header map[string]interface{} `toml:"header"`
// OverridePath indicates the API root endpoint is defined in the URL
// path rather than by the API specification.
// This may be used with non-compliant OCI registries to override the
// API root endpoint.
OverridePath bool `toml:"override_path"`
// DialTimeout is the maximum amount of time a dial will wait for
// a connect to complete.
DialTimeout string `toml:"dial_timeout"`
// TODO: Credentials: helper? name? username? alternate domain? token?
}
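// An illustrative hosts.toml exercising the fields above (server, host,
// and file paths are assumptions):
//
//	server = "https://registry.example.com"
//
//	[host."https://mirror.example.com"]
//	  capabilities = ["pull", "resolve"]
//	  ca = "/etc/certs/mirror.pem"
//	  skip_verify = false
//	  dial_timeout = "10s"
//	  [host."https://mirror.example.com".header]
//	    x-custom-header = "value"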
func parseHostsFile(baseDir string, b []byte) ([]hostConfig, error) {
orderedHosts, err := getSortedHosts(b)
if err != nil {
return nil, err
}
c := struct {
hostFileConfig
// Server specifies the default server. When `host` is
// also specified, those hosts are tried first.
Server string `toml:"server"`
// HostConfigs store the per-host configuration
HostConfigs map[string]hostFileConfig `toml:"host"`
}{}
var (
hosts []hostConfig
)
if err := toml.Unmarshal(b, &c); err != nil {
return nil, err
}
// Parse hosts array
for _, host := range orderedHosts {
config := c.HostConfigs[host]
parsed, err := parseHostConfig(host, baseDir, config)
if err != nil {
return nil, err
}
hosts = append(hosts, parsed)
}
// Parse root host config and append it as the last element
parsed, err := parseHostConfig(c.Server, baseDir, c.hostFileConfig)
if err != nil {
return nil, err
}
hosts = append(hosts, parsed)
return hosts, nil
}
func parseHostConfig(server string, baseDir string, config hostFileConfig) (hostConfig, error) {
var (
result = hostConfig{}
err error
)
if server != "" {
if !strings.HasPrefix(server, "http") {
server = "https://" + server
}
u, err := url.Parse(server)
if err != nil {
return hostConfig{}, fmt.Errorf("unable to parse server %v: %w", server, err)
}
result.scheme = u.Scheme
result.host = u.Host
if len(u.Path) > 0 {
u.Path = path.Clean(u.Path)
if !strings.HasSuffix(u.Path, "/v2") && !config.OverridePath {
u.Path = u.Path + "/v2"
}
} else if !config.OverridePath {
u.Path = "/v2"
}
result.path = u.Path
}
result.skipVerify = config.SkipVerify
if len(config.Capabilities) > 0 {
for _, c := range config.Capabilities {
switch strings.ToLower(c) {
case "pull":
result.capabilities |= docker.HostCapabilityPull
case "resolve":
result.capabilities |= docker.HostCapabilityResolve
case "push":
result.capabilities |= docker.HostCapabilityPush
default:
return hostConfig{}, fmt.Errorf("unknown capability %v", c)
}
}
} else {
result.capabilities = docker.HostCapabilityPull | docker.HostCapabilityResolve | docker.HostCapabilityPush
}
if config.CACert != nil {
switch cert := config.CACert.(type) {
case string:
result.caCerts = []string{makeAbsPath(cert, baseDir)}
case []interface{}:
result.caCerts, err = makeStringSlice(cert, func(p string) string {
return makeAbsPath(p, baseDir)
})
if err != nil {
return hostConfig{}, err
}
default:
return hostConfig{}, fmt.Errorf("invalid type %v for \"ca\"", cert)
}
}
if config.Client != nil {
switch client := config.Client.(type) {
case string:
result.clientPairs = [][2]string{{makeAbsPath(client, baseDir), ""}}
case []interface{}:
// []string or [][2]string
for _, pairs := range client {
switch p := pairs.(type) {
case string:
result.clientPairs = append(result.clientPairs, [2]string{makeAbsPath(p, baseDir), ""})
case []interface{}:
slice, err := makeStringSlice(p, func(s string) string {
return makeAbsPath(s, baseDir)
})
if err != nil {
return hostConfig{}, err
}
if len(slice) != 2 {
return hostConfig{}, fmt.Errorf("invalid pair %v for \"client\"", p)
}
var pair [2]string
copy(pair[:], slice)
result.clientPairs = append(result.clientPairs, pair)
default:
return hostConfig{}, fmt.Errorf("invalid type %T for \"client\"", p)
}
}
default:
return hostConfig{}, fmt.Errorf("invalid type %v for \"client\"", client)
}
}
if config.Header != nil {
header := http.Header{}
for key, ty := range config.Header {
switch value := ty.(type) {
case string:
header[key] = []string{value}
case []interface{}:
header[key], err = makeStringSlice(value, nil)
if err != nil {
return hostConfig{}, err
}
default:
return hostConfig{}, fmt.Errorf("invalid type %v for header %q", ty, key)
}
}
result.header = header
}
if config.DialTimeout != "" {
dialTimeout, err := time.ParseDuration(config.DialTimeout)
if err != nil {
return hostConfig{}, err
}
result.dialTimeout = &dialTimeout
}
return result, nil
}
// getSortedHosts returns the list of hosts in the order they are defined in the file.
func getSortedHosts(b []byte) ([]string, error) {
var hostsInOrder []string
// Use toml unstable package for directly parsing toml
// See https://github.com/pelletier/go-toml/discussions/801#discussioncomment-7083586
p := tomlu.Parser{}
p.Reset(b)
var host string
// iterate over all top level expressions
for p.NextExpression() {
e := p.Expression()
if e.Kind != tomlu.Table {
continue
}
// Let's look at the key. It's an iterator over the multiple dotted parts of the key.
var parts []string
for it := e.Key(); it.Next(); {
parts = append(parts, string(it.Node().Data))
}
// only consider keys that look like `host.XXX`
// and skip subtables such as `host.XXX.header`
if len(parts) < 2 || parts[0] != "host" || parts[1] == host {
continue
}
host = parts[1]
hostsInOrder = append(hostsInOrder, host)
}
return hostsInOrder, nil
}
// makeStringSlice is a helper func to convert from []interface{} to []string.
// Additionally an optional cb func may be passed to perform string mapping.
func makeStringSlice(slice []interface{}, cb func(string) string) ([]string, error) {
out := make([]string, len(slice))
for i, value := range slice {
str, ok := value.(string)
if !ok {
return nil, fmt.Errorf("unable to cast %v to string", value)
}
if cb != nil {
out[i] = cb(str)
} else {
out[i] = str
}
}
return out, nil
}
func makeAbsPath(p string, base string) string {
if filepath.IsAbs(p) {
return p
}
return filepath.Join(base, p)
}
// loadCertFiles loads certs from certsDir like "/etc/docker/certs.d".
// Compatible with Docker file layout
// - files ending with ".crt" are treated as CA certificate files
// - files ending with ".cert" are treated as client certificates, and
// files with the same name but ending with ".key" are treated as the
// corresponding private key.
// NOTE: If a ".key" file is missing, this function will just return
// the ".cert", which may contain the private key. If the ".cert" file
// does not contain the private key, the caller should detect and error.
func loadCertFiles(ctx context.Context, certsDir string) ([]hostConfig, error) {
fs, err := os.ReadDir(certsDir)
if err != nil && !os.IsNotExist(err) {
return nil, err
}
hosts := make([]hostConfig, 1)
for _, f := range fs {
if f.IsDir() {
continue
}
if strings.HasSuffix(f.Name(), ".crt") {
hosts[0].caCerts = append(hosts[0].caCerts, filepath.Join(certsDir, f.Name()))
}
if strings.HasSuffix(f.Name(), ".cert") {
var pair [2]string
certFile := f.Name()
pair[0] = filepath.Join(certsDir, certFile)
// Check if key also exists
keyFile := filepath.Join(certsDir, certFile[:len(certFile)-5]+".key")
if _, err := os.Stat(keyFile); err == nil {
pair[1] = keyFile
} else if !os.IsNotExist(err) {
return nil, err
}
hosts[0].clientPairs = append(hosts[0].clientPairs, pair)
}
}
return hosts, nil
}
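// For example, a Docker-style directory such as
//
//	/etc/docker/certs.d/registry.example.com/
//	├── ca.crt
//	├── client.cert
//	└── client.key
//
// yields a single hostConfig whose caCerts lists ca.crt and whose
// clientPairs pairs client.cert with client.key.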
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
"github.com/containerd/log"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// LegacyConfigMediaType should be replaced by OCI image spec.
//
// More detail: docker/distribution#1622
const LegacyConfigMediaType = "application/octet-stream"
// ConvertManifest changes application/octet-stream to the schema2 config media type if needed.
//
// NOTE:
// 1. the original manifest will be deleted by the next gc round.
// 2. manifest lists are not covered.
func ConvertManifest(ctx context.Context, store content.Store, desc ocispec.Descriptor) (ocispec.Descriptor, error) {
if !images.IsManifestType(desc.MediaType) {
log.G(ctx).Warnf("do nothing for media type: %s", desc.MediaType)
return desc, nil
}
// read manifest data
mb, err := content.ReadBlob(ctx, store, desc)
if err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to read index data: %w", err)
}
var manifest ocispec.Manifest
if err := json.Unmarshal(mb, &manifest); err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to unmarshal data into manifest: %w", err)
}
// check config media type
if manifest.Config.MediaType != LegacyConfigMediaType {
return desc, nil
}
manifest.Config.MediaType = images.MediaTypeDockerSchema2Config
data, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to marshal manifest: %w", err)
}
// update manifest with gc labels
desc.Digest = digest.Canonical.FromBytes(data)
desc.Size = int64(len(data))
labels := map[string]string{}
for i, c := range append([]ocispec.Descriptor{manifest.Config}, manifest.Layers...) {
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = c.Digest.String()
}
ref := remotes.MakeRefKey(ctx, desc)
if err := content.WriteBlob(ctx, store, ref, bytes.NewReader(data), desc, content.WithLabels(labels)); err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to update content: %w", err)
}
return desc, nil
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
remoteerrors "github.com/containerd/containerd/v2/core/remotes/errors"
)
// ErrorCoder is the base interface for ErrorCode and Error, allowing
// users of each to just call ErrorCode to get the real ID of each
type ErrorCoder interface {
ErrorCode() ErrorCode
}
// ErrorCode represents the error type. The errors are serialized via strings
// and the integer format may change and should *never* be exported.
type ErrorCode int
var _ error = ErrorCode(0)
// ErrorCode just returns itself
func (ec ErrorCode) ErrorCode() ErrorCode {
return ec
}
// Error returns the ID/Value
func (ec ErrorCode) Error() string {
// NOTE(stevvooe): Cannot use message here since it may have unpopulated args.
return strings.ToLower(strings.ReplaceAll(ec.String(), "_", " "))
}
// Descriptor returns the descriptor for the error code.
func (ec ErrorCode) Descriptor() ErrorDescriptor {
d, ok := errorCodeToDescriptors[ec]
if !ok {
return ErrorCodeUnknown.Descriptor()
}
return d
}
// String returns the canonical identifier for this error code.
func (ec ErrorCode) String() string {
return ec.Descriptor().Value
}
// Message returns the human-readable error message for this error code.
func (ec ErrorCode) Message() string {
return ec.Descriptor().Message
}
// MarshalText encodes the receiver into UTF-8-encoded text and returns the
// result.
func (ec ErrorCode) MarshalText() (text []byte, err error) {
return []byte(ec.String()), nil
}
// UnmarshalText decodes the form generated by MarshalText.
func (ec *ErrorCode) UnmarshalText(text []byte) error {
desc, ok := idToDescriptors[string(text)]
if !ok {
desc = ErrorCodeUnknown.Descriptor()
}
*ec = desc.Code
return nil
}
// WithMessage creates a new Error struct based on the passed-in info and
// overrides the Message property.
func (ec ErrorCode) WithMessage(message string) Error {
return Error{
Code: ec,
Message: message,
}
}
// WithDetail creates a new Error struct based on the passed-in info and
// set the Detail property appropriately
func (ec ErrorCode) WithDetail(detail interface{}) Error {
return Error{
Code: ec,
Message: ec.Message(),
}.WithDetail(detail)
}
// WithArgs creates a new Error struct and sets the Args slice
func (ec ErrorCode) WithArgs(args ...interface{}) Error {
return Error{
Code: ec,
Message: ec.Message(),
}.WithArgs(args...)
}
// Error provides a wrapper around ErrorCode with extra Details provided.
type Error struct {
Code ErrorCode `json:"code"`
Message string `json:"message"`
Detail interface{} `json:"detail,omitempty"`
// TODO(duglin): See if we need an "args" property so we can do the
// variable substitution right before showing the message to the user
}
var _ error = Error{}
// ErrorCode returns the ID/Value of this Error
func (e Error) ErrorCode() ErrorCode {
return e.Code
}
// Error returns a human readable representation of the error.
func (e Error) Error() string {
return fmt.Sprintf("%s: %s", e.Code.Error(), e.Message)
}
// WithDetail will return a new Error, based on the current one, but with
// some Detail info added
func (e Error) WithDetail(detail interface{}) Error {
return Error{
Code: e.Code,
Message: e.Message,
Detail: detail,
}
}
// WithArgs uses the passed-in list of interface{} as the substitution
// variables in the Error's Message string, but returns a new Error
func (e Error) WithArgs(args ...interface{}) Error {
return Error{
Code: e.Code,
Message: fmt.Sprintf(e.Code.Message(), args...),
Detail: e.Detail,
}
}
// ErrorDescriptor provides relevant information about a given error code.
type ErrorDescriptor struct {
// Code is the error code that this descriptor describes.
Code ErrorCode
// Value provides a unique, string key, often capitalized with
// underscores, to identify the error code. This value is used as the
// keyed value when serializing API errors.
Value string
// Message is a short, human readable description of the error condition
// included in API responses.
Message string
// Description provides a complete account of the error's purpose, suitable
// for use in documentation.
Description string
// HTTPStatusCode provides the http status code that is associated with
// this error condition.
HTTPStatusCode int
}
// ParseErrorCode returns the error code corresponding to the given string value.
// `ErrorCodeUnknown` will be returned if the error is not known.
func ParseErrorCode(value string) ErrorCode {
ed, ok := idToDescriptors[value]
if ok {
return ed.Code
}
return ErrorCodeUnknown
}
// Errors provides the envelope for multiple errors and a few sugar methods
// for use within the application.
type Errors []error
var _ error = Errors{}
func (errs Errors) Error() string {
switch len(errs) {
case 0:
return "<nil>"
case 1:
return errs[0].Error()
default:
msg := "errors:\n"
for _, err := range errs {
msg += err.Error() + "\n"
}
return msg
}
}
// Len returns the current number of errors.
func (errs Errors) Len() int {
return len(errs)
}
// MarshalJSON converts a slice of error, ErrorCode or Error into a
// slice of Error structs, then serializes the result
func (errs Errors) MarshalJSON() ([]byte, error) {
var tmpErrs struct {
Errors []Error `json:"errors,omitempty"`
}
for _, daErr := range errs {
var err Error
switch daErr := daErr.(type) {
case ErrorCode:
err = daErr.WithDetail(nil)
case Error:
err = daErr
default:
err = ErrorCodeUnknown.WithDetail(daErr)
}
// If the Error struct was set up but the Message field was left
// empty (""), grab the message from the ErrorCode
msg := err.Message
if msg == "" {
msg = err.Code.Message()
}
tmpErrs.Errors = append(tmpErrs.Errors, Error{
Code: err.Code,
Message: msg,
Detail: err.Detail,
})
}
return json.Marshal(tmpErrs)
}
// UnmarshalJSON deserializes []Error and then converts it into a slice of
// Error or ErrorCode values
func (errs *Errors) UnmarshalJSON(data []byte) error {
var tmpErrs struct {
Errors []Error
}
if err := json.Unmarshal(data, &tmpErrs); err != nil {
return err
}
var newErrs Errors
for _, daErr := range tmpErrs.Errors {
// If Message is empty or exactly matches the Code's message string
// then just use the Code, no need for a full Error struct
if daErr.Detail == nil && (daErr.Message == "" || daErr.Message == daErr.Code.Message()) {
// Errors without details get converted to a bare ErrorCode
newErrs = append(newErrs, daErr.Code)
} else {
// Errors with details are kept as-is
newErrs = append(newErrs, Error{
Code: daErr.Code,
Message: daErr.Message,
Detail: daErr.Detail,
})
}
}
*errs = newErrs
return nil
}
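// exampleErrorsRoundTrip is an illustrative sketch (the function name and
// inputs are placeholders): it shows how Errors collapses bare ErrorCodes and
// detailed Errors into a single JSON envelope and recovers them on decode.
func exampleErrorsRoundTrip() (Errors, error) {
	errs := Errors{
		ErrorCodeUnauthorized,                        // encoded with its registered message
		ErrorCodeUnknown.WithDetail("broken widget"), // encoded with its detail preserved
	}
	data, err := json.Marshal(errs)
	if err != nil {
		return nil, err
	}
	var decoded Errors
	if err := json.Unmarshal(data, &decoded); err != nil {
		return nil, err
	}
	// decoded[0] is a bare ErrorCode again; decoded[1] stays a full Error
	// because it carries a Detail.
	return decoded, nil
}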
func unexpectedResponseErr(resp *http.Response) (retErr error) {
retErr = remoteerrors.NewUnexpectedStatusErr(resp)
// Decode registry error if provided
if rerr := retErr.(remoteerrors.ErrUnexpectedStatus); len(rerr.Body) > 0 {
var registryErr Errors
if err := json.Unmarshal(rerr.Body, &registryErr); err == nil && registryErr.Len() > 0 {
// Join the unexpected error with the typed errors, when printed it will
// show the unexpected error message and the registry errors. The body
// is always excluded from the unexpected error message. This also allows
// clients to decode into either type.
retErr = errors.Join(rerr, registryErr)
}
}
return
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"fmt"
"net/http"
"sort"
"sync"
)
var (
errorCodeToDescriptors = map[ErrorCode]ErrorDescriptor{}
idToDescriptors = map[string]ErrorDescriptor{}
groupToDescriptors = map[string][]ErrorDescriptor{}
)
var (
// ErrorCodeUnknown is a generic error that can be used as a last
// resort if there is no situation-specific error message that can be used
ErrorCodeUnknown = Register("errcode", ErrorDescriptor{
Value: "UNKNOWN",
Message: "unknown error",
Description: `Generic error returned when the error does not have an
API classification.`,
HTTPStatusCode: http.StatusInternalServerError,
})
// ErrorCodeUnsupported is returned when an operation is not supported.
ErrorCodeUnsupported = Register("errcode", ErrorDescriptor{
Value: "UNSUPPORTED",
Message: "The operation is unsupported.",
Description: `The operation was unsupported due to a missing
implementation or invalid set of parameters.`,
HTTPStatusCode: http.StatusMethodNotAllowed,
})
// ErrorCodeUnauthorized is returned if a request requires
// authentication.
ErrorCodeUnauthorized = Register("errcode", ErrorDescriptor{
Value: "UNAUTHORIZED",
Message: "authentication required",
Description: `The access controller was unable to authenticate
the client. Often this will be accompanied by a
Www-Authenticate HTTP response header indicating how to
authenticate.`,
HTTPStatusCode: http.StatusUnauthorized,
})
// ErrorCodeDenied is returned if a client does not have sufficient
// permission to perform an action.
ErrorCodeDenied = Register("errcode", ErrorDescriptor{
Value: "DENIED",
Message: "requested access to the resource is denied",
Description: `The access controller denied access for the
operation on a resource.`,
HTTPStatusCode: http.StatusForbidden,
})
// ErrorCodeUnavailable provides a common error to report unavailability
// of a service or endpoint.
ErrorCodeUnavailable = Register("errcode", ErrorDescriptor{
Value: "UNAVAILABLE",
Message: "service unavailable",
Description: "Returned when a service is not available",
HTTPStatusCode: http.StatusServiceUnavailable,
})
// ErrorCodeTooManyRequests is returned if a client attempts too many
// times to contact a service endpoint.
ErrorCodeTooManyRequests = Register("errcode", ErrorDescriptor{
Value: "TOOMANYREQUESTS",
Message: "too many requests",
Description: `Returned when a client attempts to contact a
service too many times`,
HTTPStatusCode: http.StatusTooManyRequests,
})
)
var nextCode = 1000
var registerLock sync.Mutex
// Register will make the passed-in error known to the environment and
// return a new ErrorCode
func Register(group string, descriptor ErrorDescriptor) ErrorCode {
registerLock.Lock()
defer registerLock.Unlock()
descriptor.Code = ErrorCode(nextCode)
if _, ok := idToDescriptors[descriptor.Value]; ok {
panic(fmt.Sprintf("ErrorValue %q is already registered", descriptor.Value))
}
if _, ok := errorCodeToDescriptors[descriptor.Code]; ok {
panic(fmt.Sprintf("ErrorCode %v is already registered", descriptor.Code))
}
groupToDescriptors[group] = append(groupToDescriptors[group], descriptor)
errorCodeToDescriptors[descriptor.Code] = descriptor
idToDescriptors[descriptor.Value] = descriptor
nextCode++
return descriptor.Code
}
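// exampleRegisterTeapot is an illustrative sketch of Register (the group and
// descriptor here are hypothetical): Register assigns the next internal
// numeric code and panics if the Value or code is already registered, so a
// sketch like this must only run once per process.
func exampleRegisterTeapot() ErrorCode {
	return Register("example", ErrorDescriptor{
		Value:          "TEAPOT",
		Message:        "the server is a teapot",
		Description:    `Hypothetical code used only to illustrate Register.`,
		HTTPStatusCode: http.StatusTeapot,
	})
}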
type byValue []ErrorDescriptor
func (a byValue) Len() int { return len(a) }
func (a byValue) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byValue) Less(i, j int) bool { return a[i].Value < a[j].Value }
// GetGroupNames returns the list of Error group names that are registered
func GetGroupNames() []string {
keys := []string{}
for k := range groupToDescriptors {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}
// GetErrorCodeGroup returns the named group of error descriptors
func GetErrorCodeGroup(name string) []ErrorDescriptor {
desc := groupToDescriptors[name]
sort.Sort(byValue(desc))
return desc
}
// GetErrorAllDescriptors returns a slice of all ErrorDescriptors that are
// registered, irrespective of what group they're in
func GetErrorAllDescriptors() []ErrorDescriptor {
result := []ErrorDescriptor{}
for _, group := range GetGroupNames() {
result = append(result, GetErrorCodeGroup(group)...)
}
sort.Sort(byValue(result))
return result
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"bytes"
"compress/flate"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/klauspost/compress/zstd"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
)
type bufferPool struct {
pool *sync.Pool
}
func newbufferPool(bufCap int64) *bufferPool {
pool := &sync.Pool{
New: func() any {
return bytes.NewBuffer(make([]byte, 0, bufCap))
},
}
return &bufferPool{
pool: pool,
}
}
func (p *bufferPool) Get() *bytes.Buffer {
buf := p.pool.Get().(*bytes.Buffer)
return buf
}
func (p *bufferPool) Put(buffer *bytes.Buffer) {
p.pool.Put(buffer)
}
var ErrClosedPipe = errors.New("bufpipe: read/write on closed pipe")
// pipe implements an asynchronous buffered pipe designed for high-throughput
// I/O with configurable initial buffer sizes and buffer reuse. It decouples
// read/write operations, allowing writers to proceed without blocking (unless
// the pipe is closed) and readers to wait efficiently for incoming data.
//
// Key Characteristics:
// - Asynchronous Operation: Writers populate buffers independently of read
// timing, enabling continuous data flow without reader-writer synchronization.
// - Dynamic Buffering: Active buffer grows organically to handle large payloads, while
// the initial capacity (bufCap) balances memory pre-allocation and growth overhead.
// - Buffer Recycling: Retrieved from/pushed to a pool to minimize allocations, reducing
// garbage collection pressure in sustained I/O scenarios.
// - Error Semantics: Closes deterministically on first error (read or write), propagating
// errors atomically to both ends while draining buffered data.
//
// Difference with io.Pipe:
// - Unlike io.Pipe's strict synchronization (blocking write until read), this implementation
// allows writers to buffer data ahead of reads, improving throughput for bursty workloads.
//
// Synchronization & Internals:
// - Condition Variable (sync.Cond): Coordinates reader/writer, waking readers on new data
// or closure. Locking is centralized via the condition's mutex.
// - Buffer Lifecycle: Active buffer serves writes until read depletion, after which it's
// recycled to the pool. Pooled buffers retain their capacity across uses.
// - Error Handling: Write errors (werr) permanently fail writes; read errors (rerr) mark
// terminal read state after buffer exhaustion.
//
// Future Considerations:
// - Zero-copy reads/writes to avoid buffer copying overhead.
// - Memory-mapped file backing for multi-gigabyte payloads.
type pipe struct {
cond *sync.Cond // Coordinates read/write signaling via Lock+Wait/Signal
bufPool *bufferPool // Reusable buffers with initial capacity bufCap
buf *bytes.Buffer // Active data buffer (nil when empty/returned to pool)
rerr, werr error // Terminal read/write errors (sticky once set)
}
type pipeReader struct {
*pipe
}
type pipeWriter struct {
*pipe
}
func newPipeWriter(bufPool *bufferPool) (*pipeReader, *pipeWriter) {
p := &pipe{
cond: sync.NewCond(new(sync.Mutex)),
bufPool: bufPool,
buf: nil,
}
return &pipeReader{
pipe: p,
}, &pipeWriter{
pipe: p,
}
}
// Read implements the standard Read interface: it reads data from the pipe,
// reading from the internal buffer, otherwise blocking until a writer arrives
// or the write end is closed. If the write end is closed with an error, that
// error is returned as err; otherwise err is io.EOF.
func (r *pipeReader) Read(data []byte) (n int, err error) {
r.cond.L.Lock()
defer r.cond.L.Unlock()
if r.buf == nil {
r.buf = r.bufPool.Get()
}
for {
n, err = r.buf.Read(data)
// If the write end is not closed and nothing was read, wait for a write.
if err == io.EOF && r.rerr == nil && n == 0 {
r.cond.Wait() // Wait for data to be written
continue
}
break
}
if err == io.EOF {
// Put buffer back to pool
r.bufPool.Put(r.buf)
r.buf = nil
return n, r.rerr
}
return n, err
}
// Close closes the reader; subsequent writes from the write half of the pipe
// will return error ErrClosedPipe.
func (r *pipeReader) Close() error {
return r.CloseWithError(nil)
}
// CloseWithError closes the reader; subsequent writes to the write half of the
// pipe will return the error err.
func (r *pipeReader) CloseWithError(err error) error {
r.cond.L.Lock()
defer r.cond.L.Unlock()
if err == nil {
err = ErrClosedPipe
}
r.werr = err
return nil
}
// Write implements the standard Write interface: it writes data to the internal
// buffer. If the read end is closed with an error, that err is returned as err;
// otherwise err is ErrClosedPipe.
func (w *pipeWriter) Write(data []byte) (int, error) {
w.cond.L.Lock()
defer w.cond.L.Unlock()
if w.werr != nil {
return 0, w.werr
}
if w.buf == nil {
w.buf = w.bufPool.Get()
}
n, err := w.buf.Write(data)
w.cond.Signal()
return n, err
}
// Close closes the writer; subsequent reads from the read half of the pipe will
// return io.EOF once the internal buffer is drained.
func (w *pipeWriter) Close() error {
return w.CloseWithError(nil)
}
// CloseWithError closes the writer; subsequent reads from the read half of the
// pipe will return err once the internal buffer is drained.
func (w *pipeWriter) CloseWithError(err error) error {
w.cond.L.Lock()
defer w.cond.L.Unlock()
if err == nil {
err = io.EOF
}
w.rerr = err
w.cond.Broadcast()
return nil
}
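// examplePipeUsage is an illustrative sketch of the buffered pipe: the writer
// goroutine streams data without waiting for the reader, and a nil error
// passed to CloseWithError surfaces as io.EOF on the read side. The 32KiB
// initial buffer capacity is an arbitrary illustrative value.
func examplePipeUsage() (int64, error) {
	pr, pw := newPipeWriter(newbufferPool(32 * 1024))
	go func() {
		_, err := pw.Write([]byte("payload that may arrive in bursts"))
		_ = pw.CloseWithError(err) // nil err is reported to the reader as io.EOF
	}()
	return io.Copy(io.Discard, pr) // drains everything, then sees io.EOF
}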
type dockerFetcher struct {
*dockerBase
}
func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", desc.Digest))
hosts := r.filterHosts(HostCapabilityPull)
if len(hosts) == 0 {
return nil, fmt.Errorf("no pull hosts: %w", errdefs.ErrNotFound)
}
ctx, err := ContextWithRepositoryScope(ctx, r.refspec, false)
if err != nil {
return nil, err
}
return newHTTPReadSeeker(desc.Size, func(offset int64) (io.ReadCloser, error) {
// First, try to fetch via external URLs.
for _, us := range desc.URLs {
u, err := url.Parse(us)
if err != nil {
log.G(ctx).WithError(err).Debugf("failed to parse %q", us)
continue
}
if u.Scheme != "http" && u.Scheme != "https" {
log.G(ctx).Debug("non-http(s) alternative url is unsupported")
continue
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u))
log.G(ctx).Info("request")
// Build a transient registry host for this alternative URL and try it first.
host := RegistryHost{
Client: http.DefaultClient,
Host: u.Host,
Scheme: u.Scheme,
Path: u.Path,
Capabilities: HostCapabilityPull,
}
req := r.request(host, http.MethodGet)
// Strip namespace from base
req.path = u.Path
if u.RawQuery != "" {
req.path = req.path + "?" + u.RawQuery
}
rc, err := r.open(ctx, req, desc.MediaType, offset, false)
if err != nil {
if errdefs.IsNotFound(err) {
continue // try one of the other urls.
}
return nil, err
}
return rc, nil
}
// Try manifests endpoints for manifest types
if images.IsManifestType(desc.MediaType) || images.IsIndexType(desc.MediaType) {
var firstErr error
for i, host := range r.hosts {
req := r.request(host, http.MethodGet, "manifests", desc.Digest.String())
if err := req.addNamespace(r.refspec.Hostname()); err != nil {
return nil, err
}
rc, err := r.open(ctx, req, desc.MediaType, offset, i == len(r.hosts)-1)
if err != nil {
// Store the error for referencing later
if firstErr == nil {
firstErr = err
}
continue // try another host
}
return rc, nil
}
return nil, firstErr
}
// Finally use blobs endpoints
var firstErr error
for i, host := range r.hosts {
req := r.request(host, http.MethodGet, "blobs", desc.Digest.String())
if err := req.addNamespace(r.refspec.Hostname()); err != nil {
return nil, err
}
rc, err := r.open(ctx, req, desc.MediaType, offset, i == len(r.hosts)-1)
if err != nil {
// Store the error for referencing later
if firstErr == nil {
firstErr = err
}
continue // try another host
}
return rc, nil
}
if errdefs.IsNotFound(firstErr) {
firstErr = fmt.Errorf("could not fetch content descriptor %v (%v) from remote: %w",
desc.Digest, desc.MediaType, errdefs.ErrNotFound,
)
}
return nil, firstErr
})
}
func (r dockerFetcher) createGetReq(ctx context.Context, host RegistryHost, lastHost bool, mediatype string, ps ...string) (*request, int64, error) {
headReq := r.request(host, http.MethodHead, ps...)
if err := headReq.addNamespace(r.refspec.Hostname()); err != nil {
return nil, 0, err
}
if mediatype == "" {
headReq.header.Set("Accept", "*/*")
} else {
headReq.header.Set("Accept", strings.Join([]string{mediatype, `*/*`}, ", "))
}
headResp, err := headReq.doWithRetries(ctx, lastHost)
if err != nil {
return nil, 0, err
}
if headResp.Body != nil {
headResp.Body.Close()
}
if headResp.StatusCode > 299 {
return nil, 0, fmt.Errorf("unexpected HEAD status code %v: %s", headReq.String(), headResp.Status)
}
getReq := r.request(host, http.MethodGet, ps...)
if err := getReq.addNamespace(r.refspec.Hostname()); err != nil {
return nil, 0, err
}
return getReq, headResp.ContentLength, nil
}
func (r dockerFetcher) FetchByDigest(ctx context.Context, dgst digest.Digest, opts ...remotes.FetchByDigestOpts) (io.ReadCloser, ocispec.Descriptor, error) {
var desc ocispec.Descriptor
ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", dgst))
var config remotes.FetchByDigestConfig
for _, o := range opts {
if err := o(ctx, &config); err != nil {
return nil, desc, err
}
}
hosts := r.filterHosts(HostCapabilityPull)
if len(hosts) == 0 {
return nil, desc, fmt.Errorf("no pull hosts: %w", errdefs.ErrNotFound)
}
ctx, err := ContextWithRepositoryScope(ctx, r.refspec, false)
if err != nil {
return nil, desc, err
}
var (
getReq *request
sz int64
firstErr error
)
for i, host := range r.hosts {
getReq, sz, err = r.createGetReq(ctx, host, i == len(r.hosts)-1, config.Mediatype, "blobs", dgst.String())
if err == nil {
break
}
// Store the error for referencing later
if firstErr == nil {
firstErr = err
}
}
if getReq == nil {
// Fall back to the "manifests" endpoint
for i, host := range r.hosts {
getReq, sz, err = r.createGetReq(ctx, host, i == len(r.hosts)-1, config.Mediatype, "manifests", dgst.String())
if err == nil {
break
}
// Store the error for referencing later
if firstErr == nil {
firstErr = err
}
}
}
if getReq == nil {
if errdefs.IsNotFound(firstErr) {
firstErr = fmt.Errorf("could not fetch content %v from remote: %w", dgst, errdefs.ErrNotFound)
}
if firstErr == nil {
firstErr = fmt.Errorf("could not fetch content %v from remote: (unknown)", dgst)
}
return nil, desc, firstErr
}
seeker, err := newHTTPReadSeeker(sz, func(offset int64) (io.ReadCloser, error) {
return r.open(ctx, getReq, config.Mediatype, offset, true)
})
if err != nil {
return nil, desc, err
}
desc = ocispec.Descriptor{
MediaType: "application/octet-stream",
Digest: dgst,
Size: sz,
}
if config.Mediatype != "" {
desc.MediaType = config.Mediatype
}
return seeker, desc, nil
}
func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string, offset int64, lastHost bool) (_ io.ReadCloser, retErr error) {
const minChunkSize = 512
chunkSize := int64(r.performances.ConcurrentLayerFetchBuffer)
parallelism := int64(r.performances.MaxConcurrentDownloads)
if chunkSize < minChunkSize || req.body != nil {
parallelism = 1
}
log.G(ctx).WithField("initial_parallelism", r.performances.MaxConcurrentDownloads).
WithField("parallelism", parallelism).
WithField("chunk_size", chunkSize).
WithField("offset", offset).
Debug("fetching layer")
req.setMediaType(mediatype)
req.header.Set("Accept-Encoding", "zstd;q=1.0, gzip;q=0.8, deflate;q=0.5")
if parallelism > 1 || offset > 0 {
req.setOffset(offset)
}
if err := r.Acquire(ctx, 1); err != nil {
return nil, err
}
resp, err := req.doWithRetries(ctx, lastHost, withErrorCheck, withOffsetCheck(offset))
switch err {
case nil:
// all good
case errContentRangeIgnored:
if parallelism != 1 {
log.G(ctx).WithError(err).Info("remote host ignored content range, forcing parallelism to 1")
parallelism = 1
}
default:
log.G(ctx).WithError(err).Debug("fetch failed")
r.Release(1)
return nil, err
}
body := &fnOnClose{
BeforeClose: func() {
r.Release(1)
},
ReadCloser: resp.Body,
}
defer func() {
if retErr != nil {
body.Close()
}
}()
encoding := strings.FieldsFunc(resp.Header.Get("Content-Encoding"), func(r rune) bool {
return r == ' ' || r == '\t' || r == ','
})
remaining, _ := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 0)
if remaining <= chunkSize {
parallelism = 1
}
if parallelism > 1 {
// If we have a content length, we can use multiple requests to fetch
// the content in parallel. This makes downloads of bigger bodies
// faster, at the cost of issuing more requests and using up to
// ~(parallelism * goroutine footprint) memory. The goroutine
// footprint is roughly the goroutine stack plus the pipe buffer size.
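// Illustrative arithmetic: with remaining = 100MiB and chunkSize = 8MiB,
// numChunks below becomes ceil(100/8) = 13, and a configured parallelism
// of 16 is then clamped down to 13 so no worker is scheduled for a
// nonexistent chunk.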
numChunks := remaining / chunkSize
if numChunks*chunkSize < remaining {
numChunks++
}
if numChunks < parallelism {
parallelism = numChunks
}
queue := make(chan int64, parallelism)
ctx, cancelCtx := context.WithCancel(ctx)
done := ctx.Done()
readers, writers := make([]io.Reader, numChunks), make([]*pipeWriter, numChunks)
bufPool := newbufferPool(chunkSize)
for i := range numChunks {
readers[i], writers[i] = newPipeWriter(bufPool)
}
// keep a reference to the initial body value to ensure it is closed
ibody := body
go func() {
for i := range numChunks {
select {
case queue <- i:
case <-done:
if i == 0 {
ibody.Close()
}
return // avoid leaking a goroutine if we exit early.
}
}
close(queue)
}()
for range parallelism {
go func() {
for i := range queue { // first in first out
copy := func() error {
var body io.ReadCloser
if i == 0 {
body = ibody
} else {
if err := r.Acquire(ctx, 1); err != nil {
return err
}
defer r.Release(1)
reqClone := req.clone()
reqClone.setOffset(offset + i*chunkSize)
nresp, err := reqClone.doWithRetries(ctx, lastHost, withErrorCheck)
if err != nil {
_ = writers[i].CloseWithError(err)
select {
case <-done:
return ctx.Err()
default:
cancelCtx()
}
return err
}
body = nresp.Body
}
_, err := io.Copy(writers[i], io.LimitReader(body, chunkSize))
_ = body.Close()
_ = writers[i].CloseWithError(err)
if err != nil && err != io.EOF {
cancelCtx()
return err
}
return nil
}
if copy() != nil {
return
}
}
}()
}
body = &fnOnClose{
BeforeClose: func() {
cancelCtx()
},
ReadCloser: io.NopCloser(io.MultiReader(readers...)),
}
}
for i := len(encoding) - 1; i >= 0; i-- {
algorithm := strings.ToLower(encoding[i])
switch algorithm {
case "zstd":
r, err := zstd.NewReader(body.ReadCloser,
zstd.WithDecoderLowmem(false),
)
if err != nil {
return nil, err
}
body.ReadCloser = r.IOReadCloser()
case "gzip":
r, err := gzip.NewReader(body.ReadCloser)
if err != nil {
return nil, err
}
body.ReadCloser = r
case "deflate":
body.ReadCloser = flate.NewReader(body.ReadCloser)
case "identity", "":
// no content-encoding applied, use raw body
default:
return nil, errors.New("unsupported Content-Encoding algorithm: " + algorithm)
}
}
return body, nil
}
type fnOnClose struct {
BeforeClose func()
io.ReadCloser
}
// Close calls the BeforeClose function before closing the underlying
// ReadCloser.
func (f *fnOnClose) Close() error {
f.BeforeClose()
return f.ReadCloser.Close()
}
var _ io.ReadCloser = &fnOnClose{}
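// exampleFnOnClose is an illustrative sketch (the arguments are
// placeholders): fnOnClose decorates any ReadCloser with a hook that runs
// before the underlying Close, which is how open releases its semaphore slot
// exactly once per response body.
func exampleFnOnClose(rc io.ReadCloser, release func()) io.ReadCloser {
	return &fnOnClose{
		BeforeClose: release, // e.g. func() { r.Release(1) }
		ReadCloser:  rc,
	}
}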
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"context"
"fmt"
"net/url"
"strings"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/pkg/labels"
"github.com/containerd/containerd/v2/pkg/reference"
"github.com/containerd/log"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// AppendDistributionSourceLabel updates the label of blob with distribution source.
func AppendDistributionSourceLabel(manager content.Manager, ref string) (images.HandlerFunc, error) {
refspec, err := reference.Parse(ref)
if err != nil {
return nil, err
}
u, err := url.Parse("dummy://" + refspec.Locator)
if err != nil {
return nil, err
}
source, repo := u.Hostname(), strings.TrimPrefix(u.Path, "/")
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
info, err := manager.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
key := distributionSourceLabelKey(source)
originLabel := ""
if info.Labels != nil {
originLabel = info.Labels[key]
}
value := appendDistributionSourceLabel(originLabel, repo)
// Repository names are limited to 256 characters, but the
// distribution label can still exceed the label size limit when a
// blob is shared by a large number of repositories (a very common
// base layer, for example).
if err := labels.Validate(key, value); err != nil {
log.G(ctx).Warnf("skip appending distribution label: %s", err)
return nil, nil
}
info = content.Info{
Digest: desc.Digest,
Labels: map[string]string{
key: value,
},
}
_, err = manager.Update(ctx, info, fmt.Sprintf("labels.%s", key))
return nil, err
}, nil
}
func appendDistributionSourceLabel(originLabel, repo string) string {
repos := []string{}
if originLabel != "" {
repos = strings.Split(originLabel, ",")
}
repos = append(repos, repo)
// insertion sort; duplicates are replaced with the empty string so they can be trimmed below
for i := 1; i < len(repos); i++ {
tmp, j := repos[i], i-1
for ; j >= 0 && repos[j] >= tmp; j-- {
if repos[j] == tmp {
tmp = ""
}
repos[j+1] = repos[j]
}
repos[j+1] = tmp
}
i := 0
for ; i < len(repos) && repos[i] == ""; i++ {
}
return strings.Join(repos[i:], ",")
}
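// Illustrative example: appendDistributionSourceLabel sorts and de-duplicates,
// so merging "library/busybox,library/alpine" with "library/alpine" yields
// "library/alpine,library/busybox": the duplicate is blanked during the
// insertion sort and the leading empty strings are trimmed before joining.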
func distributionSourceLabelKey(source string) string {
return labels.LabelDistributionSource + "." + source
}
// selectRepositoryMountCandidate selects the repository sharing the longest
// common prefix of path components with the target as the mount candidate.
func selectRepositoryMountCandidate(refspec reference.Spec, sources map[string]string) string {
u, err := url.Parse("dummy://" + refspec.Locator)
if err != nil {
// NOTE: this should basically never fail, since the locator has already been parsed
return ""
}
source, target := u.Hostname(), strings.TrimPrefix(u.Path, "/")
repoLabel, ok := sources[distributionSourceLabelKey(source)]
if !ok || repoLabel == "" {
return ""
}
n, match := 0, ""
components := strings.Split(target, "/")
for _, repo := range strings.Split(repoLabel, ",") {
// the target repo is not a candidate
if repo == target {
continue
}
if l := commonPrefixComponents(components, repo); l >= n {
n, match = l, repo
}
}
return match
}
func commonPrefixComponents(components []string, target string) int {
targetComponents := strings.Split(target, "/")
i := 0
for ; i < len(components) && i < len(targetComponents); i++ {
if components[i] != targetComponents[i] {
break
}
}
return i
}
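// Illustrative example: when pulling "example.com/team/app/base" with a blob
// label listing "team/app/base,team/app/other,team/lib", the target itself is
// skipped, "team/app/other" shares two path components and "team/lib" only
// one, so "team/app/other" is selected as the mount candidate.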
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"bytes"
"fmt"
"io"
"github.com/containerd/errdefs"
"github.com/containerd/log"
)
const maxRetry = 3
type httpReadSeeker struct {
size int64
offset int64
rc io.ReadCloser
open func(offset int64) (io.ReadCloser, error)
closed bool
errsWithNoProgress int
}
func newHTTPReadSeeker(size int64, open func(offset int64) (io.ReadCloser, error)) (io.ReadCloser, error) {
return &httpReadSeeker{
size: size,
open: open,
}, nil
}
func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
if hrs.closed {
return 0, io.EOF
}
rd, err := hrs.reader()
if err != nil {
return 0, err
}
n, err = rd.Read(p)
hrs.offset += int64(n)
if n > 0 || err == nil {
hrs.errsWithNoProgress = 0
}
switch err {
case io.ErrUnexpectedEOF:
// connection closed unexpectedly. try reconnecting.
if n == 0 {
hrs.errsWithNoProgress++
if hrs.errsWithNoProgress > maxRetry {
return // too many retries for this offset with no progress
}
}
if hrs.rc != nil {
if clsErr := hrs.rc.Close(); clsErr != nil {
log.L.WithError(clsErr).Error("httpReadSeeker: failed to close ReadCloser")
}
hrs.rc = nil
}
if _, err2 := hrs.reader(); err2 == nil {
return n, nil
}
case io.EOF:
// The CRI's imagePullProgressTimeout relies on responseBody.Close to
// update the progress monitor's status. If the err is io.EOF, close
// the connection since there is no more available data.
if hrs.rc != nil {
if clsErr := hrs.rc.Close(); clsErr != nil {
log.L.WithError(clsErr).Error("httpReadSeeker: failed to close ReadCloser after io.EOF")
}
hrs.rc = nil
}
}
return
}
func (hrs *httpReadSeeker) Close() error {
if hrs.closed {
return nil
}
hrs.closed = true
if hrs.rc != nil {
return hrs.rc.Close()
}
return nil
}
func (hrs *httpReadSeeker) ReadAt(p []byte, offset int64) (n int, err error) {
if hrs.closed {
return 0, fmt.Errorf("httpReadSeeker.ReadAt: closed: %w", errdefs.ErrUnavailable)
}
if offset < 0 {
return 0, fmt.Errorf("httpReadSeeker.ReadAt: negative offset: %w", errdefs.ErrInvalidArgument)
}
if hrs.size != -1 && offset >= hrs.size {
return 0, io.EOF
}
if hrs.open == nil {
return 0, fmt.Errorf("httpReadSeeker.ReadAt: cannot open: %w", errdefs.ErrNotImplemented)
}
rc, err := hrs.open(offset)
if err != nil {
return 0, fmt.Errorf("httpReadSeeker.ReadAt: failed to open at offset %d: %w", offset, err)
}
defer func() {
if closeErr := rc.Close(); closeErr != nil {
log.L.WithError(closeErr).Error("httpReadSeeker.ReadAt: failed to close ReadCloser")
}
}()
return io.ReadFull(rc, p)
}
func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
if hrs.closed {
return 0, fmt.Errorf("Fetcher.Seek: closed: %w", errdefs.ErrUnavailable)
}
abs := hrs.offset
switch whence {
case io.SeekStart:
abs = offset
case io.SeekCurrent:
abs += offset
case io.SeekEnd:
if hrs.size == -1 {
return 0, fmt.Errorf("Fetcher.Seek: unknown size, cannot seek from end: %w", errdefs.ErrUnavailable)
}
abs = hrs.size + offset
default:
return 0, fmt.Errorf("Fetcher.Seek: invalid whence: %w", errdefs.ErrInvalidArgument)
}
if abs < 0 {
return 0, fmt.Errorf("Fetcher.Seek: negative offset: %w", errdefs.ErrInvalidArgument)
}
if abs != hrs.offset {
if hrs.rc != nil {
if err := hrs.rc.Close(); err != nil {
log.L.WithError(err).Error("Fetcher.Seek: failed to close ReadCloser")
}
hrs.rc = nil
}
hrs.offset = abs
}
return hrs.offset, nil
}
func (hrs *httpReadSeeker) reader() (io.Reader, error) {
if hrs.rc != nil {
return hrs.rc, nil
}
if hrs.size == -1 || hrs.offset < hrs.size {
// only try to reopen the body request if we are seeking to a value
// less than the actual size.
if hrs.open == nil {
return nil, fmt.Errorf("cannot open: %w", errdefs.ErrNotImplemented)
}
rc, err := hrs.open(hrs.offset)
if err != nil {
return nil, fmt.Errorf("httpReadSeeker: failed open: %w", err)
}
if hrs.rc != nil {
if err := hrs.rc.Close(); err != nil {
log.L.WithError(err).Error("httpReadSeeker: failed to close ReadCloser")
}
}
hrs.rc = rc
} else {
// There is an edge case here where offset == size of the content.
// Seeking there would likely produce an error for content that cannot
// be sought, so we err on the side of letting the content be
// committed: the length is already satisfied, and we simply return an
// empty reader instead.
hrs.rc = io.NopCloser(bytes.NewReader([]byte{}))
}
return hrs.rc, nil
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"
"sync"
"time"
"github.com/containerd/errdefs"
"github.com/containerd/log"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
)
type dockerPusher struct {
*dockerBase
object string
// TODO: namespace tracker
tracker StatusTracker
}
// Writer implements the Ingester API of the content store. This allows the
// client to receive ErrUnavailable when there is already an on-going upload.
// Note that the tracker MUST implement the StatusTrackLocker interface to
// avoid a race condition on the StatusTracker.
func (p dockerPusher) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
if wOpts.Ref == "" {
return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument)
}
return p.push(ctx, wOpts.Desc, wOpts.Ref, true)
}
func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (content.Writer, error) {
return p.push(ctx, desc, remotes.MakeRefKey(ctx, desc), false)
}
func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref string, unavailableOnFail bool) (content.Writer, error) {
if l, ok := p.tracker.(StatusTrackLocker); ok {
l.Lock(ref)
defer l.Unlock(ref)
}
ctx, err := ContextWithRepositoryScope(ctx, p.refspec, true)
if err != nil {
return nil, err
}
status, err := p.tracker.GetStatus(ref)
if err == nil {
if status.Committed && status.Offset == status.Total {
return nil, fmt.Errorf("ref %v: %w", ref, errdefs.ErrAlreadyExists)
}
if unavailableOnFail && status.ErrClosed == nil {
// Another push of this ref is happening elsewhere. The rest of the
// function will only continue when `errdefs.IsNotFound(err) == true`
// (i.e. there is no actively-tracked ref already).
return nil, fmt.Errorf("push is on-going: %w", errdefs.ErrUnavailable)
}
// TODO: Handle incomplete status
} else if !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to get status: %w", err)
}
hosts := p.filterHosts(HostCapabilityPush)
if len(hosts) == 0 {
return nil, fmt.Errorf("no push hosts: %w", errdefs.ErrNotFound)
}
var (
isManifest bool
existCheck []string
host = hosts[0]
)
if images.IsManifestType(desc.MediaType) || images.IsIndexType(desc.MediaType) {
isManifest = true
existCheck = getManifestPath(p.object, desc.Digest)
} else {
existCheck = []string{"blobs", desc.Digest.String()}
}
req := p.request(host, http.MethodHead, existCheck...)
req.header.Set("Accept", strings.Join([]string{desc.MediaType, `*/*`}, ", "))
log.G(ctx).WithField("url", req.String()).Debugf("checking and pushing to")
resp, err := req.doWithRetries(ctx, true)
if err != nil {
if !errors.Is(err, ErrInvalidAuthorization) {
return nil, err
}
log.G(ctx).WithError(err).Debugf("Unable to check existence, continuing with push")
} else {
if resp.StatusCode == http.StatusOK {
var exists bool
if isManifest && existCheck[1] != desc.Digest.String() {
dgstHeader := digest.Digest(resp.Header.Get("Docker-Content-Digest"))
if dgstHeader == desc.Digest {
exists = true
}
} else {
exists = true
}
if exists {
p.tracker.SetStatus(ref, Status{
Committed: true,
PushStatus: PushStatus{
Exists: true,
},
Status: content.Status{
Ref: ref,
// TODO: Set updated time?
},
})
resp.Body.Close()
return nil, fmt.Errorf("content %v on remote: %w", desc.Digest, errdefs.ErrAlreadyExists)
}
} else if resp.StatusCode != http.StatusNotFound {
err := unexpectedResponseErr(resp)
log.G(ctx).WithError(err).Debug("unexpected response")
resp.Body.Close()
return nil, err
}
resp.Body.Close()
}
if isManifest {
putPath := getManifestPath(p.object, desc.Digest)
req = p.request(host, http.MethodPut, putPath...)
req.header.Add("Content-Type", desc.MediaType)
} else {
// Start upload request
req = p.request(host, http.MethodPost, "blobs", "uploads/")
mountedFrom := ""
var resp *http.Response
if fromRepo := selectRepositoryMountCandidate(p.refspec, desc.Annotations); fromRepo != "" {
preq := requestWithMountFrom(req, desc.Digest.String(), fromRepo)
pctx := ContextWithAppendPullRepositoryScope(ctx, fromRepo)
// NOTE: fromRepo might be a private repository, and the auth
// service can still grant a token without error, but the POST
// request will then fail with a 401.
//
// For private repositories, we should remove the mount-from
// query parameters and send the request again.
resp, err = preq.doWithRetries(pctx, true)
if err != nil {
if !errors.Is(err, ErrInvalidAuthorization) {
return nil, fmt.Errorf("pushing with mount from %s: %w", fromRepo, err)
}
log.G(ctx).Debugf("failed to push with mount from repository %s: %v", fromRepo, err)
}
if resp != nil {
switch resp.StatusCode {
case http.StatusUnauthorized:
log.G(ctx).Debugf("failed to mount from repository %s, not authorized", fromRepo)
resp.Body.Close()
resp = nil
case http.StatusCreated:
mountedFrom = path.Join(p.refspec.Hostname(), fromRepo)
}
}
}
if resp == nil {
resp, err = req.doWithRetries(ctx, true)
if err != nil {
if errors.Is(err, ErrInvalidAuthorization) {
return nil, fmt.Errorf("push access denied, repository does not exist or may require authorization: %w", err)
}
return nil, err
}
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK, http.StatusAccepted, http.StatusNoContent:
case http.StatusCreated:
p.tracker.SetStatus(ref, Status{
Committed: true,
PushStatus: PushStatus{
MountedFrom: mountedFrom,
},
Status: content.Status{
Ref: ref,
Total: desc.Size,
Offset: desc.Size,
},
})
return nil, fmt.Errorf("content %v on remote: %w", desc.Digest, errdefs.ErrAlreadyExists)
default:
err := unexpectedResponseErr(resp)
log.G(ctx).WithError(err).Debug("unexpected response")
return nil, err
}
var (
location = resp.Header.Get("Location")
lurl *url.URL
lhost = host
)
// Support paths without host in location
if strings.HasPrefix(location, "/") {
lurl, err = url.Parse(lhost.Scheme + "://" + lhost.Host + location)
if err != nil {
return nil, fmt.Errorf("unable to parse location %v: %w", location, err)
}
} else {
if !strings.Contains(location, "://") {
location = lhost.Scheme + "://" + location
}
lurl, err = url.Parse(location)
if err != nil {
return nil, fmt.Errorf("unable to parse location %v: %w", location, err)
}
if lurl.Host != lhost.Host || lhost.Scheme != lurl.Scheme {
lhost.Scheme = lurl.Scheme
lhost.Host = lurl.Host
// Check if different than what was requested, accounting for fallback in the transport layer
requested := resp.Request.URL
if requested.Host != lhost.Host || requested.Scheme != lhost.Scheme {
// Strip authorizer if change to host or scheme
lhost.Authorizer = nil
log.G(ctx).WithField("host", lhost.Host).WithField("scheme", lhost.Scheme).Debug("upload changed destination, authorizer removed")
}
}
}
q := lurl.Query()
q.Add("digest", desc.Digest.String())
req = p.request(lhost, http.MethodPut)
req.header.Set("Content-Type", "application/octet-stream")
req.path = lurl.Path + "?" + q.Encode()
}
p.tracker.SetStatus(ref, Status{
Status: content.Status{
Ref: ref,
Total: desc.Size,
Expected: desc.Digest,
StartedAt: time.Now(),
},
})
// TODO: Support chunked upload
pushw := newPushWriter(p.dockerBase, ref, desc.Digest, p.tracker, isManifest)
req.body = func() (io.ReadCloser, error) {
pr, pw := io.Pipe()
pushw.setPipe(pw)
return pr, nil
}
req.size = desc.Size
go func() {
resp, err := req.doWithRetries(ctx, true)
if err != nil {
pushw.setError(err)
return
}
switch resp.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusNoContent:
default:
err := unexpectedResponseErr(resp)
log.G(ctx).WithError(err).Debug("unexpected response")
pushw.setError(err)
return
}
pushw.setResponse(resp)
}()
return pushw, nil
}
func getManifestPath(object string, dgst digest.Digest) []string {
if i := strings.IndexByte(object, '@'); i >= 0 {
if object[i+1:] != dgst.String() {
// use digest, not tag
object = ""
} else {
// strip @<digest> for registry path to make tag
object = object[:i]
}
}
if object == "" {
return []string{"manifests", dgst.String()}
}
return []string{"manifests", object}
}
type pushWriter struct {
base *dockerBase
ref string
pipe *io.PipeWriter
done chan struct{}
closeOnce sync.Once
pipeC chan *io.PipeWriter
respC chan *http.Response
errC chan error
isManifest bool
expected digest.Digest
tracker StatusTracker
}
func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker StatusTracker, isManifest bool) *pushWriter {
// Initialize the writer with buffered channels for handing off the pipe, response, and error
return &pushWriter{
base: db,
ref: ref,
expected: expected,
tracker: tracker,
pipeC: make(chan *io.PipeWriter, 1),
respC: make(chan *http.Response, 1),
errC: make(chan error, 1),
done: make(chan struct{}),
isManifest: isManifest,
}
}
func (pw *pushWriter) setPipe(p *io.PipeWriter) {
select {
case <-pw.done:
case pw.pipeC <- p:
}
}
func (pw *pushWriter) setError(err error) {
select {
case <-pw.done:
case pw.errC <- err:
}
}
func (pw *pushWriter) setResponse(resp *http.Response) {
select {
case <-pw.done:
case pw.respC <- resp:
}
}
func (pw *pushWriter) replacePipe(p *io.PipeWriter) error {
if pw.pipe == nil {
pw.pipe = p
return nil
}
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p
// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
}
func (pw *pushWriter) Write(p []byte) (n int, err error) {
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return n, err
}
if pw.pipe == nil {
select {
case <-pw.done:
return 0, io.ErrClosedPipe
case p := <-pw.pipeC:
pw.replacePipe(p)
}
} else {
select {
case <-pw.done:
return 0, io.ErrClosedPipe
case p := <-pw.pipeC:
return 0, pw.replacePipe(p)
default:
}
}
n, err = pw.pipe.Write(p)
if errors.Is(err, io.ErrClosedPipe) {
// if the pipe is closed, we might have the original error on the error
// channel - so we should try and get it
select {
case <-pw.done:
case err = <-pw.errC:
pw.Close()
case p := <-pw.pipeC:
return 0, pw.replacePipe(p)
case resp := <-pw.respC:
pw.setResponse(resp)
}
}
status.Offset += int64(n)
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return
}
func (pw *pushWriter) Close() error {
// Ensure the done channel is closed, while handling `Close()` being
// called multiple times without panicking
pw.closeOnce.Do(func() {
close(pw.done)
})
if pw.pipe != nil {
status, err := pw.tracker.GetStatus(pw.ref)
if err == nil && !status.Committed {
// Closing an incomplete writer. Record this as an error so that following write can retry it.
status.ErrClosed = errors.New("closed incomplete writer")
pw.tracker.SetStatus(pw.ref, status)
}
return pw.pipe.Close()
}
return nil
}
func (pw *pushWriter) Status() (content.Status, error) {
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return content.Status{}, err
}
return status.Status, nil
}
func (pw *pushWriter) Digest() digest.Digest {
// TODO: Get rid of this function?
return pw.expected
}
func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
// Check whether read has already thrown an error
if pw.pipe != nil {
if _, err := pw.pipe.Write([]byte{}); err != nil && !errors.Is(err, io.ErrClosedPipe) {
return fmt.Errorf("pipe error before commit: %w", err)
}
if err := pw.pipe.Close(); err != nil {
return err
}
}
// TODO: timeout waiting for response
var resp *http.Response
select {
case <-pw.done:
return io.ErrClosedPipe
case err := <-pw.errC:
pw.Close()
return err
case resp = <-pw.respC:
defer resp.Body.Close()
case p := <-pw.pipeC:
// check whether the pipe has changed in the commit, because sometimes Write
// can complete successfully, but the pipe may have changed. In that case, the
// content needs to be reset.
return pw.replacePipe(p)
}
// 201 Created is the specified return status, but some registries
// return 200, 202 or 204.
switch resp.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
default:
return unexpectedResponseErr(resp)
}
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return fmt.Errorf("failed to get status: %w", err)
}
if size > 0 && size != status.Offset {
return fmt.Errorf("unexpected size %d, expected %d", status.Offset, size)
}
if expected == "" {
expected = status.Expected
}
actual, err := digest.Parse(resp.Header.Get("Docker-Content-Digest"))
if err != nil {
return fmt.Errorf("invalid content digest in response: %w", err)
}
if actual != expected {
return fmt.Errorf("got digest %s, expected %s", actual, expected)
}
status.Committed = true
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return nil
}
func (pw *pushWriter) Truncate(size int64) error {
// TODO: if blob close request and start new request at offset
// TODO: always error on manifest
return errors.New("cannot truncate remote upload")
}
func requestWithMountFrom(req *request, mount, from string) *request {
creq := *req
sep := "?"
if strings.Contains(creq.path, sep) {
sep = "&"
}
creq.path = creq.path + sep + "mount=" + mount + "&from=" + from
return &creq
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"crypto/tls"
"errors"
"net"
"net/http"
"time"
)
// HostCapabilities represent the capabilities of the registry
// host. This also represents the set of operations for which
// the registry host may be trusted to perform.
//
// For example, pushing is a capability which should only be
// performed on an upstream source, not a mirror.
// Resolving (the process of converting a name into a digest)
// must be considered a trusted operation and only done by
// a host which is trusted (or, preferably, by a secure process
// which can prove the provenance of the mapping). A public
// mirror should never be trusted to do a resolve action.
//
// | Registry Type | Pull | Resolve | Push |
// |------------------|------|---------|------|
// | Public Registry | yes | yes | yes |
// | Private Registry | yes | yes | yes |
// | Public Mirror | yes | no | no |
// | Private Mirror | yes | yes | no |
type HostCapabilities uint8
const (
// HostCapabilityPull represents the capability to fetch manifests
// and blobs by digest
HostCapabilityPull HostCapabilities = 1 << iota
// HostCapabilityResolve represents the capability to fetch manifests
// by name
HostCapabilityResolve
// HostCapabilityPush represents the capability to push blobs and
// manifests
HostCapabilityPush
// Reserved for future capabilities (e.g. search, catalog, remove)
)
// Has checks whether the capabilities list includes the provided capability
func (c HostCapabilities) Has(t HostCapabilities) bool {
return c&t == t
}
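// exampleMirrorCapabilities is an illustrative sketch: a public mirror is
// granted pull only, so Has reports true for pull and false for push.
func exampleMirrorCapabilities() (canPull, canPush bool) {
	mirror := HostCapabilityPull
	return mirror.Has(HostCapabilityPull), mirror.Has(HostCapabilityPush) // true, false
}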
// RegistryHost represents a complete configuration for a registry
// host, representing the capabilities, authorizations, connection
// configuration, and location.
type RegistryHost struct {
Client *http.Client
Authorizer Authorizer
Host string
Scheme string
Path string
Capabilities HostCapabilities
Header http.Header
}
func (h RegistryHost) isProxy(refhost string) bool {
if refhost != h.Host {
if refhost != "docker.io" || h.Host != "registry-1.docker.io" {
return true
}
}
return false
}
// RegistryHosts fetches the registry hosts for a given namespace,
// provided by the host component of a distribution image reference.
type RegistryHosts func(string) ([]RegistryHost, error)
// Registries joins multiple registry configuration functions, using the same
// order as provided within the arguments. When an empty registry configuration
// is returned with a nil error, the next function will be called.
// NOTE: This function does not merge configurations; as soon as a non-empty
// configuration is returned by one of the configuration functions, it is
// returned to the caller.
func Registries(registries ...RegistryHosts) RegistryHosts {
return func(host string) ([]RegistryHost, error) {
for _, registry := range registries {
config, err := registry(host)
if err != nil {
return config, err
}
if len(config) > 0 {
return config, nil
}
}
return nil, nil
}
}
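// exampleComposedHosts is an illustrative sketch of Registries: a hypothetical
// mirror-aware lookup is consulted first, and the package defaults serve any
// namespace for which it returns an empty configuration.
func exampleComposedHosts(mirrorLookup RegistryHosts) RegistryHosts {
	return Registries(
		mirrorLookup,                 // e.g. returns hosts only for "docker.io"
		ConfigureDefaultRegistries(), // fallback for every other namespace
	)
}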
type registryOpts struct {
authorizer Authorizer
plainHTTP func(string) (bool, error)
host func(string) (string, error)
client *http.Client
}
// RegistryOpt defines a registry default option
type RegistryOpt func(*registryOpts)
// WithPlainHTTP configures registries to use plaintext http scheme
// for the provided host match function.
func WithPlainHTTP(f func(string) (bool, error)) RegistryOpt {
return func(opts *registryOpts) {
opts.plainHTTP = f
}
}
// WithAuthorizer configures the default authorizer for a registry
func WithAuthorizer(a Authorizer) RegistryOpt {
return func(opts *registryOpts) {
opts.authorizer = a
}
}
// WithHostTranslator defines the default translator to use for registry hosts
func WithHostTranslator(h func(string) (string, error)) RegistryOpt {
return func(opts *registryOpts) {
opts.host = h
}
}
// WithClient configures the default http client for a registry
func WithClient(c *http.Client) RegistryOpt {
return func(opts *registryOpts) {
opts.client = c
}
}
// ConfigureDefaultRegistries is used to create a default configuration for
// registries. For more advanced configurations or per-domain setups,
// the RegistryHosts interface should be used directly.
// NOTE: This function will always return a non-empty value or error
func ConfigureDefaultRegistries(ropts ...RegistryOpt) RegistryHosts {
var opts registryOpts
for _, opt := range ropts {
opt(&opts)
}
return func(host string) ([]RegistryHost, error) {
config := RegistryHost{
Client: opts.client,
Authorizer: opts.authorizer,
Host: host,
Scheme: "https",
Path: "/v2",
Capabilities: HostCapabilityPull | HostCapabilityResolve | HostCapabilityPush,
}
if config.Client == nil {
config.Client = &http.Client{
Transport: DefaultHTTPTransport(nil),
}
}
if opts.plainHTTP != nil {
match, err := opts.plainHTTP(host)
if err != nil {
return nil, err
}
if match {
config.Scheme = "http"
}
}
if opts.host != nil {
var err error
config.Host, err = opts.host(config.Host)
if err != nil {
return nil, err
}
} else if host == "docker.io" {
config.Host = "registry-1.docker.io"
}
return []RegistryHost{config}, nil
}
}
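// exampleDefaultHosts is an illustrative sketch: defaults that speak plain
// HTTP to localhost registries and HTTPS everywhere else, mirroring what
// NewResolver configures when no Hosts are provided. The registry address is
// a placeholder.
func exampleDefaultHosts() ([]RegistryHost, error) {
	hosts := ConfigureDefaultRegistries(WithPlainHTTP(MatchLocalhost))
	return hosts("localhost:5000") // Scheme "http", Host "localhost:5000", Path "/v2"
}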
// MatchAllHosts is a host match function which is always true.
func MatchAllHosts(string) (bool, error) {
return true, nil
}
// MatchLocalhost is a host match function which returns true for
// localhost.
//
// Note: this does not handle matching of ip addresses in octal,
// decimal or hex form.
func MatchLocalhost(host string) (bool, error) {
switch host {
case "::1":
return true, nil
case "[::1]":
return true, nil
}
h, p, err := net.SplitHostPort(host)
// addrError helps distinguish between errors of form
// "no colon in address" and "too many colons in address".
// The former is fine as the host string need not have a
// port. The latter needs to be handled.
addrError := &net.AddrError{
Err: "missing port in address",
Addr: host,
}
if err != nil {
if err.Error() != addrError.Error() {
return false, err
}
// host string without any port specified
h = host
} else if len(p) == 0 {
return false, errors.New("invalid host name format")
}
// use ipv4 dotted decimal for further checking
if h == "localhost" {
h = "127.0.0.1"
}
ip := net.ParseIP(h)
return ip.IsLoopback(), nil
}
func DefaultHTTPTransport(defaultTLSConfig *tls.Config) *http.Transport {
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
FallbackDelay: 300 * time.Millisecond,
}).DialContext,
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: defaultTLSConfig,
ExpectContinueTimeout: 5 * time.Second,
}
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"path"
"strings"
"sync"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/semaphore"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
"github.com/containerd/containerd/v2/core/transfer"
"github.com/containerd/containerd/v2/pkg/reference"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/containerd/v2/version"
)
var (
// ErrInvalidAuthorization is used when credentials are passed to a server but
// those credentials are rejected.
ErrInvalidAuthorization = errors.New("authorization failed")
// MaxManifestSize represents the largest size accepted from a registry
// during resolution. Larger manifests may be accepted using a
// resolution method other than the registry.
//
// NOTE: The maximum number of layers supported by some runtimes is 128,
// and individual layers will not contribute more than 256 bytes, making
// 32KB a reasonable limit for large image manifests. 4MB represents a
// much larger upper bound for images which may contain large annotations
// or be non-images. A proper manifest design puts large metadata in
// subobjects, consistent with the intent of the manifest design.
MaxManifestSize int64 = 4 * 1024 * 1024
)
// Authorizer is used to authorize HTTP requests based on 401 HTTP responses.
// An Authorizer is responsible for caching tokens or credentials used by
// requests.
type Authorizer interface {
// Authorize sets the appropriate `Authorization` header on the given
// request.
//
// If no authorization is found for the request, the request remains
// unmodified. It may also add an `Authorization` header as
// "bearer <some bearer token>"
// "basic <base64 encoded credentials>"
//
// It may return remotes/errors.ErrUnexpectedStatus, which, for example,
// can be used by the caller to find out the status code returned by the registry.
Authorize(context.Context, *http.Request) error
// AddResponses adds a 401 response for the authorizer to consider when
// authorizing requests. The last response should be unauthorized and
// the previous responses are used to consider redirects and retries
// that may have led to the 401.
//
// If response is not handled, returns `ErrNotImplemented`
AddResponses(context.Context, []*http.Response) error
}
// ResolverOptions are used to configure a new Docker registry resolver
type ResolverOptions struct {
// Hosts returns registry host configurations for a namespace.
Hosts RegistryHosts
// Headers are the HTTP request header fields sent by the resolver
Headers http.Header
// Tracker is used to track uploads to the registry. This is used
// since the registry does not have upload tracking and the existing
// mechanism for getting blob upload status is expensive.
Tracker StatusTracker
// Authorizer is used to authorize registry requests
//
// Deprecated: use Hosts.
Authorizer Authorizer
// Credentials provides username and secret given a host.
// If username is empty but a secret is given, that secret
// is interpreted as a long lived token.
//
// Deprecated: use Hosts.
Credentials func(string) (string, string, error)
// Host provides the hostname given a namespace.
//
// Deprecated: use Hosts.
Host func(string) (string, error)
// PlainHTTP specifies to use plain http and not https
//
// Deprecated: use Hosts.
PlainHTTP bool
// Client is the http client to use when making registry requests
//
// Deprecated: use Hosts.
Client *http.Client
}
// DefaultHost is the default host function.
func DefaultHost(ns string) (string, error) {
if ns == "docker.io" {
return "registry-1.docker.io", nil
}
return ns, nil
}
type dockerResolver struct {
hosts RegistryHosts
header http.Header
resolveHeader http.Header
tracker StatusTracker
config transfer.ImageResolverOptions
}
// NewResolver returns a new resolver to a Docker registry
func NewResolver(options ResolverOptions) remotes.Resolver {
if options.Tracker == nil {
options.Tracker = NewInMemoryTracker()
}
if options.Headers == nil {
options.Headers = make(http.Header)
} else {
// make a copy of the headers to avoid race due to concurrent map write
options.Headers = options.Headers.Clone()
}
resolveHeader := http.Header{}
if _, ok := options.Headers["Accept"]; !ok {
// set headers for all the types we support for resolution.
resolveHeader.Set("Accept", strings.Join([]string{
images.MediaTypeDockerSchema2Manifest,
images.MediaTypeDockerSchema2ManifestList,
ocispec.MediaTypeImageManifest,
ocispec.MediaTypeImageIndex, "*/*",
}, ", "))
} else {
resolveHeader["Accept"] = options.Headers["Accept"]
delete(options.Headers, "Accept")
}
if options.Hosts == nil {
opts := []RegistryOpt{}
if options.Host != nil {
opts = append(opts, WithHostTranslator(options.Host))
}
if options.Authorizer == nil {
options.Authorizer = NewDockerAuthorizer(
WithAuthClient(options.Client),
WithAuthHeader(options.Headers),
WithAuthCreds(options.Credentials))
}
opts = append(opts, WithAuthorizer(options.Authorizer))
if options.Client != nil {
opts = append(opts, WithClient(options.Client))
}
if options.PlainHTTP {
opts = append(opts, WithPlainHTTP(MatchAllHosts))
} else {
opts = append(opts, WithPlainHTTP(MatchLocalhost))
}
options.Hosts = ConfigureDefaultRegistries(opts...)
}
return &dockerResolver{
hosts: options.Hosts,
header: options.Headers,
resolveHeader: resolveHeader,
tracker: options.Tracker,
}
}
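// exampleResolve is an illustrative sketch, not part of the package API: it
// builds a resolver with default options and resolves a tag to a descriptor.
// The image reference used here is only an example.
func exampleResolve(ctx context.Context) (ocispec.Descriptor, error) {
	resolver := NewResolver(ResolverOptions{})
	_, desc, err := resolver.Resolve(ctx, "docker.io/library/alpine:latest")
	return desc, err
}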
func getManifestMediaType(resp *http.Response) string {
// Strip encoding data (manifests should always be ascii JSON)
contentType := resp.Header.Get("Content-Type")
if sp := strings.IndexByte(contentType, ';'); sp != -1 {
contentType = contentType[0:sp]
}
// As of Apr 30 2019 the registry.access.redhat.com registry does not specify
// the content type of any data but uses schema1 manifests.
if contentType == "text/plain" {
contentType = images.MediaTypeDockerSchema1Manifest
}
return contentType
}
type countingReader struct {
reader io.Reader
bytesRead int64
}
func (r *countingReader) Read(p []byte) (int, error) {
n, err := r.reader.Read(p)
r.bytesRead += int64(n)
return n, err
}
var _ remotes.ResolverWithOptions = &dockerResolver{}
func (r *dockerResolver) Resolve(ctx context.Context, ref string) (string, ocispec.Descriptor, error) {
base, err := r.resolveDockerBase(ref)
if err != nil {
return "", ocispec.Descriptor{}, err
}
refspec := base.refspec
if refspec.Object == "" {
return "", ocispec.Descriptor{}, reference.ErrObjectRequired
}
var (
paths [][]string
dgst = refspec.Digest()
caps = HostCapabilityPull
)
if dgst != "" {
if err := dgst.Validate(); err != nil {
// need to fail here, since we can't actually resolve the invalid
// digest.
return "", ocispec.Descriptor{}, err
}
// turns out, we have a valid digest, make a url.
paths = append(paths, []string{"manifests", dgst.String()})
// fallback to blobs on not found.
paths = append(paths, []string{"blobs", dgst.String()})
} else {
// No digest; resolve the tag through the manifests endpoint.
paths = append(paths, []string{"manifests", refspec.Object})
caps |= HostCapabilityResolve
}
hosts := base.filterHosts(caps)
if len(hosts) == 0 {
return "", ocispec.Descriptor{}, fmt.Errorf("no resolve hosts: %w", errdefs.ErrNotFound)
}
ctx, err = ContextWithRepositoryScope(ctx, refspec, false)
if err != nil {
return "", ocispec.Descriptor{}, err
}
var (
// firstErr is the most relevant error encountered during resolution.
// We use this to determine the error to return, making sure that the
// error created furthest through the resolution process is returned.
firstErr error
firstErrPriority int
)
nextHostOrFail := func(i int) string {
if i < len(hosts)-1 {
return "trying next host"
}
return "fetch failed"
}
for _, u := range paths {
for i, host := range hosts {
ctx := log.WithLogger(ctx, log.G(ctx).WithField("host", host.Host))
req := base.request(host, http.MethodHead, u...)
if err := req.addNamespace(base.refspec.Hostname()); err != nil {
return "", ocispec.Descriptor{}, err
}
for key, value := range r.resolveHeader {
req.header[key] = append(req.header[key], value...)
}
log.G(ctx).Debug("resolving")
resp, err := req.doWithRetries(ctx, i == len(hosts)-1)
if err != nil {
if errors.Is(err, ErrInvalidAuthorization) {
err = fmt.Errorf("pull access denied, repository does not exist or may require authorization: %w", err)
}
if firstErrPriority < 1 {
firstErr = err
firstErrPriority = 1
}
log.G(ctx).WithError(err).Info(nextHostOrFail(i))
continue // try another host
}
resp.Body.Close() // don't care about body contents.
if resp.StatusCode > 299 {
if resp.StatusCode == http.StatusNotFound {
if firstErrPriority < 2 {
firstErr = fmt.Errorf("%s: %w", ref, errdefs.ErrNotFound)
firstErrPriority = 2
}
log.G(ctx).Infof("%s after status: %s", nextHostOrFail(i), resp.Status)
continue
}
if resp.StatusCode > 399 {
if firstErrPriority < 3 {
firstErr = unexpectedResponseErr(resp)
firstErrPriority = 3
}
log.G(ctx).Infof("%s after status: %s", nextHostOrFail(i), resp.Status)
continue // try another host
}
return "", ocispec.Descriptor{}, unexpectedResponseErr(resp)
}
size := resp.ContentLength
contentType := getManifestMediaType(resp)
// If no digest was provided, then only a resolve-trusted registry was
// contacted; in this case, use the digest header (or the content from a
// GET) to construct the descriptor.
if dgst == "" {
// this is the only point at which we trust the registry. we use the
// content headers to assemble a descriptor for the name. when this becomes
// more robust, we mostly get this information from a secure trust store.
dgstHeader := digest.Digest(resp.Header.Get("Docker-Content-Digest"))
if dgstHeader != "" && size != -1 {
if err := dgstHeader.Validate(); err != nil {
return "", ocispec.Descriptor{}, fmt.Errorf("%q in header not a valid digest: %w", dgstHeader, err)
}
dgst = dgstHeader
}
}
if dgst == "" || size == -1 {
log.G(ctx).Debug("no Docker-Content-Digest header, fetching manifest instead")
req = base.request(host, http.MethodGet, u...)
if err := req.addNamespace(base.refspec.Hostname()); err != nil {
return "", ocispec.Descriptor{}, err
}
for key, value := range r.resolveHeader {
req.header[key] = append(req.header[key], value...)
}
resp, err := req.doWithRetries(ctx, true)
if err != nil {
return "", ocispec.Descriptor{}, err
}
bodyReader := countingReader{reader: resp.Body}
contentType = getManifestMediaType(resp)
err = func() error {
defer resp.Body.Close()
if dgst != "" {
_, err = io.Copy(io.Discard, &bodyReader)
return err
}
if contentType == images.MediaTypeDockerSchema1Manifest {
return fmt.Errorf("%w: media type %q is no longer supported since containerd v2.0, please rebuild the image as %q or %q",
errdefs.ErrNotImplemented, images.MediaTypeDockerSchema1Manifest, images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest)
}
dgst, err = digest.FromReader(&bodyReader)
return err
}()
if err != nil {
return "", ocispec.Descriptor{}, err
}
size = bodyReader.bytesRead
}
// Prevent resolving to excessively large manifests
if size > MaxManifestSize {
if firstErrPriority < 4 {
firstErr = fmt.Errorf("rejecting %d byte manifest for %s: %w", size, ref, errdefs.ErrNotFound)
firstErrPriority = 4
}
continue
}
desc := ocispec.Descriptor{
Digest: dgst,
MediaType: contentType,
Size: size,
}
log.G(ctx).WithField("desc.digest", desc.Digest).Debug("resolved")
return ref, desc, nil
}
}
// If the loop above terminates without returning, resolution failed on
// every host; if no error was recorded, then no registries were provided.
if firstErr == nil {
firstErr = fmt.Errorf("%s: %w", ref, errdefs.ErrNotFound)
}
return "", ocispec.Descriptor{}, firstErr
}
func (r *dockerResolver) SetOptions(options ...transfer.ImageResolverOption) {
for _, opt := range options {
opt(&r.config)
}
}
func (r *dockerResolver) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, error) {
base, err := r.resolveDockerBase(ref)
if err != nil {
return nil, err
}
return dockerFetcher{
dockerBase: base,
}, nil
}
func (r *dockerResolver) Pusher(ctx context.Context, ref string) (remotes.Pusher, error) {
base, err := r.resolveDockerBase(ref)
if err != nil {
return nil, err
}
return dockerPusher{
dockerBase: base,
object: base.refspec.Object,
tracker: r.tracker,
}, nil
}
func (r *dockerResolver) resolveDockerBase(ref string) (*dockerBase, error) {
refspec, err := reference.Parse(ref)
if err != nil {
return nil, err
}
return r.base(refspec)
}
type dockerBase struct {
refspec reference.Spec
repository string
hosts []RegistryHost
header http.Header
performances transfer.ImageResolverPerformanceSettings
limiter *semaphore.Weighted
}
func (r *dockerBase) Acquire(ctx context.Context, weight int64) error {
if r.limiter == nil {
return nil
}
return r.limiter.Acquire(ctx, weight)
}
func (r *dockerBase) Release(weight int64) {
if r.limiter != nil {
r.limiter.Release(weight)
}
}
func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) {
host := refspec.Hostname()
hosts, err := r.hosts(host)
if err != nil {
return nil, err
}
return &dockerBase{
refspec: refspec,
repository: strings.TrimPrefix(refspec.Locator, host+"/"),
hosts: hosts,
header: r.header,
performances: r.config.Performances,
limiter: r.config.DownloadLimiter,
}, nil
}
func (r *dockerBase) filterHosts(caps HostCapabilities) (hosts []RegistryHost) {
for _, host := range r.hosts {
if host.Capabilities.Has(caps) {
hosts = append(hosts, host)
}
}
return
}
func (r *dockerBase) request(host RegistryHost, method string, ps ...string) *request {
header := r.header.Clone()
if header == nil {
header = http.Header{}
}
for key, value := range host.Header {
header[key] = append(header[key], value...)
}
if len(header.Get("User-Agent")) == 0 {
header.Set("User-Agent", "containerd/"+version.Version)
}
parts := append([]string{"/", host.Path, r.repository}, ps...)
p := path.Join(parts...)
// Join strips trailing slash, re-add ending "/" if included
if len(parts) > 0 && strings.HasSuffix(parts[len(parts)-1], "/") {
p = p + "/"
}
return &request{
method: method,
path: p,
header: header,
host: host,
}
}
func (r *request) authorize(ctx context.Context, req *http.Request) error {
// Delegate to the host's authorizer, if one is configured
if r.host.Authorizer != nil {
if err := r.host.Authorizer.Authorize(ctx, req); err != nil {
return err
}
}
return nil
}
func (r *request) addNamespace(ns string) (err error) {
if !r.host.isProxy(ns) {
return nil
}
	var q url.Values
	// Parse any existing query before truncating the path; parsing after
	// truncation would read an empty string and drop the original parameters.
	if i := strings.IndexByte(r.path, '?'); i > 0 {
		q, err = url.ParseQuery(r.path[i+1:])
		if err != nil {
			return
		}
		r.path = r.path[:i+1]
	} else {
		r.path = r.path + "?"
		q = url.Values{}
	}
q.Add("ns", ns)
r.path = r.path + q.Encode()
return
}
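// For illustration: when the host is a mirror proxying another namespace,
// addNamespace rewrites a path such as
//
//	/v2/library/alpine/manifests/latest
//
// into
//
//	/v2/library/alpine/manifests/latest?ns=docker.io
//
// so a single mirror endpoint can serve content for multiple upstream
// registries (the reference here is only an example).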
type request struct {
method string
path string
header http.Header
host RegistryHost
body func() (io.ReadCloser, error)
size int64
}
func (r *request) clone() *request {
res := *r
res.header = r.header.Clone()
return &res
}
func (r *request) do(ctx context.Context) (*http.Response, error) {
u := r.host.Scheme + "://" + r.host.Host + r.path
req, err := http.NewRequestWithContext(ctx, r.method, u, nil)
if err != nil {
return nil, err
}
if r.header == nil {
req.Header = http.Header{}
} else {
req.Header = r.header.Clone() // headers need to be copied to avoid concurrent map access
}
if r.body != nil {
body, err := r.body()
if err != nil {
return nil, err
}
req.Body = body
req.GetBody = r.body
if r.size > 0 {
req.ContentLength = r.size
}
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u))
log.G(ctx).WithFields(requestFields(req)).Debug("do request")
if err := r.authorize(ctx, req); err != nil {
return nil, fmt.Errorf("failed to authorize: %w", err)
}
client := &http.Client{}
if r.host.Client != nil {
*client = *r.host.Client
}
if client.CheckRedirect == nil {
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
if len(via) >= 10 {
return errors.New("stopped after 10 redirects")
}
if err := r.authorize(ctx, req); err != nil {
return fmt.Errorf("failed to authorize redirect: %w", err)
}
return nil
}
}
tracing.UpdateHTTPClient(client, tracing.Name("remotes.docker.resolver", "HTTPRequest"))
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to do request: %w", err)
}
log.G(ctx).WithFields(responseFields(resp)).Debug("fetch response received")
return resp, nil
}
type doChecks func(r *request, resp *http.Response) error
func withErrorCheck(r *request, resp *http.Response) error {
if resp.StatusCode > 299 {
if resp.StatusCode == http.StatusNotFound {
return fmt.Errorf("content at %v not found: %w", r.String(), errdefs.ErrNotFound)
}
return unexpectedResponseErr(resp)
}
return nil
}
var errContentRangeIgnored = errors.New("content range requests ignored")
func withOffsetCheck(offset int64) doChecks {
return func(r *request, resp *http.Response) error {
if offset == 0 {
return nil
}
if resp.StatusCode == http.StatusPartialContent {
return nil
}
if cr := resp.Header.Get("Content-Range"); cr != "" {
if !strings.HasPrefix(cr, fmt.Sprintf("bytes %d-", offset)) {
return fmt.Errorf("unhandled content range in response: %v", cr)
}
return nil
}
// Discard up to offset
// Could use buffer pool here but this case should be rare
n, err := io.Copy(io.Discard, io.LimitReader(resp.Body, offset))
if err != nil {
return fmt.Errorf("failed to discard to offset: %w", err)
}
if n != offset {
return errors.New("unable to discard to offset")
}
// content range ignored, we can't do concurrent fetches here.
// return an error to be caught
return errContentRangeIgnored
}
}
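// Usage sketch (illustrative): callers compose checks when issuing a request,
// e.g. fetching from an offset and treating errContentRangeIgnored as a signal
// to fall back to sequential reading:
//
//	resp, err := req.doWithRetries(ctx, true, withErrorCheck, withOffsetCheck(offset))
//	if errors.Is(err, errContentRangeIgnored) {
//		// the registry ignored the Range header; the body has already been
//		// discarded up to offset, so read sequentially from here
//	}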
func (r *request) doWithRetries(ctx context.Context, lastHost bool, checks ...doChecks) (resp *http.Response, err error) {
resp, err = r.doWithRetriesInner(ctx, nil, lastHost)
if err != nil {
return nil, err
}
defer func() {
if err != nil && err != errContentRangeIgnored {
resp.Body.Close()
}
}()
for _, check := range checks {
if err := check(r, resp); err != nil {
return resp, err
}
}
return resp, nil
}
func (r *request) doWithRetriesInner(ctx context.Context, responses []*http.Response, lastHost bool) (*http.Response, error) {
resp, err := r.do(ctx)
if err != nil {
return nil, err
}
responses = append(responses, resp)
retry, err := r.retryRequest(ctx, responses, lastHost)
if err != nil {
resp.Body.Close()
return nil, err
}
if retry {
resp.Body.Close()
return r.doWithRetriesInner(ctx, responses, lastHost)
}
return resp, err
}
func (r *request) retryRequest(ctx context.Context, responses []*http.Response, lastHost bool) (bool, error) {
if len(responses) > 5 {
return false, nil
}
last := responses[len(responses)-1]
switch last.StatusCode {
case http.StatusUnauthorized:
log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized")
if r.host.Authorizer != nil {
if err := r.host.Authorizer.AddResponses(ctx, responses); err == nil {
return true, nil
} else if !errdefs.IsNotImplemented(err) {
return false, err
}
}
return false, nil
case http.StatusMethodNotAllowed:
// Support registries which have not properly implemented the HEAD method on
// the manifests endpoint
if r.method == http.MethodHead && strings.Contains(r.path, "/manifests/") {
r.method = http.MethodGet
return true, nil
}
case http.StatusRequestTimeout, http.StatusTooManyRequests:
return true, nil
case http.StatusServiceUnavailable, http.StatusGatewayTimeout, http.StatusInternalServerError:
// Do not retry if the same error was seen in the last request
if len(responses) > 1 && responses[len(responses)-2].StatusCode == last.StatusCode {
return false, nil
}
// Only retry if this is the last host that will be attempted
if lastHost {
return true, nil
}
}
return false, nil
}
func (r *request) String() string {
return r.host.Scheme + "://" + r.host.Host + r.path
}
func (r *request) setMediaType(mediatype string) {
if mediatype == "" {
r.header.Set("Accept", "*/*")
} else {
r.header.Set("Accept", strings.Join([]string{mediatype, `*/*`}, ", "))
}
}
func (r *request) setOffset(offset int64) {
r.header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
}
func requestFields(req *http.Request) log.Fields {
fields := map[string]interface{}{
"request.method": req.Method,
}
for k, vals := range req.Header {
k = strings.ToLower(k)
if k == "authorization" {
continue
}
for i, v := range vals {
field := "request.header." + k
if i > 0 {
field = fmt.Sprintf("%s.%d", field, i)
}
fields[field] = v
}
}
return fields
}
func responseFields(resp *http.Response) log.Fields {
fields := map[string]interface{}{
"response.status": resp.Status,
}
for k, vals := range resp.Header {
k = strings.ToLower(k)
for i, v := range vals {
field := "response.header." + k
if i > 0 {
field = fmt.Sprintf("%s.%d", field, i)
}
fields[field] = v
}
}
return fields
}
// IsLocalhost checks if the registry host is local.
func IsLocalhost(host string) bool {
if h, _, err := net.SplitHostPort(host); err == nil {
host = h
}
if host == "localhost" {
return true
}
ip := net.ParseIP(host)
return ip.IsLoopback()
}
// NewHTTPFallback returns an http.RoundTripper which allows falling back from https to
// http for registry endpoints that are configured for both http and TLS,
// such as defaulted localhost endpoints.
func NewHTTPFallback(transport http.RoundTripper) http.RoundTripper {
return &httpFallback{
super: transport,
}
}
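// exampleFallbackClient is an illustrative sketch, not part of the package
// API: it wraps the default transport so requests to endpoints without a
// working TLS listener (for example a plain-HTTP localhost registry) are
// transparently retried over HTTP.
func exampleFallbackClient() *http.Client {
	return &http.Client{Transport: NewHTTPFallback(http.DefaultTransport)}
}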
type httpFallback struct {
super http.RoundTripper
host string
mu sync.Mutex
}
func (f *httpFallback) RoundTrip(r *http.Request) (*http.Response, error) {
f.mu.Lock()
fallback := f.host == r.URL.Host
f.mu.Unlock()
// try https first unless this host has already fallen back to http
if !fallback {
resp, err := f.super.RoundTrip(r)
if !isTLSError(err) && !isPortError(err, r.URL.Host) {
return resp, err
}
}
plainHTTPUrl := *r.URL
plainHTTPUrl.Scheme = "http"
plainHTTPRequest := *r
plainHTTPRequest.URL = &plainHTTPUrl
if !fallback {
f.mu.Lock()
if f.host != r.URL.Host {
f.host = r.URL.Host
}
f.mu.Unlock()
// update body on the second attempt
if r.Body != nil && r.GetBody != nil {
body, err := r.GetBody()
if err != nil {
return nil, err
}
plainHTTPRequest.Body = body
}
}
return f.super.RoundTrip(&plainHTTPRequest)
}
func isTLSError(err error) bool {
if err == nil {
return false
}
var tlsErr tls.RecordHeaderError
if errors.As(err, &tlsErr) && string(tlsErr.RecordHeader[:]) == "HTTP/" {
return true
}
if strings.Contains(err.Error(), "TLS handshake timeout") {
return true
}
return false
}
func isPortError(err error, host string) bool {
if isConnError(err) || os.IsTimeout(err) {
if _, port, _ := net.SplitHostPort(host); port != "" {
// Port is specified, will not retry on different port with scheme change
return false
}
return true
}
return false
}
//go:build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"errors"
"syscall"
)
func isConnError(err error) bool {
return errors.Is(err, syscall.ECONNREFUSED)
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"context"
"fmt"
"net/url"
"sort"
"strings"
"github.com/containerd/containerd/v2/pkg/reference"
)
// RepositoryScope returns a repository scope string such as "repository:foo/bar:pull"
// for "host/foo/bar:baz".
// When push is true, both pull and push are added to the scope.
func RepositoryScope(refspec reference.Spec, push bool) (string, error) {
u, err := url.Parse("dummy://" + refspec.Locator)
if err != nil {
return "", err
}
s := "repository:" + strings.TrimPrefix(u.Path, "/") + ":pull"
if push {
s += ",push"
}
return s, nil
}
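// exampleScope is an illustrative sketch, not part of the package API: the
// reference below is a placeholder showing the scope string produced for a
// pull-only operation.
func exampleScope() (string, error) {
	refspec, err := reference.Parse("registry.example.com/foo/bar:baz")
	if err != nil {
		return "", err
	}
	// Yields "repository:foo/bar:pull"; with push=true it would yield
	// "repository:foo/bar:pull,push".
	return RepositoryScope(refspec, false)
}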
// tokenScopesKey is used as the key for context.WithValue().
// value: []string (e.g. {"registry:foo/bar:pull"})
type tokenScopesKey struct{}
// ContextWithRepositoryScope returns a context with tokenScopesKey{} and the repository scope value.
func ContextWithRepositoryScope(ctx context.Context, refspec reference.Spec, push bool) (context.Context, error) {
s, err := RepositoryScope(refspec, push)
if err != nil {
return nil, err
}
return WithScope(ctx, s), nil
}
// WithScope appends a custom registry auth scope to the context.
func WithScope(ctx context.Context, scope string) context.Context {
var scopes []string
if v := ctx.Value(tokenScopesKey{}); v != nil {
scopes = v.([]string)
scopes = append(scopes, scope)
} else {
scopes = []string{scope}
}
return context.WithValue(ctx, tokenScopesKey{}, scopes)
}
// ContextWithAppendPullRepositoryScope is used to append repository pull
// scope into existing scopes indexed by the tokenScopesKey{}.
func ContextWithAppendPullRepositoryScope(ctx context.Context, repo string) context.Context {
return WithScope(ctx, fmt.Sprintf("repository:%s:pull", repo))
}
// GetTokenScopes returns deduplicated and sorted scopes from ctx.Value(tokenScopesKey{}) and common scopes.
func GetTokenScopes(ctx context.Context, common []string) []string {
scopes := []string{}
if x := ctx.Value(tokenScopesKey{}); x != nil {
scopes = append(scopes, x.([]string)...)
}
scopes = append(scopes, common...)
sort.Strings(scopes)
if len(scopes) == 0 {
return scopes
}
l := 0
for idx := 1; idx < len(scopes); idx++ {
// Note: this comparison is unaware of the scope grammar (https://distribution.github.io/distribution/spec/auth/scope/)
// So, "repository:foo/bar:pull,push" != "repository:foo/bar:push,pull", although semantically they are equal.
if scopes[l] == scopes[idx] {
continue
}
l++
scopes[l] = scopes[idx]
}
return scopes[:l+1]
}
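// Behaviour sketch (illustrative values): scopes accumulated on the context
// are merged with the common scopes, then sorted and deduplicated.
//
//	ctx = WithScope(ctx, "repository:foo/bar:pull")
//	GetTokenScopes(ctx, []string{"repository:foo/bar:pull", "repository:baz:pull"})
//	// -> []string{"repository:baz:pull", "repository:foo/bar:pull"}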
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"fmt"
"sync"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/errdefs"
"github.com/moby/locker"
)
// Status of a content operation
type Status struct {
content.Status
Committed bool
// ErrClosed contains error encountered on close.
ErrClosed error
// UploadUUID is used by the Docker registry to reference blob uploads
UploadUUID string
// PushStatus contains status related to push.
PushStatus
}
type PushStatus struct {
// MountedFrom is the source the content was cross-repo mounted from (empty if no cross-repo mount was performed).
MountedFrom string
// Exists indicates whether content already exists in the repository and wasn't uploaded.
Exists bool
}
// StatusTracker to track status of operations
type StatusTracker interface {
GetStatus(string) (Status, error)
SetStatus(string, Status)
}
// StatusTrackLocker to track status of operations with lock
type StatusTrackLocker interface {
StatusTracker
Lock(string)
Unlock(string)
}
type memoryStatusTracker struct {
statuses map[string]Status
m sync.Mutex
locker *locker.Locker
}
// NewInMemoryTracker returns a StatusTracker that tracks content status in-memory
func NewInMemoryTracker() StatusTrackLocker {
return &memoryStatusTracker{
statuses: map[string]Status{},
locker: locker.New(),
}
}
func (t *memoryStatusTracker) GetStatus(ref string) (Status, error) {
t.m.Lock()
defer t.m.Unlock()
status, ok := t.statuses[ref]
if !ok {
return Status{}, fmt.Errorf("status for ref %v: %w", ref, errdefs.ErrNotFound)
}
return status, nil
}
func (t *memoryStatusTracker) SetStatus(ref string, status Status) {
t.m.Lock()
t.statuses[ref] = status
t.m.Unlock()
}
func (t *memoryStatusTracker) Lock(ref string) {
t.locker.Lock(ref)
}
func (t *memoryStatusTracker) Unlock(ref string) {
t.locker.Unlock(ref)
}
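// exampleTrackUpload is an illustrative sketch, not part of the package API:
// it shows the intended write-then-read pattern for a StatusTracker; the ref
// and UUID values are placeholders.
func exampleTrackUpload(t StatusTracker, ref string) (Status, error) {
	t.SetStatus(ref, Status{UploadUUID: "example-upload-uuid"})
	return t.GetStatus(ref)
}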
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"context"
"errors"
"fmt"
"io"
transferapi "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/typeurl/v2"
)
type readByteStream struct {
ctx context.Context
stream streaming.Stream
window int32
updated chan struct{}
errCh chan error
remaining []byte
}
func ReadByteStream(ctx context.Context, stream streaming.Stream) io.ReadCloser {
rbs := &readByteStream{
ctx: ctx,
stream: stream,
window: 0,
errCh: make(chan error),
updated: make(chan struct{}, 1),
}
go func() {
for {
if rbs.window >= windowSize {
select {
case <-ctx.Done():
return
case <-rbs.updated:
continue
}
}
update := &transferapi.WindowUpdate{
Update: windowSize,
}
anyType, err := typeurl.MarshalAny(update)
if err != nil {
rbs.errCh <- err
return
}
if err := stream.Send(anyType); err == nil {
rbs.window += windowSize
} else if !errors.Is(err, io.EOF) {
rbs.errCh <- err
}
}
}()
return rbs
}
func (r *readByteStream) Read(p []byte) (n int, err error) {
plen := len(p)
if len(r.remaining) > 0 {
copied := copy(p, r.remaining)
if len(r.remaining) > plen {
r.remaining = r.remaining[plen:]
} else {
r.remaining = nil
}
return copied, nil
}
select {
case <-r.ctx.Done():
return 0, r.ctx.Err()
case err := <-r.errCh:
return 0, err
default:
}
anyType, err := r.stream.Recv()
if err != nil {
return 0, err
}
i, err := typeurl.UnmarshalAny(anyType)
if err != nil {
return 0, err
}
switch v := i.(type) {
case *transferapi.Data:
n := copy(p, v.Data)
if len(v.Data) > plen {
r.remaining = v.Data[plen:]
}
r.window = r.window - int32(n)
if r.window < windowSize {
r.updated <- struct{}{}
}
return n, nil
default:
return 0, fmt.Errorf("stream received error type %v", v)
}
}
func (r *readByteStream) Close() error {
return r.stream.Close()
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"context"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"io"
"sync"
"time"
transferapi "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
)
const maxRead = 32 * 1024
const windowSize = 2 * maxRead
var bufPool = &sync.Pool{
New: func() interface{} {
buffer := make([]byte, maxRead)
return &buffer
},
}
func SendStream(ctx context.Context, r io.Reader, stream streaming.Stream) {
window := make(chan int32)
go func() {
defer close(window)
for {
select {
case <-ctx.Done():
return
default:
}
anyType, err := stream.Recv()
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
log.G(ctx).WithError(err).Error("send stream ended without EOF")
}
return
}
i, err := typeurl.UnmarshalAny(anyType)
if err != nil {
log.G(ctx).WithError(err).Error("failed to unmarshal stream object")
continue
}
switch v := i.(type) {
case *transferapi.WindowUpdate:
select {
case <-ctx.Done():
return
case window <- v.Update:
}
default:
log.G(ctx).Errorf("unexpected stream object of type %T", i)
}
}
}()
go func() {
defer stream.Close()
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
var remaining int32
for {
if remaining > 0 {
// Don't wait for window update since there are remaining
select {
case <-ctx.Done():
// TODO: Send error message on stream before close to allow remote side to return error
return
case update := <-window:
remaining += update
default:
}
} else {
// Block until window updated
select {
case <-ctx.Done():
// TODO: Send error message on stream before close to allow remote side to return error
return
case update := <-window:
remaining = update
}
}
var max int32 = maxRead
if max > remaining {
max = remaining
}
b := (*buf)[:max]
n, err := r.Read(b)
if err != nil {
if !errors.Is(err, io.EOF) {
log.G(ctx).WithError(err).Errorf("failed to read stream source")
// TODO: Send error message on stream before close to allow remote side to return error
}
return
}
remaining = remaining - int32(n)
data := &transferapi.Data{
Data: b[:n],
}
anyType, err := typeurl.MarshalAny(data)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to marshal data for send")
// TODO: Send error message on stream before close to allow remote side to return error
return
}
if err := stream.Send(anyType); err != nil {
log.G(ctx).WithError(err).Errorf("send failed")
return
}
}
}()
}
func ReceiveStream(ctx context.Context, stream streaming.Stream) io.Reader {
r, w := io.Pipe()
go func() {
defer stream.Close()
var window int32
for {
var werr error
if window < windowSize {
update := &transferapi.WindowUpdate{
Update: windowSize,
}
anyType, err := typeurl.MarshalAny(update)
if err != nil {
w.CloseWithError(fmt.Errorf("failed to marshal window update: %w", err))
return
}
// check window update error after recv, stream may be complete
if werr = stream.Send(anyType); werr == nil {
window += windowSize
} else if errors.Is(werr, io.EOF) {
// TODO: Why does send return EOF here
werr = nil
}
}
anyType, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
err = nil
} else {
err = fmt.Errorf("received failed: %w", err)
}
w.CloseWithError(err)
return
} else if werr != nil {
// Try receive before erroring out
w.CloseWithError(fmt.Errorf("failed to send window update: %w", werr))
return
}
i, err := typeurl.UnmarshalAny(anyType)
if err != nil {
w.CloseWithError(fmt.Errorf("failed to unmarshal received object: %w", err))
return
}
switch v := i.(type) {
case *transferapi.Data:
n, err := w.Write(v.Data)
if err != nil {
w.CloseWithError(fmt.Errorf("failed to unmarshal received object: %w", err))
// Close will error out sender
return
}
window = window - int32(n)
// TODO: Handle error case
default:
log.G(ctx).Warnf("Ignoring unknown stream object of type %T", i)
continue
}
}
}()
return r
}
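// Pairing sketch (illustrative): one side wraps a reader with SendStream while
// the other consumes it via ReceiveStream; WindowUpdate messages flow back
// from the receiver to grant the sender credit. clientStream and serverStream
// are placeholders for the two ends of one stream obtained from the streaming
// service.
//
//	SendStream(ctx, bytes.NewReader(payload), clientStream)
//	data, err := io.ReadAll(ReceiveStream(ctx, serverStream))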
func GenerateID(prefix string) string {
t := time.Now()
var b [3]byte
rand.Read(b[:])
return fmt.Sprintf("%s-%d-%s", prefix, t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"context"
"errors"
"io"
"sync/atomic"
transferapi "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
)
func WriteByteStream(ctx context.Context, stream streaming.Stream) io.WriteCloser {
wbs := &writeByteStream{
ctx: ctx,
stream: stream,
updated: make(chan struct{}, 1),
}
go func() {
for {
select {
case <-ctx.Done():
return
default:
}
anyType, err := stream.Recv()
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
log.G(ctx).WithError(err).Error("send byte stream ended without EOF")
}
return
}
i, err := typeurl.UnmarshalAny(anyType)
if err != nil {
log.G(ctx).WithError(err).Error("failed to unmarshal stream object")
continue
}
switch v := i.(type) {
case *transferapi.WindowUpdate:
wbs.remaining.Add(v.Update)
select {
case <-ctx.Done():
return
case wbs.updated <- struct{}{}:
default:
// Don't block if no writes are waiting
}
default:
log.G(ctx).Errorf("unexpected stream object of type %T", i)
}
}
}()
return wbs
}
type writeByteStream struct {
ctx context.Context
stream streaming.Stream
remaining atomic.Int32
updated chan struct{}
}
func (wbs *writeByteStream) Write(p []byte) (n int, err error) {
for len(p) > 0 {
remaining := wbs.remaining.Load()
if remaining == 0 {
// No window credit available; block until a window update arrives
select {
case <-wbs.ctx.Done():
// TODO: Send error message on stream before close to allow remote side to return error
err = io.ErrShortWrite
return
case <-wbs.updated:
continue
}
}
var max int32 = maxRead
if max > int32(len(p)) {
max = int32(len(p))
}
if max > remaining {
max = remaining
}
// TODO: continue
// remaining = remaining - int32(n)
data := &transferapi.Data{
Data: p[:max],
}
var anyType typeurl.Any
anyType, err = typeurl.MarshalAny(data)
if err != nil {
log.G(wbs.ctx).WithError(err).Errorf("failed to marshal data for send")
// TODO: Send error message on stream before close to allow remote side to return error
return
}
if err = wbs.stream.Send(anyType); err != nil {
log.G(wbs.ctx).WithError(err).Errorf("send failed")
return
}
n += int(max)
p = p[max:]
wbs.remaining.Add(-1 * max)
}
return
}
func (wbs *writeByteStream) Close() error {
return wbs.stream.Close()
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
"fmt"
"sync"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/log"
"github.com/containerd/platforms"
)
// CheckImages checks all existing images to ensure they are ready to
// be used for CRI. It may try to recover images which are not ready
// but will only log errors, not return any.
func (c *CRIImageService) CheckImages(ctx context.Context) error {
	// TODO: Move away from `client.ListImages` to directly using the image store
cImages, err := c.client.ListImages(ctx)
if err != nil {
return fmt.Errorf("unable to list images: %w", err)
}
// TODO: Support all snapshotters
snapshotter := c.config.Snapshotter
var wg sync.WaitGroup
for _, i := range cImages {
wg.Add(1)
go func() {
defer wg.Done()
// TODO: Check platform/snapshot combination. Snapshot check should come first
ok, _, _, _, err := images.Check(ctx, i.ContentStore(), i.Target(), platforms.Default())
if err != nil {
log.G(ctx).WithError(err).Errorf("Failed to check image content readiness for %q", i.Name())
return
}
if !ok {
log.G(ctx).Warnf("The image content readiness for %q is not ok", i.Name())
return
}
// Checking existence of top-level snapshot for each image being recovered.
// TODO: This logic should be done elsewhere and owned by the image service
unpacked, err := i.IsUnpacked(ctx, snapshotter)
if err != nil {
log.G(ctx).WithError(err).Warnf("Failed to check whether image is unpacked for image %s", i.Name())
return
}
if !unpacked {
log.G(ctx).Warnf("The image %s is not unpacked.", i.Name())
// TODO(random-liu): Consider whether we should try unpack here.
}
if err := c.UpdateImage(ctx, i.Name()); err != nil {
log.G(ctx).WithError(err).Warnf("Failed to update reference for image %q", i.Name())
return
}
log.G(ctx).Debugf("Loaded image %q", i.Name())
}()
}
wg.Wait()
return nil
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// ListImages lists existing images.
// TODO(random-liu): Add image list filters after CRI defines this more clearly, and kubelet
// actually needs it.
func (c *GRPCCRIImageService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (*runtime.ListImagesResponse, error) {
// TODO: From CRIImageService directly
imagesInStore := c.imageStore.List()
var images []*runtime.Image
for _, image := range imagesInStore {
// TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot
// doesn't exist?
images = append(images, toCRIImage(image))
}
return &runtime.ListImagesResponse{Images: images}, nil
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"io"
"net/http"
"net/url"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/containerd/errdefs"
"github.com/containerd/imgcrypt/v2"
"github.com/containerd/imgcrypt/v2/images/encryption"
"github.com/containerd/log"
"github.com/containerd/platforms"
distribution "github.com/distribution/reference"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/diff"
containerdimages "github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes/docker"
"github.com/containerd/containerd/v2/core/remotes/docker/config"
"github.com/containerd/containerd/v2/core/transfer"
transferimage "github.com/containerd/containerd/v2/core/transfer/image"
"github.com/containerd/containerd/v2/core/transfer/registry"
"github.com/containerd/containerd/v2/internal/cri/annotations"
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
crilabels "github.com/containerd/containerd/v2/internal/cri/labels"
"github.com/containerd/containerd/v2/internal/cri/util"
snpkg "github.com/containerd/containerd/v2/pkg/snapshotters"
"github.com/containerd/containerd/v2/pkg/tracing"
)
// For image management:
// 1) We have an in-memory metadata index to:
// a. Maintain ImageID -> RepoTags, ImageID -> RepoDigests relationships; ImageID
// is the digest of image config, which conforms to oci image spec.
// b. Cache constant and useful information such as image chainID, config etc.
// c. An image will be added into the in-memory metadata only when it's successfully
// pulled and unpacked.
//
// 2) We use containerd image metadata store and content store:
// a. To resolve image references (digest/tag) locally. While pulling an image, we
// normalize the image reference provided by the user and put it into the image metadata
// store with the resolved descriptor. For the other operations, if an image id is provided,
// we'll access the in-memory metadata index directly; if an image reference is
// provided, we'll normalize it and resolve it in the containerd image metadata store
// to get the image id.
// b. As the backup of in-memory metadata in 1). During startup, the in-memory
// metadata could be re-constructed from image metadata store + content store.
//
// Several problems with the current approach:
// 1) An entry in the containerd image metadata store doesn't mean a "READY" (successfully
// pulled and unpacked) image. E.g. the client may get killed during a pull. In that case,
// if we see an image without snapshots or with incomplete contents during startup,
// should we re-pull the image? Or should we remove the entry?
//
// yanxuean: We can't delete the image directly, because we don't know whether the image
// was pulled by us. That leaves a resource leak.
//
// 2) Containerd suggests users add an entry before pulling the image. However, if
// an error occurs during the pull, should we remove the entry from the metadata
// store? Or should we leave it there until the next startup (resource leakage)?
//
// 3) The cri plugin only exposes "READY" (successfully pulled and unpacked) images
// to the user, which are maintained in the in-memory metadata index. However, it's
// still possible that someone else removes the content or snapshot, bypassing the cri
// plugin. How do we detect that and update the in-memory metadata correspondingly? Always
// check whether the corresponding snapshot is ready when reporting image status?
//
// 4) Is the content important if we cached necessary information in-memory
// after we pull the image? How to manage the disk usage of contents? If some
// contents are missing but snapshots are ready, is the image still "READY"?
// PullImage pulls an image with authentication config.
func (c *GRPCCRIImageService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (_ *runtime.PullImageResponse, err error) {
imageRef := r.GetImage().GetImage()
credentials := func(host string) (string, string, error) {
hostauth := r.GetAuth()
if hostauth == nil {
config := c.config.Registry.Configs[host]
if config.Auth != nil {
hostauth = toRuntimeAuthConfig(*config.Auth)
}
}
return ParseAuth(hostauth, host)
}
ref, err := c.CRIImageService.PullImage(ctx, imageRef, credentials, r.SandboxConfig, r.GetImage().GetRuntimeHandler())
if err != nil {
return nil, err
}
return &runtime.PullImageResponse{ImageRef: ref}, nil
}
func (c *CRIImageService) PullImage(ctx context.Context, name string, credentials func(string) (string, string, error), sandboxConfig *runtime.PodSandboxConfig, runtimeHandler string) (_ string, err error) {
span := tracing.SpanFromContext(ctx)
defer func() {
// TODO: add domain label for imagePulls metrics, and we may need to provide a mechanism
// for the user to configure the set of registries that they are interested in.
if err != nil {
imagePulls.WithValues("failure").Inc()
} else {
imagePulls.WithValues("success").Inc()
}
}()
inProgressImagePulls.Inc()
defer inProgressImagePulls.Dec()
startTime := time.Now()
if credentials == nil {
credentials = func(host string) (string, string, error) {
var hostauth *runtime.AuthConfig
config := c.config.Registry.Configs[host]
if config.Auth != nil {
hostauth = toRuntimeAuthConfig(*config.Auth)
}
return ParseAuth(hostauth, host)
}
}
namedRef, err := distribution.ParseDockerRef(name)
if err != nil {
return "", fmt.Errorf("failed to parse image reference %q: %w", name, err)
}
ref := namedRef.String()
if ref != name {
log.G(ctx).Debugf("PullImage using normalized image ref: %q", ref)
}
imagePullProgressTimeout, err := time.ParseDuration(c.config.ImagePullProgressTimeout)
if err != nil {
return "", fmt.Errorf("failed to parse image_pull_progress_timeout %q: %w", c.config.ImagePullProgressTimeout, err)
}
snapshotter, err := c.snapshotterFromPodSandboxConfig(ctx, ref, sandboxConfig)
if err != nil {
return "", err
}
span.SetAttributes(
tracing.Attribute("image.ref", ref),
tracing.Attribute("snapshotter.name", snapshotter),
)
labels := c.getLabels(ctx, ref)
// If UseLocalImagePull is true, use client.Pull to pull the image, else use transfer service by default.
//
// Transfer service does not currently support all the CRI image config options.
// TODO: Add support for DisableSnapshotAnnotations, DiscardUnpackedLayers, ImagePullWithSyncFs and unpackDuplicationSuppressor
var image containerd.Image
if c.config.UseLocalImagePull {
image, err = c.pullImageWithLocalPull(ctx, ref, credentials, snapshotter, labels, imagePullProgressTimeout)
} else {
image, err = c.pullImageWithTransferService(ctx, ref, credentials, snapshotter, labels, imagePullProgressTimeout)
}
if err != nil {
return "", err
}
span.AddEvent("Pull and unpack image complete")
configDesc, err := image.Config(ctx)
if err != nil {
return "", fmt.Errorf("get image config descriptor: %w", err)
}
imageID := configDesc.Digest.String()
repoDigest, repoTag := util.GetRepoDigestAndTag(namedRef, image.Target().Digest)
for _, r := range []string{imageID, repoTag, repoDigest} {
if r == "" {
continue
}
if err := c.createOrUpdateImageReference(ctx, r, image.Target(), labels); err != nil {
return "", fmt.Errorf("failed to create image reference %q: %w", r, err)
}
// Update image store to reflect the newest state in containerd.
// No need to use `updateImage`, because the image reference must
// have been managed by the cri plugin.
// TODO: Use image service directly
if err := c.imageStore.Update(ctx, r); err != nil {
return "", fmt.Errorf("failed to update image store %q: %w", r, err)
}
}
const mbToByte = 1024 * 1024
size, _ := image.Size(ctx)
imagePullingSpeed := float64(size) / mbToByte / time.Since(startTime).Seconds()
imagePullThroughput.Observe(imagePullingSpeed)
log.G(ctx).Infof("Pulled image %q with image id %q, repo tag %q, repo digest %q, size %q in %s", name, imageID,
repoTag, repoDigest, strconv.FormatInt(size, 10), time.Since(startTime))
// NOTE(random-liu): the actual state in containerd is the source of truth; even though we
// maintain an in-memory image store, it's only for in-memory indexing. The image could be removed
// by someone else anytime, before/during/after we create the metadata. We should always
// check the actual state in containerd before using the image or returning status of the
// image.
return imageID, nil
}
// pullImageWithLocalPull handles image pulling using the local client.
func (c *CRIImageService) pullImageWithLocalPull(
ctx context.Context,
ref string,
credentials func(string) (string, string, error),
snapshotter string,
labels map[string]string,
imagePullProgressTimeout time.Duration,
) (containerd.Image, error) {
pctx, pcancel := context.WithCancel(ctx)
defer pcancel()
pullReporter := newPullProgressReporter(ref, pcancel, imagePullProgressTimeout)
resolver := docker.NewResolver(docker.ResolverOptions{
Headers: c.config.Registry.Headers,
Hosts: c.registryHosts(ctx, credentials, pullReporter.optionUpdateClient),
})
log.G(ctx).Debugf("PullImage %q with snapshotter %s using client.Pull()", ref, snapshotter)
pullOpts := []containerd.RemoteOpt{
containerd.WithResolver(resolver),
containerd.WithPullSnapshotter(snapshotter),
containerd.WithPullUnpack,
containerd.WithPullLabels(labels),
containerd.WithDownloadLimiter(c.downloadLimiter),
containerd.WithMaxConcurrentDownloads(c.config.MaxConcurrentDownloads),
containerd.WithConcurrentLayerFetchBuffer(c.config.ConcurrentLayerFetchBuffer),
containerd.WithUnpackOpts([]containerd.UnpackOpt{
containerd.WithUnpackDuplicationSuppressor(c.unpackDuplicationSuppressor),
containerd.WithUnpackApplyOpts(diff.WithSyncFs(c.config.ImagePullWithSyncFs)),
}),
}
// Temporarily removed for v2 upgrade
//pullOpts = append(pullOpts, c.encryptedImagesPullOpts()...)
if !c.config.DisableSnapshotAnnotations {
pullOpts = append(pullOpts,
containerd.WithImageHandlerWrapper(snpkg.AppendInfoHandlerWrapper(ref)))
}
if c.config.DiscardUnpackedLayers {
// Allows GC to clean layers up from the content store after unpacking
pullOpts = append(pullOpts,
containerd.WithChildLabelMap(containerdimages.ChildGCLabelsFilterLayers))
}
pullReporter.start(pctx)
image, err := c.client.Pull(pctx, ref, pullOpts...)
pcancel()
if err != nil {
return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
}
return image, nil
}
// pullImageWithTransferService handles image pulling using the transfer service.
func (c *CRIImageService) pullImageWithTransferService(
ctx context.Context,
ref string,
credentials func(string) (string, string, error),
snapshotter string,
labels map[string]string,
imagePullProgressTimeout time.Duration,
) (containerd.Image, error) {
log.G(ctx).Debugf("PullImage %q with snapshotter %s using transfer service", ref, snapshotter)
rctx, rcancel := context.WithCancel(ctx)
defer rcancel()
transferProgressReporter := newTransferProgressReporter(ref, rcancel, imagePullProgressTimeout)
// Set image store opts
var sopts []transferimage.StoreOpt
sopts = append(sopts, transferimage.WithPlatforms(platforms.DefaultSpec()))
sopts = append(sopts, transferimage.WithUnpack(platforms.DefaultSpec(), snapshotter))
sopts = append(sopts, transferimage.WithImageLabels(labels))
is := transferimage.NewStore(ref, sopts...)
log.G(ctx).Debugf("Getting new CRI credentials")
ch := newCRICredentials(ref, credentials)
opts := []registry.Opt{registry.WithCredentials(ch)}
opts = append(opts, registry.WithHeaders(c.config.Registry.Headers))
opts = append(opts, registry.WithHostDir(c.config.Registry.ConfigPath))
reg, err := registry.NewOCIRegistry(ctx, ref, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create OCI registry: %w", err)
}
transferProgressReporter.start(rctx)
log.G(ctx).Debugf("Calling cri transfer service")
err = c.transferrer.Transfer(rctx, reg, is, transfer.WithProgress(transferProgressReporter.createProgressFunc(rctx)))
rcancel()
if err != nil {
return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
}
// Image should be pulled, unpacked and present in containerd image store at this moment
image, err := c.client.GetImage(ctx, ref)
if err != nil {
return nil, fmt.Errorf("failed to get image %q from containerd image store: %w", ref, err)
}
return image, nil
}
// ParseAuth parses AuthConfig and returns username and password/secret required by containerd.
func ParseAuth(auth *runtime.AuthConfig, host string) (string, string, error) {
if auth == nil {
return "", "", nil
}
if auth.ServerAddress != "" {
// Do not return the auth info when server address doesn't match.
u, err := url.Parse(auth.ServerAddress)
if err != nil {
return "", "", fmt.Errorf("parse server address: %w", err)
}
if host != u.Host {
return "", "", nil
}
}
if auth.Username != "" {
return auth.Username, auth.Password, nil
}
if auth.IdentityToken != "" {
return "", auth.IdentityToken, nil
}
if auth.Auth != "" {
decLen := base64.StdEncoding.DecodedLen(len(auth.Auth))
decoded := make([]byte, decLen)
_, err := base64.StdEncoding.Decode(decoded, []byte(auth.Auth))
if err != nil {
return "", "", err
}
user, passwd, ok := strings.Cut(string(decoded), ":")
if !ok {
return "", "", fmt.Errorf("invalid decoded auth: %q", decoded)
}
return user, strings.Trim(passwd, "\x00"), nil
}
// TODO(random-liu): Support RegistryToken.
// An empty auth config is valid for anonymous registry
return "", "", nil
}
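// exampleParseAuth is an illustrative sketch, not part of the service API: it
// shows how a base64-encoded "user:password" pair in AuthConfig.Auth is
// decoded; the credentials and host are placeholders.
func exampleParseAuth() (string, string, error) {
	auth := &runtime.AuthConfig{
		Auth: base64.StdEncoding.EncodeToString([]byte("user:password")),
	}
	// Returns ("user", "password", nil).
	return ParseAuth(auth, "registry.example.com")
}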
// createOrUpdateImageReference creates or updates image reference inside containerd image store.
// Note that because create and update are not done in one transaction, there could be a race. E.g.
// the image reference is deleted by someone else after create returns "already exists", but before the
// update happens.
func (c *CRIImageService) createOrUpdateImageReference(ctx context.Context, name string, desc imagespec.Descriptor, labels map[string]string) error {
img := containerdimages.Image{
Name: name,
Target: desc,
// Add a label to indicate that the image is managed by the cri plugin.
Labels: labels,
}
// TODO(random-liu): Figure out which is the more performant sequence create then update or
// update then create.
// TODO: Call CRIImageService directly
_, err := c.images.Create(ctx, img)
if err == nil {
return nil
} else if !errdefs.IsAlreadyExists(err) {
return err
}
// Retrieve oldImg from image store here because Create routine returns an
// empty image on ErrAlreadyExists
oldImg, err := c.images.Get(ctx, name)
if err != nil {
return err
}
fieldpaths := []string{"target"}
if oldImg.Labels[crilabels.ImageLabelKey] != labels[crilabels.ImageLabelKey] {
fieldpaths = append(fieldpaths, "labels."+crilabels.ImageLabelKey)
}
if oldImg.Labels[crilabels.PinnedImageLabelKey] != labels[crilabels.PinnedImageLabelKey] &&
labels[crilabels.PinnedImageLabelKey] == crilabels.PinnedImageLabelValue {
fieldpaths = append(fieldpaths, "labels."+crilabels.PinnedImageLabelKey)
}
if oldImg.Target.Digest == img.Target.Digest && len(fieldpaths) < 2 {
return nil
}
_, err = c.images.Update(ctx, img, fieldpaths...)
return err
}
// getLabels gets the image labels to be added to a CRI image
func (c *CRIImageService) getLabels(ctx context.Context, name string) map[string]string {
labels := map[string]string{crilabels.ImageLabelKey: crilabels.ImageLabelValue}
for _, pinned := range c.config.PinnedImages {
if pinned == name {
labels[crilabels.PinnedImageLabelKey] = crilabels.PinnedImageLabelValue
}
}
return labels
}
// UpdateImage updates the image store to reflect the newest state of an image reference
// in containerd. If the reference is not managed by the cri plugin, the function also
// generates the necessary metadata for the image and makes it managed.
func (c *CRIImageService) UpdateImage(ctx context.Context, r string) error {
// TODO: Use image service
img, err := c.client.GetImage(ctx, r)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("get image by reference: %w", err)
}
// If the image is not found, we should continue updating the cache,
// so that the image can be removed from the cache.
if err := c.imageStore.Update(ctx, r); err != nil {
return fmt.Errorf("update image store for %q: %w", r, err)
}
return nil
}
labels := img.Labels()
criLabels := c.getLabels(ctx, r)
for key, value := range criLabels {
if labels[key] != value {
// Make sure the image has the image id as its unique
// identifier that references the image in its lifetime.
configDesc, err := img.Config(ctx)
if err != nil {
return fmt.Errorf("get image id: %w", err)
}
id := configDesc.Digest.String()
if err := c.createOrUpdateImageReference(ctx, id, img.Target(), criLabels); err != nil {
return fmt.Errorf("create image id reference %q: %w", id, err)
}
if err := c.imageStore.Update(ctx, id); err != nil {
return fmt.Errorf("update image store for %q: %w", id, err)
}
// The image id is ready, add the label to mark the image as managed.
if err := c.createOrUpdateImageReference(ctx, r, img.Target(), criLabels); err != nil {
return fmt.Errorf("create managed label: %w", err)
}
break
}
}
if err := c.imageStore.Update(ctx, r); err != nil {
return fmt.Errorf("update image store for %q: %w", r, err)
}
return nil
}
func hostDirFromRoots(roots []string) func(string) (string, error) {
rootfn := make([]func(string) (string, error), len(roots))
for i := range roots {
rootfn[i] = config.HostDirFromRoot(roots[i])
}
return func(host string) (dir string, err error) {
for _, fn := range rootfn {
dir, err = fn(host)
if (err != nil && !errdefs.IsNotFound(err)) || (dir != "") {
break
}
}
return
}
}
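// Illustrative usage: a lookup searching containerd's certs.d directory and
// then Docker's, returning the first match. The paths are conventional
// defaults used only as an example, not read from configuration here.
//
//	hostDir := hostDirFromRoots([]string{"/etc/containerd/certs.d", "/etc/docker/certs.d"})
//	dir, err := hostDir("registry.example.com")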
// registryHosts returns the registry hosts to be used by the resolver.
func (c *CRIImageService) registryHosts(ctx context.Context, credentials func(host string) (string, string, error), updateClientFn config.UpdateClientFunc) docker.RegistryHosts {
paths := filepath.SplitList(c.config.Registry.ConfigPath)
if len(paths) > 0 {
hostOptions := config.HostOptions{
UpdateClient: updateClientFn,
}
hostOptions.Credentials = credentials
hostOptions.HostDir = hostDirFromRoots(paths)
// need to pass cri global headers to per-host authorizers
hostOptions.AuthorizerOpts = []docker.AuthorizerOpt{
docker.WithAuthHeader(c.config.Registry.Headers),
}
return config.ConfigureHosts(ctx, hostOptions)
}
return func(host string) ([]docker.RegistryHost, error) {
var registries []docker.RegistryHost
endpoints, err := c.registryEndpoints(host)
if err != nil {
return nil, fmt.Errorf("get registry endpoints: %w", err)
}
for _, e := range endpoints {
u, err := url.Parse(e)
if err != nil {
return nil, fmt.Errorf("parse registry endpoint %q from mirrors: %w", e, err)
}
var (
transport = docker.DefaultHTTPTransport(nil) // no tls config
client = &http.Client{Transport: transport}
config = c.config.Registry.Configs[u.Host]
)
if docker.IsLocalhost(host) && u.Scheme == "http" {
// Skipping TLS verification for localhost
transport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
// Make a copy of `credentials`, so that different authorizers would not reference
// the same credentials variable.
credentials := credentials
if credentials == nil && config.Auth != nil {
auth := toRuntimeAuthConfig(*config.Auth)
credentials = func(host string) (string, string, error) {
return ParseAuth(auth, host)
}
}
if updateClientFn != nil {
if err := updateClientFn(client); err != nil {
return nil, fmt.Errorf("failed to update http client: %w", err)
}
}
authorizer := docker.NewDockerAuthorizer(
docker.WithAuthClient(client),
docker.WithAuthCreds(credentials),
docker.WithAuthHeader(c.config.Registry.Headers),
)
if u.Path == "" {
u.Path = "/v2"
}
registries = append(registries, docker.RegistryHost{
Client: client,
Authorizer: authorizer,
Host: u.Host,
Scheme: u.Scheme,
Path: u.Path,
Capabilities: docker.HostCapabilityResolve | docker.HostCapabilityPull,
})
}
return registries, nil
}
}
// toRuntimeAuthConfig converts cri plugin auth config to runtime auth config.
func toRuntimeAuthConfig(a criconfig.AuthConfig) *runtime.AuthConfig {
return &runtime.AuthConfig{
Username: a.Username,
Password: a.Password,
Auth: a.Auth,
IdentityToken: a.IdentityToken,
}
}
// defaultScheme returns the default scheme for a registry host.
func defaultScheme(host string) string {
if docker.IsLocalhost(host) {
return "http"
}
return "https"
}
// addDefaultScheme returns the endpoint with default scheme
func addDefaultScheme(endpoint string) (string, error) {
if strings.Contains(endpoint, "://") {
return endpoint, nil
}
ue := "dummy://" + endpoint
u, err := url.Parse(ue)
if err != nil {
return "", err
}
return fmt.Sprintf("%s://%s", defaultScheme(u.Host), endpoint), nil
}
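// A hedged worked example of addDefaultScheme (illustrative, not part of the
// original source): endpoints without a scheme get "http" for localhost and
// "https" otherwise, while endpoints that already carry a scheme pass through.
//
//	e, _ := addDefaultScheme("localhost:5000")      // "http://localhost:5000"
//	e, _ = addDefaultScheme("registry.example.com") // "https://registry.example.com"
//	e, _ = addDefaultScheme("http://10.0.0.1:5000") // unchanged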
// registryEndpoints returns endpoints for a given host.
// It adds default registry endpoint if it does not exist in the passed-in endpoint list.
// It also supports wildcard host matching with `*`.
func (c *CRIImageService) registryEndpoints(host string) ([]string, error) {
var endpoints []string
_, ok := c.config.Registry.Mirrors[host]
if ok {
endpoints = c.config.Registry.Mirrors[host].Endpoints
} else {
endpoints = c.config.Registry.Mirrors["*"].Endpoints
}
defaultHost, err := docker.DefaultHost(host)
if err != nil {
return nil, fmt.Errorf("get default host: %w", err)
}
for i := range endpoints {
en, err := addDefaultScheme(endpoints[i])
if err != nil {
return nil, fmt.Errorf("parse endpoint url: %w", err)
}
endpoints[i] = en
}
for _, e := range endpoints {
u, err := url.Parse(e)
if err != nil {
return nil, fmt.Errorf("parse endpoint url: %w", err)
}
if u.Host == host {
// Do not add default if the endpoint already exists.
return endpoints, nil
}
}
return append(endpoints, defaultScheme(defaultHost)+"://"+defaultHost), nil
}
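// Usage sketch (hedged; assumes docker.DefaultHost maps "docker.io" to
// "registry-1.docker.io", as the containerd resolver does): when no mirror
// is configured for the host, only the default endpoint is returned.
//
//	eps, err := c.registryEndpoints("docker.io")
//	// eps == []string{"https://registry-1.docker.io"}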
// encryptedImagesPullOpts returns the necessary list of pull options required
// for decryption of encrypted images based on the cri decryption configuration.
// Temporarily removed for v2 upgrade
func (c *CRIImageService) encryptedImagesPullOpts() []containerd.RemoteOpt {
if c.config.ImageDecryption.KeyModel == criconfig.KeyModelNode {
ltdd := imgcrypt.Payload{}
decUnpackOpt := encryption.WithUnpackConfigApplyOpts(encryption.WithDecryptedUnpack(&ltdd))
opt := containerd.WithUnpackOpts([]containerd.UnpackOpt{decUnpackOpt})
return []containerd.RemoteOpt{opt}
}
return nil
}
const (
// defaultPullProgressReportInterval represents how often the
// reporter checks the pull progress.
defaultPullProgressReportInterval = 10 * time.Second
)
// pullProgressReporter is used to check the progress of a single PullImage request.
type pullProgressReporter struct {
ref string
cancel context.CancelFunc
reqReporter pullRequestReporter
timeout time.Duration
}
func newPullProgressReporter(ref string, cancel context.CancelFunc, timeout time.Duration) *pullProgressReporter {
return &pullProgressReporter{
ref: ref,
cancel: cancel,
reqReporter: pullRequestReporter{},
timeout: timeout,
}
}
func (reporter *pullProgressReporter) optionUpdateClient(client *http.Client) error {
client.Transport = &pullRequestReporterRoundTripper{
rt: client.Transport,
reqReporter: &reporter.reqReporter,
}
return nil
}
func (reporter *pullProgressReporter) start(ctx context.Context) {
if reporter.timeout == 0 {
log.G(ctx).Infof("no timeout and will not start pulling image %s reporter", reporter.ref)
return
}
go func() {
var (
reportInterval = defaultPullProgressReportInterval
lastSeenBytesRead = uint64(0)
lastSeenTimestamp = time.Now()
)
// check progress more frequently if timeout < default interval
if reporter.timeout < reportInterval {
reportInterval = reporter.timeout / 2
}
var ticker = time.NewTicker(reportInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
activeReqs, bytesRead := reporter.reqReporter.status()
log.G(ctx).WithField("ref", reporter.ref).
WithField("activeReqs", activeReqs).
WithField("totalBytesRead", bytesRead).
WithField("lastSeenBytesRead", lastSeenBytesRead).
WithField("lastSeenTimestamp", lastSeenTimestamp.Format(time.RFC3339)).
WithField("reportInterval", reportInterval).
Debugf("progress for image pull")
if activeReqs == 0 || bytesRead > lastSeenBytesRead {
lastSeenBytesRead = bytesRead
lastSeenTimestamp = time.Now()
continue
}
if time.Since(lastSeenTimestamp) > reporter.timeout {
log.G(ctx).Errorf("cancel pulling image %s because of no progress in %v", reporter.ref, reporter.timeout)
reporter.cancel()
return
}
case <-ctx.Done():
activeReqs, bytesRead := reporter.reqReporter.status()
log.G(ctx).Infof("stop pulling image %s: active requests=%v, bytes read=%v", reporter.ref, activeReqs, bytesRead)
return
}
}
}()
}
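// A minimal sketch of how the reporter is expected to be wired into a pull
// (the names and the 5-minute timeout are illustrative, not taken from this file):
//
//	pctx, pcancel := context.WithCancel(ctx)
//	reporter := newPullProgressReporter(ref, pcancel, 5*time.Minute)
//	reporter.start(pctx)
//	// reporter.optionUpdateClient is then passed as the UpdateClientFunc so
//	// that the registry HTTP client gets the counting round tripper below.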
// countingReadCloser wraps http.Response.Body with pull request reporter,
// which is used by pullRequestReporterRoundTripper.
type countingReadCloser struct {
once sync.Once
rc io.ReadCloser
reqReporter *pullRequestReporter
}
// Read reads bytes from the original io.ReadCloser and adds the number of
// bytes read to the pull request reporter.
func (r *countingReadCloser) Read(p []byte) (int, error) {
n, err := r.rc.Read(p)
r.reqReporter.incByteRead(uint64(n))
return n, err
}
// Close closes the original io.ReadCloser and only decreases the number of
// active pull requests once.
func (r *countingReadCloser) Close() error {
err := r.rc.Close()
r.once.Do(r.reqReporter.decRequest)
return err
}
// pullRequestReporter is used to track the progress per each criapi.PullImage.
type pullRequestReporter struct {
// activeReqs indicates the current number of active pulling requests,
// including auth requests.
activeReqs atomic.Int32
// totalBytesRead indicates the total number of bytes that have been read
// from the remote registry.
totalBytesRead atomic.Uint64
}
func (reporter *pullRequestReporter) incRequest() {
reporter.activeReqs.Add(1)
}
func (reporter *pullRequestReporter) decRequest() {
reporter.activeReqs.Add(-1)
}
func (reporter *pullRequestReporter) incByteRead(nr uint64) {
reporter.totalBytesRead.Add(nr)
}
func (reporter *pullRequestReporter) status() (currentReqs int32, totalBytesRead uint64) {
currentReqs = reporter.activeReqs.Load()
totalBytesRead = reporter.totalBytesRead.Load()
return currentReqs, totalBytesRead
}
// pullRequestReporterRoundTripper wraps http.RoundTripper with a pull request
// reporter, which is used to track the progress of active http requests by
// counting the bytes read from http.Response.Body.
//
// NOTE:
//
// Although containerd provides an ingester manager to track the progress
// of a pull request, for example `ctr image pull` shows a console progress
// bar, it needs more CPU resources to open/read the ingested files while
// acquiring the containerd metadata plugin's boltdb lock.
//
// Before sending an HTTP request to the registry, the containerd.Client.Pull
// library will open a writer via the containerd ingester manager. Based on this,
// the http.RoundTripper wrapper can track the active progress with lower
// overhead even if the ref has been locked in the ingester manager by another
// Pull request.
type pullRequestReporterRoundTripper struct {
rt http.RoundTripper
reqReporter *pullRequestReporter
}
func (rt *pullRequestReporterRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
rt.reqReporter.incRequest()
resp, err := rt.rt.RoundTrip(req)
if err != nil {
rt.reqReporter.decRequest()
return nil, err
}
resp.Body = &countingReadCloser{
rc: resp.Body,
reqReporter: rt.reqReporter,
}
return resp, err
}
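// For illustration, the wrapping performed by the reporter's
// optionUpdateClient is equivalent to the following sketch (the reporter
// variable is assumed to be in scope):
//
//	client := &http.Client{Transport: http.DefaultTransport}
//	client.Transport = &pullRequestReporterRoundTripper{
//		rt:          client.Transport,
//		reqReporter: &reporter.reqReporter,
//	}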
// Given that runtime information is not passed from PullImageRequest, we depend on an experimental annotation
// passed from pod sandbox config to get the runtimeHandler. The annotation key is specified in configuration.
// Once we know the runtime, try to override default snapshotter if it is set for this runtime.
// See https://github.com/containerd/containerd/issues/6657
func (c *CRIImageService) snapshotterFromPodSandboxConfig(ctx context.Context, imageRef string,
s *runtime.PodSandboxConfig) (string, error) {
snapshotter := c.config.Snapshotter
if s == nil || s.Annotations == nil {
return snapshotter, nil
}
// TODO(kiashok): honor the new CRI runtime handler field added to v0.29.0
// for image pull per runtime class support.
runtimeHandler, ok := s.Annotations[annotations.RuntimeHandler]
if !ok {
return snapshotter, nil
}
// TODO: Ensure error is returned if runtime not found?
if c.runtimePlatforms != nil {
if p, ok := c.runtimePlatforms[runtimeHandler]; ok && p.Snapshotter != snapshotter {
snapshotter = p.Snapshotter
log.G(ctx).Infof("experimental: PullImage %q for runtime %s, using snapshotter %s", imageRef, runtimeHandler, snapshotter)
}
}
return snapshotter, nil
}
type criCredentials struct {
ref string
credentials func(string) (string, string, error)
}
func newCRICredentials(ref string, credentials func(string) (string, string, error)) registry.CredentialHelper {
return &criCredentials{
ref: ref,
credentials: credentials,
}
}
// GetCredentials gets credentials from criCredentials; it makes criCredentials implement registry.CredentialHelper.
func (cc *criCredentials) GetCredentials(ctx context.Context, ref string, host string) (registry.Credentials, error) {
if cc.credentials == nil {
return registry.Credentials{}, fmt.Errorf("credential handler not initialized for ref %q", ref)
}
if ref != cc.ref {
return registry.Credentials{}, fmt.Errorf("invalid ref %q, expected %q", ref, cc.ref)
}
username, secret, err := cc.credentials(host)
if err != nil {
return registry.Credentials{}, fmt.Errorf("failed to get credentials for %q: %w", host, err)
}
return registry.Credentials{
Host: host,
Username: username,
Secret: secret,
}, nil
}
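// Usage sketch (the ref, host and credsFn below are illustrative): the helper
// refuses any ref other than the one it was created for.
//
//	helper := newCRICredentials("docker.io/library/busybox:latest", credsFn)
//	creds, err := helper.GetCredentials(ctx, "docker.io/library/busybox:latest", "registry-1.docker.io")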
type transferProgressReporter struct {
ref string
pc chan transfer.Progress
cancel context.CancelFunc
timeout time.Duration
reqReporter pullRequestReporter
statuses map[string]*transfer.Progress
lastSeenBytesRead uint64
lastSeenTimestamp time.Time
}
func newTransferProgressReporter(ref string, cancel context.CancelFunc, timeout time.Duration) *transferProgressReporter {
return &transferProgressReporter{
ref: ref,
cancel: cancel,
timeout: timeout,
pc: make(chan transfer.Progress),
statuses: make(map[string]*transfer.Progress),
}
}
func (reporter *transferProgressReporter) handleProgress(p transfer.Progress) {
// We only need to handle Progress Nodes that represent
// valid requests to a remote registry, so Progress nodes
// without 'Name', 'Desc' or 'Total' can be ignored
if p.Name == "" || p.Desc == nil || p.Total == 0 {
return
}
switch p.Event {
case "waiting":
// 'Waiting' events occur either when the layer is waiting to be
// downloaded and no progress has been made, or when some progress
// has been made but we are `waiting` for more content to be downloaded.
if p.Progress == 0 {
return
}
fallthrough // Handle non-zero waiting progress same as downloading
case "downloading":
var curProgress int64
if node, ok := reporter.statuses[p.Name]; !ok {
curProgress = p.Progress
reporter.reqReporter.incRequest()
} else {
curProgress = p.Progress - node.Progress
}
reporter.statuses[p.Name] = &p
if curProgress > 0 {
reporter.IncBytesRead(curProgress)
}
// Download may be complete, but the content may still be waiting
// to be written. In this case, we no longer consider it
// an active request.
if p.Progress == p.Total {
reporter.reqReporter.decRequest()
delete(reporter.statuses, p.Name)
}
case "complete":
if node, exists := reporter.statuses[p.Name]; exists {
if curProgress := p.Progress - node.Progress; curProgress > 0 {
reporter.IncBytesRead(curProgress)
}
reporter.reqReporter.decRequest()
delete(reporter.statuses, p.Name)
}
default:
return
}
}
func (reporter *transferProgressReporter) IncBytesRead(bytes int64) {
reporter.reqReporter.incByteRead(uint64(bytes))
}
func (reporter *transferProgressReporter) start(ctx context.Context) {
if reporter.timeout == 0 {
log.G(ctx).Infof("no timeout and will not start pulling image %s reporter", reporter.ref)
return
}
go func() {
var (
reportInterval = defaultPullProgressReportInterval
)
reporter.lastSeenBytesRead = uint64(0)
reporter.lastSeenTimestamp = time.Now()
// check progress more frequently if timeout < default interval
if reporter.timeout < reportInterval {
reportInterval = reporter.timeout / 2
}
var ticker = time.NewTicker(reportInterval)
defer ticker.Stop()
for {
select {
case p := <-reporter.pc:
reporter.handleProgress(p)
case <-ticker.C:
reporter.checkProgress(ctx, reportInterval)
continue
case <-ctx.Done():
activeReqs, bytesRead := reporter.reqReporter.status()
log.G(ctx).Infof("stop pulling image %s: active requests=%v, bytes read=%v", reporter.ref, activeReqs, bytesRead)
return
}
}
}()
}
func (reporter *transferProgressReporter) checkProgress(ctx context.Context, reportInterval time.Duration) {
activeReqs, bytesRead := reporter.reqReporter.status()
lastSeenBytesRead := reporter.lastSeenBytesRead
lastSeenTimestamp := reporter.lastSeenTimestamp
log.G(ctx).WithField("ref", reporter.ref).
WithField("activeReqs", activeReqs).
WithField("totalBytesRead", bytesRead).
WithField("lastSeenBytesRead", lastSeenBytesRead).
WithField("lastSeenTimestamp", lastSeenTimestamp.Format(time.RFC3339)).
WithField("reportInterval", reportInterval).
Debugf("progress for image pull")
if activeReqs == 0 || bytesRead > lastSeenBytesRead {
reporter.lastSeenBytesRead = bytesRead
reporter.lastSeenTimestamp = time.Now()
return
}
if time.Since(lastSeenTimestamp) > reporter.timeout {
log.G(ctx).Errorf("cancel pulling image %s because of no progress in %v", reporter.ref, reporter.timeout)
reporter.cancel()
}
}
func (reporter *transferProgressReporter) createProgressFunc(ctx context.Context) transfer.ProgressFunc {
return func(p transfer.Progress) {
select {
case reporter.pc <- p:
case <-ctx.Done():
return
}
}
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
"fmt"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/errdefs"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// RemoveImage removes the image.
// TODO(random-liu): Update CRI to pass image reference instead of ImageSpec. (See
// kubernetes/kubernetes#46255)
// TODO(random-liu): We should change CRI to distinguish image id and image spec.
// Remove the whole image no matter whether it's an image id or a reference. This is
// the semantics defined in CRI now.
func (c *GRPCCRIImageService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) {
err := c.CRIImageService.RemoveImage(ctx, r.GetImage())
if err != nil && !errdefs.IsNotFound(err) {
return nil, err
}
return &runtime.RemoveImageResponse{}, nil
}
func (c *CRIImageService) RemoveImage(ctx context.Context, imageSpec *runtime.ImageSpec) error {
span := tracing.SpanFromContext(ctx)
image, err := c.LocalResolve(imageSpec.GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
span.AddEvent(err.Error())
// return empty without error when image not found.
return nil
}
return fmt.Errorf("can not resolve %q locally: %w", imageSpec.GetImage(), err)
}
span.SetAttributes(tracing.Attribute("image.id", image.ID))
// Remove all image references.
for i, ref := range image.References {
var opts []images.DeleteOpt
if i == len(image.References)-1 {
// Delete the last image reference synchronously to trigger garbage collection.
// This is best effort. It is possible that the image reference is deleted by
// someone else before this point.
opts = []images.DeleteOpt{images.SynchronousDelete()}
}
err = c.images.Delete(ctx, ref, opts...)
if err == nil || errdefs.IsNotFound(err) {
// Update image store to reflect the newest state in containerd.
if err := c.imageStore.Update(ctx, ref); err != nil {
return fmt.Errorf("failed to update image reference %q for %q: %w", ref, image.ID, err)
}
continue
}
return fmt.Errorf("failed to delete image reference %q for %q: %w", ref, image.ID, err)
}
return nil
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
"github.com/containerd/containerd/v2/internal/cri/util"
"github.com/containerd/errdefs"
"github.com/containerd/log"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// ImageStatus returns the status of the image; it returns an empty response without error if the image isn't present.
// TODO(random-liu): We should change CRI to distinguish image id and image spec. (See
// kubernetes/kubernetes#46255)
func (c *CRIImageService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) {
image, err := c.LocalResolve(r.GetImage().GetImage())
if err != nil {
if errdefs.IsNotFound(err) {
// return empty without error when image not found.
return &runtime.ImageStatusResponse{}, nil
}
return nil, fmt.Errorf("can not resolve %q locally: %w", r.GetImage().GetImage(), err)
}
// TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot
// doesn't exist?
runtimeImage := toCRIImage(image)
info, err := c.toCRIImageInfo(ctx, &image, r.GetVerbose())
if err != nil {
return nil, fmt.Errorf("failed to generate image info: %w", err)
}
return &runtime.ImageStatusResponse{
Image: runtimeImage,
Info: info,
}, nil
}
// toCRIImage converts internal image object to CRI runtime.Image.
func toCRIImage(image imagestore.Image) *runtime.Image {
repoTags, repoDigests := util.ParseImageReferences(image.References)
runtimeImage := &runtime.Image{
Id: image.ID,
RepoTags: repoTags,
RepoDigests: repoDigests,
Size_: uint64(image.Size),
Pinned: image.Pinned,
}
uid, username := getUserFromImage(image.ImageSpec.Config.User)
if uid != nil {
runtimeImage.Uid = &runtime.Int64Value{Value: *uid}
}
runtimeImage.Username = username
return runtimeImage
}
// getUserFromImage gets uid or user name of the image user.
// If the user is numeric, it will be treated as a uid; otherwise, it is treated as a user name.
func getUserFromImage(user string) (*int64, string) {
// return both empty if user is not specified in the image.
if user == "" {
return nil, ""
}
// split instances where the id may contain user:group
user = strings.Split(user, ":")[0]
// user could be either uid or user name. Try to interpret as numeric uid.
uid, err := strconv.ParseInt(user, 10, 64)
if err != nil {
// If user is non-numeric, assume it's a user name.
return nil, user
}
// If user is a numeric uid.
return &uid, ""
}
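// Worked examples for getUserFromImage (illustrative inputs):
//
//	getUserFromImage("")          // (nil, "")       no user specified
//	getUserFromImage("1000:1000") // (&1000, "")     numeric uid, group part dropped
//	getUserFromImage("nobody")    // (nil, "nobody") non-numeric, treated as a user name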
// TODO (mikebrow): discuss moving this struct and / or constants for info map for some or all of these fields to CRI
type verboseImageInfo struct {
ChainID string `json:"chainID"`
ImageSpec imagespec.Image `json:"imageSpec"`
}
// toCRIImageInfo converts internal image object information to CRI image status response info map.
func (c *CRIImageService) toCRIImageInfo(ctx context.Context, image *imagestore.Image, verbose bool) (map[string]string, error) {
if !verbose {
return nil, nil
}
info := make(map[string]string)
imi := &verboseImageInfo{
ChainID: image.ChainID,
ImageSpec: image.ImageSpec,
}
m, err := json.Marshal(imi)
if err == nil {
info["info"] = string(m)
} else {
log.G(ctx).WithError(err).Errorf("failed to marshal info %v", imi)
info["info"] = err.Error()
}
return info, nil
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
"time"
"github.com/containerd/containerd/v2/internal/cri/store/snapshot"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// ImageFsInfo returns information of the filesystem that is used to store images.
// TODO(windows): Usage for windows is always 0 right now. Support this for windows.
// TODO(random-liu): Handle storage consumed by content store
func (c *CRIImageService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (*runtime.ImageFsInfoResponse, error) {
snapshots := c.snapshotStore.List()
snapshotterFSInfos := map[string]snapshot.Snapshot{}
for _, sn := range snapshots {
if info, ok := snapshotterFSInfos[sn.Key.Snapshotter]; ok {
// Use the oldest timestamp as the timestamp of imagefs info.
if sn.Timestamp < info.Timestamp {
info.Timestamp = sn.Timestamp
}
info.Size += sn.Size
info.Inodes += sn.Inodes
snapshotterFSInfos[sn.Key.Snapshotter] = info
} else {
snapshotterFSInfos[sn.Key.Snapshotter] = snapshot.Snapshot{
Timestamp: sn.Timestamp,
Size: sn.Size,
Inodes: sn.Inodes,
}
}
}
var imageFilesystems []*runtime.FilesystemUsage
// Currently kubelet always consumes the first entry of the returned array,
// so put the default snapshotter as the first entry for compatibility.
if info, ok := snapshotterFSInfos[c.config.Snapshotter]; ok {
imageFilesystems = append(imageFilesystems, &runtime.FilesystemUsage{
Timestamp: info.Timestamp,
FsId: &runtime.FilesystemIdentifier{Mountpoint: c.imageFSPaths[c.config.Snapshotter]},
UsedBytes: &runtime.UInt64Value{Value: info.Size},
InodesUsed: &runtime.UInt64Value{Value: info.Inodes},
})
delete(snapshotterFSInfos, c.config.Snapshotter)
} else {
imageFilesystems = append(imageFilesystems, &runtime.FilesystemUsage{
Timestamp: time.Now().UnixNano(),
FsId: &runtime.FilesystemIdentifier{Mountpoint: c.imageFSPaths[c.config.Snapshotter]},
UsedBytes: &runtime.UInt64Value{Value: 0},
InodesUsed: &runtime.UInt64Value{Value: 0},
})
}
for snapshotter, info := range snapshotterFSInfos {
imageFilesystems = append(imageFilesystems, &runtime.FilesystemUsage{
Timestamp: info.Timestamp,
FsId: &runtime.FilesystemIdentifier{Mountpoint: c.imageFSPaths[snapshotter]},
UsedBytes: &runtime.UInt64Value{Value: info.Size},
InodesUsed: &runtime.UInt64Value{Value: info.Inodes},
})
}
return &runtime.ImageFsInfoResponse{ImageFilesystems: imageFilesystems}, nil
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"github.com/docker/go-metrics"
prom "github.com/prometheus/client_golang/prometheus"
)
var (
imagePulls metrics.LabeledCounter
inProgressImagePulls metrics.Gauge
// image size in MB / image pull duration in seconds
imagePullThroughput prom.Histogram
)
func init() {
const (
namespace = "containerd"
subsystem = "cri_sandboxed"
)
// these CRI metrics record the count, in-progress number and throughput of image pulls.
ns := metrics.NewNamespace(namespace, subsystem, nil)
imagePulls = ns.NewLabeledCounter("image_pulls", "succeeded and failed counters", "status")
inProgressImagePulls = ns.NewGauge("in_progress_image_pulls", "in progress pulls", metrics.Total)
imagePullThroughput = prom.NewHistogram(
prom.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "image_pulling_throughput",
Help: "image pull throughput",
Buckets: prom.DefBuckets,
},
)
ns.Add(imagePullThroughput)
metrics.Register(ns)
}
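// A hedged sketch of how these metrics might be recorded by the pull path
// (the label values and variables below are illustrative; docker/go-metrics
// counters expose WithValues(...).Inc() and gauges expose Inc()/Dec()):
//
//	inProgressImagePulls.Inc()
//	defer inProgressImagePulls.Dec()
//	imagePulls.WithValues("succeeded").Inc() // or "failed"
//	imagePullThroughput.Observe(sizeMB / duration.Seconds())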
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
"time"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/snapshots"
"github.com/containerd/containerd/v2/core/transfer"
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
snapshotstore "github.com/containerd/containerd/v2/internal/cri/store/snapshot"
"github.com/containerd/containerd/v2/internal/kmutex"
"github.com/containerd/log"
"github.com/containerd/platforms"
"golang.org/x/sync/semaphore"
docker "github.com/distribution/reference"
imagedigest "github.com/opencontainers/go-digest"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
type imageClient interface {
ListImages(context.Context, ...string) ([]containerd.Image, error)
GetImage(context.Context, string) (containerd.Image, error)
Pull(context.Context, string, ...containerd.RemoteOpt) (containerd.Image, error)
}
type ImagePlatform struct {
Snapshotter string
Platform imagespec.Platform
}
type CRIImageService struct {
// config contains all image configurations.
config criconfig.ImageConfig
// images is the lower level image store used for raw storage;
// no event publishing should currently be assumed.
images images.Store
// client is a subset of the containerd client
// and will be replaced by image store and transfer service
client imageClient
// imageFSPaths contains path to image filesystem for snapshotters.
imageFSPaths map[string]string
// runtimePlatforms are the platforms configured for a runtime.
runtimePlatforms map[string]ImagePlatform
// imageStore stores all resources associated with images.
imageStore *imagestore.Store
// snapshotStore stores information of all snapshots.
snapshotStore *snapshotstore.Store
// transferrer is used to pull images with the transfer service
transferrer transfer.Transferrer
// unpackDuplicationSuppressor is used to make sure that there is only
// one in-flight fetch request or unpack handler for a given descriptor
// digest or chain ID.
unpackDuplicationSuppressor kmutex.KeyedLocker
// downloadLimiter is used to limit the number of concurrent downloads.
downloadLimiter *semaphore.Weighted
}
type GRPCCRIImageService struct {
*CRIImageService
}
type CRIImageServiceOptions struct {
Content content.Store
Images images.Store
ImageFSPaths map[string]string
RuntimePlatforms map[string]ImagePlatform
Snapshotters map[string]snapshots.Snapshotter
Client imageClient
Transferrer transfer.Transferrer
}
// NewService creates a new CRI Image Service
//
// TODO:
// 1. Generalize the image service and merge with a single higher level image service
// 2. Update the options to remove client and imageFSPath
// - Platform configuration with Array/Map of snapshotter names + filesystem ID + platform matcher + runtime to snapshotter
// - Transfer service implementation
// - Image Service (from metadata)
// - Content store (from metadata)
// 3. Separate image cache and snapshot cache to first class plugins, make the snapshot cache much more efficient and intelligent
func NewService(config criconfig.ImageConfig, options *CRIImageServiceOptions) (*CRIImageService, error) {
var downloadLimiter *semaphore.Weighted
if config.MaxConcurrentDownloads > 0 {
downloadLimiter = semaphore.NewWeighted(int64(config.MaxConcurrentDownloads))
}
svc := CRIImageService{
config: config,
images: options.Images,
client: options.Client,
imageStore: imagestore.NewStore(options.Images, options.Content, platforms.Default()),
imageFSPaths: options.ImageFSPaths,
runtimePlatforms: options.RuntimePlatforms,
snapshotStore: snapshotstore.NewStore(),
transferrer: options.Transferrer,
unpackDuplicationSuppressor: kmutex.New(),
downloadLimiter: downloadLimiter,
}
log.L.Info("Start snapshots syncer")
snapshotsSyncer := newSnapshotsSyncer(
svc.snapshotStore,
options.Snapshotters,
time.Duration(svc.config.StatsCollectPeriod)*time.Second,
)
snapshotsSyncer.start()
return &svc, nil
}
// LocalResolve resolves image reference locally and returns corresponding image metadata. It
// returns errdefs.ErrNotFound if the reference doesn't exist.
func (c *CRIImageService) LocalResolve(refOrID string) (imagestore.Image, error) {
getImageID := func(refOrID string) string {
if _, err := imagedigest.Parse(refOrID); err == nil {
return refOrID
}
return func(ref string) string {
// ref is not image id, try to resolve it locally.
// TODO(random-liu): Handle this error better for debugging.
normalized, err := docker.ParseDockerRef(ref)
if err != nil {
return ""
}
id, err := c.imageStore.Resolve(normalized.String())
if err != nil {
return ""
}
return id
}(refOrID)
}
imageID := getImageID(refOrID)
if imageID == "" {
// Try to treat ref as imageID
imageID = refOrID
}
return c.imageStore.Get(imageID)
}
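// Usage sketch (the ref is illustrative): LocalResolve accepts a tag, a
// digest or an image id, and reports errdefs.ErrNotFound for unmanaged images.
//
//	img, err := c.LocalResolve("docker.io/library/busybox:latest")
//	if errdefs.IsNotFound(err) {
//		// image is not managed by the cri plugin
//	}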
// RuntimeSnapshotter overrides the default snapshotter if Snapshotter is set for this runtime.
// See https://github.com/containerd/containerd/issues/6657
// TODO: Pass in name and get back runtime platform
func (c *CRIImageService) RuntimeSnapshotter(ctx context.Context, ociRuntime criconfig.Runtime) string {
if ociRuntime.Snapshotter == "" {
return c.config.Snapshotter
}
log.G(ctx).Debugf("Set snapshotter for runtime %s to %s", ociRuntime.Type, ociRuntime.Snapshotter)
return ociRuntime.Snapshotter
}
// GetImage gets image metadata by image id.
func (c *CRIImageService) GetImage(id string) (imagestore.Image, error) {
return c.imageStore.Get(id)
}
// GetSnapshot returns the snapshot with specified key.
func (c *CRIImageService) GetSnapshot(key, snapshotter string) (snapshotstore.Snapshot, error) {
snapshotKey := snapshotstore.Key{
Key: key,
Snapshotter: snapshotter,
}
return c.snapshotStore.Get(snapshotKey)
}
func (c *CRIImageService) ImageFSPaths() map[string]string {
return c.imageFSPaths
}
// PinnedImage is used to lookup a pinned image by name.
// Most often used to get the "sandbox" image.
func (c *CRIImageService) PinnedImage(name string) string {
return c.config.PinnedImages[name]
}
// GRPCService returns a new CRI Image Service grpc server.
func (c *CRIImageService) GRPCService() runtime.ImageServiceServer {
return &GRPCCRIImageService{c}
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"context"
"fmt"
"time"
snapshot "github.com/containerd/containerd/v2/core/snapshots"
snapshotstore "github.com/containerd/containerd/v2/internal/cri/store/snapshot"
ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
"github.com/containerd/errdefs"
"github.com/containerd/log"
)
// snapshotsSyncer syncs snapshot stats periodically. Imagefs info and container stats
// should both use the cached results here.
// TODO(random-liu): Benchmark with high workload. We may need a statsSyncer instead if
// benchmark result shows that container cpu/memory stats also need to be cached.
type snapshotsSyncer struct {
store *snapshotstore.Store
snapshotters map[string]snapshot.Snapshotter
syncPeriod time.Duration
}
// newSnapshotsSyncer creates a snapshot syncer.
func newSnapshotsSyncer(store *snapshotstore.Store, snapshotters map[string]snapshot.Snapshotter,
period time.Duration) *snapshotsSyncer {
return &snapshotsSyncer{
store: store,
snapshotters: snapshotters,
syncPeriod: period,
}
}
// start starts the snapshots syncer. No stop function is needed because
// the syncer doesn't update any persistent state; it's fine to let it
// exit with the process.
func (s *snapshotsSyncer) start() {
tick := time.NewTicker(s.syncPeriod)
go func() {
defer tick.Stop()
// TODO(random-liu): This is expensive. We should do benchmark to
// check the resource usage and optimize this.
for {
if err := s.sync(); err != nil {
log.L.WithError(err).Error("Failed to sync snapshot stats")
}
<-tick.C
}
}()
}
// sync updates all snapshots stats.
func (s *snapshotsSyncer) sync() error {
ctx := ctrdutil.NamespacedContext()
start := time.Now().UnixNano()
for key, snapshotter := range s.snapshotters {
var snapshots []snapshot.Info
// Do not call `Usage` directly in the collect function, because
// `Usage` takes time and we don't want `Walk` to hold the read lock
// of the snapshot metadata store for too long.
// TODO(random-liu): Set timeout for the following 2 contexts.
if err := snapshotter.Walk(ctx, func(ctx context.Context, info snapshot.Info) error {
snapshots = append(snapshots, info)
return nil
}); err != nil {
return fmt.Errorf("walk all snapshots for %q failed: %w", key, err)
}
for _, info := range snapshots {
snapshotKey := snapshotstore.Key{
Key: info.Name,
Snapshotter: key,
}
sn, err := s.store.Get(snapshotKey)
if err == nil {
// Only update timestamp for non-active snapshot.
if sn.Kind == info.Kind && sn.Kind != snapshot.KindActive {
sn.Timestamp = time.Now().UnixNano()
s.store.Add(sn)
continue
}
}
// Get newest stats if the snapshot is new or active.
sn = snapshotstore.Snapshot{
Key: snapshotstore.Key{
Key: info.Name,
Snapshotter: key,
},
Kind: info.Kind,
Timestamp: time.Now().UnixNano(),
}
usage, err := snapshotter.Usage(ctx, info.Name)
if err != nil {
if !errdefs.IsNotFound(err) {
log.L.WithError(err).Errorf("Failed to get usage for snapshot %q", info.Name)
}
continue
}
sn.Size = uint64(usage.Size)
sn.Inodes = uint64(usage.Inodes)
s.store.Add(sn)
}
}
for _, sn := range s.store.List() {
if sn.Timestamp >= start {
continue
}
// Delete the snapshot stats if it's not updated this time.
s.store.Delete(sn.Key)
}
return nil
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compression
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"sync"
"github.com/containerd/log"
"github.com/klauspost/compress/zstd"
)
type (
// Compression is the state that represents whether the content is compressed or not.
Compression int
)
const (
// Uncompressed represents uncompressed content.
Uncompressed Compression = iota
// Gzip is gzip compression algorithm.
Gzip
// Zstd is zstd compression algorithm.
Zstd
// Unknown is used when a plugin handles the algorithm.
Unknown
)
const (
disablePigzEnv = "CONTAINERD_DISABLE_PIGZ"
disableIgzipEnv = "CONTAINERD_DISABLE_IGZIP"
)
var (
initGzip sync.Once
gzipPath string
)
var (
bufioReader32KPool = &sync.Pool{
New: func() interface{} { return bufio.NewReaderSize(nil, 32*1024) },
}
)
// DecompressReadCloser includes the decompressed stream and the detected compression method.
type DecompressReadCloser interface {
io.ReadCloser
// GetCompression returns the compression method that was detected before decompressing
GetCompression() Compression
}
type readCloserWrapper struct {
io.Reader
compression Compression
closer func() error
}
func (r *readCloserWrapper) Close() error {
if r.closer != nil {
return r.closer()
}
return nil
}
func (r *readCloserWrapper) GetCompression() Compression {
return r.compression
}
type writeCloserWrapper struct {
io.Writer
closer func() error
}
func (w *writeCloserWrapper) Close() error {
if w.closer != nil {
w.closer()
}
return nil
}
type bufferedReader struct {
buf *bufio.Reader
}
func newBufferedReader(r io.Reader) *bufferedReader {
buf := bufioReader32KPool.Get().(*bufio.Reader)
buf.Reset(r)
return &bufferedReader{buf}
}
func (r *bufferedReader) Read(p []byte) (n int, err error) {
if r.buf == nil {
return 0, io.EOF
}
n, err = r.buf.Read(p)
if err == io.EOF {
r.buf.Reset(nil)
bufioReader32KPool.Put(r.buf)
r.buf = nil
}
return
}
func (r *bufferedReader) Peek(n int) ([]byte, error) {
if r.buf == nil {
return nil, io.EOF
}
return r.buf.Peek(n)
}
const (
zstdMagicSkippableStart = 0x184D2A50
zstdMagicSkippableMask = 0xFFFFFFF0
)
var (
gzipMagic = []byte{0x1F, 0x8B, 0x08}
zstdMagic = []byte{0x28, 0xb5, 0x2f, 0xfd}
)
type matcher = func([]byte) bool
func magicNumberMatcher(m []byte) matcher {
return func(source []byte) bool {
return bytes.HasPrefix(source, m)
}
}
// zstdMatcher detects zstd compression algorithm.
// There are two frame formats defined by Zstandard: Zstandard frames and Skippable frames.
// See https://datatracker.ietf.org/doc/html/rfc8878#section-3 for more details.
func zstdMatcher() matcher {
return func(source []byte) bool {
if bytes.HasPrefix(source, zstdMagic) {
// Zstandard frame
return true
}
// skippable frame
if len(source) < 8 {
return false
}
// magic number from 0x184D2A50 to 0x184D2A5F.
if binary.LittleEndian.Uint32(source[:4])&zstdMagicSkippableMask == zstdMagicSkippableStart {
return true
}
return false
}
}
// DetectCompression detects the compression algorithm of the source.
func DetectCompression(source []byte) Compression {
for compression, fn := range map[Compression]matcher{
Gzip: magicNumberMatcher(gzipMagic),
Zstd: zstdMatcher(),
} {
if fn(source) {
return compression
}
}
return Uncompressed
}
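// Worked examples (illustrative byte sequences): the first bytes of the
// source are enough to classify the stream.
//
//	DetectCompression([]byte{0x1F, 0x8B, 0x08, 0x00}) // Gzip
//	DetectCompression([]byte{0x28, 0xB5, 0x2F, 0xFD}) // Zstd
//	DetectCompression([]byte("plain tar data"))       // Uncompressed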
// DecompressStream decompresses the archive and returns a DecompressReadCloser with the decompressed archive.
func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
buf := newBufferedReader(archive)
bs, err := buf.Peek(10)
if err != nil && err != io.EOF {
// Note: we'll ignore any io.EOF error because there are some odd
// cases where the layer.tar file will be empty (zero bytes) and
// that results in an io.EOF from the Peek() call. So, in those
// cases we'll just treat it as a non-compressed stream and
// that means just create an empty layer.
// See Issue docker/docker#18170
return nil, err
}
switch compression := DetectCompression(bs); compression {
case Uncompressed:
return &readCloserWrapper{
Reader: buf,
compression: compression,
}, nil
case Gzip:
ctx, cancel := context.WithCancel(context.Background())
gzReader, err := gzipDecompress(ctx, buf)
if err != nil {
cancel()
return nil, err
}
return &readCloserWrapper{
Reader: gzReader,
compression: compression,
closer: func() error {
cancel()
return gzReader.Close()
},
}, nil
case Zstd:
zstdReader, err := zstd.NewReader(buf,
zstd.WithDecoderLowmem(false),
)
if err != nil {
return nil, err
}
return &readCloserWrapper{
Reader: zstdReader,
compression: compression,
closer: func() error {
zstdReader.Close()
return nil
},
}, nil
default:
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
}
}
// CompressStream returns a WriteCloser that compresses data written to dest with the specified compression algorithm.
func CompressStream(dest io.Writer, compression Compression) (io.WriteCloser, error) {
switch compression {
case Uncompressed:
return &writeCloserWrapper{dest, nil}, nil
case Gzip:
return gzip.NewWriter(dest), nil
case Zstd:
return zstd.NewWriter(dest)
default:
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
}
}
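// A hedged round-trip sketch combining CompressStream and DecompressStream:
//
//	var buf bytes.Buffer
//	w, _ := CompressStream(&buf, Gzip)
//	w.Write([]byte("hello"))
//	w.Close()
//	r, _ := DecompressStream(&buf)
//	defer r.Close()
//	data, _ := io.ReadAll(r)       // "hello"
//	_ = r.GetCompression() == Gzip // true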
// Extension returns the extension of a file that uses the specified compression algorithm.
func (compression *Compression) Extension() string {
switch *compression {
case Gzip:
return "gz"
case Zstd:
return "zst"
case Unknown:
return "unknown"
}
return ""
}
func gzipDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
initGzip.Do(func() {
if gzipPath = detectCommand("igzip", disableIgzipEnv); gzipPath != "" {
log.L.Debug("using igzip for decompression")
return
}
if gzipPath = detectCommand("unpigz", disablePigzEnv); gzipPath != "" {
log.L.Debug("using unpigz for decompression")
}
})
if gzipPath == "" {
return gzip.NewReader(buf)
}
return cmdStream(exec.CommandContext(ctx, gzipPath, "-d", "-c"), buf)
}
func cmdStream(cmd *exec.Cmd, in io.Reader) (io.ReadCloser, error) {
reader, writer := io.Pipe()
cmd.Stdin = in
cmd.Stdout = writer
var errBuf bytes.Buffer
cmd.Stderr = &errBuf
if err := cmd.Start(); err != nil {
return nil, err
}
go func() {
if err := cmd.Wait(); err != nil {
writer.CloseWithError(fmt.Errorf("%s: %s", err, errBuf.String()))
} else {
writer.Close()
}
}()
return reader, nil
}
func detectCommand(path, disableEnvName string) string {
// Check if this command is disabled via the env variable
value := os.Getenv(disableEnvName)
if value != "" {
disable, err := strconv.ParseBool(value)
if err != nil {
log.L.WithError(err).Warnf("could not parse %s: %s", disableEnvName, value)
}
if disable {
return ""
}
}
path, err := exec.LookPath(path)
if err != nil {
log.L.WithError(err).Debugf("%s not found", path)
return ""
}
return path
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package cap provides Linux capability utility
package cap
import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"strings"
)
// FromNumber returns a cap string like "CAP_SYS_ADMIN"
// that corresponds to the given number like 21.
//
// FromNumber returns an empty string for unknown cap number.
func FromNumber(num int) string {
if num < 0 || num > len(capsLatest)-1 {
return ""
}
return capsLatest[num]
}
// FromBitmap parses a uint64 bitmap into a string slice like
// []string{"CAP_SYS_ADMIN", ...}.
//
// Unknown cap numbers are returned as []int.
func FromBitmap(v uint64) ([]string, []int) {
var (
res []string
unknown []int
)
for i := 0; i <= 63; i++ {
if b := (v >> i) & 0x1; b == 0x1 {
if s := FromNumber(i); s != "" {
res = append(res, s)
} else {
unknown = append(unknown, i)
}
}
}
return res, unknown
}
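// Worked example (illustrative): bit 21 is CAP_SYS_ADMIN in capsLatest, and
// bits beyond the known range are reported separately.
//
//	names, unknown := FromBitmap(1<<21 | 1<<63)
//	// names   == []string{"CAP_SYS_ADMIN"}
//	// unknown == []int{63}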
// Type is the type of capability
type Type int
const (
// Effective is CapEff
Effective Type = 1 << iota
// Permitted is CapPrm
Permitted
// Inheritable is CapInh
Inheritable
// Bounding is CapBnd
Bounding
// Ambient is CapAmb
Ambient
)
// ParseProcPIDStatus returns uint64 bitmap value from /proc/<PID>/status file
func ParseProcPIDStatus(r io.Reader) (map[Type]uint64, error) {
res := make(map[Type]uint64)
scanner := bufio.NewScanner(r)
for scanner.Scan() {
line := scanner.Text()
k, v, ok := strings.Cut(line, ":")
if !ok {
continue
}
k = strings.TrimSpace(k)
switch k {
case "CapInh", "CapPrm", "CapEff", "CapBnd", "CapAmb":
ui64, err := strconv.ParseUint(strings.TrimSpace(v), 16, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse line %q", line)
}
switch k {
case "CapInh":
res[Inheritable] = ui64
case "CapPrm":
res[Permitted] = ui64
case "CapEff":
res[Effective] = ui64
case "CapBnd":
res[Bounding] = ui64
case "CapAmb":
res[Ambient] = ui64
}
}
}
if err := scanner.Err(); err != nil {
return nil, err
}
return res, nil
}
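// A minimal worked example (the input mirrors the relevant lines of a
// /proc/<PID>/status file and is illustrative):
//
//	r := strings.NewReader("CapInh:\t0000000000000000\nCapEff:\t0000003fffffffff\n")
//	caps, _ := ParseProcPIDStatus(r)
//	// caps[Effective] == 0x3fffffffff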
// Current returns the list of the effective and the known caps of
// the current process.
//
// The result is like []string{"CAP_SYS_ADMIN", ...}.
func Current() ([]string, error) {
f, err := os.Open("/proc/self/status")
if err != nil {
return nil, err
}
defer f.Close()
caps, err := ParseProcPIDStatus(f)
if err != nil {
return nil, err
}
capEff := caps[Effective]
names, _ := FromBitmap(capEff)
return names, nil
}
var (
// caps35 is the caps of kernel 3.5 (37 entries)
caps35 = []string{
"CAP_CHOWN", // 2.2
"CAP_DAC_OVERRIDE", // 2.2
"CAP_DAC_READ_SEARCH", // 2.2
"CAP_FOWNER", // 2.2
"CAP_FSETID", // 2.2
"CAP_KILL", // 2.2
"CAP_SETGID", // 2.2
"CAP_SETUID", // 2.2
"CAP_SETPCAP", // 2.2
"CAP_LINUX_IMMUTABLE", // 2.2
"CAP_NET_BIND_SERVICE", // 2.2
"CAP_NET_BROADCAST", // 2.2
"CAP_NET_ADMIN", // 2.2
"CAP_NET_RAW", // 2.2
"CAP_IPC_LOCK", // 2.2
"CAP_IPC_OWNER", // 2.2
"CAP_SYS_MODULE", // 2.2
"CAP_SYS_RAWIO", // 2.2
"CAP_SYS_CHROOT", // 2.2
"CAP_SYS_PTRACE", // 2.2
"CAP_SYS_PACCT", // 2.2
"CAP_SYS_ADMIN", // 2.2
"CAP_SYS_BOOT", // 2.2
"CAP_SYS_NICE", // 2.2
"CAP_SYS_RESOURCE", // 2.2
"CAP_SYS_TIME", // 2.2
"CAP_SYS_TTY_CONFIG", // 2.2
"CAP_MKNOD", // 2.4
"CAP_LEASE", // 2.4
"CAP_AUDIT_WRITE", // 2.6.11
"CAP_AUDIT_CONTROL", // 2.6.11
"CAP_SETFCAP", // 2.6.24
"CAP_MAC_OVERRIDE", // 2.6.25
"CAP_MAC_ADMIN", // 2.6.25
"CAP_SYSLOG", // 2.6.37
"CAP_WAKE_ALARM", // 3.0
"CAP_BLOCK_SUSPEND", // 3.5
}
// caps316 is the caps of kernel 3.16 (38 entries)
caps316 = append(caps35, "CAP_AUDIT_READ")
// caps58 is the caps of kernel 5.8 (40 entries)
caps58 = append(caps316, []string{"CAP_PERFMON", "CAP_BPF"}...)
// caps59 is the caps of kernel 5.9 (41 entries)
caps59 = append(caps58, "CAP_CHECKPOINT_RESTORE")
capsLatest = caps59
)
// Known returns the known cap strings of the latest kernel.
// The current latest kernel is 5.9.
func Known() []string {
return capsLatest
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
// Adaptor specifies the mapping of fieldpaths to a type. For the given field
// path, the value and whether it is present should be returned. The mapping of
// the fieldpath to a field is deferred to the adaptor implementation, but
// should generally follow protobuf field path/mask semantics.
type Adaptor interface {
Field(fieldpath []string) (value string, present bool)
}
// AdapterFunc allows implementation-specific matching of fieldpaths
type AdapterFunc func(fieldpath []string) (string, bool)
// Field returns the field value and true if it exists
func (fn AdapterFunc) Field(fieldpath []string) (string, bool) {
return fn(fieldpath)
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package filters defines a syntax and parser that can be used for the
// filtration of items across the containerd API. The core is built on the
// concept of protobuf field paths, with quoting. Several operators allow the
// user to flexibly select items based on field presence, equality, inequality
// and regular expressions. Flexible adaptors support working with any type.
//
// The syntax is fairly familiar, if you've used container ecosystem
// projects. At the core, we base it on the concept of protobuf field
// paths, augmenting with the ability to quote portions of the field path
// to match arbitrary labels. These "selectors" come in the following
// syntax:
//
// ```
// <fieldpath>[<operator><value>]
// ```
//
// A basic example is as follows:
//
// ```
// name==foo
// ```
//
// This would match all objects that have a field `name` with the value
// `foo`. If we only want to test if the field is present, we can omit the
// operator. This is most useful for matching labels in containerd. The
// following will match objects that have the field "labels" and have the
// label "foo" defined:
//
// ```
// labels.foo
// ```
//
// We also allow for quoting of parts of the field path to allow matching
// of arbitrary items:
//
// ```
// labels."very complex label"==something
// ```
//
// We also define `!=` and `~=` as operators. The `!=` will match all
// objects that don't match the value for a field and `~=` will compile the
// target value as a regular expression and match the field value against that.
//
// Selectors can be combined using a comma, such that the resulting
// selector will require all selectors are matched for the object to match.
// The following example will match objects that are named `foo` and have
// the label `bar`:
//
// ```
// name==foo,labels.bar
// ```
package filters
import (
"regexp"
"github.com/containerd/log"
)
// Filter matches specific resources based on the provided filter
type Filter interface {
Match(adaptor Adaptor) bool
}
// FilterFunc is a function that handles matching with an adaptor
type FilterFunc func(Adaptor) bool
// Match matches the FilterFunc returning true if the object matches the filter
func (fn FilterFunc) Match(adaptor Adaptor) bool {
return fn(adaptor)
}
// Always is a filter that always returns true for any type of object
var Always FilterFunc = func(adaptor Adaptor) bool {
return true
}
// Any allows multiple filters to be matched against the object
type Any []Filter
// Match returns true if any of the provided filters are true
func (m Any) Match(adaptor Adaptor) bool {
for _, m := range m {
if m.Match(adaptor) {
return true
}
}
return false
}
// All allows multiple filters to be matched against the object
type All []Filter
// Match only returns true if all filters match the object
func (m All) Match(adaptor Adaptor) bool {
for _, m := range m {
if !m.Match(adaptor) {
return false
}
}
return true
}
type operator int
const (
operatorPresent = iota
operatorEqual
operatorNotEqual
operatorMatches
)
func (op operator) String() string {
switch op {
case operatorPresent:
return "?"
case operatorEqual:
return "=="
case operatorNotEqual:
return "!="
case operatorMatches:
return "~="
}
return "unknown"
}
type selector struct {
fieldpath []string
operator operator
value string
re *regexp.Regexp
}
func (m selector) Match(adaptor Adaptor) bool {
value, present := adaptor.Field(m.fieldpath)
switch m.operator {
case operatorPresent:
return present
case operatorEqual:
return present && value == m.value
case operatorNotEqual:
return value != m.value
case operatorMatches:
if m.re == nil {
r, err := regexp.Compile(m.value)
if err != nil {
log.L.Errorf("error compiling regexp %q", m.value)
return false
}
m.re = r
}
return m.re.MatchString(value)
default:
return false
}
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"fmt"
"io"
"github.com/containerd/errdefs"
)
/*
Parse the strings into a filter that may be used with an adaptor.
The filter is made up of zero or more selectors.
The format is a comma separated list of expressions, in the form of
`<fieldpath><op><value>`, known as selectors. All selectors must match the
target object for the filter to be true.
We define the operators "==" for equality, "!=" for not equal and "~=" for a
regular expression. If the operator and value are not present, the matcher will
test for the presence of a value, as defined by the target object.
The formal grammar is as follows:
selectors := selector ("," selector)*
selector := fieldpath (operator value)
fieldpath := field ('.' field)*
field := quoted | [A-Za-z] [A-Za-z0-9_]+
operator := "==" | "!=" | "~="
value := quoted | [^\s,]+
quoted := <go string syntax>
*/
func Parse(s string) (Filter, error) {
// special case empty to match all
if s == "" {
return Always, nil
}
p := parser{input: s}
return p.parse()
}
// ParseAll parses each filter in ss and returns a filter that will return true
// if any filter matches the expression.
//
// If no filters are provided, the filter will match anything.
func ParseAll(ss ...string) (Filter, error) {
if len(ss) == 0 {
return Always, nil
}
var fs []Filter
for _, s := range ss {
f, err := Parse(s)
if err != nil {
return nil, fmt.Errorf("%s: %w", err.Error(), errdefs.ErrInvalidArgument)
}
fs = append(fs, f)
}
return Any(fs), nil
}
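// A hedged usage sketch combining Parse with an AdapterFunc-backed object
// (the field values are illustrative):
//
//	f, _ := Parse("name==foo,labels.bar")
//	ok := f.Match(AdapterFunc(func(fieldpath []string) (string, bool) {
//		switch fieldpath[0] {
//		case "name":
//			return "foo", true
//		case "labels":
//			return "", len(fieldpath) > 1 && fieldpath[1] == "bar"
//		}
//		return "", false
//	}))
//	// ok == true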
type parser struct {
input string
scanner scanner
}
func (p *parser) parse() (Filter, error) {
p.scanner.init(p.input)
ss, err := p.selectors()
if err != nil {
return nil, fmt.Errorf("filters: %w", err)
}
return ss, nil
}
func (p *parser) selectors() (Filter, error) {
s, err := p.selector()
if err != nil {
return nil, err
}
ss := All{s}
loop:
for {
tok := p.scanner.peek()
switch tok {
case ',':
pos, tok, _ := p.scanner.scan()
if tok != tokenSeparator {
return nil, p.mkerr(pos, "expected a separator")
}
s, err := p.selector()
if err != nil {
return nil, err
}
ss = append(ss, s)
case tokenEOF:
break loop
default:
return nil, p.mkerr(p.scanner.ppos, "unexpected input: %v", string(tok))
}
}
return ss, nil
}
func (p *parser) selector() (selector, error) {
fieldpath, err := p.fieldpath()
if err != nil {
return selector{}, err
}
switch p.scanner.peek() {
case ',', tokenSeparator, tokenEOF:
return selector{
fieldpath: fieldpath,
operator: operatorPresent,
}, nil
}
op, err := p.operator()
if err != nil {
return selector{}, err
}
var allowAltQuotes bool
if op == operatorMatches {
allowAltQuotes = true
}
value, err := p.value(allowAltQuotes)
if err != nil {
if err == io.EOF {
return selector{}, io.ErrUnexpectedEOF
}
return selector{}, err
}
return selector{
fieldpath: fieldpath,
value: value,
operator: op,
}, nil
}
func (p *parser) fieldpath() ([]string, error) {
f, err := p.field()
if err != nil {
return nil, err
}
fs := []string{f}
loop:
for {
tok := p.scanner.peek() // lookahead to check for a field separator
switch tok {
case '.':
pos, tok, _ := p.scanner.scan() // consume separator
if tok != tokenSeparator {
return nil, p.mkerr(pos, "expected a field separator (`.`)")
}
f, err := p.field()
if err != nil {
return nil, err
}
fs = append(fs, f)
default:
// let the layer above handle the other bad cases.
break loop
}
}
return fs, nil
}
func (p *parser) field() (string, error) {
pos, tok, s := p.scanner.scan()
switch tok {
case tokenField:
return s, nil
case tokenQuoted:
return p.unquote(pos, s, false)
case tokenIllegal:
return "", p.mkerr(pos, "%s", p.scanner.err)
}
return "", p.mkerr(pos, "expected field or quoted")
}
func (p *parser) operator() (operator, error) {
pos, tok, s := p.scanner.scan()
switch tok {
case tokenOperator:
switch s {
case "==":
return operatorEqual, nil
case "!=":
return operatorNotEqual, nil
case "~=":
return operatorMatches, nil
default:
return 0, p.mkerr(pos, "unsupported operator %q", s)
}
case tokenIllegal:
return 0, p.mkerr(pos, "%s", p.scanner.err)
}
return 0, p.mkerr(pos, `expected an operator ("=="|"!="|"~=")`)
}
func (p *parser) value(allowAltQuotes bool) (string, error) {
pos, tok, s := p.scanner.scan()
switch tok {
case tokenValue, tokenField:
return s, nil
case tokenQuoted:
return p.unquote(pos, s, allowAltQuotes)
case tokenIllegal:
return "", p.mkerr(pos, "%s", p.scanner.err)
}
return "", p.mkerr(pos, "expected value or quoted")
}
func (p *parser) unquote(pos int, s string, allowAlts bool) (string, error) {
if !allowAlts && s[0] != '\'' && s[0] != '"' {
return "", p.mkerr(pos, "invalid quote encountered")
}
uq, err := unquote(s)
if err != nil {
return "", p.mkerr(pos, "unquoting failed: %v", err)
}
return uq, nil
}
type parseError struct {
input string
pos int
msg string
}
func (pe parseError) Error() string {
if pe.pos < len(pe.input) {
before := pe.input[:pe.pos]
location := pe.input[pe.pos : pe.pos+1] // pe.pos < len(pe.input) is guaranteed above
after := pe.input[pe.pos+1:]
return fmt.Sprintf("[%s >|%s|< %s]: %v", before, location, after, pe.msg)
}
return fmt.Sprintf("[%s]: %v", pe.input, pe.msg)
}
func (p *parser) mkerr(pos int, format string, args ...interface{}) error {
return fmt.Errorf("parse error: %w", parseError{
input: p.input,
pos: pos,
msg: fmt.Sprintf(format, args...),
})
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"errors"
"unicode/utf8"
)
// NOTE(stevvooe): Most of the code in this file is copied from the stdlib
// strconv package and modified to be able to handle quoting with `/` and `|`
// as delimiters. The copyright is held by the Go authors.
var errQuoteSyntax = errors.New("quote syntax error")
// unquoteChar decodes the first character or byte in the escaped string
// or character literal represented by the string s.
// It returns four values:
//
// 1. value, the decoded Unicode code point or byte value;
// 2. multibyte, a boolean indicating whether the decoded character requires a multibyte UTF-8 representation;
// 3. tail, the remainder of the string after the character; and
// 4. an error that will be nil if the character is syntactically valid.
//
// The second argument, quote, specifies the type of literal being parsed
// and therefore which escaped quote character is permitted.
// If set to a single quote, it permits the sequence \' and disallows unescaped '.
// If set to a double quote, it permits \" and disallows unescaped ".
// If set to zero, it does not permit either escape and allows both quote characters to appear unescaped.
//
// This is from Go strconv package, modified to support `|` and `/` as double
// quotes for use with regular expressions.
func unquoteChar(s string, quote byte) (value rune, multibyte bool, tail string, err error) {
// easy cases
switch c := s[0]; {
case c == quote && (quote == '\'' || quote == '"' || quote == '/' || quote == '|'):
err = errQuoteSyntax
return
case c >= utf8.RuneSelf:
r, size := utf8.DecodeRuneInString(s)
return r, true, s[size:], nil
case c != '\\':
return rune(s[0]), false, s[1:], nil
}
// hard case: c is backslash
if len(s) <= 1 {
err = errQuoteSyntax
return
}
c := s[1]
s = s[2:]
switch c {
case 'a':
value = '\a'
case 'b':
value = '\b'
case 'f':
value = '\f'
case 'n':
value = '\n'
case 'r':
value = '\r'
case 't':
value = '\t'
case 'v':
value = '\v'
case 'x', 'u', 'U':
n := 0
switch c {
case 'x':
n = 2
case 'u':
n = 4
case 'U':
n = 8
}
var v rune
if len(s) < n {
err = errQuoteSyntax
return
}
for j := 0; j < n; j++ {
x, ok := unhex(s[j])
if !ok {
err = errQuoteSyntax
return
}
v = v<<4 | x
}
s = s[n:]
if c == 'x' {
// single-byte string, possibly not UTF-8
value = v
break
}
if v > utf8.MaxRune {
err = errQuoteSyntax
return
}
value = v
multibyte = true
case '0', '1', '2', '3', '4', '5', '6', '7':
v := rune(c) - '0'
if len(s) < 2 {
err = errQuoteSyntax
return
}
for j := 0; j < 2; j++ { // one digit already; two more
x := rune(s[j]) - '0'
if x < 0 || x > 7 {
err = errQuoteSyntax
return
}
v = (v << 3) | x
}
s = s[2:]
if v > 255 {
err = errQuoteSyntax
return
}
value = v
case '\\':
value = '\\'
case '\'', '"', '|', '/':
if c != quote {
err = errQuoteSyntax
return
}
value = rune(c)
default:
err = errQuoteSyntax
return
}
tail = s
return
}
// unquote interprets s as a single-quoted, double-quoted,
// or backquoted Go string literal, returning the string value
// that s quotes. (If s is single-quoted, it would be a Go
// character literal; unquote returns the corresponding
// one-character string.)
//
// This is modified from the standard library to support `|` and `/` as quote
// characters for use with regular expressions.
func unquote(s string) (string, error) {
n := len(s)
if n < 2 {
return "", errQuoteSyntax
}
quote := s[0]
if quote != s[n-1] {
return "", errQuoteSyntax
}
s = s[1 : n-1]
if quote == '`' {
if contains(s, '`') {
return "", errQuoteSyntax
}
if contains(s, '\r') {
// -1 because we know there is at least one \r to remove.
buf := make([]byte, 0, len(s)-1)
for i := 0; i < len(s); i++ {
if s[i] != '\r' {
buf = append(buf, s[i])
}
}
return string(buf), nil
}
return s, nil
}
if quote != '"' && quote != '\'' && quote != '|' && quote != '/' {
return "", errQuoteSyntax
}
if contains(s, '\n') {
return "", errQuoteSyntax
}
// Is it trivial? Avoid allocation.
if !contains(s, '\\') && !contains(s, quote) {
switch quote {
case '"', '/', '|': // pipe and slash are treated like double quote
return s, nil
case '\'':
r, size := utf8.DecodeRuneInString(s)
if size == len(s) && (r != utf8.RuneError || size != 1) {
return s, nil
}
}
}
var runeTmp [utf8.UTFMax]byte
buf := make([]byte, 0, 3*len(s)/2) // Try to avoid more allocations.
for len(s) > 0 {
c, multibyte, ss, err := unquoteChar(s, quote)
if err != nil {
return "", err
}
s = ss
if c < utf8.RuneSelf || !multibyte {
buf = append(buf, byte(c))
} else {
n := utf8.EncodeRune(runeTmp[:], c)
buf = append(buf, runeTmp[:n]...)
}
if quote == '\'' && len(s) != 0 {
// single-quoted must be single character
return "", errQuoteSyntax
}
}
return string(buf), nil
}
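// Illustrative sketch (not part of the upstream file): both literals below
// decode to the same string (foo".*). The alternate `/.../` quoting exists so
// regular expressions containing quote characters can be written unescaped in
// `~=` selectors.
func exampleUnquote() (string, string, error) {
a, err := unquote(`/foo".*/`)
if err != nil {
return "", "", err
}
b, err := unquote(`"foo\".*"`)
if err != nil {
return "", "", err
}
return a, b, nil // a == b
}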
// contains reports whether the string contains the byte c.
func contains(s string, c byte) bool {
for i := 0; i < len(s); i++ {
if s[i] == c {
return true
}
}
return false
}
func unhex(b byte) (v rune, ok bool) {
c := rune(b)
switch {
case '0' <= c && c <= '9':
return c - '0', true
case 'a' <= c && c <= 'f':
return c - 'a' + 10, true
case 'A' <= c && c <= 'F':
return c - 'A' + 10, true
}
return
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"unicode"
"unicode/utf8"
)
const (
tokenEOF = -(iota + 1)
tokenQuoted
tokenValue
tokenField
tokenSeparator
tokenOperator
tokenIllegal
)
type token rune
func (t token) String() string {
switch t {
case tokenEOF:
return "EOF"
case tokenQuoted:
return "Quoted"
case tokenValue:
return "Value"
case tokenField:
return "Field"
case tokenSeparator:
return "Separator"
case tokenOperator:
return "Operator"
case tokenIllegal:
return "Illegal"
}
return string(t)
}
func (t token) GoString() string {
return "token" + t.String()
}
type scanner struct {
input string
pos int
ppos int // bounds the current rune in the string
value bool // when set, the next token scanned is treated as a value
err string
}
func (s *scanner) init(input string) {
s.input = input
s.pos = 0
s.ppos = 0
}
func (s *scanner) next() rune {
if s.pos >= len(s.input) {
return tokenEOF
}
s.pos = s.ppos
r, w := utf8.DecodeRuneInString(s.input[s.ppos:])
s.ppos += w
if r == utf8.RuneError {
if w > 0 {
s.error("rune error")
return tokenIllegal
}
return tokenEOF
}
if r == 0 {
s.error("unexpected null")
return tokenIllegal
}
return r
}
func (s *scanner) peek() rune {
pos := s.pos
ppos := s.ppos
ch := s.next()
s.pos = pos
s.ppos = ppos
return ch
}
func (s *scanner) scan() (nextp int, tk token, text string) {
var (
ch = s.next()
pos = s.pos
)
chomp:
switch {
case ch == tokenEOF:
case ch == tokenIllegal:
case isQuoteRune(ch):
if !s.scanQuoted(ch) {
return pos, tokenIllegal, s.input[pos:s.ppos]
}
return pos, tokenQuoted, s.input[pos:s.ppos]
case isSeparatorRune(ch):
s.value = false
return pos, tokenSeparator, s.input[pos:s.ppos]
case isOperatorRune(ch):
s.scanOperator()
s.value = true
return pos, tokenOperator, s.input[pos:s.ppos]
case unicode.IsSpace(ch):
// chomp
ch = s.next()
pos = s.pos
goto chomp
case s.value:
s.scanValue()
s.value = false
return pos, tokenValue, s.input[pos:s.ppos]
case isFieldRune(ch):
s.scanField()
return pos, tokenField, s.input[pos:s.ppos]
}
return s.pos, token(ch), ""
}
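// scanAllTokens is an illustrative helper (not part of the upstream file): it
// drains a scanner and reports the token kinds produced for an input, e.g.
// `name==redis` yields Field, Operator, Value, EOF.
func scanAllTokens(input string) []token {
var (
s   scanner
out []token
)
s.init(input)
for {
_, tok, _ := s.scan()
out = append(out, tok)
if tok == tokenEOF || tok == tokenIllegal {
return out
}
}
}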
func (s *scanner) scanField() {
for {
ch := s.peek()
if !isFieldRune(ch) {
break
}
s.next()
}
}
func (s *scanner) scanOperator() {
for {
ch := s.peek()
switch ch {
case '=', '!', '~':
s.next()
default:
return
}
}
}
func (s *scanner) scanValue() {
for {
ch := s.peek()
if !isValueRune(ch) {
break
}
s.next()
}
}
func (s *scanner) scanQuoted(quote rune) bool {
var illegal bool
ch := s.next() // read character after quote
for ch != quote {
if ch == '\n' || ch < 0 {
s.error("quoted literal not terminated")
return false
}
if ch == '\\' {
var legal bool
ch, legal = s.scanEscape(quote)
if !legal {
illegal = true
}
} else {
ch = s.next()
}
}
return !illegal
}
func (s *scanner) scanEscape(quote rune) (ch rune, legal bool) {
ch = s.next() // read character after '\\'
switch ch {
case 'a', 'b', 'f', 'n', 'r', 't', 'v', '\\', quote:
// nothing to do
ch = s.next()
legal = true
case '0', '1', '2', '3', '4', '5', '6', '7':
ch, legal = s.scanDigits(ch, 8, 3)
case 'x':
ch, legal = s.scanDigits(s.next(), 16, 2)
case 'u':
ch, legal = s.scanDigits(s.next(), 16, 4)
case 'U':
ch, legal = s.scanDigits(s.next(), 16, 8)
default:
s.error("illegal escape sequence")
}
return
}
func (s *scanner) scanDigits(ch rune, base, n int) (rune, bool) {
for n > 0 && digitVal(ch) < base {
ch = s.next()
n--
}
if n > 0 {
s.error("illegal numeric escape sequence")
return ch, false
}
return ch, true
}
func (s *scanner) error(msg string) {
if s.err == "" {
s.err = msg
}
}
func digitVal(ch rune) int {
switch {
case '0' <= ch && ch <= '9':
return int(ch - '0')
case 'a' <= ch && ch <= 'f':
return int(ch - 'a' + 10)
case 'A' <= ch && ch <= 'F':
return int(ch - 'A' + 10)
}
return 16 // larger than any legal digit val
}
func isFieldRune(r rune) bool {
return (r == '_' || isAlphaRune(r) || isDigitRune(r))
}
func isAlphaRune(r rune) bool {
return r >= 'A' && r <= 'Z' || r >= 'a' && r <= 'z'
}
func isDigitRune(r rune) bool {
return r >= '0' && r <= '9'
}
func isOperatorRune(r rune) bool {
switch r {
case '=', '!', '~':
return true
}
return false
}
func isQuoteRune(r rune) bool {
switch r {
case '/', '|', '"': // maybe add single quoting?
return true
}
return false
}
func isSeparatorRune(r rune) bool {
switch r {
case ',', '.':
return true
}
return false
}
func isValueRune(r rune) bool {
return r != ',' && !unicode.IsSpace(r) &&
(unicode.IsLetter(r) ||
unicode.IsDigit(r) ||
unicode.IsNumber(r) ||
unicode.IsGraphic(r) ||
unicode.IsPunct(r))
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"fmt"
"time"
"github.com/containerd/errdefs"
)
// lock records when a reference was locked so that long-held locks (likely
// deadlocks) can be distinguished from brief contention.
type lock struct {
since time.Time
}
func (s *store) tryLock(ref string) error {
s.locksMu.Lock()
defer s.locksMu.Unlock()
if v, ok := s.locks[ref]; ok {
// Returning the duration may help developers distinguish dead locks (long duration) from
// lock contentions (short duration).
now := time.Now()
return fmt.Errorf(
"ref %s locked for %s (since %s): %w", ref, now.Sub(v.since), v.since,
errdefs.ErrUnavailable,
)
}
s.locks[ref] = &lock{time.Now()}
return nil
}
func (s *store) unlock(ref string) {
s.locksMu.Lock()
defer s.locksMu.Unlock()
delete(s.locks, ref)
}
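// Illustrative sketch (not part of the upstream file): the expected pairing
// of tryLock and unlock around work done on an ingest ref.
func (s *store) withRefLock(ref string, fn func() error) error {
if err := s.tryLock(ref); err != nil {
return err
}
defer s.unlock(ref)
return fn()
}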
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"fmt"
"io"
"os"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/errdefs"
)
// sizeReaderAt implements content.ReaderAt by pairing a file handle, opened
// once in OpenReader, with the blob size captured at open time.
type sizeReaderAt struct {
size int64
fp *os.File
}
// OpenReader creates a content.ReaderAt for the file at path p
func OpenReader(p string) (content.ReaderAt, error) {
fi, err := os.Stat(p)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
return nil, fmt.Errorf("blob not found: %w", errdefs.ErrNotFound)
}
fp, err := os.Open(p)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
return nil, fmt.Errorf("blob not found: %w", errdefs.ErrNotFound)
}
return sizeReaderAt{size: fi.Size(), fp: fp}, nil
}
func (ra sizeReaderAt) ReadAt(p []byte, offset int64) (int, error) {
return ra.fp.ReadAt(p, offset)
}
func (ra sizeReaderAt) Size() int64 {
return ra.size
}
func (ra sizeReaderAt) Close() error {
return ra.fp.Close()
}
func (ra sizeReaderAt) Reader() io.Reader {
return io.LimitReader(ra.fp, ra.size)
}
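// readBlob is an illustrative sketch (not part of the upstream file) showing
// how a caller might consume the ReaderAt returned by OpenReader. It assumes
// the content.NewReader helper, which adapts a ReaderAt into a sequential
// io.Reader.
func readBlob(p string, dst io.Writer) error {
ra, err := OpenReader(p)
if err != nil {
return err
}
defer ra.Close()
_, err = io.Copy(dst, content.NewReader(ra))
return err
}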
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/internal/fsverity"
"github.com/containerd/containerd/v2/pkg/filters"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 1<<20)
return &buffer
},
}
// LabelStore is used to store mutable labels for digests
type LabelStore interface {
// Get returns all the labels for the given digest
Get(digest.Digest) (map[string]string, error)
// Set sets all the labels for a given digest
Set(digest.Digest, map[string]string) error
// Update replaces the given labels for a digest,
// a key with an empty value removes a label.
Update(digest.Digest, map[string]string) (map[string]string, error)
}
// Store is a digest-keyed store for content. All data written into the store is
// stored under a verifiable digest.
//
// Store can generally support multi-reader, single-writer ingest of data,
// including resumable ingest.
type store struct {
root string
ls LabelStore
integritySupported bool
locksMu sync.Mutex
locks map[string]*lock
ensureIngestRootOnce func() error
}
// NewStore returns a local content store
func NewStore(root string) (content.Store, error) {
return NewLabeledStore(root, nil)
}
// NewLabeledStore returns a new content store using the provided label store
//
// Note: content stores which are used underneath a metadata store may not
// require labels and should use `NewStore`. `NewLabeledStore` is primarily
// useful for tests or standalone implementations.
func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
supported, _ := fsverity.IsSupported(root)
s := &store{
root: root,
ls: ls,
integritySupported: supported,
locks: map[string]*lock{},
}
s.ensureIngestRootOnce = sync.OnceValue(s.ensureIngestRoot)
return s, nil
}
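// Illustrative sketch (not part of the upstream file): creating a local store
// and ingesting a small blob through the generic content.WriteBlob helper.
// The root path and ref name are hypothetical.
func exampleIngest(ctx context.Context) error {
cs, err := NewStore("/var/lib/containerd/content")
if err != nil {
return err
}
const blob = "hello"
desc := ocispec.Descriptor{
MediaType: "application/octet-stream",
Digest:    digest.FromString(blob),
Size:      int64(len(blob)),
}
// WriteBlob drives Writer/Commit, including resuming partial ingests.
return content.WriteBlob(ctx, cs, "example-ref", strings.NewReader(blob), desc)
}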
func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
p, err := s.blobPath(dgst)
if err != nil {
return content.Info{}, fmt.Errorf("calculating blob info path: %w", err)
}
fi, err := os.Stat(p)
if err != nil {
if os.IsNotExist(err) {
err = fmt.Errorf("content %v: %w", dgst, errdefs.ErrNotFound)
}
return content.Info{}, err
}
var labels map[string]string
if s.ls != nil {
labels, err = s.ls.Get(dgst)
if err != nil {
return content.Info{}, err
}
}
return s.info(dgst, fi, labels), nil
}
func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]string) content.Info {
return content.Info{
Digest: dgst,
Size: fi.Size(),
CreatedAt: fi.ModTime(),
UpdatedAt: getATime(fi),
Labels: labels,
}
}
// ReaderAt returns an io.ReaderAt for the blob.
func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
p, err := s.blobPath(desc.Digest)
if err != nil {
return nil, fmt.Errorf("calculating blob path for ReaderAt: %w", err)
}
reader, err := OpenReader(p)
if err != nil {
return nil, fmt.Errorf("blob %s expected at %s: %w", desc.Digest, p, err)
}
return reader, nil
}
// Delete removes a blob by its digest.
//
// While this is safe to do concurrently, any check-for-existence-then-remove
// logic must hold a global lock on the store to be safe.
func (s *store) Delete(ctx context.Context, dgst digest.Digest) error {
bp, err := s.blobPath(dgst)
if err != nil {
return fmt.Errorf("calculating blob path for delete: %w", err)
}
if err := os.RemoveAll(bp); err != nil {
if !os.IsNotExist(err) {
return err
}
return fmt.Errorf("content %v: %w", dgst, errdefs.ErrNotFound)
}
return nil
}
func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
if s.ls == nil {
return content.Info{}, fmt.Errorf("update not supported on immutable content store: %w", errdefs.ErrFailedPrecondition)
}
p, err := s.blobPath(info.Digest)
if err != nil {
return content.Info{}, fmt.Errorf("calculating blob path for update: %w", err)
}
fi, err := os.Stat(p)
if err != nil {
if os.IsNotExist(err) {
err = fmt.Errorf("content %v: %w", info.Digest, errdefs.ErrNotFound)
}
return content.Info{}, err
}
var (
all bool
labels map[string]string
)
if len(fieldpaths) > 0 {
for _, path := range fieldpaths {
if strings.HasPrefix(path, "labels.") {
if labels == nil {
labels = map[string]string{}
}
key := strings.TrimPrefix(path, "labels.")
labels[key] = info.Labels[key]
continue
}
switch path {
case "labels":
all = true
labels = info.Labels
default:
return content.Info{}, fmt.Errorf("cannot update %q field on content info %q: %w", path, info.Digest, errdefs.ErrInvalidArgument)
}
}
} else {
all = true
labels = info.Labels
}
if all {
err = s.ls.Set(info.Digest, labels)
} else {
labels, err = s.ls.Update(info.Digest, labels)
}
if err != nil {
return content.Info{}, err
}
info = s.info(info.Digest, fi, labels)
info.UpdatedAt = time.Now()
if err := os.Chtimes(p, info.UpdatedAt, info.CreatedAt); err != nil {
log.G(ctx).WithError(err).Warnf("could not change access time for %s", info.Digest)
}
return info, nil
}
func (s *store) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error {
root := filepath.Join(s.root, "blobs")
filter, err := filters.ParseAll(fs...)
if err != nil {
return err
}
var alg digest.Algorithm
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if !fi.IsDir() && !alg.Available() {
return nil
}
// TODO(stevvooe): There are a few more cases with subdirs that should be
// handled in case the layout gets corrupted. This isn't strict enough
// and may spew bad data.
if path == root {
return nil
}
if filepath.Dir(path) == root {
alg = digest.Algorithm(filepath.Base(path))
if !alg.Available() {
alg = ""
return filepath.SkipDir
}
// descending into a hash directory
return nil
}
dgst := digest.NewDigestFromEncoded(alg, filepath.Base(path))
if err := dgst.Validate(); err != nil {
// log error but don't report
log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
// if we see this, it could mean some sort of corruption of the
// store or extra paths not expected previously.
}
var labels map[string]string
if s.ls != nil {
labels, err = s.ls.Get(dgst)
if err != nil {
return err
}
}
info := s.info(dgst, fi, labels)
if !filter.Match(content.AdaptInfo(info)) {
return nil
}
return fn(info)
})
}
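// Illustrative sketch (not part of the upstream file): Walk accepts filter
// expressions from the filters package; the label key used here is
// hypothetical.
func exampleWalk(ctx context.Context, cs content.Store) ([]digest.Digest, error) {
var dgsts []digest.Digest
err := cs.Walk(ctx, func(info content.Info) error {
dgsts = append(dgsts, info.Digest)
return nil
}, "labels.pinned==true")
return dgsts, err
}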
func (s *store) Status(ctx context.Context, ref string) (content.Status, error) {
return s.status(s.ingestRoot(ref))
}
func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
fp, err := os.Open(filepath.Join(s.root, "ingest"))
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
defer fp.Close()
fis, err := fp.Readdirnames(-1)
if err != nil {
return nil, err
}
filter, err := filters.ParseAll(fs...)
if err != nil {
return nil, err
}
var active []content.Status
for _, fi := range fis {
p := filepath.Join(s.root, "ingest", fi)
stat, err := s.status(p)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
// TODO(stevvooe): This is a common error if uploads are being
// completed while making this listing. Need to consider taking a
// lock on the whole store to coordinate this aspect.
//
// Another option is to cleanup downloads asynchronously and
// coordinate this method with the cleanup process.
//
// For now, we just skip them, as they really don't exist.
continue
}
if filter.Match(adaptStatus(stat)) {
active = append(active, stat)
}
}
return active, nil
}
// WalkStatusRefs is used to walk all status references.
//
// Failed status reads will be logged and ignored; if this function is called
// while references are being altered, such error messages may be produced.
func (s *store) WalkStatusRefs(ctx context.Context, fn func(string) error) error {
fp, err := os.Open(filepath.Join(s.root, "ingest"))
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer fp.Close()
fis, err := fp.Readdirnames(-1)
if err != nil {
return err
}
for _, fi := range fis {
rf := filepath.Join(s.root, "ingest", fi, "ref")
ref, err := readFileString(rf)
if err != nil {
log.G(ctx).WithError(err).WithField("path", rf).Error("failed to read ingest ref")
continue
}
if err := fn(ref); err != nil {
return err
}
}
return nil
}
// status works like stat above, except it uses the path to the ingest.
func (s *store) status(ingestPath string) (content.Status, error) {
dp := filepath.Join(ingestPath, "data")
fi, err := os.Stat(dp)
if err != nil {
if os.IsNotExist(err) {
err = fmt.Errorf("%s: %w", err.Error(), errdefs.ErrNotFound)
}
return content.Status{}, err
}
ref, err := readFileString(filepath.Join(ingestPath, "ref"))
if err != nil {
if os.IsNotExist(err) {
err = fmt.Errorf("%s: %w", err.Error(), errdefs.ErrNotFound)
}
return content.Status{}, err
}
startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat"))
if err != nil {
return content.Status{}, fmt.Errorf("could not read startedat: %w", err)
}
updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat"))
if err != nil {
return content.Status{}, fmt.Errorf("could not read updatedat: %w", err)
}
// because we don't write updatedat on every write, the mod time may
// actually be more up to date.
if fi.ModTime().After(updatedAt) {
updatedAt = fi.ModTime()
}
return content.Status{
Ref: ref,
Offset: fi.Size(),
Total: s.total(ingestPath),
UpdatedAt: updatedAt,
StartedAt: startedAt,
}, nil
}
func adaptStatus(status content.Status) filters.Adaptor {
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
case "ref":
return status.Ref, true
}
return "", false
})
}
// total attempts to resolve the total expected size for the write.
func (s *store) total(ingestPath string) int64 {
totalS, err := readFileString(filepath.Join(ingestPath, "total"))
if err != nil {
return 0
}
total, err := strconv.ParseInt(totalS, 10, 64)
if err != nil {
// represents a corrupted file, should probably remove.
return 0
}
return total
}
// Writer begins or resumes the active writer identified by ref. If the writer
// is already in use, an error is returned. Only one writer may be in use per
// ref at a time.
//
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
if wOpts.Ref == "" {
return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument)
}
if err := s.tryLock(wOpts.Ref); err != nil {
return nil, err
}
w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
if err != nil {
s.unlock(wOpts.Ref)
return nil, err
}
return w, nil // lock is now held by w.
}
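// Illustrative sketch (not part of the upstream file): driving a writer
// manually rather than through helpers such as content.WriteBlob. The ref
// and payload are hypothetical.
func exampleManualWrite(ctx context.Context, cs content.Store) error {
w, err := cs.Writer(ctx, content.WithRef("manual-ref"))
if err != nil {
return err
}
defer w.Close()
if _, err := io.Copy(w, strings.NewReader("payload")); err != nil {
return err
}
// Passing a zero size and empty digest skips those validations in Commit.
return w.Commit(ctx, 0, "")
}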
func (s *store) resumeStatus(ref string, total int64, digester digest.Digester) (content.Status, error) {
path, _, data := s.ingestPaths(ref)
status, err := s.status(path)
if err != nil {
return status, fmt.Errorf("failed reading status of resume write: %w", err)
}
if ref != status.Ref {
// NOTE(stevvooe): This is fairly catastrophic. Either we have some
// layout corruption or a hash collision for the ref key.
return status, fmt.Errorf("ref key does not match: %v != %v", ref, status.Ref)
}
if total > 0 && status.Total > 0 && total != status.Total {
return status, fmt.Errorf("provided total differs from status: %v != %v", total, status.Total)
}
//nolint:dupword
// TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes
fp, err := os.Open(data)
if err != nil {
return status, err
}
p := bufPool.Get().(*[]byte)
status.Offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
bufPool.Put(p)
fp.Close()
return status, err
}
// writer provides the main implementation of the Writer method. The caller
// must hold the ref lock and is responsible for releasing it if an error is
// returned.
func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
// TODO(stevvooe): Need to actually store expected here. We have
// code in the service that shouldn't be dealing with this.
if expected != "" {
p, err := s.blobPath(expected)
if err != nil {
return nil, fmt.Errorf("calculating expected blob path for writer: %w", err)
}
if _, err := os.Stat(p); err == nil {
return nil, fmt.Errorf("content %v: %w", expected, errdefs.ErrAlreadyExists)
}
}
path, refp, data := s.ingestPaths(ref)
var (
digester = digest.Canonical.Digester()
offset int64
startedAt time.Time
updatedAt time.Time
)
foundValidIngest := false
if err := s.ensureIngestRootOnce(); err != nil {
return nil, err
}
// ensure that the ingest path has been created.
if err := os.Mkdir(path, 0755); err != nil {
if !os.IsExist(err) {
return nil, err
}
status, err := s.resumeStatus(ref, total, digester)
if err == nil {
foundValidIngest = true
updatedAt = status.UpdatedAt
startedAt = status.StartedAt
total = status.Total
offset = status.Offset
} else {
log.G(ctx).Infof("failed to resume the status from path %s: %s. will recreate them", path, err.Error())
}
}
if !foundValidIngest {
startedAt = time.Now()
updatedAt = startedAt
// the ingest is new, so we need to set up the target location.
// write the ref to a file for later use
if err := os.WriteFile(refp, []byte(ref), 0666); err != nil {
return nil, err
}
if err := writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
return nil, err
}
if err := writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
return nil, err
}
if total > 0 {
if err := os.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil {
return nil, err
}
}
}
fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return nil, fmt.Errorf("failed to open data file: %w", err)
}
if _, err := fp.Seek(offset, io.SeekStart); err != nil {
fp.Close()
return nil, fmt.Errorf("could not seek to current write offset: %w", err)
}
return &writer{
s: s,
fp: fp,
ref: ref,
path: path,
offset: offset,
total: total,
digester: digester,
startedAt: startedAt,
updatedAt: updatedAt,
}, nil
}
// Abort an active transaction keyed by ref. If the ingest is active, it will
// be cancelled. Any resources associated with the ingest will be cleaned up.
func (s *store) Abort(ctx context.Context, ref string) error {
root := s.ingestRoot(ref)
if err := os.RemoveAll(root); err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("ingest ref %q: %w", ref, errdefs.ErrNotFound)
}
return err
}
return nil
}
func (s *store) blobPath(dgst digest.Digest) (string, error) {
if err := dgst.Validate(); err != nil {
return "", fmt.Errorf("cannot calculate blob path from invalid digest: %v: %w", err, errdefs.ErrInvalidArgument)
}
return filepath.Join(s.root, "blobs", dgst.Algorithm().String(), dgst.Encoded()), nil
}
func (s *store) ingestRoot(ref string) string {
// we take a digest of the ref to keep the ingest paths constant length.
// Note that this is not the current or potential digest of incoming content.
dgst := digest.FromString(ref)
return filepath.Join(s.root, "ingest", dgst.Encoded())
}
// ingestPaths returns the paths used by an ingest:
//
// - root: entire ingest directory
// - ref: name of the starting ref, must be unique
// - data: file where data is written
func (s *store) ingestPaths(ref string) (string, string, string) {
var (
fp = s.ingestRoot(ref)
rp = filepath.Join(fp, "ref")
dp = filepath.Join(fp, "data")
)
return fp, rp, dp
}
func (s *store) ensureIngestRoot() error {
return os.MkdirAll(filepath.Join(s.root, "ingest"), 0777)
}
func readFileString(path string) (string, error) {
p, err := os.ReadFile(path)
return string(p), err
}
// readFileTimestamp reads a file with just a timestamp present.
func readFileTimestamp(p string) (time.Time, error) {
b, err := os.ReadFile(p)
if err != nil {
if os.IsNotExist(err) {
err = fmt.Errorf("%s: %w", err.Error(), errdefs.ErrNotFound)
}
return time.Time{}, err
}
var t time.Time
if err := t.UnmarshalText(b); err != nil {
return time.Time{}, fmt.Errorf("could not parse timestamp file %v: %w", p, err)
}
return t, nil
}
func writeTimestampFile(p string, t time.Time) error {
b, err := t.MarshalText()
if err != nil {
return err
}
return writeToCompletion(p, b, 0666)
}
// writeToCompletion writes data to path atomically: the bytes are written and
// synced to a temporary file, which is then renamed into place.
func writeToCompletion(path string, data []byte, mode os.FileMode) error {
tmp := fmt.Sprintf("%s.tmp", path)
f, err := os.OpenFile(tmp, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, mode)
if err != nil {
return fmt.Errorf("create tmp file: %w", err)
}
_, err = f.Write(data)
f.Close()
if err != nil {
return fmt.Errorf("write tmp file: %w", err)
}
err = os.Rename(tmp, path)
if err != nil {
return fmt.Errorf("rename tmp file: %w", err)
}
return nil
}
//go:build linux || solaris
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"os"
"syscall"
"time"
)
// getATime returns the file's access time, falling back to the modification
// time when the underlying stat data is not a *syscall.Stat_t.
func getATime(fi os.FileInfo) time.Time {
if st, ok := fi.Sys().(*syscall.Stat_t); ok {
return time.Unix(st.Atim.Unix())
}
return fi.ModTime()
}
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"time"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/opencontainers/go-digest"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/internal/fsverity"
)
// writer represents a write transaction against the blob store.
type writer struct {
s *store
fp *os.File // opened data file
path string // path to writer dir
ref string // ref key
offset int64
total int64
digester digest.Digester
startedAt time.Time
updatedAt time.Time
}
func (w *writer) Status() (content.Status, error) {
return content.Status{
Ref: w.ref,
Offset: w.offset,
Total: w.total,
StartedAt: w.startedAt,
UpdatedAt: w.updatedAt,
}, nil
}
// Digest returns the current digest of the content, up to the current write.
//
// Cannot be called concurrently with `Write`.
func (w *writer) Digest() digest.Digest {
return w.digester.Digest()
}
// Write p to the transaction.
//
// Note that writes are unbuffered to the backing file. When writing, it is
// recommended to wrap in a bufio.Writer or, preferably, use io.CopyBuffer.
func (w *writer) Write(p []byte) (n int, err error) {
n, err = w.fp.Write(p)
w.digester.Hash().Write(p[:n])
w.offset += int64(n) // track only bytes actually written, matching the digester
w.updatedAt = time.Now()
return n, err
}
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
// Ensure even on error the writer is fully closed
defer w.s.unlock(w.ref)
var base content.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return err
}
}
fp := w.fp
w.fp = nil
if fp == nil {
return fmt.Errorf("cannot commit on closed writer: %w", errdefs.ErrFailedPrecondition)
}
if err := fp.Sync(); err != nil {
fp.Close()
return fmt.Errorf("sync failed: %w", err)
}
fi, err := fp.Stat()
closeErr := fp.Close()
if err != nil {
return fmt.Errorf("stat on ingest file failed: %w", err)
}
if closeErr != nil {
return fmt.Errorf("failed to close ingest file: %w", closeErr)
}
if size > 0 && size != fi.Size() {
return fmt.Errorf("unexpected commit size %d, expected %d: %w", fi.Size(), size, errdefs.ErrFailedPrecondition)
}
dgst := w.digester.Digest()
if expected != "" && expected != dgst {
return fmt.Errorf("unexpected commit digest %s, expected %s: %w", dgst, expected, errdefs.ErrFailedPrecondition)
}
var (
ingest = filepath.Join(w.path, "data")
target, _ = w.s.blobPath(dgst) // ignore error because we calculated this dgst
)
// make sure parent directories of blob exist
if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
return err
}
if _, err := os.Stat(target); err == nil {
// collision with the target file!
if err := os.RemoveAll(w.path); err != nil {
log.G(ctx).WithField("ref", w.ref).WithField("path", w.path).Error("failed to remove ingest directory")
}
return fmt.Errorf("content %v: %w", dgst, errdefs.ErrAlreadyExists)
}
if err := os.Rename(ingest, target); err != nil {
return err
}
// Enable content blob integrity verification if supported
if w.s.integritySupported {
if err := fsverity.Enable(target); err != nil {
log.G(ctx).Warnf("failed to enable integrity for blob %v: %s", target, err.Error())
}
}
// The ingest has now been made available in the content store. Attempt to
// complete setting metadata, but only log errors rather than returning them,
// since the content store cannot be cleanly rolled back.
commitTime := time.Now()
if err := os.Chtimes(target, commitTime, commitTime); err != nil {
log.G(ctx).WithField("digest", dgst).Error("failed to change file time to commit time")
}
// clean up!!
if err := os.RemoveAll(w.path); err != nil {
log.G(ctx).WithField("ref", w.ref).WithField("path", w.path).Error("failed to remove ingest directory")
}
if w.s.ls != nil && base.Labels != nil {
if err := w.s.ls.Set(dgst, base.Labels); err != nil {
log.G(ctx).WithField("digest", dgst).Error("failed to set labels")
}
}
// Change the blob to read-only. This matters most for reads, but it provides
// _some_ protection from this point on: the existing permissions are masked
// to remove the write and execute bits, keeping only the read bits granted
// by the umask at creation.
//
// NOTE: Windows does not support this operation
if runtime.GOOS != "windows" {
if err := os.Chmod(target, (fi.Mode()&os.ModePerm)&^0333); err != nil {
log.G(ctx).WithField("ref", w.ref).Error("failed to make readonly")
}
}
return nil
}
// Close the writer, flushing any unwritten data and leaving the progress
// intact.
//
// If one needs to resume the transaction, a new writer can be obtained from
// `Ingester.Writer` using the same key. The write can then be continued
// from where it was left off.
//
// To abandon a transaction completely, first call Close, then
// `IngestManager.Abort` to clean up the associated resources.
func (w *writer) Close() (err error) {
if w.fp != nil {
w.fp.Sync()
err = w.fp.Close()
writeTimestampFile(filepath.Join(w.path, "updatedat"), w.updatedAt)
w.fp = nil
w.s.unlock(w.ref)
return
}
return nil
}
func (w *writer) Truncate(size int64) error {
if size != 0 {
return errors.New("Truncate: unsupported size")
}
w.offset = 0
w.digester.Hash().Reset()
if _, err := w.fp.Seek(0, io.SeekStart); err != nil {
return err
}
return w.fp.Truncate(0)
}
func (w *writer) Sync() error {
if w.fp != nil {
return w.fp.Sync()
}
return nil
}