/src/brpc/src/brpc/selective_channel.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | |
19 | | #include <map> |
20 | | #include <gflags/gflags.h> |
21 | | #include "bthread/bthread.h" // bthread_id_xx |
22 | | #include "brpc/socket.h" // SocketUser |
23 | | #include "brpc/load_balancer.h" // LoadBalancer |
24 | | #include "brpc/details/controller_private_accessor.h" // RPCSender |
25 | | #include "brpc/selective_channel.h" |
26 | | #include "brpc/global.h" |
27 | | |
28 | | |
29 | | namespace brpc { |
30 | | |
// gflags option, default 1. Its value is copied into
// SocketOptions::health_check_interval_s when ChannelBalancer::AddChannel
// creates the fake socket of a sub channel.
31 | | DEFINE_int32(channel_check_interval, 1, |
32 | | "seconds between consecutive health-checking of unaccessible" |
33 | | " sub channels inside SelectiveChannel"); |
34 | | |
35 | | namespace schan { |
36 | | |
// Keyed by the sub channel pointer; the mapped value is the raw pointer of
// the fake Socket created for that sub channel (a Socket*, not a SocketId,
// despite the "Id" in the type name).
37 | | // This map is generally very small, std::map may be good enough. |
38 | | typedef std::map<ChannelBase*, Socket*> ChannelToIdMap; |
39 | | |
// SocketUser installed on the fake Socket that ChannelBalancer::AddChannel
// creates for every sub channel.
40 | | // Representing a sub channel. |
41 | | class SubChannel : public SocketUser { |
42 | | public: |
// The wrapped sub channel. It is deleted in BeforeRecycle(). There is no
// constructor initializing it; ChannelBalancer::AddChannel assigns it right
// after allocating the SubChannel.
43 | | ChannelBase* chan; |
44 | | |
// Destroys the wrapped channel and then this object itself, so nothing may
// touch this SubChannel after the call.
45 | | // internal channel is deleted after the fake Socket is SetFailed |
46 | 0 | void BeforeRecycle(Socket*) { |
47 | 0 | delete chan; |
48 | 0 | delete this; |
49 | 0 | } |
50 | | |
// Returns the result of chan->CheckHealth(). The "Checking" line is logged
// only while ptr->health_check_count() is 0.
51 | 0 | int CheckHealth(Socket* ptr) { |
52 | 0 | if (ptr->health_check_count() == 0) { |
53 | 0 | LOG(INFO) << "Checking " << *chan << " chan=0x" << (void*)chan |
54 | 0 | << " Fake" << *ptr; |
55 | 0 | } |
56 | 0 | return chan->CheckHealth(); |
57 | 0 | } |
58 | | |
// Only logs that the channel behind the fake socket was revived.
59 | 0 | void AfterRevived(Socket* ptr) { |
60 | 0 | LOG(INFO) << "Revived " << *chan << " chan=0x" << (void*)chan |
61 | 0 | << " Fake" << *ptr << " (Connectable)"; |
62 | 0 | } |
63 | | }; |
64 | | |
// Returns Weight() of the channel wrapped by `u`.
// `u` is downcast with an unchecked static_cast, so it must really point to
// a SubChannel.
65 | 0 | int GetSubChannelWeight(SocketUser* u) { |
66 | 0 | return static_cast<SubChannel*>(u)->chan->Weight(); |
67 | 0 | } |
68 | | |
69 | | // Load balance between fake sockets whose SocketUsers are sub channels. |
70 | | class ChannelBalancer : public SharedLoadBalancer { |
71 | | public: |
// Result of SelectChannel(): the selected fake socket plus the
// need_feedback flag copied from LoadBalancer::SelectOut.
72 | | struct SelectOut { |
73 | 0 | SelectOut() : need_feedback(false) {} |
74 | | |
// The sub channel behind fake_sock. Dereferences fake_sock, so it is only
// meaningful after a successful SelectChannel().
75 | 0 | ChannelBase* channel() { |
76 | 0 | return static_cast<SubChannel*>(fake_sock->user())->chan; |
77 | 0 | } |
78 | | |
79 | | SocketUniquePtr fake_sock; |
80 | | bool need_feedback; |
81 | | }; |
82 | | |
83 | 0 | ChannelBalancer() {} |
// Releases the references held on every socket still in _chan_map.
84 | | ~ChannelBalancer(); |
// Forwards lb_name to SharedLoadBalancer::Init().
85 | | int Init(const char* lb_name); |
// Wraps sub_channel in a fake socket and adds it to the balancer.
// Returns 0 on success, -1 on failure; `handle` may be NULL.
86 | | int AddChannel(ChannelBase* sub_channel, |
87 | | SelectiveChannel::ChannelHandle* handle); |
// Removes the channel identified by `handle` and releases the references
// held on its fake socket.
88 | | void RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle handle); |
// Selects one fake socket through SelectServer(); 0 on success.
89 | | int SelectChannel(const LoadBalancer::SelectIn& in, SelectOut* out); |
// 0 when at least one registered sub channel is healthy, -1 otherwise.
90 | | int CheckHealth(); |
91 | | void Describe(std::ostream& os, const DescribeOptions&); |
92 | | |
93 | | private: |
// Locked around accesses to _chan_map in AddChannel,
// RemoveAndDestroyChannel, CheckHealth and Describe.
94 | | butil::Mutex _mutex; |
95 | | // Find out duplicated sub channels. |
96 | | ChannelToIdMap _chan_map; |
97 | | }; |
98 | | |
99 | | class SubDone; |
100 | | class Sender; |
101 | | |
// The pair handed to one sub call: the response message it fills and the
// SubDone (which embeds the sub Controller) it completes with.
// Both pointers start as NULL; Sender::PopFree() returns such a default
// Resource when no resource can be provided.
102 | | struct Resource { |
103 | 0 | Resource() : response(NULL), sub_done(NULL) {} |
104 | | |
105 | | google::protobuf::Message* response; |
106 | | SubDone* sub_done; |
107 | | }; |
108 | | |
109 | | // The done to sub channels. |
110 | | class SubDone : public google::protobuf::Closure { |
111 | | public: |
// `owner` is the Sender whose free list this SubDone returns to in Run().
// _cid and _peer_id start invalid; Sender::IssueRPC() fills them in before
// every sub call.
112 | | explicit SubDone(Sender* owner) |
113 | | : _owner(owner) |
114 | | , _cid(INVALID_BTHREAD_ID) |
115 | 0 | , _peer_id(INVALID_SOCKET_ID) { |
116 | 0 | } |
117 | 0 | ~SubDone() {} |
// Called when the sub call ends; propagates the result to the main
// Controller (defined further down in this file).
118 | | void Run(); |
119 | | |
120 | | Sender* _owner; |
// Call id of the main Controller at the time the sub call was issued.
121 | | CallId _cid; |
// Id of the fake socket that was selected for this sub call.
122 | | SocketId _peer_id; |
// Controller used for the sub call itself.
123 | | Controller _cntl; |
124 | | }; |
125 | | |
// A Sender is both the RPCSender installed on the main Controller and the
// Closure passed as `done` to the inner channel (see
// SelectiveChannel::CallMethod).
126 | | // The sender to intercept Controller::IssueRPC |
127 | | class Sender : public RPCSender, |
128 | | public google::protobuf::Closure { |
129 | | friend class SubDone; |
130 | | public: |
131 | | Sender(Controller* cntl, |
132 | | const google::protobuf::Message* request, |
133 | | google::protobuf::Message* response, |
134 | | google::protobuf::Closure* user_done); |
// Clear() returns immediately when it has already run (_main_cntl == NULL).
135 | 0 | ~Sender() { Clear(); } |
// Selects a sub channel and starts a sub call on it. 0 on success, -1 after
// marking the main Controller as failed.
136 | | int IssueRPC(int64_t start_realtime_us); |
// Hands out a Resource for the next sub call (at most two are ever created).
137 | | Resource PopFree(); |
// Returns a Resource after its sub call ended. false means the caller must
// stop: either Clear() already ran or the free list was unexpectedly full.
138 | | bool PushFree(const Resource& r); |
// Only index 0 is supported; returns NULL otherwise or when no sub call has
// been returned yet.
139 | | const Controller* SubController(int index) const; |
140 | | void Run(); |
141 | | void Clear(); |
142 | | |
143 | | private: |
// Set to NULL by Clear(); used there as the "already cleared" marker.
144 | | Controller* _main_cntl; |
145 | | const google::protobuf::Message* _request; |
146 | | google::protobuf::Message* _response; |
147 | | google::protobuf::Closure* _user_done; |
// Number of valid entries in _free_resources / _alloc_resources.
148 | | short _nfree; |
149 | | short _nalloc; |
// Set by Run(); once true, the PushFree() that makes _nfree == _nalloc
// triggers Clear().
150 | | bool _finished; |
151 | | Resource _free_resources[2]; |
152 | | Resource _alloc_resources[2]; |
// SubDone of the first resource, embedded so that the first sub call needs
// no extra allocation (see PopFree()).
153 | | SubDone _sub_done0; |
154 | | }; |
155 | | |
156 | | // =============================================== |
157 | | |
// For every socket still registered in _chan_map, releases the additional
// reference and the health-check-related reference, then empties the map.
// _mutex is not taken here, unlike in the other members that touch the map.
158 | 0 | ChannelBalancer::~ChannelBalancer() { |
159 | 0 | for (ChannelToIdMap::iterator |
160 | 0 | it = _chan_map.begin(); it != _chan_map.end(); ++it) { |
161 | 0 | it->second->ReleaseAdditionalReference(); |
162 | 0 | it->second->ReleaseHCRelatedReference(); |
163 | 0 | } |
164 | 0 | _chan_map.clear(); |
165 | 0 | } |
166 | | |
// Initializes the underlying SharedLoadBalancer with the balancer named
// lb_name and returns its result unchanged.
167 | 0 | int ChannelBalancer::Init(const char* lb_name) { |
168 | 0 | return SharedLoadBalancer::Init(lb_name); |
169 | 0 | } |
170 | | |
// Registers sub_channel with this balancer.
//   sub_channel: channel to add; NULL and already-registered pointers are
//                rejected.
//   handle:      optional output; receives the id of the fake socket created
//                for the channel. May be NULL.
// Returns 0 on success and -1 on any failure.
// The whole function runs with _mutex held.
171 | | int ChannelBalancer::AddChannel(ChannelBase* sub_channel, |
172 | 0 | SelectiveChannel::ChannelHandle* handle) { |
173 | 0 | if (NULL == sub_channel) { |
174 | 0 | LOG(ERROR) << "Parameter[sub_channel] is NULL"; |
175 | 0 | return -1; |
176 | 0 | } |
177 | 0 | BAIDU_SCOPED_LOCK(_mutex); |
178 | 0 | if (_chan_map.find(sub_channel) != _chan_map.end()) { |
179 | 0 | LOG(ERROR) << "Duplicated sub_channel=" << sub_channel; |
180 | 0 | return -1; |
181 | 0 | } |
182 | 0 | SubChannel* sub_chan = new (std::nothrow) SubChannel; |
183 | 0 | if (sub_chan == NULL) { |
// NOTE(review): the message below contains a duplicated word ("to to"); it is
// runtime text and therefore left untouched in this annotation pass.
184 | 0 | LOG(FATAL) << "Fail to to new SubChannel"; |
185 | 0 | return -1; |
186 | 0 | } |
187 | 0 | sub_chan->chan = sub_channel; |
188 | 0 | SocketId sock_id; |
189 | 0 | SocketOptions options; |
// The SubChannel becomes the SocketUser of the fake socket.
190 | 0 | options.user = sub_chan; |
191 | 0 | options.health_check_interval_s = FLAGS_channel_check_interval; |
192 | 

193 | 0 | if (Socket::Create(options, &sock_id) != 0) { |
// Only the wrapper is freed here; sub_channel itself is left alone.
194 | 0 | delete sub_chan; |
195 | 0 | LOG(ERROR) << "Fail to create fake socket for sub channel"; |
196 | 0 | return -1; |
197 | 0 | } |
198 | 0 | SocketUniquePtr ptr; |
199 | 0 | int rc = Socket::AddressFailedAsWell(sock_id, &ptr); |
// NOTE(review): this path returns without any explicit cleanup of the socket
// created above - confirm that this is intended.
200 | 0 | if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) { |
201 | 0 | LOG(FATAL) << "Fail to address SocketId=" << sock_id; |
202 | 0 | return -1; |
203 | 0 | } |
204 | 0 | if (!AddServer(ServerId(sock_id))) { |
205 | 0 | LOG(ERROR) << "Duplicated sub_channel=" << sub_channel; |
206 | | // sub_chan will be deleted when the socket is recycled. |
207 | 0 | ptr->SetFailed(); |
208 | | // Cancel health checking. |
209 | 0 | ptr->ReleaseHCRelatedReference(); |
210 | 0 | return -1; |
211 | 0 | } |
212 | | // The health-check-related reference has been held on created. |
// The map stores the raw Socket pointer; `ptr` itself goes out of scope at
// the end of this function.
213 | 0 | _chan_map[sub_channel]= ptr.get(); |
214 | 0 | if (handle) { |
215 | 0 | *handle = sock_id; |
216 | 0 | } |
217 | 0 | return 0; |
218 | 0 | } |
219 | | |
// Removes the sub channel identified by `handle` (the id stored by
// AddChannel) from the balancer. Does nothing when RemoveServer() reports
// that the handle is not registered.
220 | 0 | void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle handle) { |
221 | 0 | if (!RemoveServer(ServerId(handle))) { |
222 | 0 | return; |
223 | 0 | } |
224 | 0 | SocketUniquePtr ptr; |
225 | 0 | const int rc = Socket::AddressFailedAsWell(handle, &ptr); |
226 | 0 | if (rc >= 0) { |
227 | 0 | SubChannel* sub = static_cast<SubChannel*>(ptr->user()); |
228 | 0 | { |
// _mutex is held only while the map entry is erased.
229 | 0 | BAIDU_SCOPED_LOCK(_mutex); |
230 | 0 | CHECK_EQ(1UL, _chan_map.erase(sub->chan)); |
231 | 0 | } |
// The additional reference is released only for rc == 0; for rc > 0 only the
// health-check-related reference below is released.
232 | 0 | if (rc == 0) { |
233 | 0 | ptr->ReleaseAdditionalReference(); |
234 | 0 | } |
235 | | // Cancel health checking. |
236 | 0 | ptr->ReleaseHCRelatedReference(); |
237 | 0 | } |
238 | 0 | } |
239 | | |
// Picks one fake socket via SelectServer(), writing it into out->fake_sock.
// Returns SelectServer()'s non-zero code on failure, in which case
// out->need_feedback is left untouched; returns 0 on success after copying
// need_feedback.
240 | | inline int ChannelBalancer::SelectChannel(const LoadBalancer::SelectIn& in, |
241 | 0 | SelectOut* out) { |
242 | 0 | LoadBalancer::SelectOut sel_out(&out->fake_sock); |
243 | 0 | const int rc = SelectServer(in, &sel_out); |
244 | 0 | if (rc != 0) { |
245 | 0 | return rc; |
246 | 0 | } |
247 | 0 | out->need_feedback = sel_out.need_feedback; |
248 | 0 | return 0; |
249 | 0 | } |
250 | | |
// Returns 0 as soon as one registered sub channel has a fake socket that is
// not Failed() and reports CheckHealth() == 0; returns -1 otherwise, which
// includes the case of an empty map.
// The sub channels' CheckHealth() is called while _mutex is held.
251 | 0 | int ChannelBalancer::CheckHealth() { |
252 | 0 | BAIDU_SCOPED_LOCK(_mutex); |
253 | 0 | for (ChannelToIdMap::const_iterator it = _chan_map.begin(); |
254 | 0 | it != _chan_map.end(); ++it) { |
255 | 0 | if (!it->second->Failed() && |
256 | 0 | it->first->CheckHealth() == 0) { |
257 | 0 | return 0; |
258 | 0 | } |
259 | 0 | } |
260 | 0 | return -1; |
261 | 0 | } |
262 | | |
// Writes a description of the registered sub channels to `os`.
// Non-verbose: only the number of sub channels.
// Verbose: each sub channel's own Describe() output, separated by one space.
263 | | void ChannelBalancer::Describe(std::ostream& os, |
264 | 0 | const DescribeOptions& options) { |
265 | 0 | BAIDU_SCOPED_LOCK(_mutex); |
266 | 0 | if (!options.verbose) { |
267 | 0 | os << _chan_map.size(); |
268 | 0 | return; |
269 | 0 | } |
270 | 0 | for (ChannelToIdMap::const_iterator it = _chan_map.begin(); |
271 | 0 | it != _chan_map.end(); ++it) { |
272 | 0 | if (it != _chan_map.begin()) { |
273 | 0 | os << ' '; |
274 | 0 | } |
275 | 0 | it->first->Describe(os, options); |
276 | 0 | } |
277 | 0 | } |
278 | | |
279 | | // =================================== |
280 | | |
// Stores the main Controller, the request/response messages and the user's
// done closure. Starts with no allocated and no free resources, and binds
// the embedded _sub_done0 to this Sender.
281 | | Sender::Sender(Controller* cntl, |
282 | | const google::protobuf::Message* request, |
283 | | google::protobuf::Message* response, |
284 | | google::protobuf::Closure* user_done) |
285 | | : _main_cntl(cntl) |
286 | | , _request(request) |
287 | | , _response(response) |
288 | | , _user_done(user_done) |
289 | | , _nfree(0) |
290 | | , _nalloc(0) |
291 | | , _finished(false) |
292 | 0 | , _sub_done0(this) { |
293 | 0 | } |
294 | | |
// Selects a sub channel through the ChannelBalancer stored in
// _main_cntl->_lb and starts one sub call on it.
// Returns 0 once the sub call has been handed to the sub channel, or -1
// after marking _main_cntl as failed (no channel selectable, or no Resource
// available).
295 | 0 | int Sender::IssueRPC(int64_t start_realtime_us) { |
296 | 0 | _main_cntl->_current_call.need_feedback = false; |
297 | 0 | LoadBalancer::SelectIn sel_in = { start_realtime_us, |
298 | 0 | true, |
299 | 0 | _main_cntl->has_request_code(), |
300 | 0 | _main_cntl->_request_code, |
301 | 0 | _main_cntl->_accessed }; |
302 | 0 | ChannelBalancer::SelectOut sel_out; |
// The unchecked static_cast relies on _lb being the ChannelBalancer that
// SelectiveChannel::Init() installed.
303 | 0 | const int rc = static_cast<ChannelBalancer*>(_main_cntl->_lb.get()) |
304 | 0 | ->SelectChannel(sel_in, &sel_out); |
305 | 0 | if (rc != 0) { |
306 | 0 | _main_cntl->SetFailed(rc, "Fail to select channel, %s", berror(rc)); |
307 | 0 | return -1; |
308 | 0 | } |
309 | 0 | DLOG(INFO) << "Selected channel=" << sel_out.channel() << ", size=" |
310 | 0 | << (_main_cntl->_accessed ? _main_cntl->_accessed->size() : 0); |
311 | 0 | _main_cntl->_current_call.need_feedback = sel_out.need_feedback; |
312 | 0 | _main_cntl->_current_call.peer_id = sel_out.fake_sock->id(); |
313 | 

314 | 0 | Resource r = PopFree(); |
// PopFree() yields a Resource with NULL sub_done only when two resources are
// already allocated and none of them is free.
315 | 0 | if (r.sub_done == NULL) { |
316 | 0 | CHECK(false) << "Impossible!"; |
317 | 0 | _main_cntl->SetFailed("Impossible happens"); |
318 | 0 | return -1; |
319 | 0 | } |
// Remember which main call and which fake socket this sub call belongs to;
// SubDone::Run() reads both.
320 | 0 | r.sub_done->_cid = _main_cntl->current_id(); |
321 | 0 | r.sub_done->_peer_id = sel_out.fake_sock->id(); |
322 | 0 | Controller* sub_cntl = &r.sub_done->_cntl; |
323 | | // No need to count timeout. We already managed timeout in schan. If |
324 | | // timeout occurs, sub calls are canceled with ERPCTIMEDOUT. |
325 | 0 | sub_cntl->_timeout_ms = -1; |
326 | 0 | sub_cntl->_real_timeout_ms = _main_cntl->timeout_ms(); |
327 | | |
328 | | // Inherit following fields of _main_cntl. |
329 | | // TODO(gejun): figure out a better way to maintain these fields. |
330 | 0 | sub_cntl->set_connection_type(_main_cntl->connection_type()); |
331 | 0 | sub_cntl->set_type_of_service(_main_cntl->_tos); |
332 | 0 | sub_cntl->set_request_compress_type(_main_cntl->request_compress_type()); |
333 | 0 | sub_cntl->set_log_id(_main_cntl->log_id()); |
334 | 0 | sub_cntl->set_request_code(_main_cntl->request_code()); |
335 | | // Forward request attachment to the subcall |
336 | 0 | sub_cntl->request_attachment().append(_main_cntl->request_attachment()); |
337 | | |
// The SubDone is passed as the done closure of the sub call.
338 | 0 | sel_out.channel()->CallMethod(_main_cntl->_method, |
339 | 0 | &r.sub_done->_cntl, |
340 | 0 | _request, |
341 | 0 | r.response, |
342 | 0 | r.sub_done); |
343 | 0 | return 0; |
344 | 0 | } |
345 | | |
// Completion of one sub call. Locks the main call through _cid, copies the
// outcome of the sub Controller into the main Controller, gives the Resource
// back to the owning Sender and finally reports the return to the main
// Controller through OnVersionedRPCReturned().
346 | 0 | void SubDone::Run() { |
347 | 0 | Controller* main_cntl = NULL; |
// On success the data associated with _cid is stored into main_cntl.
348 | 0 | const int rc = bthread_id_lock(_cid, (void**)&main_cntl); |
349 | 0 | if (rc != 0) { |
350 | | // _cid must be valid because schan does not dtor before cancelling |
351 | | // all sub calls. |
352 | 0 | LOG(ERROR) << "Fail to lock correlation_id=" |
353 | 0 | << _cid.value << ": " << berror(rc); |
354 | 0 | return; |
355 | 0 | } |
// NOTE(review): "gettable-but-settable" below presumably means fields that
// users can read but not set - confirm the intended wording.
356 | | // NOTE: Copying gettable-but-settable fields which are generally set |
357 | | // during the RPC to reflect details. |
358 | 0 | main_cntl->_remote_side = _cntl._remote_side; |
359 | | // connection_type may be changed during CallMethod. |
360 | 0 | main_cntl->set_connection_type(_cntl.connection_type()); |
361 | 0 | main_cntl->response_attachment().swap(_cntl.response_attachment()); |
362 | 0 | Resource r; |
363 | 0 | r.response = _cntl._response; |
364 | 0 | r.sub_done = this; |
// A false result means either that PushFree() already ran Sender::Clear() or
// that it hit its "Impossible!" branch; in both cases nothing further is
// done here.
365 | 0 | if (!_owner->PushFree(r)) { |
366 | 0 | return; |
367 | 0 | } |
// Captured before the sub call's error (if any) is written into main_cntl.
368 | 0 | const int saved_error = main_cntl->ErrorCode(); |
369 | | |
370 | 0 | if (_cntl.Failed()) { |
371 | 0 | if (_cntl.ErrorCode() == ENODATA || _cntl.ErrorCode() == EHOSTDOWN) { |
372 | | // LB could not find a server. |
373 | 0 | Socket::SetFailed(_peer_id); // trigger HC. |
374 | 0 | } |
// The error text is set first, then the error code is overwritten with the
// sub call's code.
375 | 0 | main_cntl->SetFailed(_cntl._error_text); |
376 | 0 | main_cntl->_error_code = _cntl._error_code; |
377 | 0 | } else { |
// The first resource shares the user's response object (see
// Sender::PopFree()), in which case there is nothing to swap.
378 | 0 | if (_cntl._response != main_cntl->_response) { |
379 | 0 | main_cntl->_response->GetReflection()->Swap( |
380 | 0 | main_cntl->_response, _cntl._response); |
381 | 0 | } |
382 | 0 | } |
383 | 0 | const Controller::CompletionInfo info = { _cid, true }; |
384 | 0 | main_cntl->OnVersionedRPCReturned(info, false, saved_error); |
385 | 0 | } |
386 | | |
// Closure entry point of the Sender (it is passed as `done` to the inner
// channel by SelectiveChannel::CallMethod).
// Marks the Sender as finished. When every allocated resource is already
// back on the free list, Clear() runs immediately. Otherwise the sub calls
// still in flight are signalled with ERPCTIMEDOUT (when the main call timed
// out) or ECANCELED, and the final Clear() is left to the PushFree() that
// returns the last resource.
387 | 0 | void Sender::Run() { |
388 | 0 | _finished = true; |
389 | 0 | if (_nfree != _nalloc) { |
390 | 0 | const int saved_nalloc = _nalloc; |
391 | 0 | int error = (_main_cntl->ErrorCode() == ERPCTIMEDOUT ? ERPCTIMEDOUT : ECANCELED); |
// Fixed bound taken from _alloc_resources instead of `ids[_nalloc]`: an array
// bound that is not a constant expression is a compiler extension, not ISO
// C++. _nalloc never exceeds the size of _alloc_resources.
392 | 0 | CallId ids[sizeof(_alloc_resources) / sizeof(_alloc_resources[0])]; |
393 | 0 | for (int i = 0; i < saved_nalloc; ++i) { |
394 | 0 | ids[i] = _alloc_resources[i].sub_done->_cntl.call_id(); |
395 | 0 | } |
396 | 0 | CallId cid = _main_cntl->call_id(); |
397 | 0 | CHECK_EQ(0, bthread_id_unlock(cid)); |
// Only the local copies (ids, saved_nalloc, error) are used from here on;
// presumably members must not be touched once the id is unlocked.
398 | 0 | for (int i = 0; i < saved_nalloc; ++i) { |
399 | 0 | bthread_id_error(ids[i], error); |
400 | 0 | } |
401 | 0 | } else { |
402 | 0 | Clear(); |
403 | 0 | } |
404 | 0 | } |
405 | | |
// Final cleanup of the Sender; safe to call more than once because
// _main_cntl doubles as the "already cleared" marker.
// Frees the second resource, runs the user's done closure (if any) and then
// unlocks and destroys the main call id.
406 | 0 | void Sender::Clear() { |
407 | 0 | if (_main_cntl == NULL) { |
408 | 0 | return; |
409 | 0 | } |
// Only slot 1 is deleted: PopFree() fills slot 0 with _response and
// &_sub_done0, and only slot 1 with objects created by New()/new.
410 | 0 | delete _alloc_resources[1].response; |
411 | 0 | delete _alloc_resources[1].sub_done; |
412 | 0 | _alloc_resources[1] = Resource(); |
// The call id is read before _main_cntl is reset.
413 | 0 | const CallId cid = _main_cntl->call_id(); |
414 | 0 | _main_cntl = NULL; |
415 | 0 | if (_user_done) { |
416 | 0 | _user_done->Run(); |
417 | 0 | } |
418 | 0 | bthread_id_unlock_and_destroy(cid); |
419 | 0 | } |
420 | | |
// Provides the Resource for the next sub call.
//  - Free list empty, nothing allocated yet: the user's response and the
//    embedded _sub_done0 are used.
//  - Free list empty, one allocated: a fresh response (via New()) and a
//    heap-allocated SubDone are created.
//  - Free list empty, two allocated: CHECK fails and a default Resource
//    (NULL members) is returned.
//  - Otherwise the most recently freed Resource is reused after resetting it.
421 | 0 | inline Resource Sender::PopFree() { |
422 | 0 | if (_nfree == 0) { |
423 | 0 | if (_nalloc == 0) { |
424 | 0 | Resource r; |
425 | 0 | r.response = _response; |
426 | 0 | r.sub_done = &_sub_done0; |
427 | 0 | _alloc_resources[_nalloc++] = r; |
428 | 0 | return r; |
429 | 0 | } else if (_nalloc == 1) { |
430 | 0 | Resource r; |
431 | 0 | r.response = _response->New(); |
432 | 0 | r.sub_done = new SubDone(this); |
433 | 0 | _alloc_resources[_nalloc++] = r; |
434 | 0 | return r; |
435 | 0 | } else { |
436 | 0 | CHECK(false) << "nalloc=" << _nalloc; |
437 | 0 | return Resource(); |
438 | 0 | } |
439 | 0 | } else { |
440 | 0 | Resource r = _free_resources[--_nfree]; |
441 | 0 | r.response->Clear(); |
442 | 0 | Controller& sub_cntl = r.sub_done->_cntl; |
// _accessed is detached around Reset() and put back afterwards, so the same
// pointer is in place again once the sub Controller has been reset.
443 | 0 | ExcludedServers* saved_accessed = sub_cntl._accessed; |
444 | 0 | sub_cntl._accessed = NULL; |
445 | 0 | sub_cntl.Reset(); |
446 | 0 | sub_cntl._accessed = saved_accessed; |
447 | 0 | return r; |
448 | 0 | } |
449 | 0 | } |
450 | | |
// Puts `r` back on the free list.
// Returns true when the caller may go on using the Sender and the main
// Controller. Returns false when this push completed a finished Sender
// (Clear() has then already run) or when the free list was unexpectedly
// full.
451 | 0 | inline bool Sender::PushFree(const Resource& r) { |
452 | 0 | if (_nfree < 2) { |
453 | 0 | _free_resources[_nfree++] = r; |
// Run() sets _finished; the last resource to come back performs the cleanup.
454 | 0 | if (_finished && _nfree == _nalloc) { |
455 | 0 | Clear(); |
456 | 0 | return false; |
457 | 0 | } |
458 | 0 | return true; |
459 | 0 | } else { |
460 | 0 | CHECK(false) << "Impossible!"; |
461 | 0 | return false; |
462 | 0 | } |
463 | 0 | } |
464 | | |
// Returns the sub Controller to expose for `index`.
// Only index 0 is supported. Among the resources on the free list (i.e. sub
// calls that already returned) the first one whose Controller did not fail
// is preferred; if all of them failed, the last one on the list is returned.
// Returns NULL for any other index or when the free list is empty.
465 | 0 | inline const Controller* Sender::SubController(int index) const { |
466 | 0 | if (index != 0) { |
467 | 0 | return NULL; |
468 | 0 | } |
469 | 0 | for (int i = 0; i < _nfree; ++i) { |
470 | 0 | if (!_free_resources[i].sub_done->_cntl.Failed()) { |
471 | 0 | return &_free_resources[i].sub_done->_cntl; |
472 | 0 | } |
473 | 0 | } |
474 | 0 | if (_nfree != 0) { |
475 | 0 | return &_free_resources[_nfree - 1].sub_done->_cntl; |
476 | 0 | } |
477 | 0 | return NULL; |
478 | 0 | } |
479 | | |
480 | | } // namespace schan |
481 | | |
// Free-function access to schan::Sender::SubController().
// `sender` is downcast with an unchecked static_cast, so it must point to a
// schan::Sender.
482 | | const Controller* GetSubControllerOfSelectiveChannel( |
483 | 0 | const RPCSender* sender, int index) { |
484 | 0 | return static_cast<const schan::Sender*>(sender)->SubController(index); |
485 | 0 | } |
486 | | |
// Deliberately empty serializer, installed as _chan._serialize_request by
// SelectiveChannel::Init(). Sender::IssueRPC() forwards the request message
// itself to the sub channel.
487 | | static void PassSerializeRequest(butil::IOBuf*, Controller*, |
488 | 0 | const google::protobuf::Message*) { |
489 | 0 | } |
490 | | |
// Does nothing by itself; Init() must be called before the channel is used.
491 | 0 | SelectiveChannel::SelectiveChannel() {} |
492 | | |
// Empty body: no explicit cleanup is performed here.
493 | 0 | SelectiveChannel::~SelectiveChannel() {} |
494 | | |
// Initializes the channel with a ChannelBalancer built from lb_name.
//   lb_name: name of the load balancer, forwarded to ChannelBalancer::Init().
//   options: optional; when non-NULL it is copied and some fields are then
//            overridden (see below).
// Returns 0 on success, -1 when already initialized or when the balancer
// cannot be created or initialized.
495 | 0 | int SelectiveChannel::Init(const char* lb_name, const ChannelOptions* options) { |
496 | | // Force naming services to register. |
497 | 0 | GlobalInitializeOrDie(); |
498 | 0 | if (initialized()) { |
499 | 0 | LOG(ERROR) << "Already initialized"; |
500 | 0 | return -1; |
501 | 0 | } |
502 | 0 | schan::ChannelBalancer* lb = new (std::nothrow) schan::ChannelBalancer; |
503 | 0 | if (NULL == lb) { |
504 | 0 | LOG(FATAL) << "Fail to new ChannelBalancer"; |
505 | 0 | return -1; |
506 | 0 | } |
507 | 0 | if (lb->Init(lb_name) != 0) { |
508 | 0 | LOG(ERROR) << "Fail to init lb"; |
509 | 0 | delete lb; |
510 | 0 | return -1; |
511 | 0 | } |
// From here on _chan._lb holds the balancer.
512 | 0 | _chan._lb.reset(lb); |
513 | 0 | _chan._serialize_request = PassSerializeRequest; |
514 | 0 | if (options) { |
515 | 0 | _chan._options = *options; |
516 | | // Modify some fields to be consistent with behavior of schan. |
517 | 0 | _chan._options.connection_type = CONNECTION_TYPE_UNKNOWN; |
518 | 0 | _chan._options.succeed_without_server = true; |
519 | 0 | _chan._options.auth = NULL; |
520 | 0 | } |
// The protocol is reset regardless of whether options were supplied.
521 | 0 | _chan._options.protocol = PROTOCOL_UNKNOWN; |
522 | 0 | return 0; |
523 | 0 | } |
524 | | |
// True once Init() has installed a load balancer in _chan._lb.
525 | 0 | bool SelectiveChannel::initialized() const { |
526 | 0 | return _chan._lb != NULL; |
527 | 0 | } |
528 | | |
// Adds sub_channel through the ChannelBalancer.
// Returns -1 when Init() has not been called; otherwise returns the result
// of ChannelBalancer::AddChannel() (0 on success, -1 on failure).
// `handle` is passed through unchanged.
529 | | int SelectiveChannel::AddChannel(ChannelBase* sub_channel, |
530 | 0 | ChannelHandle* handle) { |
531 | 0 | schan::ChannelBalancer* lb = |
532 | 0 | static_cast<schan::ChannelBalancer*>(_chan._lb.get()); |
533 | 0 | if (lb == NULL) { |
534 | 0 | LOG(ERROR) << "You must call Init() to initialize a SelectiveChannel"; |
535 | 0 | return -1; |
536 | 0 | } |
537 | 0 | return lb->AddChannel(sub_channel, handle); |
538 | 0 | } |
539 | | |
// Removes the sub channel identified by `handle` through the
// ChannelBalancer. Only logs an error when Init() has not been called.
540 | 0 | void SelectiveChannel::RemoveAndDestroyChannel(ChannelHandle handle) { |
541 | 0 | schan::ChannelBalancer* lb = |
542 | 0 | static_cast<schan::ChannelBalancer*>(_chan._lb.get()); |
543 | 0 | if (lb == NULL) { |
544 | 0 | LOG(ERROR) << "You must call Init() to initialize a SelectiveChannel"; |
545 | 0 | return; |
546 | 0 | } |
547 | 0 | lb->RemoveAndDestroyChannel(handle); |
548 | 0 | } |
549 | | |
// Issues an RPC through the selective channel.
// A schan::Sender is created for the call, installed as the Controller's
// sender and passed as the `done` closure to the inner channel. The user's
// own closure is kept inside the Sender, which runs it from Clear().
// When user_done is NULL the call is synchronous: this function joins the
// call id and then calls OnRPCEnd().
550 | | void SelectiveChannel::CallMethod( |
551 | | const google::protobuf::MethodDescriptor* method, |
552 | | google::protobuf::RpcController* controller_base, |
553 | | const google::protobuf::Message* request, |
554 | | google::protobuf::Message* response, |
555 | 0 | google::protobuf::Closure* user_done) { |
// Unchecked downcast: controller_base must be a brpc Controller.
556 | 0 | Controller* cntl = static_cast<Controller*>(controller_base); |
// NOTE(review): there is no early return after SetFailed(); execution goes on
// to create the Sender and call _chan.CallMethod. Presumably the failure is
// handled there - confirm.
557 | 0 | if (!initialized()) { |
558 | 0 | cntl->SetFailed(EINVAL, "SelectiveChannel=%p is not initialized yet", |
559 | 0 | this); |
560 | 0 | } |
561 | 0 | schan::Sender* sndr = new schan::Sender(cntl, request, response, user_done); |
562 | 0 | cntl->_sender = sndr; |
563 | 0 | cntl->add_flag(Controller::FLAGS_DESTROY_CID_IN_DONE); |
// The id is read before the call is started and is what Join() waits on.
564 | 0 | const CallId cid = cntl->call_id(); |
565 | 0 | _chan.CallMethod(method, cntl, request, response, sndr); |
566 | 0 | if (user_done == NULL) { |
567 | 0 | Join(cid); |
568 | 0 | cntl->OnRPCEnd(butil::gettimeofday_us()); |
569 | 0 | } |
570 | 0 | } |
571 | | |
// Returns the result of ChannelBalancer::CheckHealth(), or -1 when the
// channel has not been initialized.
572 | 0 | int SelectiveChannel::CheckHealth() { |
573 | 0 | schan::ChannelBalancer* lb = |
574 | 0 | static_cast<schan::ChannelBalancer*>(_chan._lb.get()); |
575 | 0 | if (lb) { |
576 | 0 | return lb->CheckHealth(); |
577 | 0 | } |
578 | 0 | return -1; |
579 | 0 | } |
580 | | |
// Writes "SelectiveChannel[...]" to `os`. The bracketed part is the load
// balancer's own description, or the word "uninitialized" when Init() has
// not been called.
581 | | void SelectiveChannel::Describe( |
582 | 0 | std::ostream& os, const DescribeOptions& options) const { |
583 | 0 | os << "SelectiveChannel["; |
584 | 0 | if (_chan._lb != NULL) { |
585 | 0 | _chan._lb->Describe(os, options); |
586 | 0 | } else { |
587 | 0 | os << "uninitialized"; |
588 | 0 | } |
589 | 0 | os << ']'; |
590 | 0 | } |
591 | | |
592 | | } // namespace brpc |