// SPDX-License-Identifier:Apache-2.0

package community

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// Sentinel errors wrapped by New; callers can match them with errors.Is.
var (
	// ErrInvalidCommunityValue is wrapped when a section of the community
	// cannot be parsed as an unsigned integer of the required width, or when
	// a four-field community does not start with the large community marker.
	ErrInvalidCommunityValue = errors.New("invalid community value")
	// ErrInvalidCommunityFormat is wrapped when the string does not split
	// into exactly two or four colon-separated fields.
	ErrInvalidCommunityFormat = errors.New("invalid community format")
)

// largeBGPCommunityMarker is the prefix that shall be used to indicate that a
// given community value is of type large community. If and when extended BGP
// communities are supported, the marker allows us to distinguish between
// extended and large communities.
const largeBGPCommunityMarker = "large"

// BGPCommunity represents a BGP community.
type BGPCommunity interface {
	// LessThan reports whether the receiver sorts before the given community.
	LessThan(BGPCommunity) bool
	// String returns the textual representation of the community.
	String() string
}

// New parses the provided string and returns a newly created BGPCommunity
// object. Strings are parsed according to Juniper style syntax
// (https://www.juniper.net/documentation/us/en/software/junos/routing-policy/bgp/topics/concept/policy-bgp-communities-extended-communities-match-conditions-overview.html).
//
// Legacy communities are of format "<AS number>:<community value>".
// Extended communities (support of which is yet to be implemented) are of
// format "<type>:<administrator>:<assigned-number>".
// Large communities are of format
// "large:<global administrator>:<localdata part 1>:<localdata part 2>".
func New(c string) (BGPCommunity, error) { var bgpCommunity BGPCommunity fs := strings.Split(c, ":") switch l := len(fs); l { case 2: var fields [2]uint16 for i := 0; i < 2; i++ { b, err := strconv.ParseUint(fs[i], 10, 16) if err != nil { return bgpCommunity, fmt.Errorf("%w: invalid section %q of community %q, err: %q", ErrInvalidCommunityValue, fs[i], c, err) } fields[i] = uint16(b) } return BGPCommunityLegacy{ upperVal: fields[0], lowerVal: fields[1], }, nil case 4: if fs[0] != largeBGPCommunityMarker { return bgpCommunity, fmt.Errorf("%w: invalid marker for large community, expected community to be of "+ "format %s:<uint32>:<uint32>:<uint32> but got %q instead", ErrInvalidCommunityValue, largeBGPCommunityMarker, c) } var fields [3]uint32 for i := 1; i < 4; i++ { b, err := strconv.ParseUint(fs[i], 10, 32) if err != nil { return bgpCommunity, fmt.Errorf("%w: invalid section %q of community %q, err: %q", ErrInvalidCommunityValue, fs[i], c, err) } fields[i-1] = uint32(b) } return BGPCommunityLarge{ globalAdministrator: fields[0], localDataPart1: fields[1], localDataPart2: fields[2], }, nil } return bgpCommunity, fmt.Errorf("%w: %s", ErrInvalidCommunityFormat, c) } // BGPCommunityLegacy holds the internal representation of a BGP legacy community. type BGPCommunityLegacy struct { upperVal uint16 lowerVal uint16 } // LessThan makes 2 different BGPCommunity objects comparable. For the sake of comparison, legacy communities are // considered to be large communities in format <legacy community>:0:0 and thus can be compared with large communities. func (b BGPCommunityLegacy) LessThan(c BGPCommunity) bool { return lessThan(b, c) } // String returns the string representation of this community. Legacy communities will be printed in new-format, // "<AS number>:<community value>". func (b BGPCommunityLegacy) String() string { return fmt.Sprintf("%d:%d", b.upperVal, b.lowerVal) } // ToUint32 returns the uint32 representation of this legacy community. 
func (b BGPCommunityLegacy) ToUint32() uint32 { return (uint32(b.upperVal) << 16) + uint32(b.lowerVal) } // BGPCommunity holds the internal representation of a large BGP community. type BGPCommunityLarge struct { globalAdministrator uint32 localDataPart1 uint32 localDataPart2 uint32 } // LessThan makes 2 different BGPCommunity objects comparable. For the sake of comparison, legacy communities are // considered to be large communities in format <legacy community>:0:0 and thus can be compared with large communities. func (b BGPCommunityLarge) LessThan(c BGPCommunity) bool { return lessThan(b, c) } // String returns the string representation of this community. Large communities will be printed as // ("<global administrator>:<localdata part 1>:<localdata part 2>"). func (b BGPCommunityLarge) String() string { return fmt.Sprintf("%d:%d:%d", b.globalAdministrator, b.localDataPart1, b.localDataPart2) } // IsLegacy returns true if this is a Legacy community. func IsLegacy(c BGPCommunity) bool { _, ok := c.(BGPCommunityLegacy) return ok } // IsLarge returns true if this is a Legacy community. func IsLarge(c BGPCommunity) bool { _, ok := c.(BGPCommunityLarge) return ok } // lessThan is a helper function that compares two communities regardless of their type. func lessThan(b BGPCommunity, c BGPCommunity) bool { var bl BGPCommunityLarge var cl BGPCommunityLarge switch v := b.(type) { case BGPCommunityLegacy: bl = BGPCommunityLarge{ globalAdministrator: v.ToUint32(), localDataPart1: 0, localDataPart2: 0, } case BGPCommunityLarge: bl = v } switch v := c.(type) { case BGPCommunityLegacy: cl = BGPCommunityLarge{ globalAdministrator: v.ToUint32(), localDataPart1: 0, localDataPart2: 0, } case BGPCommunityLarge: cl = v } return bl.globalAdministrator < cl.globalAdministrator || (bl.globalAdministrator == cl.globalAdministrator && (bl.localDataPart1 < cl.localDataPart1 || (bl.localDataPart1 == cl.localDataPart1 && bl.localDataPart2 < cl.localDataPart2))) }
// SPDX-License-Identifier:Apache-2.0 package native import "time" const ( backoffMax = 2 * time.Minute backoffFactor = 2 ) // backoff implements multiplicative backoff for retrying failing // operations. type backoff struct { nextDelay time.Duration } // Duration returns how long to wait before the next retry. func (b *backoff) Duration() time.Duration { ret := b.nextDelay if b.nextDelay == 0 { b.nextDelay = time.Second } else { b.nextDelay *= backoffFactor if b.nextDelay > backoffMax { b.nextDelay = backoffMax } } return ret } // Reset removes any existing backoff, so the next Duration() will // return 0. func (b *backoff) Reset() { b.nextDelay = 0 }
// SPDX-License-Identifier:Apache-2.0 package native import ( "bytes" "encoding/binary" "fmt" "io" "net" "time" "go.universe.tf/metallb/internal/bgp" "go.universe.tf/metallb/internal/bgp/community" "go.universe.tf/metallb/internal/safeconvert" ) func sendOpen(w io.Writer, asn uint32, routerID net.IP, holdTime time.Duration) error { if routerID.To4() == nil { panic("non-ipv4 address used as RouterID") } msg := struct { // Header Marker1, Marker2 uint64 Len uint16 Type uint8 // OPEN Version uint8 ASN16 uint16 HoldTime uint16 RouterID [4]byte // Options (we only send one, capabilities) OptsLen uint8 OptType uint8 OptLen uint8 // Capabilities: multiprotocol extension for IPv4+IPv6 // unicast, and 4-byte ASNs MP4Type uint8 MP4Len uint8 AFI4 uint16 SAFI4 uint16 MP6Type uint8 MP6Len uint8 AFI6 uint16 SAFI6 uint16 CapType uint8 CapLen uint8 ASN32 uint32 }{ Marker1: 0xffffffffffffffff, Marker2: 0xffffffffffffffff, Len: 0, // Filled below Type: 1, // OPEN Version: 4, ASN16: uint16(asn), // Possibly tweaked below HoldTime: uint16(holdTime.Seconds()), // RouterID filled below OptsLen: 20, OptType: 2, // Capabilities OptLen: 18, MP4Type: 1, // BGP Multi-protocol Extensions MP4Len: 4, AFI4: 1, // IPv4 SAFI4: 1, // Unicast MP6Type: 1, // BGP Multi-protocol Extensions MP6Len: 4, AFI6: 2, // IPv6 SAFI6: 1, // Unicast CapType: 65, // 4-byte ASN CapLen: 4, ASN32: asn, } var err error msg.Len, err = safeconvert.IntToUInt16(binary.Size(msg)) if err != nil { return fmt.Errorf("invalid message len %w", err) } if asn > 65535 { msg.ASN16 = 23456 } copy(msg.RouterID[:], routerID.To4()) return binary.Write(w, binary.BigEndian, msg) } type openResult struct { asn uint32 holdTime time.Duration mp4 bool mp6 bool // Four-byte ASN supported fbasn bool } var notificationCodes = map[uint16]string{ 0x0100: "Message header error (unspecific)", 0x0101: "Connection not synchronized", 0x0102: "Bad message length", 0x0103: "Bad message type", 0x0200: "OPEN message error (unspecific)", 0x0201: 
"Unsupported version number", 0x0202: "Bad peer AS", 0x0203: "Bad BGP identifier", 0x0204: "Unsupported optional parameter", 0x0206: "Unacceptable hold time", 0x0207: "Unsupported capability", 0x0300: "UPDATE message error (unspecific)", 0x0301: "Malformed Attribute List", 0x0302: "Unrecognized Well-known Attribute", 0x0303: "Missing Well-known Attribute", 0x0304: "Attribute Flags Error", 0x0305: "Attribute Length Error", 0x0306: "Invalid ORIGIN Attribute", 0x0308: "Invalid NEXT_HOP Attribute", 0x0309: "Optional Attribute Error", 0x030a: "Invalid Network Field", 0x030b: "Malformed AS_PATH", 0x0400: "Hold Timer Expired (unspecific)", 0x0500: "BGP FSM state error (unspecific)", 0x0501: "Receive Unexpected Message in OpenSent State", 0x0502: "Receive Unexpected Message in OpenConfirm State", 0x0503: "Receive Unexpected Message in Established State", 0x0601: "Maximum Number of Prefixes Reached", 0x0602: "Administrative Shutdown", 0x0603: "Peer De-configured", 0x0604: "Administrative Reset", 0x0605: "Connection Rejected", 0x0606: "Other Configuration Change", 0x0607: "Connection Collision Resolution", 0x0608: "Out of Resources", } // readNotification reads the body of a notification message (header // has already been consumed). It must always return an error, because // receiving a notification is an error. 
func readNotification(r io.Reader) error { var code uint16 if err := binary.Read(r, binary.BigEndian, &code); err != nil { return err } v, ok := notificationCodes[code] if !ok { v = "unknown code" } return fmt.Errorf("got BGP notification code 0x%04x (%s)", code, v) } func readOpen(r io.Reader) (*openResult, error) { hdr := struct { // Header Marker1, Marker2 uint64 Len uint16 Type uint8 }{} if err := binary.Read(r, binary.BigEndian, &hdr); err != nil { return nil, err } if hdr.Marker1 != 0xffffffffffffffff || hdr.Marker2 != 0xffffffffffffffff { return nil, fmt.Errorf("synchronization error, incorrect header marker") } if hdr.Type == 3 { return nil, readNotification(r) } if hdr.Type != 1 { return nil, fmt.Errorf("message type is not OPEN, got %d, want 1", hdr.Type) } if hdr.Len < 37 { return nil, fmt.Errorf("message length %d too small to be OPEN", hdr.Len) } lr := &io.LimitedReader{ R: r, N: int64(hdr.Len) - 19, } open := struct { Version uint8 ASN16 uint16 HoldTime uint16 RouterID uint32 OptsLen uint8 }{} if err := binary.Read(lr, binary.BigEndian, &open); err != nil { return nil, err } fmt.Printf("%#v\n", open) if open.Version != 4 { return nil, fmt.Errorf("wrong BGP version") } if open.HoldTime != 0 && open.HoldTime < 3 { return nil, fmt.Errorf("invalid hold time %q, must be 0 or >=3s", open.HoldTime) } ret := &openResult{ asn: uint32(open.ASN16), holdTime: time.Duration(open.HoldTime) * time.Second, } if err := readOptions(lr, ret); err != nil { return nil, err } return ret, nil } func readOptions(r io.Reader, ret *openResult) error { for { hdr := struct { Type uint8 Len uint8 }{} if err := binary.Read(r, binary.BigEndian, &hdr); err != nil { if err == io.EOF { return nil } return err } if hdr.Type != 2 { return fmt.Errorf("unknown BGP option type %d", hdr.Type) } lr := &io.LimitedReader{ R: r, N: int64(hdr.Len), } if err := readCapabilities(lr, ret); err != nil { return err } if lr.N != 0 { return fmt.Errorf("%d trailing garbage bytes after capability 
option", lr.N) } } } func readCapabilities(r io.Reader, ret *openResult) error { for { cap := struct { Code uint8 Len uint8 }{} if err := binary.Read(r, binary.BigEndian, &cap); err != nil { if err == io.EOF { return nil } return err } lr := io.LimitedReader{ R: r, N: int64(cap.Len), } switch cap.Code { case 65: if err := binary.Read(&lr, binary.BigEndian, &ret.asn); err != nil { return err } ret.fbasn = true case 1: af := struct{ AFI, SAFI uint16 }{} if err := binary.Read(&lr, binary.BigEndian, &af); err != nil { return err } switch { case af.AFI == 1 && af.SAFI == 1: ret.mp4 = true case af.AFI == 2 && af.SAFI == 1: ret.mp6 = true } default: // TODO: only ignore capabilities that we know are fine to // ignore. if _, err := io.Copy(io.Discard, &lr); err != nil { return err } } if lr.N != 0 { return fmt.Errorf("%d leftover bytes after decoding capability %d", lr.N, cap.Code) } } } func sendUpdate(w io.Writer, asn uint32, ibgp, fbasn bool, nextHop net.IP, adv *bgp.Advertisement) error { var b bytes.Buffer hdr := struct { M1, M2 uint64 Len uint16 Type uint8 WdrLen uint16 AttrLen uint16 }{ M1: uint64(0xffffffffffffffff), M2: uint64(0xffffffffffffffff), Type: 2, } if err := binary.Write(&b, binary.BigEndian, hdr); err != nil { return err } l := b.Len() if err := encodePathAttrs(&b, asn, ibgp, fbasn, nextHop, adv); err != nil { return err } toWrite, err := safeconvert.IntToUInt16(b.Len() - l) if err != nil { return err } binary.BigEndian.PutUint16(b.Bytes()[21:23], toWrite) encodePrefixes(&b, []*net.IPNet{adv.Prefix}) toWrite, err = safeconvert.IntToUInt16(b.Len()) if err != nil { return err } binary.BigEndian.PutUint16(b.Bytes()[16:18], toWrite) if _, err := io.Copy(w, &b); err != nil { return err } return nil } func encodePrefixes(b *bytes.Buffer, pfxs []*net.IPNet) { for _, pfx := range pfxs { o, _ := pfx.Mask.Size() b.WriteByte(byte(o)) b.Write(pfx.IP.To4()[:bytesForBits(o)]) } } func bytesForBits(n int) int { // Evil bit hack that rounds n up to the next multiple of 
8, then // divides by 8. This returns the minimum number of whole bytes // required to contain n bits. return ((n + 7) &^ 7) / 8 } func encodePathAttrs(b *bytes.Buffer, asn uint32, ibgp, fbasn bool, nextHop net.IP, adv *bgp.Advertisement) error { b.Write([]byte{ 0x40, 1, // mandatory, origin 1, // len 0, // igb 0x40, 2, // mandatory, as-path }) if ibgp { b.WriteByte(0) // empty AS path } else { if fbasn { b.Write([]byte{ 6, // len (1x 4-byte ASN) 2, // AS_SEQUENCE 1, // len (in number of ASes) }) if err := binary.Write(b, binary.BigEndian, asn); err != nil { return err } } else { b.Write([]byte{ 4, // len (1x 2-byte ASN) 2, // AS_SEQUENCE 1, // len (in number of ASes) }) asnToWrite, err := safeconvert.Uint32ToInt16(asn) if err != nil { return fmt.Errorf("invalid asn: %w", err) } if err := binary.Write(b, binary.BigEndian, asnToWrite); err != nil { return err } } } b.Write([]byte{ 0x40, 3, // mandatory, next-hop 4, // len }) b.Write(nextHop) if ibgp { b.Write([]byte{ 0x40, 5, // well-known, localpref 4, // len }) if err := binary.Write(b, binary.BigEndian, adv.LocalPref); err != nil { return err } } if len(adv.Communities) > 0 { // Convert communities to legacy communities uint32 representation. Throw an error if any non legacy type // communities were found. 
var legacyCommunities []uint32 for _, c := range adv.Communities { legacyCommunity, ok := c.(community.BGPCommunityLegacy) if !ok { return fmt.Errorf("invalid community type for BGP native mode, community %s is not a legacy BGP "+ "Community", c) } legacyCommunities = append(legacyCommunities, legacyCommunity.ToUint32()) } b.Write([]byte{ 0xc0, 8, // optional transitive, communities }) toWrite, err := safeconvert.IntToUInt8(len(legacyCommunities) * 4) if err != nil { return fmt.Errorf("invalid size of legacy communities: %w", err) } if err := binary.Write(b, binary.BigEndian, toWrite); err != nil { return err } for _, c := range legacyCommunities { if err := binary.Write(b, binary.BigEndian, c); err != nil { return err } } } return nil } func sendWithdraw(w io.Writer, prefixes []*net.IPNet) error { var b bytes.Buffer hdr := struct { M1, M2 uint64 Len uint16 Type uint8 WdrLen uint16 }{ M1: uint64(0xffffffffffffffff), M2: uint64(0xffffffffffffffff), Type: 2, } if err := binary.Write(&b, binary.BigEndian, hdr); err != nil { return err } l := b.Len() encodePrefixes(&b, prefixes) toWrite, err := safeconvert.IntToUInt16(b.Len() - l) if err != nil { return fmt.Errorf("invalid buffer %w", err) } binary.BigEndian.PutUint16(b.Bytes()[19:21], toWrite) if err := binary.Write(&b, binary.BigEndian, uint16(0)); err != nil { return err } toWrite, err = safeconvert.IntToUInt16(b.Len()) if err != nil { return fmt.Errorf("invalid buffer %w", err) } binary.BigEndian.PutUint16(b.Bytes()[16:18], toWrite) if _, err := io.Copy(w, &b); err != nil { return err } return nil } func sendKeepalive(w io.Writer) error { msg := struct { Marker1, Marker2 uint64 Len uint16 Type uint8 }{ Marker1: 0xffffffffffffffff, Marker2: 0xffffffffffffffff, Len: 19, Type: 4, } return binary.Write(w, binary.BigEndian, msg) }
// SPDX-License-Identifier:Apache-2.0 package native import ( "bytes" "context" "encoding/binary" "errors" "fmt" "hash/crc32" "io" "net" "os" "sync" "syscall" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "go.universe.tf/metallb/internal/bgp" "go.universe.tf/metallb/internal/config" "go.universe.tf/metallb/internal/safeconvert" "golang.org/x/sys/unix" ) var errClosed = errors.New("session closed") // session represents one BGP session to an external router. type session struct { bgp.SessionParameters peerFBASNSupport bool logger log.Logger newHoldTime chan bool backoff backoff mu sync.Mutex cond *sync.Cond closed bool conn net.Conn actualHoldTime time.Duration nextHop net.IP advertised map[string]*bgp.Advertisement new map[string]*bgp.Advertisement // peerName identifies this BGP session to be used for metrics peerName string } // The 'Native' implementation does not require a session manager . type sessionManager struct { } func NewSessionManager(l log.Logger) bgp.SessionManager { return &sessionManager{} } // NewSession() creates a BGP session using the given session parameters. // // The session will immediately try to connect and synchronize its // local state with the peer. func (sm *sessionManager) NewSession(l log.Logger, args bgp.SessionParameters) (bgp.Session, error) { sessionsParams := args // native mode does not support empty holdtime, // we explicitly set it to 90s in this case. 
if args.HoldTime == nil { ht := 90 * time.Second sessionsParams.HoldTime = &ht } ret := &session{ SessionParameters: sessionsParams, logger: log.With(l, "peer", args.PeerAddress, "localASN", args.MyASN, "peerASN", args.PeerASN), newHoldTime: make(chan bool, 1), advertised: map[string]*bgp.Advertisement{}, peerName: fmt.Sprintf("%s:%d", args.PeerAddress, args.PeerPort), } ret.cond = sync.NewCond(&ret.mu) go ret.sendKeepalives() go ret.run() stats.sessionUp.WithLabelValues(ret.PeerAddress).Set(0) stats.prefixes.WithLabelValues(ret.PeerAddress).Set(0) return ret, nil } func (sm *sessionManager) SyncBFDProfiles(profiles map[string]*config.BFDProfile) error { if len(profiles) > 0 { return errors.New("bfd profiles not supported in native mode") } return nil } func (sm *sessionManager) SyncExtraInfo(extras string) error { if extras != "" { return errors.New("bgp extra info not supported in native mode") } return nil } func (sm *sessionManager) SetEventCallback(func(interface{})) {} // run tries to stay connected to the peer, and pumps route updates to it. func (s *session) run() { defer stats.DeleteSession(s.peerName) for { if err := s.connect(); err != nil { if err == errClosed { return } level.Error(s.logger).Log("op", "connect", "error", err, "msg", "failed to connect to peer") backoff := s.backoff.Duration() time.Sleep(backoff) continue } stats.SessionUp(s.peerName) s.backoff.Reset() level.Info(s.logger).Log("event", "sessionUp", "msg", "BGP session established") if !s.sendUpdates() { return } stats.SessionDown(s.peerName) level.Warn(s.logger).Log("event", "sessionDown", "msg", "BGP session down") } } // sendUpdates waits for changes to desired advertisements, and pushes // them out to the peer. 
// It returns false when the session has been closed and true when the
// connection was lost or aborted, in which case the caller may reconnect.
// The session mutex is held for the whole call, except while blocked in
// cond.Wait.
func (s *session) sendUpdates() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return false
	}
	if s.conn == nil {
		return true
	}

	// An iBGP session is one where both ends share the same ASN.
	ibgp := s.MyASN == s.PeerASN
	fbasn := s.peerFBASNSupport

	// On a fresh connection the peer knows nothing: promote any pending
	// state and send the full set of advertisements.
	if s.new != nil {
		s.advertised, s.new = s.new, nil
	}

	for c, adv := range s.advertised {
		if err := sendUpdate(s.conn, s.MyASN, ibgp, fbasn, s.nextHop, adv); err != nil {
			s.abort()
			level.Error(s.logger).Log("op", "sendUpdate", "ip", c, "error", err, "msg", "failed to send BGP update")
			return true
		}
		stats.UpdateSent(s.peerName)
	}
	stats.AdvertisedPrefixes(s.peerName, len(s.advertised))

	for {
		// Sleep until there is pending state or the connection goes away.
		for s.new == nil && s.conn != nil {
			s.cond.Wait()
		}

		if s.closed {
			return false
		}
		if s.conn == nil {
			return true
		}
		if s.new == nil {
			// nil is "no pending updates", contrast to a non-nil
			// empty map which means "withdraw all".
			continue
		}

		// Send every advertisement that is new or differs from what the
		// peer already has.
		for c, adv := range s.new {
			if adv2, ok := s.advertised[c]; ok && adv.Equal(adv2) {
				// Peer already has correct state for this
				// advertisement, nothing to do.
				continue
			}

			if err := sendUpdate(s.conn, s.MyASN, ibgp, fbasn, s.nextHop, adv); err != nil {
				s.abort()
				level.Error(s.logger).Log("op", "sendUpdate", "prefix", c, "error", err, "msg", "failed to send BGP update")
				return true
			}
			stats.UpdateSent(s.peerName)
		}

		// Withdraw everything that was advertised but is absent from the
		// new desired state.
		wdr := []*net.IPNet{}
		for c, adv := range s.advertised {
			if s.new[c] == nil {
				wdr = append(wdr, adv.Prefix)
			}
		}
		if len(wdr) > 0 {
			if err := sendWithdraw(s.conn, wdr); err != nil {
				s.abort()
				for _, pfx := range wdr {
					level.Error(s.logger).Log("op", "sendWithdraw", "prefix", pfx, "error", err, "msg", "failed to send BGP withdraw")
				}
				return true
			}
			stats.UpdateSent(s.peerName)
		}
		// The peer is now in sync with the desired state.
		s.advertised, s.new = s.new, nil
		stats.AdvertisedPrefixes(s.peerName, len(s.advertised))
	}
}

// connect establishes the BGP session with the peer.
// Sets TCP_MD5 sockopt if password is !="".
func (s *session) connect() error { s.mu.Lock() defer s.mu.Unlock() if s.closed { return errClosed } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() deadline, _ := ctx.Deadline() conn, err := dialMD5(ctx, fmt.Sprintf("%s:%d", s.PeerAddress, s.PeerPort), s.SourceAddress, s.Password) if err != nil { return fmt.Errorf("dial %q: %s", s.PeerAddress, err) } if err = conn.SetDeadline(deadline); err != nil { conn.Close() return fmt.Errorf("setting deadline on conn to %q: %s", s.PeerAddress, err) } addr, ok := conn.LocalAddr().(*net.TCPAddr) if !ok { conn.Close() return fmt.Errorf("getting local addr for default nexthop to %q: %s", s.PeerAddress, err) } s.nextHop = addr.IP routerID := s.RouterID if routerID == nil { routerID, err = getRouterID(s.nextHop, s.CurrentNode) if err != nil { return err } } if err = sendOpen(conn, s.MyASN, routerID, *s.HoldTime); err != nil { conn.Close() return fmt.Errorf("send OPEN to %q: %s", s.PeerAddress, err) } op, err := readOpen(conn) if err != nil { conn.Close() return fmt.Errorf("read OPEN from %q: %s", s.PeerAddress, err) } if op.asn != s.PeerASN { conn.Close() return fmt.Errorf("unexpected peer ASN %d, want %d", op.asn, s.PeerASN) } s.peerFBASNSupport = op.fbasn if s.MyASN > 65536 && !s.peerFBASNSupport { conn.Close() return fmt.Errorf("peer does not support 4-byte ASNs") } // BGP session is established, clear the connect timeout deadline. if err := conn.SetDeadline(time.Time{}); err != nil { conn.Close() return fmt.Errorf("clearing deadline on conn to %q: %s", s.PeerAddress, err) } // Consume BGP messages until the connection closes. go s.consumeBGP(conn) // Send one keepalive to say that yes, we accept the OPEN. if err := sendKeepalive(conn); err != nil { conn.Close() return fmt.Errorf("accepting peer OPEN from %q: %s", s.PeerAddress, err) } // Set up regular keepalives from now on. 
s.actualHoldTime = *s.HoldTime if op.holdTime < s.actualHoldTime { s.actualHoldTime = op.holdTime } select { case s.newHoldTime <- true: default: } s.conn = conn return nil } func hashRouterID(hostname string) (net.IP, error) { buf := new(bytes.Buffer) err := binary.Write(buf, binary.LittleEndian, crc32.ChecksumIEEE([]byte(hostname))) if err != nil { return nil, err } return net.IP(buf.Bytes()), nil } // Ipv4; Use the address as-is. // Ipv6; Pick the first ipv4 address on the same interface as the address. func getRouterID(addr net.IP, myNode string) (net.IP, error) { if addr.To4() != nil { return addr, nil } ifaces, err := net.Interfaces() if err != nil { return hashRouterID(myNode) } for _, i := range ifaces { addrs, err := i.Addrs() if err != nil { continue } for _, a := range addrs { var ip net.IP switch v := a.(type) { case *net.IPNet: ip = v.IP case *net.IPAddr: ip = v.IP } if ip.Equal(addr) { // This is the interface. // Loop through the addresses again and search for ipv4 for _, a := range addrs { var ip net.IP switch v := a.(type) { case *net.IPNet: ip = v.IP case *net.IPAddr: ip = v.IP } if ip.To4() != nil { return ip, nil } } return hashRouterID(myNode) } } } return hashRouterID(myNode) } // sendKeepalives sends BGP KEEPALIVE packets at the negotiated rate // whenever the session is connected. func (s *session) sendKeepalives() { var ( t *time.Ticker ch <-chan time.Time ) for { select { case <-s.newHoldTime: s.mu.Lock() ht := s.actualHoldTime s.mu.Unlock() if t != nil { t.Stop() t = nil ch = nil } if ht != 0 { t = time.NewTicker(ht / 3) ch = t.C } case <-ch: if err := s.sendKeepalive(); err == errClosed { // Session has been closed by package caller, we're // done here. return } } } } // sendKeepalive sends a single BGP KEEPALIVE packet. func (s *session) sendKeepalive() error { s.mu.Lock() defer s.mu.Unlock() if s.closed { return errClosed } if s.conn == nil { // No connection established, othing to do. 
return nil } if err := sendKeepalive(s.conn); err != nil { s.abort() level.Error(s.logger).Log("op", "sendKeepalive", "error", err, "msg", "failed to send keepalive") return fmt.Errorf("sending keepalive to %q: %s", s.PeerAddress, err) } return nil } // consumeBGP receives BGP messages from the peer, and ignores // them. It does minimal checks for the well-formedness of messages, // and terminates the connection if something looks wrong. func (s *session) consumeBGP(conn io.ReadCloser) { defer func() { s.mu.Lock() defer s.mu.Unlock() if s.conn == conn { s.abort() } else { conn.Close() } }() for { hdr := struct { Marker1, Marker2 uint64 Len uint16 Type uint8 }{} if err := binary.Read(conn, binary.BigEndian, &hdr); err != nil { // TODO: log, or propagate the error somehow. return } if hdr.Marker1 != 0xffffffffffffffff || hdr.Marker2 != 0xffffffffffffffff { // TODO: propagate return } if hdr.Type == 3 { // TODO: propagate better than just logging directly. err := readNotification(conn) level.Error(s.logger).Log("event", "peerNotification", "error", err, "msg", "peer sent notification, closing session") return } if _, err := io.Copy(io.Discard, io.LimitReader(conn, int64(hdr.Len)-19)); err != nil { // TODO: propagate return } } } func validate(adv *bgp.Advertisement) error { if adv.Prefix.IP.To4() == nil { return fmt.Errorf("cannot advertise non-v4 prefix %q", adv.Prefix) } if len(adv.Communities) > 63 { return fmt.Errorf("max supported communities is 63, got %d", len(adv.Communities)) } return nil } // Set updates the set of Advertisements that this session's peer should receive. // // Changes are propagated to the peer asynchronously, Set may return // before the peer learns about the changes. 
func (s *session) Set(advs ...*bgp.Advertisement) error { s.mu.Lock() defer s.mu.Unlock() newAdvs := map[string]*bgp.Advertisement{} for _, adv := range advs { err := validate(adv) if err != nil { return err } newAdvs[adv.Prefix.String()] = adv } s.new = newAdvs stats.PendingPrefixes(s.peerName, len(s.new)) s.cond.Broadcast() return nil } // abort closes any existing connection, updates stats, and cleans up // state ready for another connection attempt. func (s *session) abort() { if s.conn != nil { s.conn.Close() s.conn = nil stats.SessionDown(s.peerName) } // Next time we retry the connection, we can just skip straight to // the desired end state. if s.new != nil { s.advertised, s.new = s.new, nil stats.PendingPrefixes(s.peerName, len(s.advertised)) } s.cond.Broadcast() } // Close shuts down the BGP session. func (s *session) Close() error { s.mu.Lock() defer s.mu.Unlock() s.closed = true s.abort() return nil } // DialTCP does the part of creating a connection manually, including setting the // proper TCP MD5 options when the password is not empty. Works by manipulating // the low level FD's, skipping the net.Conn API as it has not hooks to set // the necessary sockopts for TCP MD5. func dialMD5(ctx context.Context, addr string, srcAddr net.IP, password string) (net.Conn, error) { // If srcAddr exists on any of the local network interfaces, use it as the // source address of the TCP socket. Otherwise, use the IPv6 unspecified // address ("::") to let the kernel figure out the source address. // NOTE: On Linux, "::" also includes "0.0.0.0" (all IPv4 addresses). 
a := "[::]" if srcAddr != nil { ifs, err := net.Interfaces() if err != nil { return nil, fmt.Errorf("querying local interfaces: %w", err) } if !localAddressExists(ifs, srcAddr) { return nil, fmt.Errorf("address %q doesn't exist on this host", srcAddr) } a = fmt.Sprintf("[%s]", srcAddr.String()) } laddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", a)) if err != nil { return nil, fmt.Errorf("error resolving local address: %s ", err) } raddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return nil, fmt.Errorf("invalid remote address: %s ", err) } var family int var ra, la unix.Sockaddr if raddr.IP.To4() != nil { family = unix.AF_INET rsockaddr := &unix.SockaddrInet4{Port: raddr.Port} copy(rsockaddr.Addr[:], raddr.IP.To4()) ra = rsockaddr lsockaddr := &unix.SockaddrInet4{} copy(lsockaddr.Addr[:], laddr.IP.To4()) la = lsockaddr } else { family = unix.AF_INET6 rsockaddr := &unix.SockaddrInet6{Port: raddr.Port} copy(rsockaddr.Addr[:], raddr.IP.To16()) ra = rsockaddr var zone uint32 if laddr.Zone != "" { intf, errs := net.InterfaceByName(laddr.Zone) if errs != nil { return nil, errs } zone, err = safeconvert.IntToUInt32(intf.Index) if err != nil { return nil, fmt.Errorf("invalid interface index %d", intf.Index) } } lsockaddr := &unix.SockaddrInet6{ZoneId: zone} copy(lsockaddr.Addr[:], laddr.IP.To16()) la = lsockaddr } sockType := unix.SOCK_STREAM | unix.SOCK_CLOEXEC | unix.SOCK_NONBLOCK proto := 0 fd, err := unix.Socket(family, sockType, proto) if err != nil { return nil, err } // A new socket was created so we must close it before this // function returns either on failure or success. On success, // net.FileConn() in newTCPConn() increases the refcount of // the socket so this fi.Close() doesn't destroy the socket. // The caller must call Close() with the file later. // Note that the above os.NewFile() doesn't play with the // refcount. 
fi := os.NewFile(uintptr(fd), "") defer func() { if tmpErr := fi.Close(); tmpErr != nil { err = tmpErr } }() if password != "" { sig, err := buildTCPMD5Sig(raddr.IP, password) if err != nil { return nil, err } // Better way may be available in Go 1.11, see go-review.googlesource.com/c/go/+/72810 if err = os.NewSyscallError("setsockopt", unix.SetsockoptTCPMD5Sig(fd, unix.IPPROTO_TCP, unix.TCP_MD5SIG, sig)); err != nil { return nil, err } } if err = unix.Bind(fd, la); err != nil { return nil, os.NewSyscallError("bind", err) } err = unix.Connect(fd, ra) switch err { case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR: case nil: return net.FileConn(fi) default: return nil, os.NewSyscallError("connect", err) } // With a non-blocking socket, the connection process is // asynchronous, so we need to manually wait with epoll until the // connection succeeds. All of the following is doing that, with // appropriate use of the deadline in the context. epfd, err := unix.EpollCreate1(syscall.EPOLL_CLOEXEC) if err != nil { return nil, err } defer unix.Close(epfd) var event unix.EpollEvent events := make([]unix.EpollEvent, 1) event.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLPRI event.Fd, err = safeconvert.IntToInt32(fd) if err != nil { return nil, fmt.Errorf("invalid fd %w", err) } if err = unix.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil { return nil, err } for { timeout := int(-1) if deadline, ok := ctx.Deadline(); ok { timeout = int(time.Until(deadline).Nanoseconds() / 1000000) if timeout <= 0 { return nil, fmt.Errorf("timeout") } } nevents, err := unix.EpollWait(epfd, events, timeout) if err != nil { return nil, err } if nevents == 0 { return nil, fmt.Errorf("timeout") } fdToCheck, err := safeconvert.IntToInt32(fd) if err != nil { return nil, fmt.Errorf("invalid fd %w", err) } if nevents > 1 || events[0].Fd != fdToCheck { return nil, fmt.Errorf("unexpected epoll behavior") } nerr, err := unix.GetsockoptInt(fd, unix.SOL_SOCKET, 
unix.SO_ERROR) if err != nil { return nil, os.NewSyscallError("getsockopt", err) } switch err := syscall.Errno(nerr); err { case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR: case syscall.Errno(0), unix.EISCONN: return net.FileConn(fi) default: return nil, os.NewSyscallError("getsockopt", err) } } } func buildTCPMD5Sig(addr net.IP, key string) (*unix.TCPMD5Sig, error) { t := unix.TCPMD5Sig{} if addr.To4() != nil { t.Addr.Family = unix.AF_INET copy(t.Addr.Data[2:], addr.To4()) } else { t.Addr.Family = unix.AF_INET6 copy(t.Addr.Data[6:], addr.To16()) } var err error t.Keylen, err = safeconvert.IntToUInt16(len(key)) if err != nil { return nil, fmt.Errorf("invalid keyLen %w", err) } copy(t.Key[0:], []byte(key)) return &t, nil } // localAddressExists returns true if the address addr exists on any of the // network interfaces in the ifs slice. func localAddressExists(ifs []net.Interface, addr net.IP) bool { for _, i := range ifs { addresses, err := i.Addrs() if err != nil { continue } for _, a := range addresses { ip, ok := a.(*net.IPNet) if !ok { continue } if ip.IP.Equal(addr) { return true } } } return false }
// SPDX-License-Identifier:Apache-2.0 package native import ( "github.com/prometheus/client_golang/prometheus" bgpmetrics "go.universe.tf/metallb/internal/bgp/metrics" ) var labels = []string{"peer"} var stats = metrics{ sessionUp: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: bgpmetrics.Namespace, Subsystem: bgpmetrics.Subsystem, Name: bgpmetrics.SessionUp.Name, Help: bgpmetrics.SessionUp.Help, }, labels), updatesSent: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: bgpmetrics.Namespace, Subsystem: bgpmetrics.Subsystem, Name: bgpmetrics.UpdatesSent.Name, Help: bgpmetrics.UpdatesSent.Help, }, labels), prefixes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: bgpmetrics.Namespace, Subsystem: bgpmetrics.Subsystem, Name: bgpmetrics.Prefixes.Name, Help: bgpmetrics.Prefixes.Help, }, labels), pendingPrefixes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: bgpmetrics.Namespace, Subsystem: bgpmetrics.Subsystem, Name: "pending_prefixes_total", Help: "Number of prefixes that should be advertised on the BGP session", }, labels), } type metrics struct { sessionUp *prometheus.GaugeVec updatesSent *prometheus.CounterVec prefixes *prometheus.GaugeVec pendingPrefixes *prometheus.GaugeVec } func init() { prometheus.MustRegister(stats.sessionUp) prometheus.MustRegister(stats.updatesSent) prometheus.MustRegister(stats.prefixes) prometheus.MustRegister(stats.pendingPrefixes) } func (m *metrics) DeleteSession(addr string) { m.sessionUp.DeleteLabelValues(addr) m.prefixes.DeleteLabelValues(addr) m.pendingPrefixes.DeleteLabelValues(addr) m.updatesSent.DeleteLabelValues(addr) } func (m *metrics) SessionUp(addr string) { m.sessionUp.WithLabelValues(addr).Set(1) m.prefixes.WithLabelValues(addr).Set(0) } func (m *metrics) SessionDown(addr string) { m.sessionUp.WithLabelValues(addr).Set(0) m.prefixes.WithLabelValues(addr).Set(0) } func (m *metrics) UpdateSent(addr string) { m.updatesSent.WithLabelValues(addr).Inc() } func (m *metrics) PendingPrefixes(addr 
string, n int) { m.pendingPrefixes.WithLabelValues(addr).Set(float64(n)) } func (m *metrics) AdvertisedPrefixes(addr string, n int) { m.prefixes.WithLabelValues(addr).Set(float64(n)) m.pendingPrefixes.WithLabelValues(addr).Set(float64(n)) }
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config // import "go.universe.tf/metallb/internal/config"

import (
	"bytes"
	"fmt"
	"net"
	"reflect"
	"slices"
	"sort"
	"strings"
	"time"

	"errors"

	"github.com/mikioh/ipaddr"
	metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
	metallbv1beta2 "go.universe.tf/metallb/api/v1beta2"
	"go.universe.tf/metallb/internal/bgp/community"
	"go.universe.tf/metallb/internal/ipfamily"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/utils/ptr"
)

// ClusterResources groups the Kubernetes objects (MetalLB custom resources,
// password secrets, nodes, namespaces and the BGP extras ConfigMap) that a
// Config is built from.
type ClusterResources struct {
	Pools           []metallbv1beta1.IPAddressPool    `json:"ipaddresspools"`
	Peers           []metallbv1beta2.BGPPeer          `json:"bgppeers"`
	BFDProfiles     []metallbv1beta1.BFDProfile       `json:"bfdprofiles"`
	BGPAdvs         []metallbv1beta1.BGPAdvertisement `json:"bgpadvertisements"`
	L2Advs          []metallbv1beta1.L2Advertisement  `json:"l2advertisements"`
	Communities     []metallbv1beta1.Community        `json:"communities"`
	PasswordSecrets map[string]corev1.Secret          `json:"passwordsecrets"`
	Nodes           []corev1.Node                     `json:"nodes"`
	Namespaces      []corev1.Namespace                `json:"namespaces"`
	BGPExtras       corev1.ConfigMap                  `json:"bgpextras"`
}

// Config is a parsed MetalLB configuration.
type Config struct {
	// Routers that MetalLB should peer with, keyed by peer name.
	Peers map[string]*Peer
	// Address pools from which to allocate load balancer IPs.
	Pools *Pools
	// BFD profiles that can be used by peers, keyed by profile name.
	BFDProfiles map[string]*BFDProfile
	// Protocol dependent extra config. Currently used only by FRR
	BGPExtras string
}

// Pools contains the address pools and their namespace/service specific allocations.
type Pools struct {
	// ByName is a map containing all configured pools, keyed by pool name.
	ByName map[string]*Pool
	// ByNamespace maps a namespace to the names of the pools pinned to it.
	ByNamespace map[string][]string
	// ByServiceSelector contains the names of the pools which have service
	// selectors, sorted alphabetically.
	ByServiceSelector []string
}

// Proto holds the protocol we are speaking.
type Proto string

// MetalLB supported protocols.
const (
	BGP    Proto = "bgp"
	Layer2 Proto = "layer2"
)

// bgpExtrasField is the key of the BGPExtras ConfigMap data entry that holds
// the extra BGP configuration.
const bgpExtrasField = "extras"

// Protocols lists all the protocols supported by MetalLB.
var Protocols = []Proto{
	BGP, Layer2,
}

// Peer is the configuration of a BGP peering session.
type Peer struct {
	// Peer name.
	Name string
	// AS number to use for the local end of the session.
	MyASN uint32
	// AS number to expect from the remote end of the session.
	ASN uint32
	// Detect the AS number to use for the remote end of the session.
	DynamicASN string
	// Address to dial when establishing the session.
	Addr net.IP
	// Iface is the Interface to use for Unnumbered BGP peering.
	// Addr field must be nil.
	Iface string
	// Source address to use when establishing the session.
	SrcAddr net.IP
	// Port to dial when establishing the session.
	Port uint16
	// Requested BGP hold time, per RFC4271.
	HoldTime *time.Duration
	// Requested BGP keepalive time, per RFC4271.
	KeepaliveTime *time.Duration
	// Requested BGP connect time, controls how long BGP waits between connection attempts to a neighbor.
	ConnectTime *time.Duration
	// BGP router ID to advertise to the peer
	RouterID net.IP
	// Only connect to this peer on nodes that match one of these
	// selectors.
	NodeSelectors []labels.Selector
	// Authentication password for routers enforcing TCP MD5 authenticated sessions
	Password string
	// The password set in the secret referenced by the secret reference, in clear text
	SecretPassword string
	// Optional reference to the secret that holds the password.
	PasswordRef corev1.SecretReference
	// The optional BFD profile to be used for this BGP session
	BFDProfile string
	// Optional EnableGracefulRestart enable BGP graceful restart functionality at the peer level.
	EnableGracefulRestart bool
	// Optional ebgp peer is multi-hops away.
	EBGPMultiHop bool
	// Optional name of the vrf to establish the session from
	VRF string
	// Option to advertise v4 addresses over v6 sessions and viceversa.
	DualStackAddressFamily bool
}

// Pool is the configuration of an IP address pool.
type Pool struct {
	// Pool Name
	Name string
	// The addresses that are part of this pool, expressed as CIDR
	// prefixes. config.For guarantees that these are
	// non-overlapping, both within and between pools.
	CIDR []*net.IPNet
	// Some buggy consumer devices mistakenly drop IPv4 traffic for IP
	// addresses ending in .0 or .255, due to poor implementations of
	// smurf protection. This setting marks such addresses as
	// unusable, for maximum compatibility with ancient parts of the
	// internet.
	AvoidBuggyIPs bool
	// If false, prevents IP addresses to be automatically assigned
	// from this pool.
	AutoAssign bool

	// The list of BGPAdvertisements associated with this address pool.
	BGPAdvertisements []*BGPAdvertisement

	// The list of L2Advertisements associated with this address pool.
	L2Advertisements []*L2Advertisement

	// cidrsPerAddresses maps each address entry of the pool spec (a CIDR or
	// an IP range) to the CIDRs parsed from it.
	cidrsPerAddresses map[string][]*net.IPNet

	// ServiceAllocations restricts the pool to specific namespaces and/or
	// services. It is nil when the pool spec has no allocation section.
	ServiceAllocations *ServiceAllocation
}

// ServiceAllocation makes ip pool allocation to specific namespace and/or service.
type ServiceAllocation struct {
	// The priority of ip pool for a given service allocation.
	Priority int
	// Set of namespaces on which ip pool can be attached.
	Namespaces sets.Set[string]
	// Service selectors to select service for which ip pool can be used
	// for ip allocation.
	ServiceSelectors []labels.Selector
}

// BGPAdvertisement describes one translation from an IP address to a BGP advertisement.
type BGPAdvertisement struct {
	// The name of the advertisement
	Name string
	// Roll up the IP address into a CIDR prefix of this
	// length. Optional, defaults to 32 (i.e. no aggregation) if not
	// specified.
	AggregationLength int
	// Same as AggregationLength, for IPv6 addresses.
	// Optional, defaults to 128 (i.e. no aggregation) if not
	// specified.
	AggregationLengthV6 int
	// Value of the LOCAL_PREF BGP path attribute. Used only when
	// advertising to IBGP peers (i.e. Peer.MyASN == Peer.ASN).
	LocalPref uint32
	// Value of the COMMUNITIES path attribute. The map is used as a set.
	Communities map[community.BGPCommunity]bool
	// The map of nodes allowed for this advertisement
	Nodes map[string]bool
	// Used to declare the intent of announcing IPs
	// only to the BGPPeers in this list.
	Peers []string
}

// L2Advertisement holds the nodes and the interfaces allowed for a layer 2
// advertisement.
type L2Advertisement struct {
	// The map of nodes allowed for this advertisement
	Nodes map[string]bool
	// The interfaces in Nodes allowed for this advertisement
	Interfaces []string
	// AllInterfaces tells if all the interfaces are allowed for this advertisement
	AllInterfaces bool
}

// BFDProfile describes a BFD profile to be applied to a set of peers.
// Pointer fields are nil when the corresponding value is not set in the
// custom resource.
type BFDProfile struct {
	Name             string
	ReceiveInterval  *uint32
	TransmitInterval *uint32
	DetectMultiplier *uint32
	EchoInterval     *uint32
	EchoMode         bool
	PassiveMode      bool
	MinimumTTL       *uint32
}

// IsEmpty reports whether ByName holds no (non-nil) pool under the given name.
func (p *Pools) IsEmpty(pool string) bool {
	return p.ByName[pool] == nil
}

// For builds a Config from the given cluster resources. The validate callback
// is run against the resources first; the resulting Config is validated as
// well before being returned.
func For(resources ClusterResources, validate Validate) (*Config, error) { err := validate(resources) if err != nil { return nil, err } cfg := &Config{} cfg.BFDProfiles, err = bfdProfilesFor(resources) if err != nil { return nil, err } cfg.Peers, err = peersFor(resources, cfg.BFDProfiles) if err != nil { return nil, err } cfg.Pools, err = poolsFor(resources) if err != nil { return nil, err } cfg.BGPExtras = bgpExtrasFor(resources) if err != nil { return nil, err } err = validateConfig(cfg) if err != nil { return nil, err } return cfg, nil } func bfdProfilesFor(resources ClusterResources) (map[string]*BFDProfile, error) { res := make(map[string]*BFDProfile) for i, bfd := range resources.BFDProfiles { parsed, err := bfdProfileFromCR(bfd) if err != nil { return nil, fmt.Errorf("parsing bfd profile #%d: %s", i+1, err) } if _, ok := res[parsed.Name]; ok { return nil, fmt.Errorf("found duplicate bfd profile name %s", parsed.Name) } res[bfd.Name] = parsed } return res, nil } func peersFor(resources ClusterResources, BFDProfiles map[string]*BFDProfile) (map[string]*Peer, error) { var res = make(map[string]*Peer) for _, p := range resources.Peers { peer, err := peerFromCR(p, resources.PasswordSecrets) if err != nil { return nil, fmt.Errorf("parsing peer %s %w", p.Name, err) } if peer.BFDProfile != "" { if _, ok := BFDProfiles[peer.BFDProfile]; !ok { return nil, TransientError{fmt.Sprintf("peer %s referencing non existing bfd profile %s", p.Name, peer.BFDProfile)} } } for _, ep := range res { // TODO: Be smarter regarding conflicting peers. For example, two // peers could have a different hold time but they'd still result // in two BGP sessions between the speaker and the remote host. 
if reflect.DeepEqual(peer, ep) { return nil, fmt.Errorf("peer %s already exists", p.Name) } } res[peer.Name] = peer } return res, nil } func poolsFor(resources ClusterResources) (*Pools, error) { pools := make(map[string]*Pool) communities, err := communitiesFromCrs(resources.Communities) if err != nil { return nil, err } var allCIDRs []*net.IPNet for _, p := range resources.Pools { pool, err := addressPoolFromCR(p, resources.Namespaces) if err != nil { return nil, fmt.Errorf("parsing address pool %s: %s", p.Name, err) } // Check that the pool isn't already defined if pools[p.Name] != nil { return nil, fmt.Errorf("duplicate definition of pool %q", p.Name) } // Check that all specified CIDR ranges are non-overlapping. for _, cidr := range pool.CIDR { for _, m := range allCIDRs { if cidrsOverlap(cidr, m) { return nil, fmt.Errorf("CIDR %q in pool %q overlaps with already defined CIDR %q", cidr, p.Name, m) } } allCIDRs = append(allCIDRs, cidr) } pools[p.Name] = pool } err = setL2AdvertisementsToPools(resources.Pools, resources.L2Advs, resources.Nodes, pools) if err != nil { return nil, err } err = validateDuplicateBGPAdvertisements(resources.BGPAdvs) if err != nil { return nil, err } err = setBGPAdvertisementsToPools(resources.Pools, resources.BGPAdvs, resources.Nodes, pools, communities) if err != nil { return nil, err } return &Pools{ByName: pools, ByNamespace: poolsByNamespace(pools), ByServiceSelector: poolsByServiceSelector(pools)}, nil } func bgpExtrasFor(resources ClusterResources) string { if resources.BGPExtras.Data == nil { return "" } return resources.BGPExtras.Data[bgpExtrasField] } func communitiesFromCrs(cs []metallbv1beta1.Community) (map[string]community.BGPCommunity, error) { communities := map[string]community.BGPCommunity{} for _, c := range cs { for _, communityAlias := range c.Spec.Communities { v, err := community.New(communityAlias.Value) if err != nil { return nil, fmt.Errorf("parsing community %q: %s", communityAlias.Name, err) } if _, ok := 
communities[communityAlias.Name]; ok { return nil, fmt.Errorf("duplicate definition of community %q", communityAlias.Name) } communities[communityAlias.Name] = v } } return communities, nil } func peerFromCR(p metallbv1beta2.BGPPeer, passwordSecrets map[string]corev1.Secret) (*Peer, error) { if p.Spec.MyASN == 0 { return nil, errors.New("missing local ASN") } if p.Spec.ASN == 0 && p.Spec.DynamicASN == "" { return nil, errors.New("missing peer ASN and dynamicASN") } if p.Spec.ASN != 0 && p.Spec.DynamicASN != "" { return nil, errors.New("both peer ASN and dynamicASN specified") } if p.Spec.DynamicASN != "" && p.Spec.DynamicASN != metallbv1beta2.InternalASNMode && p.Spec.DynamicASN != metallbv1beta2.ExternalASNMode { return nil, fmt.Errorf("invalid dynamicASN %s", p.Spec.DynamicASN) } if p.Spec.ASN == p.Spec.MyASN && p.Spec.EBGPMultiHop { return nil, errors.New("invalid ebgp-multihop parameter set for an ibgp peer") } if p.Spec.Address == "" && p.Spec.Interface == "" { return nil, fmt.Errorf("peer has no Address or Interface specified") } if p.Spec.Address != "" && p.Spec.Interface != "" { return nil, fmt.Errorf("peer has both Address and Interface specified") } holdTime, keepaliveTime, err := parseTimers(p.Spec.HoldTime, p.Spec.KeepaliveTime) if err != nil { return nil, fmt.Errorf("invalid BGPPeer timers: %w", err) } var ip net.IP if p.Spec.Address != "" { ip = net.ParseIP(p.Spec.Address) if ip == nil { return nil, fmt.Errorf("invalid BGPPeer address %q", p.Spec.Address) } } // Ideally we would set a default RouterID here, instead of having // to do it elsewhere in the code. Unfortunately, we don't know // the node IP here. 
var routerID net.IP if p.Spec.RouterID != "" { routerID = net.ParseIP(p.Spec.RouterID) if routerID == nil { return nil, fmt.Errorf("invalid router ID %q", p.Spec.RouterID) } } src := net.ParseIP(p.Spec.SrcAddress) if p.Spec.SrcAddress != "" && src == nil { return nil, fmt.Errorf("invalid source IP %q", p.Spec.SrcAddress) } err = validateLabelSelectorDuplicate(p.Spec.NodeSelectors, "nodeSelectors") if err != nil { return nil, err } var nodeSels []labels.Selector for _, s := range p.Spec.NodeSelectors { labelSelector, err := metav1.LabelSelectorAsSelector(&s) if err != nil { return nil, errors.Join(err, fmt.Errorf("failed to convert peer %s node selector", p.Name)) } nodeSels = append(nodeSels, labelSelector) } if len(nodeSels) == 0 { nodeSels = []labels.Selector{labels.Everything()} } if p.Spec.Password != "" && p.Spec.PasswordSecret.Name != "" { return nil, fmt.Errorf("can not have both password and secret ref set in peer config %q/%q", p.Namespace, p.Name) } secretPassword := "" if p.Spec.PasswordSecret.Name != "" { secretPassword, err = passwordFromSecretForPeer(p, passwordSecrets) if err != nil { return nil, errors.Join(err, fmt.Errorf("failed to parse peer %s password secret", p.Name)) } } var connectTime *time.Duration if p.Spec.ConnectTime != nil { connectTime = ptr.To(p.Spec.ConnectTime.Duration) } return &Peer{ Name: p.Name, MyASN: p.Spec.MyASN, ASN: p.Spec.ASN, DynamicASN: string(p.Spec.DynamicASN), Addr: ip, Iface: p.Spec.Interface, SrcAddr: src, Port: p.Spec.Port, HoldTime: holdTime, KeepaliveTime: keepaliveTime, ConnectTime: connectTime, RouterID: routerID, NodeSelectors: nodeSels, SecretPassword: secretPassword, Password: p.Spec.Password, PasswordRef: p.Spec.PasswordSecret, BFDProfile: p.Spec.BFDProfile, EnableGracefulRestart: p.Spec.EnableGracefulRestart, EBGPMultiHop: p.Spec.EBGPMultiHop, VRF: p.Spec.VRFName, DualStackAddressFamily: p.Spec.DualStackAddressFamily, }, nil } func passwordFromSecretForPeer(p metallbv1beta2.BGPPeer, passwordSecrets 
map[string]corev1.Secret) (string, error) { secret, ok := passwordSecrets[p.Spec.PasswordSecret.Name] if !ok { return "", TransientError{Message: fmt.Sprintf("secret ref not found for peer config %q/%q", p.Namespace, p.Name)} } if secret.Type != corev1.SecretTypeBasicAuth { return "", fmt.Errorf("secret type mismatch on %q/%q, type %q is expected ", secret.Namespace, secret.Name, corev1.SecretTypeBasicAuth) } srcPass, ok := secret.Data["password"] if !ok { return "", fmt.Errorf("password not specified in the secret %q/%q", secret.Namespace, secret.Name) } return string(srcPass), nil } func addressPoolFromCR(p metallbv1beta1.IPAddressPool, namespaces []corev1.Namespace) (*Pool, error) { if p.Name == "" { return nil, errors.New("missing pool name") } ret := &Pool{ Name: p.Name, AvoidBuggyIPs: p.Spec.AvoidBuggyIPs, AutoAssign: true, } if p.Spec.AutoAssign != nil { ret.AutoAssign = *p.Spec.AutoAssign } if len(p.Spec.Addresses) == 0 { return nil, errors.New("pool has no prefixes defined") } ret.cidrsPerAddresses = map[string][]*net.IPNet{} for _, cidr := range p.Spec.Addresses { nets, err := ParseCIDR(cidr) if err != nil { return nil, fmt.Errorf("invalid CIDR %q in pool %q: %s", cidr, p.Name, err) } ret.CIDR = append(ret.CIDR, nets...) 
ret.cidrsPerAddresses[cidr] = nets } serviceAllocations, err := addressPoolServiceAllocationsFromCR(p, namespaces) if err != nil { return nil, err } ret.ServiceAllocations = serviceAllocations return ret, nil } func addressPoolServiceAllocationsFromCR(p metallbv1beta1.IPAddressPool, namespaces []corev1.Namespace) (*ServiceAllocation, error) { if p.Spec.AllocateTo == nil { return nil, nil } poolNamespaces := sets.Set[string]{} for _, poolNs := range p.Spec.AllocateTo.Namespaces { if poolNamespaces.Has(poolNs) { return nil, errors.New("duplicate definition in namespaces field") } poolNamespaces.Insert(poolNs) } serviceAllocations := &ServiceAllocation{Priority: p.Spec.AllocateTo.Priority, Namespaces: poolNamespaces} if len(poolNamespaces) == 0 && len(p.Spec.AllocateTo.NamespaceSelectors) == 0 && len(p.Spec.AllocateTo.ServiceSelectors) == 0 { // If no specific namespaces or service selectors are set, match everything serviceAllocations.ServiceSelectors = []labels.Selector{labels.Everything()} return serviceAllocations, nil } err := validateLabelSelectorDuplicate(p.Spec.AllocateTo.NamespaceSelectors, "namespaceSelectors") if err != nil { return nil, err } err = validateLabelSelectorDuplicate(p.Spec.AllocateTo.ServiceSelectors, "serviceSelectors") if err != nil { return nil, err } for i := range p.Spec.AllocateTo.NamespaceSelectors { l, err := metav1.LabelSelectorAsSelector(&p.Spec.AllocateTo.NamespaceSelectors[i]) if err != nil { return nil, errors.Join(err, fmt.Errorf("invalid namespace label selector %v in ip pool %s", &p.Spec.AllocateTo.NamespaceSelectors[i], p.Name)) } for _, ns := range namespaces { nsLabels := labels.Set(ns.Labels) if l.Matches(nsLabels) { serviceAllocations.Namespaces.Insert(ns.Name) } } } for i := range p.Spec.AllocateTo.ServiceSelectors { l, err := metav1.LabelSelectorAsSelector(&p.Spec.AllocateTo.ServiceSelectors[i]) if err != nil { return nil, errors.Join(err, fmt.Errorf("invalid service label selector %v in ip pool %s", 
p.Spec.AllocateTo.ServiceSelectors[i], p.Name)) } serviceAllocations.ServiceSelectors = append(serviceAllocations.ServiceSelectors, l) } return serviceAllocations, nil } func poolsByNamespace(pools map[string]*Pool) map[string][]string { var poolsForNamespace map[string][]string for _, pool := range pools { if pool.ServiceAllocations == nil { continue } if poolsForNamespace == nil && len(pool.ServiceAllocations.Namespaces) > 0 { poolsForNamespace = make(map[string][]string) } for namespace := range pool.ServiceAllocations.Namespaces { poolsForNamespace[namespace] = append(poolsForNamespace[namespace], pool.Name) } } return poolsForNamespace } func poolsByServiceSelector(pools map[string]*Pool) []string { var poolsByServiceSelector []string for _, pool := range pools { if pool.ServiceAllocations == nil || len(pool.ServiceAllocations.ServiceSelectors) == 0 { continue } poolsByServiceSelector = append(poolsByServiceSelector, pool.Name) } sort.Strings(poolsByServiceSelector) return poolsByServiceSelector } func bfdProfileFromCR(p metallbv1beta1.BFDProfile) (*BFDProfile, error) { if p.Name == "" { return nil, fmt.Errorf("missing bfd profile name") } res := &BFDProfile{} res.Name = p.Name var err error res.DetectMultiplier, err = bfdIntFromConfig(p.Spec.DetectMultiplier, 2, 255) if err != nil { return nil, errors.Join(err, fmt.Errorf("invalid detect multiplier value")) } res.ReceiveInterval, err = bfdIntFromConfig(p.Spec.ReceiveInterval, 10, 60000) if err != nil { return nil, errors.Join(err, fmt.Errorf("invalid receive interval value")) } res.TransmitInterval, err = bfdIntFromConfig(p.Spec.TransmitInterval, 10, 60000) if err != nil { return nil, errors.Join(err, fmt.Errorf("invalid transmit interval value")) } res.MinimumTTL, err = bfdIntFromConfig(p.Spec.MinimumTTL, 1, 254) if err != nil { return nil, errors.Join(err, fmt.Errorf("invalid minimum ttl value")) } res.EchoInterval, err = bfdIntFromConfig(p.Spec.EchoInterval, 10, 60000) if err != nil { return nil, 
errors.Join(err, fmt.Errorf("invalid echo interval value")) } if p.Spec.EchoMode != nil { res.EchoMode = *p.Spec.EchoMode } if p.Spec.PassiveMode != nil { res.PassiveMode = *p.Spec.PassiveMode } return res, nil } func setL2AdvertisementsToPools(ipPools []metallbv1beta1.IPAddressPool, l2Advs []metallbv1beta1.L2Advertisement, nodes []corev1.Node, ipPoolMap map[string]*Pool) error { for _, l2Adv := range l2Advs { adv, err := l2AdvertisementFromCR(l2Adv, nodes) if err != nil { return err } ipPoolsSelected, err := selectedPools(ipPools, l2Adv.Spec.IPAddressPoolSelectors) if err != nil { return err } // No pool selector means select all pools if len(l2Adv.Spec.IPAddressPools) == 0 && len(l2Adv.Spec.IPAddressPoolSelectors) == 0 { for _, pool := range ipPoolMap { if !containsAdvertisement(pool.L2Advertisements, adv) { pool.L2Advertisements = append(pool.L2Advertisements, adv) } } continue } for _, poolName := range append(l2Adv.Spec.IPAddressPools, ipPoolsSelected...) { if pool, ok := ipPoolMap[poolName]; ok { if !containsAdvertisement(pool.L2Advertisements, adv) { pool.L2Advertisements = append(pool.L2Advertisements, adv) } } } } return nil } func setBGPAdvertisementsToPools(ipPools []metallbv1beta1.IPAddressPool, bgpAdvs []metallbv1beta1.BGPAdvertisement, nodes []corev1.Node, ipPoolMap map[string]*Pool, communities map[string]community.BGPCommunity) error { for _, bgpAdv := range bgpAdvs { adv, err := bgpAdvertisementFromCR(bgpAdv, communities, nodes) if err != nil { return err } ipPoolsSelected, err := selectedPools(ipPools, bgpAdv.Spec.IPAddressPoolSelectors) if err != nil { return err } // No pool selector means select all pools if len(bgpAdv.Spec.IPAddressPools) == 0 && len(bgpAdv.Spec.IPAddressPoolSelectors) == 0 { for _, pool := range ipPoolMap { err := validateBGPAdvPerPool(adv, pool) if err != nil { return err } pool.BGPAdvertisements = append(pool.BGPAdvertisements, adv) } continue } for _, poolName := range append(bgpAdv.Spec.IPAddressPools, ipPoolsSelected...) 
{ if pool, ok := ipPoolMap[poolName]; ok { err := validateBGPAdvPerPool(adv, pool) if err != nil { return err } pool.BGPAdvertisements = append(pool.BGPAdvertisements, adv) } } } return nil } func l2AdvertisementFromCR(crdAd metallbv1beta1.L2Advertisement, nodes []corev1.Node) (*L2Advertisement, error) { err := validateDuplicate(crdAd.Spec.IPAddressPools, "ipAddressPools") if err != nil { return nil, err } err = validateLabelSelectorDuplicate(crdAd.Spec.IPAddressPoolSelectors, "ipAddressPoolSelectors") if err != nil { return nil, err } err = validateLabelSelectorDuplicate(crdAd.Spec.NodeSelectors, "nodeSelectors") if err != nil { return nil, err } selected, err := selectedNodes(nodes, crdAd.Spec.NodeSelectors) if err != nil { return nil, errors.Join(err, fmt.Errorf("failed to parse node selector for %s", crdAd.Name)) } l2 := &L2Advertisement{ Nodes: selected, Interfaces: crdAd.Spec.Interfaces, } if len(crdAd.Spec.Interfaces) == 0 { l2.AllInterfaces = true } return l2, nil } func bgpAdvertisementFromCR(crdAd metallbv1beta1.BGPAdvertisement, communities map[string]community.BGPCommunity, nodes []corev1.Node) (*BGPAdvertisement, error) { err := validateDuplicate(crdAd.Spec.IPAddressPools, "ipAddressPools") if err != nil { return nil, err } err = validateDuplicate(crdAd.Spec.Communities, "community") if err != nil { return nil, err } err = validateDuplicate(crdAd.Spec.Peers, "peers") if err != nil { return nil, err } err = validateLabelSelectorDuplicate(crdAd.Spec.IPAddressPoolSelectors, "ipAddressPoolSelectors") if err != nil { return nil, err } err = validateLabelSelectorDuplicate(crdAd.Spec.NodeSelectors, "nodeSelectors") if err != nil { return nil, err } ad := &BGPAdvertisement{ Name: crdAd.Name, AggregationLength: 32, AggregationLengthV6: 128, LocalPref: 0, Communities: map[community.BGPCommunity]bool{}, } if crdAd.Spec.AggregationLength != nil { ad.AggregationLength = int(*crdAd.Spec.AggregationLength) // TODO CRD cast } if ad.AggregationLength > 32 { return nil, 
fmt.Errorf("invalid aggregation length %q for IPv4", ad.AggregationLength) } if crdAd.Spec.AggregationLengthV6 != nil { ad.AggregationLengthV6 = int(*crdAd.Spec.AggregationLengthV6) // TODO CRD cast if ad.AggregationLengthV6 > 128 { return nil, fmt.Errorf("invalid aggregation length %q for IPv6", ad.AggregationLengthV6) } } ad.LocalPref = crdAd.Spec.LocalPref if len(crdAd.Spec.Peers) > 0 { ad.Peers = make([]string, 0, len(crdAd.Spec.Peers)) ad.Peers = append(ad.Peers, crdAd.Spec.Peers...) } for _, c := range crdAd.Spec.Communities { v, err := getCommunityValue(c, communities) if err != nil { return nil, errors.Join(err, fmt.Errorf("invalid community %q in BGP advertisement", c)) } ad.Communities[v] = true } selected, err := selectedNodes(nodes, crdAd.Spec.NodeSelectors) if err != nil { return nil, errors.Join(err, fmt.Errorf("failed to parse node selector for %s", crdAd.Name)) } ad.Nodes = selected return ad, nil } // getCommunityValue returns the BGPCommunity from the communities map if it exists there. Otherwise, it creates a // new BGP community object from the provided communityString. func getCommunityValue(communityString string, communities map[string]community.BGPCommunity) (community.BGPCommunity, error) { if v, ok := communities[communityString]; ok { return v, nil } var c community.BGPCommunity c, err := community.New(communityString) if errors.Is(err, community.ErrInvalidCommunityValue) { return c, err } // Return TransientError on invalidCommunityFormat, in case it refers // a Community resource that doesn't exist yet. 
if errors.Is(err, community.ErrInvalidCommunityFormat) { return c, TransientError{err.Error()} } return c, nil } func parseTimers(ht, ka *metav1.Duration) (*time.Duration, *time.Duration, error) { if ht == nil && ka == nil { return nil, nil, nil } var holdTime *time.Duration var keepaliveTime *time.Duration if ht != nil && ka != nil { holdTime = &ht.Duration keepaliveTime = &ka.Duration } if ht != nil && ka == nil { holdTime = &ht.Duration keepaliveTime = ptr.To(ht.Duration / 3) } if ht == nil && ka != nil { holdTime = ptr.To(ka.Duration * 3) keepaliveTime = &ka.Duration } rounded := time.Duration(int(holdTime.Seconds())) * time.Second if rounded != 0 && rounded < 3*time.Second { return nil, nil, fmt.Errorf("invalid hold time %q: must be 0 or >=3s", ht) } if *keepaliveTime > *holdTime { return nil, nil, fmt.Errorf("invalid keepaliveTime %q, must be lower than holdTime %q", ka, ht) } return holdTime, keepaliveTime, nil } func validateBGPAdvPerPool(adv *BGPAdvertisement, pool *Pool) error { for addr, cidrs := range pool.cidrsPerAddresses { if len(cidrs) == 0 { continue } maxLength := adv.AggregationLength if cidrs[0].IP.To4() == nil { maxLength = adv.AggregationLengthV6 } // in case of range format, we may have a set of cidrs associated to a given address. // We reject if none of the cidrs are compatible with the aggregation length. lowest := lowestMask(cidrs) if maxLength < lowest { return fmt.Errorf("invalid aggregation length %d: prefix %d in "+ "this pool is more specific than the aggregation length for addresses %s", adv.AggregationLength, lowest, addr) } } // Verify that BGP ADVs set a unique local preference value per BGP update. for _, bgpAdv := range pool.BGPAdvertisements { if adv.LocalPref != bgpAdv.LocalPref { if !advertisementsAreCompatible(adv, bgpAdv, pool) { return fmt.Errorf("invalid local preference %d: local preferernce %d was "+ "already set for the same type of BGP update. 
Check existing BGP advertisements "+ "with common pools and aggregation lengths", adv.LocalPref, bgpAdv.LocalPref) } } } return nil } func advertisementsAreCompatible(newAdv, adv *BGPAdvertisement, pool *Pool) bool { if isAggrLengthDifferent(newAdv, adv, pool) { return true } // BGP ADVs with different set of BGP peers do not collide. if len(newAdv.Peers) != 0 && len(adv.Peers) != 0 { equalPeer := false for _, peer := range newAdv.Peers { if slices.Contains(adv.Peers, peer) { equalPeer = true break } } if !equalPeer { return true } } // BGP ADVs with different set of nodes do not collide. for node := range newAdv.Nodes { if _, ok := adv.Nodes[node]; ok { return false } } return true } func isAggrLengthDifferent(newAdv, adv *BGPAdvertisement, pool *Pool) bool { var hasV4, hasV6 bool for _, cidrs := range pool.cidrsPerAddresses { family := ipfamily.ForCIDR(cidrs[0]) if family == ipfamily.IPv4 { hasV4 = true } if family == ipfamily.IPv6 { hasV6 = true } if hasV4 && hasV6 { break } } if !hasV6 && !hasV4 { // compatible?! 
return true } if adv.AggregationLength != newAdv.AggregationLength && !hasV6 { return true } if adv.AggregationLengthV6 != newAdv.AggregationLengthV6 && !hasV4 { return true } // has both, both must be different if adv.AggregationLength != newAdv.AggregationLength && adv.AggregationLengthV6 != newAdv.AggregationLengthV6 { return true } return false } func ParseCIDR(cidr string) ([]*net.IPNet, error) { if !strings.Contains(cidr, "-") { _, n, err := net.ParseCIDR(cidr) if err != nil { return nil, fmt.Errorf("invalid CIDR %q", cidr) } return []*net.IPNet{n}, nil } fs := strings.SplitN(cidr, "-", 2) if len(fs) != 2 { return nil, fmt.Errorf("invalid IP range %q", cidr) } start := net.ParseIP(strings.TrimSpace(fs[0])) if start == nil { return nil, fmt.Errorf("invalid IP range %q: invalid start IP %q", cidr, fs[0]) } end := net.ParseIP(strings.TrimSpace(fs[1])) if end == nil { return nil, fmt.Errorf("invalid IP range %q: invalid end IP %q", cidr, fs[1]) } if bytes.Compare(start, end) > 0 { return nil, fmt.Errorf("invalid IP range %q: start IP %q is after the end IP %q", cidr, start, end) } var ret []*net.IPNet for _, pfx := range ipaddr.Summarize(start, end) { n := &net.IPNet{ IP: pfx.IP, Mask: pfx.Mask, } ret = append(ret, n) } return ret, nil } func cidrsOverlap(a, b *net.IPNet) bool { return cidrContainsCIDR(a, b) || cidrContainsCIDR(b, a) } func cidrContainsCIDR(outer, inner *net.IPNet) bool { ol, _ := outer.Mask.Size() il, _ := inner.Mask.Size() if ol == il && outer.IP.Equal(inner.IP) { return true } if ol < il && outer.Contains(inner.IP) { return true } return false } func lowestMask(cidrs []*net.IPNet) int { if len(cidrs) == 0 { return 0 } lowest, _ := cidrs[0].Mask.Size() for _, c := range cidrs { s, _ := c.Mask.Size() if lowest > s { lowest = s } } return lowest } func bfdIntFromConfig(value *uint32, min, max uint32) (*uint32, error) { if value == nil { return nil, nil } if *value < min || *value > max { return nil, fmt.Errorf("invalid value %d, must be in %d-%d 
range", *value, min, max) } return value, nil } func validateDuplicateBGPAdvertisements(ads []metallbv1beta1.BGPAdvertisement) error { for i := 0; i < len(ads); i++ { for j := i + 1; j < len(ads); j++ { if reflect.DeepEqual(ads[i], ads[j]) { return fmt.Errorf("duplicate definition of bgpadvertisements. advertisement %d and %d are equal", i+1, j+1) } } } return nil } func containsAdvertisement(advs []*L2Advertisement, toCheck *L2Advertisement) bool { for _, adv := range advs { if adv.AllInterfaces != toCheck.AllInterfaces { continue } if !reflect.DeepEqual(adv.Nodes, toCheck.Nodes) { continue } if !sets.New(adv.Interfaces...).Equal(sets.New(toCheck.Interfaces...)) { continue } return true } return false } func selectedNodes(nodes []corev1.Node, selectors []metav1.LabelSelector) (map[string]bool, error) { labelSelectors := []labels.Selector{} for _, selector := range selectors { l, err := metav1.LabelSelectorAsSelector(&selector) if err != nil { return nil, errors.Join(err, fmt.Errorf("invalid label selector %v", selector)) } labelSelectors = append(labelSelectors, l) } res := make(map[string]bool) OUTER: for _, node := range nodes { if len(labelSelectors) == 0 { // no selector mean all nodes are valid res[node.Name] = true } for _, s := range labelSelectors { nodeLabels := labels.Set(node.Labels) if s.Matches(nodeLabels) { res[node.Name] = true continue OUTER } } } return res, nil } func selectedPools(pools []metallbv1beta1.IPAddressPool, selectors []metav1.LabelSelector) ([]string, error) { labelSelectors := []labels.Selector{} for _, selector := range selectors { l, err := metav1.LabelSelectorAsSelector(&selector) if err != nil { return nil, errors.Join(err, fmt.Errorf("invalid label selector %v", selector)) } labelSelectors = append(labelSelectors, l) } var ipPools []string OUTER: for _, pool := range pools { for _, s := range labelSelectors { poolLabels := labels.Set(pool.Labels) if s.Matches(poolLabels) { ipPools = append(ipPools, pool.Name) continue OUTER } } 
} return ipPools, nil } func validateLabelSelectorDuplicate(labelSelectors []metav1.LabelSelector, labelSelectorType string) error { for _, ls := range labelSelectors { for _, me := range ls.MatchExpressions { sort.Strings(me.Values) } } for i := 0; i < len(labelSelectors); i++ { err := validateLabelSelectorMatchExpressions(labelSelectors[i].MatchExpressions) if err != nil { return err } for j := i + 1; j < len(labelSelectors); j++ { if labelSelectors[i].String() == labelSelectors[j].String() { return fmt.Errorf("duplicate definition of %s %q", labelSelectorType, labelSelectors[i]) } } } return nil } func validateLabelSelectorMatchExpressions(matchExpressions []metav1.LabelSelectorRequirement) error { for i := 0; i < len(matchExpressions); i++ { err := validateDuplicate(matchExpressions[i].Values, "match expression value in label selector") if err != nil { return err } for j := i + 1; j < len(matchExpressions); j++ { if matchExpressions[i].String() == matchExpressions[j].String() { return fmt.Errorf("duplicate definition of %s %q", "match expressions", matchExpressions[i]) } } } return nil } func validateDuplicate(strSlice []string, sliceType string) error { for i := 0; i < len(strSlice); i++ { for j := i + 1; j < len(strSlice); j++ { if strSlice[i] == strSlice[j] { return fmt.Errorf("duplicate definition of %s %q", sliceType, strSlice[i]) } } } return nil }
// SPDX-License-Identifier:Apache-2.0 package config import ( "errors" "fmt" metallbv1beta1 "go.universe.tf/metallb/api/v1beta1" metallbv1beta2 "go.universe.tf/metallb/api/v1beta2" "go.universe.tf/metallb/internal/bgp/community" "go.universe.tf/metallb/internal/ipfamily" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" ) type Validate func(ClusterResources) error func ValidationFor(bgpImpl string) Validate { switch bgpImpl { case "frr": return DiscardNativeOnly case "frr-k8s": return DiscardNativeOnly case "native": return DiscardFRROnly } return DontValidate } // DiscardFRROnly returns an error if the current configFile contains // any options that are available only in the FRR implementation. func DiscardFRROnly(c ClusterResources) error { for _, p := range c.Peers { if p.Spec.BFDProfile != "" { return fmt.Errorf("peer %s has bfd-profile set on native bgp mode", p.Spec.Address) } if p.Spec.KeepaliveTime != nil && p.Spec.KeepaliveTime.Duration != 0 { return fmt.Errorf("peer %s has keepalive-time set on native bgp mode", p.Spec.Address) } if p.Spec.VRFName != "" { return fmt.Errorf("peer %s has vrf set on native bgp mode", p.Spec.Address) } if p.Spec.ConnectTime != nil { return fmt.Errorf("peer %s has connect time set on native bgp mode", p.Spec.Address) } if p.Spec.EnableGracefulRestart { return fmt.Errorf("peer %s has EnableGracefulRestart flag set on native bgp mode", p.Spec.Address) } if p.Spec.DisableMP { return fmt.Errorf("peer %s has disable MP flag set on native bgp mode", p.Spec.Address) } if p.Spec.DualStackAddressFamily { return fmt.Errorf("peer %s has dualstackaddressfamily flag set on native bgp mode", p.Spec.Address) } if p.Spec.DynamicASN != "" { return fmt.Errorf("peer %s has dynamicASN set on native bgp mode", p.Spec.Address) } if p.Spec.Interface != "" { return fmt.Errorf("peer %s has interface set on native bgp mode", p.Spec.Address) } } if len(c.BFDProfiles) > 0 { return errors.New("bfd profiles section set") } // 
Only IPv4 BGP advertisements are supported in native mode. if err := findIPv6BGPAdvertisement(c); err != nil { return err } // Only legacy type communities are supported in native mode. return findNonLegacyCommunity(c) } // findIPv6BGPAdvertisement checks for IPv6 addresses. If it finds at least one IPv6 BGP advertisement, it will throw // and error as it's not supported in native mode. func findIPv6BGPAdvertisement(c ClusterResources) error { if len(c.BGPAdvs) == 0 { return nil } bgpSelectors, err := poolSelectorsForBGP(c) if err != nil { return err } for _, p := range c.Pools { if !bgpSelectors.matchesPool(p) { continue } for _, cidr := range p.Spec.Addresses { nets, err := ParseCIDR(cidr) if err != nil { return fmt.Errorf("invalid CIDR %q in pool %q: %s", cidr, p.Name, err) } for _, n := range nets { if n.IP.To4() == nil { return fmt.Errorf("pool %q has ipv6 CIDR %s, native bgp mode does not support ipv6", p.Name, n) } } } } return nil } // findNonLegacyCommunity returns an error if it can find a non legacy community. If a community string can not be // parsed, the string will be ignored. func findNonLegacyCommunity(c ClusterResources) error { for _, adv := range c.BGPAdvs { for _, cs := range adv.Spec.Communities { c, err := community.New(cs) if err != nil { // Skip aliases. continue } if !community.IsLegacy(c) { return fmt.Errorf("native BGP mode only supports legacy communities, BGP advertisement %q "+ "has non legacy community %q", adv.Name, cs) } } } for _, co := range c.Communities { for _, cs := range co.Spec.Communities { c, err := community.New(cs.Value) if err != nil { // Skip it if we cannot parse it - the purpose of this very verification is not to make sure that // a string can be parsed or not. 
continue } if !community.IsLegacy(c) { return fmt.Errorf("native BGP mode only supports legacy communities, BGP community CR %q "+ "has non legacy community %q", co.Name, cs) } } } return nil } // DontValidate is a Validate function that always returns // success. func DontValidate(c ClusterResources) error { return nil } // DiscardNativeOnly returns an error if the current configFile contains // any options that are available only in the native implementation. func DiscardNativeOnly(c ClusterResources) error { if len(c.Peers) > 1 { peerAddr := make(map[string]bool) routerID := c.Peers[0].Spec.RouterID peer0 := peerIdentifier(c.Peers[0].Spec) peerAddr[peer0] = true for _, p := range c.Peers[1:] { if p.Spec.RouterID != routerID { return fmt.Errorf("peer %s has RouterID different from %s, in FRR mode all RouterID must be equal", p.Spec.RouterID, c.Peers[0].Spec.RouterID) } peerID := peerIdentifier(p.Spec) if _, ok := peerAddr[peerID]; ok { return fmt.Errorf("peer %s already exists, FRR mode doesn't support duplicate BGPPeers", p.Spec.Address) } peerAddr[peerID] = true } } for _, p := range c.Peers { for _, p1 := range c.Peers[1:] { if p.Spec.MyASN != p1.Spec.MyASN && p.Spec.VRFName == p1.Spec.VRFName { return fmt.Errorf("peer %s has myAsn different from %s, in FRR mode all myAsn must be equal for the same VRF", p.Spec.Address, p1.Spec.Address) } } } return nil } // validateConfig is meant to validate all the inter-dependencies of a parsed configuration. // In this case, we ensure that bfd echo is not enabled on a v6 pool. 
func validateConfig(cfg *Config) error { for _, p := range cfg.Pools.ByName { containsV6 := false for _, cidr := range p.CIDR { if ipfamily.ForCIDR(cidr) == ipfamily.IPv6 { containsV6 = true break } } if !containsV6 { // we only care about v6 advertisements continue } for _, a := range p.BGPAdvertisements { if len(a.Peers) == 0 { // all peers for _, peer := range cfg.Peers { if hasBFDEcho(peer, cfg.BFDProfiles) { return fmt.Errorf("pool %s has bgpadvertisement %s which references peer %s which has bfd echo enabled, which is not possible", p.Name, a.Name, peer.Name) } } continue } for _, peerName := range a.Peers { if peer, ok := cfg.Peers[peerName]; ok { if hasBFDEcho(peer, cfg.BFDProfiles) { return fmt.Errorf("pool %s has bgpadvertisement %s which references peer %s which has bfd echo enabled, which is not possible", p.Name, a.Name, peer.Name) } } } } } return nil } func hasBFDEcho(peer *Peer, bfdProfiles map[string]*BFDProfile) bool { profile, ok := bfdProfiles[peer.BFDProfile] if !ok { return false } if profile.EchoMode { return true } return false } func peerIdentifier(peer metallbv1beta2.BGPPeerSpec) string { id := peer.Address if peer.Address == "" { id = peer.Interface } return fmt.Sprintf("%s-%s", id, peer.VRFName) } type poolSelector struct { byName map[string]struct{} byLabels []labels.Selector } func (s poolSelector) matchesPool(p metallbv1beta1.IPAddressPool) bool { if len(s.byLabels) == 0 && len(s.byName) == 0 { return true } if _, ok := s.byName[p.Name]; ok { return true } for _, l := range s.byLabels { if l.Matches(labels.Set(p.Labels)) { return true } } return false } func poolSelectorsForBGP(c ClusterResources) (poolSelector, error) { selectedPools := make(map[string]struct{}) poolsSelectors := []labels.Selector{} for _, adv := range c.BGPAdvs { if len(adv.Spec.IPAddressPools) == 0 && len(adv.Spec.IPAddressPoolSelectors) == 0 { return poolSelector{}, nil // no selectors, let's catch em all! 
} for _, p := range adv.Spec.IPAddressPools { selectedPools[p] = struct{}{} } for _, selector := range adv.Spec.IPAddressPoolSelectors { l, err := metav1.LabelSelectorAsSelector(&selector) if err != nil { return poolSelector{}, fmt.Errorf("invalid label selector %v", selector) } poolsSelectors = append(poolsSelectors, l) } } return poolSelector{ byName: selectedPools, byLabels: poolsSelectors, }, nil }
// SPDX-License-Identifier:Apache-2.0 package config import ( "strings" metallbv1beta1 "go.universe.tf/metallb/api/v1beta1" metallbv1beta2 "go.universe.tf/metallb/api/v1beta2" apivalidate "go.universe.tf/metallb/internal/k8s/webhooks/validate" v1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) // TransientError is an error that happens due to interdependencies // between crds, such as referencing non-existing bfd profile. type TransientError struct { Message string } func (e TransientError) Error() string { return e.Message } // validator is an implementation of the resources validator // that tries to parse the configuration and fails if the returned // error is non transitient. type validator struct { validate Validate } func (v *validator) Validate(resources ...client.ObjectList) error { clusterResources := ClusterResources{ Pools: make([]metallbv1beta1.IPAddressPool, 0), Peers: make([]metallbv1beta2.BGPPeer, 0), BFDProfiles: make([]metallbv1beta1.BFDProfile, 0), BGPAdvs: make([]metallbv1beta1.BGPAdvertisement, 0), L2Advs: make([]metallbv1beta1.L2Advertisement, 0), Communities: make([]metallbv1beta1.Community, 0), } for _, list := range resources { switch list := list.(type) { case *metallbv1beta1.IPAddressPoolList: clusterResources.Pools = append(clusterResources.Pools, list.Items...) case *metallbv1beta2.BGPPeerList: clusterResources.Peers = append(clusterResources.Peers, list.Items...) case *metallbv1beta1.BFDProfileList: clusterResources.BFDProfiles = append(clusterResources.BFDProfiles, list.Items...) case *metallbv1beta1.BGPAdvertisementList: clusterResources.BGPAdvs = append(clusterResources.BGPAdvs, list.Items...) case *metallbv1beta1.L2AdvertisementList: clusterResources.L2Advs = append(clusterResources.L2Advs, list.Items...) case *metallbv1beta1.CommunityList: clusterResources.Communities = append(clusterResources.Communities, list.Items...) case *v1.NodeList: clusterResources.Nodes = append(clusterResources.Nodes, list.Items...) 
} } clusterResources = resetTransientErrorsFields(clusterResources) _, err := For(clusterResources, v.validate) return err } func NewValidator(validate Validate) apivalidate.ClusterObjects { return &validator{validate: validate} } // Returns the given ClusterResources without the fields that can cause a TransientError. // We use this so the webhooks do not make assumptions based on the ordering of objects. func resetTransientErrorsFields(clusterResources ClusterResources) ClusterResources { for i := range clusterResources.Peers { clusterResources.Peers[i].Spec.BFDProfile = "" clusterResources.Peers[i].Spec.PasswordSecret = v1.SecretReference{} } for i, bgpAdv := range clusterResources.BGPAdvs { var communities []string for _, community := range bgpAdv.Spec.Communities { // We want to pass only communities that are potentially explicit values. if strings.Contains(community, ":") { communities = append(communities, community) } } clusterResources.BGPAdvs[i].Spec.Communities = communities } return clusterResources }