Coverage Report

Created: 2025-05-08 06:06

/src/brpc/src/brpc/global.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
19
#ifndef USE_MESALINK
20
#include <openssl/ssl.h>
21
#include <openssl/conf.h>
22
#else
23
#include <mesalink/openssl/ssl.h>
24
#endif
25
26
#include <gflags/gflags.h>
27
#include <fcntl.h>                               // O_RDONLY
28
#include <signal.h>
29
30
#include "butil/build_config.h"                  // OS_LINUX
31
// Naming services
32
#ifdef BAIDU_INTERNAL
33
#include "brpc/policy/baidu_naming_service.h"
34
#endif
35
#include "brpc/policy/file_naming_service.h"
36
#include "brpc/policy/list_naming_service.h"
37
#include "brpc/policy/domain_naming_service.h"
38
#include "brpc/policy/remote_file_naming_service.h"
39
#include "brpc/policy/consul_naming_service.h"
40
#include "brpc/policy/discovery_naming_service.h"
41
#include "brpc/policy/nacos_naming_service.h"
42
43
// Load Balancers
44
#include "brpc/policy/round_robin_load_balancer.h"
45
#include "brpc/policy/weighted_round_robin_load_balancer.h"
46
#include "brpc/policy/randomized_load_balancer.h"
47
#include "brpc/policy/weighted_randomized_load_balancer.h"
48
#include "brpc/policy/locality_aware_load_balancer.h"
49
#include "brpc/policy/consistent_hashing_load_balancer.h"
50
#include "brpc/policy/hasher.h"
51
#include "brpc/policy/dynpart_load_balancer.h"
52
53
54
// Span
55
#include "brpc/span.h"
56
#include "bthread/unstable.h"
57
58
// Compress handlers
59
#include "brpc/compress.h"
60
#include "brpc/policy/gzip_compress.h"
61
#include "brpc/policy/snappy_compress.h"
62
63
// Protocols
64
#include "brpc/protocol.h"
65
#include "brpc/policy/baidu_rpc_protocol.h"
66
#include "brpc/policy/http_rpc_protocol.h"
67
#include "brpc/policy/http2_rpc_protocol.h"
68
#include "brpc/policy/hulu_pbrpc_protocol.h"
69
#include "brpc/policy/nova_pbrpc_protocol.h"
70
#include "brpc/policy/public_pbrpc_protocol.h"
71
#include "brpc/policy/ubrpc2pb_protocol.h"
72
#include "brpc/policy/sofa_pbrpc_protocol.h"
73
#include "brpc/policy/memcache_binary_protocol.h"
74
#include "brpc/policy/streaming_rpc_protocol.h"
75
#include "brpc/policy/mongo_protocol.h"
76
#include "brpc/policy/redis_protocol.h"
77
#include "brpc/policy/nshead_mcpack_protocol.h"
78
#include "brpc/policy/rtmp_protocol.h"
79
#include "brpc/policy/esp_protocol.h"
80
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
81
# include "brpc/policy/thrift_protocol.h"
82
#endif
83
84
// Concurrency Limiters
85
#include "brpc/concurrency_limiter.h"
86
#include "brpc/policy/auto_concurrency_limiter.h"
87
#include "brpc/policy/constant_concurrency_limiter.h"
88
#include "brpc/policy/timeout_concurrency_limiter.h"
89
90
#include "brpc/input_messenger.h"     // get_or_new_client_side_messenger
91
#include "brpc/socket_map.h"          // SocketMapList
92
#include "brpc/server.h"
93
#include "brpc/trackme.h"             // TrackMe
94
#include "brpc/details/usercode_backup_pool.h"
95
#if defined(OS_LINUX)
96
#include <malloc.h>                   // malloc_trim
97
#endif
98
#include "butil/fd_guard.h"
99
#include "butil/files/file_watcher.h"
100
101
extern "C" {
102
// defined in gperftools/malloc_extension_c.h
103
void BAIDU_WEAK MallocExtension_ReleaseFreeMemory(void);
104
}
105
106
namespace brpc {
107
108
DECLARE_bool(usercode_in_pthread);
109
110
DEFINE_int32(free_memory_to_system_interval, 0,
111
             "Try to return free memory to system every so many seconds, "
112
             "values <= 0 disables this feature");
113
BRPC_VALIDATE_GFLAG(free_memory_to_system_interval, PassValidate);
114
115
namespace policy {
116
// Defined in http_rpc_protocol.cpp
117
void InitCommonStrings();
118
}
119
120
using namespace policy;
121
122
const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
123
124
struct GlobalExtensions {
125
    GlobalExtensions()
126
0
        : dns(80)
127
0
        , dns_with_ssl(443)
128
0
        , ch_mh_lb(CONS_HASH_LB_MURMUR3)
129
0
        , ch_md5_lb(CONS_HASH_LB_MD5)
130
0
        , ch_ketama_lb(CONS_HASH_LB_KETAMA)
131
0
        , constant_cl(0) {
132
0
    }
133
    
134
#ifdef BAIDU_INTERNAL
135
    BaiduNamingService bns;
136
#endif
137
    FileNamingService fns;
138
    ListNamingService lns;
139
    DomainListNamingService dlns;
140
    DomainNamingService dns;
141
    DomainNamingService dns_with_ssl;
142
    RemoteFileNamingService rfns;
143
    ConsulNamingService cns;
144
    DiscoveryNamingService dcns;
145
    NacosNamingService nns;
146
147
    RoundRobinLoadBalancer rr_lb;
148
    WeightedRoundRobinLoadBalancer wrr_lb;
149
    RandomizedLoadBalancer randomized_lb;
150
    WeightedRandomizedLoadBalancer wr_lb;
151
    LocalityAwareLoadBalancer la_lb;
152
    ConsistentHashingLoadBalancer ch_mh_lb;
153
    ConsistentHashingLoadBalancer ch_md5_lb;
154
    ConsistentHashingLoadBalancer ch_ketama_lb;
155
    DynPartLoadBalancer dynpart_lb;
156
157
    AutoConcurrencyLimiter auto_cl;
158
    ConstantConcurrencyLimiter constant_cl;
159
    TimeoutConcurrencyLimiter timeout_cl;
160
};
161
162
static pthread_once_t register_extensions_once = PTHREAD_ONCE_INIT;
163
static GlobalExtensions* g_ext = NULL;
164
165
0
static long ReadPortOfDummyServer(const char* filename) {
166
0
    butil::fd_guard fd(open(filename, O_RDONLY));
167
0
    if (fd < 0) {
168
0
        LOG(ERROR) << "Fail to open `" << DUMMY_SERVER_PORT_FILE << "'";
169
0
        return -1;
170
0
    }
171
0
    char port_str[32];
172
0
    const ssize_t nr = read(fd, port_str, sizeof(port_str));
173
0
    if (nr <= 0) {
174
0
        LOG(ERROR) << "Fail to read `" << DUMMY_SERVER_PORT_FILE << "': "
175
0
                   << (nr == 0 ? "nothing to read" : berror());
176
0
        return -1;
177
0
    }
178
0
    port_str[std::min((size_t)nr, sizeof(port_str)-1)] = '\0';
179
0
    const char* p = port_str;
180
0
    for (; isspace(*p); ++p) {}
181
0
    char* endptr = NULL;
182
0
    const long port = strtol(p, &endptr, 10);
183
0
    for (; isspace(*endptr); ++endptr) {}
184
0
    if (*endptr != '\0') {
185
0
        LOG(ERROR) << "Invalid port=`" << port_str << "'";
186
0
        return -1;
187
0
    }
188
0
    return port;
189
0
}
190
191
// Expose counters of butil::IOBuf
192
0
static int64_t GetIOBufBlockCount(void*) {
193
0
    return butil::IOBuf::block_count();
194
0
}
195
0
static int64_t GetIOBufBlockCountHitTLSThreshold(void*) {
196
0
    return butil::IOBuf::block_count_hit_tls_threshold();
197
0
}
198
0
static int64_t GetIOBufNewBigViewCount(void*) {
199
0
    return butil::IOBuf::new_bigview_count();
200
0
}
201
0
static int64_t GetIOBufBlockMemory(void*) {
202
0
    return butil::IOBuf::block_memory();
203
0
}
204
205
// Defined in server.cpp
206
extern butil::static_atomic<int> g_running_server_count;
207
0
static int GetRunningServerCount(void*) {
208
0
    return g_running_server_count.load(butil::memory_order_relaxed);
209
0
}
210
211
// Update global stuff periodically.
212
0
static void* GlobalUpdate(void*) {
213
    // Expose variables.
214
0
    bvar::PassiveStatus<int64_t> var_iobuf_block_count(
215
0
        "iobuf_block_count", GetIOBufBlockCount, NULL);
216
0
    bvar::PassiveStatus<int64_t> var_iobuf_block_count_hit_tls_threshold(
217
0
        "iobuf_block_count_hit_tls_threshold",
218
0
        GetIOBufBlockCountHitTLSThreshold, NULL);
219
0
    bvar::PassiveStatus<int64_t> var_iobuf_new_bigview_count(
220
0
        GetIOBufNewBigViewCount, NULL);
221
0
    bvar::PerSecond<bvar::PassiveStatus<int64_t> > var_iobuf_new_bigview_second(
222
0
        "iobuf_newbigview_second", &var_iobuf_new_bigview_count);
223
0
    bvar::PassiveStatus<int64_t> var_iobuf_block_memory(
224
0
        "iobuf_block_memory", GetIOBufBlockMemory, NULL);
225
0
    bvar::PassiveStatus<int> var_running_server_count(
226
0
        "rpc_server_count", GetRunningServerCount, NULL);
227
228
0
    butil::FileWatcher fw;
229
0
    if (fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE) < 0) {
230
0
        LOG(FATAL) << "Fail to init FileWatcher on `" << DUMMY_SERVER_PORT_FILE << "'";
231
0
        return NULL;
232
0
    }
233
234
0
    std::vector<SocketId> conns;
235
0
    const int64_t start_time_us = butil::gettimeofday_us();
236
0
    const int WARN_NOSLEEP_THRESHOLD = 2;
237
0
    int64_t last_time_us = start_time_us;
238
0
    int consecutive_nosleep = 0;
239
0
    int64_t last_return_free_memory_time = start_time_us;
240
0
    while (1) {
241
0
        const int64_t sleep_us = 1000000L + last_time_us - butil::gettimeofday_us();
242
0
        if (sleep_us > 0) {
243
0
            if (bthread_usleep(sleep_us) < 0) {
244
0
                PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep";
245
0
                break;
246
0
            }
247
0
            consecutive_nosleep = 0;
248
0
        } else {
249
0
            if (++consecutive_nosleep >= WARN_NOSLEEP_THRESHOLD) {
250
0
                consecutive_nosleep = 0;
251
0
                LOG(WARNING) << __FUNCTION__ << " is too busy!";
252
0
            }
253
0
        }
254
0
        last_time_us = butil::gettimeofday_us();
255
256
0
        TrackMe();
257
258
0
        if (!IsDummyServerRunning()
259
0
            && g_running_server_count.load(butil::memory_order_relaxed) == 0
260
0
            && fw.check_and_consume() > 0) {
261
0
            long port = ReadPortOfDummyServer(DUMMY_SERVER_PORT_FILE);
262
0
            if (port >= 0) {
263
0
                StartDummyServerAt(port);
264
0
            }
265
0
        }
266
267
0
        SocketMapList(&conns);
268
0
        const int64_t now_ms = butil::cpuwide_time_ms();
269
0
        for (size_t i = 0; i < conns.size(); ++i) {
270
0
            SocketUniquePtr ptr;
271
0
            if (Socket::Address(conns[i], &ptr) == 0) {
272
0
                ptr->UpdateStatsEverySecond(now_ms);
273
0
            }
274
0
        }
275
276
0
        const int return_mem_interval =
277
0
            FLAGS_free_memory_to_system_interval/*reloadable*/;
278
0
        if (return_mem_interval > 0 &&
279
0
            last_time_us >= last_return_free_memory_time +
280
0
            return_mem_interval * 1000000L) {
281
0
            last_return_free_memory_time = last_time_us;
282
            // TODO: Calling MallocExtension::instance()->ReleaseFreeMemory may
283
            // crash the program in later calls to malloc, verified on tcmalloc
284
            // 1.7 and 2.5, which means making the static member function weak
285
            // in details/tcmalloc_extension.cpp is probably not correct, however
286
            // it does work for heap profilers.
287
0
            if (MallocExtension_ReleaseFreeMemory != NULL) {
288
0
                MallocExtension_ReleaseFreeMemory();
289
0
            } else {
290
0
#if defined(OS_LINUX)
291
                // GNU specific.
292
0
                malloc_trim(10 * 1024 * 1024/*leave 10M pad*/);
293
0
#endif
294
0
            }
295
0
        }
296
0
    }
297
0
    return NULL;
298
0
}
299
300
#if GOOGLE_PROTOBUF_VERSION < 3022000
301
static void BaiduStreamingLogHandler(google::protobuf::LogLevel level,
302
                                     const char* filename, int line,
303
0
                                     const std::string& message) {
304
0
    switch (level) {
305
0
    case google::protobuf::LOGLEVEL_INFO:
306
0
        LOG(INFO) << filename << ':' << line << ' ' << message;
307
0
        return;
308
0
    case google::protobuf::LOGLEVEL_WARNING:
309
0
        LOG(WARNING) << filename << ':' << line << ' ' << message;
310
0
        return;
311
0
    case google::protobuf::LOGLEVEL_ERROR:
312
0
        LOG(ERROR) << filename << ':' << line << ' ' << message;
313
0
        return;
314
0
    case google::protobuf::LOGLEVEL_FATAL:
315
0
        LOG(FATAL) << filename << ':' << line << ' ' << message;
316
0
        return;
317
0
    }
318
0
    CHECK(false) << filename << ':' << line << ' ' << message;
319
0
}
320
#endif
321
322
0
static void GlobalInitializeOrDieImpl() {
323
    //////////////////////////////////////////////////////////////////
324
    // Be careful about usages of gflags inside this function which //
325
    // may be called before main() only seeing gflags with default  //
326
    // values even if the gflags will be set after main().          //
327
    //////////////////////////////////////////////////////////////////
328
329
    // Ignore SIGPIPE.
330
0
    struct sigaction oldact;
331
0
    if (sigaction(SIGPIPE, NULL, &oldact) != 0 ||
332
0
            (oldact.sa_handler == NULL && oldact.sa_sigaction == NULL)) {
333
0
        CHECK(SIG_ERR != signal(SIGPIPE, SIG_IGN));
334
0
    }
335
336
0
#if GOOGLE_PROTOBUF_VERSION < 3022000
337
    // Make GOOGLE_LOG print to comlog device
338
0
    SetLogHandler(&BaiduStreamingLogHandler);
339
0
#endif
340
341
    // Set bthread create span function
342
0
    bthread_set_create_span_func(CreateBthreadSpan);
343
344
    // Setting the variable here does not work, the profiler probably check
345
    // the variable before main() for only once.
346
    // setenv("TCMALLOC_SAMPLE_PARAMETER", "524288", 0);
347
348
    // Initialize openssl library
349
0
    SSL_library_init();
350
    // RPC doesn't require openssl.cnf, users can load it by themselves if needed
351
0
    SSL_load_error_strings();
352
0
    if (SSLThreadInit() != 0 || SSLDHInit() != 0) {
353
0
        exit(1);
354
0
    }
355
356
    // Defined in http_rpc_protocol.cpp
357
0
    InitCommonStrings();
358
359
    // Leave memory of these extensions to process's clean up.
360
0
    g_ext = new(std::nothrow) GlobalExtensions();
361
0
    if (NULL == g_ext) {
362
0
        exit(1);
363
0
    }
364
    // Naming Services
365
#ifdef BAIDU_INTERNAL
366
    NamingServiceExtension()->RegisterOrDie("bns", &g_ext->bns);
367
#endif
368
0
    NamingServiceExtension()->RegisterOrDie("file", &g_ext->fns);
369
0
    NamingServiceExtension()->RegisterOrDie("list", &g_ext->lns);
370
0
    NamingServiceExtension()->RegisterOrDie("dlist", &g_ext->dlns);
371
0
    NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns);
372
0
    NamingServiceExtension()->RegisterOrDie("https", &g_ext->dns_with_ssl);
373
0
    NamingServiceExtension()->RegisterOrDie("redis", &g_ext->dns);
374
0
    NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns);
375
0
    NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns);
376
0
    NamingServiceExtension()->RegisterOrDie("discovery", &g_ext->dcns);
377
0
    NamingServiceExtension()->RegisterOrDie("nacos", &g_ext->nns);
378
379
    // Load Balancers
380
0
    LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb);
381
0
    LoadBalancerExtension()->RegisterOrDie("wrr", &g_ext->wrr_lb);
382
0
    LoadBalancerExtension()->RegisterOrDie("random", &g_ext->randomized_lb);
383
0
    LoadBalancerExtension()->RegisterOrDie("wr", &g_ext->wr_lb);
384
0
    LoadBalancerExtension()->RegisterOrDie("la", &g_ext->la_lb);
385
0
    LoadBalancerExtension()->RegisterOrDie("c_murmurhash", &g_ext->ch_mh_lb);
386
0
    LoadBalancerExtension()->RegisterOrDie("c_md5", &g_ext->ch_md5_lb);
387
0
    LoadBalancerExtension()->RegisterOrDie("c_ketama", &g_ext->ch_ketama_lb);
388
0
    LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb);
389
390
    // Compress Handlers
391
0
    CompressHandler gzip_compress = { GzipCompress, GzipDecompress, "gzip" };
392
0
    if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) {
393
0
        exit(1);
394
0
    }
395
0
    CompressHandler zlib_compress = { ZlibCompress, ZlibDecompress, "zlib" };
396
0
    if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) {
397
0
        exit(1);
398
0
    }
399
0
    CompressHandler snappy_compress = { SnappyCompress, SnappyDecompress, "snappy" };
400
0
    if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) {
401
0
        exit(1);
402
0
    }
403
404
    // Protocols
405
0
    Protocol baidu_protocol = { ParseRpcMessage,
406
0
                                SerializeRpcRequest, PackRpcRequest,
407
0
                                ProcessRpcRequest, ProcessRpcResponse,
408
0
                                VerifyRpcRequest, NULL, NULL,
409
0
                                CONNECTION_TYPE_ALL, "baidu_std" };
410
0
    if (RegisterProtocol(PROTOCOL_BAIDU_STD, baidu_protocol) != 0) {
411
0
        exit(1);
412
0
    }
413
414
0
    Protocol streaming_protocol = { ParseStreamingMessage,
415
0
                                    NULL, NULL, ProcessStreamingMessage,
416
0
                                    ProcessStreamingMessage,
417
0
                                    NULL, NULL, NULL,
418
0
                                    CONNECTION_TYPE_SINGLE, "streaming_rpc" };
419
420
0
    if (RegisterProtocol(PROTOCOL_STREAMING_RPC, streaming_protocol) != 0) {
421
0
        exit(1);
422
0
    }
423
424
0
    Protocol http_protocol = { ParseHttpMessage,
425
0
                               SerializeHttpRequest, PackHttpRequest,
426
0
                               ProcessHttpRequest, ProcessHttpResponse,
427
0
                               VerifyHttpRequest, ParseHttpServerAddress,
428
0
                               GetHttpMethodName,
429
0
                               CONNECTION_TYPE_POOLED_AND_SHORT,
430
0
                               "http" };
431
0
    if (RegisterProtocol(PROTOCOL_HTTP, http_protocol) != 0) {
432
0
        exit(1);
433
0
    }
434
435
0
    Protocol http2_protocol = { ParseH2Message,
436
0
                                SerializeHttpRequest, PackH2Request,
437
0
                                ProcessHttpRequest, ProcessHttpResponse,
438
0
                                VerifyHttpRequest, ParseHttpServerAddress,
439
0
                                GetHttpMethodName,
440
0
                                CONNECTION_TYPE_SINGLE,
441
0
                                "h2" };
442
0
    if (RegisterProtocol(PROTOCOL_H2, http2_protocol) != 0) {
443
0
        exit(1);
444
0
    }
445
446
0
    Protocol hulu_protocol = { ParseHuluMessage,
447
0
                               SerializeRequestDefault, PackHuluRequest,
448
0
                               ProcessHuluRequest, ProcessHuluResponse,
449
0
                               VerifyHuluRequest, NULL, NULL,
450
0
                               CONNECTION_TYPE_ALL, "hulu_pbrpc" };
451
0
    if (RegisterProtocol(PROTOCOL_HULU_PBRPC, hulu_protocol) != 0) {
452
0
        exit(1);
453
0
    }
454
455
    // Only valid at client side
456
0
    Protocol nova_protocol = { ParseNsheadMessage,
457
0
                               SerializeNovaRequest, PackNovaRequest,
458
0
                               NULL, ProcessNovaResponse,
459
0
                               NULL, NULL, NULL,
460
0
                               CONNECTION_TYPE_POOLED_AND_SHORT,  "nova_pbrpc" };
461
0
    if (RegisterProtocol(PROTOCOL_NOVA_PBRPC, nova_protocol) != 0) {
462
0
        exit(1);
463
0
    }
464
465
    // Only valid at client side
466
0
    Protocol public_pbrpc_protocol = { ParseNsheadMessage,
467
0
                                       SerializePublicPbrpcRequest,
468
0
                                       PackPublicPbrpcRequest,
469
0
                                       NULL, ProcessPublicPbrpcResponse,
470
0
                                       NULL, NULL, NULL,
471
                                       // public_pbrpc server implementation
472
                                       // doesn't support full duplex
473
0
                                       CONNECTION_TYPE_POOLED_AND_SHORT,
474
0
                                       "public_pbrpc" };
475
0
    if (RegisterProtocol(PROTOCOL_PUBLIC_PBRPC, public_pbrpc_protocol) != 0) {
476
0
        exit(1);
477
0
    }
478
479
0
    Protocol sofa_protocol = { ParseSofaMessage,
480
0
                               SerializeRequestDefault, PackSofaRequest,
481
0
                               ProcessSofaRequest, ProcessSofaResponse,
482
0
                               VerifySofaRequest, NULL, NULL,
483
0
                               CONNECTION_TYPE_ALL, "sofa_pbrpc" };
484
0
    if (RegisterProtocol(PROTOCOL_SOFA_PBRPC, sofa_protocol) != 0) {
485
0
        exit(1);
486
0
    }
487
488
    // Only valid at server side. We generalize all the protocols that
489
    // prefixes with nshead as `nshead_protocol' and specify the content
490
    // parsing after nshead by ServerOptions.nshead_service.
491
0
    Protocol nshead_protocol = { ParseNsheadMessage,
492
0
                                 SerializeNsheadRequest, PackNsheadRequest,
493
0
                                 ProcessNsheadRequest, ProcessNsheadResponse,
494
0
                                 VerifyNsheadRequest, NULL, NULL,
495
0
                                 CONNECTION_TYPE_POOLED_AND_SHORT, "nshead" };
496
0
    if (RegisterProtocol(PROTOCOL_NSHEAD, nshead_protocol) != 0) {
497
0
        exit(1);
498
0
    }
499
500
0
    Protocol mc_binary_protocol = { ParseMemcacheMessage,
501
0
                                    SerializeMemcacheRequest,
502
0
                                    PackMemcacheRequest,
503
0
                                    NULL, ProcessMemcacheResponse,
504
0
                                    NULL, NULL, GetMemcacheMethodName,
505
0
                                    CONNECTION_TYPE_ALL, "memcache" };
506
0
    if (RegisterProtocol(PROTOCOL_MEMCACHE, mc_binary_protocol) != 0) {
507
0
        exit(1);
508
0
    }
509
510
0
    Protocol redis_protocol = { ParseRedisMessage,
511
0
                                SerializeRedisRequest,
512
0
                                PackRedisRequest,
513
0
                                ProcessRedisRequest, ProcessRedisResponse,
514
0
                                NULL, NULL, GetRedisMethodName,
515
0
                                CONNECTION_TYPE_ALL, "redis" };
516
0
    if (RegisterProtocol(PROTOCOL_REDIS, redis_protocol) != 0) {
517
0
        exit(1);
518
0
    }
519
520
0
    Protocol mongo_protocol = { ParseMongoMessage,
521
0
                                NULL, NULL,
522
0
                                ProcessMongoRequest, NULL,
523
0
                                NULL, NULL, NULL,
524
0
                                CONNECTION_TYPE_POOLED, "mongo" };
525
0
    if (RegisterProtocol(PROTOCOL_MONGO, mongo_protocol) != 0) {
526
0
        exit(1);
527
0
    }
528
529
// Use Macro is more straight forward than weak link technology(becasue of static link issue)
530
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
531
    Protocol thrift_binary_protocol = {
532
        policy::ParseThriftMessage,
533
        policy::SerializeThriftRequest, policy::PackThriftRequest,
534
        policy::ProcessThriftRequest, policy::ProcessThriftResponse,
535
        policy::VerifyThriftRequest, NULL, NULL,
536
        CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" };
537
    if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
538
        exit(1);
539
    }
540
#endif
541
542
    // Only valid at client side
543
0
    Protocol ubrpc_compack_protocol = {
544
0
        ParseNsheadMessage,
545
0
        SerializeUbrpcCompackRequest, PackUbrpcRequest,
546
0
        NULL, ProcessUbrpcResponse,
547
0
        NULL, NULL, NULL,
548
0
        CONNECTION_TYPE_POOLED_AND_SHORT,  "ubrpc_compack" };
549
0
    if (RegisterProtocol(PROTOCOL_UBRPC_COMPACK, ubrpc_compack_protocol) != 0) {
550
0
        exit(1);
551
0
    }
552
0
    Protocol ubrpc_mcpack2_protocol = {
553
0
        ParseNsheadMessage,
554
0
        SerializeUbrpcMcpack2Request, PackUbrpcRequest,
555
0
        NULL, ProcessUbrpcResponse,
556
0
        NULL, NULL, NULL,
557
0
        CONNECTION_TYPE_POOLED_AND_SHORT,  "ubrpc_mcpack2" };
558
0
    if (RegisterProtocol(PROTOCOL_UBRPC_MCPACK2, ubrpc_mcpack2_protocol) != 0) {
559
0
        exit(1);
560
0
    }
561
562
    // Only valid at client side
563
0
    Protocol nshead_mcpack_protocol = {
564
0
        ParseNsheadMessage,
565
0
        SerializeNsheadMcpackRequest, PackNsheadMcpackRequest,
566
0
        NULL, ProcessNsheadMcpackResponse,
567
0
        NULL, NULL, NULL,
568
0
        CONNECTION_TYPE_POOLED_AND_SHORT,  "nshead_mcpack" };
569
0
    if (RegisterProtocol(PROTOCOL_NSHEAD_MCPACK, nshead_mcpack_protocol) != 0) {
570
0
        exit(1);
571
0
    }
572
573
0
    Protocol rtmp_protocol = {
574
0
        ParseRtmpMessage,
575
0
        SerializeRtmpRequest, PackRtmpRequest,
576
0
        ProcessRtmpMessage, ProcessRtmpMessage,
577
0
        NULL, NULL, NULL,
578
0
        (ConnectionType)(CONNECTION_TYPE_SINGLE|CONNECTION_TYPE_SHORT),
579
0
        "rtmp" };
580
0
    if (RegisterProtocol(PROTOCOL_RTMP, rtmp_protocol) != 0) {
581
0
        exit(1);
582
0
    }
583
584
0
    Protocol esp_protocol = {
585
0
        ParseEspMessage,
586
0
        SerializeEspRequest, PackEspRequest,
587
0
        NULL, ProcessEspResponse,
588
0
        NULL, NULL, NULL,
589
0
        CONNECTION_TYPE_POOLED_AND_SHORT, "esp"};
590
0
    if (RegisterProtocol(PROTOCOL_ESP, esp_protocol) != 0) {
591
0
        exit(1);
592
0
    }
593
594
0
    std::vector<Protocol> protocols;
595
0
    ListProtocols(&protocols);
596
0
    for (size_t i = 0; i < protocols.size(); ++i) {
597
0
        if (protocols[i].process_response) {
598
0
            InputMessageHandler handler;
599
            // `process_response' is required at client side
600
0
            handler.parse = protocols[i].parse;
601
0
            handler.process = protocols[i].process_response;
602
            // No need to verify at client side
603
0
            handler.verify = NULL;
604
0
            handler.arg = NULL;
605
0
            handler.name = protocols[i].name;
606
0
            if (get_or_new_client_side_messenger()->AddHandler(handler) != 0) {
607
0
                exit(1);
608
0
            }
609
0
        }
610
0
    }
611
612
    // Concurrency Limiters
613
0
    ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
614
0
    ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
615
0
    ConcurrencyLimiterExtension()->RegisterOrDie("timeout", &g_ext->timeout_cl);
616
617
0
    if (FLAGS_usercode_in_pthread) {
618
        // Optional. If channel/server are initialized before main(), this
619
        // flag may be false at here even if it will be set to true after
620
        // main(). In which case, the usercode pool will not be initialized
621
        // until the pool is used.
622
0
        InitUserCodeBackupPoolOnceOrDie();
623
0
    }
624
625
    // We never join GlobalUpdate, let it quit with the process.
626
0
    bthread_t th;
627
0
    CHECK(bthread_start_background(&th, NULL, GlobalUpdate, NULL) == 0)
628
0
        << "Fail to start GlobalUpdate";
629
0
}
630
631
0
void GlobalInitializeOrDie() {
632
0
    if (pthread_once(&register_extensions_once,
633
0
                     GlobalInitializeOrDieImpl) != 0) {
634
0
        LOG(FATAL) << "Fail to pthread_once";
635
0
        exit(1);
636
0
    }
637
0
}
638
639
} // namespace brpc