Coverage Report

Created: 2024-09-11 06:42

/src/brpc/src/brpc/selective_channel.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
19
#include <map>
20
#include <gflags/gflags.h>
21
#include "bthread/bthread.h"                         // bthread_id_xx
22
#include "brpc/socket.h"                             // SocketUser
23
#include "brpc/load_balancer.h"                      // LoadBalancer
24
#include "brpc/details/controller_private_accessor.h"        // RPCSender
25
#include "brpc/selective_channel.h"
26
#include "brpc/global.h"
27
28
29
namespace brpc {
30
31
// gflag: seconds between two consecutive health checks of a sub channel that
// is currently not accessible. ChannelBalancer::AddChannel() copies it into
// SocketOptions::health_check_interval_s of every fake socket it creates.
DEFINE_int32(channel_check_interval, 1,
             "seconds between consecutive health-checking of unaccessible sub"
             " channels inside SelectiveChannel");
34
35
namespace schan {
36
37
// This map is generally very small, std::map may be good enough.
38
typedef std::map<ChannelBase*, Socket*> ChannelToIdMap;
39
40
// Representing a sub channel.
41
class SubChannel : public SocketUser {
42
public:
43
    ChannelBase* chan;
44
45
    // internal channel is deleted after the fake Socket is SetFailed
46
0
    void BeforeRecycle(Socket*) {
47
0
        delete chan;
48
0
        delete this;
49
0
    }
50
51
0
    int CheckHealth(Socket* ptr) {
52
0
        if (ptr->health_check_count() == 0) {
53
0
            LOG(INFO) << "Checking " << *chan << " chan=0x" << (void*)chan
54
0
                      << " Fake" << *ptr;
55
0
        }
56
0
        return chan->CheckHealth();
57
0
    }
58
59
0
    void AfterRevived(Socket* ptr) {
60
0
        LOG(INFO) << "Revived " << *chan << " chan=0x" << (void*)chan
61
0
                  << " Fake" << *ptr << " (Connectable)";
62
0
    }
63
};
64
65
0
// Returns Weight() of the channel wrapped by `u'.
// `u' is downcast with an unchecked static_cast, so it must really be a
// schan::SubChannel.
int GetSubChannelWeight(SocketUser* u) {
    SubChannel* const sub = static_cast<SubChannel*>(u);
    return sub->chan->Weight();
}
68
69
// Load balance between fake sockets whose SocketUsers are sub channels.
70
class ChannelBalancer : public SharedLoadBalancer {
71
public:
72
    struct SelectOut {
73
0
        SelectOut() : need_feedback(false) {}
74
75
0
        ChannelBase* channel() {
76
0
            return static_cast<SubChannel*>(fake_sock->user())->chan;
77
0
        }
78
        
79
        SocketUniquePtr fake_sock;
80
        bool need_feedback;
81
    };
82
    
83
0
    ChannelBalancer() {}
84
    ~ChannelBalancer();
85
    int Init(const char* lb_name);
86
    int AddChannel(ChannelBase* sub_channel,
87
                   SelectiveChannel::ChannelHandle* handle);
88
    void RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle handle);
89
    int SelectChannel(const LoadBalancer::SelectIn& in, SelectOut* out);
90
    int CheckHealth();
91
    void Describe(std::ostream& os, const DescribeOptions&);
92
93
private:
94
    butil::Mutex _mutex;
95
    // Find out duplicated sub channels.
96
    ChannelToIdMap _chan_map;
97
};
98
99
class SubDone;
100
class Sender;
101
102
struct Resource {
103
0
    Resource() : response(NULL), sub_done(NULL) {}
104
        
105
    google::protobuf::Message* response;
106
    SubDone* sub_done;
107
};
108
109
// The done to sub channels.
110
class SubDone : public google::protobuf::Closure {
111
public:
112
    explicit SubDone(Sender* owner)
113
        : _owner(owner)
114
        , _cid(INVALID_BTHREAD_ID)
115
0
        , _peer_id(INVALID_SOCKET_ID) {
116
0
    }
117
0
    ~SubDone() {}
118
    void Run();
119
120
    Sender* _owner;
121
    CallId _cid;
122
    SocketId _peer_id;
123
    Controller _cntl;
124
};
125
126
// The sender to intercept Controller::IssueRPC
127
class Sender : public RPCSender,
128
               public google::protobuf::Closure {
129
friend class SubDone;
130
public:
131
    Sender(Controller* cntl,
132
           const google::protobuf::Message* request,
133
           google::protobuf::Message* response,
134
           google::protobuf::Closure* user_done);
135
0
    ~Sender() { Clear(); }
136
    int IssueRPC(int64_t start_realtime_us);
137
    Resource PopFree();
138
    bool PushFree(const Resource& r);
139
    const Controller* SubController(int index) const;
140
    void Run();
141
    void Clear();
142
143
private:
144
    Controller* _main_cntl;
145
    const google::protobuf::Message* _request;
146
    google::protobuf::Message* _response;
147
    google::protobuf::Closure* _user_done;
148
    short _nfree;
149
    short _nalloc;
150
    bool _finished;
151
    Resource _free_resources[2];
152
    Resource _alloc_resources[2];
153
    SubDone _sub_done0;
154
};
155
156
// ===============================================
157
158
0
// Drops, for every fake socket still registered, the additional reference and
// the health-check-related reference, then empties the map.
ChannelBalancer::~ChannelBalancer() {
    ChannelToIdMap::iterator cur = _chan_map.begin();
    const ChannelToIdMap::iterator last = _chan_map.end();
    while (cur != last) {
        Socket* const fake_sock = cur->second;
        fake_sock->ReleaseAdditionalReference();
        fake_sock->ReleaseHCRelatedReference();
        ++cur;
    }
    _chan_map.clear();
}
166
167
0
int ChannelBalancer::Init(const char* lb_name) {
168
0
    return SharedLoadBalancer::Init(lb_name);
169
0
}
170
171
int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
172
0
                                SelectiveChannel::ChannelHandle* handle) {
173
0
    if (NULL == sub_channel) {
174
0
        LOG(ERROR) << "Parameter[sub_channel] is NULL";
175
0
        return -1;
176
0
    }
177
0
    BAIDU_SCOPED_LOCK(_mutex);
178
0
    if (_chan_map.find(sub_channel) != _chan_map.end()) {
179
0
        LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
180
0
        return -1;
181
0
    }
182
0
    SubChannel* sub_chan = new (std::nothrow) SubChannel;
183
0
    if (sub_chan == NULL) {
184
0
        LOG(FATAL) << "Fail to to new SubChannel";
185
0
        return -1;
186
0
    }
187
0
    sub_chan->chan = sub_channel;
188
0
    SocketId sock_id;
189
0
    SocketOptions options;
190
0
    options.user = sub_chan;
191
0
    options.health_check_interval_s = FLAGS_channel_check_interval;
192
193
0
    if (Socket::Create(options, &sock_id) != 0) {
194
0
        delete sub_chan;
195
0
        LOG(ERROR) << "Fail to create fake socket for sub channel";
196
0
        return -1;
197
0
    }
198
0
    SocketUniquePtr ptr;
199
0
    int rc = Socket::AddressFailedAsWell(sock_id, &ptr);
200
0
    if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) {
201
0
        LOG(FATAL) << "Fail to address SocketId=" << sock_id;
202
0
        return -1;
203
0
    }
204
0
    if (!AddServer(ServerId(sock_id))) {
205
0
        LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
206
        // sub_chan will be deleted when the socket is recycled.
207
0
        ptr->SetFailed();
208
        // Cancel health checking.
209
0
        ptr->ReleaseHCRelatedReference();
210
0
        return -1;
211
0
    }
212
    // The health-check-related reference has been held on created.
213
0
    _chan_map[sub_channel]= ptr.get();
214
0
    if (handle) {
215
0
        *handle = sock_id;
216
0
    }
217
0
    return 0;
218
0
}
219
220
0
// Removes the sub channel identified by `handle' (the id of its fake socket).
// Does nothing if the id is not a server of this balancer or the socket can
// no longer be addressed.
void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle handle) {
    if (!RemoveServer(ServerId(handle))) {
        return;
    }
    SocketUniquePtr fake_sock;
    const int rc = Socket::AddressFailedAsWell(handle, &fake_sock);
    if (rc < 0) {
        return;
    }
    SubChannel* const sub = static_cast<SubChannel*>(fake_sock->user());
    {
        // The lock only covers the map update.
        BAIDU_SCOPED_LOCK(_mutex);
        CHECK_EQ(1UL, _chan_map.erase(sub->chan));
    }
    if (rc == 0) {
        // Only done when the socket was addressed with rc == 0.
        fake_sock->ReleaseAdditionalReference();
    }
    // Cancel health checking.
    fake_sock->ReleaseHCRelatedReference();
}
239
240
// Selects a fake socket through SelectServer() and stores it in
// `out->fake_sock'. `out->need_feedback' is only written on success.
// Returns 0 on success, otherwise the non-zero value of SelectServer().
inline int ChannelBalancer::SelectChannel(const LoadBalancer::SelectIn& in,
                                          SelectOut* out) {
    LoadBalancer::SelectOut lb_out(&out->fake_sock);
    const int rc = SelectServer(in, &lb_out);
    if (rc == 0) {
        out->need_feedback = lb_out.need_feedback;
    }
    return rc;
}
250
251
0
int ChannelBalancer::CheckHealth() {
252
0
    BAIDU_SCOPED_LOCK(_mutex);
253
0
    for (ChannelToIdMap::const_iterator it = _chan_map.begin();
254
0
         it != _chan_map.end(); ++it) {
255
0
        if (!it->second->Failed() &&
256
0
            it->first->CheckHealth() == 0) {
257
0
            return 0;
258
0
        }
259
0
    }
260
0
    return -1;
261
0
}
262
263
void ChannelBalancer::Describe(std::ostream& os,
264
0
                               const DescribeOptions& options) {
265
0
    BAIDU_SCOPED_LOCK(_mutex);
266
0
    if (!options.verbose) {
267
0
        os << _chan_map.size();
268
0
        return;
269
0
    }
270
0
    for (ChannelToIdMap::const_iterator it = _chan_map.begin();
271
0
         it != _chan_map.end(); ++it) {
272
0
        if (it != _chan_map.begin()) {
273
0
            os << ' ';
274
0
        }
275
0
        it->first->Describe(os, options);
276
0
    }
277
0
}
278
279
// ===================================
280
281
// Remembers the parameters of the main RPC. Nothing is allocated here:
// both resource counters start at 0 and the embedded SubDone is bound to
// this Sender.
Sender::Sender(Controller* cntl,
               const google::protobuf::Message* request,
               google::protobuf::Message* response,
               google::protobuf::Closure* user_done)
    : _main_cntl(cntl),
      _request(request),
      _response(response),
      _user_done(user_done),
      _nfree(0),
      _nalloc(0),
      _finished(false),
      _sub_done0(this) {
}
294
295
0
int Sender::IssueRPC(int64_t start_realtime_us) {
296
0
    _main_cntl->_current_call.need_feedback = false;
297
0
    LoadBalancer::SelectIn sel_in = { start_realtime_us,
298
0
                                      true,
299
0
                                      _main_cntl->has_request_code(),
300
0
                                      _main_cntl->_request_code,
301
0
                                      _main_cntl->_accessed };
302
0
    ChannelBalancer::SelectOut sel_out;
303
0
    const int rc = static_cast<ChannelBalancer*>(_main_cntl->_lb.get())
304
0
        ->SelectChannel(sel_in, &sel_out);
305
0
    if (rc != 0) {
306
0
        _main_cntl->SetFailed(rc, "Fail to select channel, %s", berror(rc));
307
0
        return -1;
308
0
    }
309
0
    DLOG(INFO) << "Selected channel=" << sel_out.channel() << ", size="
310
0
                << (_main_cntl->_accessed ? _main_cntl->_accessed->size() : 0);
311
0
    _main_cntl->_current_call.need_feedback = sel_out.need_feedback;
312
0
    _main_cntl->_current_call.peer_id = sel_out.fake_sock->id();
313
314
0
    Resource r = PopFree();
315
0
    if (r.sub_done == NULL) {
316
0
        CHECK(false) << "Impossible!";
317
0
        _main_cntl->SetFailed("Impossible happens");
318
0
        return -1;
319
0
    }
320
0
    r.sub_done->_cid = _main_cntl->current_id();
321
0
    r.sub_done->_peer_id = sel_out.fake_sock->id();
322
0
    Controller* sub_cntl = &r.sub_done->_cntl;
323
    // No need to count timeout. We already managed timeout in schan. If
324
    // timeout occurs, sub calls are canceled with ERPCTIMEDOUT.
325
0
    sub_cntl->_timeout_ms = -1;
326
0
    sub_cntl->_real_timeout_ms = _main_cntl->timeout_ms();
327
328
    // Inherit following fields of _main_cntl.
329
    // TODO(gejun): figure out a better way to maintain these fields.
330
0
    sub_cntl->set_connection_type(_main_cntl->connection_type());
331
0
    sub_cntl->set_type_of_service(_main_cntl->_tos);
332
0
    sub_cntl->set_request_compress_type(_main_cntl->request_compress_type());
333
0
    sub_cntl->set_log_id(_main_cntl->log_id());
334
0
    sub_cntl->set_request_code(_main_cntl->request_code());
335
    // Forward request attachment to the subcall
336
0
    sub_cntl->request_attachment().append(_main_cntl->request_attachment());
337
    
338
0
    sel_out.channel()->CallMethod(_main_cntl->_method,
339
0
                                  &r.sub_done->_cntl,
340
0
                                  _request,
341
0
                                  r.response,
342
0
                                  r.sub_done);
343
0
    return 0;
344
0
}
345
346
0
void SubDone::Run() {
347
0
    Controller* main_cntl = NULL;
348
0
    const int rc = bthread_id_lock(_cid, (void**)&main_cntl);
349
0
    if (rc != 0) {
350
        // _cid must be valid because schan does not dtor before cancelling
351
        // all sub calls.
352
0
        LOG(ERROR) << "Fail to lock correlation_id="
353
0
                   << _cid.value << ": " << berror(rc);
354
0
        return;
355
0
    }
356
    // NOTE: Copying gettable-but-settable fields which are generally set
357
    // during the RPC to reflect details.
358
0
    main_cntl->_remote_side = _cntl._remote_side;
359
    // connection_type may be changed during CallMethod. 
360
0
    main_cntl->set_connection_type(_cntl.connection_type());
361
0
    main_cntl->response_attachment().swap(_cntl.response_attachment());
362
0
    Resource r;
363
0
    r.response = _cntl._response;
364
0
    r.sub_done = this;
365
0
    if (!_owner->PushFree(r)) {
366
0
        return;
367
0
    }
368
0
    const int saved_error = main_cntl->ErrorCode();
369
    
370
0
    if (_cntl.Failed()) {
371
0
        if (_cntl.ErrorCode() == ENODATA || _cntl.ErrorCode() == EHOSTDOWN) {
372
            // LB could not find a server.
373
0
            Socket::SetFailed(_peer_id);  // trigger HC.
374
0
        }
375
0
        main_cntl->SetFailed(_cntl._error_text);
376
0
        main_cntl->_error_code = _cntl._error_code;
377
0
    } else {
378
0
        if (_cntl._response != main_cntl->_response) {
379
0
            main_cntl->_response->GetReflection()->Swap(
380
0
                main_cntl->_response, _cntl._response);
381
0
        }
382
0
    }
383
0
    const Controller::CompletionInfo info = { _cid, true };
384
0
    main_cntl->OnVersionedRPCReturned(info, false, saved_error);
385
0
}
386
387
0
void Sender::Run() {
388
0
    _finished = true;
389
0
    if (_nfree != _nalloc) {
390
0
        const int saved_nalloc = _nalloc;
391
0
        int error = (_main_cntl->ErrorCode() == ERPCTIMEDOUT ? ERPCTIMEDOUT : ECANCELED);
392
0
        CallId ids[_nalloc];
393
0
        for (int i = 0; i < _nalloc; ++i) {
394
0
            ids[i] = _alloc_resources[i].sub_done->_cntl.call_id();
395
0
        }
396
0
        CallId cid = _main_cntl->call_id();
397
0
        CHECK_EQ(0, bthread_id_unlock(cid));
398
0
        for (int i = 0; i < saved_nalloc; ++i) {
399
0
            bthread_id_error(ids[i], error);
400
0
        }
401
0
    } else {
402
0
        Clear();
403
0
    }
404
0
}
405
406
0
void Sender::Clear() {
407
0
    if (_main_cntl == NULL) {
408
0
        return;
409
0
    }
410
0
    delete _alloc_resources[1].response;
411
0
    delete _alloc_resources[1].sub_done;
412
0
    _alloc_resources[1] = Resource();
413
0
    const CallId cid = _main_cntl->call_id();
414
0
    _main_cntl = NULL;
415
0
    if (_user_done) {
416
0
        _user_done->Run();
417
0
    }
418
0
    bthread_id_unlock_and_destroy(cid);
419
0
}
420
421
0
// Returns a Resource for the next sub call.
// A previously freed Resource is reused (after clearing its response and
// resetting its Controller) when available. Otherwise the first allocation
// uses the user's response and the embedded _sub_done0, the second one
// creates a new response and a new SubDone. A third allocation is refused
// with a default (all-NULL) Resource.
inline Resource Sender::PopFree() {
    if (_nfree != 0) {
        Resource reused = _free_resources[--_nfree];
        reused.response->Clear();
        Controller& sub_cntl = reused.sub_done->_cntl;
        // Detach _accessed around Reset() and put it back afterwards, so the
        // same object stays attached to the Controller.
        ExcludedServers* const kept_accessed = sub_cntl._accessed;
        sub_cntl._accessed = NULL;
        sub_cntl.Reset();
        sub_cntl._accessed = kept_accessed;
        return reused;
    }

    Resource fresh;
    switch (_nalloc) {
    case 0:
        fresh.response = _response;
        fresh.sub_done = &_sub_done0;
        break;
    case 1:
        fresh.response = _response->New();
        fresh.sub_done = new SubDone(this);
        break;
    default:
        CHECK(false) << "nalloc=" << _nalloc;
        return Resource();
    }
    _alloc_resources[_nalloc++] = fresh;
    return fresh;
}
450
451
0
inline bool Sender::PushFree(const Resource& r) {
452
0
    if (_nfree < 2) {
453
0
        _free_resources[_nfree++] = r;
454
0
        if (_finished && _nfree == _nalloc) {
455
0
            Clear();
456
0
            return false;
457
0
        }
458
0
        return true;
459
0
    } else {
460
0
        CHECK(false) << "Impossible!";
461
0
        return false;
462
0
    }
463
0
}
464
465
0
// Returns the Controller of a finished sub call: the first one in the free
// list that did not fail or, if all of them failed, the last one in the
// list. Returns NULL when `index' is not 0 or the free list is empty.
inline const Controller* Sender::SubController(int index) const {
    if (index != 0 || _nfree == 0) {
        return NULL;
    }
    for (int i = 0; i < _nfree; ++i) {
        const Controller& sub_cntl = _free_resources[i].sub_done->_cntl;
        if (!sub_cntl.Failed()) {
            return &sub_cntl;
        }
    }
    return &_free_resources[_nfree - 1].sub_done->_cntl;
}
479
480
}  // namespace schan
481
482
// Returns schan::Sender::SubController(index) of `sender'.
// `sender' is downcast with an unchecked static_cast, so it must be the
// schan::Sender created by SelectiveChannel::CallMethod().
const Controller* GetSubControllerOfSelectiveChannel(
    const RPCSender* sender, int index) {
    const schan::Sender* const schan_sender =
        static_cast<const schan::Sender*>(sender);
    return schan_sender->SubController(index);
}
486
487
static void PassSerializeRequest(butil::IOBuf*, Controller*,
488
0
                                 const google::protobuf::Message*) {
489
0
}
490
491
0
// Creates an uninitialized channel; Init() must be called before use.
SelectiveChannel::SelectiveChannel() {
}
492
493
0
// Nothing to release explicitly; members clean up after themselves.
SelectiveChannel::~SelectiveChannel() {
}
494
495
0
int SelectiveChannel::Init(const char* lb_name, const ChannelOptions* options) {
496
    // Force naming services to register.
497
0
    GlobalInitializeOrDie();
498
0
    if (initialized()) {
499
0
        LOG(ERROR) << "Already initialized";
500
0
        return -1;
501
0
    }
502
0
    schan::ChannelBalancer* lb = new (std::nothrow) schan::ChannelBalancer;
503
0
    if (NULL == lb) {
504
0
        LOG(FATAL) << "Fail to new ChannelBalancer";
505
0
        return -1;
506
0
    }
507
0
    if (lb->Init(lb_name) != 0) {
508
0
        LOG(ERROR) << "Fail to init lb";
509
0
        delete lb;
510
0
        return -1;
511
0
    }
512
0
    _chan._lb.reset(lb);
513
0
    _chan._serialize_request = PassSerializeRequest;
514
0
    if (options) {
515
0
        _chan._options = *options;
516
        // Modify some fields to be consistent with behavior of schan.
517
0
        _chan._options.connection_type = CONNECTION_TYPE_UNKNOWN;
518
0
        _chan._options.succeed_without_server = true;
519
0
        _chan._options.auth = NULL;
520
0
    }
521
0
    _chan._options.protocol = PROTOCOL_UNKNOWN;
522
0
    return 0;
523
0
}
524
525
0
bool SelectiveChannel::initialized() const {
526
0
    return _chan._lb != NULL;
527
0
}
528
529
int SelectiveChannel::AddChannel(ChannelBase* sub_channel,
530
0
                                 ChannelHandle* handle) {
531
0
    schan::ChannelBalancer* lb =
532
0
        static_cast<schan::ChannelBalancer*>(_chan._lb.get());
533
0
    if (lb == NULL) {
534
0
        LOG(ERROR) << "You must call Init() to initialize a SelectiveChannel";
535
0
        return -1;
536
0
    }
537
0
    return lb->AddChannel(sub_channel, handle);
538
0
}
539
540
0
// Removes the sub channel identified by `handle' from the balancer of this
// channel. Only logs an error when Init() has not been called yet.
void SelectiveChannel::RemoveAndDestroyChannel(ChannelHandle handle) {
    schan::ChannelBalancer* const balancer =
        static_cast<schan::ChannelBalancer*>(_chan._lb.get());
    if (NULL != balancer) {
        balancer->RemoveAndDestroyChannel(handle);
        return;
    }
    LOG(ERROR) << "You must call Init() to initialize a SelectiveChannel";
}
549
550
void SelectiveChannel::CallMethod(
551
    const google::protobuf::MethodDescriptor* method,
552
    google::protobuf::RpcController* controller_base,
553
    const google::protobuf::Message* request,
554
    google::protobuf::Message* response,
555
0
    google::protobuf::Closure* user_done) {
556
0
    Controller* cntl = static_cast<Controller*>(controller_base);
557
0
    if (!initialized()) {
558
0
        cntl->SetFailed(EINVAL, "SelectiveChannel=%p is not initialized yet",
559
0
                        this);
560
0
    }
561
0
    schan::Sender* sndr = new schan::Sender(cntl, request, response, user_done);
562
0
    cntl->_sender = sndr;
563
0
    cntl->add_flag(Controller::FLAGS_DESTROY_CID_IN_DONE);
564
0
    const CallId cid = cntl->call_id();
565
0
    _chan.CallMethod(method, cntl, request, response, sndr);
566
0
    if (user_done == NULL) {
567
0
        Join(cid);
568
0
        cntl->OnRPCEnd(butil::gettimeofday_us());
569
0
    }
570
0
}
571
572
0
int SelectiveChannel::CheckHealth() {
573
0
    schan::ChannelBalancer* lb =
574
0
        static_cast<schan::ChannelBalancer*>(_chan._lb.get());
575
0
    if (lb) {
576
0
        return lb->CheckHealth();
577
0
    }
578
0
    return -1;
579
0
}
580
581
void SelectiveChannel::Describe(
582
0
    std::ostream& os, const DescribeOptions& options) const {
583
0
    os << "SelectiveChannel[";
584
0
    if (_chan._lb != NULL) {
585
0
        _chan._lb->Describe(os, options);
586
0
    } else {
587
0
        os << "uninitialized";
588
0
    }
589
0
    os << ']';
590
0
}
591
592
} // namespace brpc