// SPDX-License-Identifier:Apache-2.0
package community
import (
"errors"
"fmt"
"strconv"
"strings"
)
// Sentinel errors returned (wrapped with %w) by New; callers can match them with errors.Is.
var (
	// ErrInvalidCommunityValue reports a community string with a supported number of
	// sections where one section could not be parsed, or where a four-section string
	// does not start with the large-community marker.
	ErrInvalidCommunityValue = errors.New("invalid community value")
	// ErrInvalidCommunityFormat reports a community string whose number of
	// ':'-separated sections is not supported.
	ErrInvalidCommunityFormat = errors.New("invalid community format")
)
// largeBGPCommunityMarker is the first section of a large community string, as in
// "large:<global administrator>:<localdata part 1>:<localdata part 2>". Besides marking
// the value as a large community, it will allow large communities to be told apart
// from extended communities if and when those become supported.
const largeBGPCommunityMarker = "large"
// BGPCommunity represents a BGP community. This package provides the
// BGPCommunityLegacy and BGPCommunityLarge implementations, both produced by New.
type BGPCommunity interface {
	// LessThan reports whether the receiver sorts before the argument. Legacy and
	// large communities can be compared with each other.
	LessThan(BGPCommunity) bool
	// String returns a textual representation of the community.
	String() string
}
// New parses c and returns the BGPCommunity it describes.
//
// The syntax follows Juniper's community notation, see
// https://www.juniper.net/documentation/us/en/software/junos/routing-policy/bgp/topics/concept/policy-bgp-communities-extended-communities-match-conditions-overview.html
//
//   - Legacy communities: "<AS number>:<community value>", each section a base-10 uint16.
//   - Large communities: "large:<global administrator>:<localdata part 1>:<localdata part 2>",
//     each numeric section a base-10 uint32.
//
// Extended communities ("<type>:<administrator>:<assigned-number>") are not supported yet.
// Like any other unsupported number of sections, they produce an error wrapping
// ErrInvalidCommunityFormat. Unparsable sections and a wrong large-community marker
// produce an error wrapping ErrInvalidCommunityValue.
func New(c string) (BGPCommunity, error) {
	parts := strings.Split(c, ":")
	switch len(parts) {
	case 2:
		var vals [2]uint16
		for idx, part := range parts {
			v, err := strconv.ParseUint(part, 10, 16)
			if err != nil {
				return nil, fmt.Errorf("%w: invalid section %q of community %q, err: %q",
					ErrInvalidCommunityValue, part, c, err)
			}
			vals[idx] = uint16(v)
		}
		return BGPCommunityLegacy{upperVal: vals[0], lowerVal: vals[1]}, nil
	case 4:
		if parts[0] != largeBGPCommunityMarker {
			return nil, fmt.Errorf("%w: invalid marker for large community, expected community to be of "+
				"format %s:<uint32>:<uint32>:<uint32> but got %q instead",
				ErrInvalidCommunityValue, largeBGPCommunityMarker, c)
		}
		var vals [3]uint32
		for idx, part := range parts[1:] {
			v, err := strconv.ParseUint(part, 10, 32)
			if err != nil {
				return nil, fmt.Errorf("%w: invalid section %q of community %q, err: %q",
					ErrInvalidCommunityValue, part, c, err)
			}
			vals[idx] = uint32(v)
		}
		return BGPCommunityLarge{
			globalAdministrator: vals[0],
			localDataPart1:      vals[1],
			localDataPart2:      vals[2],
		}, nil
	default:
		return nil, fmt.Errorf("%w: %s", ErrInvalidCommunityFormat, c)
	}
}
// BGPCommunityLegacy holds the internal representation of a BGP legacy community:
// two 16-bit sections, written as "<AS number>:<community value>".
type BGPCommunityLegacy struct {
	// upperVal is the first section ("<AS number>").
	upperVal uint16
	// lowerVal is the second section ("<community value>").
	lowerVal uint16
}
// LessThan reports whether b sorts before c, so that different BGPCommunity values can be
// ordered. For the comparison a legacy community is treated as the large community
// <ToUint32()>:0:0, so legacy and large communities can be compared with each other.
func (b BGPCommunityLegacy) LessThan(c BGPCommunity) bool {
	return lessThan(b, c)
}
// String renders the community in new-format, "<AS number>:<community value>",
// with both sections in base 10.
func (b BGPCommunityLegacy) String() string {
	return strconv.FormatUint(uint64(b.upperVal), 10) + ":" + strconv.FormatUint(uint64(b.lowerVal), 10)
}
// ToUint32 packs the community into the 32-bit form used on the wire: the AS number
// in the upper 16 bits and the community value in the lower 16 bits.
func (b BGPCommunityLegacy) ToUint32() uint32 {
	high := uint32(b.upperVal) << 16
	return high | uint32(b.lowerVal)
}
// BGPCommunityLarge holds the internal representation of a large BGP community:
// three 32-bit sections, parsed by New from
// "large:<global administrator>:<localdata part 1>:<localdata part 2>".
type BGPCommunityLarge struct {
	// globalAdministrator is the first numeric section.
	globalAdministrator uint32
	// localDataPart1 is the second numeric section.
	localDataPart1 uint32
	// localDataPart2 is the third numeric section.
	localDataPart2 uint32
}
// LessThan reports whether b sorts before c, so that different BGPCommunity values can be
// ordered. The (global administrator, local data 1, local data 2) triples are compared
// lexicographically. A legacy community c is treated as the large community
// <ToUint32()>:0:0, so legacy and large communities can be compared with each other.
func (b BGPCommunityLarge) LessThan(c BGPCommunity) bool {
	return lessThan(b, c)
}
// String renders the community as "<global administrator>:<localdata part 1>:<localdata part 2>",
// all in base 10. The "large:" marker is not included, so the result is not itself
// accepted by New: New requires the marker for three-value communities.
func (b BGPCommunityLarge) String() string {
	sections := []string{
		strconv.FormatUint(uint64(b.globalAdministrator), 10),
		strconv.FormatUint(uint64(b.localDataPart1), 10),
		strconv.FormatUint(uint64(b.localDataPart2), 10),
	}
	return strings.Join(sections, ":")
}
// IsLegacy reports whether c holds a BGPCommunityLegacy value.
func IsLegacy(c BGPCommunity) bool {
	switch c.(type) {
	case BGPCommunityLegacy:
		return true
	default:
		return false
	}
}
// IsLarge reports whether c holds a BGPCommunityLarge value.
func IsLarge(c BGPCommunity) bool {
	_, ok := c.(BGPCommunityLarge)
	return ok
}
// lessThan orders two communities regardless of their concrete type.
//
// Both sides are first widened to a BGPCommunityLarge: a legacy community becomes
// <ToUint32()>:0:0, a large community is used unchanged, and anything else (including nil)
// counts as 0:0:0. The widened values are then compared field by field, global
// administrator first.
func lessThan(b BGPCommunity, c BGPCommunity) bool {
	widen := func(x BGPCommunity) BGPCommunityLarge {
		switch v := x.(type) {
		case BGPCommunityLegacy:
			return BGPCommunityLarge{globalAdministrator: v.ToUint32()}
		case BGPCommunityLarge:
			return v
		}
		return BGPCommunityLarge{}
	}
	left, right := widen(b), widen(c)
	if left.globalAdministrator != right.globalAdministrator {
		return left.globalAdministrator < right.globalAdministrator
	}
	if left.localDataPart1 != right.localDataPart1 {
		return left.localDataPart1 < right.localDataPart1
	}
	return left.localDataPart2 < right.localDataPart2
}
// SPDX-License-Identifier:Apache-2.0
package native
import "time"
const (
	// backoffMax caps the delay handed out by backoff.Duration.
	backoffMax = 2 * time.Minute
	// backoffFactor multiplies the delay after every retry.
	backoffFactor = 2
)

// backoff hands out multiplicatively growing delays for retrying a failing operation.
// The zero value is ready to use: the first Duration call returns 0, the next one
// returns time.Second, and each later call multiplies the delay by backoffFactor,
// never exceeding backoffMax.
type backoff struct {
	nextDelay time.Duration
}

// Duration returns the delay to wait before the next retry and advances the schedule.
func (b *backoff) Duration() time.Duration {
	current := b.nextDelay
	switch grown := current * backoffFactor; {
	case current == 0:
		b.nextDelay = time.Second
	case grown > backoffMax:
		b.nextDelay = backoffMax
	default:
		b.nextDelay = grown
	}
	return current
}

// Reset restarts the schedule, so the next Duration call returns 0 again.
func (b *backoff) Reset() {
	*b = backoff{}
}
// SPDX-License-Identifier:Apache-2.0
package native
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"net"
"time"
"go.universe.tf/metallb/internal/bgp"
"go.universe.tf/metallb/internal/bgp/community"
"go.universe.tf/metallb/internal/safeconvert"
)
// sendOpen writes a BGP OPEN message (RFC 4271 section 4.2) to w.
//
// The message announces BGP version 4, holdTime (in whole seconds) and routerID. It
// carries a single Capabilities optional parameter listing:
//   - multiprotocol extensions for IPv4 unicast (AFI 1, SAFI 1);
//   - multiprotocol extensions for IPv6 unicast (AFI 2, SAFI 1);
//   - 4-octet AS number support (capability 65) with the full asn.
//
// When asn does not fit in 16 bits, the 2-octet "My Autonomous System" field carries
// AS_TRANS (23456) instead. routerID must be an IPv4 address. holdTime must be
// non-negative and fit in 16 bits of seconds. Otherwise an error is returned and
// nothing is written.
func sendOpen(w io.Writer, asn uint32, routerID net.IP, holdTime time.Duration) error {
	routerID4 := routerID.To4()
	if routerID4 == nil {
		// Report the problem to the caller instead of crashing the whole speaker.
		return fmt.Errorf("non-ipv4 address %q used as RouterID", routerID)
	}
	// The OPEN message carries the hold time as a 16-bit number of seconds; larger
	// values used to wrap around silently.
	if holdTime < 0 || holdTime/time.Second > 0xffff {
		return fmt.Errorf("hold time %s does not fit in a BGP OPEN message", holdTime)
	}
	msg := struct {
		// Header
		Marker1, Marker2 uint64
		Len              uint16
		Type             uint8
		// OPEN
		Version  uint8
		ASN16    uint16
		HoldTime uint16
		RouterID [4]byte
		// Options (we only send one, capabilities)
		OptsLen uint8
		OptType uint8
		OptLen  uint8
		// Capabilities: multiprotocol extension for IPv4+IPv6
		// unicast, and 4-byte ASNs
		MP4Type uint8
		MP4Len  uint8
		AFI4    uint16
		SAFI4   uint16
		MP6Type uint8
		MP6Len  uint8
		AFI6    uint16
		SAFI6   uint16
		CapType uint8
		CapLen  uint8
		ASN32   uint32
	}{
		Marker1:  0xffffffffffffffff,
		Marker2:  0xffffffffffffffff,
		Len:      0, // Filled below
		Type:     1, // OPEN
		Version:  4,
		ASN16:    uint16(asn), // Possibly tweaked below
		HoldTime: uint16(holdTime / time.Second),
		// RouterID filled below
		OptsLen: 20,
		OptType: 2, // Capabilities
		OptLen:  18,
		MP4Type: 1, // BGP Multi-protocol Extensions
		MP4Len:  4,
		AFI4:    1, // IPv4
		SAFI4:   1, // Unicast
		MP6Type: 1, // BGP Multi-protocol Extensions
		MP6Len:  4,
		AFI6:    2, // IPv6
		SAFI6:   1, // Unicast
		CapType: 65, // 4-byte ASN
		CapLen:  4,
		ASN32:   asn,
	}
	var err error
	msg.Len, err = safeconvert.IntToUInt16(binary.Size(msg))
	if err != nil {
		return fmt.Errorf("invalid message len %w", err)
	}
	if asn > 65535 {
		msg.ASN16 = 23456 // AS_TRANS
	}
	copy(msg.RouterID[:], routerID4)
	return binary.Write(w, binary.BigEndian, msg)
}
// openResult holds what readOpen learned from the peer's OPEN message.
type openResult struct {
	// asn is the peer's AS number: the 2-octet "My Autonomous System" field, overwritten
	// by the value of the 4-octet ASN capability when the peer sends one.
	asn uint32
	// holdTime is the hold time proposed by the peer (readOpen rejects 1s and 2s).
	holdTime time.Duration
	// mp4 is set when the peer advertises multiprotocol IPv4 unicast (AFI 1, SAFI 1).
	mp4 bool
	// mp6 is set when the peer advertises multiprotocol IPv6 unicast (AFI 2, SAFI 1).
	mp6 bool
	// fbasn is set when the peer advertises the four-byte ASN capability (code 65).
	fbasn bool
}
// notificationCodes maps a NOTIFICATION error code and subcode, packed big-endian as
// (code << 8) | subcode, to a human-readable description. The codes come from RFC 4271
// section 4.5, RFC 4486 (Cease subcodes) and RFC 8538 (Hard Reset). readNotification
// reports codes missing from this table as unknown.
var notificationCodes = map[uint16]string{
	0x0100: "Message header error (unspecific)",
	0x0101: "Connection not synchronized",
	0x0102: "Bad message length",
	0x0103: "Bad message type",
	0x0200: "OPEN message error (unspecific)",
	0x0201: "Unsupported version number",
	0x0202: "Bad peer AS",
	0x0203: "Bad BGP identifier",
	0x0204: "Unsupported optional parameter",
	0x0206: "Unacceptable hold time",
	0x0207: "Unsupported capability",
	0x0300: "UPDATE message error (unspecific)",
	0x0301: "Malformed Attribute List",
	0x0302: "Unrecognized Well-known Attribute",
	0x0303: "Missing Well-known Attribute",
	0x0304: "Attribute Flags Error",
	0x0305: "Attribute Length Error",
	0x0306: "Invalid ORIGIN Attribute",
	0x0308: "Invalid NEXT_HOP Attribute",
	0x0309: "Optional Attribute Error",
	0x030a: "Invalid Network Field",
	0x030b: "Malformed AS_PATH",
	0x0400: "Hold Timer Expired (unspecific)",
	0x0500: "BGP FSM state error (unspecific)",
	0x0501: "Receive Unexpected Message in OpenSent State",
	0x0502: "Receive Unexpected Message in OpenConfirm State",
	0x0503: "Receive Unexpected Message in Established State",
	0x0600: "Cease (unspecific)",
	0x0601: "Maximum Number of Prefixes Reached",
	0x0602: "Administrative Shutdown",
	0x0603: "Peer De-configured",
	0x0604: "Administrative Reset",
	0x0605: "Connection Rejected",
	0x0606: "Other Configuration Change",
	0x0607: "Connection Collision Resolution",
	0x0608: "Out of Resources",
	0x0609: "Hard Reset",
}
// readNotification decodes the error code and subcode of a NOTIFICATION message from r
// (the message header must already have been consumed) and turns them into an error.
// It never returns nil: receiving a notification is itself an error, and a failure
// to read the code is returned as is. Any trailing notification data is left unread.
func readNotification(r io.Reader) error {
	var code uint16
	if err := binary.Read(r, binary.BigEndian, &code); err != nil {
		return err
	}
	desc := "unknown code"
	if known, found := notificationCodes[code]; found {
		desc = known
	}
	return fmt.Errorf("got BGP notification code 0x%04x (%s)", code, desc)
}
// readOpen reads one BGP message from r and decodes it as an OPEN message
// (RFC 4271 section 4.2).
//
// If the peer sent a NOTIFICATION instead, it is decoded by readNotification and returned
// as an error. The following are also errors:
//   - a bad header marker;
//   - any other message type;
//   - a message too short to be an OPEN;
//   - a BGP version other than 4;
//   - a hold time of 1 or 2 seconds.
//
// Optional parameters are decoded by readOptions, restricted to the bytes of this message.
func readOpen(r io.Reader) (*openResult, error) {
	hdr := struct {
		// Header
		Marker1, Marker2 uint64
		Len              uint16
		Type             uint8
	}{}
	if err := binary.Read(r, binary.BigEndian, &hdr); err != nil {
		return nil, err
	}
	if hdr.Marker1 != 0xffffffffffffffff || hdr.Marker2 != 0xffffffffffffffff {
		return nil, fmt.Errorf("synchronization error, incorrect header marker")
	}
	if hdr.Type == 3 {
		return nil, readNotification(r)
	}
	if hdr.Type != 1 {
		return nil, fmt.Errorf("message type is not OPEN, got %d, want 1", hdr.Type)
	}
	if hdr.Len < 37 {
		return nil, fmt.Errorf("message length %d too small to be OPEN", hdr.Len)
	}
	// Everything after the 19-byte header belongs to this OPEN message.
	lr := &io.LimitedReader{
		R: r,
		N: int64(hdr.Len) - 19,
	}
	open := struct {
		Version  uint8
		ASN16    uint16
		HoldTime uint16
		RouterID uint32
		OptsLen  uint8
	}{}
	if err := binary.Read(lr, binary.BigEndian, &open); err != nil {
		return nil, err
	}
	if open.Version != 4 {
		return nil, fmt.Errorf("wrong BGP version")
	}
	// RFC 4271: the hold time must be either zero or at least three seconds.
	if open.HoldTime != 0 && open.HoldTime < 3 {
		return nil, fmt.Errorf("invalid hold time %d, must be 0 or >=3s", open.HoldTime)
	}
	ret := &openResult{
		asn:      uint32(open.ASN16),
		holdTime: time.Duration(open.HoldTime) * time.Second,
	}
	if err := readOptions(lr, ret); err != nil {
		return nil, err
	}
	return ret, nil
}
// readOptions decodes OPEN optional parameters from r until r is exhausted, recording
// what it learns in ret. Only the Capabilities parameter (type 2) is accepted. Its body
// is handed to readCapabilities, which must consume exactly the declared length.
func readOptions(r io.Reader, ret *openResult) error {
	for {
		var param struct {
			Type uint8
			Len  uint8
		}
		err := binary.Read(r, binary.BigEndian, &param)
		switch {
		case err == io.EOF:
			// Clean end of the optional parameters.
			return nil
		case err != nil:
			return err
		}
		if param.Type != 2 {
			return fmt.Errorf("unknown BGP option type %d", param.Type)
		}
		body := &io.LimitedReader{R: r, N: int64(param.Len)}
		if err := readCapabilities(body, ret); err != nil {
			return err
		}
		if body.N != 0 {
			return fmt.Errorf("%d trailing garbage bytes after capability option", body.N)
		}
	}
}
// readCapabilities decodes BGP capabilities (RFC 5492) from r until r is exhausted and
// records the ones it understands in ret:
//   - code 65 (4-octet AS number): the 32-bit ASN overwrites ret.asn and sets ret.fbasn;
//   - code 1 (multiprotocol extensions): AFI 1/SAFI 1 sets ret.mp4, AFI 2/SAFI 1 sets ret.mp6.
//
// The 16-bit "SAFI" field read here also spans the reserved octet that RFC 4760 places
// before the SAFI, so it only matches when that octet is zero. Every other capability
// is skipped. Each capability must consume exactly its declared length.
func readCapabilities(r io.Reader, ret *openResult) error {
	for {
		var hdr struct {
			Code uint8
			Len  uint8
		}
		if err := binary.Read(r, binary.BigEndian, &hdr); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		value := io.LimitedReader{R: r, N: int64(hdr.Len)}
		switch hdr.Code {
		case 65:
			if err := binary.Read(&value, binary.BigEndian, &ret.asn); err != nil {
				return err
			}
			ret.fbasn = true
		case 1:
			var family struct{ AFI, SAFI uint16 }
			if err := binary.Read(&value, binary.BigEndian, &family); err != nil {
				return err
			}
			if family.SAFI == 1 {
				switch family.AFI {
				case 1:
					ret.mp4 = true
				case 2:
					ret.mp6 = true
				}
			}
		default:
			// TODO: only ignore capabilities that we know are fine to
			// ignore.
			if _, err := io.Copy(io.Discard, &value); err != nil {
				return err
			}
		}
		if value.N != 0 {
			return fmt.Errorf("%d leftover bytes after decoding capability %d", value.N, hdr.Code)
		}
	}
}
// sendUpdate writes one BGP UPDATE message announcing adv.Prefix to w.
//
// The message is built in memory in three parts:
//   - a 23-byte header: marker, message length, type 2, withdrawn routes length 0,
//     total path attribute length;
//   - the path attributes produced by encodePathAttrs;
//   - the NLRI for adv.Prefix.
//
// Both length fields are patched once the sizes are known. Nothing is written to w if
// building the message fails.
func sendUpdate(w io.Writer, asn uint32, ibgp, fbasn bool, nextHop net.IP, adv *bgp.Advertisement) error {
	var msg bytes.Buffer
	header := struct {
		M1, M2  uint64
		Len     uint16
		Type    uint8
		WdrLen  uint16
		AttrLen uint16
	}{
		M1:   uint64(0xffffffffffffffff),
		M2:   uint64(0xffffffffffffffff),
		Type: 2,
	}
	if err := binary.Write(&msg, binary.BigEndian, header); err != nil {
		return err
	}
	attrsStart := msg.Len()
	if err := encodePathAttrs(&msg, asn, ibgp, fbasn, nextHop, adv); err != nil {
		return err
	}
	attrsLen, err := safeconvert.IntToUInt16(msg.Len() - attrsStart)
	if err != nil {
		return err
	}
	// Total Path Attribute Length occupies bytes 21-22 of the header.
	binary.BigEndian.PutUint16(msg.Bytes()[21:23], attrsLen)
	encodePrefixes(&msg, []*net.IPNet{adv.Prefix})
	msgLen, err := safeconvert.IntToUInt16(msg.Len())
	if err != nil {
		return err
	}
	// The message Length occupies bytes 16-17, right after the marker.
	binary.BigEndian.PutUint16(msg.Bytes()[16:18], msgLen)
	_, err = io.Copy(w, &msg)
	return err
}
// encodePrefixes appends each prefix to b in NLRI form (RFC 4271 section 4.3). Each entry
// is one byte holding the prefix length in bits, followed by the fewest leading address
// bytes that cover that many bits. Addresses are taken in their 4-byte IPv4 form.
// NOTE(review): a non-IPv4 prefix with a non-zero length would panic on slicing the nil
// To4() result; validate() in the session code rejects such prefixes before they get here.
func encodePrefixes(b *bytes.Buffer, pfxs []*net.IPNet) {
	for i := range pfxs {
		bits, _ := pfxs[i].Mask.Size()
		b.WriteByte(byte(bits))
		// Round the bit count up to whole bytes.
		b.Write(pfxs[i].IP.To4()[:(bits+7)>>3])
	}
}
// bytesForBits returns the smallest number of whole bytes able to hold n bits. It adds 7
// and then floor-divides by 8 with an arithmetic shift.
func bytesForBits(n int) int {
	return (n + 7) >> 3
}
// encodePathAttrs appends the path attributes of an UPDATE for adv to b:
//   - ORIGIN (type 1) = IGP;
//   - AS_PATH (type 2): empty for iBGP. Otherwise one AS_SEQUENCE holding asn, encoded in
//     4 octets when the peer supports 4-octet ASNs (fbasn) and in 2 octets otherwise;
//   - NEXT_HOP (type 3) = nextHop, which must be an IPv4 address;
//   - LOCAL_PREF (type 5) = adv.LocalPref, for iBGP only;
//   - COMMUNITIES (type 8), when adv has communities. All of them must be legacy communities.
//
// An error is returned, possibly after some attributes have already been appended, when:
//   - asn does not fit the 2-octet encoding;
//   - nextHop is not IPv4;
//   - a community is not a legacy one;
//   - the communities do not fit the one-byte attribute length.
func encodePathAttrs(b *bytes.Buffer, asn uint32, ibgp, fbasn bool, nextHop net.IP, adv *bgp.Advertisement) error {
	b.Write([]byte{
		0x40, 1, // well-known transitive, origin
		1,       // len
		0,       // IGP
		0x40, 2, // well-known transitive, as-path
	})
	switch {
	case ibgp:
		b.WriteByte(0) // empty AS path
	case fbasn:
		b.Write([]byte{
			6, // len (1x 4-byte ASN)
			2, // AS_SEQUENCE
			1, // len (in number of ASes)
		})
		if err := binary.Write(b, binary.BigEndian, asn); err != nil {
			return err
		}
	default:
		// 2-byte ASNs are unsigned: 32768-65535 are valid values, so the field must be
		// encoded as uint16 (a signed int16 conversion rejected them).
		if asn > 65535 {
			return fmt.Errorf("invalid asn: %d does not fit in a 2-byte AS_PATH segment", asn)
		}
		b.Write([]byte{
			4, // len (1x 2-byte ASN)
			2, // AS_SEQUENCE
			1, // len (in number of ASes)
		})
		if err := binary.Write(b, binary.BigEndian, uint16(asn)); err != nil {
			return err
		}
	}
	// The attribute declares a 4-byte next hop. A net.IP may hold an IPv4 address in its
	// 16-byte form, so normalize it and refuse anything that is not IPv4.
	nh := nextHop.To4()
	if nh == nil {
		return fmt.Errorf("invalid next hop %q: only IPv4 next hops are supported", nextHop)
	}
	b.Write([]byte{
		0x40, 3, // well-known transitive, next-hop
		4, // len
	})
	b.Write(nh)
	if ibgp {
		b.Write([]byte{
			0x40, 5, // well-known, localpref
			4, // len
		})
		if err := binary.Write(b, binary.BigEndian, adv.LocalPref); err != nil {
			return err
		}
	}
	if len(adv.Communities) > 0 {
		// Convert communities to their legacy uint32 representation. Only legacy
		// communities can be carried in the COMMUNITIES attribute.
		legacyCommunities := make([]uint32, 0, len(adv.Communities))
		for _, c := range adv.Communities {
			legacyCommunity, ok := c.(community.BGPCommunityLegacy)
			if !ok {
				return fmt.Errorf("invalid community type for BGP native mode, community %s is not a legacy BGP "+
					"Community", c)
			}
			legacyCommunities = append(legacyCommunities, legacyCommunity.ToUint32())
		}
		b.Write([]byte{
			0xc0, 8, // optional transitive, communities
		})
		// The attribute length is a single byte: 4 bytes per community.
		toWrite, err := safeconvert.IntToUInt8(len(legacyCommunities) * 4)
		if err != nil {
			return fmt.Errorf("invalid size of legacy communities: %w", err)
		}
		if err := binary.Write(b, binary.BigEndian, toWrite); err != nil {
			return err
		}
		for _, c := range legacyCommunities {
			if err := binary.Write(b, binary.BigEndian, c); err != nil {
				return err
			}
		}
	}
	return nil
}
// sendWithdraw writes one BGP UPDATE message to w that withdraws prefixes.
//
// The message consists of:
//   - the header (marker, length, type 2);
//   - the withdrawn-routes length and the withdrawn prefixes in NLRI form;
//   - a zero total path attribute length.
//
// It carries no path attributes and no announcements. Both length fields are patched
// once the sizes are known.
func sendWithdraw(w io.Writer, prefixes []*net.IPNet) error {
	var msg bytes.Buffer
	header := struct {
		M1, M2 uint64
		Len    uint16
		Type   uint8
		WdrLen uint16
	}{
		M1:   uint64(0xffffffffffffffff),
		M2:   uint64(0xffffffffffffffff),
		Type: 2,
	}
	if err := binary.Write(&msg, binary.BigEndian, header); err != nil {
		return err
	}
	withdrawnStart := msg.Len()
	encodePrefixes(&msg, prefixes)
	withdrawnLen, err := safeconvert.IntToUInt16(msg.Len() - withdrawnStart)
	if err != nil {
		return fmt.Errorf("invalid buffer %w", err)
	}
	// Withdrawn Routes Length occupies bytes 19-20 of the header.
	binary.BigEndian.PutUint16(msg.Bytes()[19:21], withdrawnLen)
	// Total Path Attribute Length: no attributes.
	if err := binary.Write(&msg, binary.BigEndian, uint16(0)); err != nil {
		return err
	}
	msgLen, err := safeconvert.IntToUInt16(msg.Len())
	if err != nil {
		return fmt.Errorf("invalid buffer %w", err)
	}
	binary.BigEndian.PutUint16(msg.Bytes()[16:18], msgLen)
	_, err = io.Copy(w, &msg)
	return err
}
// sendKeepalive writes a BGP KEEPALIVE message to w: the all-ones 16-byte marker, a
// length of 19 and message type 4, with no body. It makes a single Write call.
func sendKeepalive(w io.Writer) error {
	msg := make([]byte, 19)
	for i := 0; i < 16; i++ {
		msg[i] = 0xff
	}
	binary.BigEndian.PutUint16(msg[16:18], 19)
	msg[18] = 4
	_, err := w.Write(msg)
	return err
}
// SPDX-License-Identifier:Apache-2.0
package native
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
"net"
"os"
"sync"
"syscall"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"go.universe.tf/metallb/internal/bgp"
"go.universe.tf/metallb/internal/config"
"go.universe.tf/metallb/internal/safeconvert"
"golang.org/x/sys/unix"
)
// errClosed is returned by session operations once Close has been called. run and
// sendKeepalives stop when they see it.
var errClosed = errors.New("session closed")
// session represents one BGP session to an external router.
type session struct {
	// SessionParameters holds the session configuration (peer address/port, ASNs,
	// hold time, password, ...). NewSession guarantees HoldTime is non-nil.
	bgp.SessionParameters
	// peerFBASNSupport records whether the peer's OPEN advertised 4-byte ASN support;
	// set by connect.
	peerFBASNSupport bool
	logger           log.Logger
	// newHoldTime tells sendKeepalives that actualHoldTime changed (buffered, size 1).
	newHoldTime chan bool
	// backoff paces reconnection attempts in run.
	backoff backoff

	// mu guards the connection state below.
	mu sync.Mutex
	// cond is tied to mu; Set and abort broadcast on it and sendUpdates waits on it.
	cond   *sync.Cond
	closed bool
	// conn is the established connection, nil while disconnected.
	conn net.Conn
	// actualHoldTime is the negotiated hold time (the smaller of ours and the peer's).
	actualHoldTime time.Duration
	// nextHop is the local address of the connection, used as NEXT_HOP in UPDATEs.
	nextHop net.IP
	// advertised is what the peer has been sent, keyed by prefix string.
	advertised map[string]*bgp.Advertisement
	// new is the pending desired state set by Set. nil means "nothing pending"; a
	// non-nil empty map means "withdraw everything".
	new map[string]*bgp.Advertisement
	// peerName identifies this BGP session to be used for metrics
	peerName string
}
// sessionManager is the native-mode session manager. It holds no state of its own:
// every session created through it manages its own connection.
type sessionManager struct{}

// NewSessionManager returns the native-mode bgp.SessionManager. The logger argument is
// not used.
func NewSessionManager(l log.Logger) bgp.SessionManager {
	return new(sessionManager)
}
// NewSession creates a BGP session using the given session parameters.
//
// Native mode does not support an unset hold time, so 90s is used when args.HoldTime is
// nil. The session immediately starts trying to connect and synchronize its local state
// with the peer in background goroutines. NewSession always returns a nil error.
func (sm *sessionManager) NewSession(l log.Logger, args bgp.SessionParameters) (bgp.Session, error) {
	sessionsParams := args
	// native mode does not support empty holdtime,
	// we explicitly set it to 90s in this case.
	if args.HoldTime == nil {
		ht := 90 * time.Second
		sessionsParams.HoldTime = &ht
	}
	ret := &session{
		SessionParameters: sessionsParams,
		logger:            log.With(l, "peer", args.PeerAddress, "localASN", args.MyASN, "peerASN", args.PeerASN),
		newHoldTime:       make(chan bool, 1),
		advertised:        map[string]*bgp.Advertisement{},
		peerName:          fmt.Sprintf("%s:%d", args.PeerAddress, args.PeerPort),
	}
	ret.cond = sync.NewCond(&ret.mu)
	// Initialize the per-peer gauges before the goroutines start, so run() cannot mark the
	// session up only to have it reset to 0 here. Use the peerName label like every other
	// stats call of this session, including the stats.DeleteSession(s.peerName) deferred in
	// run. Labelling these with PeerAddress created series that no other call updated or
	// deleted. NOTE(review): confirm DeleteSession removes these gauges.
	stats.sessionUp.WithLabelValues(ret.peerName).Set(0)
	stats.prefixes.WithLabelValues(ret.peerName).Set(0)
	go ret.sendKeepalives()
	go ret.run()
	return ret, nil
}
// SyncBFDProfiles accepts only an empty set of profiles: native mode has no BFD support.
func (sm *sessionManager) SyncBFDProfiles(profiles map[string]*config.BFDProfile) error {
	if len(profiles) == 0 {
		return nil
	}
	return errors.New("bfd profiles not supported in native mode")
}
// SyncExtraInfo accepts only an empty string: native mode cannot apply extra BGP
// configuration.
func (sm *sessionManager) SyncExtraInfo(extras string) error {
	if extras == "" {
		return nil
	}
	return errors.New("bgp extra info not supported in native mode")
}
// SetEventCallback is a no-op in native mode: the callback is discarded and never invoked.
func (sm *sessionManager) SetEventCallback(func(interface{})) {}
// run keeps the session connected for as long as it is open. It (re)connects, waiting
// out a growing backoff after each failed attempt, then hands over to sendUpdates until
// the connection drops. It returns once the session is closed, removing the session's
// metrics on the way out.
func (s *session) run() {
	defer stats.DeleteSession(s.peerName)
	for {
		err := s.connect()
		switch {
		case errors.Is(err, errClosed):
			return
		case err != nil:
			level.Error(s.logger).Log("op", "connect", "error", err, "msg", "failed to connect to peer")
			time.Sleep(s.backoff.Duration())
			continue
		}

		stats.SessionUp(s.peerName)
		s.backoff.Reset()
		level.Info(s.logger).Log("event", "sessionUp", "msg", "BGP session established")

		if keepGoing := s.sendUpdates(); !keepGoing {
			return
		}
		stats.SessionDown(s.peerName)
		level.Warn(s.logger).Log("event", "sessionDown", "msg", "BGP session down")
	}
}
// sendUpdates pushes the desired advertisements to the connected peer, and keeps doing so
// as they change. It runs with s.mu held, which s.cond.Wait releases while sleeping.
//
// On entry it adopts any pending s.new as the desired state and announces every entry of
// s.advertised in full, since it is called right after a (re)connection. It then loops.
// Each time Set provides a new desired set, it:
//   - sends UPDATEs for prefixes that are new or whose advertisement changed;
//   - sends one withdraw covering prefixes that are no longer desired;
//   - makes the new set current.
//
// Returns false once the session is closed, telling run to exit. Returns true when the
// connection is gone or a send failed (after abort), telling run to reconnect.
func (s *session) sendUpdates() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	if s.conn == nil {
		return true
	}
	ibgp := s.MyASN == s.PeerASN
	fbasn := s.peerFBASNSupport
	// Any pending state becomes the baseline to announce on this connection.
	if s.new != nil {
		s.advertised, s.new = s.new, nil
	}
	for c, adv := range s.advertised {
		if err := sendUpdate(s.conn, s.MyASN, ibgp, fbasn, s.nextHop, adv); err != nil {
			s.abort()
			level.Error(s.logger).Log("op", "sendUpdate", "ip", c, "error", err, "msg", "failed to send BGP update")
			return true
		}
		stats.UpdateSent(s.peerName)
	}
	stats.AdvertisedPrefixes(s.peerName, len(s.advertised))
	for {
		// Sleep until Set provides a new desired state or the connection goes away
		// (abort clears s.conn and broadcasts).
		for s.new == nil && s.conn != nil {
			s.cond.Wait()
		}
		if s.closed {
			return false
		}
		if s.conn == nil {
			return true
		}
		if s.new == nil {
			// nil is "no pending updates", contrast to a non-nil
			// empty map which means "withdraw all".
			continue
		}
		// Announce prefixes that are new or whose advertisement changed.
		for c, adv := range s.new {
			if adv2, ok := s.advertised[c]; ok && adv.Equal(adv2) {
				// Peer already has correct state for this
				// advertisement, nothing to do.
				continue
			}
			if err := sendUpdate(s.conn, s.MyASN, ibgp, fbasn, s.nextHop, adv); err != nil {
				s.abort()
				level.Error(s.logger).Log("op", "sendUpdate", "prefix", c, "error", err, "msg", "failed to send BGP update")
				return true
			}
			stats.UpdateSent(s.peerName)
		}
		// Withdraw, in a single message, every advertised prefix that is no longer desired.
		wdr := []*net.IPNet{}
		for c, adv := range s.advertised {
			if s.new[c] == nil {
				wdr = append(wdr, adv.Prefix)
			}
		}
		if len(wdr) > 0 {
			if err := sendWithdraw(s.conn, wdr); err != nil {
				s.abort()
				for _, pfx := range wdr {
					level.Error(s.logger).Log("op", "sendWithdraw", "prefix", pfx, "error", err, "msg", "failed to send BGP withdraw")
				}
				return true
			}
			stats.UpdateSent(s.peerName)
		}
		// The peer is now in sync with the desired state.
		s.advertised, s.new = s.new, nil
		stats.AdvertisedPrefixes(s.peerName, len(s.advertised))
	}
}
// connect dials the peer and performs the BGP OPEN exchange, leaving the session
// established on success.
//
// It holds s.mu for the whole handshake. The TCP connection is made by dialMD5, which sets
// the TCP_MD5SIG socket option when s.Password is non-empty. The dial and the OPEN exchange
// share a 10 second deadline. On success:
//   - s.conn, s.nextHop, s.peerFBASNSupport and s.actualHoldTime are set;
//   - a goroutine drains incoming messages;
//   - sendKeepalives is told about the negotiated hold time.
//
// It returns errClosed if the session has been closed. On any other failure the
// connection, if one was opened, is closed and an error is returned.
func (s *session) connect() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errClosed
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	deadline, _ := ctx.Deadline()
	conn, err := dialMD5(ctx, fmt.Sprintf("%s:%d", s.PeerAddress, s.PeerPort), s.SourceAddress, s.Password)
	if err != nil {
		return fmt.Errorf("dial %q: %w", s.PeerAddress, err)
	}
	if err = conn.SetDeadline(deadline); err != nil {
		conn.Close()
		return fmt.Errorf("setting deadline on conn to %q: %w", s.PeerAddress, err)
	}
	addr, ok := conn.LocalAddr().(*net.TCPAddr)
	if !ok {
		conn.Close()
		// There is no error value here (err is nil); report the unexpected address type.
		return fmt.Errorf("getting local addr for default nexthop to %q: unexpected address type %T", s.PeerAddress, conn.LocalAddr())
	}
	s.nextHop = addr.IP
	routerID := s.RouterID
	if routerID == nil {
		routerID, err = getRouterID(s.nextHop, s.CurrentNode)
		if err != nil {
			conn.Close() // don't leak the connection on this path
			return err
		}
	}
	if err = sendOpen(conn, s.MyASN, routerID, *s.HoldTime); err != nil {
		conn.Close()
		return fmt.Errorf("send OPEN to %q: %w", s.PeerAddress, err)
	}
	op, err := readOpen(conn)
	if err != nil {
		conn.Close()
		return fmt.Errorf("read OPEN from %q: %w", s.PeerAddress, err)
	}
	if op.asn != s.PeerASN {
		conn.Close()
		return fmt.Errorf("unexpected peer ASN %d, want %d", op.asn, s.PeerASN)
	}
	s.peerFBASNSupport = op.fbasn
	// Any ASN above 65535 needs 4 bytes. Such an ASN cannot be encoded in AS_PATH for a
	// peer lacking the 4-byte ASN capability, so refuse the session. The old "> 65536"
	// test let ASN 65536 through.
	if s.MyASN > 65535 && !s.peerFBASNSupport {
		conn.Close()
		return fmt.Errorf("peer does not support 4-byte ASNs")
	}
	// BGP session is established, clear the connect timeout deadline.
	if err := conn.SetDeadline(time.Time{}); err != nil {
		conn.Close()
		return fmt.Errorf("clearing deadline on conn to %q: %w", s.PeerAddress, err)
	}
	// Consume BGP messages until the connection closes.
	go s.consumeBGP(conn)
	// Send one keepalive to say that yes, we accept the OPEN.
	if err := sendKeepalive(conn); err != nil {
		conn.Close()
		return fmt.Errorf("accepting peer OPEN from %q: %w", s.PeerAddress, err)
	}
	// Set up regular keepalives from now on, using the smaller of the two hold times.
	s.actualHoldTime = *s.HoldTime
	if op.holdTime < s.actualHoldTime {
		s.actualHoldTime = op.holdTime
	}
	// Non-blocking: a notification already queued is enough to make sendKeepalives re-read it.
	select {
	case s.newHoldTime <- true:
	default:
	}
	s.conn = conn
	return nil
}
// hashRouterID derives a stable, IPv4-shaped router ID from hostname: the IEEE CRC-32 of
// the name, laid out little-endian in 4 bytes. The returned error is always nil.
func hashRouterID(hostname string) (net.IP, error) {
	id := make(net.IP, 4)
	binary.LittleEndian.PutUint32(id, crc32.ChecksumIEEE([]byte(hostname)))
	return id, nil
}
// getRouterID picks a BGP router ID for a session whose local address is addr:
//   - an IPv4 addr is used as is;
//   - otherwise the interface carrying addr is located and its first IPv4 address is used;
//   - when interfaces cannot be listed, addr is on none of them, or its interface has no
//     IPv4 address, the ID is derived from myNode by hashRouterID.
func getRouterID(addr net.IP, myNode string) (net.IP, error) {
	if addr.To4() != nil {
		return addr, nil
	}
	ifaces, err := net.Interfaces()
	if err != nil {
		return hashRouterID(myNode)
	}
	// ipOf extracts the IP of an interface address; unknown address types yield nil.
	ipOf := func(a net.Addr) net.IP {
		switch v := a.(type) {
		case *net.IPNet:
			return v.IP
		case *net.IPAddr:
			return v.IP
		}
		return nil
	}
	for _, iface := range ifaces {
		addrs, err := iface.Addrs()
		if err != nil {
			continue
		}
		for _, a := range addrs {
			if !ipOf(a).Equal(addr) {
				continue
			}
			// addr lives on this interface: use its first IPv4 address, if any.
			for _, other := range addrs {
				if candidate := ipOf(other); candidate.To4() != nil {
					return candidate, nil
				}
			}
			return hashRouterID(myNode)
		}
	}
	return hashRouterID(myNode)
}
// sendKeepalives sends a BGP KEEPALIVE every third of the negotiated hold time while the
// session is connected. connect signals a new hold time through s.newHoldTime, and a zero
// hold time disables keepalives. The loop returns once a keepalive attempt finds the
// session closed; the ticker is stopped on the way out.
//
// NOTE(review): if the session is closed while no ticker is running (never connected, or a
// zero hold time), nothing wakes this loop and the goroutine is leaked. Fixing that needs a
// close signal in the session struct.
func (s *session) sendKeepalives() {
	var (
		t  *time.Ticker
		ch <-chan time.Time
	)
	defer func() {
		if t != nil {
			t.Stop()
		}
	}()
	for {
		select {
		case <-s.newHoldTime:
			s.mu.Lock()
			ht := s.actualHoldTime
			s.mu.Unlock()
			if t != nil {
				t.Stop()
				t = nil
				ch = nil
			}
			if ht != 0 {
				t = time.NewTicker(ht / 3)
				ch = t.C
			}
		case <-ch:
			if err := s.sendKeepalive(); errors.Is(err, errClosed) {
				// Session has been closed by package caller, we're
				// done here.
				return
			}
		}
	}
}
// sendKeepalive sends one BGP KEEPALIVE on the current connection, if there is one.
//
// It returns errClosed once the session has been closed, and nil when there is no
// connection to send on. If the write fails, it aborts the connection, logs the failure
// and returns an error wrapping the write error.
func (s *session) sendKeepalive() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errClosed
	}
	if s.conn == nil {
		// No connection established, nothing to do.
		return nil
	}
	if err := sendKeepalive(s.conn); err != nil {
		s.abort()
		level.Error(s.logger).Log("op", "sendKeepalive", "error", err, "msg", "failed to send keepalive")
		return fmt.Errorf("sending keepalive to %q: %w", s.PeerAddress, err)
	}
	return nil
}
// consumeBGP receives BGP messages from the peer on conn and discards them. It does minimal
// well-formedness checks on each message: header marker, message length, NOTIFICATION. It
// stops at the first read error or malformed message. On exit it aborts the session if conn
// is still the session's current connection; otherwise it just closes conn.
func (s *session) consumeBGP(conn io.ReadCloser) {
	defer func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		if s.conn == conn {
			s.abort()
		} else {
			conn.Close()
		}
	}()
	for {
		hdr := struct {
			Marker1, Marker2 uint64
			Len              uint16
			Type             uint8
		}{}
		if err := binary.Read(conn, binary.BigEndian, &hdr); err != nil {
			// TODO: log, or propagate the error somehow.
			return
		}
		if hdr.Marker1 != 0xffffffffffffffff || hdr.Marker2 != 0xffffffffffffffff {
			// TODO: propagate
			return
		}
		// RFC 4271 section 4.1: Length must be at least 19 (the header) and at most 4096.
		// This speaker never advertises the extended message capability. Out-of-range values
		// are a protocol violation, so drop the connection like for a bad marker instead of
		// computing a negative or oversized body length below.
		if hdr.Len < 19 || hdr.Len > 4096 {
			level.Error(s.logger).Log("op", "consumeBGP", "error", fmt.Errorf("invalid BGP message length %d", hdr.Len), "msg", "malformed message from peer, closing session")
			return
		}
		if hdr.Type == 3 {
			// TODO: propagate better than just logging directly.
			err := readNotification(conn)
			level.Error(s.logger).Log("event", "peerNotification", "error", err, "msg", "peer sent notification, closing session")
			return
		}
		if _, err := io.Copy(io.Discard, io.LimitReader(conn, int64(hdr.Len)-19)); err != nil {
			// TODO: propagate
			return
		}
	}
}
// validate checks that adv can be announced by the native BGP speaker:
//   - the prefix must be IPv4;
//   - there may be at most 63 communities, so that the COMMUNITIES attribute (4 bytes each)
//     fits the one-byte length encodePathAttrs writes;
//   - every community must be a legacy community, the only kind encodePathAttrs encodes.
//
// Rejecting unsupported communities here makes Set fail up front. Otherwise every UPDATE
// for adv would fail, and each failure aborts and re-establishes the session, over and over.
func validate(adv *bgp.Advertisement) error {
	if adv.Prefix.IP.To4() == nil {
		return fmt.Errorf("cannot advertise non-v4 prefix %q", adv.Prefix)
	}
	if len(adv.Communities) > 63 {
		return fmt.Errorf("max supported communities is 63, got %d", len(adv.Communities))
	}
	for _, c := range adv.Communities {
		// In the community package only the legacy type exposes ToUint32, which is the
		// encoding encodePathAttrs relies on. A structural check avoids importing it here.
		if _, ok := c.(interface{ ToUint32() uint32 }); !ok {
			return fmt.Errorf("invalid community type for BGP native mode, community %s is not a legacy BGP "+
				"Community", c)
		}
	}
	return nil
}
// Set replaces the whole set of advertisements this session's peer should receive.
//
// Each advertisement is validated first. On the first invalid one its error is returned and
// the pending state is left untouched. Entries are keyed by prefix, so a later entry for
// the same prefix wins. The new set is handed to sendUpdates, which pushes the difference
// to the peer asynchronously; Set may return before the peer learns about the changes.
// An empty advs withdraws everything.
func (s *session) Set(advs ...*bgp.Advertisement) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	byPrefix := make(map[string]*bgp.Advertisement, len(advs))
	for _, adv := range advs {
		if err := validate(adv); err != nil {
			return err
		}
		byPrefix[adv.Prefix.String()] = adv
	}
	s.new = byPrefix
	stats.PendingPrefixes(s.peerName, len(s.new))
	s.cond.Broadcast()
	return nil
}
// abort tears down the current connection, if any, and updates the session-down metric.
// It also promotes pending desired state, so the next connection starts from it, and
// wakes everything waiting on s.cond. Callers in this file hold s.mu.
func (s *session) abort() {
	if conn := s.conn; conn != nil {
		conn.Close()
		s.conn = nil
		stats.SessionDown(s.peerName)
	}
	// Next time we retry the connection, we can just skip straight to the desired end
	// state: sendUpdates re-announces s.advertised in full after reconnecting.
	if pending := s.new; pending != nil {
		s.advertised = pending
		s.new = nil
		stats.PendingPrefixes(s.peerName, len(s.advertised))
	}
	s.cond.Broadcast()
}
// Close shuts the BGP session down for good. It marks the session closed, so run and
// sendUpdates stop, and tears down any live connection. It always returns nil.
func (s *session) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	s.abort() // also broadcasts, waking sendUpdates so it can observe s.closed
	return nil
}
// dialMD5 opens a TCP connection to addr ("host:port"). It binds to srcAddr when one is
// given, and protects the connection with a TCP MD5 signature (RFC 2385) when password is
// not empty.
//
// net.Dialer has no hook to set TCP_MD5SIG before connect(2). The socket is therefore
// created, configured, bound and connected through raw file descriptors, and only then
// wrapped into a net.Conn with net.FileConn. The socket is non-blocking, so an in-progress
// connect is awaited with epoll until it completes, fails, or the deadline of ctx passes.
// Cancellation of a ctx that has no deadline is not observed.
func dialMD5(ctx context.Context, addr string, srcAddr net.IP, password string) (net.Conn, error) {
	// If srcAddr exists on any of the local network interfaces, use it as the
	// source address of the TCP socket. Otherwise, use the IPv6 unspecified
	// address ("::") to let the kernel figure out the source address.
	// NOTE: On Linux, "::" also includes "0.0.0.0" (all IPv4 addresses).
	a := "[::]"
	if srcAddr != nil {
		ifs, err := net.Interfaces()
		if err != nil {
			return nil, fmt.Errorf("querying local interfaces: %w", err)
		}
		if !localAddressExists(ifs, srcAddr) {
			return nil, fmt.Errorf("address %q doesn't exist on this host", srcAddr)
		}
		a = fmt.Sprintf("[%s]", srcAddr.String())
	}
	laddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", a))
	if err != nil {
		return nil, fmt.Errorf("error resolving local address: %s ", err)
	}
	raddr, err := net.ResolveTCPAddr("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("invalid remote address: %s ", err)
	}
	var family int
	var ra, la unix.Sockaddr
	if raddr.IP.To4() != nil {
		family = unix.AF_INET
		rsockaddr := &unix.SockaddrInet4{Port: raddr.Port}
		copy(rsockaddr.Addr[:], raddr.IP.To4())
		ra = rsockaddr
		// A non-IPv4 local address (e.g. the default "::") has a nil To4, which leaves
		// the bind address as 0.0.0.0.
		lsockaddr := &unix.SockaddrInet4{}
		copy(lsockaddr.Addr[:], laddr.IP.To4())
		la = lsockaddr
	} else {
		family = unix.AF_INET6
		rsockaddr := &unix.SockaddrInet6{Port: raddr.Port}
		copy(rsockaddr.Addr[:], raddr.IP.To16())
		ra = rsockaddr
		var zone uint32
		if laddr.Zone != "" {
			intf, errs := net.InterfaceByName(laddr.Zone)
			if errs != nil {
				return nil, errs
			}
			zone, err = safeconvert.IntToUInt32(intf.Index)
			if err != nil {
				return nil, fmt.Errorf("invalid interface index %d", intf.Index)
			}
		}
		lsockaddr := &unix.SockaddrInet6{ZoneId: zone}
		copy(lsockaddr.Addr[:], laddr.IP.To16())
		la = lsockaddr
	}
	sockType := unix.SOCK_STREAM | unix.SOCK_CLOEXEC | unix.SOCK_NONBLOCK
	proto := 0
	fd, err := unix.Socket(family, sockType, proto)
	if err != nil {
		return nil, err
	}
	// Wrap the raw fd so that it is closed when this function returns, on success or
	// failure. On success net.FileConn duplicates the descriptor, so closing fi does not
	// affect the returned net.Conn; the caller must Close that one. os.NewFile takes no
	// extra reference of its own.
	fi := os.NewFile(uintptr(fd), "")
	defer func() {
		// A close error is not reported: the outcome of the dial is already decided here.
		_ = fi.Close()
	}()
	if password != "" {
		sig, err := buildTCPMD5Sig(raddr.IP, password)
		if err != nil {
			return nil, err
		}
		// Better way may be available in Go 1.11, see go-review.googlesource.com/c/go/+/72810
		if err = os.NewSyscallError("setsockopt", unix.SetsockoptTCPMD5Sig(fd, unix.IPPROTO_TCP, unix.TCP_MD5SIG, sig)); err != nil {
			return nil, err
		}
	}
	if err = unix.Bind(fd, la); err != nil {
		return nil, os.NewSyscallError("bind", err)
	}
	err = unix.Connect(fd, ra)
	switch err {
	case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
	case nil:
		return net.FileConn(fi)
	default:
		return nil, os.NewSyscallError("connect", err)
	}
	// With a non-blocking socket, the connection process is
	// asynchronous, so we need to manually wait with epoll until the
	// connection succeeds. All of the following is doing that, with
	// appropriate use of the deadline in the context.
	epfd, err := unix.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		return nil, err
	}
	defer unix.Close(epfd)
	var event unix.EpollEvent
	events := make([]unix.EpollEvent, 1)
	event.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLPRI
	event.Fd, err = safeconvert.IntToInt32(fd)
	if err != nil {
		return nil, fmt.Errorf("invalid fd %w", err)
	}
	if err = unix.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
		return nil, err
	}
	for {
		timeout := int(-1)
		if deadline, ok := ctx.Deadline(); ok {
			timeout = int(time.Until(deadline).Nanoseconds() / 1000000)
			if timeout <= 0 {
				return nil, fmt.Errorf("timeout")
			}
		}
		nevents, err := unix.EpollWait(epfd, events, timeout)
		if err != nil {
			if errors.Is(err, unix.EINTR) {
				// epoll_wait is never restarted after a signal handler (see signal(7)), and
				// the Go runtime signals threads for preemption. Recompute the remaining time
				// and wait again instead of failing the dial.
				continue
			}
			return nil, err
		}
		if nevents == 0 {
			return nil, fmt.Errorf("timeout")
		}
		fdToCheck, err := safeconvert.IntToInt32(fd)
		if err != nil {
			return nil, fmt.Errorf("invalid fd %w", err)
		}
		if nevents > 1 || events[0].Fd != fdToCheck {
			return nil, fmt.Errorf("unexpected epoll behavior")
		}
		// SO_ERROR holds the result of the asynchronous connect.
		nerr, err := unix.GetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_ERROR)
		if err != nil {
			return nil, os.NewSyscallError("getsockopt", err)
		}
		switch err := syscall.Errno(nerr); err {
		case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
		case syscall.Errno(0), unix.EISCONN:
			return net.FileConn(fi)
		default:
			// This is the failure of the connect itself, not of getsockopt.
			return nil, os.NewSyscallError("connect", err)
		}
	}
}
// buildTCPMD5Sig assembles the TCP_MD5SIG socket option payload that binds
// key to the peer address addr.
//
// The address is written into t.Addr.Data, i.e. the sockaddr bytes that
// follow the family field: for AF_INET the 4-byte address follows the 2-byte
// port (offset 2); for AF_INET6 the 16-byte address follows the port and the
// 4-byte flow info (offset 6).
//
// An error is returned when addr is not a valid IPv4/IPv6 address, when key
// does not fit in the fixed-size key buffer, or when its length cannot be
// represented in Keylen.
func buildTCPMD5Sig(addr net.IP, key string) (*unix.TCPMD5Sig, error) {
	t := unix.TCPMD5Sig{}
	if v4 := addr.To4(); v4 != nil {
		t.Addr.Family = unix.AF_INET
		copy(t.Addr.Data[2:], v4)
	} else if v6 := addr.To16(); v6 != nil {
		t.Addr.Family = unix.AF_INET6
		copy(t.Addr.Data[6:], v6)
	} else {
		// Previously an invalid address was silently encoded as AF_INET6 "::".
		return nil, fmt.Errorf("invalid peer address %q", addr)
	}
	// copy would silently truncate an oversized key while Keylen still
	// reported the full length; reject it explicitly instead.
	if len(key) > len(t.Key) {
		return nil, fmt.Errorf("invalid keyLen: key is %d bytes, maximum is %d", len(key), len(t.Key))
	}
	var err error
	t.Keylen, err = safeconvert.IntToUInt16(len(key))
	if err != nil {
		return nil, fmt.Errorf("invalid keyLen %w", err)
	}
	copy(t.Key[:], key)
	return &t, nil
}
// localAddressExists returns true if the address addr exists on any of the
// network interfaces in the ifs slice.
func localAddressExists(ifs []net.Interface, addr net.IP) bool {
for _, i := range ifs {
addresses, err := i.Addrs()
if err != nil {
continue
}
for _, a := range addresses {
ip, ok := a.(*net.IPNet)
if !ok {
continue
}
if ip.IP.Equal(addr) {
return true
}
}
}
return false
}
// SPDX-License-Identifier:Apache-2.0
package native
import (
"github.com/prometheus/client_golang/prometheus"
bgpmetrics "go.universe.tf/metallb/internal/bgp/metrics"
)
// labels are the label names attached to every BGP session metric; the
// single "peer" label carries the addr value passed to the metrics methods.
var labels = []string{"peer"}
// stats holds the BGP session collectors of this package. Names and help
// strings come from the shared bgpmetrics definitions, except for
// pendingPrefixes, which is declared here.
var stats = metrics{
sessionUp: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: bgpmetrics.Namespace,
Subsystem: bgpmetrics.Subsystem,
Name: bgpmetrics.SessionUp.Name,
Help: bgpmetrics.SessionUp.Help,
}, labels),
updatesSent: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: bgpmetrics.Namespace,
Subsystem: bgpmetrics.Subsystem,
Name: bgpmetrics.UpdatesSent.Name,
Help: bgpmetrics.UpdatesSent.Help,
}, labels),
prefixes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: bgpmetrics.Namespace,
Subsystem: bgpmetrics.Subsystem,
Name: bgpmetrics.Prefixes.Name,
Help: bgpmetrics.Prefixes.Help,
}, labels),
pendingPrefixes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: bgpmetrics.Namespace,
Subsystem: bgpmetrics.Subsystem,
Name: "pending_prefixes_total",
Help: "Number of prefixes that should be advertised on the BGP session",
}, labels),
}
// metrics groups the Prometheus collectors that track per-peer BGP session
// state; every series is labelled with the peer address.
type metrics struct {
// sessionUp is set to 1 by SessionUp and to 0 by SessionDown.
sessionUp *prometheus.GaugeVec
// updatesSent is incremented once per UpdateSent call.
updatesSent *prometheus.CounterVec
// prefixes is the number of prefixes advertised to the peer.
prefixes *prometheus.GaugeVec
// pendingPrefixes is the number of prefixes that should be advertised to the peer.
pendingPrefixes *prometheus.GaugeVec
}
// init registers the BGP session collectors with the default Prometheus
// registry. MustRegister panics if any of them cannot be registered.
func init() {
	prometheus.MustRegister(
		stats.sessionUp,
		stats.updatesSent,
		stats.prefixes,
		stats.pendingPrefixes,
	)
}
// DeleteSession removes the sessionUp, prefixes, pendingPrefixes and
// updatesSent series labelled with addr.
func (m *metrics) DeleteSession(addr string) {
	for _, vec := range []interface{ DeleteLabelValues(...string) bool }{
		m.sessionUp,
		m.prefixes,
		m.pendingPrefixes,
		m.updatesSent,
	} {
		vec.DeleteLabelValues(addr)
	}
}
// SessionUp marks the session with the peer at addr as up and resets its
// advertised prefix count to zero.
func (m *metrics) SessionUp(addr string) {
	m.setSessionState(addr, 1)
}

// SessionDown marks the session with the peer at addr as down and resets
// its advertised prefix count to zero.
func (m *metrics) SessionDown(addr string) {
	m.setSessionState(addr, 0)
}

// setSessionState stores state (1 = up, 0 = down) in the sessionUp gauge and
// zeroes the prefixes gauge for addr.
func (m *metrics) setSessionState(addr string, state float64) {
	m.sessionUp.WithLabelValues(addr).Set(state)
	m.prefixes.WithLabelValues(addr).Set(0)
}
// UpdateSent increments the count of updates sent to the peer at addr.
func (m *metrics) UpdateSent(addr string) {
	counter := m.updatesSent.WithLabelValues(addr)
	counter.Inc()
}
// PendingPrefixes records n as the number of prefixes that should be
// advertised to the peer at addr.
func (m *metrics) PendingPrefixes(addr string, n int) {
	gauge := m.pendingPrefixes.WithLabelValues(addr)
	gauge.Set(float64(n))
}
// AdvertisedPrefixes records n as the number of prefixes advertised to the
// peer at addr; the pending-prefixes gauge is set to the same value.
func (m *metrics) AdvertisedPrefixes(addr string, n int) {
	count := float64(n)
	for _, gauge := range []*prometheus.GaugeVec{m.prefixes, m.pendingPrefixes} {
		gauge.WithLabelValues(addr).Set(count)
	}
}
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config // import "go.universe.tf/metallb/internal/config"
import (
"bytes"
"fmt"
"net"
"reflect"
"slices"
"sort"
"strings"
"time"
"errors"
"github.com/mikioh/ipaddr"
metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
metallbv1beta2 "go.universe.tf/metallb/api/v1beta2"
"go.universe.tf/metallb/internal/bgp/community"
"go.universe.tf/metallb/internal/ipfamily"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"
)
// ClusterResources bundles the Kubernetes objects that For turns into a
// Config. The json tags name each collection when the struct is serialized.
type ClusterResources struct {
Pools []metallbv1beta1.IPAddressPool `json:"ipaddresspools"`
Peers []metallbv1beta2.BGPPeer `json:"bgppeers"`
BFDProfiles []metallbv1beta1.BFDProfile `json:"bfdprofiles"`
BGPAdvs []metallbv1beta1.BGPAdvertisement `json:"bgpadvertisements"`
L2Advs []metallbv1beta1.L2Advertisement `json:"l2advertisements"`
Communities []metallbv1beta1.Community `json:"communities"`
// PasswordSecrets holds the secrets referenced by BGPPeer password secret
// refs, keyed by secret name.
PasswordSecrets map[string]corev1.Secret `json:"passwordsecrets"`
// Nodes are matched against the node selectors of advertisements.
Nodes []corev1.Node `json:"nodes"`
// Namespaces are matched against the namespace selectors of address pools.
Namespaces []corev1.Namespace `json:"namespaces"`
// BGPExtras is a ConfigMap whose "extras" key holds extra BGP configuration.
BGPExtras corev1.ConfigMap `json:"bgpextras"`
}
// Config is a parsed MetalLB configuration, as built by For.
type Config struct {
// Routers that MetalLB should peer with, keyed by peer name.
Peers map[string]*Peer
// Address pools from which to allocate load balancer IPs.
Pools *Pools
// BFD profiles that can be used by peers, keyed by profile name.
BFDProfiles map[string]*BFDProfile
// Protocol dependent extra config. Currently used only by FRR.
BGPExtras string
}
// Pools contains the address pools and their namespace/service specific allocations.
type Pools struct {
// ByName maps each configured pool's name to the pool.
ByName map[string]*Pool
// ByNamespace maps a namespace name to the names of the pools pinned to
// it; it is nil when no pool is pinned to any namespace.
ByNamespace map[string][]string
// ByServiceSelector lists, sorted, the names of pools that have service
// selectors. A pool with an allocateTo section but no namespace or selector
// restriction gets a match-everything selector and is therefore included.
ByServiceSelector []string
}
// Proto holds the protocol we are speaking.
type Proto string
// MetalLB supported protocols.
const (
BGP Proto = "bgp"
Layer2 Proto = "layer2"
)
// bgpExtrasField is the key of the BGPExtras ConfigMap that holds the extra
// BGP configuration.
const bgpExtrasField = "extras"
// Protocols lists every supported Proto.
var Protocols = []Proto{
BGP, Layer2,
}
// Peer is the configuration of a BGP peering session.
type Peer struct {
// Peer name.
Name string
// AS number to use for the local end of the session.
MyASN uint32
// AS number to expect from the remote end of the session. Zero when
// DynamicASN is used instead.
ASN uint32
// Detect the AS number to use for the remote end of the session. When set
// it is one of the metallbv1beta2 internal/external ASN modes.
DynamicASN string
// Address to dial when establishing the session.
Addr net.IP
// Iface is the Interface to use for Unnumbered BGP peering.
// Addr field must be nil.
Iface string
// Source address to use when establishing the session.
SrcAddr net.IP
// Port to dial when establishing the session.
Port uint16
// Requested BGP hold time, per RFC4271.
HoldTime *time.Duration
// Requested BGP keepalive time, per RFC4271.
KeepaliveTime *time.Duration
// Requested BGP connect time, controls how long BGP waits between connection attempts to a neighbor.
ConnectTime *time.Duration
// BGP router ID to advertise to the peer
RouterID net.IP
// Only connect to this peer on nodes that match one of these
// selectors.
NodeSelectors []labels.Selector
// Authentication password for routers enforcing TCP MD5 authenticated sessions
Password string
// The password set in the secret referenced by the secret reference, in clear text
SecretPassword string
// Optional reference to the secret that holds the password.
PasswordRef corev1.SecretReference
// The optional BFD profile to be used for this BGP session
BFDProfile string
// Optional EnableGracefulRestart enables BGP graceful restart functionality at the peer level.
EnableGracefulRestart bool
// Optional EBGPMultiHop marks an eBGP peer that is multiple hops away.
EBGPMultiHop bool
// Optional name of the vrf to establish the session from
VRF string
// Option to advertise v4 addresses over v6 sessions and vice versa.
DualStackAddressFamily bool
// Deprecated: DisableMP is deprecated in favor of dualStackAddressFamily.
DisableMP bool
}
// Pool is the configuration of an IP address pool.
type Pool struct {
// Pool Name
Name string
// The addresses that are part of this pool, expressed as CIDR
// prefixes. config.Parse guarantees that these are
// non-overlapping, both within and between pools.
CIDR []*net.IPNet
// Some buggy consumer devices mistakenly drop IPv4 traffic for IP
// addresses ending in .0 or .255, due to poor implementations of
// smurf protection. This setting marks such addresses as
// unusable, for maximum compatibility with ancient parts of the
// internet.
AvoidBuggyIPs bool
// If false, prevents IP addresses to be automatically assigned
// from this pool. Defaults to true when not specified.
AutoAssign bool
// The list of BGPAdvertisements associated with this address pool.
BGPAdvertisements []*BGPAdvertisement
// The list of L2Advertisements associated with this address pool.
L2Advertisements []*L2Advertisement
// cidrsPerAddresses maps each address string from the pool spec (a CIDR
// or an IP range) to the CIDRs it expands into; a range may expand into
// several CIDRs.
cidrsPerAddresses map[string][]*net.IPNet
// ServiceAllocations restricts which namespaces/services may use the
// pool; nil when the pool spec has no allocateTo section.
ServiceAllocations *ServiceAllocation
}
// ServiceAllocation restricts an ip pool's allocations to specific namespaces and/or services.
type ServiceAllocation struct {
// The priority of ip pool for a given service allocation.
Priority int
// Set of namespaces on which ip pool can be attached: the explicitly
// listed namespaces plus the existing namespaces matched by the pool's
// namespace selectors.
Namespaces sets.Set[string]
// Service selectors to select service for which ip pool can be used
// for ip allocation.
ServiceSelectors []labels.Selector
}
// BGPAdvertisement describes one translation from an IP address to a BGP advertisement.
type BGPAdvertisement struct {
// The name of the advertisement
Name string
// Roll up the IP address into a CIDR prefix of this
// length. Optional, defaults to 32 (i.e. no aggregation) if not
// specified.
AggregationLength int
// Roll up IPv6 addresses into a CIDR prefix of this length.
// Optional, defaults to 128 (i.e. no aggregation) if not
// specified.
AggregationLengthV6 int
// Value of the LOCAL_PREF BGP path attribute. Used only when
// advertising to IBGP peers (i.e. Peer.MyASN == Peer.ASN).
LocalPref uint32
// Values of the COMMUNITIES path attribute, used as a set.
Communities map[community.BGPCommunity]bool
// The map of nodes allowed for this advertisement
Nodes map[string]bool
// Used to declare the intent of announcing IPs
// only to the BGPPeers in this list; nil when no peers are listed.
Peers []string
}
// L2Advertisement describes where the IPs of a pool are announced in layer2 mode.
type L2Advertisement struct {
// The map of nodes allowed for this advertisement
Nodes map[string]bool
// The interfaces in Nodes allowed for this advertisement
Interfaces []string
// AllInterfaces tells if all the interfaces are allowed for this
// advertisement; it is true when Interfaces is empty.
AllInterfaces bool
}
// BFDProfile describes a BFD profile to be applied to a set of peers.
// Pointer fields are nil when the corresponding setting is not specified;
// the ranges below are the ones enforced by bfdProfileFromCR.
type BFDProfile struct {
Name string
// Accepted range 10-60000 (presumably milliseconds — confirm).
ReceiveInterval *uint32
// Accepted range 10-60000 (presumably milliseconds — confirm).
TransmitInterval *uint32
// Accepted range 2-255.
DetectMultiplier *uint32
// Accepted range 10-60000 (presumably milliseconds — confirm).
EchoInterval *uint32
EchoMode bool
PassiveMode bool
// Accepted range 1-254.
MinimumTTL *uint32
}
// IsEmpty reports whether no pool named pool is configured (an entry that
// is present but nil also counts as absent).
func (p *Pools) IsEmpty(pool string) bool {
	found, ok := p.ByName[pool]
	return !ok || found == nil
}
// For builds and validates a Config from the given cluster resources.
//
// validate is run on the raw resources first and any error it returns is
// returned unchanged. The BFD profiles, the peers (which may reference those
// profiles), the pools and the BGP extras are then derived from resources,
// and the assembled Config is checked by validateConfig before being
// returned.
func For(resources ClusterResources, validate Validate) (*Config, error) {
	if err := validate(resources); err != nil {
		return nil, err
	}
	bfdProfiles, err := bfdProfilesFor(resources)
	if err != nil {
		return nil, err
	}
	peers, err := peersFor(resources, bfdProfiles)
	if err != nil {
		return nil, err
	}
	pools, err := poolsFor(resources)
	if err != nil {
		return nil, err
	}
	cfg := &Config{
		Peers:       peers,
		Pools:       pools,
		BFDProfiles: bfdProfiles,
		// bgpExtrasFor cannot fail, so there is no error to check here.
		BGPExtras: bgpExtrasFor(resources),
	}
	if err := validateConfig(cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}
// bfdProfilesFor parses every BFDProfile in resources into a map keyed by
// profile name. It fails on the first profile that cannot be parsed
// (reporting its 1-based position) or whose name was already seen.
func bfdProfilesFor(resources ClusterResources) (map[string]*BFDProfile, error) {
	res := make(map[string]*BFDProfile, len(resources.BFDProfiles))
	for i, bfd := range resources.BFDProfiles {
		parsed, err := bfdProfileFromCR(bfd)
		if err != nil {
			// %w keeps the cause inspectable with errors.Is / errors.As.
			return nil, fmt.Errorf("parsing bfd profile #%d: %w", i+1, err)
		}
		if _, ok := res[parsed.Name]; ok {
			return nil, fmt.Errorf("found duplicate bfd profile name %s", parsed.Name)
		}
		// Key by the same name used for the duplicate check above.
		res[parsed.Name] = parsed
	}
	return res, nil
}
// peersFor converts every BGPPeer in resources into a Peer, keyed by peer
// name. A peer referencing a BFD profile absent from bfdProfiles yields a
// TransientError; a peer identical to an already parsed one is rejected.
func peersFor(resources ClusterResources, bfdProfiles map[string]*BFDProfile) (map[string]*Peer, error) {
	peers := make(map[string]*Peer)
	for _, cr := range resources.Peers {
		parsed, err := peerFromCR(cr, resources.PasswordSecrets)
		if err != nil {
			return nil, fmt.Errorf("parsing peer %s %w", cr.Name, err)
		}
		if profile := parsed.BFDProfile; profile != "" {
			if _, found := bfdProfiles[profile]; !found {
				return nil, TransientError{fmt.Sprintf("peer %s referencing non existing bfd profile %s", cr.Name, profile)}
			}
		}
		// TODO: Be smarter regarding conflicting peers. For example, two
		// peers could have a different hold time but they'd still result
		// in two BGP sessions between the speaker and the remote host.
		for _, existing := range peers {
			if reflect.DeepEqual(parsed, existing) {
				return nil, fmt.Errorf("peer %s already exists", cr.Name)
			}
		}
		peers[parsed.Name] = parsed
	}
	return peers, nil
}
// poolsFor builds the Pools view of resources. It parses each IPAddressPool,
// rejecting duplicate pool names and CIDRs that overlap any CIDR seen so far
// (in the same or an earlier pool), then attaches the L2 and BGP
// advertisements to the pools they select.
func poolsFor(resources ClusterResources) (*Pools, error) {
	communities, err := communitiesFromCrs(resources.Communities)
	if err != nil {
		return nil, err
	}
	byName := make(map[string]*Pool)
	var seen []*net.IPNet
	for _, cr := range resources.Pools {
		pool, err := addressPoolFromCR(cr, resources.Namespaces)
		if err != nil {
			return nil, fmt.Errorf("parsing address pool %s: %s", cr.Name, err)
		}
		if byName[cr.Name] != nil {
			return nil, fmt.Errorf("duplicate definition of pool %q", cr.Name)
		}
		for _, cidr := range pool.CIDR {
			for _, other := range seen {
				if cidrsOverlap(cidr, other) {
					return nil, fmt.Errorf("CIDR %q in pool %q overlaps with already defined CIDR %q", cidr, cr.Name, other)
				}
			}
			seen = append(seen, cidr)
		}
		byName[cr.Name] = pool
	}
	if err := setL2AdvertisementsToPools(resources.Pools, resources.L2Advs, resources.Nodes, byName); err != nil {
		return nil, err
	}
	if err := validateDuplicateBGPAdvertisements(resources.BGPAdvs); err != nil {
		return nil, err
	}
	if err := setBGPAdvertisementsToPools(resources.Pools, resources.BGPAdvs, resources.Nodes, byName, communities); err != nil {
		return nil, err
	}
	return &Pools{
		ByName:            byName,
		ByNamespace:       poolsByNamespace(byName),
		ByServiceSelector: poolsByServiceSelector(byName),
	}, nil
}
// bgpExtrasFor returns the extra BGP configuration stored under the
// bgpExtrasField key of the BGPExtras ConfigMap, or "" when it is absent.
func bgpExtrasFor(resources ClusterResources) string {
	// Indexing a nil map yields the zero value, so a ConfigMap without Data
	// needs no special case.
	return resources.BGPExtras.Data[bgpExtrasField]
}
// communitiesFromCrs merges the community aliases of all Community resources
// into one alias-name -> value map. It fails if a value cannot be parsed or
// an alias name is defined more than once (across all resources).
func communitiesFromCrs(cs []metallbv1beta1.Community) (map[string]community.BGPCommunity, error) {
	byAlias := make(map[string]community.BGPCommunity)
	for i := range cs {
		for _, alias := range cs[i].Spec.Communities {
			value, err := community.New(alias.Value)
			if err != nil {
				return nil, fmt.Errorf("parsing community %q: %s", alias.Name, err)
			}
			if _, dup := byAlias[alias.Name]; dup {
				return nil, fmt.Errorf("duplicate definition of community %q", alias.Name)
			}
			byAlias[alias.Name] = value
		}
	}
	return byAlias, nil
}
// peerFromCR validates a BGPPeer resource and converts it into a Peer.
//
// passwordSecrets maps secret names to secrets; it is only consulted when
// p.Spec.PasswordSecret.Name is set. An error is returned when:
//   - MyASN is missing, neither or both of ASN and DynamicASN are set, or
//     DynamicASN is not one of InternalASNMode / ExternalASNMode;
//   - EBGPMultiHop is set while ASN equals MyASN;
//   - neither or both of Address and Interface are set;
//   - the timers, address, router ID, source address or node selectors fail
//     to parse, or node selectors are duplicated;
//   - both an inline password and a password secret are set, or the
//     referenced secret cannot be used.
//
// A peer with no node selectors matches every node.
func peerFromCR(p metallbv1beta2.BGPPeer, passwordSecrets map[string]corev1.Secret) (*Peer, error) {
if p.Spec.MyASN == 0 {
return nil, errors.New("missing local ASN")
}
// Exactly one of ASN and DynamicASN must be provided.
if p.Spec.ASN == 0 && p.Spec.DynamicASN == "" {
return nil, errors.New("missing peer ASN and dynamicASN")
}
if p.Spec.ASN != 0 && p.Spec.DynamicASN != "" {
return nil, errors.New("both peer ASN and dynamicASN specified")
}
if p.Spec.DynamicASN != "" && p.Spec.DynamicASN != metallbv1beta2.InternalASNMode && p.Spec.DynamicASN != metallbv1beta2.ExternalASNMode {
return nil, fmt.Errorf("invalid dynamicASN %s", p.Spec.DynamicASN)
}
// Equal ASNs denote an iBGP session, for which eBGP multihop is invalid.
// NOTE(review): with DynamicASN set, ASN is 0 here, so an internal dynamic
// ASN combined with EBGPMultiHop is not rejected — confirm this is intended.
if p.Spec.ASN == p.Spec.MyASN && p.Spec.EBGPMultiHop {
return nil, errors.New("invalid ebgp-multihop parameter set for an ibgp peer")
}
// Address and Interface (unnumbered BGP) are mutually exclusive; one is required.
if p.Spec.Address == "" && p.Spec.Interface == "" {
return nil, fmt.Errorf("peer has no Address or Interface specified")
}
if p.Spec.Address != "" && p.Spec.Interface != "" {
return nil, fmt.Errorf("peer has both Address and Interface specified")
}
// parseTimers derives a missing timer from the other, or leaves both nil.
holdTime, keepaliveTime, err := parseTimers(p.Spec.HoldTime, p.Spec.KeepaliveTime)
if err != nil {
return nil, fmt.Errorf("invalid BGPPeer timers: %w", err)
}
var ip net.IP
if p.Spec.Address != "" {
ip = net.ParseIP(p.Spec.Address)
if ip == nil {
return nil, fmt.Errorf("invalid BGPPeer address %q", p.Spec.Address)
}
}
// Ideally we would set a default RouterID here, instead of having
// to do it elsewhere in the code. Unfortunately, we don't know
// the node IP here.
var routerID net.IP
if p.Spec.RouterID != "" {
routerID = net.ParseIP(p.Spec.RouterID)
if routerID == nil {
return nil, fmt.Errorf("invalid router ID %q", p.Spec.RouterID)
}
}
// net.ParseIP returns nil for "", so an unset source address leaves src nil
// and only a non-empty, unparsable value is an error.
src := net.ParseIP(p.Spec.SrcAddress)
if p.Spec.SrcAddress != "" && src == nil {
return nil, fmt.Errorf("invalid source IP %q", p.Spec.SrcAddress)
}
err = validateLabelSelectorDuplicate(p.Spec.NodeSelectors, "nodeSelectors")
if err != nil {
return nil, err
}
var nodeSels []labels.Selector
for _, s := range p.Spec.NodeSelectors {
labelSelector, err := metav1.LabelSelectorAsSelector(&s)
if err != nil {
return nil, errors.Join(err, fmt.Errorf("failed to convert peer %s node selector", p.Name))
}
nodeSels = append(nodeSels, labelSelector)
}
// No selectors means the peer applies to every node.
if len(nodeSels) == 0 {
nodeSels = []labels.Selector{labels.Everything()}
}
if p.Spec.Password != "" && p.Spec.PasswordSecret.Name != "" {
return nil, fmt.Errorf("can not have both password and secret ref set in peer config %q/%q", p.Namespace, p.Name)
}
secretPassword := ""
if p.Spec.PasswordSecret.Name != "" {
secretPassword, err = passwordFromSecretForPeer(p, passwordSecrets)
if err != nil {
return nil, errors.Join(err, fmt.Errorf("failed to parse peer %s password secret", p.Name))
}
}
// Copy the connect time so the Peer does not alias the resource's field.
var connectTime *time.Duration
if p.Spec.ConnectTime != nil {
connectTime = ptr.To(p.Spec.ConnectTime.Duration)
}
return &Peer{
Name: p.Name,
MyASN: p.Spec.MyASN,
ASN: p.Spec.ASN,
DynamicASN: string(p.Spec.DynamicASN),
Addr: ip,
Iface: p.Spec.Interface,
SrcAddr: src,
Port: p.Spec.Port,
HoldTime: holdTime,
KeepaliveTime: keepaliveTime,
ConnectTime: connectTime,
RouterID: routerID,
NodeSelectors: nodeSels,
SecretPassword: secretPassword,
Password: p.Spec.Password,
PasswordRef: p.Spec.PasswordSecret,
BFDProfile: p.Spec.BFDProfile,
EnableGracefulRestart: p.Spec.EnableGracefulRestart,
EBGPMultiHop: p.Spec.EBGPMultiHop,
VRF: p.Spec.VRFName,
DualStackAddressFamily: p.Spec.DualStackAddressFamily,
DisableMP: p.Spec.DisableMP,
}, nil
}
// passwordFromSecretForPeer returns the "password" entry of the basic-auth
// secret referenced by p.Spec.PasswordSecret. A secret missing from
// passwordSecrets yields a TransientError; a secret of the wrong type or
// without a password entry yields a regular error.
func passwordFromSecretForPeer(p metallbv1beta2.BGPPeer, passwordSecrets map[string]corev1.Secret) (string, error) {
	secret, found := passwordSecrets[p.Spec.PasswordSecret.Name]
	switch {
	case !found:
		return "", TransientError{Message: fmt.Sprintf("secret ref not found for peer config %q/%q", p.Namespace, p.Name)}
	case secret.Type != corev1.SecretTypeBasicAuth:
		return "", fmt.Errorf("secret type mismatch on %q/%q, type %q is expected ", secret.Namespace,
			secret.Name, corev1.SecretTypeBasicAuth)
	}
	password, found := secret.Data["password"]
	if !found {
		return "", fmt.Errorf("password not specified in the secret %q/%q", secret.Namespace, secret.Name)
	}
	return string(password), nil
}
// addressPoolFromCR converts an IPAddressPool resource into a Pool.
// AutoAssign defaults to true unless the spec sets it. Each spec address (a
// CIDR or an IP range) is expanded with ParseCIDR; the resulting CIDRs are
// appended to CIDR and also recorded per address string in
// cidrsPerAddresses. namespaces is used to resolve the namespace selectors of
// the pool's allocation settings.
func addressPoolFromCR(p metallbv1beta1.IPAddressPool, namespaces []corev1.Namespace) (*Pool, error) {
	if p.Name == "" {
		return nil, errors.New("missing pool name")
	}
	autoAssign := true
	if p.Spec.AutoAssign != nil {
		autoAssign = *p.Spec.AutoAssign
	}
	if len(p.Spec.Addresses) == 0 {
		return nil, errors.New("pool has no prefixes defined")
	}
	pool := &Pool{
		Name:              p.Name,
		AvoidBuggyIPs:     p.Spec.AvoidBuggyIPs,
		AutoAssign:        autoAssign,
		cidrsPerAddresses: make(map[string][]*net.IPNet, len(p.Spec.Addresses)),
	}
	for _, addr := range p.Spec.Addresses {
		cidrs, err := ParseCIDR(addr)
		if err != nil {
			return nil, fmt.Errorf("invalid CIDR %q in pool %q: %s", addr, p.Name, err)
		}
		pool.CIDR = append(pool.CIDR, cidrs...)
		pool.cidrsPerAddresses[addr] = cidrs
	}
	allocations, err := addressPoolServiceAllocationsFromCR(p, namespaces)
	if err != nil {
		return nil, err
	}
	pool.ServiceAllocations = allocations
	return pool, nil
}
// addressPoolServiceAllocationsFromCR builds the ServiceAllocation of a pool
// from its allocateTo section, returning nil when that section is absent.
//
// The allocation's namespaces are the explicitly listed ones (duplicates are
// rejected) plus every namespace in namespaces matched by a namespace
// selector. When the section restricts nothing at all, a single
// match-everything service selector is used.
func addressPoolServiceAllocationsFromCR(p metallbv1beta1.IPAddressPool, namespaces []corev1.Namespace) (*ServiceAllocation, error) {
	allocateTo := p.Spec.AllocateTo
	if allocateTo == nil {
		return nil, nil
	}
	pinned := sets.Set[string]{}
	for _, name := range allocateTo.Namespaces {
		if pinned.Has(name) {
			return nil, errors.New("duplicate definition in namespaces field")
		}
		pinned.Insert(name)
	}
	allocations := &ServiceAllocation{Priority: allocateTo.Priority, Namespaces: pinned}
	unrestricted := len(pinned) == 0 &&
		len(allocateTo.NamespaceSelectors) == 0 &&
		len(allocateTo.ServiceSelectors) == 0
	if unrestricted {
		// If no specific namespaces or service selectors are set, match everything
		allocations.ServiceSelectors = []labels.Selector{labels.Everything()}
		return allocations, nil
	}
	if err := validateLabelSelectorDuplicate(allocateTo.NamespaceSelectors, "namespaceSelectors"); err != nil {
		return nil, err
	}
	if err := validateLabelSelectorDuplicate(allocateTo.ServiceSelectors, "serviceSelectors"); err != nil {
		return nil, err
	}
	for i := range allocateTo.NamespaceSelectors {
		selector, err := metav1.LabelSelectorAsSelector(&allocateTo.NamespaceSelectors[i])
		if err != nil {
			return nil, errors.Join(err, fmt.Errorf("invalid namespace label selector %v in ip pool %s", &allocateTo.NamespaceSelectors[i], p.Name))
		}
		for _, ns := range namespaces {
			if selector.Matches(labels.Set(ns.Labels)) {
				allocations.Namespaces.Insert(ns.Name)
			}
		}
	}
	for i := range allocateTo.ServiceSelectors {
		selector, err := metav1.LabelSelectorAsSelector(&allocateTo.ServiceSelectors[i])
		if err != nil {
			return nil, errors.Join(err, fmt.Errorf("invalid service label selector %v in ip pool %s", allocateTo.ServiceSelectors[i], p.Name))
		}
		allocations.ServiceSelectors = append(allocations.ServiceSelectors, selector)
	}
	return allocations, nil
}
// poolsByNamespace inverts the namespace allocations of pools, mapping each
// namespace name to the names of the pools pinned to it. It returns nil when
// no pool is pinned to any namespace.
func poolsByNamespace(pools map[string]*Pool) map[string][]string {
	var byNamespace map[string][]string
	for _, pool := range pools {
		alloc := pool.ServiceAllocations
		if alloc == nil || len(alloc.Namespaces) == 0 {
			continue
		}
		if byNamespace == nil {
			byNamespace = make(map[string][]string)
		}
		for ns := range alloc.Namespaces {
			byNamespace[ns] = append(byNamespace[ns], pool.Name)
		}
	}
	return byNamespace
}
// poolsByServiceSelector returns, sorted, the names of the pools that carry
// at least one service selector; nil when there are none.
func poolsByServiceSelector(pools map[string]*Pool) []string {
	var names []string
	for _, pool := range pools {
		if alloc := pool.ServiceAllocations; alloc != nil && len(alloc.ServiceSelectors) > 0 {
			names = append(names, pool.Name)
		}
	}
	slices.Sort(names)
	return names
}
// bfdProfileFromCR validates a BFDProfile resource and converts it into a
// BFDProfile. Each optional numeric setting is range-checked and stays nil
// when unset; EchoMode and PassiveMode stay false unless the spec sets them.
func bfdProfileFromCR(p metallbv1beta1.BFDProfile) (*BFDProfile, error) {
	if p.Name == "" {
		return nil, fmt.Errorf("missing bfd profile name")
	}
	res := &BFDProfile{Name: p.Name}
	// Checked in this order; the first out-of-range value is reported.
	bounded := []struct {
		dst    **uint32
		val    *uint32
		lo, hi uint32
		msg    string
	}{
		{&res.DetectMultiplier, p.Spec.DetectMultiplier, 2, 255, "invalid detect multiplier value"},
		{&res.ReceiveInterval, p.Spec.ReceiveInterval, 10, 60000, "invalid receive interval value"},
		{&res.TransmitInterval, p.Spec.TransmitInterval, 10, 60000, "invalid transmit interval value"},
		{&res.MinimumTTL, p.Spec.MinimumTTL, 1, 254, "invalid minimum ttl value"},
		{&res.EchoInterval, p.Spec.EchoInterval, 10, 60000, "invalid echo interval value"},
	}
	for _, field := range bounded {
		v, err := bfdIntFromConfig(field.val, field.lo, field.hi)
		if err != nil {
			// fmt.Errorf with no verbs and no %w is exactly errors.New.
			return nil, errors.Join(err, errors.New(field.msg))
		}
		*field.dst = v
	}
	if p.Spec.EchoMode != nil {
		res.EchoMode = *p.Spec.EchoMode
	}
	if p.Spec.PassiveMode != nil {
		res.PassiveMode = *p.Spec.PassiveMode
	}
	return res, nil
}
// setL2AdvertisementsToPools parses every L2Advertisement and attaches it to
// the pools in ipPoolMap it targets: the pools named in IPAddressPools plus
// those matched by IPAddressPoolSelectors, or every pool when neither is
// set. Names without a matching pool are ignored, and an advertisement that
// containsAdvertisement reports as already present is not added again.
func setL2AdvertisementsToPools(ipPools []metallbv1beta1.IPAddressPool, l2Advs []metallbv1beta1.L2Advertisement,
	nodes []corev1.Node, ipPoolMap map[string]*Pool) error {
	for _, l2Adv := range l2Advs {
		adv, err := l2AdvertisementFromCR(l2Adv, nodes)
		if err != nil {
			return err
		}
		ipPoolsSelected, err := selectedPools(ipPools, l2Adv.Spec.IPAddressPoolSelectors)
		if err != nil {
			return err
		}
		// No pool selector means select all pools
		if len(l2Adv.Spec.IPAddressPools) == 0 && len(l2Adv.Spec.IPAddressPoolSelectors) == 0 {
			for _, pool := range ipPoolMap {
				if !containsAdvertisement(pool.L2Advertisements, adv) {
					pool.L2Advertisements = append(pool.L2Advertisements, adv)
				}
			}
			continue
		}
		// Clip before appending so the resource's IPAddressPools backing
		// array is never written to through its spare capacity.
		targets := append(slices.Clip(l2Adv.Spec.IPAddressPools), ipPoolsSelected...)
		for _, poolName := range targets {
			pool, ok := ipPoolMap[poolName]
			if !ok || containsAdvertisement(pool.L2Advertisements, adv) {
				continue
			}
			pool.L2Advertisements = append(pool.L2Advertisements, adv)
		}
	}
	return nil
}
// setBGPAdvertisementsToPools parses every BGPAdvertisement and attaches it
// to the pools in ipPoolMap it targets: the pools named in IPAddressPools
// plus those matched by IPAddressPoolSelectors, or every pool when neither
// is set. Names without a matching pool are ignored. Each attachment is
// validated against the pool with validateBGPAdvPerPool, and an
// advertisement is attached to a given pool at most once.
func setBGPAdvertisementsToPools(ipPools []metallbv1beta1.IPAddressPool, bgpAdvs []metallbv1beta1.BGPAdvertisement,
	nodes []corev1.Node, ipPoolMap map[string]*Pool, communities map[string]community.BGPCommunity) error {
	for _, bgpAdv := range bgpAdvs {
		adv, err := bgpAdvertisementFromCR(bgpAdv, communities, nodes)
		if err != nil {
			return err
		}
		ipPoolsSelected, err := selectedPools(ipPools, bgpAdv.Spec.IPAddressPoolSelectors)
		if err != nil {
			return err
		}
		// No pool selector means select all pools
		if len(bgpAdv.Spec.IPAddressPools) == 0 && len(bgpAdv.Spec.IPAddressPoolSelectors) == 0 {
			for _, pool := range ipPoolMap {
				if err := validateBGPAdvPerPool(adv, pool); err != nil {
					return err
				}
				pool.BGPAdvertisements = append(pool.BGPAdvertisements, adv)
			}
			continue
		}
		// Clip before appending so the resource's IPAddressPools backing
		// array is never written to through its spare capacity.
		targets := append(slices.Clip(bgpAdv.Spec.IPAddressPools), ipPoolsSelected...)
		for _, poolName := range targets {
			pool, ok := ipPoolMap[poolName]
			// A pool both named explicitly and matched by a selector must
			// receive the advertisement only once (mirrors the L2 path).
			if !ok || slices.Contains(pool.BGPAdvertisements, adv) {
				continue
			}
			if err := validateBGPAdvPerPool(adv, pool); err != nil {
				return err
			}
			pool.BGPAdvertisements = append(pool.BGPAdvertisements, adv)
		}
	}
	return nil
}
// l2AdvertisementFromCR validates an L2Advertisement resource (no duplicate
// pool names or selectors) and converts it into an L2Advertisement whose
// Nodes are the nodes matched by the spec's node selectors. An empty
// interface list means all interfaces are allowed.
func l2AdvertisementFromCR(crdAd metallbv1beta1.L2Advertisement, nodes []corev1.Node) (*L2Advertisement, error) {
	if err := validateDuplicate(crdAd.Spec.IPAddressPools, "ipAddressPools"); err != nil {
		return nil, err
	}
	if err := validateLabelSelectorDuplicate(crdAd.Spec.IPAddressPoolSelectors, "ipAddressPoolSelectors"); err != nil {
		return nil, err
	}
	if err := validateLabelSelectorDuplicate(crdAd.Spec.NodeSelectors, "nodeSelectors"); err != nil {
		return nil, err
	}
	selected, err := selectedNodes(nodes, crdAd.Spec.NodeSelectors)
	if err != nil {
		return nil, errors.Join(err, fmt.Errorf("failed to parse node selector for %s", crdAd.Name))
	}
	return &L2Advertisement{
		Nodes:         selected,
		Interfaces:    crdAd.Spec.Interfaces,
		AllInterfaces: len(crdAd.Spec.Interfaces) == 0,
	}, nil
}
// bgpAdvertisementFromCR validates a BGPAdvertisement resource and converts
// it into a BGPAdvertisement.
//
// Duplicate pool names, communities, peers and selectors are rejected.
// Aggregation lengths default to 32 (IPv4) and 128 (IPv6) and may not exceed
// those values. Each spec community is resolved through the communities
// alias map, or parsed as a literal value when it is not an alias. Nodes is
// the set of nodes matched by the spec's node selectors.
func bgpAdvertisementFromCR(crdAd metallbv1beta1.BGPAdvertisement, communities map[string]community.BGPCommunity, nodes []corev1.Node) (*BGPAdvertisement, error) {
	if err := validateDuplicate(crdAd.Spec.IPAddressPools, "ipAddressPools"); err != nil {
		return nil, err
	}
	if err := validateDuplicate(crdAd.Spec.Communities, "community"); err != nil {
		return nil, err
	}
	if err := validateDuplicate(crdAd.Spec.Peers, "peers"); err != nil {
		return nil, err
	}
	if err := validateLabelSelectorDuplicate(crdAd.Spec.IPAddressPoolSelectors, "ipAddressPoolSelectors"); err != nil {
		return nil, err
	}
	if err := validateLabelSelectorDuplicate(crdAd.Spec.NodeSelectors, "nodeSelectors"); err != nil {
		return nil, err
	}
	ad := &BGPAdvertisement{
		Name:                crdAd.Name,
		AggregationLength:   32,
		AggregationLengthV6: 128,
		LocalPref:           crdAd.Spec.LocalPref,
		Communities:         map[community.BGPCommunity]bool{},
	}
	// The length errors use %d: %q on an integer prints it as a quoted
	// character literal (e.g. 33 becomes '!').
	if crdAd.Spec.AggregationLength != nil {
		ad.AggregationLength = int(*crdAd.Spec.AggregationLength) // TODO CRD cast
	}
	if ad.AggregationLength > 32 {
		return nil, fmt.Errorf("invalid aggregation length %d for IPv4", ad.AggregationLength)
	}
	if crdAd.Spec.AggregationLengthV6 != nil {
		ad.AggregationLengthV6 = int(*crdAd.Spec.AggregationLengthV6) // TODO CRD cast
		if ad.AggregationLengthV6 > 128 {
			return nil, fmt.Errorf("invalid aggregation length %d for IPv6", ad.AggregationLengthV6)
		}
	}
	if len(crdAd.Spec.Peers) > 0 {
		// Copy so the advertisement does not share the resource's slice.
		ad.Peers = make([]string, 0, len(crdAd.Spec.Peers))
		ad.Peers = append(ad.Peers, crdAd.Spec.Peers...)
	}
	for _, c := range crdAd.Spec.Communities {
		v, err := getCommunityValue(c, communities)
		if err != nil {
			return nil, errors.Join(err, fmt.Errorf("invalid community %q in BGP advertisement", c))
		}
		ad.Communities[v] = true
	}
	selected, err := selectedNodes(nodes, crdAd.Spec.NodeSelectors)
	if err != nil {
		return nil, errors.Join(err, fmt.Errorf("failed to parse node selector for %s", crdAd.Name))
	}
	ad.Nodes = selected
	return ad, nil
}
// getCommunityValue returns the BGPCommunity registered under
// communityString in communities if there is one. Otherwise it parses
// communityString as a literal community value.
//
// A malformed value is returned as-is. A string with an unrecognized format
// is reported as a TransientError, because it may name a Community resource
// that does not exist yet. Any other parse error is returned unchanged
// rather than being swallowed (previously it yielded a nil community and a
// nil error).
func getCommunityValue(communityString string, communities map[string]community.BGPCommunity) (community.BGPCommunity, error) {
	if v, ok := communities[communityString]; ok {
		return v, nil
	}
	c, err := community.New(communityString)
	switch {
	case err == nil:
		return c, nil
	case errors.Is(err, community.ErrInvalidCommunityValue):
		return nil, err
	case errors.Is(err, community.ErrInvalidCommunityFormat):
		return nil, TransientError{err.Error()}
	default:
		return nil, err
	}
}
// parseTimers resolves the BGP hold and keepalive timers from the optional
// spec values ht and ka. When only one is set, the other is derived with the
// 3:1 hold/keepalive ratio. When neither is set, both results are nil.
//
// The hold time, truncated to whole seconds, must be 0 or at least 3s, and
// the keepalive time must not exceed the hold time. Error messages report
// the effective (possibly derived) durations. The returned pointers refer to
// fresh copies and never alias the inputs.
func parseTimers(ht, ka *metav1.Duration) (*time.Duration, *time.Duration, error) {
	var holdTime, keepaliveTime time.Duration
	switch {
	case ht == nil && ka == nil:
		return nil, nil, nil
	case ht != nil && ka != nil:
		holdTime, keepaliveTime = ht.Duration, ka.Duration
	case ht != nil:
		holdTime, keepaliveTime = ht.Duration, ht.Duration/3
	default:
		holdTime, keepaliveTime = ka.Duration*3, ka.Duration
	}
	rounded := time.Duration(int(holdTime.Seconds())) * time.Second
	if rounded != 0 && rounded < 3*time.Second {
		// Use the effective value: ht is nil when the hold time was derived
		// from ka, and would print as "<nil>".
		return nil, nil, fmt.Errorf("invalid hold time %q: must be 0 or >=3s", holdTime)
	}
	if keepaliveTime > holdTime {
		return nil, nil, fmt.Errorf("invalid keepaliveTime %q, must be lower than holdTime %q", keepaliveTime, holdTime)
	}
	return &holdTime, &keepaliveTime, nil
}
// validateBGPAdvPerPool checks that adv can be attached to pool.
//
// For every address of the pool, the aggregation length of the address's
// family (IPv4 or IPv6) must not be shorter than the widest CIDR derived
// from that address. Additionally, adv may only carry a local preference
// different from an advertisement already attached to the pool when
// advertisementsAreCompatible says the two never produce the same BGP update.
func validateBGPAdvPerPool(adv *BGPAdvertisement, pool *Pool) error {
	for addr, cidrs := range pool.cidrsPerAddresses {
		if len(cidrs) == 0 {
			continue
		}
		maxLength := adv.AggregationLength
		if cidrs[0].IP.To4() == nil {
			maxLength = adv.AggregationLengthV6
		}
		// in case of range format, we may have a set of cidrs associated to a given address.
		// We reject if none of the cidrs are compatible with the aggregation length.
		lowest := lowestMask(cidrs)
		if maxLength < lowest {
			// Report the length actually checked (IPv6 uses AggregationLengthV6).
			return fmt.Errorf("invalid aggregation length %d: prefix %d in "+
				"this pool is more specific than the aggregation length for addresses %s", maxLength, lowest, addr)
		}
	}
	// Verify that BGP ADVs set a unique local preference value per BGP update.
	for _, bgpAdv := range pool.BGPAdvertisements {
		if adv.LocalPref != bgpAdv.LocalPref && !advertisementsAreCompatible(adv, bgpAdv, pool) {
			return fmt.Errorf("invalid local preference %d: local preference %d was "+
				"already set for the same type of BGP update. Check existing BGP advertisements "+
				"with common pools and aggregation lengths", adv.LocalPref, bgpAdv.LocalPref)
		}
	}
	return nil
}
// advertisementsAreCompatible reports whether newAdv and adv, both attached
// to pool, can never produce the same BGP update. That holds when their
// aggregation lengths differ for the pool's families, when both are limited
// to peer lists with no peer in common, or when they share no node.
func advertisementsAreCompatible(newAdv, adv *BGPAdvertisement, pool *Pool) bool {
	if isAggrLengthDifferent(newAdv, adv, pool) {
		return true
	}
	// BGP ADVs with different set of BGP peers do not collide.
	if len(newAdv.Peers) != 0 && len(adv.Peers) != 0 {
		sharesPeer := slices.ContainsFunc(newAdv.Peers, func(peer string) bool {
			return slices.Contains(adv.Peers, peer)
		})
		if !sharesPeer {
			return true
		}
	}
	// BGP ADVs with different set of nodes do not collide.
	for node := range newAdv.Nodes {
		if _, shared := adv.Nodes[node]; shared {
			return false
		}
	}
	return true
}
// isAggrLengthDifferent reports whether newAdv and adv use different
// aggregation lengths for every address family present in pool, meaning
// they cannot aggregate to the same prefix. A pool with no addresses of
// either family is treated as different (i.e. compatible).
func isAggrLengthDifferent(newAdv, adv *BGPAdvertisement, pool *Pool) bool {
	var hasV4, hasV6 bool
	for _, cidrs := range pool.cidrsPerAddresses {
		// Guard like validateBGPAdvPerPool does, instead of panicking on an
		// address that produced no CIDRs.
		if len(cidrs) == 0 {
			continue
		}
		switch ipfamily.ForCIDR(cidrs[0]) {
		case ipfamily.IPv4:
			hasV4 = true
		case ipfamily.IPv6:
			hasV6 = true
		}
		if hasV4 && hasV6 {
			break
		}
	}
	v4Differs := adv.AggregationLength != newAdv.AggregationLength
	v6Differs := adv.AggregationLengthV6 != newAdv.AggregationLengthV6
	switch {
	case !hasV4 && !hasV6:
		return true
	case !hasV6:
		return v4Differs
	case !hasV4:
		return v6Differs
	default:
		// Dual-stack pool: both lengths must differ.
		return v4Differs && v6Differs
	}
}
func ParseCIDR(cidr string) ([]*net.IPNet, error) {
if !strings.Contains(cidr, "-") {
_, n, err := net.ParseCIDR(cidr)
if err != nil {
return nil, fmt.Errorf("invalid CIDR %q", cidr)
}
return []*net.IPNet{n}, nil
}
fs := strings.SplitN(cidr, "-", 2)
if len(fs) != 2 {
return nil, fmt.Errorf("invalid IP range %q", cidr)
}
start := net.ParseIP(strings.TrimSpace(fs[0]))
if start == nil {
return nil, fmt.Errorf("invalid IP range %q: invalid start IP %q", cidr, fs[0])
}
end := net.ParseIP(strings.TrimSpace(fs[1]))
if end == nil {
return nil, fmt.Errorf("invalid IP range %q: invalid end IP %q", cidr, fs[1])
}
if bytes.Compare(start, end) > 0 {
return nil, fmt.Errorf("invalid IP range %q: start IP %q is after the end IP %q", cidr, start, end)
}
var ret []*net.IPNet
for _, pfx := range ipaddr.Summarize(start, end) {
n := &net.IPNet{
IP: pfx.IP,
Mask: pfx.Mask,
}
ret = append(ret, n)
}
return ret, nil
}
// cidrsOverlap reports whether either network contains the other, as decided
// by cidrContainsCIDR in both directions.
func cidrsOverlap(a, b *net.IPNet) bool {
	if cidrContainsCIDR(a, b) {
		return true
	}
	return cidrContainsCIDR(b, a)
}
// cidrContainsCIDR reports whether inner lies within outer. Two networks with
// the same prefix length match only if their IPs are equal. Otherwise outer
// must have the shorter prefix and contain inner's IP.
func cidrContainsCIDR(outer, inner *net.IPNet) bool {
	outerOnes, _ := outer.Mask.Size()
	innerOnes, _ := inner.Mask.Size()
	switch {
	case outerOnes == innerOnes:
		return outer.IP.Equal(inner.IP)
	case outerOnes < innerOnes:
		return outer.Contains(inner.IP)
	default:
		return false
	}
}
// lowestMask returns the smallest prefix length (the least specific network)
// among cidrs, or 0 when cidrs is empty.
func lowestMask(cidrs []*net.IPNet) int {
	lowest := 0
	for i, cidr := range cidrs {
		ones, _ := cidr.Mask.Size()
		if i == 0 || ones < lowest {
			lowest = ones
		}
	}
	return lowest
}
// bfdIntFromConfig validates an optional BFD setting. A nil value is passed
// through as nil. A value inside the inclusive [min, max] range is returned
// unchanged (same pointer). Anything else yields an error.
func bfdIntFromConfig(value *uint32, min, max uint32) (*uint32, error) {
	switch {
	case value == nil:
		return nil, nil
	case *value < min, *value > max:
		return nil, fmt.Errorf("invalid value %d, must be in %d-%d range", *value, min, max)
	default:
		return value, nil
	}
}
// validateDuplicateBGPAdvertisements returns an error naming the first pair of
// entries in ads that are deeply equal (reflect.DeepEqual). Positions in the
// message are 1-based.
func validateDuplicateBGPAdvertisements(ads []metallbv1beta1.BGPAdvertisement) error {
	for i := range ads {
		for offset := range ads[i+1:] {
			j := i + 1 + offset
			if reflect.DeepEqual(ads[i], ads[j]) {
				return fmt.Errorf("duplicate definition of bgpadvertisements. advertisement %d and %d are equal", i+1, j+1)
			}
		}
	}
	return nil
}
// containsAdvertisement reports whether advs already holds an L2 advertisement
// equivalent to toCheck. Equivalent means:
//   - the same AllInterfaces flag,
//   - deeply equal Nodes, and
//   - the same set of interfaces (order and duplicates ignored).
func containsAdvertisement(advs []*L2Advertisement, toCheck *L2Advertisement) bool {
	wantInterfaces := sets.New(toCheck.Interfaces...)
	for _, candidate := range advs {
		sameShape := candidate.AllInterfaces == toCheck.AllInterfaces &&
			reflect.DeepEqual(candidate.Nodes, toCheck.Nodes)
		if sameShape && sets.New(candidate.Interfaces...).Equal(wantInterfaces) {
			return true
		}
	}
	return false
}
// selectedNodes returns, as a set keyed by node name, the nodes whose labels
// match at least one of selectors. With no selectors every node is selected.
// It fails if any selector cannot be converted to a labels.Selector.
func selectedNodes(nodes []corev1.Node, selectors []metav1.LabelSelector) (map[string]bool, error) {
	parsed := make([]labels.Selector, 0, len(selectors))
	for _, selector := range selectors {
		sel, err := metav1.LabelSelectorAsSelector(&selector)
		if err != nil {
			return nil, errors.Join(err, fmt.Errorf("invalid label selector %v", selector))
		}
		parsed = append(parsed, sel)
	}
	matchesAny := func(set labels.Set) bool {
		if len(parsed) == 0 {
			// No selector means all nodes are valid.
			return true
		}
		for _, sel := range parsed {
			if sel.Matches(set) {
				return true
			}
		}
		return false
	}
	res := make(map[string]bool)
	for i := range nodes {
		if matchesAny(labels.Set(nodes[i].Labels)) {
			res[nodes[i].Name] = true
		}
	}
	return res, nil
}
// selectedPools returns the names of the pools whose labels match at least one
// of selectors, in the order the pools are given. With no selectors nothing is
// selected and the result is nil. It fails if any selector cannot be converted
// to a labels.Selector.
func selectedPools(pools []metallbv1beta1.IPAddressPool, selectors []metav1.LabelSelector) ([]string, error) {
	parsed := make([]labels.Selector, 0, len(selectors))
	for _, selector := range selectors {
		sel, err := metav1.LabelSelectorAsSelector(&selector)
		if err != nil {
			return nil, errors.Join(err, fmt.Errorf("invalid label selector %v", selector))
		}
		parsed = append(parsed, sel)
	}
	var ipPools []string
	for i := range pools {
		poolLabels := labels.Set(pools[i].Labels)
		for _, sel := range parsed {
			if sel.Matches(poolLabels) {
				ipPools = append(ipPools, pools[i].Name)
				break
			}
		}
	}
	return ipPools, nil
}
// validateLabelSelectorDuplicate fails when two selectors in labelSelectors
// render to the same string, or when any selector's match expressions are
// invalid per validateLabelSelectorMatchExpressions. labelSelectorType is used
// only in the error message.
//
// The Values of every match expression are sorted first so that ordering does
// not hide duplicates. The sort happens in place, so the caller's slices are
// reordered as a side effect.
func validateLabelSelectorDuplicate(labelSelectors []metav1.LabelSelector, labelSelectorType string) error {
	for i := range labelSelectors {
		for j := range labelSelectors[i].MatchExpressions {
			sort.Strings(labelSelectors[i].MatchExpressions[j].Values)
		}
	}
	for i, current := range labelSelectors {
		if err := validateLabelSelectorMatchExpressions(current.MatchExpressions); err != nil {
			return err
		}
		for _, later := range labelSelectors[i+1:] {
			if current.String() == later.String() {
				return fmt.Errorf("duplicate definition of %s %q", labelSelectorType, current)
			}
		}
	}
	return nil
}
// validateLabelSelectorMatchExpressions fails if any expression lists the same
// value twice, or if two expressions render to the same string.
func validateLabelSelectorMatchExpressions(matchExpressions []metav1.LabelSelectorRequirement) error {
	for i := range matchExpressions {
		current := &matchExpressions[i]
		if err := validateDuplicate(current.Values, "match expression value in label selector"); err != nil {
			return err
		}
		for j := i + 1; j < len(matchExpressions); j++ {
			if current.String() == matchExpressions[j].String() {
				return fmt.Errorf("duplicate definition of %s %q", "match expressions", *current)
			}
		}
	}
	return nil
}
// validateDuplicate returns an error for the earliest element of strSlice that
// appears again later in the slice. sliceType describes the elements in the
// error message.
func validateDuplicate(strSlice []string, sliceType string) error {
	for i, candidate := range strSlice {
		for _, other := range strSlice[i+1:] {
			if candidate == other {
				return fmt.Errorf("duplicate definition of %s %q", sliceType, candidate)
			}
		}
	}
	return nil
}
// SPDX-License-Identifier:Apache-2.0
package config
import (
"errors"
"fmt"
metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
metallbv1beta2 "go.universe.tf/metallb/api/v1beta2"
"go.universe.tf/metallb/internal/bgp/community"
"go.universe.tf/metallb/internal/ipfamily"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
// Validate checks a set of cluster resources and returns a non-nil error if
// they are not acceptable. ValidationFor selects the implementation to use
// for a given BGP backend.
type Validate func(ClusterResources) error
// ValidationFor returns the Validate function for the named BGP implementation:
//   - "frr" and "frr-k8s" reject native-only configuration (DiscardNativeOnly);
//   - "native" rejects FRR-only configuration (DiscardFRROnly);
//   - any other value disables these checks (DontValidate).
func ValidationFor(bgpImpl string) Validate {
	switch bgpImpl {
	case "frr", "frr-k8s":
		return DiscardNativeOnly
	case "native":
		return DiscardFRROnly
	default:
		return DontValidate
	}
}
// DiscardFRROnly is the Validate function for the native BGP implementation.
// It returns an error if c uses any option that only the FRR implementation
// supports:
//   - FRR-only BGPPeer fields;
//   - BFD profiles;
//   - BGP advertisements of IPv6 pools;
//   - non-legacy communities.
//
// The first offending setting found is reported.
func DiscardFRROnly(c ClusterResources) error {
	for i := range c.Peers {
		spec := &c.Peers[i].Spec
		switch {
		case spec.BFDProfile != "":
			return fmt.Errorf("peer %s has bfd-profile set on native bgp mode", spec.Address)
		case spec.KeepaliveTime != nil && spec.KeepaliveTime.Duration != 0:
			return fmt.Errorf("peer %s has keepalive-time set on native bgp mode", spec.Address)
		case spec.VRFName != "":
			return fmt.Errorf("peer %s has vrf set on native bgp mode", spec.Address)
		case spec.ConnectTime != nil:
			return fmt.Errorf("peer %s has connect time set on native bgp mode", spec.Address)
		case spec.EnableGracefulRestart:
			return fmt.Errorf("peer %s has EnableGracefulRestart flag set on native bgp mode", spec.Address)
		case spec.DisableMP:
			return fmt.Errorf("peer %s has disable MP flag set on native bgp mode", spec.Address)
		case spec.DualStackAddressFamily:
			return fmt.Errorf("peer %s has dualstackaddressfamily flag set on native bgp mode", spec.Address)
		case spec.DynamicASN != "":
			return fmt.Errorf("peer %s has dynamicASN set on native bgp mode", spec.Address)
		case spec.Interface != "":
			return fmt.Errorf("peer %s has interface set on native bgp mode", spec.Address)
		}
	}
	if len(c.BFDProfiles) > 0 {
		return errors.New("bfd profiles section set")
	}
	// Native mode can only advertise IPv4 pools over BGP.
	if err := findIPv6BGPAdvertisement(c); err != nil {
		return err
	}
	// Native mode only understands legacy (two 16-bit fields) communities.
	return findNonLegacyCommunity(c)
}
// findIPv6BGPAdvertisement returns an error if any IPAddressPool targeted by a
// BGPAdvertisement contains an IPv6 network, which native BGP mode cannot
// advertise. It also fails if a targeted pool holds an address that ParseCIDR
// rejects. The parse error is wrapped so callers can inspect it with errors.Is
// and errors.As. With no BGPAdvertisements nothing is checked.
func findIPv6BGPAdvertisement(c ClusterResources) error {
	if len(c.BGPAdvs) == 0 {
		return nil
	}
	bgpSelectors, err := poolSelectorsForBGP(c)
	if err != nil {
		return err
	}
	for _, p := range c.Pools {
		if !bgpSelectors.matchesPool(p) {
			continue
		}
		for _, cidr := range p.Spec.Addresses {
			nets, err := ParseCIDR(cidr)
			if err != nil {
				return fmt.Errorf("invalid CIDR %q in pool %q: %w", cidr, p.Name, err)
			}
			for _, n := range nets {
				// To4 is nil only for addresses that are not representable as IPv4.
				if n.IP.To4() == nil {
					return fmt.Errorf("pool %q has ipv6 CIDR %s, native bgp mode does not support ipv6", p.Name, n)
				}
			}
		}
	}
	return nil
}
// findNonLegacyCommunity returns an error if a BGPAdvertisement or a Community
// CR contains a community value that parses but is not a legacy community.
// Strings that community.New cannot parse (e.g. aliases) are ignored: this
// check is not meant to decide whether a string is parseable.
func findNonLegacyCommunity(c ClusterResources) error {
	for _, adv := range c.BGPAdvs {
		for _, raw := range adv.Spec.Communities {
			parsed, err := community.New(raw)
			if err != nil {
				// Skip aliases.
				continue
			}
			if !community.IsLegacy(parsed) {
				return fmt.Errorf("native BGP mode only supports legacy communities, BGP advertisement %q "+
					"has non legacy community %q", adv.Name, raw)
			}
		}
	}
	for _, co := range c.Communities {
		for _, alias := range co.Spec.Communities {
			parsed, err := community.New(alias.Value)
			if err != nil {
				// Skip it if we cannot parse it - the purpose of this very verification is not to make sure that
				// a string can be parsed or not.
				continue
			}
			if !community.IsLegacy(parsed) {
				// Report the offending value itself, not the whole alias struct.
				return fmt.Errorf("native BGP mode only supports legacy communities, BGP community CR %q "+
					"has non legacy community %q", co.Name, alias.Value)
			}
		}
	}
	return nil
}
// DontValidate accepts any ClusterResources. ValidationFor returns it for
// unrecognized BGP implementations.
func DontValidate(c ClusterResources) error { return nil }
// DiscardNativeOnly is the Validate function for the FRR-based implementations
// ("frr" and "frr-k8s"). It returns an error if c uses a setup those
// implementations cannot express:
//   - every BGPPeer must have the same RouterID;
//   - no two BGPPeers may share an identifier (see peerIdentifier);
//   - BGPPeers in the same VRF must all use the same MyASN.
func DiscardNativeOnly(c ClusterResources) error {
	if len(c.Peers) > 1 {
		routerID := c.Peers[0].Spec.RouterID
		peerAddr := map[string]bool{peerIdentifier(c.Peers[0].Spec): true}
		for _, p := range c.Peers[1:] {
			if p.Spec.RouterID != routerID {
				// Identify the offending peer by address and show both router IDs.
				return fmt.Errorf("peer %s has RouterID %s different from %s, in FRR mode all RouterID must be equal",
					p.Spec.Address, p.Spec.RouterID, routerID)
			}
			peerID := peerIdentifier(p.Spec)
			if peerAddr[peerID] {
				return fmt.Errorf("peer %s already exists, FRR mode doesn't support duplicate BGPPeers", p.Spec.Address)
			}
			peerAddr[peerID] = true
		}
	}
	// Visit each unordered pair once. A peer never conflicts with itself, and
	// the first reported pair is the same as with a full cross product.
	for i := range c.Peers {
		p := &c.Peers[i].Spec
		for j := i + 1; j < len(c.Peers); j++ {
			p1 := &c.Peers[j].Spec
			if p.MyASN != p1.MyASN && p.VRFName == p1.VRFName {
				return fmt.Errorf("peer %s has myAsn different from %s, in FRR mode all myAsn must be equal for the same VRF", p.Address, p1.Address)
			}
		}
	}
	return nil
}
// validateConfig checks inter-dependencies of an already parsed configuration.
// Currently it rejects a pool that contains an IPv6 CIDR when one of its BGP
// advertisements reaches a peer with BFD echo mode enabled. An advertisement
// without explicit peers reaches every configured peer. Named peers that are
// not in cfg.Peers are ignored.
func validateConfig(cfg *Config) error {
	for _, pool := range cfg.Pools.ByName {
		hasV6 := false
		for _, cidr := range pool.CIDR {
			if ipfamily.ForCIDR(cidr) == ipfamily.IPv6 {
				hasV6 = true
				break
			}
		}
		if !hasV6 {
			// We only care about v6 advertisements.
			continue
		}
		for _, adv := range pool.BGPAdvertisements {
			if len(adv.Peers) != 0 {
				for _, name := range adv.Peers {
					peer, found := cfg.Peers[name]
					if found && hasBFDEcho(peer, cfg.BFDProfiles) {
						return fmt.Errorf("pool %s has bgpadvertisement %s which references peer %s which has bfd echo enabled, which is not possible", pool.Name, adv.Name, peer.Name)
					}
				}
				continue
			}
			for _, peer := range cfg.Peers {
				if hasBFDEcho(peer, cfg.BFDProfiles) {
					return fmt.Errorf("pool %s has bgpadvertisement %s which references peer %s which has bfd echo enabled, which is not possible", pool.Name, adv.Name, peer.Name)
				}
			}
		}
	}
	return nil
}
// hasBFDEcho reports whether the BFD profile referenced by peer exists in
// bfdProfiles and has echo mode enabled.
func hasBFDEcho(peer *Peer, bfdProfiles map[string]*BFDProfile) bool {
	profile, found := bfdProfiles[peer.BFDProfile]
	return found && profile.EchoMode
}
// peerIdentifier builds a key of the form "<id>-<vrf>" for a peer. id is the
// peer's address, or its interface when no address is set.
func peerIdentifier(peer metallbv1beta2.BGPPeerSpec) string {
	id := peer.Interface
	if peer.Address != "" {
		id = peer.Address
	}
	return id + "-" + peer.VRFName
}
// poolSelector records which IPAddressPools a set of BGP advertisements
// targets. A pool matches when its name is in byName or when any selector in
// byLabels matches its labels. When both are empty every pool matches (see
// matchesPool).
type poolSelector struct {
byName map[string]struct{} // pool names referenced explicitly, used as a set
byLabels []labels.Selector // parsed pool label selectors
}
// matchesPool reports whether p is targeted by s. Either p's name is listed,
// or one of the label selectors matches p's labels. A selector with no names
// and no label selectors matches every pool.
func (s poolSelector) matchesPool(p metallbv1beta1.IPAddressPool) bool {
	if len(s.byName) == 0 && len(s.byLabels) == 0 {
		return true
	}
	if _, named := s.byName[p.Name]; named {
		return true
	}
	poolLabels := labels.Set(p.Labels)
	for _, sel := range s.byLabels {
		if sel.Matches(poolLabels) {
			return true
		}
	}
	return false
}
// poolSelectorsForBGP builds a poolSelector covering every IPAddressPool that
// at least one BGPAdvertisement in c targets, by name or by label selector.
// If any advertisement specifies neither, it targets all pools. In that case
// an empty poolSelector, which matches everything, is returned. A selector
// that cannot be converted yields an error that wraps the conversion failure.
func poolSelectorsForBGP(c ClusterResources) (poolSelector, error) {
	byName := make(map[string]struct{})
	byLabels := []labels.Selector{}
	for _, adv := range c.BGPAdvs {
		if len(adv.Spec.IPAddressPools) == 0 &&
			len(adv.Spec.IPAddressPoolSelectors) == 0 {
			return poolSelector{}, nil // no selectors, let's catch em all!
		}
		for _, name := range adv.Spec.IPAddressPools {
			byName[name] = struct{}{}
		}
		for _, selector := range adv.Spec.IPAddressPoolSelectors {
			l, err := metav1.LabelSelectorAsSelector(&selector)
			if err != nil {
				return poolSelector{}, fmt.Errorf("invalid label selector %v: %w", selector, err)
			}
			byLabels = append(byLabels, l)
		}
	}
	return poolSelector{
		byName:   byName,
		byLabels: byLabels,
	}, nil
}
// SPDX-License-Identifier:Apache-2.0
package config
import (
"strings"
metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
metallbv1beta2 "go.universe.tf/metallb/api/v1beta2"
apivalidate "go.universe.tf/metallb/internal/k8s/webhooks/validate"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// TransientError is an error caused by interdependencies between CRDs, for
// example a peer referencing a BFD profile that does not exist.
type TransientError struct {
	Message string
}

// Error implements the error interface by returning the stored message.
func (e TransientError) Error() string {
	msg := e.Message
	return msg
}
// validator is the implementation of the resources validator returned by
// NewValidator. It tries to parse the configuration built from the given
// resources and fails if parsing returns an error. Fields that could only
// produce a transient error are cleared first (see resetTransientErrorsFields).
type validator struct {
validate Validate // implementation-specific check passed to For
}
// Validate merges the supplied object lists into a single ClusterResources.
// It then clears the fields that could fail only because of object ordering
// (via resetTransientErrorsFields) and runs the configuration parsing in For,
// using v.validate as the implementation-specific check. Lists of types not
// handled here are ignored.
func (v *validator) Validate(resources ...client.ObjectList) error {
	merged := ClusterResources{
		Pools:       make([]metallbv1beta1.IPAddressPool, 0),
		Peers:       make([]metallbv1beta2.BGPPeer, 0),
		BFDProfiles: make([]metallbv1beta1.BFDProfile, 0),
		BGPAdvs:     make([]metallbv1beta1.BGPAdvertisement, 0),
		L2Advs:      make([]metallbv1beta1.L2Advertisement, 0),
		Communities: make([]metallbv1beta1.Community, 0),
	}
	for _, objects := range resources {
		switch typed := objects.(type) {
		case *metallbv1beta1.IPAddressPoolList:
			merged.Pools = append(merged.Pools, typed.Items...)
		case *metallbv1beta2.BGPPeerList:
			merged.Peers = append(merged.Peers, typed.Items...)
		case *metallbv1beta1.BFDProfileList:
			merged.BFDProfiles = append(merged.BFDProfiles, typed.Items...)
		case *metallbv1beta1.BGPAdvertisementList:
			merged.BGPAdvs = append(merged.BGPAdvs, typed.Items...)
		case *metallbv1beta1.L2AdvertisementList:
			merged.L2Advs = append(merged.L2Advs, typed.Items...)
		case *metallbv1beta1.CommunityList:
			merged.Communities = append(merged.Communities, typed.Items...)
		case *v1.NodeList:
			merged.Nodes = append(merged.Nodes, typed.Items...)
		}
	}
	_, err := For(resetTransientErrorsFields(merged), v.validate)
	return err
}
// NewValidator returns a ClusterObjects validator that parses the cluster
// configuration and applies validate as the implementation-specific check.
func NewValidator(validate Validate) apivalidate.ClusterObjects {
	v := new(validator)
	v.validate = validate
	return v
}
// resetTransientErrorsFields returns clusterResources without the fields that
// can cause a TransientError. The webhooks use it so they do not make
// assumptions based on the ordering of objects. It:
//   - clears each peer's BFDProfile and PasswordSecret;
//   - keeps only the BGPAdvertisement communities that contain ":", i.e. the
//     values that are potentially explicit communities.
//
// The slices inside clusterResources are modified in place, so their backing
// arrays, which may be shared with the caller, change too.
func resetTransientErrorsFields(clusterResources ClusterResources) ClusterResources {
	for i := range clusterResources.Peers {
		spec := &clusterResources.Peers[i].Spec
		spec.BFDProfile = ""
		spec.PasswordSecret = v1.SecretReference{}
	}
	for i := range clusterResources.BGPAdvs {
		adv := &clusterResources.BGPAdvs[i]
		var explicit []string
		for _, value := range adv.Spec.Communities {
			if strings.Contains(value, ":") {
				explicit = append(explicit, value)
			}
		}
		adv.Spec.Communities = explicit
	}
	return clusterResources
}