Coverage Report

Created: 2024-09-08 06:43

/src/brpc/src/brpc/policy/discovery_naming_service.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
19
#include <gflags/gflags.h>
20
#include "butil/third_party/rapidjson/document.h"
21
#include "butil/third_party/rapidjson/memorybuffer.h"
22
#include "butil/third_party/rapidjson/writer.h"
23
#include "butil/string_printf.h"
24
#include "butil/strings/string_split.h"
25
#include "butil/fast_rand.h"
26
#include "bthread/bthread.h"
27
#include "brpc/channel.h"
28
#include "brpc/controller.h"
29
#include "brpc/policy/discovery_naming_service.h"
30
31
namespace brpc {
32
namespace policy {
33
34
// Default value of -discovery_api_addr: a concrete URL when built with
// BILIBILI_INTERNAL defined, otherwise empty.
#ifdef BILIBILI_INTERNAL
35
# define DEFAULT_DISCOVERY_API_ADDR "http://api.bilibili.co/discovery/nodes"
36
#else
37
# define DEFAULT_DISCOVERY_API_ADDR ""
38
#endif
39
40
// Flags controlling how this file talks to discovery servers.
// -discovery_timeout_ms is used as the RPC timeout of every channel created
// in this file; the connect timeout is derived from it (divided by 3).
DEFINE_string(discovery_api_addr, DEFAULT_DISCOVERY_API_ADDR, "The address of discovery api");
41
DEFINE_int32(discovery_timeout_ms, 3000, "Timeout for discovery requests");
42
DEFINE_string(discovery_env, "prod", "Environment of services");
43
DEFINE_string(discovery_status, "1", "Status of services. 1 for ready, 2 for not ready, 3 for all");
44
DEFINE_string(discovery_zone, "", "Zone of services");
45
DEFINE_int32(discovery_renew_interval_s, 30, "The interval between two consecutive renews");
46
DEFINE_int32(discovery_reregister_threshold, 3, "The renew error threshold beyond"
47
        " which Register would be called again");
48
49
// Channel to the discovery servers, shared by all users in this file.
// It is created by NewDiscoveryChannel(), which GetOrNewDiscoveryChannel()
// runs through pthread_once guarded by s_init_discovery_channel_once.
static pthread_once_t s_init_discovery_channel_once = PTHREAD_ONCE_INIT;
50
static Channel* s_discovery_channel = NULL;
51
52
0
static int ListDiscoveryNodes(const char* discovery_api_addr, std::string* servers) {
53
0
    Channel api_channel;
54
0
    ChannelOptions channel_options;
55
0
    channel_options.protocol = PROTOCOL_HTTP;
56
0
    channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
57
0
    channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
58
0
    if (api_channel.Init(discovery_api_addr, "", &channel_options) != 0) {
59
0
        LOG(FATAL) << "Fail to init channel to " << discovery_api_addr;
60
0
        return -1;
61
0
    }
62
0
    Controller cntl;
63
0
    cntl.http_request().uri() = discovery_api_addr;
64
0
    api_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
65
0
    if (cntl.Failed()) {
66
0
        LOG(FATAL) << "Fail to access " << cntl.http_request().uri()
67
0
                   << ": " << cntl.ErrorText();
68
0
        return -1;
69
0
    }
70
71
0
    servers->assign("list://");
72
73
0
    const std::string response = cntl.response_attachment().to_string();
74
0
    BUTIL_RAPIDJSON_NAMESPACE::Document d;
75
0
    d.Parse(response.c_str());
76
0
    if (!d.IsObject()) {
77
0
        LOG(ERROR) << "Fail to parse " << response << " as json object";
78
0
        return -1;
79
0
    }
80
0
    auto itr = d.FindMember("data");
81
0
    if (itr == d.MemberEnd()) {
82
0
        LOG(ERROR) << "No data field in discovery nodes response";
83
0
        return -1;
84
0
    }
85
0
    const BUTIL_RAPIDJSON_NAMESPACE::Value& data = itr->value;
86
0
    if (!data.IsArray()) {
87
0
        LOG(ERROR) << "data field is not an array";
88
0
        return -1;
89
0
    }
90
0
    for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < data.Size(); ++i) {
91
0
        const BUTIL_RAPIDJSON_NAMESPACE::Value& addr_item = data[i];
92
0
        auto itr_addr = addr_item.FindMember("addr");
93
0
        auto itr_status = addr_item.FindMember("status");
94
0
        if (itr_addr == addr_item.MemberEnd() ||
95
0
                !itr_addr->value.IsString() ||
96
0
                itr_status == addr_item.MemberEnd() ||
97
0
                !itr_status->value.IsUint() ||
98
0
                itr_status->value.GetUint() != 0) {
99
0
            continue;
100
0
        }
101
0
        servers->push_back(',');
102
0
        servers->append(itr_addr->value.GetString(),
103
0
                        itr_addr->value.GetStringLength());
104
0
    }
105
0
    return 0;
106
0
}
107
108
0
static void NewDiscoveryChannel() {
109
    // NOTE: Newly added discovery server is NOT detected until this server
110
    // is restarted. The reasons for this design is that NS cluster rarely 
111
    // changes. Although we could detect new discovery servers by implmenenting
112
    // a NamingService, however which is too heavy for solving such a rare case.
113
0
    std::string discovery_servers;
114
0
    if (ListDiscoveryNodes(FLAGS_discovery_api_addr.c_str(), &discovery_servers) != 0) {
115
0
        LOG(ERROR) << "Fail to get discovery nodes from " << FLAGS_discovery_api_addr;
116
0
        return;
117
0
    }
118
0
    ChannelOptions channel_options;
119
0
    channel_options.protocol = PROTOCOL_HTTP;
120
0
    channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
121
0
    channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
122
0
    s_discovery_channel = new Channel;
123
0
    if (s_discovery_channel->Init(discovery_servers.c_str(), "rr", &channel_options) != 0) {
124
0
        LOG(ERROR) << "Fail to init channel to " << discovery_servers;
125
0
        return;
126
0
    }
127
0
}
128
129
0
// Return the shared channel to discovery servers, creating it on first use.
// NewDiscoveryChannel is run through pthread_once, so it is not run again by
// later calls. The result is NULL when NewDiscoveryChannel returned before
// assigning s_discovery_channel, so callers must check for NULL.
inline Channel* GetOrNewDiscoveryChannel() {
130
0
    pthread_once(&s_init_discovery_channel_once, NewDiscoveryChannel);
131
0
    return s_discovery_channel;
132
0
}
133
134
0
bool DiscoveryRegisterParam::IsValid() const {
135
0
    return !appid.empty() && !hostname.empty() && !addrs.empty() &&
136
0
            !env.empty() && !zone.empty() && !version.empty();
137
0
}
138
139
// Construct an unregistered client: no renew bthread (_th is INVALID_BTHREAD)
// and _registered is false, so the destructor has nothing to undo until
// Register() is called.
DiscoveryClient::DiscoveryClient()
140
    : _th(INVALID_BTHREAD)
141
0
    , _registered(false) {}
142
143
0
// Undo a registration: stop and join the renew bthread, then cancel the
// instance at the discovery server. Does nothing if _registered is false.
DiscoveryClient::~DiscoveryClient() {
    const bool was_registered = _registered.load(butil::memory_order_acquire);
    if (!was_registered) {
        return;
    }
    // The renew bthread uses `this', so it must be gone before cancelling.
    bthread_stop(_th);
    bthread_join(_th, NULL);
    DoCancel();
}
150
151
0
// Parse the common json response of discovery APIs held in `buf'.
// Returns the integer `code' field of the response, or -1 when `buf' is not
// a json object or has no integer `code'. When `error_text' is not NULL and
// the response carries a string `message', it is copied into `*error_text'.
static int ParseCommonResult(const butil::IOBuf& buf, std::string* error_text) {
    const std::string json = buf.to_string();
    BUTIL_RAPIDJSON_NAMESPACE::Document doc;
    doc.Parse(json.c_str());
    if (!doc.IsObject()) {
        LOG(ERROR) << "Fail to parse " << buf << " as json object";
        return -1;
    }

    auto code_pos = doc.FindMember("code");
    const bool has_int_code =
        code_pos != doc.MemberEnd() && code_pos->value.IsInt();
    if (!has_int_code) {
        LOG(ERROR) << "Invalid `code' field in " << buf;
        return -1;
    }

    // `message' is optional, copy it out only if someone asked for it.
    if (error_text) {
        auto message_pos = doc.FindMember("message");
        if (message_pos != doc.MemberEnd() && message_pos->value.IsString()) {
            error_text->assign(message_pos->value.GetString(),
                               message_pos->value.GetStringLength());
        }
    }
    return code_pos->value.GetInt();
}
172
173
0
int DiscoveryClient::DoRenew() const {
174
    // May create short connections which are OK.
175
0
    ChannelOptions channel_options;
176
0
    channel_options.protocol = PROTOCOL_HTTP;
177
0
    channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
178
0
    channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
179
0
    Channel chan;
180
0
    if (chan.Init(_current_discovery_server, &channel_options) != 0) {
181
0
        LOG(FATAL) << "Fail to init channel to " << _current_discovery_server;
182
0
        return -1;
183
0
    }
184
185
0
    Controller cntl;
186
0
    cntl.http_request().set_method(HTTP_METHOD_POST);
187
0
    cntl.http_request().uri() = "/discovery/renew";
188
0
    cntl.http_request().set_content_type("application/x-www-form-urlencoded");
189
0
    butil::IOBufBuilder os;
190
0
    os << "appid=" << _params.appid
191
0
        << "&hostname=" << _params.hostname
192
0
        << "&env=" << _params.env
193
0
        << "&region=" << _params.region
194
0
        << "&zone=" << _params.zone;
195
0
    os.move_to(cntl.request_attachment());
196
0
    chan.CallMethod(NULL, &cntl, NULL, NULL, NULL);
197
0
    if (cntl.Failed()) {
198
0
        LOG(ERROR) << "Fail to post /discovery/renew: " << cntl.ErrorText();
199
0
        return -1;
200
0
    }
201
0
    std::string error_text;
202
0
    if (ParseCommonResult(cntl.response_attachment(), &error_text) != 0) {
203
0
        LOG(ERROR) << "Fail to renew " << _params.hostname << " to " << _params.appid
204
0
            << ": " << error_text;
205
0
        return -1;
206
0
    }
207
0
    return 0;
208
0
}
209
210
0
void* DiscoveryClient::PeriodicRenew(void* arg) {
211
0
    DiscoveryClient* d = static_cast<DiscoveryClient*>(arg);
212
0
    int consecutive_renew_error = 0;
213
0
    int64_t init_sleep_s = FLAGS_discovery_renew_interval_s / 2 +
214
0
        butil::fast_rand_less_than(FLAGS_discovery_renew_interval_s / 2);
215
0
    if (bthread_usleep(init_sleep_s * 1000000) != 0) {
216
0
        if (errno == ESTOP) {
217
0
            return NULL;
218
0
        }
219
0
    }
220
221
0
    while (!bthread_stopped(bthread_self())) {
222
0
        if (consecutive_renew_error == FLAGS_discovery_reregister_threshold) {
223
0
            LOG(WARNING) << "Re-register since discovery renew error threshold reached";
224
            // Do register until succeed or Cancel is called
225
0
            while (!bthread_stopped(bthread_self())) {
226
0
                if (d->DoRegister() == 0) {
227
0
                    break;
228
0
                }
229
0
                bthread_usleep(FLAGS_discovery_renew_interval_s * 1000000);
230
0
            }
231
0
            consecutive_renew_error = 0;
232
0
        }
233
0
        if (d->DoRenew() != 0) {
234
0
            consecutive_renew_error++;
235
0
            continue;
236
0
        }
237
0
        consecutive_renew_error = 0;
238
0
        bthread_usleep(FLAGS_discovery_renew_interval_s * 1000000);
239
0
    }
240
0
    return NULL;
241
0
}
242
243
0
int DiscoveryClient::Register(const DiscoveryRegisterParam& params) {
244
0
    if (_registered.load(butil::memory_order_relaxed) ||
245
0
            _registered.exchange(true, butil::memory_order_release)) {
246
0
        return 0;
247
0
    }
248
0
    if (!params.IsValid()) {
249
0
        return -1;
250
0
    }
251
0
    _params = params;
252
253
0
    if (DoRegister() != 0) {
254
0
        return -1;
255
0
    }
256
0
    if (bthread_start_background(&_th, NULL, PeriodicRenew, this) != 0) {
257
0
        LOG(ERROR) << "Fail to start background PeriodicRenew";
258
0
        return -1;
259
0
    }
260
0
    return 0;
261
0
}
262
263
0
int DiscoveryClient::DoRegister() {
264
0
    Channel* chan = GetOrNewDiscoveryChannel();
265
0
    if (NULL == chan) {
266
0
        LOG(ERROR) << "Fail to create discovery channel";
267
0
        return -1;
268
0
    }
269
0
    Controller cntl;
270
0
    cntl.http_request().set_method(HTTP_METHOD_POST);
271
0
    cntl.http_request().uri() = "/discovery/register";
272
0
    cntl.http_request().set_content_type("application/x-www-form-urlencoded");
273
0
    butil::IOBufBuilder os;
274
0
    os << "appid=" << _params.appid
275
0
        << "&hostname=" << _params.hostname;
276
277
0
    std::vector<butil::StringPiece> addrs;
278
0
    butil::SplitString(_params.addrs, ',', &addrs);
279
0
    for (size_t i = 0; i < addrs.size(); ++i) {
280
0
        if (!addrs[i].empty()) {
281
0
            os << "&addrs=" << addrs[i];
282
0
        }
283
0
    }
284
285
0
    os << "&env=" << _params.env
286
0
        << "&zone=" << _params.zone
287
0
        << "&region=" << _params.region
288
0
        << "&status=" << _params.status
289
0
        << "&version=" << _params.version
290
0
        << "&metadata=" << _params.metadata;
291
0
    os.move_to(cntl.request_attachment());
292
0
    chan->CallMethod(NULL, &cntl, NULL, NULL, NULL);
293
0
    if (cntl.Failed()) {
294
0
        LOG(ERROR) << "Fail to register " << _params.appid << ": " << cntl.ErrorText();
295
0
        return -1;
296
0
    }
297
0
    std::string error_text;
298
0
    if (ParseCommonResult(cntl.response_attachment(), &error_text) != 0) {
299
0
        LOG(ERROR) << "Fail to register " << _params.hostname << " to " << _params.appid
300
0
                << ": " << error_text;
301
0
        return -1;
302
0
    }
303
0
    _current_discovery_server = cntl.remote_side();
304
0
    return 0;
305
0
}
306
307
0
int DiscoveryClient::DoCancel() const {
308
    // May create short connections which are OK.
309
0
    ChannelOptions channel_options;
310
0
    channel_options.protocol = PROTOCOL_HTTP;
311
0
    channel_options.timeout_ms = FLAGS_discovery_timeout_ms;
312
0
    channel_options.connect_timeout_ms = FLAGS_discovery_timeout_ms / 3;
313
0
    Channel chan;
314
0
    if (chan.Init(_current_discovery_server, &channel_options) != 0) {
315
0
        LOG(FATAL) << "Fail to init channel to " << _current_discovery_server;
316
0
        return -1;
317
0
    }
318
319
0
    Controller cntl;
320
0
    cntl.http_request().set_method(HTTP_METHOD_POST);
321
0
    cntl.http_request().uri() = "/discovery/cancel";
322
0
    cntl.http_request().set_content_type("application/x-www-form-urlencoded");
323
0
    butil::IOBufBuilder os;
324
0
    os << "appid=" << _params.appid
325
0
        << "&hostname=" << _params.hostname
326
0
        << "&env=" << _params.env
327
0
        << "&region=" << _params.region
328
0
        << "&zone=" << _params.zone;
329
0
    os.move_to(cntl.request_attachment());
330
0
    chan.CallMethod(NULL, &cntl, NULL, NULL, NULL);
331
0
    if (cntl.Failed()) {
332
0
        LOG(ERROR) << "Fail to post /discovery/cancel: " << cntl.ErrorText();
333
0
        return -1;
334
0
    }
335
0
    std::string error_text;
336
0
    if (ParseCommonResult(cntl.response_attachment(), &error_text) != 0) {
337
0
        LOG(ERROR) << "Fail to cancel " << _params.hostname << " in " << _params.appid
338
0
            << ": " << error_text;
339
0
        return -1;
340
0
    }
341
0
    return 0;
342
0
}
343
344
// ========== DiscoveryNamingService =============
345
346
int DiscoveryNamingService::GetServers(const char* service_name,
347
0
                                       std::vector<ServerNode>* servers) {
348
0
    if (service_name == NULL || *service_name == '\0' ||
349
0
        FLAGS_discovery_env.empty() ||
350
0
        FLAGS_discovery_status.empty()) {
351
0
        LOG_ONCE(ERROR) << "Invalid parameters";
352
0
        return -1;
353
0
    }
354
0
    Channel* chan = GetOrNewDiscoveryChannel();
355
0
    if (NULL == chan) {
356
0
        LOG(ERROR) << "Fail to create discovery channel";
357
0
        return -1;
358
0
    }
359
0
    servers->clear();
360
0
    Controller cntl;
361
0
    std::string uri_str = butil::string_printf(
362
0
            "/discovery/fetchs?appid=%s&env=%s&status=%s", service_name,
363
0
            FLAGS_discovery_env.c_str(), FLAGS_discovery_status.c_str());
364
0
    if (!FLAGS_discovery_zone.empty()) {
365
0
        uri_str.append("&zone=");
366
0
        uri_str.append(FLAGS_discovery_zone);
367
0
    }
368
0
    cntl.http_request().uri() = uri_str;
369
0
    chan->CallMethod(NULL, &cntl, NULL, NULL, NULL);
370
0
    if (cntl.Failed()) {
371
0
        LOG(ERROR) << "Fail to get /discovery/fetchs: " << cntl.ErrorText();
372
0
        return -1;
373
0
    }
374
375
0
    const std::string response = cntl.response_attachment().to_string();
376
0
    BUTIL_RAPIDJSON_NAMESPACE::Document d;
377
0
    d.Parse(response.c_str());
378
0
    if (!d.IsObject()) {
379
0
        LOG(ERROR) << "Fail to parse " << response << " as json object";
380
0
        return -1;
381
0
    }
382
0
    auto itr_data = d.FindMember("data");
383
0
    if (itr_data == d.MemberEnd()) {
384
0
        LOG(ERROR) << "No data field in discovery/fetchs response";
385
0
        return -1;
386
0
    }
387
0
    const BUTIL_RAPIDJSON_NAMESPACE::Value& data = itr_data->value;
388
0
    auto itr_service = data.FindMember(service_name);
389
0
    if (itr_service == data.MemberEnd()) {
390
0
        LOG(ERROR) << "No " << service_name << " field in discovery response";
391
0
        return -1;
392
0
    }
393
0
    const BUTIL_RAPIDJSON_NAMESPACE::Value& services = itr_service->value;
394
0
    auto itr_instances = services.FindMember("instances");
395
0
    if (itr_instances == services.MemberEnd()) {
396
0
        LOG(ERROR) << "Fail to find instances";
397
0
        return -1;
398
0
    }
399
0
    const BUTIL_RAPIDJSON_NAMESPACE::Value& instances = itr_instances->value;
400
0
    if (!instances.IsArray()) {
401
0
        LOG(ERROR) << "Fail to parse instances as an array";
402
0
        return -1;
403
0
    }
404
405
0
    for (BUTIL_RAPIDJSON_NAMESPACE::SizeType i = 0; i < instances.Size(); ++i) {
406
0
        std::string metadata;
407
        // convert metadata in object to string
408
0
        auto itr_metadata = instances[i].FindMember("metadata");
409
0
        if (itr_metadata != instances[i].MemberEnd()) {
410
0
            BUTIL_RAPIDJSON_NAMESPACE::MemoryBuffer buffer;
411
0
            BUTIL_RAPIDJSON_NAMESPACE::Writer<BUTIL_RAPIDJSON_NAMESPACE::MemoryBuffer> writer(buffer);
412
0
            itr_metadata->value.Accept(writer);
413
0
            metadata.assign(buffer.GetBuffer(), buffer.GetSize());
414
0
        }
415
416
0
        auto itr = instances[i].FindMember("addrs");
417
0
        if (itr == instances[i].MemberEnd() || !itr->value.IsArray()) {
418
0
            LOG(ERROR) << "Fail to find addrs or addrs is not an array";
419
0
            return -1;
420
0
        }
421
0
        const BUTIL_RAPIDJSON_NAMESPACE::Value& addrs = itr->value;
422
0
        for (BUTIL_RAPIDJSON_NAMESPACE::SizeType j = 0; j < addrs.Size(); ++j) {
423
0
            if (!addrs[j].IsString()) {
424
0
                continue;
425
0
            }
426
            // The result returned by discovery include protocol prefix, such as
427
            // http://172.22.35.68:6686, which should be removed.
428
0
            butil::StringPiece addr(addrs[j].GetString(), addrs[j].GetStringLength());
429
0
            butil::StringPiece::size_type pos = addr.find("://");
430
0
            if (pos != butil::StringPiece::npos) {
431
0
                if (pos != 4 /* sizeof("grpc") */ ||
432
0
                        strncmp("grpc", addr.data(), 4) != 0) {
433
                    // Skip server that has prefix but not start with "grpc"
434
0
                    continue;
435
0
                }
436
0
                addr.remove_prefix(pos + 3);
437
0
            }
438
0
            ServerNode node;
439
0
            node.tag = metadata;
440
            // Variable addr contains data from addrs[j].GetString(), it is a
441
            // null-terminated string, so it is safe to pass addr.data() as the
442
            // first parameter to str2endpoint.
443
0
            if (str2endpoint(addr.data(), &node.addr) != 0) {
444
0
                LOG(ERROR) << "Invalid address=`" << addr << '\'';
445
0
                continue;
446
0
            }
447
0
            servers->push_back(node);
448
0
        }
449
0
    }
450
0
    return 0;
451
0
}
452
453
void DiscoveryNamingService::Describe(std::ostream& os,
454
0
                                      const DescribeOptions&) const {
455
0
    os << "discovery";
456
0
    return;
457
0
}
458
459
0
// Create a new DiscoveryNamingService on heap. The instance is allocated
// with `new'; Destroy() below releases it with `delete this'.
NamingService* DiscoveryNamingService::New() const {
460
0
    return new DiscoveryNamingService;
461
0
}
462
463
0
// Delete this instance. The object must have been allocated with `new'
// (as New() does) and must not be touched after this call.
void DiscoveryNamingService::Destroy() {
464
0
    delete this;
465
0
}
466
467
468
} // namespace policy
469
} // namespace brpc