Coverage Report

Created: 2025-12-08 06:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/brpc/src/brpc/server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
19
#include <iomanip>
20
#include <arpa/inet.h>                              // inet_aton
21
#include <fcntl.h>                                  // O_CREAT
22
#include <sys/stat.h>                               // mkdir
23
#include <gflags/gflags.h>
24
#include <google/protobuf/descriptor.h>             // ServiceDescriptor
25
#include "idl_options.pb.h"                         // option(idl_support)
26
#include "bthread/unstable.h"                       // bthread_keytable_pool_init
27
#include "butil/macros.h"                           // ARRAY_SIZE
28
#include "butil/fd_guard.h"                         // fd_guard
29
#include "butil/logging.h"                          // CHECK
30
#include "butil/time.h"
31
#include "butil/class_name.h"
32
#include "butil/string_printf.h"
33
#include "butil/debug/leak_annotations.h"
34
#include "brpc/log.h"
35
#include "brpc/compress.h"
36
#include "brpc/checksum.h"
37
#include "brpc/policy/nova_pbrpc_protocol.h"
38
#include "brpc/global.h"
39
#include "brpc/socket_map.h"                   // SocketMapList
40
#include "brpc/acceptor.h"                     // Acceptor
41
#include "brpc/details/ssl_helper.h"           // CreateServerSSLContext
42
#include "brpc/protocol.h"                     // ListProtocols
43
#include "brpc/nshead_service.h"               // NsheadService
44
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
45
#include "brpc/thrift_service.h"               // ThriftService
46
#endif
47
#include "brpc/builtin/bad_method_service.h"   // BadMethodService
48
#include "brpc/builtin/get_favicon_service.h"
49
#include "brpc/builtin/get_js_service.h"
50
#include "brpc/builtin/grpc_health_check_service.h"  // GrpcHealthCheckService
51
#include "brpc/builtin/version_service.h"
52
#include "brpc/builtin/health_service.h"
53
#include "brpc/builtin/list_service.h"
54
#include "brpc/builtin/status_service.h"
55
#include "brpc/builtin/protobufs_service.h"
56
#include "brpc/builtin/threads_service.h"
57
#include "brpc/builtin/vlog_service.h"
58
#include "brpc/builtin/index_service.h"        // IndexService
59
#include "brpc/builtin/connections_service.h"  // ConnectionsService
60
#include "brpc/builtin/flags_service.h"        // FlagsService
61
#include "brpc/builtin/vars_service.h"         // VarsService
62
#include "brpc/builtin/rpcz_service.h"         // RpczService
63
#include "brpc/builtin/dir_service.h"          // DirService
64
#include "brpc/builtin/pprof_service.h"        // PProfService
65
#include "brpc/builtin/bthreads_service.h"     // BthreadsService
66
#include "brpc/builtin/ids_service.h"          // IdsService
67
#include "brpc/builtin/sockets_service.h"      // SocketsService
68
#include "brpc/builtin/hotspots_service.h"     // HotspotsService
69
#include "brpc/builtin/prometheus_metrics_service.h"
70
#include "brpc/builtin/memory_service.h"
71
#include "brpc/details/method_status.h"
72
#include "brpc/load_balancer.h"
73
#include "brpc/naming_service.h"
74
#include "brpc/simple_data_pool.h"
75
#include "brpc/server.h"
76
#include "brpc/trackme.h"
77
#include "brpc/restful.h"
78
#include "brpc/rtmp.h"
79
#include "brpc/builtin/common.h"               // GetProgramName
80
#include "brpc/details/tcmalloc_extension.h"
81
#include "brpc/rdma/rdma_helper.h"
82
#include "brpc/baidu_master_service.h"
83
84
0
// Streams `tm' as "<seconds>.<microseconds>" with the microseconds
// zero-padded to 6 digits. The fill character of `os' is restored afterwards.
inline std::ostream& operator<<(std::ostream& os, const timeval& tm) {
    const char saved_fill = os.fill();
    os << tm.tv_sec;
    os << '.';
    os << std::setw(6) << std::setfill('0') << tm.tv_usec;
    os.fill(saved_fill);
    return os;
}
90
91
extern "C" {
92
void* bthread_get_assigned_data();
93
}
94
95
DECLARE_int32(task_group_ntags);
96
97
namespace brpc {
98
99
BAIDU_CASSERT(sizeof(int32_t) == sizeof(butil::subtle::Atomic32),
100
              Atomic32_must_be_int32);
101
102
extern const char* const g_server_info_prefix = "rpc_server";
103
104
0
// Maps a Server::Status value to its symbolic name. A value matching none
// of the four known states yields "UNKNOWN_STATUS".
const char* status_str(Server::Status s) {
    if (s == Server::UNINITIALIZED) {
        return "UNINITIALIZED";
    }
    if (s == Server::READY) {
        return "READY";
    }
    if (s == Server::RUNNING) {
        return "RUNNING";
    }
    if (s == Server::STOPPING) {
        return "STOPPING";
    }
    return "UNKNOWN_STATUS";
}
113
114
butil::static_atomic<int> g_running_server_count = BUTIL_STATIC_ATOMIC_INIT(0);
115
116
// Following services may have security issues and are disabled by default.
117
DEFINE_bool(enable_dir_service, false, "Enable /dir");
118
DEFINE_bool(enable_threads_service, false, "Enable /threads");
119
120
DECLARE_int32(usercode_backup_threads);
121
DECLARE_bool(usercode_in_pthread);
122
123
// Value of sysconf(_SC_NPROCESSORS_ONLN), read once during static
// initialization of this file.
// NOTE: never turn s_ncore into an `extern const': its initialization order
124
// relative to other compilation units is undefined.
125
const int s_ncore = sysconf(_SC_NPROCESSORS_ONLN);
126
127
// Sets every option to its default value. num_threads becomes s_ncore + 1
// when s_ncore is positive and 8 otherwise.
ServerOptions::ServerOptions()
    : idle_timeout_sec(-1),
      nshead_service(NULL),
      thrift_service(NULL),
      mongo_service_adaptor(NULL),
      auth(NULL),
      server_owns_auth(false),
      interceptor(NULL),
      server_owns_interceptor(false),
      num_threads(s_ncore > 0 ? s_ncore + 1 : 8),
      max_concurrency(0),
      session_local_data_factory(NULL),
      reserved_session_local_data(0),
      thread_local_data_factory(NULL),
      reserved_thread_local_data(0),
      bthread_init_fn(NULL),
      bthread_init_args(NULL),
      bthread_init_count(0),
      internal_port(-1),
      has_builtin_services(true),
      force_ssl(false),
      use_rdma(false),
      baidu_master_service(NULL),
      http_master_service(NULL),
      health_reporter(NULL),
      rtmp_service(NULL),
      redis_service(NULL),
      bthread_tag(BTHREAD_TAG_DEFAULT),
      rpc_pb_message_factory(NULL),
      ignore_eovercrowded(false) {
}
161
162
0
// Returns the SSL options held by this object, allocating a
// default-constructed ServerSSLOptions the first time it is requested.
ServerSSLOptions* ServerOptions::mutable_ssl_options() {
    if (_ssl_options) {
        return _ssl_options.get();
    }
    _ssl_options.reset(new ServerSSLOptions);
    return _ssl_options.get();
}
168
169
// Defaults: only allow_http_body_to_pb starts enabled; every other flag
// starts disabled.
Server::MethodProperty::OpaqueParams::OpaqueParams()
    : is_tabbed(false),
      allow_default_url(false),
      allow_http_body_to_pb(true),
      pb_bytes_to_base64(false),
      pb_single_repeated_to_array(false) {}
176
177
// Creates an empty property: all flags false and all pointers NULL.
Server::MethodProperty::MethodProperty()
    : is_builtin_service(false),
      own_method_status(false),
      http_url(NULL),
      service(NULL),
      method(NULL),
      status(NULL),
      ignore_eovercrowded(false) {}
186
187
0
static timeval GetUptime(void* arg/*start_time*/) {
188
0
    return butil::microseconds_to_timeval(butil::cpuwide_time_us() - (intptr_t)arg);
189
0
}
190
191
0
static void PrintStartTime(std::ostream& os, void* arg) {
192
    // Print when this server was Server::Start()-ed.
193
0
    time_t start_time = static_cast<Server*>(arg)->last_start_time();
194
0
    struct tm timeinfo;
195
0
    char buf[64];
196
0
    strftime(buf, sizeof(buf), "%Y/%m/%d-%H:%M:%S",
197
0
             localtime_r(&start_time, &timeinfo));
198
0
    os << buf;
199
0
}
200
201
0
static void PrintSupportedLB(std::ostream& os, void*) {
202
0
    LoadBalancerExtension()->List(os, ' ');
203
0
}
204
205
0
static void PrintSupportedNS(std::ostream& os, void*) {
206
0
    NamingServiceExtension()->List(os, ' ');
207
0
}
208
209
0
static void PrintSupportedProtocols(std::ostream& os, void*) {
210
0
    std::vector<Protocol> protocols;
211
0
    ListProtocols(&protocols);
212
0
    for (size_t i = 0; i < protocols.size(); ++i) {
213
0
        if (i != 0) {
214
0
            os << ' ';
215
0
        }
216
0
        os << (protocols[i].name ? protocols[i].name : "(null)");
217
0
    }
218
0
}
219
220
0
static void PrintSupportedCompressions(std::ostream& os, void*) {
221
0
    std::vector<CompressHandler> compressors;
222
0
    ListCompressHandler(&compressors);
223
0
    for (size_t i = 0; i < compressors.size(); ++i) {
224
0
        if (i != 0) {
225
0
            os << ' ';
226
0
        }
227
0
        os << (compressors[i].name ? compressors[i].name : "(null)");
228
0
    }
229
0
}
230
231
0
static void PrintSupportedChecksums(std::ostream& os, void*) {
232
0
    std::vector<ChecksumHandler> handlers;
233
0
    ListChecksumHandler(&handlers);
234
0
    for (size_t i = 0; i < handlers.size(); ++i) {
235
0
        if (i != 0) {
236
0
            os << ' ';
237
0
        }
238
0
        os << (handlers[i].name ? handlers[i].name : "(null)");
239
0
    }
240
0
}
241
242
0
static void PrintEnabledProfilers(std::ostream& os, void*) {
243
0
    if (cpu_profiler_enabled) {
244
0
        os << "cpu ";
245
0
    }
246
0
    if (IsHeapProfilerEnabled()) {
247
0
        if (has_TCMALLOC_SAMPLE_PARAMETER()) {
248
0
            os << "heap ";
249
0
        } else {
250
0
            os << "heap(no TCMALLOC_SAMPLE_PARAMETER in env) ";
251
0
        }
252
0
    }
253
0
    os << "contention";
254
0
}
255
256
// File-scope string-valued bvars. Each one is constructed with a variable
// name ("rpc_*"), the Print* function defined earlier in this file that
// renders its value, and a NULL callback argument.
static bvar::PassiveStatus<std::string> s_lb_st(
257
    "rpc_load_balancer", PrintSupportedLB, NULL);
258
259
static bvar::PassiveStatus<std::string> s_ns_st(
260
    "rpc_naming_service", PrintSupportedNS, NULL);
261
262
static bvar::PassiveStatus<std::string> s_proto_st(
263
    "rpc_protocols", PrintSupportedProtocols, NULL);
264
265
static bvar::PassiveStatus<std::string> s_comp_st(
266
    "rpc_compressions", PrintSupportedCompressions, NULL);
267
268
static bvar::PassiveStatus<std::string> s_cksum_st(
269
    "rpc_checksums", PrintSupportedChecksums, NULL);
270
271
static bvar::PassiveStatus<std::string> s_prof_st(
272
    "rpc_profilers", PrintEnabledProfilers, NULL);
273
274
0
static int32_t GetConnectionCount(void* arg) {
275
0
    ServerStatistics ss;
276
0
    static_cast<Server*>(arg)->GetStat(&ss);
277
0
    return ss.connection_count;
278
0
}
279
280
0
static int32_t GetServiceCount(void* arg) {
281
0
    ServerStatistics ss;
282
0
    static_cast<Server*>(arg)->GetStat(&ss);
283
0
    return ss.user_service_count;
284
0
}
285
286
0
static int32_t GetBuiltinServiceCount(void* arg) {
287
0
    ServerStatistics ss;
288
0
    static_cast<Server*>(arg)->GetStat(&ss);
289
0
    return ss.builtin_service_count;
290
0
}
291
292
0
// Callback reporting the session-local data pool of the Server passed
// through `arg' as a pair: [0] = ncreated - nfree, [1] = nfree.
// The pool pointer is dereferenced without a NULL check.
static bvar::Vector<unsigned, 2> GetSessionLocalDataCount(void* arg) {
    Server* const server = static_cast<Server*>(arg);
    bvar::Vector<unsigned, 2> counts;
    SimpleDataPool::Stat pool_stat = server->session_local_data_pool()->stat();
    counts[0] = pool_stat.ncreated - pool_stat.nfree;
    counts[1] = pool_stat.nfree;
    return counts;
}
300
301
0
static int cast_no_barrier_int(void* arg) {
302
0
    return butil::subtle::NoBarrier_Load(static_cast<int*>(arg));
303
0
}
304
305
0
std::string Server::ServerPrefix() const {
306
0
    if(_options.server_info_name.empty()) {
307
0
        return butil::string_printf("%s_%d", g_server_info_prefix, listen_address().port);
308
0
    } else {
309
0
        return std::string(g_server_info_prefix) + "_" + _options.server_info_name;
310
0
    }
311
0
}
312
313
0
void* Server::UpdateDerivedVars(void* arg) {
314
0
    const int64_t start_us = butil::cpuwide_time_us();
315
316
0
    Server* server = static_cast<Server*>(arg);
317
0
    const std::string prefix = server->ServerPrefix();
318
0
    std::vector<SocketId> conns;
319
0
    std::vector<SocketId> internal_conns;
320
321
0
    server->_nerror_bvar.expose_as(prefix, "error");
322
323
0
    server->_eps_bvar.expose_as(prefix, "eps");
324
325
0
    server->_concurrency_bvar.expose_as(prefix, "concurrency");
326
327
0
    bvar::PassiveStatus<timeval> uptime_st(
328
0
        prefix, "uptime", GetUptime, (void*)(intptr_t)start_us);
329
330
0
    bvar::PassiveStatus<std::string> start_time_st(
331
0
        prefix, "start_time", PrintStartTime, server);
332
333
0
    bvar::PassiveStatus<int32_t> nconn_st(
334
0
        prefix, "connection_count", GetConnectionCount, server);
335
336
0
    bvar::PassiveStatus<int32_t> nservice_st(
337
0
        prefix, "service_count", GetServiceCount, server);
338
339
0
    bvar::PassiveStatus<int32_t> nbuiltinservice_st(
340
0
        prefix, "builtin_service_count", GetBuiltinServiceCount, server);
341
342
0
    bvar::PassiveStatus<bvar::Vector<unsigned, 2> > nsessiondata_st(
343
0
        GetSessionLocalDataCount, server);
344
0
    if (server->session_local_data_pool()) {
345
0
        nsessiondata_st.expose_as(prefix, "session_local_data_count");
346
0
        nsessiondata_st.set_vector_names("using,free");
347
0
    }
348
349
0
    std::string mprefix = prefix;
350
0
    for (MethodMap::iterator it = server->_method_map.begin();
351
0
         it != server->_method_map.end(); ++it) {
352
        // Not expose counters on builtin services.
353
0
        if (!it->second.is_builtin_service) {
354
0
            mprefix.resize(prefix.size());
355
0
            mprefix.push_back('_');
356
0
            bvar::to_underscored_name(&mprefix, it->second.method->full_name());
357
0
            it->second.status->Expose(mprefix);
358
0
        }
359
0
    }
360
0
    if (server->options().baidu_master_service) {
361
0
        server->options().baidu_master_service->Expose(prefix);
362
0
    }
363
0
    if (server->options().nshead_service) {
364
0
        server->options().nshead_service->Expose(prefix);
365
0
    }
366
367
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
368
    if (server->options().thrift_service) {
369
        server->options().thrift_service->Expose(prefix);
370
    }
371
#endif
372
373
0
    int64_t last_time = butil::gettimeofday_us();
374
0
    int consecutive_nosleep = 0;
375
0
    while (1) {
376
0
        const int64_t sleep_us = 1000000L + last_time - butil::gettimeofday_us();
377
0
        if (sleep_us < 1000L) {
378
0
            if (++consecutive_nosleep >= 2) {
379
0
                consecutive_nosleep = 0;
380
0
                LOG(WARNING) << __FUNCTION__ << " is too busy!";
381
0
            }
382
0
        } else {
383
0
            consecutive_nosleep = 0;
384
0
            if (bthread_usleep(sleep_us) < 0) {
385
0
                PLOG_IF(ERROR, errno != ESTOP) << "Fail to sleep";
386
0
                return NULL;
387
0
            }
388
0
        }
389
0
        last_time = butil::gettimeofday_us();
390
391
        // Update stats of accepted sockets.
392
0
        if (server->_am) {
393
0
            server->_am->ListConnections(&conns);
394
0
        }
395
0
        if (server->_internal_am) {
396
0
            server->_internal_am->ListConnections(&internal_conns);
397
0
        }
398
0
        const int64_t now_ms = butil::cpuwide_time_ms();
399
0
        for (size_t i = 0; i < conns.size(); ++i) {
400
0
            SocketUniquePtr ptr;
401
0
            if (Socket::Address(conns[i], &ptr) == 0) {
402
0
                ptr->UpdateStatsEverySecond(now_ms);
403
0
            }
404
0
        }
405
0
        for (size_t i = 0; i < internal_conns.size(); ++i) {
406
0
            SocketUniquePtr ptr;
407
0
            if (Socket::Address(internal_conns[i], &ptr) == 0) {
408
0
                ptr->UpdateStatsEverySecond(now_ms);
409
0
            }
410
0
        }
411
0
    }
412
0
}
413
414
0
const std::string& Server::ServiceProperty::service_name() const {
415
0
    if (service) {
416
0
        return service->GetDescriptor()->full_name();
417
0
    } else if (restful_map) {
418
0
        return restful_map->service_name();
419
0
    }
420
0
    const static std::string s_unknown_name = "";
421
0
    return s_unknown_name;
422
0
}
423
424
// Constructs a server in the UNINITIALIZED state with no acceptors,
// services or pools. _eps_bvar is built on &_nerror_bvar and
// _concurrency_bvar reads _concurrency through cast_no_barrier_int.
Server::Server(ProfilerLinker)
    : _session_local_data_pool(NULL),
      _status(UNINITIALIZED),
      _builtin_service_count(0),
      _virtual_service_count(0),
      _failed_to_set_max_concurrency_of_method(false),
      _failed_to_set_ignore_eovercrowded(false),
      _am(NULL),
      _internal_am(NULL),
      _first_service(NULL),
      _tab_info_list(NULL),
      _global_restful_map(NULL),
      _last_start_time(0),
      _derivative_thread(INVALID_BTHREAD),
      _keytable_pool(NULL),
      _eps_bvar(&_nerror_bvar),
      _concurrency(0),
      _concurrency_bvar(cast_no_barrier_int, &_concurrency),
      _has_progressive_read_method(false) {
    // Compile-time check that _concurrency sits at a multiple of 64 bytes.
    BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0,
                  Server_concurrency_must_be_aligned_by_cacheline);
}
446
447
0
// Deletes the object `*slot' points to and resets `*slot' to NULL.
template <typename T>
static inline void DeleteAndClear(T** slot) {
    delete *slot;
    *slot = NULL;
}

// Stops and joins the server, removes its services and SSL contexts, then
// releases every resource owned by the server or its options. `auth' and
// `interceptor' are deleted only when the matching server_owns_* flag is
// set. The pid file is unlinked when one was configured.
Server::~Server() {
    Stop(0);
    Join();
    ClearServices();
    FreeSSLContexts();
    DeleteAndClear(&_session_local_data_pool);

    DeleteAndClear(&_options.nshead_service);

#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
    DeleteAndClear(&_options.thrift_service);
#endif

    DeleteAndClear(&_options.baidu_master_service);
    DeleteAndClear(&_options.http_master_service);
    DeleteAndClear(&_options.rpc_pb_message_factory);

    DeleteAndClear(&_am);
    DeleteAndClear(&_internal_am);

    DeleteAndClear(&_tab_info_list);
    DeleteAndClear(&_global_restful_map);

    if (!_options.pid_file.empty()) {
        unlink(_options.pid_file.c_str());
    }
    if (_options.server_owns_auth) {
        DeleteAndClear(&_options.auth);
    }
    if (_options.server_owns_interceptor) {
        DeleteAndClear(&_options.interceptor);
    }

    DeleteAndClear(&_options.redis_service);
}
498
499
0
int Server::AddBuiltinServices() {
500
    // Firstly add services shown in tabs.
501
0
    if (AddBuiltinService(new (std::nothrow) StatusService)) {
502
0
        LOG(ERROR) << "Fail to add StatusService";
503
0
        return -1;
504
0
    }
505
0
    if (AddBuiltinService(new (std::nothrow) VarsService)) {
506
0
        LOG(ERROR) << "Fail to add VarsService";
507
0
        return -1;
508
0
    }
509
0
    if (AddBuiltinService(new (std::nothrow) ConnectionsService)) {
510
0
        LOG(ERROR) << "Fail to add ConnectionsService";
511
0
        return -1;
512
0
    }
513
0
    if (AddBuiltinService(new (std::nothrow) FlagsService)) {
514
0
        LOG(ERROR) << "Fail to add FlagsService";
515
0
        return -1;
516
0
    }
517
0
    if (AddBuiltinService(new (std::nothrow) RpczService)) {
518
0
        LOG(ERROR) << "Fail to add RpczService";
519
0
        return -1;
520
0
    }
521
0
    if (AddBuiltinService(new (std::nothrow) HotspotsService)) {
522
0
        LOG(ERROR) << "Fail to add HotspotsService";
523
0
        return -1;
524
0
    }
525
0
    if (AddBuiltinService(new (std::nothrow) IndexService)) {
526
0
        LOG(ERROR) << "Fail to add IndexService";
527
0
        return -1;
528
0
    }
529
530
    // Add other services.
531
0
    if (AddBuiltinService(new (std::nothrow) VersionService(this))) {
532
0
        LOG(ERROR) << "Fail to add VersionService";
533
0
        return -1;
534
0
    }
535
0
    if (AddBuiltinService(new (std::nothrow) HealthService)) {
536
0
        LOG(ERROR) << "Fail to add HealthService";
537
0
        return -1;
538
0
    }
539
0
    if (AddBuiltinService(new (std::nothrow) ProtobufsService(this))) {
540
0
        LOG(ERROR) << "Fail to add ProtobufsService";
541
0
        return -1;
542
0
    }
543
0
    if (AddBuiltinService(new (std::nothrow) BadMethodService)) {
544
0
        LOG(ERROR) << "Fail to add BadMethodService";
545
0
        return -1;
546
0
    }
547
0
    if (AddBuiltinService(new (std::nothrow) ListService(this))) {
548
0
        LOG(ERROR) << "Fail to add ListService";
549
0
        return -1;
550
0
    }
551
0
    if (AddBuiltinService(new (std::nothrow) PrometheusMetricsService)) {
552
0
        LOG(ERROR) << "Fail to add MetricsService";
553
0
        return -1;
554
0
    }
555
0
    if (FLAGS_enable_threads_service &&
556
0
        AddBuiltinService(new (std::nothrow) ThreadsService)) {
557
0
        LOG(ERROR) << "Fail to add ThreadsService";
558
0
        return -1;
559
0
    }
560
0
    if (AddBuiltinService(new (std::nothrow) MemoryService)) {
561
0
        LOG(ERROR) << "Fail to add MemoryService";
562
0
        return -1;
563
0
    }
564
565
0
#if !BRPC_WITH_GLOG
566
0
    if (AddBuiltinService(new (std::nothrow) VLogService)) {
567
0
        LOG(ERROR) << "Fail to add VLogService";
568
0
        return -1;
569
0
    }
570
0
#endif
571
572
0
    if (AddBuiltinService(new (std::nothrow) PProfService)) {
573
0
        LOG(ERROR) << "Fail to add PProfService";
574
0
        return -1;
575
0
    }
576
0
    if (FLAGS_enable_dir_service &&
577
0
        AddBuiltinService(new (std::nothrow) DirService)) {
578
0
        LOG(ERROR) << "Fail to add DirService";
579
0
        return -1;
580
0
    }
581
0
    if (AddBuiltinService(new (std::nothrow) BthreadsService)) {
582
0
        LOG(ERROR) << "Fail to add BthreadsService";
583
0
        return -1;
584
0
    }
585
0
    if (AddBuiltinService(new (std::nothrow) IdsService)) {
586
0
        LOG(ERROR) << "Fail to add IdsService";
587
0
        return -1;
588
0
    }
589
0
    if (AddBuiltinService(new (std::nothrow) SocketsService)) {
590
0
        LOG(ERROR) << "Fail to add SocketsService";
591
0
        return -1;
592
0
    }
593
0
    if (AddBuiltinService(new (std::nothrow) GetFaviconService)) {
594
0
        LOG(ERROR) << "Fail to add GetFaviconService";
595
0
        return -1;
596
0
    }
597
0
    if (AddBuiltinService(new (std::nothrow) GetJsService)) {
598
0
        LOG(ERROR) << "Fail to add GetJsService";
599
0
        return -1;
600
0
    }
601
0
    if (AddBuiltinService(new (std::nothrow) GrpcHealthCheckService)) {
602
0
        LOG(ERROR) << "Fail to add GrpcHealthCheckService";
603
0
        return -1;
604
0
    }
605
0
    return 0;
606
0
}
607
608
0
// Tells whether `name' is one of the HTTP-family protocol names, i.e.
// exactly "http" or "h2".
// A NULL `name' yields false instead of being dereferenced: the Print*
// helpers in this file already treat a protocol without a name as possible.
bool is_http_protocol(const char* name) {
    // Cheap rejection before the string comparisons.
    if (name == NULL || name[0] != 'h') {
        return false;
    }
    return strcmp(name, "http") == 0 || strcmp(name, "h2") == 0;
}
614
615
0
// Creates an Acceptor with one handler per protocol that has a
// process_request callback. When ServerOptions.enabled_protocols is
// non-empty it acts as a whitelist for the non-HTTP protocols; names left
// unmatched in the whitelist make the call fail.
// Returns the new acceptor, or NULL on any failure (nothing is leaked).
Acceptor* Server::BuildAcceptor() {
    std::set<std::string> whitelist;
    butil::StringSplitter sp(_options.enabled_protocols.c_str(), ' ');
    while (sp) {
        whitelist.insert(std::string(sp.field(), sp.length()));
        ++sp;
    }
    const bool has_whitelist = !whitelist.empty();

    Acceptor* acceptor = new (std::nothrow) Acceptor(_keytable_pool);
    if (acceptor == NULL) {
        LOG(ERROR) << "Fail to new Acceptor";
        return NULL;
    }

    InputMessageHandler handler;
    std::vector<Protocol> protocols;
    ListProtocols(&protocols);
    for (const Protocol& proto : protocols) {
        if (proto.process_request == NULL) {
            // The protocol does not support server-side.
            continue;
        }
        if (has_whitelist && !is_http_protocol(proto.name)) {
            // erase() both tests membership and marks the name as matched.
            if (whitelist.erase(proto.name) == 0) {
                // the protocol is not allowed to serve.
                RPC_VLOG << "Skip protocol=" << proto.name;
                continue;
            }
        }
        // `process_request' is required at server side
        handler.parse = proto.parse;
        handler.process = proto.process_request;
        handler.verify = proto.verify;
        handler.arg = this;
        handler.name = proto.name;
        if (acceptor->AddHandler(handler) != 0) {
            LOG(ERROR) << "Fail to add handler into Acceptor("
                       << acceptor << ')';
            delete acceptor;
            return NULL;
        }
    }

    if (whitelist.empty()) {
        return acceptor;
    }

    // Whatever is left in the whitelist matched no server-side protocol.
    std::ostringstream err;
    err << "ServerOptions.enabled_protocols has unknown protocols=`";
    for (std::set<std::string>::const_iterator it = whitelist.begin();
         it != whitelist.end(); ++it) {
        err << *it << ' ';
    }
    err << '\'';
    delete acceptor;
    LOG(ERROR) << err.str();
    return NULL;
}
670
671
0
int Server::InitializeOnce() {
672
0
    if (_status != UNINITIALIZED) {
673
0
        return 0;
674
0
    }
675
0
    GlobalInitializeOrDie();
676
677
0
    if (_status != UNINITIALIZED) {
678
0
        return 0;
679
0
    }
680
0
    _status = READY;
681
0
    return 0;
682
0
}
683
684
0
int Server::InitALPNOptions(const ServerSSLOptions* options) {
685
0
    if (options == nullptr) {
686
0
        LOG(ERROR) << "Fail to init alpn options, ssl options is nullptr.";
687
0
        return -1;
688
0
    }   
689
690
0
    std::string raw_protocol;
691
0
    const std::string& alpns = options->alpns;
692
0
    for (butil::StringSplitter split(alpns.data(), ','); split; ++split) {
693
0
        butil::StringPiece alpn(split.field(), split.length());
694
0
        alpn.trim_spaces();
695
696
        // Check protocol valid(exist and server support)
697
0
        AdaptiveProtocolType protocol_type(alpn);
698
0
        const Protocol* protocol = FindProtocol(protocol_type);
699
0
        if (protocol == nullptr || !protocol->support_server()) {
700
0
            LOG(ERROR) << "Server does not support alpn=" << alpn;
701
0
            return -1;
702
0
        }
703
0
        raw_protocol.append(ALPNProtocolToString(protocol_type));
704
0
    }
705
0
    _raw_alpns = std::move(raw_protocol);
706
0
    return 0;
707
0
}
708
709
0
static void* CreateServerTLS(const void* args) {
710
0
    return static_cast<const DataFactory*>(args)->CreateData();
711
0
}
712
0
static void DestroyServerTLS(void* data, const void* void_factory) {
713
0
    static_cast<const DataFactory*>(void_factory)->DestroyData(data);
714
0
}
715
716
// Argument block handed to BthreadInitEntry().
struct BthreadInitArgs {
717
    bool (*bthread_init_fn)(void* args); // default: NULL (do nothing)
718
    void* bthread_init_args;             // default: NULL
719
    // Return value of bthread_init_fn, stored by BthreadInitEntry().
    bool result;
720
    // Set to true by BthreadInitEntry() once bthread_init_fn has returned.
    bool done;
721
    // BthreadInitEntry() keeps sleeping until this becomes true.
    bool stop;
722
    // NOTE(review): presumably the bthread running BthreadInitEntry() --
    // confirm at the creation site, which is outside this section.
    bthread_t th;
723
};
724
725
0
static void* BthreadInitEntry(void* void_args) {
726
0
    BthreadInitArgs* args = (BthreadInitArgs*)void_args;
727
0
    args->result = args->bthread_init_fn(args->bthread_init_args);
728
0
    args->done = true;
729
0
    while (!args->stop) {
730
0
        bthread_usleep(1000);
731
0
    }
732
0
    return NULL;
733
0
}
734
735
struct RevertServerStatus {
736
0
    inline void operator()(Server* s) const {
737
0
        if (s != NULL) {
738
0
            s->Stop(0);
739
0
            s->Join();
740
0
        }
741
0
    }
742
};
743
744
0
// Returns the local port (host byte order) that socket `fd' is bound to, or
// -1 when getsockname() fails or the socket is not an IPv4/IPv6 socket.
//
// The address is read into a sockaddr_storage and the port is taken from
// the field matching the reported address family. A plain sockaddr_in would
// give a truncated address for IPv6 and a meaningless "port" for other
// families such as AF_UNIX.
static int get_port_from_fd(int fd) {
    struct sockaddr_storage addr = sockaddr_storage();
    socklen_t size = sizeof(addr);
    if (getsockname(fd, reinterpret_cast<struct sockaddr*>(&addr), &size) < 0) {
        return -1;
    }
    switch (addr.ss_family) {
    case AF_INET:
        return ntohs(reinterpret_cast<const struct sockaddr_in*>(&addr)->sin_port);
    case AF_INET6:
        return ntohs(reinterpret_cast<const struct sockaddr_in6*>(&addr)->sin6_port);
    default:
        // Address families without a port concept.
        return -1;
    }
}
752
753
bool Server::CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
754
0
                                      ConcurrencyLimiter** out) {
755
0
    if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED) {
756
0
        *out = NULL;
757
0
        return true;
758
0
    }
759
0
    const ConcurrencyLimiter* cl =
760
0
        ConcurrencyLimiterExtension()->Find(amc.type().c_str());
761
0
    if (cl == NULL) {
762
0
        LOG(ERROR) << "Fail to find ConcurrencyLimiter by `" << amc.value() << "'";
763
0
        return false;
764
0
    }
765
0
    ConcurrencyLimiter* cl_copy = cl->New(amc);
766
0
    if (cl_copy == NULL) {
767
0
        LOG(ERROR) << "Fail to new ConcurrencyLimiter";
768
0
        return false;
769
0
    }
770
0
    *out = cl_copy;
771
0
    return true;
772
0
}
773
774
#if BRPC_WITH_RDMA
775
static bool OptionsAvailableOverRdma(const ServerOptions* opt) {
776
    if (opt->rtmp_service) {
777
        LOG(WARNING) << "RTMP is not supported by RDMA";
778
        return false;
779
    }
780
    if (opt->has_ssl_options()) {
781
        LOG(WARNING) << "SSL is not supported by RDMA";
782
        return false;
783
    }
784
    if (opt->nshead_service) {
785
        LOG(WARNING) << "NSHEAD is not supported by RDMA";
786
        return false;
787
    }
788
    if (opt->mongo_service_adaptor) {
789
        LOG(WARNING) << "MONGO is not supported by RDMA";
790
        return false;
791
    }
792
    return true;
793
}
794
#endif
795
796
static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0);
797
static bool g_default_ignore_eovercrowded(false);
798
799
0
// Makes `dst' a copy of `src' and fills in missing defaults.
// When `dst' and `src' are different objects, resources held by `dst' are
// released before the assignment: each pointer handled by
// FREE_PTR_IF_NOT_REUSED is deleted unless `src' carries the very same
// pointer (`auth' and `interceptor' only when the matching server_owns_*
// flag of `dst' is set), and dst's pid file is unlinked when its path is
// non-empty and differs from src's.
// Finally, a NULL rpc_pb_message_factory is replaced by a new
// DefaultRpcPBMessageFactory.
inline void copy_and_fill_server_options(ServerOptions& dst, const ServerOptions& src) {
800
// Mirrors the releases done in Server::~Server().
801
0
#define FREE_PTR_IF_NOT_REUSED(ptr)         \
802
0
    if (dst.ptr != src.ptr) {               \
803
0
        delete dst.ptr;                     \
804
0
        dst.ptr = NULL;                     \
805
0
    }
806
807
0
    if (&dst != &src) {
808
0
        FREE_PTR_IF_NOT_REUSED(nshead_service);
809
810
 #ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
811
        FREE_PTR_IF_NOT_REUSED(thrift_service);
812
 #endif
813
814
0
        FREE_PTR_IF_NOT_REUSED(baidu_master_service);
815
0
        FREE_PTR_IF_NOT_REUSED(http_master_service);
816
0
        FREE_PTR_IF_NOT_REUSED(rpc_pb_message_factory);
817
818
0
        if (dst.pid_file != src.pid_file && !dst.pid_file.empty()) {
819
0
            unlink(dst.pid_file.c_str());
820
0
        }
821
822
0
        if (dst.server_owns_auth) {
823
0
            FREE_PTR_IF_NOT_REUSED(auth);
824
0
        }
825
826
0
        if (dst.server_owns_interceptor) {
827
0
            FREE_PTR_IF_NOT_REUSED(interceptor);
828
0
        }
829
830
0
        FREE_PTR_IF_NOT_REUSED(redis_service);
831
832
        // Copy all data members directly.
833
0
        dst = src;
834
0
    }
835
0
#undef FREE_PTR_IF_NOT_REUSED
836
837
    // Create the factory if it is still missing, which happens when:
838
    //   1. `dst` was copied from user options that did not set one, or
839
    //   2. `dst` was copied from options created by us.
840
0
    if (!dst.rpc_pb_message_factory) {
841
0
        dst.rpc_pb_message_factory = new DefaultRpcPBMessageFactory();
842
0
    }
843
0
}
844
845
// Start this server: validate state and options, set up per-run resources,
// listen on one port taken from `port_range' at the address of `endpoint',
// and launch the background bthread running UpdateDerivedVars.
//   endpoint   - address to listen on; its port is replaced by each port
//                tried from `port_range'.
//   port_range - inclusive range of candidate ports, tried in increasing
//                order until one can be listened on.
//   opt        - options for this run; NULL means default-constructed
//                ServerOptions.
// Returns 0 on success, -1 on any failure (the reason is logged in most
// cases).
int Server::StartInternal(const butil::EndPoint& endpoint,
846
                          const PortRange& port_range,
847
0
                          const ServerOptions *opt) {
848
0
    // On every early return below the RevertServerStatus deleter runs on
    // this server; it is disarmed by release() once startup has succeeded.
    std::unique_ptr<Server, RevertServerStatus> revert_server(this);
849
0
    // A failure recorded by an earlier MaxConcurrencyOf() call aborts this
    // start. The flag is cleared, so it does not block the next attempt.
    if (_failed_to_set_max_concurrency_of_method) {
850
0
        _failed_to_set_max_concurrency_of_method = false;
851
0
        LOG(ERROR) << "previous call to MaxConcurrencyOf() was failed, "
852
0
            "fix it before starting server";
853
0
        return -1;
854
0
    }
855
0
    // Same handling for a failed IgnoreEovercrowdedOf() call.
    if (_failed_to_set_ignore_eovercrowded) {
856
0
        _failed_to_set_ignore_eovercrowded = false;
857
0
        LOG(ERROR) << "previous call to IgnoreEovercrowdedOf() was failed, "
858
0
            "fix it before starting server";
859
0
        return -1;
860
0
    }
861
0
    if (InitializeOnce() != 0) {
862
0
        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
863
0
        return -1;
864
0
    }
865
0
    // Only a READY server can be started.
    const Status st = status();
866
0
    if (st != READY) {
867
0
        if (st == RUNNING) {
868
0
            LOG(ERROR) << "Server[" << version() << "] is already running on "
869
0
                       << _listen_addr;
870
0
        } else {
871
0
            LOG(ERROR) << "Can't start Server[" << version()
872
0
                       << "] which is " << status_str(status());
873
0
        }
874
0
        return -1;
875
0
    }
876
877
0
    // Install the options of this run into `_options'.
    copy_and_fill_server_options(_options, opt ? *opt : ServerOptions());
878
879
0
    if (!_options.h2_settings.IsValid(true/*log_error*/)) {
880
0
        LOG(ERROR) << "Invalid h2_settings";
881
0
        return -1;
882
0
    }
883
884
0
    // The tag must lie in [BTHREAD_TAG_DEFAULT, FLAGS_task_group_ntags).
    if (_options.bthread_tag < BTHREAD_TAG_DEFAULT ||
885
0
        _options.bthread_tag >= FLAGS_task_group_ntags) {
886
0
        LOG(ERROR) << "Fail to set tag " << _options.bthread_tag
887
0
                   << ", tag range is [" << BTHREAD_TAG_DEFAULT << ":"
888
0
                   << FLAGS_task_group_ntags << ")";
889
0
        return -1;
890
0
    }
891
892
0
    // RDMA can only be requested when brpc was compiled with
    // BRPC_WITH_RDMA; otherwise the start fails.
    if (_options.use_rdma) {
893
#if BRPC_WITH_RDMA
894
        if (!OptionsAvailableOverRdma(&_options)) {
895
            return -1;
896
        }
897
        rdma::GlobalRdmaInitializeOrDie();
898
        if (!rdma::InitPollingModeWithTag(_options.bthread_tag)) {
899
            return -1;
900
        }
901
#else
902
0
        LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
903
0
        return -1;
904
0
#endif
905
0
    }
906
907
0
    if (_options.http_master_service) {
908
        // Check requirements for http_master_service:
909
        //  has "default_method" & request/response have no fields
910
0
        const google::protobuf::ServiceDescriptor* sd =
911
0
            _options.http_master_service->GetDescriptor();
912
0
        const google::protobuf::MethodDescriptor* md =
913
0
            sd->FindMethodByName("default_method");
914
0
        if (md == NULL) {
915
0
            LOG(ERROR) << "http_master_service must have a method named `default_method'";
916
0
            return -1;
917
0
        }
918
0
        if (md->input_type()->field_count() != 0) {
919
0
            LOG(ERROR) << "The request type of http_master_service must have "
920
0
                "no fields, actually " << md->input_type()->field_count();
921
0
            return -1;
922
0
        }
923
0
        if (md->output_type()->field_count() != 0) {
924
0
            LOG(ERROR) << "The response type of http_master_service must have "
925
0
                "no fields, actually " << md->output_type()->field_count();
926
0
            return -1;
927
0
        }
928
0
    }
929
930
    // CAUTION:
931
    //   Following code may run multiple times if this server is started and
932
    //   stopped more than once. Reuse or delete previous resources!
933
934
0
    // The session-local data pool is created on first use and re-targeted
    // to the current factory on later starts.
    if (_options.session_local_data_factory) {
935
0
        if (_session_local_data_pool == NULL) {
936
0
            _session_local_data_pool =
937
0
                new (std::nothrow) SimpleDataPool(_options.session_local_data_factory);
938
0
            if (NULL == _session_local_data_pool) {
939
0
                LOG(ERROR) << "Fail to new SimpleDataPool";
940
0
                return -1;
941
0
            }
942
0
        } else {
943
0
            _session_local_data_pool->Reset(_options.session_local_data_factory);
944
0
        }
945
0
        _session_local_data_pool->Reserve(_options.reserved_session_local_data);
946
0
    }
947
948
    // Leak of `_keytable_pool' and others is by design.
949
    // See comments in Server::Join() for details.
950
    // Instruct LeakSanitizer to ignore the designated memory leak.
951
0
    ANNOTATE_SCOPED_MEMORY_LEAK;
952
    // Init _keytable_pool always. If the server was stopped before, the pool
953
    // should be destroyed in Join().
954
0
    _keytable_pool = new bthread_keytable_pool_t;
955
0
    if (bthread_keytable_pool_init(_keytable_pool) != 0) {
956
0
        LOG(ERROR) << "Fail to init _keytable_pool";
957
0
        delete _keytable_pool;
958
0
        _keytable_pool = NULL;
959
0
        return -1;
960
0
    }
961
962
0
    // With a thread-local data factory: create the key and optionally
    // pre-reserve data in the pool. Without one: reset `_tl_options'.
    if (_options.thread_local_data_factory) {
963
0
        _tl_options.thread_local_data_factory = _options.thread_local_data_factory;
964
0
        if (bthread_key_create2(&_tl_options.tls_key, DestroyServerTLS,
965
0
                                _options.thread_local_data_factory) != 0) {
966
0
            LOG(ERROR) << "Fail to create thread-local key";
967
0
            return -1;
968
0
        }
969
0
        if (_options.reserved_thread_local_data) {
970
0
            bthread_keytable_pool_reserve(_keytable_pool,
971
0
                                          _options.reserved_thread_local_data,
972
0
                                          _tl_options.tls_key,
973
0
                                          CreateServerTLS,
974
0
                                          _options.thread_local_data_factory);
975
0
        }
976
0
    } else {
977
0
        _tl_options = ThreadLocalOptions();
978
0
    }
979
980
0
    if (_options.bthread_init_count != 0 &&
981
0
        _options.bthread_init_fn != NULL) {
982
        // Create some special bthreads to call the init functions. The
983
        // bthreads will not quit until all bthreads finish the init function.
984
0
        BthreadInitArgs* init_args
985
0
            = new BthreadInitArgs[_options.bthread_init_count];
986
0
        // `ncreated' counts the bthreads that were actually started; the
        // loop stops at the first bthread that fails to start.
        size_t ncreated = 0;
987
0
        for (size_t i = 0; i < _options.bthread_init_count; ++i, ++ncreated) {
988
0
            init_args[i].bthread_init_fn = _options.bthread_init_fn;
989
0
            init_args[i].bthread_init_args = _options.bthread_init_args;
990
0
            init_args[i].result = false;
991
0
            init_args[i].done = false;
992
0
            init_args[i].stop = false;
993
0
            bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
994
0
            tmp.tag = _options.bthread_tag;
995
0
            tmp.keytable_pool = _keytable_pool;
996
0
            if (bthread_start_background(
997
0
                    &init_args[i].th, &tmp, BthreadInitEntry, &init_args[i]) != 0) {
998
0
                break;
999
0
            }
1000
0
        }
1001
        // Wait until all created bthreads finish the init function.
1002
0
        for (size_t i = 0; i < ncreated; ++i) {
1003
0
            while (!init_args[i].done) {
1004
0
                bthread_usleep(1000);
1005
0
            }
1006
0
        }
1007
        // Stop and join created bthreads.
1008
0
        for (size_t i = 0; i < ncreated; ++i) {
1009
0
            init_args[i].stop = true;
1010
0
        }
1011
0
        for (size_t i = 0; i < ncreated; ++i) {
1012
0
            bthread_join(init_args[i].th, NULL);
1013
0
        }
1014
0
        size_t num_failed_result = 0;
1015
0
        for (size_t i = 0; i < ncreated; ++i) {
1016
0
            if (!init_args[i].result) {
1017
0
                ++num_failed_result;
1018
0
            }
1019
0
        }
1020
0
        delete [] init_args;
1021
0
        // Fail if some bthreads could not be started, or if any init
        // function reported failure.
        if (ncreated != _options.bthread_init_count) {
1022
0
            LOG(ERROR) << "Fail to create "
1023
0
                       << _options.bthread_init_count - ncreated << " bthreads";
1024
0
            return -1;
1025
0
        }
1026
0
        if (num_failed_result != 0) {
1027
0
            LOG(ERROR) << num_failed_result << " bthread_init_fn failed";
1028
0
            return -1;
1029
0
        }
1030
0
    }
1031
1032
    // Free last SSL contexts
1033
0
    FreeSSLContexts();
1034
0
    if (_options.has_ssl_options()) {
1035
1036
        // Change ServerSSLOptions.alpns to _raw_alpns.
1037
        // AddCertificate function maybe access raw_alpns variable.
1038
0
        if (InitALPNOptions(_options.mutable_ssl_options()) != 0) {
1039
0
            return -1;
1040
0
        }
1041
0
        CertInfo& default_cert = _options.mutable_ssl_options()->default_cert;
1042
0
        if (default_cert.certificate.empty()) {
1043
0
            LOG(ERROR) << "default_cert is empty";
1044
0
            return -1;
1045
0
        }
1046
0
        if (AddCertificate(default_cert) != 0) {
1047
0
            return -1;
1048
0
        }
1049
0
        // The default certificate was added right after the contexts were
        // freed, so its context is taken from the first map entry.
        _default_ssl_ctx = _ssl_ctx_map.begin()->second.ctx;
1050
1051
0
        const std::vector<CertInfo>& certs = _options.mutable_ssl_options()->certs;
1052
0
        for (size_t i = 0; i < certs.size(); ++i) {
1053
0
            if (AddCertificate(certs[i]) != 0) {
1054
0
                return -1;
1055
0
            }
1056
0
        }
1057
0
    } else if (_options.force_ssl) {
1058
0
        LOG(ERROR) << "Fail to force SSL for all connections "
1059
0
                      "without ServerOptions.ssl_options";
1060
0
        return -1;
1061
0
    }
1062
1063
0
    _concurrency = 0;
1064
1065
0
    // Builtin services are added only once: skipped when some are already
    // registered.
    if (_options.has_builtin_services &&
1066
0
        _builtin_service_count <= 0 &&
1067
0
        AddBuiltinServices() != 0) {
1068
0
        LOG(ERROR) << "Fail to add builtin services";
1069
0
        return -1;
1070
0
    }
1071
    // If a server is started/stopped multiple times and one of the option
1072
    // sets has has_builtin_services set to true, builtin services will be
1073
    // enabled for any later re-start. Check this case and report to user.
1074
0
    if (!_options.has_builtin_services && _builtin_service_count > 0) {
1075
0
        LOG(ERROR) << "A server started/stopped for multiple times must be "
1076
0
            "consistent on ServerOptions.has_builtin_services";
1077
0
        return -1;
1078
0
    }
1079
1080
    // Prepare all restful maps
1081
0
    for (ServiceMap::const_iterator it = _fullname_service_map.begin();
1082
0
         it != _fullname_service_map.end(); ++it) {
1083
0
        if (it->second.restful_map) {
1084
0
            it->second.restful_map->PrepareForFinding();
1085
0
        }
1086
0
    }
1087
0
    if (_global_restful_map) {
1088
0
        _global_restful_map->PrepareForFinding();
1089
0
    }
1090
1091
0
    // Apply the requested worker count to the server's tag: backup threads
    // are added when FLAGS_usercode_in_pthread is on, and the result is
    // raised to at least BTHREAD_MIN_CONCURRENCY.
    if (_options.num_threads > 0) {
1092
0
        if (FLAGS_usercode_in_pthread) {
1093
0
            _options.num_threads += FLAGS_usercode_backup_threads;
1094
0
        }
1095
0
        if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
1096
0
            _options.num_threads = BTHREAD_MIN_CONCURRENCY;
1097
0
        }
1098
0
        bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag);
1099
0
    }
1100
1101
0
    // Install concurrency limiters: none for methods of builtin services;
    // for other methods, a limiter built from the method's own setting, or
    // from ServerOptions.method_max_concurrency when that is UNLIMITED.
    for (MethodMap::iterator it = _method_map.begin();
1102
0
        it != _method_map.end(); ++it) {
1103
0
        if (it->second.is_builtin_service) {
1104
0
            it->second.status->SetConcurrencyLimiter(NULL);
1105
0
        } else {
1106
0
            const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
1107
0
            if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED) {
1108
0
                amc = &_options.method_max_concurrency;
1109
0
            }
1110
0
            ConcurrencyLimiter* cl = NULL;
1111
0
            if (!CreateConcurrencyLimiter(*amc, &cl)) {
1112
0
                LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
1113
0
                return -1;
1114
0
            }
1115
0
            it->second.status->SetConcurrencyLimiter(cl);
1116
0
            it->second.max_concurrency.SetConcurrencyLimiter(cl);
1117
0
        }
1118
0
    }
1119
0
    if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
1120
0
        return -1;
1121
0
    }
1122
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
1123
    if (0 != SetServiceMaxConcurrency(_options.thrift_service)) {
1124
        return -1;
1125
    }
1126
#endif
1127
1128
1129
    // Create listening ports
1130
0
    if (port_range.min_port > port_range.max_port) {
1131
0
        LOG(ERROR) << "Invalid port_range=[" << port_range.min_port << '-'
1132
0
                   << port_range.max_port << ']';
1133
0
        return -1;
1134
0
    }
1135
0
    // An extended endpoint is only accepted with the single-port range
    // [endpoint.port, endpoint.port].
    if (butil::is_endpoint_extended(endpoint) &&
1136
0
            (port_range.min_port != endpoint.port || port_range.max_port != endpoint.port)) {
1137
0
        LOG(ERROR) << "Only IPv4 address supports port range feature";
1138
0
        return -1;
1139
0
    }
1140
0
    _listen_addr = endpoint;
1141
0
    // Try the ports in increasing order and stop at the first one that can
    // be listened on; only a failure on the last port is reported.
    for (int port = port_range.min_port; port <= port_range.max_port; ++port) {
1142
0
        _listen_addr.port = port;
1143
0
        butil::fd_guard sockfd(tcp_listen(_listen_addr));
1144
0
        if (sockfd < 0) {
1145
0
            if (port != port_range.max_port) { // not the last port, try next
1146
0
                continue;
1147
0
            }
1148
0
            if (port_range.min_port != port_range.max_port) {
1149
0
                LOG(ERROR) << "Fail to listen " << _listen_addr.ip
1150
0
                           << ":[" << port_range.min_port << '-'
1151
0
                           << port_range.max_port << ']';
1152
0
            } else {
1153
0
                LOG(ERROR) << "Fail to listen " << _listen_addr;
1154
0
            }
1155
0
            return -1;
1156
0
        }
1157
0
        if (_listen_addr.port == 0) {
1158
            // port=0 makes kernel dynamically select a port from
1159
            // https://en.wikipedia.org/wiki/Ephemeral_port
1160
0
            _listen_addr.port = get_port_from_fd(sockfd);
1161
0
            if (_listen_addr.port <= 0) {
1162
0
                LOG(ERROR) << "Fail to get port from fd=" << sockfd;
1163
0
                return -1;
1164
0
            }
1165
0
        }
1166
0
        // The acceptor is built on first use and kept for later starts.
        if (_am == NULL) {
1167
0
            _am = BuildAcceptor();
1168
0
            if (NULL == _am) {
1169
0
                LOG(ERROR) << "Fail to build acceptor";
1170
0
                return -1;
1171
0
            }
1172
0
            _am->_use_rdma = _options.use_rdma;
1173
0
            _am->_bthread_tag = _options.bthread_tag;
1174
0
        }
1175
        // Set `_status' to RUNNING before accepting connections
1176
        // to prevent requests being rejected as ELOGOFF
1177
0
        _status = RUNNING;
1178
0
        time(&_last_start_time);
1179
0
        GenerateVersionIfNeeded();
1180
0
        g_running_server_count.fetch_add(1, butil::memory_order_relaxed);
1181
1182
        // Pass ownership of `sockfd' to `_am'
1183
0
        if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
1184
0
                             _default_ssl_ctx,
1185
0
                             _options.force_ssl) != 0) {
1186
0
            LOG(ERROR) << "Fail to start acceptor";
1187
0
            return -1;
1188
0
        }
1189
0
        sockfd.release();
1190
0
        break; // stop trying
1191
0
    }
1192
0
    // The internal port is only set up when builtin services are enabled.
    // It must differ from the serving port, must not be 0, and is rejected
    // for extended endpoints.
    if (_options.internal_port >= 0 && _options.has_builtin_services) {
1193
0
        if (_options.internal_port  == _listen_addr.port) {
1194
0
            LOG(ERROR) << "ServerOptions.internal_port=" << _options.internal_port
1195
0
                       << " is same with port=" << _listen_addr.port << " to Start()";
1196
0
            return -1;
1197
0
        }
1198
0
        if (_options.internal_port == 0) {
1199
0
            LOG(ERROR) << "ServerOptions.internal_port cannot be 0, which"
1200
0
                " allocates a dynamic and probabaly unfiltered port,"
1201
0
                " against the purpose of \"being internal\".";
1202
0
            return -1;
1203
0
        }
1204
0
        if (butil::is_endpoint_extended(endpoint)) {
1205
0
            LOG(ERROR) << "internal_port is available in IPv4 address only";
1206
0
            return -1;
1207
0
        }
1208
1209
0
        butil::EndPoint internal_point = _listen_addr;
1210
0
        internal_point.port = _options.internal_port;
1211
0
        butil::fd_guard sockfd(tcp_listen(internal_point));
1212
0
        if (sockfd < 0) {
1213
0
            LOG(ERROR) << "Fail to listen " << internal_point << " (internal)";
1214
0
            return -1;
1215
0
        }
1216
0
        if (NULL == _internal_am) {
1217
0
            _internal_am = BuildAcceptor();
1218
0
            if (NULL == _internal_am) {
1219
0
                LOG(ERROR) << "Fail to build internal acceptor";
1220
0
                return -1;
1221
0
            }
1222
0
        }
1223
        // Pass ownership of `sockfd' to `_internal_am'
1224
0
        if (_internal_am->StartAccept(sockfd, _options.idle_timeout_sec,
1225
0
                                      _default_ssl_ctx,
1226
0
                                      false) != 0) {
1227
0
            LOG(ERROR) << "Fail to start internal_acceptor";
1228
0
            return -1;
1229
0
        }
1230
0
        sockfd.release();
1231
0
    }
1232
1233
0
    PutPidFileIfNeeded();
1234
1235
    // Launch _derivative_thread.
1236
0
    CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
1237
0
    bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
1238
0
    tmp.tag = _options.bthread_tag;
1239
0
    bthread_attr_set_name(&tmp, "UpdateDerivedVars");
1240
0
    if (bthread_start_background(&_derivative_thread, &tmp,
1241
0
                                 UpdateDerivedVars, this) != 0) {
1242
0
        LOG(ERROR) << "Fail to create _derivative_thread";
1243
0
        return -1;
1244
0
    }
1245
1246
    // Print tips to server launcher.
1247
0
    if (butil::is_endpoint_extended(_listen_addr)) {
1248
0
        const char* builtin_msg = _options.has_builtin_services ? " with builtin service" : "";
1249
0
        LOG(INFO) << "Server[" << version() << "] is serving on " << _listen_addr
1250
0
                  << builtin_msg << '.';
1251
        //TODO add TrackMe support
1252
0
    } else {
1253
0
        // `http_port' is the port advertised for the builtin pages: the
        // internal port when one is in use, the serving port otherwise.
        int http_port = _listen_addr.port;
1254
0
        std::ostringstream server_info;
1255
0
        server_info << "Server[" << version() << "] is serving on port="
1256
0
                    << _listen_addr.port;
1257
0
        if (_options.internal_port >= 0 && _options.has_builtin_services) {
1258
0
            http_port = _options.internal_port;
1259
0
            server_info << " and internal_port=" << _options.internal_port;
1260
0
        }
1261
0
        LOG(INFO) << server_info.str() << '.';
1262
1263
0
        if (_options.has_builtin_services) {
1264
0
            LOG(INFO) << "Check out http://" << butil::my_hostname() << ':'
1265
0
                    << http_port << " in web browser.";
1266
0
        } else {
1267
0
            LOG(WARNING) << "Builtin services are disabled according to "
1268
0
                "ServerOptions.has_builtin_services";
1269
0
        }
1270
        // For trackme reporting
1271
0
        SetTrackMeAddress(butil::EndPoint(butil::my_ip(), http_port));
1272
0
    }
1273
0
    // Startup succeeded: disarm the revert guard.
    revert_server.release();
1274
0
    return 0;
1275
0
}
1276
1277
0
// Start the server on exactly `endpoint'.
// Returns whatever StartInternal() returns.
int Server::Start(const butil::EndPoint& endpoint, const ServerOptions* opt) {
    // A fixed port is expressed as the one-element range [port, port].
    const PortRange single_port(endpoint.port, endpoint.port);
    return StartInternal(endpoint, single_port, opt);
}
1281
1282
0
int Server::Start(const char* ip_port_str, const ServerOptions* opt) {
1283
0
    butil::EndPoint point;
1284
0
    if (str2endpoint(ip_port_str, &point) != 0 &&
1285
0
        hostname2endpoint(ip_port_str, &point) != 0) {
1286
0
        LOG(ERROR) << "Invalid address=`" << ip_port_str << '\'';
1287
0
        return -1;
1288
0
    }
1289
0
    return Start(point, opt);
1290
0
}
1291
1292
0
int Server::Start(int port, const ServerOptions* opt) {
1293
0
    if (port < 0 || port > 65535) {
1294
0
        LOG(ERROR) << "Invalid port=" << port;
1295
0
        return -1;
1296
0
    }
1297
0
    return Start(butil::EndPoint(butil::IP_ANY, port), opt);
1298
0
}
1299
1300
int Server::Start(const char* ip_str, PortRange port_range,
1301
0
                  const ServerOptions *opt) {
1302
0
    butil::ip_t ip;
1303
0
    if (butil::str2ip(ip_str, &ip) != 0 &&
1304
0
        butil::hostname2ip(ip_str, &ip) != 0) {
1305
0
        LOG(ERROR) << "Invalid address=`" << ip_str << '\'';
1306
0
        return -1;
1307
0
    }
1308
0
    return StartInternal(butil::EndPoint(ip, 0), port_range, opt);
1309
0
}
1310
1311
0
// Start the server at butil::IP_ANY on a port taken from `port_range'.
// Returns whatever StartInternal() returns.
int Server::Start(PortRange port_range, const ServerOptions* opt) {
    const butil::EndPoint any_addr(butil::IP_ANY, 0);
    return StartInternal(any_addr, port_range, opt);
}
1314
1315
0
int Server::Stop(int timeout_ms) {
1316
0
    if (_status != RUNNING) {
1317
0
        return -1;
1318
0
    }
1319
0
    _status = STOPPING;
1320
1321
0
    LOG(INFO) << "Server[" << version() << "] is going to quit";
1322
1323
0
    if (_am) {
1324
0
        _am->StopAccept(timeout_ms);
1325
0
    }
1326
0
    if (_internal_am) {
1327
        // TODO: calculate timeout?
1328
0
        _internal_am->StopAccept(timeout_ms);
1329
0
    }
1330
0
    return 0;
1331
0
}
1332
1333
// NOTE: Join() can happen before Stop().
1334
0
int Server::Join() {
1335
0
    if (_status != RUNNING && _status != STOPPING) {
1336
0
        return -1;
1337
0
    }
1338
0
    if (_am) {
1339
0
        _am->Join();
1340
0
    }
1341
0
    if (_internal_am) {
1342
0
        _internal_am->Join();
1343
0
    }
1344
1345
0
    if (_session_local_data_pool) {
1346
        // We can't delete the pool right here because there's a bvar watching
1347
        // this pool in _derivative_thread which does not quit yet.
1348
0
        _session_local_data_pool->Reset(NULL);
1349
0
    }
1350
1351
0
    if (_keytable_pool) {
1352
        // Destroy _keytable_pool to delete keytables inside. This has to be
1353
        // done here (before leaving Join) because it's legal for users to
1354
        // delete bthread keys after Join which makes related objects
1355
        // in KeyTables undeletable anymore and leaked.
1356
0
        CHECK_EQ(0, bthread_keytable_pool_destroy(_keytable_pool));
1357
        // TODO: Can't delete _keytable_pool which may be accessed by
1358
        // still-running bthreads (created by the server). The memory is
1359
        // leaked but servers are unlikely to be started/stopped frequently,
1360
        // the leak is acceptable in most scenarios.
1361
0
        _keytable_pool = NULL;
1362
0
    }
1363
1364
    // Delete tls_key as well since we don't need it anymore.
1365
0
    if (_tl_options.tls_key != INVALID_BTHREAD_KEY) {
1366
0
        CHECK_EQ(0, bthread_key_delete(_tl_options.tls_key));
1367
0
        _tl_options.tls_key = INVALID_BTHREAD_KEY;
1368
0
    }
1369
1370
    // Have to join _derivative_thread, which may assume that server is running
1371
    // and services in server are not mutated, otherwise data race happens
1372
    // between Add/RemoveService after Join() and the thread.
1373
0
    if (_derivative_thread != INVALID_BTHREAD) {
1374
0
        bthread_stop(_derivative_thread);
1375
0
        bthread_join(_derivative_thread, NULL);
1376
0
        _derivative_thread = INVALID_BTHREAD;
1377
0
    }
1378
1379
0
    g_running_server_count.fetch_sub(1, butil::memory_order_relaxed);
1380
0
    _status = READY;
1381
0
    return 0;
1382
0
}
1383
1384
int Server::AddServiceInternal(google::protobuf::Service* service,
1385
                               bool is_builtin_service,
1386
0
                               const ServiceOptions& svc_opt) {
1387
0
    if (NULL == service) {
1388
0
        LOG(ERROR) << "Parameter[service] is NULL!";
1389
0
        return -1;
1390
0
    }
1391
0
    const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor();
1392
0
    if (sd->method_count() == 0) {
1393
0
        LOG(ERROR) << "service=" << sd->full_name()
1394
0
                   << " does not have any method.";
1395
0
        return -1;
1396
0
    }
1397
1398
0
    if (InitializeOnce() != 0) {
1399
0
        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
1400
0
        return -1;
1401
0
    }
1402
0
    if (status() != READY) {
1403
0
        LOG(ERROR) << "Can't add service=" << sd->full_name() << " to Server["
1404
0
                   << version() << "] which is " << status_str(status());
1405
0
        return -1;
1406
0
    }
1407
1408
0
    if (_fullname_service_map.seek(sd->full_name()) != NULL) {
1409
0
        LOG(ERROR) << "service=" << sd->full_name() << " already exists";
1410
0
        return -1;
1411
0
    }
1412
0
    ServiceProperty* old_ss = _service_map.seek(sd->name());
1413
0
    if (old_ss != NULL) {
1414
        // names conflict.
1415
0
        LOG(ERROR) << "Conflict service name between "
1416
0
                   << sd->full_name() << " and "
1417
0
                   << old_ss->service_name();
1418
0
        return -1;
1419
0
    }
1420
1421
    // defined `option (idl_support) = true' or not.
1422
0
    const bool is_idl_support = sd->file()->options().GetExtension(idl_support);
1423
1424
0
    Tabbed* tabbed = dynamic_cast<Tabbed*>(service);
1425
0
    for (int i = 0; i < sd->method_count(); ++i) {
1426
0
        const google::protobuf::MethodDescriptor* md = sd->method(i);
1427
0
        MethodProperty mp;
1428
0
        mp.is_builtin_service = is_builtin_service;
1429
0
        mp.own_method_status = true;
1430
0
        mp.params.is_tabbed = !!tabbed;
1431
0
        mp.params.allow_default_url = svc_opt.allow_default_url;
1432
0
        mp.params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
1433
0
        mp.params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
1434
0
        mp.params.pb_single_repeated_to_array = svc_opt.pb_single_repeated_to_array;
1435
0
        mp.params.enable_progressive_read = svc_opt.enable_progressive_read;
1436
0
        if (mp.params.enable_progressive_read) {
1437
0
            _has_progressive_read_method = true;
1438
0
        }
1439
0
        mp.service = service;
1440
0
        mp.method = md;
1441
0
        mp.status = new MethodStatus;
1442
0
        _method_map[md->full_name()] = mp;
1443
0
        if (is_idl_support && sd->name() != sd->full_name()/*has ns*/) {
1444
0
            MethodProperty mp2 = mp;
1445
0
            mp2.own_method_status = false;
1446
            // have to map service_name + method_name as well because ubrpc
1447
            // does not send the namespace before service_name.
1448
0
            std::string full_name_wo_ns;
1449
0
            full_name_wo_ns.reserve(sd->name().size() + 1 + md->name().size());
1450
0
            full_name_wo_ns.append(sd->name());
1451
0
            full_name_wo_ns.push_back('.');
1452
0
            full_name_wo_ns.append(md->name());
1453
0
            if (_method_map.seek(full_name_wo_ns) == NULL) {
1454
0
                _method_map[full_name_wo_ns] = mp2;
1455
0
            } else {
1456
0
                LOG(ERROR) << '`' << full_name_wo_ns << "' already exists";
1457
0
                RemoveMethodsOf(service);
1458
0
                return -1;
1459
0
            }
1460
0
        }
1461
0
    }
1462
1463
0
    const ServiceProperty ss = {
1464
0
        is_builtin_service, svc_opt.ownership, service, NULL };
1465
0
    _fullname_service_map[sd->full_name()] = ss;
1466
0
    _service_map[sd->name()] = ss;
1467
0
    if (is_builtin_service) {
1468
0
        ++_builtin_service_count;
1469
0
    } else {
1470
0
        if (_first_service == NULL) {
1471
0
            _first_service = service;
1472
0
        }
1473
0
    }
1474
1475
0
    butil::StringPiece restful_mappings = svc_opt.restful_mappings;
1476
0
    restful_mappings.trim_spaces();
1477
0
    if (!restful_mappings.empty()) {
1478
        // Parse the mappings.
1479
0
        std::vector<RestfulMapping> mappings;
1480
0
        if (!ParseRestfulMappings(restful_mappings, &mappings)) {
1481
0
            LOG(ERROR) << "Fail to parse mappings `" << restful_mappings << '\'';
1482
0
            RemoveService(service);
1483
0
            return -1;
1484
0
        }
1485
0
        if (mappings.empty()) {
1486
            // we already trimmed at the beginning, this is impossible.
1487
0
            LOG(ERROR) << "Impossible: Nothing in restful_mappings";
1488
0
            RemoveService(service);
1489
0
            return -1;
1490
0
        }
1491
1492
        // Due the flexibility of URL matching, it's almost impossible to
1493
        // dispatch all kinds of URL to different methods *efficiently* just
1494
        // inside the HTTP protocol impl. We would like to match most-
1495
        // frequently-used URLs(/Service/Method) fastly and match more complex
1496
        // URLs inside separate functions.
1497
        // The trick is adding some entries inside the service maps without
1498
        // real services, mapping from the first component in the URL to a
1499
        // RestfulMap which does the complex matchings. For example:
1500
        //   "/v1/send => SendFn, /v1/recv => RecvFn, /v2/check => CheckFn"
1501
        // We'll create 2 entries in service maps (_fullname_service_map and
1502
        // _service_map) mapping from "v1" and "v2" to 2 different RestfulMap
1503
        // respectively. When the URL is accessed, we extract the first
1504
        // component, find the RestfulMap and do url matchings. Regular url
1505
        // handling is not affected.
1506
0
        for (size_t i = 0; i < mappings.size(); ++i) {
1507
0
            const std::string full_method_name =
1508
0
                sd->full_name() + "." + mappings[i].method_name;
1509
0
            MethodProperty* mp = _method_map.seek(full_method_name);
1510
0
            if (mp == NULL) {
1511
0
                LOG(ERROR) << "Unknown method=`" << full_method_name << '\'';
1512
0
                RemoveService(service);
1513
0
                return -1;
1514
0
            }
1515
1516
0
            const std::string& svc_name = mappings[i].path.service_name;
1517
0
            if (svc_name.empty()) {
1518
0
                if (_global_restful_map == NULL) {
1519
0
                    _global_restful_map = new RestfulMap("");
1520
0
                }
1521
0
                MethodProperty::OpaqueParams params;
1522
0
                params.is_tabbed = !!tabbed;
1523
0
                params.allow_default_url = svc_opt.allow_default_url;
1524
0
                params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
1525
0
                params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
1526
0
                params.pb_single_repeated_to_array = svc_opt.pb_single_repeated_to_array;
1527
0
                if (!_global_restful_map->AddMethod(
1528
0
                        mappings[i].path, service, params,
1529
0
                        mappings[i].method_name, mp->status)) {
1530
0
                    LOG(ERROR) << "Fail to map `" << mappings[i].path
1531
0
                               << "' to `" << full_method_name << '\'';
1532
0
                    RemoveService(service);
1533
0
                    return -1;
1534
0
                }
1535
0
                if (mp->http_url == NULL) {
1536
0
                    mp->http_url = new std::string(mappings[i].path.to_string());
1537
0
                } else {
1538
0
                    if (!mp->http_url->empty()) {
1539
0
                        mp->http_url->append(" @");
1540
0
                    }
1541
0
                    mp->http_url->append(mappings[i].path.to_string());
1542
0
                }
1543
0
                continue;
1544
0
            }
1545
0
            ServiceProperty* sp = _fullname_service_map.seek(svc_name);
1546
0
            ServiceProperty* sp2 = _service_map.seek(svc_name);
1547
0
            if (((!!sp) != (!!sp2)) ||
1548
0
                (sp != NULL && sp->service != sp2->service)) {
1549
0
                LOG(ERROR) << "Impossible: _fullname_service and _service_map are"
1550
0
                        " inconsistent before inserting " << svc_name;
1551
0
                RemoveService(service);
1552
0
                return -1;
1553
0
            }
1554
0
            RestfulMap* m = NULL;
1555
0
            if (sp == NULL) {
1556
0
                m = new RestfulMap(mappings[i].path.service_name);
1557
0
            } else {
1558
0
                m = sp->restful_map;
1559
0
            }
1560
0
            MethodProperty::OpaqueParams params;
1561
0
            params.is_tabbed = !!tabbed;
1562
0
            params.allow_default_url = svc_opt.allow_default_url;
1563
0
            params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
1564
0
            params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
1565
0
            params.pb_single_repeated_to_array = svc_opt.pb_single_repeated_to_array;
1566
0
            if (!m->AddMethod(mappings[i].path, service, params,
1567
0
                              mappings[i].method_name, mp->status)) {
1568
0
                LOG(ERROR) << "Fail to map `" << mappings[i].path << "' to `"
1569
0
                           << sd->full_name() << '.' << mappings[i].method_name
1570
0
                           << '\'';
1571
0
                if (sp == NULL) {
1572
0
                    delete m;
1573
0
                }
1574
0
                RemoveService(service);
1575
0
                return -1;
1576
0
            }
1577
0
            if (mp->http_url == NULL) {
1578
0
                mp->http_url = new std::string(mappings[i].path.to_string());
1579
0
            } else {
1580
0
                if (!mp->http_url->empty()) {
1581
0
                    mp->http_url->append(" @");
1582
0
                }
1583
0
                mp->http_url->append(mappings[i].path.to_string());
1584
0
            }
1585
0
            if (sp == NULL) {
1586
0
                ServiceProperty ss =
1587
0
                    { false, SERVER_DOESNT_OWN_SERVICE, NULL, m };
1588
0
                _fullname_service_map[svc_name] = ss;
1589
0
                _service_map[svc_name] = ss;
1590
0
                ++_virtual_service_count;
1591
0
            }
1592
0
        }
1593
0
    }
1594
1595
0
    if (tabbed) {
1596
0
        if (_tab_info_list == NULL) {
1597
0
            _tab_info_list = new TabInfoList;
1598
0
        }
1599
0
        const size_t last_size = _tab_info_list->size();
1600
0
        tabbed->GetTabInfo(_tab_info_list);
1601
0
        const size_t cur_size = _tab_info_list->size();
1602
0
        for (size_t i = last_size; i != cur_size; ++i) {
1603
0
            const TabInfo& info = (*_tab_info_list)[i];
1604
0
            if (!info.valid()) {
1605
0
                LOG(ERROR) << "Invalid TabInfo: path=" << info.path
1606
0
                           << " tab_name=" << info.tab_name;
1607
0
                _tab_info_list->resize(last_size);
1608
0
                RemoveService(service);
1609
0
                return -1;
1610
0
            }
1611
0
        }
1612
0
    }
1613
0
    return 0;
1614
0
}
1615
1616
// Default options of a service. `pb_bytes_to_base64' defaults to false when
// BAIDU_INTERNAL is defined and to true otherwise.
ServiceOptions::ServiceOptions()
    : ownership(SERVER_DOESNT_OWN_SERVICE),
      allow_default_url(false),
      allow_http_body_to_pb(true),
#if defined(BAIDU_INTERNAL)
      pb_bytes_to_base64(false),
#else
      pb_bytes_to_base64(true),
#endif
      pb_single_repeated_to_array(false),
      enable_progressive_read(false) {
}
1628
1629
int Server::AddService(google::protobuf::Service* service,
1630
0
                       ServiceOwnership ownership) {
1631
0
    ServiceOptions options;
1632
0
    options.ownership = ownership;
1633
0
    return AddServiceInternal(service, false, options);
1634
0
}
1635
1636
int Server::AddService(google::protobuf::Service* service,
1637
                       ServiceOwnership ownership,
1638
                       const butil::StringPiece& restful_mappings,
1639
0
                       bool allow_default_url) {
1640
0
    ServiceOptions options;
1641
0
    options.ownership = ownership;
1642
    // TODO: This is weird
1643
0
    options.restful_mappings = restful_mappings.as_string();
1644
0
    options.allow_default_url = allow_default_url;
1645
0
    return AddServiceInternal(service, false, options);
1646
0
}
1647
1648
int Server::AddService(google::protobuf::Service* service,
1649
0
                       const ServiceOptions& options) {
1650
0
    return AddServiceInternal(service, false, options);
1651
0
}
1652
1653
0
// Adds `service' as a builtin service. The options are the defaults except
// that ownership is SERVER_OWNS_SERVICE. Returns the result of
// AddServiceInternal().
int Server::AddBuiltinService(google::protobuf::Service* service) {
    ServiceOptions builtin_options;
    builtin_options.ownership = SERVER_OWNS_SERVICE;
    const bool is_builtin = true;
    return AddServiceInternal(service, is_builtin, builtin_options);
}
1658
1659
0
// Removes every method of `service' from _method_map.
//
// For each method of the service descriptor:
//   - when the proto file sets the idl_support extension, the extra key
//     "<ServiceName>.<MethodName>" (service name without package) is erased;
//   - every path listed in the method's http_url (separated by '@') is
//     removed from the restful_map of the service named by the first path
//     component, or from _global_restful_map when no such service exists;
//   - http_url and (when owned) the method status are deleted;
//   - the entry keyed by the method's full name is erased.
// Methods missing from _method_map are logged and skipped.
void Server::RemoveMethodsOf(google::protobuf::Service* service) {
    const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor();
    const bool is_idl_support = sd->file()->options().GetExtension(idl_support);
    std::string full_name_wo_ns;
    for (int i = 0; i < sd->method_count(); ++i) {
        const google::protobuf::MethodDescriptor* md = sd->method(i);
        MethodProperty* mp = _method_map.seek(md->full_name());
        if (is_idl_support) {
            // Erase the key without namespace: "<ServiceName>.<MethodName>".
            full_name_wo_ns.clear();
            full_name_wo_ns.reserve(sd->name().size() + 1 + md->name().size());
            full_name_wo_ns.append(sd->name());
            full_name_wo_ns.push_back('.');
            full_name_wo_ns.append(md->name());
            _method_map.erase(full_name_wo_ns);
        }
        if (mp == NULL) {
            LOG(ERROR) << "Fail to find method=" << md->full_name();
            continue;
        }
        if (mp->http_url) {
            // http_url holds one or more restful paths separated by '@'.
            butil::StringSplitter at_sp(mp->http_url->c_str(), '@');
            for (; at_sp; ++at_sp) {
                butil::StringPiece path(at_sp.field(), at_sp.length());
                path.trim_spaces();
                butil::StringSplitter slash_sp(
                    path.data(), path.data() + path.size(), '/');
                if (slash_sp == NULL) {
                    LOG(ERROR) << "Invalid http_url=" << *mp->http_url;
                    break;
                }
                // The first component of the path names the service whose
                // restful_map is expected to hold this path.
                butil::StringPiece v_svc_name(slash_sp.field(), slash_sp.length());
                std::string path_str;
                path.CopyToString(&path_str);
                const ServiceProperty* vsp = FindServicePropertyByName(v_svc_name);
                if (vsp == NULL) {
                    if (_global_restful_map &&
                        _global_restful_map->RemoveByPathString(path_str)) {
                        continue;
                    }
                    LOG(ERROR) << "Impossible: service=" << v_svc_name
                               << " for restful_map does not exist";
                    break;
                }
                // A service found by name may have no restful_map at all;
                // treat that the same as "path not found" instead of
                // dereferencing a NULL pointer.
                if (vsp->restful_map == NULL ||
                    !vsp->restful_map->RemoveByPathString(path_str)) {
                    LOG(ERROR) << "Fail to find path=" << path
                               << " in restful_map of service=" << v_svc_name;
                }
            }
            delete mp->http_url;
        }

        if (mp->own_method_status) {
            delete mp->status;
        }
        _method_map.erase(md->full_name());
    }
}
1719
1720
0
// Removes `service' and all of its methods from this server.
// Only allowed while the server is READY.
// Returns 0 on success, -1 when `service' is NULL, the server is not READY,
// or the service is not registered under its full name.
// The service object is deleted when it was added with SERVER_OWNS_SERVICE.
int Server::RemoveService(google::protobuf::Service* service) {
    if (service == NULL) {
        LOG(ERROR) << "Parameter[service] is NULL";
        return -1;
    }
    if (status() != READY) {
        LOG(ERROR) << "Can't remove service="
                   << service->GetDescriptor()->full_name() << " from Server["
                   << version() << "] which is " << status_str(status());
        return -1;
    }

    const google::protobuf::ServiceDescriptor* desc = service->GetDescriptor();
    ServiceProperty* prop = _fullname_service_map.seek(desc->full_name());
    if (prop == NULL) {
        RPC_VLOG << "Fail to find service=" << desc->full_name().c_str();
        return -1;
    }

    RemoveMethodsOf(service);
    if (prop->ownership == SERVER_OWNS_SERVICE) {
        delete prop->service;
    }
    // Copy the flag out: `prop' is invalidated by the erase() calls below.
    const bool was_builtin = prop->is_builtin_service;
    _fullname_service_map.erase(desc->full_name());
    _service_map.erase(desc->name());

    if (was_builtin) {
        --_builtin_service_count;
        return 0;
    }
    if (_first_service == service) {
        _first_service = NULL;
    }
    return 0;
}
1756
1757
0
void Server::ClearServices() {
1758
0
    if (status() != READY) {
1759
0
        LOG_IF(ERROR, status() != UNINITIALIZED)
1760
0
            << "Can't clear services from Server[" << version()
1761
0
            << "] which is " << status_str(status());
1762
0
        return;
1763
0
    }
1764
0
    for (ServiceMap::const_iterator it = _fullname_service_map.begin();
1765
0
         it != _fullname_service_map.end(); ++it) {
1766
0
        if (it->second.ownership == SERVER_OWNS_SERVICE) {
1767
0
            delete it->second.service;
1768
0
        }
1769
0
        delete it->second.restful_map;
1770
0
    }
1771
0
    for (MethodMap::const_iterator it = _method_map.begin();
1772
0
         it != _method_map.end(); ++it) {
1773
0
        if (it->second.own_method_status) {
1774
0
            delete it->second.status;
1775
0
        }
1776
0
        delete it->second.http_url;
1777
0
    }
1778
0
    _fullname_service_map.clear();
1779
0
    _service_map.clear();
1780
0
    _method_map.clear();
1781
0
    _builtin_service_count = 0;
1782
0
    _virtual_service_count = 0;
1783
0
    _first_service = NULL;
1784
0
}
1785
1786
// Looks up a service in _fullname_service_map by `full_name'.
// Returns the service pointer stored in the entry, or NULL when there is
// no entry.
google::protobuf::Service* Server::FindServiceByFullName(
    const butil::StringPiece& full_name) const {
    ServiceProperty* found = _fullname_service_map.seek(full_name);
    if (found == NULL) {
        return NULL;
    }
    return found->service;
}
1791
1792
// Looks up a service in _service_map by `name'.
// Returns the service pointer stored in the entry, or NULL when there is
// no entry.
google::protobuf::Service* Server::FindServiceByName(
    const butil::StringPiece& name) const {
    ServiceProperty* found = _service_map.seek(name);
    if (found == NULL) {
        return NULL;
    }
    return found->service;
}
1797
1798
0
// Fills `stat' with the number of user services, the number of builtin
// services and the total connection count reported by _am and _internal_am
// (each contributes only when it is non-NULL).
void Server::GetStat(ServerStatistics* stat) const {
    stat->user_service_count = service_count();
    stat->builtin_service_count = builtin_service_count();

    stat->connection_count = 0;
    if (_am) {
        stat->connection_count += _am->ConnectionCount();
    }
    if (_internal_am) {
        stat->connection_count += _internal_am->ConnectionCount();
    }
}
1809
1810
0
// Replaces the content of `services' with the user services registered in
// _fullname_service_map. Does nothing when `services' is NULL.
void Server::ListServices(std::vector<google::protobuf::Service*> *services) {
    if (services == NULL) {
        return;
    }
    services->clear();
    services->reserve(service_count());
    ServiceMap::const_iterator cur = _fullname_service_map.begin();
    const ServiceMap::const_iterator last = _fullname_service_map.end();
    for (; cur != last; ++cur) {
        const ServiceProperty& sp = cur->second;
        if (!sp.is_user_service()) {
            continue;
        }
        services->push_back(sp.service);
    }
}
1823
1824
0
void Server::GenerateVersionIfNeeded() {
1825
0
    if (!_version.empty()) {
1826
0
        return;
1827
0
    }
1828
0
    int extra_count = !!_options.nshead_service + !!_options.rtmp_service +
1829
0
        !!_options.thrift_service + !!_options.redis_service;
1830
0
    _version.reserve((extra_count + service_count()) * 20);
1831
0
    for (ServiceMap::const_iterator it = _fullname_service_map.begin();
1832
0
         it != _fullname_service_map.end(); ++it) {
1833
0
        if (it->second.is_user_service()) {
1834
0
            if (!_version.empty()) {
1835
0
                _version.push_back('+');
1836
0
            }
1837
0
            _version.append(butil::class_name_str(*it->second.service));
1838
0
        }
1839
0
    }
1840
0
    if (_options.nshead_service) {
1841
0
        if (!_version.empty()) {
1842
0
            _version.push_back('+');
1843
0
        }
1844
0
        _version.append(butil::class_name_str(*_options.nshead_service));
1845
0
    }
1846
1847
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
1848
    if (_options.thrift_service) {
1849
        if (!_version.empty()) {
1850
            _version.push_back('+');
1851
        }
1852
        _version.append(butil::class_name_str(*_options.thrift_service));
1853
    }
1854
#endif
1855
1856
0
    if (_options.rtmp_service) {
1857
0
        if (!_version.empty()) {
1858
0
            _version.push_back('+');
1859
0
        }
1860
0
        _version.append(butil::class_name_str(*_options.rtmp_service));
1861
0
    }
1862
1863
0
    if (_options.redis_service) {
1864
0
        if (!_version.empty()) {
1865
0
            _version.push_back('+');
1866
0
        }
1867
0
        _version.append(butil::class_name_str(*_options.redis_service));
1868
0
    }
1869
0
}
1870
1871
0
void Server::PutPidFileIfNeeded() {
1872
0
    if (_options.pid_file.empty()) {
1873
0
        return;
1874
0
    }
1875
0
    RPC_VLOG << "pid_file = " << _options.pid_file;
1876
    // Recursively create directory
1877
0
    for (size_t pos = _options.pid_file.find('/'); pos != std::string::npos;
1878
0
            pos = _options.pid_file.find('/', pos + 1)) {
1879
0
        std::string dir_name =_options.pid_file.substr(0, pos + 1);
1880
0
        int rc = mkdir(dir_name.c_str(),
1881
0
                       S_IFDIR | S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP);
1882
0
        if (rc != 0 && errno != EEXIST
1883
#if defined(OS_MACOSX)
1884
        && errno != EISDIR
1885
#endif
1886
0
        ) {
1887
0
            PLOG(WARNING) << "Fail to create " << dir_name;
1888
0
            _options.pid_file.clear();
1889
0
            return;
1890
0
        }
1891
0
    }
1892
0
    int fd = open(_options.pid_file.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
1893
0
    if (fd < 0) {
1894
0
        LOG(WARNING) << "Fail to open " << _options.pid_file;
1895
0
        _options.pid_file.clear();
1896
0
        return;
1897
0
    }
1898
0
    char buf[32];
1899
0
    int nw = snprintf(buf, sizeof(buf), "%lld", (long long)getpid());
1900
0
    CHECK_EQ(nw, write(fd, buf, nw));
1901
0
    CHECK_EQ(0, close(fd));
1902
0
}
1903
1904
0
void Server::RunUntilAskedToQuit() {
1905
0
    while (!IsAskedToQuit()) {
1906
0
        bthread_usleep(1000000L);
1907
0
    }
1908
0
    Stop(0/*not used now*/);
1909
0
    Join();
1910
0
}
1911
1912
0
void* thread_local_data() {
1913
0
    const Server::ThreadLocalOptions* tl_options =
1914
0
        static_cast<const Server::ThreadLocalOptions*>(bthread_get_assigned_data());
1915
0
    if (tl_options == NULL) { // not in server threads.
1916
0
        return NULL;
1917
0
    }
1918
0
    if (BAIDU_UNLIKELY(tl_options->thread_local_data_factory == NULL)) {
1919
0
        CHECK(false) << "The protocol impl. may not set tls correctly";
1920
0
        return NULL;
1921
0
    }
1922
0
    void* data = bthread_getspecific(tl_options->tls_key);
1923
0
    if (data == NULL) {
1924
0
        data = tl_options->thread_local_data_factory->CreateData();
1925
0
        if (data != NULL) {
1926
0
            CHECK_EQ(0, bthread_setspecific(tl_options->tls_key, data));
1927
0
        }
1928
0
    }
1929
0
    return data;
1930
0
}
1931
1932
// Writes one <li> element of the tabs menu into `os'.
// `link' becomes the id attribute and `tab_name' the text of the element.
// The element gets class='current' when `tab_name' equals
// `current_tab_name'.
inline void tabs_li(std::ostream& os, const char* link,
                    const char* tab_name, const char* current_tab_name) {
    const bool is_current = (strcmp(current_tab_name, tab_name) == 0);
    os << "<li id='" << link << '\'';
    if (is_current) {
        os << " class='current'";
    }
    os << '>' << tab_name << "</li>\n";
}
1940
1941
void Server::PrintTabsBody(std::ostream& os,
1942
0
                           const char* current_tab_name) const {
1943
0
    os << "<ul class='tabs-menu'>\n";
1944
0
    if (_tab_info_list) {
1945
0
        for (size_t i = 0; i < _tab_info_list->size(); ++i) {
1946
0
            const TabInfo& info = (*_tab_info_list)[i];
1947
0
            tabs_li(os, info.path.c_str(), info.tab_name.c_str(),
1948
0
                    current_tab_name);
1949
0
        }
1950
0
    }
1951
0
    os << "<li id='https://github.com/apache/brpc/blob/master/docs/cn/builtin_service.md' "
1952
0
        "class='help'>?</li>\n</ul>\n"
1953
0
        "<div style='height:40px;'></div>";  // placeholder
1954
0
}
1955
1956
static pthread_mutex_t g_dummy_server_mutex = PTHREAD_MUTEX_INITIALIZER;
1957
static Server* g_dummy_server = NULL;
1958
1959
0
int StartDummyServerAt(int port, ProfilerLinker) {
1960
0
    if (port < 0 || port >= 65536) {
1961
0
        LOG(ERROR) << "Invalid port=" << port;
1962
0
        return -1;
1963
0
    }
1964
0
    if (g_dummy_server == NULL) {  // (1)
1965
0
        BAIDU_SCOPED_LOCK(g_dummy_server_mutex);
1966
0
        if (g_dummy_server == NULL) {
1967
0
            Server* dummy_server = new Server;
1968
0
            dummy_server->set_version(butil::string_printf(
1969
0
                        "DummyServerOf(%s)", GetProgramName()));
1970
0
            ServerOptions options;
1971
0
            options.num_threads = 0;
1972
0
            options.bthread_tag = bthread_self_tag();
1973
0
            if (dummy_server->Start(port, &options) != 0) {
1974
0
                LOG(ERROR) << "Fail to start dummy_server at port=" << port;
1975
0
                return -1;
1976
0
            }
1977
            // (1) may see uninitialized dummy_server due to relaxed memory
1978
            // fencing, but we only expose a function to test existence
1979
            // of g_dummy_server, everything should be fine.
1980
0
            g_dummy_server = dummy_server;
1981
0
            return 0;
1982
0
        }
1983
0
    }
1984
0
    LOG(ERROR) << "Already have dummy_server at port="
1985
0
               << g_dummy_server->listen_address().port;
1986
0
    return -1;
1987
0
}
1988
1989
0
bool IsDummyServerRunning() {
1990
0
    return g_dummy_server != NULL;
1991
0
}
1992
1993
// Looks up _method_map by the full method name `fullname'.
// Returns the stored MethodProperty, or NULL when there is no entry.
const Server::MethodProperty*
Server::FindMethodPropertyByFullName(const butil::StringPiece& fullname) const  {
    const Server::MethodProperty* found = _method_map.seek(fullname);
    return found;
}
1997
1998
// Looks up the method named "<service_name>.<method_name>" where
// `service_name' is the full name of the service.
// Returns the stored MethodProperty, or NULL when there is no entry.
const Server::MethodProperty*
Server::FindMethodPropertyByFullName(const butil::StringPiece& service_name/*full*/,
                                     const butil::StringPiece& method_name) const {
    // Names up to this many bytes are assembled on the stack.
    const size_t kStackBufSize = 256;
    const size_t fullname_len = service_name.size() + 1 + method_name.size();
    if (fullname_len <= kStackBufSize) {
        // Avoid allocation in most cases. The buffer has a constant size:
        // an array sized by a runtime value is not standard C++.
        char buf[kStackBufSize];
        memcpy(buf, service_name.data(), service_name.size());
        buf[service_name.size()] = '.';
        memcpy(buf + service_name.size() + 1, method_name.data(), method_name.size());
        return FindMethodPropertyByFullName(butil::StringPiece(buf, fullname_len));
    } else {
        std::string full_method_name;
        full_method_name.reserve(fullname_len);
        full_method_name.append(service_name.data(), service_name.size());
        full_method_name.push_back('.');
        full_method_name.append(method_name.data(), method_name.size());
        return FindMethodPropertyByFullName(full_method_name);
    }
}
2018
2019
// Finds the property of the `method_index'-th method of the service
// registered under `service_name' in _service_map.
// Returns NULL when the service is unknown, when `method_index' is out of
// range, or when the method is not in _method_map.
const Server::MethodProperty*
Server::FindMethodPropertyByNameAndIndex(const butil::StringPiece& service_name,
                                         int method_index) const {
    const Server::ServiceProperty* svc_prop = FindServicePropertyByName(service_name);
    if (svc_prop == NULL) {
        return NULL;
    }
    const google::protobuf::ServiceDescriptor* desc =
        svc_prop->service->GetDescriptor();
    const bool index_in_range =
        (method_index >= 0 && method_index < desc->method_count());
    if (!index_in_range) {
        return NULL;
    }
    return FindMethodPropertyByFullName(desc->method(method_index)->full_name());
}
2033
2034
// Looks up _fullname_service_map by `fullname'.
// Returns the stored ServiceProperty, or NULL when there is no entry.
const Server::ServiceProperty*
Server::FindServicePropertyByFullName(const butil::StringPiece& fullname) const {
    const Server::ServiceProperty* found = _fullname_service_map.seek(fullname);
    return found;
}
2038
2039
// Looks up _service_map by `name'.
// Returns the stored ServiceProperty, or NULL when there is no entry.
const Server::ServiceProperty*
Server::FindServicePropertyByName(const butil::StringPiece& name) const {
    const Server::ServiceProperty* found = _service_map.seek(name);
    return found;
}
2043
2044
0
int Server::AddCertificate(const CertInfo& cert) {
2045
0
    if (!_options.has_ssl_options()) {
2046
0
        LOG(ERROR) << "ServerOptions.ssl_options is not configured yet";
2047
0
        return -1;
2048
0
    }
2049
0
    std::string cert_key(cert.certificate);
2050
0
    cert_key.append(cert.private_key);
2051
0
    if (_ssl_ctx_map.seek(cert_key) != NULL) {
2052
0
        LOG(WARNING) << cert << " already exists";
2053
0
        return 0;
2054
0
    }
2055
2056
0
    SSLContext ssl_ctx;
2057
0
    ssl_ctx.filters = cert.sni_filters;
2058
0
    ssl_ctx.ctx = std::make_shared<SocketSSLContext>();
2059
0
    SSL_CTX* raw_ctx = CreateServerSSLContext(
2060
0
            cert.certificate, cert.private_key,
2061
0
            _options.ssl_options(), &_raw_alpns, &ssl_ctx.filters);
2062
0
    if (raw_ctx == NULL) {
2063
0
        return -1;
2064
0
    }
2065
0
    ssl_ctx.ctx->raw_ctx = raw_ctx;
2066
2067
0
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
2068
0
    SSL_CTX_set_tlsext_servername_callback(ssl_ctx.ctx->raw_ctx, SSLSwitchCTXByHostname);
2069
0
    SSL_CTX_set_tlsext_servername_arg(ssl_ctx.ctx->raw_ctx, this);
2070
0
#endif
2071
2072
0
    if (!_reload_cert_maps.Modify(AddCertMapping, ssl_ctx)) {
2073
0
        LOG(ERROR) << "Fail to add mappings into _reload_cert_maps";
2074
0
        return -1;
2075
0
    }
2076
0
    _ssl_ctx_map[cert_key] = ssl_ctx;
2077
0
    return 0;
2078
0
}
2079
2080
0
// Maps every hostname in ssl_ctx.filters to ssl_ctx.ctx inside `bg'.
// Filters starting with "*." go to wildcard_cert_map with the "*." prefix
// stripped, the others go to cert_map. Hostnames already present are left
// untouched and a warning is logged. Always returns true.
bool Server::AddCertMapping(CertMaps& bg, const SSLContext& ssl_ctx) {
    const size_t nfilter = ssl_ctx.filters.size();
    for (size_t idx = 0; idx < nfilter; ++idx) {
        const char* hostname = ssl_ctx.filters[idx].c_str();
        const bool is_wildcard = (strncmp(hostname, "*.", 2) == 0);
        CertMap* target = is_wildcard ? &(bg.wildcard_cert_map) : &(bg.cert_map);
        if (is_wildcard) {
            hostname += 2;
        }
        if (target->seek(hostname) != NULL) {
            LOG(WARNING) << "Duplicate certificate hostname=" << hostname;
            continue;
        }
        target->insert(hostname, ssl_ctx.ctx);
    }
    return true;
}
2098
2099
0
int Server::RemoveCertificate(const CertInfo& cert) {
2100
0
    if (!_options.has_ssl_options()) {
2101
0
        LOG(ERROR) << "ServerOptions.ssl_options is not configured yet";
2102
0
        return -1;
2103
0
    }
2104
0
    std::string cert_key(cert.certificate);
2105
0
    cert_key.append(cert.private_key);
2106
0
    SSLContext* ssl_ctx = _ssl_ctx_map.seek(cert_key);
2107
0
    if (ssl_ctx == NULL) {
2108
0
        LOG(WARNING) << cert << " doesn't exist";
2109
0
        return 0;
2110
0
    }
2111
0
    if (ssl_ctx->ctx == _default_ssl_ctx) {
2112
0
        LOG(WARNING) << "Cannot remove: " << cert
2113
0
                     << " since it's the default certificate";
2114
0
        return -1;
2115
0
    }
2116
2117
0
    if (!_reload_cert_maps.Modify(RemoveCertMapping, *ssl_ctx)) {
2118
0
        LOG(ERROR) << "Fail to remove mappings from _reload_cert_maps";
2119
0
        return -1;
2120
0
    }
2121
2122
0
    _ssl_ctx_map.erase(cert_key);
2123
0
    return 0;
2124
0
}
2125
2126
0
// Removes from `bg' the hostname mappings of ssl_ctx.filters that point to
// ssl_ctx.ctx. Filters starting with "*." are looked up in
// wildcard_cert_map with the "*." prefix stripped, the others in cert_map.
// Mappings that point to a different context are kept. Always returns true.
bool Server::RemoveCertMapping(CertMaps& bg, const SSLContext& ssl_ctx) {
    const size_t nfilter = ssl_ctx.filters.size();
    for (size_t idx = 0; idx < nfilter; ++idx) {
        const char* hostname = ssl_ctx.filters[idx].c_str();
        const bool is_wildcard = (strncmp(hostname, "*.", 2) == 0);
        CertMap* target = is_wildcard ? &(bg.wildcard_cert_map) : &(bg.cert_map);
        if (is_wildcard) {
            hostname += 2;
        }
        std::shared_ptr<SocketSSLContext>* mapped = target->seek(hostname);
        if (mapped == NULL || !(*mapped == ssl_ctx.ctx)) {
            continue;
        }
        target->erase(hostname);
    }
    return true;
}
2143
2144
0
int Server::ResetCertificates(const std::vector<CertInfo>& certs) {
2145
0
    if (!_options.has_ssl_options()) {
2146
0
        LOG(ERROR) << "ServerOptions.ssl_options is not configured yet";
2147
0
        return -1;
2148
0
    }
2149
2150
0
    SSLContextMap tmp_map;
2151
0
    if (tmp_map.init(certs.size() + 1) != 0) {
2152
0
        LOG(ERROR) << "Fail to init tmp_map";
2153
0
        return -1;
2154
0
    }
2155
2156
    // Add default certificate into tmp_map first since it can't be reloaded
2157
0
    std::string default_cert_key =
2158
0
        _options.ssl_options().default_cert.certificate
2159
0
        + _options.ssl_options().default_cert.private_key;
2160
0
    tmp_map[default_cert_key] = _ssl_ctx_map[default_cert_key];
2161
2162
0
    for (size_t i = 0; i < certs.size(); ++i) {
2163
0
        std::string cert_key(certs[i].certificate);
2164
0
        cert_key.append(certs[i].private_key);
2165
0
        if (tmp_map.seek(cert_key) != NULL) {
2166
0
            LOG(WARNING) << certs[i] << " already exists";
2167
0
            return 0;
2168
0
        }
2169
2170
0
        SSLContext ssl_ctx;
2171
0
        ssl_ctx.filters = certs[i].sni_filters;
2172
0
        ssl_ctx.ctx = std::make_shared<SocketSSLContext>();
2173
0
        ssl_ctx.ctx->raw_ctx = CreateServerSSLContext(
2174
0
            certs[i].certificate, certs[i].private_key,
2175
0
            _options.ssl_options(), &_raw_alpns, &ssl_ctx.filters);
2176
0
        if (ssl_ctx.ctx->raw_ctx == NULL) {
2177
0
            return -1;
2178
0
        }
2179
2180
0
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
2181
0
        SSL_CTX_set_tlsext_servername_callback(ssl_ctx.ctx->raw_ctx, SSLSwitchCTXByHostname);
2182
0
        SSL_CTX_set_tlsext_servername_arg(ssl_ctx.ctx->raw_ctx, this);
2183
0
#endif
2184
0
        tmp_map[cert_key] = ssl_ctx;
2185
0
    }
2186
2187
0
    if (!_reload_cert_maps.Modify(ResetCertMappings, tmp_map)) {
2188
0
        return -1;
2189
0
    }
2190
2191
0
    _ssl_ctx_map.swap(tmp_map);
2192
0
    return 0;
2193
0
}
2194
2195
0
// Rebuilds the hostname mappings of `bg' from `ctx_map'.
// Both maps of `bg' are cleared first. Then, for every context, filters
// starting with "*." go to wildcard_cert_map with the "*." prefix stripped
// and the others go to cert_map. A hostname that is already mapped keeps
// its first mapping and a warning is logged. Always returns true.
bool Server::ResetCertMappings(CertMaps& bg, const SSLContextMap& ctx_map) {
    bg.cert_map.clear();
    bg.wildcard_cert_map.clear();

    SSLContextMap::const_iterator cur = ctx_map.begin();
    for (; cur != ctx_map.end(); ++cur) {
        const SSLContext& entry = cur->second;
        const size_t nfilter = entry.filters.size();
        for (size_t idx = 0; idx < nfilter; ++idx) {
            const char* hostname = entry.filters[idx].c_str();
            const bool is_wildcard = (strncmp(hostname, "*.", 2) == 0);
            CertMap* target =
                is_wildcard ? &(bg.wildcard_cert_map) : &(bg.cert_map);
            if (is_wildcard) {
                hostname += 2;
            }
            if (target->seek(hostname) != NULL) {
                LOG(WARNING) << "Duplicate certificate hostname=" << hostname;
                continue;
            }
            target->insert(hostname, entry.ctx);
        }
    }
    return true;
}
2220
2221
0
void Server::FreeSSLContexts() {
2222
0
    _ssl_ctx_map.clear();
2223
0
    _reload_cert_maps.Modify(ClearCertMapping);
2224
0
    _default_ssl_ctx = NULL;
2225
0
}
2226
2227
0
// Empties both hostname maps (wildcard and exact) of `bg'.
// Always returns true.
bool Server::ClearCertMapping(CertMaps& bg) {
    bg.wildcard_cert_map.clear();
    bg.cert_map.clear();
    return true;
}
2232
2233
0
int Server::ResetMaxConcurrency(int max_concurrency) {
2234
0
    if (!IsRunning()) {
2235
0
        LOG(WARNING) << "ResetMaxConcurrency is only allowed for a Running Server";
2236
0
        return -1;
2237
0
    }
2238
    // Assume that modifying int32 is atomical in X86
2239
0
    _options.max_concurrency = max_concurrency;
2240
0
    return 0;
2241
0
}
2242
2243
0
// Returns a reference to the max_concurrency of the method `mp'.
// When the method has no status, an error is logged,
// _failed_to_set_max_concurrency_of_method is set and a reference to the
// shared default g_default_max_concurrency_of_method is returned instead.
// `mp' must not be NULL.
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
    if (mp->status != NULL) {
        return mp->max_concurrency;
    }
    LOG(ERROR) << "method=" << mp->method->full_name()
               << " does not support max_concurrency";
    _failed_to_set_max_concurrency_of_method = true;
    return g_default_max_concurrency_of_method;
}
2252
2253
0
int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
2254
0
    if (mp == NULL || mp->status == NULL) {
2255
0
        return 0;
2256
0
    }
2257
0
    return mp->max_concurrency;
2258
0
}
2259
2260
0
// Returns a reference to the max_concurrency selected by
// `full_method_name'.
// The class names of NsheadService, ThriftService (only when
// ENABLE_THRIFT_FRAMED_PROTOCOL is defined) and BaiduMasterService select
// the corresponding service configured in options(); any other name is
// looked up in _method_map.
// When nothing matches, an error is logged,
// _failed_to_set_max_concurrency_of_method is set and a reference to
// g_default_max_concurrency_of_method is returned.
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) {
    if (full_method_name == butil::class_name_str<NsheadService>()) {
        if (NULL != options().nshead_service) {
            return options().nshead_service->_max_concurrency;
        }
    }
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
    else if (full_method_name == butil::class_name_str<ThriftService>()) {
        if (NULL != options().thrift_service) {
            return options().thrift_service->_max_concurrency;
        }
    }
#endif
    else if (full_method_name == butil::class_name_str<BaiduMasterService>()) {
        if (NULL != options().baidu_master_service) {
            return options().baidu_master_service->_max_concurrency;
        }
    } else {
        MethodProperty* mp = _method_map.seek(full_method_name);
        if (mp != NULL) {
            return MaxConcurrencyOf(mp);
        }
    }

    // Either the named special service is not configured or the method is
    // unknown.
    LOG(ERROR) << "Fail to find method=" << full_method_name;
    _failed_to_set_max_concurrency_of_method = true;
    return g_default_max_concurrency_of_method;
}
2295
2296
0
// Returns the max_concurrency of the method registered in _method_map
// under `full_method_name', or 0 when it is unknown or has no status.
int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const {
    const MethodProperty* mp = _method_map.seek(full_method_name);
    return MaxConcurrencyOf(mp);
}
2299
2300
// Returns a reference to the max_concurrency of the method `method_name'
// of the service whose full name is `full_service_name'.
// When the method is unknown, an error is logged,
// _failed_to_set_max_concurrency_of_method is set and a reference to
// g_default_max_concurrency_of_method is returned.
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name,
                              const butil::StringPiece& method_name) {
    const MethodProperty* found =
        FindMethodPropertyByFullName(full_service_name, method_name);
    if (found != NULL) {
        // The finder is const; this non-const accessor needs a mutable entry.
        MethodProperty* mp = const_cast<MethodProperty*>(found);
        return MaxConcurrencyOf(mp);
    }
    LOG(ERROR) << "Fail to find method=" << full_service_name
               << '/' << method_name;
    _failed_to_set_max_concurrency_of_method = true;
    return g_default_max_concurrency_of_method;
}
2312
2313
// Returns the max_concurrency of the method `method_name' of the service
// whose full name is `full_service_name', or 0 when the method is unknown
// or has no status.
int Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name,
                             const butil::StringPiece& method_name) const {
    const MethodProperty* mp =
        FindMethodPropertyByFullName(full_service_name, method_name);
    return MaxConcurrencyOf(mp);
}
2318
2319
// Same as MaxConcurrencyOf(full_service_name, method_name) with the full
// name taken from the descriptor of `service'.
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(google::protobuf::Service* service,
                              const butil::StringPiece& method_name) {
    const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor();
    return MaxConcurrencyOf(sd->full_name(), method_name);
}
2323
2324
int Server::MaxConcurrencyOf(google::protobuf::Service* service,
2325
0
                             const butil::StringPiece& method_name) const {
2326
0
    return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name);
2327
0
}
2328
2329
0
// Returns a reference to the ignore_eovercrowded flag of the method
// registered under `full_method_name'.
// A reference to the shared default g_default_ignore_eovercrowded is
// returned instead when:
//   - the method is unknown (error logged, failure flag set);
//   - the server is running (warning logged);
//   - the method has no status (error logged, failure flag set).
bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) {
    MethodProperty* method_prop = _method_map.seek(full_method_name);
    if (method_prop == NULL) {
        LOG(ERROR) << "Fail to find method=" << full_method_name;
        _failed_to_set_ignore_eovercrowded = true;
        return g_default_ignore_eovercrowded;
    }
    if (IsRunning()) {
        LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started";
        return g_default_ignore_eovercrowded;
    }
    if (method_prop->status != NULL) {
        return method_prop->ignore_eovercrowded;
    }
    LOG(ERROR) << "method=" << method_prop->method->full_name()
               << " does not support ignore_eovercrowded";
    _failed_to_set_ignore_eovercrowded = true;
    return g_default_ignore_eovercrowded;
}
2348
2349
0
// Returns the ignore_eovercrowded flag of the method registered under
// `full_method_name'.
// While the server is running a warning is logged and the value of
// g_default_ignore_eovercrowded is returned. Returns false when the method
// is unknown or has no status.
bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const {
    MethodProperty* method_prop = _method_map.seek(full_method_name);
    if (IsRunning()) {
        LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started";
        return g_default_ignore_eovercrowded;
    }
    const bool has_status = (method_prop != NULL && method_prop->status != NULL);
    if (!has_status) {
        return false;
    }
    return method_prop->ignore_eovercrowded;
}
2360
2361
0
// Asks the interceptor configured in _options whether the request of
// `cntl' is accepted.
// Returns true when no interceptor is configured, when `cntl' is NULL, or
// when the interceptor accepts. Otherwise `cntl' is set to failed with the
// error code and text produced by the interceptor and false is returned.
bool Server::AcceptRequest(Controller* cntl) const {
    const Interceptor* interceptor = _options.interceptor;
    if (interceptor == NULL) {
        return true;
    }

    int error_code = 0;
    std::string error_text;
    if (cntl == NULL) {
        return true;
    }
    if (interceptor->Accept(cntl, error_code, error_text)) {
        return true;
    }
    cntl->SetFailed(error_code,
                    "Reject by Interceptor: %s",
                    error_text.c_str());
    return false;
}
2379
2380
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
2381
int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
2382
0
                                   int* al, void* se) {
2383
0
    (void)al;
2384
0
    Server* server = reinterpret_cast<Server*>(se);
2385
0
    const char* hostname = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name);
2386
0
    bool strict_sni = server->_options.ssl_options().strict_sni;
2387
0
    if (hostname == NULL) {
2388
0
        return strict_sni ? SSL_TLSEXT_ERR_ALERT_FATAL : SSL_TLSEXT_ERR_NOACK;
2389
0
    }
2390
2391
0
    butil::DoublyBufferedData<CertMaps>::ScopedPtr s;
2392
0
    if (server->_reload_cert_maps.Read(&s) != 0) {
2393
0
        return SSL_TLSEXT_ERR_ALERT_FATAL;
2394
0
    }
2395
2396
0
    std::shared_ptr<SocketSSLContext>* pctx = s->cert_map.seek(hostname);
2397
0
    if (pctx == NULL) {
2398
0
        const char* dot = hostname;
2399
0
        for (; *dot != '\0'; ++dot) {
2400
0
            if (*dot == '.') {
2401
0
                ++dot;
2402
0
                break;
2403
0
            }
2404
0
        }
2405
0
        if (*dot != '\0') {
2406
0
            pctx = s->wildcard_cert_map.seek(dot);
2407
0
        }
2408
0
    }
2409
0
    if (pctx == NULL) {
2410
0
        if (strict_sni) {
2411
0
            return SSL_TLSEXT_ERR_ALERT_FATAL;
2412
0
        }
2413
        // Use default SSL_CTX which is the current one
2414
0
        return SSL_TLSEXT_ERR_OK;
2415
0
    }
2416
2417
    // Switch SSL_CTX to the one with correct hostname
2418
0
    SSL_set_SSL_CTX(ssl, (*pctx)->raw_ctx);
2419
    return SSL_TLSEXT_ERR_OK;
2420
0
}
2421
#endif // SSL_CTRL_SET_TLSEXT_HOSTNAME
2422
2423
}  // namespace brpc