Coverage Report

Created: 2026-01-13 06:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/brpc/load_balancer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
19
#include <gflags/gflags.h>
20
#include "brpc/reloadable_flags.h"
21
#include "brpc/load_balancer.h"
22
#include "brpc/socket.h"
23
24
25
namespace brpc {
26
27
DEFINE_bool(show_lb_in_vars, false, "Describe LoadBalancers in vars");
28
DEFINE_int32(default_weight_of_wlb, 0, "Default weight value of Weighted LoadBalancer(wlb). "
29
             "wlb policy degradation is enabled when default_weight_of_wlb > 0 to avoid some "
30
             "problems when user is using wlb but forgot to set the weights of some of their "
31
             "downstream instances. Then these instances will be set default_weight_of_wlb as "
32
             "their weights. wlb policy degradation is not enabled by default.");
33
BRPC_VALIDATE_GFLAG(show_lb_in_vars, PassValidate);
34
35
// For assigning unique names for lb.
36
static butil::static_atomic<int> g_lb_counter = BUTIL_STATIC_ATOMIC_INIT(0);
37
38
0
bool LoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) {
39
0
    SocketUniquePtr ptr;
40
0
    bool res = Socket::Address(id, &ptr) == 0 && ptr->IsAvailable();
41
0
    if (res) {
42
0
        *out = std::move(ptr);
43
0
    }
44
0
    return res;
45
0
}
46
47
0
void SharedLoadBalancer::DescribeLB(std::ostream& os, void* arg) {
48
0
    (static_cast<SharedLoadBalancer*>(arg))->Describe(os, DescribeOptions());
49
0
}
50
51
0
void SharedLoadBalancer::ExposeLB() {
52
0
    bool changed = false;
53
0
    _st_mutex.lock();
54
0
    if (!_exposed) {
55
0
        _exposed = true;
56
0
        changed = true;
57
0
    }
58
0
    _st_mutex.unlock();
59
0
    if (changed) {
60
0
        char name[32];
61
0
        snprintf(name, sizeof(name), "_load_balancer_%d", g_lb_counter.fetch_add(
62
0
                     1, butil::memory_order_relaxed));
63
0
        _st.expose(name);
64
0
    }
65
0
}
66
67
SharedLoadBalancer::SharedLoadBalancer()
68
0
    : _lb(NULL)
69
0
    , _weight_sum(0)
70
0
    , _exposed(false)
71
0
    , _st(DescribeLB, this) {
72
0
}
73
74
0
SharedLoadBalancer::~SharedLoadBalancer() {
75
0
    _st.hide();
76
0
    if (_lb) {
77
0
        _lb->Destroy();
78
0
        _lb = NULL;
79
0
    }
80
0
}
81
82
0
int SharedLoadBalancer::Init(const char* lb_protocol) {
83
0
    std::string lb_name;
84
0
    butil::StringPiece lb_params;
85
0
    if (!ParseParameters(lb_protocol, &lb_name, &lb_params)) {
86
0
        LOG(FATAL) << "Fail to parse this load balancer protocol '" << lb_protocol << '\'';
87
0
        return -1;
88
0
    }
89
0
    const LoadBalancer* lb = LoadBalancerExtension()->Find(lb_name.c_str());
90
0
    if (lb == NULL) {
91
0
        LOG(FATAL) << "Fail to find LoadBalancer by `" << lb_name << "'";
92
0
        return -1;
93
0
    }
94
0
    _lb = lb->New(lb_params);
95
0
    if (_lb == NULL) {
96
0
        LOG(FATAL) << "Fail to new LoadBalancer";
97
0
        return -1;
98
0
    }
99
0
    if (FLAGS_show_lb_in_vars && !_exposed) {
100
0
        ExposeLB();
101
0
    }
102
0
    return 0;
103
0
}
104
105
void SharedLoadBalancer::Describe(std::ostream& os,
106
0
                                  const DescribeOptions& options) {
107
0
    if (_lb == NULL) {
108
0
        os << "lb=NULL";
109
0
    } else {
110
0
        _lb->Describe(os, options);
111
0
    }
112
0
}
113
114
bool SharedLoadBalancer::ParseParameters(const butil::StringPiece& lb_protocol,
115
                                         std::string* lb_name,
116
0
                                         butil::StringPiece* lb_params) {
117
0
    lb_name->clear();
118
0
    lb_params->clear();
119
0
    if (lb_protocol.empty()) {
120
0
        return false;
121
0
    }
122
0
    const char separator = ':';
123
0
    size_t pos = lb_protocol.find(separator);
124
0
    if (pos == std::string::npos) {
125
0
        lb_name->append(lb_protocol.data(), lb_protocol.size());
126
0
    } else {
127
0
        lb_name->append(lb_protocol.data(), pos);
128
0
        if (pos < lb_protocol.size() - sizeof(separator)) {
129
0
            *lb_params = lb_protocol.substr(pos + sizeof(separator));
130
0
        }
131
0
    }
132
133
0
    return true;
134
0
}
135
                                         
136
} // namespace brpc