Coverage Report

Created: 2025-03-11 06:06

/src/brpc/src/brpc/global.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
19
#ifndef USE_MESALINK
20
#include <openssl/ssl.h>
21
#include <openssl/conf.h>
22
#else
23
#include <mesalink/openssl/ssl.h>
24
#endif
25
26
#include <gflags/gflags.h>
27
#include <fcntl.h>                               // O_RDONLY
28
#include <signal.h>
29
30
#include "butil/build_config.h"                  // OS_LINUX
31
// Naming services
32
#ifdef BAIDU_INTERNAL
33
#include "brpc/policy/baidu_naming_service.h"
34
#endif
35
#include "brpc/policy/file_naming_service.h"
36
#include "brpc/policy/list_naming_service.h"
37
#include "brpc/policy/domain_naming_service.h"
38
#include "brpc/policy/remote_file_naming_service.h"
39
#include "brpc/policy/consul_naming_service.h"
40
#include "brpc/policy/discovery_naming_service.h"
41
#include "brpc/policy/nacos_naming_service.h"
42
43
// Load Balancers
44
#include "brpc/policy/round_robin_load_balancer.h"
45
#include "brpc/policy/weighted_round_robin_load_balancer.h"
46
#include "brpc/policy/randomized_load_balancer.h"
47
#include "brpc/policy/weighted_randomized_load_balancer.h"
48
#include "brpc/policy/locality_aware_load_balancer.h"
49
#include "brpc/policy/consistent_hashing_load_balancer.h"
50
#include "brpc/policy/hasher.h"
51
#include "brpc/policy/dynpart_load_balancer.h"
52
53
54
// Span
55
#include "brpc/span.h"
56
#include "bthread/unstable.h"
57
58
// Compress handlers
59
#include "brpc/compress.h"
60
#include "brpc/policy/gzip_compress.h"
61
#include "brpc/policy/snappy_compress.h"
62
63
// Protocols
64
#include "brpc/protocol.h"
65
#include "brpc/policy/baidu_rpc_protocol.h"
66
#include "brpc/policy/http_rpc_protocol.h"
67
#include "brpc/policy/http2_rpc_protocol.h"
68
#include "brpc/policy/hulu_pbrpc_protocol.h"
69
#include "brpc/policy/nova_pbrpc_protocol.h"
70
#include "brpc/policy/public_pbrpc_protocol.h"
71
#include "brpc/policy/ubrpc2pb_protocol.h"
72
#include "brpc/policy/sofa_pbrpc_protocol.h"
73
#include "brpc/policy/memcache_binary_protocol.h"
74
#include "brpc/policy/streaming_rpc_protocol.h"
75
#include "brpc/policy/mongo_protocol.h"
76
#include "brpc/policy/redis_protocol.h"
77
#include "brpc/policy/nshead_mcpack_protocol.h"
78
#include "brpc/policy/rtmp_protocol.h"
79
#include "brpc/policy/esp_protocol.h"
80
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
81
# include "brpc/policy/thrift_protocol.h"
82
#endif
83
84
// Concurrency Limiters
85
#include "brpc/concurrency_limiter.h"
86
#include "brpc/policy/auto_concurrency_limiter.h"
87
#include "brpc/policy/constant_concurrency_limiter.h"
88
#include "brpc/policy/timeout_concurrency_limiter.h"
89
90
#include "brpc/input_messenger.h"     // get_or_new_client_side_messenger
91
#include "brpc/socket_map.h"          // SocketMapList
92
#include "brpc/server.h"
93
#include "brpc/trackme.h"             // TrackMe
94
#include "brpc/details/usercode_backup_pool.h"
95
#if defined(OS_LINUX)
96
#include <malloc.h>                   // malloc_trim
97
#endif
98
#include "butil/fd_guard.h"
99
#include "butil/files/file_watcher.h"
100
101
extern "C" {
102
// defined in gperftools/malloc_extension_c.h
103
void BAIDU_WEAK MallocExtension_ReleaseFreeMemory(void);
104
}
105
106
namespace brpc {
107
108
DECLARE_bool(usercode_in_pthread);
109
110
DEFINE_int32(free_memory_to_system_interval, 0,
111
             "Try to return free memory to system every so many seconds, "
112
             "values <= 0 disables this feature");
113
BRPC_VALIDATE_GFLAG(free_memory_to_system_interval, PassValidate);
114
115
namespace policy {
116
// Defined in http_rpc_protocol.cpp
117
void InitCommonStrings();
118
}
119
120
using namespace policy;
121
122
// Path (relative to the working directory) of the file that GlobalUpdate
// watches. When the file shows up or changes while neither a dummy server nor
// a regular server is running, a port is read from it and a dummy server is
// started at that port.
const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
123
124
struct GlobalExtensions {
125
    GlobalExtensions()
126
0
        : dns(80)
127
0
        , dns_with_ssl(443)
128
0
        , ch_mh_lb(CONS_HASH_LB_MURMUR3)
129
0
        , ch_md5_lb(CONS_HASH_LB_MD5)
130
0
        , ch_ketama_lb(CONS_HASH_LB_KETAMA)
131
0
        , constant_cl(0) {
132
0
    }
133
    
134
#ifdef BAIDU_INTERNAL
135
    BaiduNamingService bns;
136
#endif
137
    FileNamingService fns;
138
    ListNamingService lns;
139
    DomainListNamingService dlns;
140
    DomainNamingService dns;
141
    DomainNamingService dns_with_ssl;
142
    RemoteFileNamingService rfns;
143
    ConsulNamingService cns;
144
    DiscoveryNamingService dcns;
145
    NacosNamingService nns;
146
147
    RoundRobinLoadBalancer rr_lb;
148
    WeightedRoundRobinLoadBalancer wrr_lb;
149
    RandomizedLoadBalancer randomized_lb;
150
    WeightedRandomizedLoadBalancer wr_lb;
151
    LocalityAwareLoadBalancer la_lb;
152
    ConsistentHashingLoadBalancer ch_mh_lb;
153
    ConsistentHashingLoadBalancer ch_md5_lb;
154
    ConsistentHashingLoadBalancer ch_ketama_lb;
155
    DynPartLoadBalancer dynpart_lb;
156
157
    AutoConcurrencyLimiter auto_cl;
158
    ConstantConcurrencyLimiter constant_cl;
159
    TimeoutConcurrencyLimiter timeout_cl;
160
};
161
162
// Passed to pthread_once() by GlobalInitializeOrDie so that
// GlobalInitializeOrDieImpl runs at most once per process.
static pthread_once_t register_extensions_once = PTHREAD_ONCE_INIT;
163
// Instances of all built-in extensions. Allocated in
// GlobalInitializeOrDieImpl and intentionally never deleted (the memory is
// left to the process's clean up).
static GlobalExtensions* g_ext = NULL;
164
165
0
static long ReadPortOfDummyServer(const char* filename) {
166
0
    butil::fd_guard fd(open(filename, O_RDONLY));
167
0
    if (fd < 0) {
168
0
        LOG(ERROR) << "Fail to open `" << DUMMY_SERVER_PORT_FILE << "'";
169
0
        return -1;
170
0
    }
171
0
    char port_str[32];
172
0
    const ssize_t nr = read(fd, port_str, sizeof(port_str));
173
0
    if (nr <= 0) {
174
0
        LOG(ERROR) << "Fail to read `" << DUMMY_SERVER_PORT_FILE << "': "
175
0
                   << (nr == 0 ? "nothing to read" : berror());
176
0
        return -1;
177
0
    }
178
0
    port_str[std::min((size_t)nr, sizeof(port_str)-1)] = '\0';
179
0
    const char* p = port_str;
180
0
    for (; isspace(*p); ++p) {}
181
0
    char* endptr = NULL;
182
0
    const long port = strtol(p, &endptr, 10);
183
0
    for (; isspace(*endptr); ++endptr) {}
184
0
    if (*endptr != '\0') {
185
0
        LOG(ERROR) << "Invalid port=`" << port_str << "'";
186
0
        return -1;
187
0
    }
188
0
    return port;
189
0
}
190
191
// Expose counters of butil::IOBuf
192
0
static int64_t GetIOBufBlockCount(void*) {
193
0
    return butil::IOBuf::block_count();
194
0
}
195
0
static int64_t GetIOBufBlockCountHitTLSThreshold(void*) {
196
0
    return butil::IOBuf::block_count_hit_tls_threshold();
197
0
}
198
0
static int64_t GetIOBufNewBigViewCount(void*) {
199
0
    return butil::IOBuf::new_bigview_count();
200
0
}
201
0
static int64_t GetIOBufBlockMemory(void*) {
202
0
    return butil::IOBuf::block_memory();
203
0
}
204
205
// Defined in server.cpp
206
extern butil::static_atomic<int> g_running_server_count;
207
0
static int GetRunningServerCount(void*) {
208
0
    return g_running_server_count.load(butil::memory_order_relaxed);
209
0
}
210
211
// Update global stuff periodically.
212
0
static void* GlobalUpdate(void*) {
213
    // Expose variables.
214
0
    bvar::PassiveStatus<int64_t> var_iobuf_block_count(
215
0
        "iobuf_block_count", GetIOBufBlockCount, NULL);
216
0
    bvar::PassiveStatus<int64_t> var_iobuf_block_count_hit_tls_threshold(
217
0
        "iobuf_block_count_hit_tls_threshold",
218
0
        GetIOBufBlockCountHitTLSThreshold, NULL);
219
0
    bvar::PassiveStatus<int64_t> var_iobuf_new_bigview_count(
220
0
        GetIOBufNewBigViewCount, NULL);
221
0
    bvar::PerSecond<bvar::PassiveStatus<int64_t> > var_iobuf_new_bigview_second(
222
0
        "iobuf_newbigview_second", &var_iobuf_new_bigview_count);
223
0
    bvar::PassiveStatus<int64_t> var_iobuf_block_memory(
224
0
        "iobuf_block_memory", GetIOBufBlockMemory, NULL);
225
0
    bvar::PassiveStatus<int> var_running_server_count(
226
0
        "rpc_server_count", GetRunningServerCount, NULL);
227
228
0
    butil::FileWatcher fw;
229
0
    if (fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE) < 0) {
230
0
        LOG(FATAL) << "Fail to init FileWatcher on `" << DUMMY_SERVER_PORT_FILE << "'";
231
0
        return NULL;
232
0
    }
233
234
0
    std::vector<SocketId> conns;
235
0
    const int64_t start_time_us = butil::gettimeofday_us();
236
0
    const int WARN_NOSLEEP_THRESHOLD = 2;
237
0
    int64_t last_time_us = start_time_us;
238
0
    int consecutive_nosleep = 0;
239
0
    int64_t last_return_free_memory_time = start_time_us;
240
0
    while (1) {
241
0
        const int64_t sleep_us = 1000000L + last_time_us - butil::gettimeofday_us();
242
0
        if (sleep_us > 0) {
243
0
            if (bthread_usleep(sleep_us) < 0) {
244
0
                PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep";
245
0
                break;
246
0
            }
247
0
            consecutive_nosleep = 0;
248
0
        } else {
249
0
            if (++consecutive_nosleep >= WARN_NOSLEEP_THRESHOLD) {
250
0
                consecutive_nosleep = 0;
251
0
                LOG(WARNING) << __FUNCTION__ << " is too busy!";
252
0
            }
253
0
        }
254
0
        last_time_us = butil::gettimeofday_us();
255
256
0
        TrackMe();
257
258
0
        if (!IsDummyServerRunning()
259
0
            && g_running_server_count.load(butil::memory_order_relaxed) == 0
260
0
            && fw.check_and_consume() > 0) {
261
0
            long port = ReadPortOfDummyServer(DUMMY_SERVER_PORT_FILE);
262
0
            if (port >= 0) {
263
0
                StartDummyServerAt(port);
264
0
            }
265
0
        }
266
267
0
        SocketMapList(&conns);
268
0
        const int64_t now_ms = butil::cpuwide_time_ms();
269
0
        for (size_t i = 0; i < conns.size(); ++i) {
270
0
            SocketUniquePtr ptr;
271
0
            if (Socket::Address(conns[i], &ptr) == 0) {
272
0
                ptr->UpdateStatsEverySecond(now_ms);
273
0
            }
274
0
        }
275
276
0
        const int return_mem_interval =
277
0
            FLAGS_free_memory_to_system_interval/*reloadable*/;
278
0
        if (return_mem_interval > 0 &&
279
0
            last_time_us >= last_return_free_memory_time +
280
0
            return_mem_interval * 1000000L) {
281
0
            last_return_free_memory_time = last_time_us;
282
            // TODO: Calling MallocExtension::instance()->ReleaseFreeMemory may
283
            // crash the program in later calls to malloc, verified on tcmalloc
284
            // 1.7 and 2.5, which means making the static member function weak
285
            // in details/tcmalloc_extension.cpp is probably not correct, however
286
            // it does work for heap profilers.
287
0
            if (MallocExtension_ReleaseFreeMemory != NULL) {
288
0
                MallocExtension_ReleaseFreeMemory();
289
0
            } else {
290
0
#if defined(OS_LINUX)
291
                // GNU specific.
292
0
                malloc_trim(10 * 1024 * 1024/*leave 10M pad*/);
293
0
#endif
294
0
            }
295
0
        }
296
0
    }
297
0
    return NULL;
298
0
}
299
300
#if GOOGLE_PROTOBUF_VERSION < 3022000
301
static void BaiduStreamingLogHandler(google::protobuf::LogLevel level,
302
                                     const char* filename, int line,
303
0
                                     const std::string& message) {
304
0
    switch (level) {
305
0
    case google::protobuf::LOGLEVEL_INFO:
306
0
        LOG(INFO) << filename << ':' << line << ' ' << message;
307
0
        return;
308
0
    case google::protobuf::LOGLEVEL_WARNING:
309
0
        LOG(WARNING) << filename << ':' << line << ' ' << message;
310
0
        return;
311
0
    case google::protobuf::LOGLEVEL_ERROR:
312
0
        LOG(ERROR) << filename << ':' << line << ' ' << message;
313
0
        return;
314
0
    case google::protobuf::LOGLEVEL_FATAL:
315
0
        LOG(FATAL) << filename << ':' << line << ' ' << message;
316
0
        return;
317
0
    }
318
0
    CHECK(false) << filename << ':' << line << ' ' << message;
319
0
}
320
#endif
321
322
0
static void GlobalInitializeOrDieImpl() {
323
    //////////////////////////////////////////////////////////////////
324
    // Be careful about usages of gflags inside this function which //
325
    // may be called before main() only seeing gflags with default  //
326
    // values even if the gflags will be set after main().          //
327
    //////////////////////////////////////////////////////////////////
328
329
    // Ignore SIGPIPE.
330
0
    struct sigaction oldact;
331
0
    if (sigaction(SIGPIPE, NULL, &oldact) != 0 ||
332
0
            (oldact.sa_handler == NULL && oldact.sa_sigaction == NULL)) {
333
0
        CHECK(SIG_ERR != signal(SIGPIPE, SIG_IGN));
334
0
    }
335
336
0
#if GOOGLE_PROTOBUF_VERSION < 3022000
337
    // Make GOOGLE_LOG print to comlog device
338
0
    SetLogHandler(&BaiduStreamingLogHandler);
339
0
#endif
340
341
    // Set bthread create span function
342
0
    bthread_set_create_span_func(CreateBthreadSpan);
343
344
    // Setting the variable here does not work, the profiler probably check
345
    // the variable before main() for only once.
346
    // setenv("TCMALLOC_SAMPLE_PARAMETER", "524288", 0);
347
348
    // Initialize openssl library
349
0
    SSL_library_init();
350
    // RPC doesn't require openssl.cnf, users can load it by themselves if needed
351
0
    SSL_load_error_strings();
352
0
    if (SSLThreadInit() != 0 || SSLDHInit() != 0) {
353
0
        exit(1);
354
0
    }
355
356
    // Defined in http_rpc_protocol.cpp
357
0
    InitCommonStrings();
358
359
    // Leave memory of these extensions to process's clean up.
360
0
    g_ext = new(std::nothrow) GlobalExtensions();
361
0
    if (NULL == g_ext) {
362
0
        exit(1);
363
0
    }
364
    // Naming Services
365
#ifdef BAIDU_INTERNAL
366
    NamingServiceExtension()->RegisterOrDie("bns", &g_ext->bns);
367
#endif
368
0
    NamingServiceExtension()->RegisterOrDie("file", &g_ext->fns);
369
0
    NamingServiceExtension()->RegisterOrDie("list", &g_ext->lns);
370
0
    NamingServiceExtension()->RegisterOrDie("dlist", &g_ext->dlns);
371
0
    NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns);
372
0
    NamingServiceExtension()->RegisterOrDie("https", &g_ext->dns_with_ssl);
373
0
    NamingServiceExtension()->RegisterOrDie("redis", &g_ext->dns);
374
0
    NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns);
375
0
    NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns);
376
0
    NamingServiceExtension()->RegisterOrDie("discovery", &g_ext->dcns);
377
0
    NamingServiceExtension()->RegisterOrDie("nacos", &g_ext->nns);
378
379
    // Load Balancers
380
0
    LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb);
381
0
    LoadBalancerExtension()->RegisterOrDie("wrr", &g_ext->wrr_lb);
382
0
    LoadBalancerExtension()->RegisterOrDie("random", &g_ext->randomized_lb);
383
0
    LoadBalancerExtension()->RegisterOrDie("wr", &g_ext->wr_lb);
384
0
    LoadBalancerExtension()->RegisterOrDie("la", &g_ext->la_lb);
385
0
    LoadBalancerExtension()->RegisterOrDie("c_murmurhash", &g_ext->ch_mh_lb);
386
0
    LoadBalancerExtension()->RegisterOrDie("c_md5", &g_ext->ch_md5_lb);
387
0
    LoadBalancerExtension()->RegisterOrDie("c_ketama", &g_ext->ch_ketama_lb);
388
0
    LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);
389
390
    // Compress Handlers
391
0
    const CompressHandler gzip_compress =
392
0
        { GzipCompress, GzipDecompress, "gzip" };
393
0
    if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) {
394
0
        exit(1);
395
0
    }
396
0
    const CompressHandler zlib_compress =
397
0
        { ZlibCompress, ZlibDecompress, "zlib" };
398
0
    if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) {
399
0
        exit(1);
400
0
    }
401
0
    const CompressHandler snappy_compress =
402
0
        { SnappyCompress, SnappyDecompress, "snappy" };
403
0
    if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) {
404
0
        exit(1);
405
0
    }
406
407
    // Protocols
408
0
    Protocol baidu_protocol = { ParseRpcMessage,
409
0
                                SerializeRequestDefault, PackRpcRequest,
410
0
                                ProcessRpcRequest, ProcessRpcResponse,
411
0
                                VerifyRpcRequest, NULL, NULL,
412
0
                                CONNECTION_TYPE_ALL, "baidu_std" };
413
0
    if (RegisterProtocol(PROTOCOL_BAIDU_STD, baidu_protocol) != 0) {
414
0
        exit(1);
415
0
    }
416
417
0
    Protocol streaming_protocol = { ParseStreamingMessage,
418
0
                                    NULL, NULL, ProcessStreamingMessage,
419
0
                                    ProcessStreamingMessage,
420
0
                                    NULL, NULL, NULL,
421
0
                                    CONNECTION_TYPE_SINGLE, "streaming_rpc" };
422
423
0
    if (RegisterProtocol(PROTOCOL_STREAMING_RPC, streaming_protocol) != 0) {
424
0
        exit(1);
425
0
    }
426
427
0
    Protocol http_protocol = { ParseHttpMessage,
428
0
                               SerializeHttpRequest, PackHttpRequest,
429
0
                               ProcessHttpRequest, ProcessHttpResponse,
430
0
                               VerifyHttpRequest, ParseHttpServerAddress,
431
0
                               GetHttpMethodName,
432
0
                               CONNECTION_TYPE_POOLED_AND_SHORT,
433
0
                               "http" };
434
0
    if (RegisterProtocol(PROTOCOL_HTTP, http_protocol) != 0) {
435
0
        exit(1);
436
0
    }
437
438
0
    Protocol http2_protocol = { ParseH2Message,
439
0
                                SerializeHttpRequest, PackH2Request,
440
0
                                ProcessHttpRequest, ProcessHttpResponse,
441
0
                                VerifyHttpRequest, ParseHttpServerAddress,
442
0
                                GetHttpMethodName,
443
0
                                CONNECTION_TYPE_SINGLE,
444
0
                                "h2" };
445
0
    if (RegisterProtocol(PROTOCOL_H2, http2_protocol) != 0) {
446
0
        exit(1);
447
0
    }
448
449
0
    Protocol hulu_protocol = { ParseHuluMessage,
450
0
                               SerializeRequestDefault, PackHuluRequest,
451
0
                               ProcessHuluRequest, ProcessHuluResponse,
452
0
                               VerifyHuluRequest, NULL, NULL,
453
0
                               CONNECTION_TYPE_ALL, "hulu_pbrpc" };
454
0
    if (RegisterProtocol(PROTOCOL_HULU_PBRPC, hulu_protocol) != 0) {
455
0
        exit(1);
456
0
    }
457
458
    // Only valid at client side
459
0
    Protocol nova_protocol = { ParseNsheadMessage,
460
0
                               SerializeNovaRequest, PackNovaRequest,
461
0
                               NULL, ProcessNovaResponse,
462
0
                               NULL, NULL, NULL,
463
0
                               CONNECTION_TYPE_POOLED_AND_SHORT,  "nova_pbrpc" };
464
0
    if (RegisterProtocol(PROTOCOL_NOVA_PBRPC, nova_protocol) != 0) {
465
0
        exit(1);
466
0
    }
467
468
    // Only valid at client side
469
0
    Protocol public_pbrpc_protocol = { ParseNsheadMessage,
470
0
                                       SerializePublicPbrpcRequest,
471
0
                                       PackPublicPbrpcRequest,
472
0
                                       NULL, ProcessPublicPbrpcResponse,
473
0
                                       NULL, NULL, NULL,
474
                                       // public_pbrpc server implementation
475
                                       // doesn't support full duplex
476
0
                                       CONNECTION_TYPE_POOLED_AND_SHORT,
477
0
                                       "public_pbrpc" };
478
0
    if (RegisterProtocol(PROTOCOL_PUBLIC_PBRPC, public_pbrpc_protocol) != 0) {
479
0
        exit(1);
480
0
    }
481
482
0
    Protocol sofa_protocol = { ParseSofaMessage,
483
0
                               SerializeRequestDefault, PackSofaRequest,
484
0
                               ProcessSofaRequest, ProcessSofaResponse,
485
0
                               VerifySofaRequest, NULL, NULL,
486
0
                               CONNECTION_TYPE_ALL, "sofa_pbrpc" };
487
0
    if (RegisterProtocol(PROTOCOL_SOFA_PBRPC, sofa_protocol) != 0) {
488
0
        exit(1);
489
0
    }
490
491
    // Only valid at server side. We generalize all the protocols that
492
    // prefixes with nshead as `nshead_protocol' and specify the content
493
    // parsing after nshead by ServerOptions.nshead_service.
494
0
    Protocol nshead_protocol = { ParseNsheadMessage,
495
0
                                 SerializeNsheadRequest, PackNsheadRequest,
496
0
                                 ProcessNsheadRequest, ProcessNsheadResponse,
497
0
                                 VerifyNsheadRequest, NULL, NULL,
498
0
                                 CONNECTION_TYPE_POOLED_AND_SHORT, "nshead" };
499
0
    if (RegisterProtocol(PROTOCOL_NSHEAD, nshead_protocol) != 0) {
500
0
        exit(1);
501
0
    }
502
503
0
    Protocol mc_binary_protocol = { ParseMemcacheMessage,
504
0
                                    SerializeMemcacheRequest,
505
0
                                    PackMemcacheRequest,
506
0
                                    NULL, ProcessMemcacheResponse,
507
0
                                    NULL, NULL, GetMemcacheMethodName,
508
0
                                    CONNECTION_TYPE_ALL, "memcache" };
509
0
    if (RegisterProtocol(PROTOCOL_MEMCACHE, mc_binary_protocol) != 0) {
510
0
        exit(1);
511
0
    }
512
513
0
    Protocol redis_protocol = { ParseRedisMessage,
514
0
                                SerializeRedisRequest,
515
0
                                PackRedisRequest,
516
0
                                ProcessRedisRequest, ProcessRedisResponse,
517
0
                                NULL, NULL, GetRedisMethodName,
518
0
                                CONNECTION_TYPE_ALL, "redis" };
519
0
    if (RegisterProtocol(PROTOCOL_REDIS, redis_protocol) != 0) {
520
0
        exit(1);
521
0
    }
522
523
0
    Protocol mongo_protocol = { ParseMongoMessage,
524
0
                                NULL, NULL,
525
0
                                ProcessMongoRequest, NULL,
526
0
                                NULL, NULL, NULL,
527
0
                                CONNECTION_TYPE_POOLED, "mongo" };
528
0
    if (RegisterProtocol(PROTOCOL_MONGO, mongo_protocol) != 0) {
529
0
        exit(1);
530
0
    }
531
532
// Use Macro is more straight forward than weak link technology(becasue of static link issue)
533
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
534
    Protocol thrift_binary_protocol = {
535
        policy::ParseThriftMessage,
536
        policy::SerializeThriftRequest, policy::PackThriftRequest,
537
        policy::ProcessThriftRequest, policy::ProcessThriftResponse,
538
        policy::VerifyThriftRequest, NULL, NULL,
539
        CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" };
540
    if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
541
        exit(1);
542
    }
543
#endif
544
545
    // Only valid at client side
546
0
    Protocol ubrpc_compack_protocol = {
547
0
        ParseNsheadMessage,
548
0
        SerializeUbrpcCompackRequest, PackUbrpcRequest,
549
0
        NULL, ProcessUbrpcResponse,
550
0
        NULL, NULL, NULL,
551
0
        CONNECTION_TYPE_POOLED_AND_SHORT,  "ubrpc_compack" };
552
0
    if (RegisterProtocol(PROTOCOL_UBRPC_COMPACK, ubrpc_compack_protocol) != 0) {
553
0
        exit(1);
554
0
    }
555
0
    Protocol ubrpc_mcpack2_protocol = {
556
0
        ParseNsheadMessage,
557
0
        SerializeUbrpcMcpack2Request, PackUbrpcRequest,
558
0
        NULL, ProcessUbrpcResponse,
559
0
        NULL, NULL, NULL,
560
0
        CONNECTION_TYPE_POOLED_AND_SHORT,  "ubrpc_mcpack2" };
561
0
    if (RegisterProtocol(PROTOCOL_UBRPC_MCPACK2, ubrpc_mcpack2_protocol) != 0) {
562
0
        exit(1);
563
0
    }
564
565
    // Only valid at client side
566
0
    Protocol nshead_mcpack_protocol = {
567
0
        ParseNsheadMessage,
568
0
        SerializeNsheadMcpackRequest, PackNsheadMcpackRequest,
569
0
        NULL, ProcessNsheadMcpackResponse,
570
0
        NULL, NULL, NULL,
571
0
        CONNECTION_TYPE_POOLED_AND_SHORT,  "nshead_mcpack" };
572
0
    if (RegisterProtocol(PROTOCOL_NSHEAD_MCPACK, nshead_mcpack_protocol) != 0) {
573
0
        exit(1);
574
0
    }
575
576
0
    Protocol rtmp_protocol = {
577
0
        ParseRtmpMessage,
578
0
        SerializeRtmpRequest, PackRtmpRequest,
579
0
        ProcessRtmpMessage, ProcessRtmpMessage,
580
0
        NULL, NULL, NULL,
581
0
        (ConnectionType)(CONNECTION_TYPE_SINGLE|CONNECTION_TYPE_SHORT),
582
0
        "rtmp" };
583
0
    if (RegisterProtocol(PROTOCOL_RTMP, rtmp_protocol) != 0) {
584
0
        exit(1);
585
0
    }
586
587
0
    Protocol esp_protocol = {
588
0
        ParseEspMessage,
589
0
        SerializeEspRequest, PackEspRequest,
590
0
        NULL, ProcessEspResponse,
591
0
        NULL, NULL, NULL,
592
0
        CONNECTION_TYPE_POOLED_AND_SHORT, "esp"};
593
0
    if (RegisterProtocol(PROTOCOL_ESP, esp_protocol) != 0) {
594
0
        exit(1);
595
0
    }
596
597
0
    std::vector<Protocol> protocols;
598
0
    ListProtocols(&protocols);
599
0
    for (size_t i = 0; i < protocols.size(); ++i) {
600
0
        if (protocols[i].process_response) {
601
0
            InputMessageHandler handler;
602
            // `process_response' is required at client side
603
0
            handler.parse = protocols[i].parse;
604
0
            handler.process = protocols[i].process_response;
605
            // No need to verify at client side
606
0
            handler.verify = NULL;
607
0
            handler.arg = NULL;
608
0
            handler.name = protocols[i].name;
609
0
            if (get_or_new_client_side_messenger()->AddHandler(handler) != 0) {
610
0
                exit(1);
611
0
            }
612
0
        }
613
0
    }
614
615
    // Concurrency Limiters
616
0
    ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
617
0
    ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
618
0
    ConcurrencyLimiterExtension()->RegisterOrDie("timeout", &g_ext->timeout_cl);
619
620
0
    if (FLAGS_usercode_in_pthread) {
621
        // Optional. If channel/server are initialized before main(), this
622
        // flag may be false at here even if it will be set to true after
623
        // main(). In which case, the usercode pool will not be initialized
624
        // until the pool is used.
625
0
        InitUserCodeBackupPoolOnceOrDie();
626
0
    }
627
628
    // We never join GlobalUpdate, let it quit with the process.
629
0
    bthread_t th;
630
0
    CHECK(bthread_start_background(&th, NULL, GlobalUpdate, NULL) == 0)
631
0
        << "Fail to start GlobalUpdate";
632
0
}
633
634
0
void GlobalInitializeOrDie() {
635
0
    if (pthread_once(&register_extensions_once,
636
0
                     GlobalInitializeOrDieImpl) != 0) {
637
0
        LOG(FATAL) << "Fail to pthread_once";
638
0
        exit(1);
639
0
    }
640
0
}
641
642
} // namespace brpc