Coverage Report

Created: 2024-09-11 06:42

/src/brpc/src/brpc/policy/nacos_naming_service.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "nacos_naming_service.h"
19
20
#include <gflags/gflags.h>
21
22
#include <set>
23
24
#include "brpc/http_status_code.h"
25
#include "brpc/log.h"
26
#include "butil/iobuf.h"
27
#include "butil/logging.h"
28
#include "butil/third_party/rapidjson/document.h"
29
30
namespace brpc {
31
namespace policy {
32
33
DEFINE_string(nacos_address, "",
34
              "The query string of request nacos for discovering service.");
35
DEFINE_string(nacos_service_discovery_path, "/nacos/v1/ns/instance/list",
36
              "The url path for discovering service.");
37
DEFINE_string(nacos_service_auth_path, "/nacos/v1/auth/login",
38
              "The url path for authentiction.");
39
DEFINE_int32(nacos_connect_timeout_ms, 200,
40
             "Timeout for creating connections to nacos in milliseconds");
41
DEFINE_string(nacos_username, "", "nacos username");
42
DEFINE_string(nacos_password, "", "nacos password");
43
DEFINE_string(nacos_load_balancer, "rr", "nacos load balancer name");
44
45
0
int NacosNamingService::Connect() {
46
0
    ChannelOptions opt;
47
0
    opt.protocol = PROTOCOL_HTTP;
48
0
    opt.connect_timeout_ms = FLAGS_nacos_connect_timeout_ms;
49
0
    const int ret = _channel.Init(FLAGS_nacos_address.c_str(),
50
0
                                  FLAGS_nacos_load_balancer.c_str(), &opt);
51
0
    if (ret != 0) {
52
0
        LOG(ERROR) << "Fail to init channel to nacos at "
53
0
                   << FLAGS_nacos_address;
54
0
    }
55
0
    return ret;
56
0
}
57
58
0
int NacosNamingService::RefreshAccessToken(const char *service_name) {
59
0
    Controller cntl;
60
0
    cntl.http_request().uri() = FLAGS_nacos_service_auth_path;
61
0
    cntl.http_request().set_method(brpc::HttpMethod::HTTP_METHOD_POST);
62
0
    cntl.http_request().set_content_type("application/x-www-form-urlencoded");
63
64
0
    auto &buf = cntl.request_attachment();
65
0
    buf.append("username=");
66
0
    buf.append(FLAGS_nacos_username);
67
0
    buf.append("&password=");
68
0
    buf.append(FLAGS_nacos_password);
69
70
0
    _channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
71
0
    if (cntl.Failed()) {
72
0
        LOG(ERROR) << "Fail to access " << FLAGS_nacos_service_auth_path << ": "
73
0
                   << cntl.ErrorText();
74
0
        return -1;
75
0
    }
76
77
0
    BUTIL_RAPIDJSON_NAMESPACE::Document doc;
78
0
    if (doc.Parse(cntl.response_attachment().to_string().c_str())
79
0
            .HasParseError()) {
80
0
        LOG(ERROR) << "Failed to parse nacos auth response";
81
0
        return -1;
82
0
    }
83
0
    if (!doc.IsObject()) {
84
0
        LOG(ERROR) << "The nacos's auth response for " << service_name
85
0
                   << " is not a json object";
86
0
        return -1;
87
0
    }
88
89
0
    auto iter = doc.FindMember("accessToken");
90
0
    if (iter != doc.MemberEnd() && iter->value.IsString()) {
91
0
        _access_token = iter->value.GetString();
92
0
    } else {
93
0
        LOG(ERROR) << "The nacos's auth response for " << service_name
94
0
                   << " has no accessToken field";
95
0
        return -1;
96
0
    }
97
98
0
    auto iter_ttl = doc.FindMember("tokenTtl");
99
0
    if (iter_ttl != doc.MemberEnd() && iter_ttl->value.IsInt()) {
100
0
        _token_expire_time = time(NULL) + iter_ttl->value.GetInt() - 10;
101
0
    } else {
102
0
        _token_expire_time = 0;
103
0
    }
104
105
0
    return 0;
106
0
}
107
108
int NacosNamingService::GetServerNodes(const char *service_name,
109
                                       bool token_changed,
110
0
                                       std::vector<ServerNode> *nodes) {
111
0
    if (_nacos_url.empty() || token_changed) {
112
0
        _nacos_url = FLAGS_nacos_service_discovery_path;
113
0
        _nacos_url += "?";
114
0
        if (!_access_token.empty()) {
115
0
            _nacos_url += "accessToken=" + _access_token;
116
0
            _nacos_url += "&";
117
0
        }
118
0
        _nacos_url += service_name;
119
0
    }
120
121
0
    Controller cntl;
122
0
    cntl.http_request().uri() = _nacos_url;
123
0
    _channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
124
0
    if (cntl.Failed()) {
125
0
        LOG(ERROR) << "Fail to access " << _nacos_url << ": "
126
0
                   << cntl.ErrorText();
127
0
        return -1;
128
0
    }
129
0
    if (cntl.http_response().status_code() != HTTP_STATUS_OK) {
130
0
        LOG(ERROR) << "Failed to request nacos, http status code: "
131
0
                   << cntl.http_response().status_code();
132
0
        return -1;
133
0
    }
134
135
0
    BUTIL_RAPIDJSON_NAMESPACE::Document doc;
136
0
    if (doc.Parse(cntl.response_attachment().to_string().c_str())
137
0
            .HasParseError()) {
138
0
        LOG(ERROR) << "Failed to parse nacos response";
139
0
        return -1;
140
0
    }
141
0
    if (!doc.IsObject()) {
142
0
        LOG(ERROR) << "The nacos's response for " << service_name
143
0
                   << " is not a json object";
144
0
        return -1;
145
0
    }
146
147
0
    auto it_hosts = doc.FindMember("hosts");
148
0
    if (it_hosts == doc.MemberEnd()) {
149
0
        LOG(ERROR) << "The nacos's response for " << service_name
150
0
                   << " has no hosts member";
151
0
        return -1;
152
0
    }
153
0
    auto &hosts = it_hosts->value;
154
0
    if (!hosts.IsArray()) {
155
0
        LOG(ERROR) << "hosts member in nacos response is not an array";
156
0
        return -1;
157
0
    }
158
159
0
    std::set<ServerNode> presence;
160
0
    for (auto it = hosts.Begin(); it != hosts.End(); ++it) {
161
0
        auto &host = *it;
162
0
        if (!host.IsObject()) {
163
0
            LOG(ERROR) << "host member in nacos response is not an object";
164
0
            continue;
165
0
        }
166
167
0
        auto it_ip = host.FindMember("ip");
168
0
        if (it_ip == host.MemberEnd() || !it_ip->value.IsString()) {
169
0
            LOG(ERROR) << "host in nacos response has not ip";
170
0
            continue;
171
0
        }
172
0
        auto &ip = it_ip->value;
173
174
0
        auto it_port = host.FindMember("port");
175
0
        if (it_port == host.MemberEnd() || !it_port->value.IsInt()) {
176
0
            LOG(ERROR) << "host in nacos response has not port";
177
0
            continue;
178
0
        }
179
0
        auto &port = it_port->value;
180
181
0
        auto it_enabled = host.FindMember("enabled");
182
0
        if (it_enabled == host.MemberEnd() || !(it_enabled->value.IsBool()) ||
183
0
            !(it_enabled->value.GetBool())) {
184
0
            LOG(INFO) << "nacos " << ip.GetString() << ":" << port.GetInt()
185
0
                      << " is not enabled";
186
0
            continue;
187
0
        }
188
189
0
        auto it_healthy = host.FindMember("healthy");
190
0
        if (it_healthy == host.MemberEnd() || !(it_healthy->value.IsBool()) ||
191
0
            !(it_healthy->value.GetBool())) {
192
0
            LOG(INFO) << "nacos " << ip.GetString() << ":" << port.GetInt()
193
0
                      << " is not healthy";
194
0
            continue;
195
0
        }
196
197
0
        butil::EndPoint end_point;
198
0
        if (str2endpoint(ip.GetString(), port.GetUint(), &end_point) != 0) {
199
0
            LOG(ERROR) << "ncos service with illegal address or port: "
200
0
                       << ip.GetString() << ":" << port.GetUint();
201
0
            continue;
202
0
        }
203
204
0
        ServerNode node(end_point);
205
0
        auto it_weight = host.FindMember("weight");
206
0
        if (it_weight != host.MemberEnd() && it_weight->value.IsNumber()) {
207
0
            node.tag =
208
0
                std::to_string(static_cast<long>(it_weight->value.GetDouble()));
209
0
        }
210
211
0
        presence.insert(node);
212
0
    }
213
214
0
    nodes->reserve(presence.size());
215
0
    nodes->assign(presence.begin(), presence.end());
216
217
0
    if (nodes->empty() && hosts.Size() != 0) {
218
0
        LOG(ERROR) << "All service about " << service_name
219
0
                   << " from nacos is invalid, refuse to update servers";
220
0
        return -1;
221
0
    }
222
223
0
    RPC_VLOG << "Got " << nodes->size()
224
0
             << (nodes->size() > 1 ? " servers" : " server") << " from "
225
0
             << service_name;
226
227
0
    auto it_cache = doc.FindMember("cacheMillis");
228
0
    if (it_cache != doc.MemberEnd() && it_cache->value.IsInt64()) {
229
0
        _cache_ms = it_cache->value.GetInt64();
230
0
    }
231
232
0
    return 0;
233
0
}
234
235
NacosNamingService::NacosNamingService()
236
0
    : _nacos_connected(false), _cache_ms(-1), _token_expire_time(0) {}
237
238
0
int NacosNamingService::GetNamingServiceAccessIntervalMs() const {
239
0
    if (0 < _cache_ms) {
240
0
        return _cache_ms;
241
0
    }
242
0
    return PeriodicNamingService::GetNamingServiceAccessIntervalMs();
243
0
}
244
245
int NacosNamingService::GetServers(const char *service_name,
246
0
                                   std::vector<ServerNode> *servers) {
247
0
    if (!_nacos_connected) {
248
0
        const int ret = Connect();
249
0
        if (0 == ret) {
250
0
            _nacos_connected = true;
251
0
        } else {
252
0
            return ret;
253
0
        }
254
0
    }
255
256
0
    const bool authentiction_enabled =
257
0
        !FLAGS_nacos_username.empty() && !FLAGS_nacos_password.empty();
258
0
    const bool has_invalid_access_token =
259
0
        _access_token.empty() ||
260
0
        (0 < _token_expire_time && _token_expire_time <= time(NULL));
261
0
    bool token_changed = false;
262
263
0
    if (authentiction_enabled && has_invalid_access_token) {
264
0
        const int ret = RefreshAccessToken(service_name);
265
0
        if (ret == 0) {
266
0
            token_changed = true;
267
0
        } else {
268
0
            return ret;
269
0
        }
270
0
    }
271
272
0
    servers->clear();
273
0
    return GetServerNodes(service_name, token_changed, servers);
274
0
}
275
276
void NacosNamingService::Describe(std::ostream &os,
277
0
                                  const DescribeOptions &) const {
278
0
    os << "nacos";
279
0
    return;
280
0
}
281
282
0
NamingService *NacosNamingService::New() const {
283
0
    return new NacosNamingService;
284
0
}
285
286
0
void NacosNamingService::Destroy() { delete this; }
287
288
}  // namespace policy
289
}  // namespace brpc