/proc/self/cwd/source/extensions/filters/network/dubbo_proxy/active_message.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/filters/network/dubbo_proxy/active_message.h" |
2 | | |
3 | | #include "source/common/stats/timespan_impl.h" |
4 | | #include "source/extensions/filters/network/dubbo_proxy/app_exception.h" |
5 | | #include "source/extensions/filters/network/dubbo_proxy/conn_manager.h" |
6 | | |
7 | | namespace Envoy { |
8 | | namespace Extensions { |
9 | | namespace NetworkFilters { |
10 | | namespace DubboProxy { |
11 | | |
12 | | // class ActiveResponseDecoder |
// Constructs the decoder used for the upstream response of one message.
// Keeps references to the owning ActiveMessage, the filter stats and the
// connection the decoded response is later written to, takes ownership of
// `protocol`, and builds a ResponseDecoder from that protocol with *this
// passed as its second argument. `complete_` starts out false.
// NOTE(review): decoder_ dereferences protocol_ inside the initializer list,
// so this relies on protocol_ being declared before decoder_ in the header — confirm.
13 | | ActiveResponseDecoder::ActiveResponseDecoder(ActiveMessage& parent, DubboFilterStats& stats, |
14 | | Network::Connection& connection, |
15 | | ProtocolPtr&& protocol) |
16 | | : parent_(parent), stats_(stats), response_connection_(connection), |
17 | | protocol_(std::move(protocol)), |
18 | 0 | decoder_(std::make_unique<ResponseDecoder>(*protocol_, *this)), complete_(false) {} |
19 | | |
// Hands the received upstream bytes to the response decoder.
// @param data buffer holding the bytes received so far.
// @return the current response_status_ (onStreamDecoded() sets it to Complete).
20 | 0 | DubboFilters::UpstreamResponseStatus ActiveResponseDecoder::onData(Buffer::Instance& data) { |
21 | 0 | ENVOY_LOG(debug, "dubbo response: the received reply data length is {}", data.length()); |
22 | |
|
23 | 0 | bool underflow = false; |
24 | 0 | decoder_->onData(data, underflow); |
// After decoding, either a response was fully handled (complete_) or the
// decoder is expected to have flagged `underflow`.
25 | 0 | ASSERT(complete_ || underflow); |
26 | | |
27 | 0 | return response_status_; |
28 | 0 | } |
29 | | |
// Handles a fully decoded upstream response.
// Runs the encoder filters, writes the original response bytes to the
// downstream connection, updates the response stats and marks the response
// as complete.
// @param metadata metadata of the decoded message; asserted to be a Response
//        or Exception message that carries a response status.
// @param ctx decoding context; its originMessage() is what gets written downstream.
// @throws DownstreamConnectionCloseException when the downstream connection
//         is not in the Open state.
30 | | void ActiveResponseDecoder::onStreamDecoded(MessageMetadataSharedPtr metadata, |
31 | 0 | ContextSharedPtr ctx) { |
32 | 0 | ASSERT(metadata->messageType() == MessageType::Response || |
33 | 0 | metadata->messageType() == MessageType::Exception); |
34 | 0 | ASSERT(metadata->hasResponseStatus()); |
35 | | |
36 | 0 | metadata_ = metadata; |
// NOTE(review): on this early-return path response_status_ becomes Complete
// but complete_ stays false, while onData() asserts `complete_ || underflow`
// right after decoding — confirm this combination is intended.
37 | 0 | if (applyMessageEncodedFilters(metadata, ctx) != FilterStatus::Continue) { |
38 | 0 | response_status_ = DubboFilters::UpstreamResponseStatus::Complete; |
39 | 0 | return; |
40 | 0 | } |
41 | | |
// Nothing can be forwarded once the downstream side is closed or closing.
42 | 0 | if (response_connection_.state() != Network::Connection::State::Open) { |
43 | 0 | throw DownstreamConnectionCloseException("Downstream has closed or closing"); |
44 | 0 | } |
45 | | |
46 | 0 | response_connection_.write(ctx->originMessage(), false); |
47 | 0 | ENVOY_LOG(debug, |
48 | 0 | "dubbo response: the upstream response message has been forwarded to the downstream"); |
49 | |
|
// Stats: every forwarded response counts as decoded successfully; Exception
// messages are additionally counted as business exceptions.
50 | 0 | stats_.response_.inc(); |
51 | 0 | stats_.response_decoding_success_.inc(); |
52 | 0 | if (metadata->messageType() == MessageType::Exception) { |
53 | 0 | stats_.response_business_exception_.inc(); |
54 | 0 | } |
55 | |
|
// Any status other than Ok is counted (and logged) as a response error.
56 | 0 | switch (metadata->responseStatus()) { |
57 | 0 | case ResponseStatus::Ok: |
58 | 0 | stats_.response_success_.inc(); |
59 | 0 | break; |
60 | 0 | default: |
61 | 0 | stats_.response_error_.inc(); |
62 | 0 | ENVOY_LOG(error, "dubbo response status: {}", static_cast<uint8_t>(metadata->responseStatus())); |
63 | 0 | break; |
64 | 0 | } |
65 | | |
66 | 0 | complete_ = true; |
67 | 0 | response_status_ = DubboFilters::UpstreamResponseStatus::Complete; |
68 | |
|
69 | 0 | ENVOY_LOG(debug, "dubbo response: complete processing of upstream response messages, id is {}", |
70 | 0 | metadata->requestId()); |
71 | 0 | } |
72 | | |
// Runs the owning message's encoder filters for a decoded response.
// Installs an encoder filter action on the parent that calls
// onMessageEncoded(metadata, ctx) on each filter, then starts iteration with
// a null filter (commonEncodePrefix() maps that to the first encoder filter).
// @return the status reported by ActiveMessage::applyEncoderFilters().
73 | | FilterStatus ActiveResponseDecoder::applyMessageEncodedFilters(MessageMetadataSharedPtr metadata, |
74 | 0 | ContextSharedPtr ctx) { |
75 | 0 | parent_.encoder_filter_action_ = [metadata, |
76 | 0 | ctx](DubboFilters::EncoderFilter* filter) -> FilterStatus { |
77 | 0 | return filter->onMessageEncoded(metadata, ctx); |
78 | 0 | }; |
79 | |
|
80 | 0 | auto status = parent_.applyEncoderFilters( |
81 | 0 | nullptr, ActiveMessage::FilterIterationStartState::CanStartFromCurrent); |
// The switch only validates the result: anything other than StopIteration is
// asserted to be Continue. The status itself is returned unchanged.
82 | 0 | switch (status) { |
83 | 0 | case FilterStatus::StopIteration: |
84 | 0 | break; |
85 | 0 | default: |
86 | 0 | ASSERT(FilterStatus::Continue == status); |
87 | 0 | break; |
88 | 0 | } |
89 | | |
90 | 0 | return status; |
91 | 0 | } |
92 | | |
93 | | // class ActiveMessageFilterBase |
// Returns the request id reported by the owning ActiveMessage.
94 | 0 | uint64_t ActiveMessageFilterBase::requestId() const { return parent_.requestId(); } |
95 | | |
// Returns the stream id reported by the owning ActiveMessage.
96 | 0 | uint64_t ActiveMessageFilterBase::streamId() const { return parent_.streamId(); } |
97 | | |
// Returns the connection pointer exposed by the owning ActiveMessage.
98 | 0 | const Network::Connection* ActiveMessageFilterBase::connection() const { |
99 | 0 | return parent_.connection(); |
100 | 0 | } |
101 | | |
// Returns the route resolved by the owning ActiveMessage (may be null, see
// ActiveMessage::route()).
102 | 0 | Router::RouteConstSharedPtr ActiveMessageFilterBase::route() { return parent_.route(); } |
103 | | |
// Returns the serialization type reported by the owning ActiveMessage.
104 | 0 | SerializationType ActiveMessageFilterBase::serializationType() const { |
105 | 0 | return parent_.serializationType(); |
106 | 0 | } |
107 | | |
// Returns the protocol type reported by the owning ActiveMessage.
108 | 0 | ProtocolType ActiveMessageFilterBase::protocolType() const { return parent_.protocolType(); } |
109 | | |
// Returns the dispatcher exposed by the owning ActiveMessage.
110 | 0 | Event::Dispatcher& ActiveMessageFilterBase::dispatcher() { return parent_.dispatcher(); } |
111 | | |
// Asks the owning ActiveMessage to reset its stream.
112 | 0 | void ActiveMessageFilterBase::resetStream() { parent_.resetStream(); } |
113 | | |
// Returns the StreamInfo owned by the parent ActiveMessage.
114 | 0 | StreamInfo::StreamInfo& ActiveMessageFilterBase::streamInfo() { return parent_.streamInfo(); } |
115 | | |
116 | | // class ActiveMessageDecoderFilter |
// Wraps a decoder filter for the given ActiveMessage.
// `parent` and `dual_filter` are passed to the ActiveMessageFilterBase
// constructor; the filter shared pointer is copied into handle_.
117 | | ActiveMessageDecoderFilter::ActiveMessageDecoderFilter(ActiveMessage& parent, |
118 | | DubboFilters::DecoderFilterSharedPtr filter, |
119 | | bool dual_filter) |
120 | 0 | : ActiveMessageFilterBase(parent, dual_filter), handle_(filter) {} Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessageDecoderFilter::ActiveMessageDecoderFilter(Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessage&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::DubboFilters::DecoderFilter>, bool) Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessageDecoderFilter::ActiveMessageDecoderFilter(Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessage&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::DubboFilters::DecoderFilter>, bool) |
121 | | |
// Resumes decoder filter iteration from this filter.
// Iteration normally restarts at the filter after this one; if the context's
// original message still has a non-zero length, it restarts at this filter
// instead. When the chain then finishes with Continue and the parent reports
// a pending decoded stream, the request is finalized.
122 | 0 | void ActiveMessageDecoderFilter::continueDecoding() { |
123 | 0 | ASSERT(parent_.context()); |
124 | 0 | auto state = ActiveMessage::FilterIterationStartState::AlwaysStartFromNext; |
125 | 0 | if (0 != parent_.context()->originMessage().length()) { |
126 | 0 | state = ActiveMessage::FilterIterationStartState::CanStartFromCurrent; |
127 | 0 | ENVOY_LOG(warn, "The original message data is not consumed, triggering the decoder filter from " |
128 | 0 | "the current location"); |
129 | 0 | } |
130 | 0 | const FilterStatus status = parent_.applyDecoderFilters(this, state); |
131 | 0 | if (status == FilterStatus::Continue) { |
// NOTE(review): this log text matches the one in
// ActiveMessage::startUpstreamResponse() although no upstream response is
// started here — possibly a copy/paste leftover; confirm.
132 | 0 | ENVOY_LOG(debug, "dubbo response: start upstream"); |
133 | | // All filters have been executed for the current decoder state. |
134 | 0 | if (parent_.pendingStreamDecoded()) { |
135 | | // If the filter stack was paused during messageEnd, handle end-of-request details. |
136 | 0 | parent_.finalizeRequest(); |
137 | 0 | } |
138 | 0 | } |
139 | 0 | } |
140 | | |
// Forwards a locally generated reply to the owning ActiveMessage.
// @param response the direct response to send.
// @param end_stream passed through unchanged to ActiveMessage::sendLocalReply().
141 | | void ActiveMessageDecoderFilter::sendLocalReply(const DubboFilters::DirectResponse& response, |
142 | 0 | bool end_stream) { |
143 | 0 | parent_.sendLocalReply(response, end_stream); |
144 | 0 | } |
145 | | |
// Asks the owning ActiveMessage to set up decoding of the upstream response.
146 | 0 | void ActiveMessageDecoderFilter::startUpstreamResponse() { parent_.startUpstreamResponse(); } |
147 | | |
// Passes upstream response bytes to the owning ActiveMessage.
// @param buffer bytes received from upstream.
// @return the status returned by ActiveMessage::upstreamData().
148 | | DubboFilters::UpstreamResponseStatus |
149 | 0 | ActiveMessageDecoderFilter::upstreamData(Buffer::Instance& buffer) { |
150 | 0 | return parent_.upstreamData(buffer); |
151 | 0 | } |
152 | | |
// Asks the owning ActiveMessage to reset (close) the downstream connection.
153 | 0 | void ActiveMessageDecoderFilter::resetDownstreamConnection() { |
154 | 0 | parent_.resetDownstreamConnection(); |
155 | 0 | } |
156 | | |
157 | | // class ActiveMessageEncoderFilter |
// Wraps an encoder filter for the given ActiveMessage.
// `parent` and `dual_filter` are passed to the ActiveMessageFilterBase
// constructor; the filter shared pointer is copied into handle_.
158 | | ActiveMessageEncoderFilter::ActiveMessageEncoderFilter(ActiveMessage& parent, |
159 | | DubboFilters::EncoderFilterSharedPtr filter, |
160 | | bool dual_filter) |
161 | 0 | : ActiveMessageFilterBase(parent, dual_filter), handle_(filter) {} Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessageEncoderFilter::ActiveMessageEncoderFilter(Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessage&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::DubboFilters::EncoderFilter>, bool) Unexecuted instantiation: Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessageEncoderFilter::ActiveMessageEncoderFilter(Envoy::Extensions::NetworkFilters::DubboProxy::ActiveMessage&, std::__1::shared_ptr<Envoy::Extensions::NetworkFilters::DubboProxy::DubboFilters::EncoderFilter>, bool) |
162 | | |
// Resumes encoder filter iteration from this filter.
// Iteration normally restarts at the filter after this one; if the context's
// original message still has a non-zero length, it restarts at this filter
// instead. The final status is only logged when it is Continue.
163 | 0 | void ActiveMessageEncoderFilter::continueEncoding() { |
164 | 0 | ASSERT(parent_.context()); |
165 | 0 | auto state = ActiveMessage::FilterIterationStartState::AlwaysStartFromNext; |
166 | 0 | if (0 != parent_.context()->originMessage().length()) { |
167 | 0 | state = ActiveMessage::FilterIterationStartState::CanStartFromCurrent; |
168 | 0 | ENVOY_LOG(warn, "The original message data is not consumed, triggering the encoder filter from " |
169 | 0 | "the current location"); |
170 | 0 | } |
171 | 0 | const FilterStatus status = parent_.applyEncoderFilters(this, state); |
172 | 0 | if (FilterStatus::Continue == status) { |
173 | 0 | ENVOY_LOG(debug, "All encoding filters have been executed"); |
174 | 0 | } |
175 | 0 | } |
176 | | |
177 | | // class ActiveMessage |
// Creates the state for one in-flight request on the connection manager.
// Starts a completable timespan on the request_time_ms_ histogram, draws the
// stream id from the manager's random generator, builds stream_info_ from the
// manager's time system and connection info provider, clears both status
// flags, and increments the request_active_ stat.
// NOTE(review): later initializers read parent_ (not only the `parent`
// argument), which relies on parent_ being declared before them — confirm.
178 | | ActiveMessage::ActiveMessage(ConnectionManager& parent) |
179 | | : parent_(parent), request_timer_(std::make_unique<Stats::HistogramCompletableTimespanImpl>( |
180 | | parent_.stats().request_time_ms_, parent.timeSystem())), |
181 | | stream_id_(parent.randomGenerator().random()), |
182 | | stream_info_(parent.timeSystem(), parent_.connection().connectionInfoProviderSharedPtr()), |
183 | 0 | pending_stream_decoded_(false), local_response_sent_(false) { |
184 | 0 | parent_.stats().request_active_.inc(); |
185 | 0 | } |
186 | | |
// Tears down the request: decrements request_active_, completes the request
// timer, and calls onDestroy() on the filter handlers. Every decoder filter
// is notified; encoder filters are notified only when they are not flagged as
// dual filters.
187 | 0 | ActiveMessage::~ActiveMessage() { |
188 | 0 | parent_.stats().request_active_.dec(); |
189 | 0 | request_timer_->complete(); |
190 | 0 | for (auto& filter : decoder_filters_) { |
191 | 0 | ENVOY_LOG(debug, "destroy decoder filter"); |
192 | 0 | filter->handler()->onDestroy(); |
193 | 0 | } |
194 | |
|
195 | 0 | for (auto& filter : encoder_filters_) { |
196 | | // Do not call on destroy twice for dual registered filters. |
197 | 0 | if (!filter->dual_filter_) { |
198 | 0 | ENVOY_LOG(debug, "destroy encoder filter"); |
199 | 0 | filter->handler()->onDestroy(); |
200 | 0 | } |
201 | 0 | } |
202 | 0 | } |
203 | | |
// Chooses where encoder filter iteration starts.
// @param filter the filter resuming iteration, or nullptr for a fresh run.
// @param state whether the resuming filter itself may be invoked again.
// @return begin() of encoder_filters_ for a null filter; the filter's own
//         list entry for CanStartFromCurrent; otherwise the entry after it.
204 | | std::list<ActiveMessageEncoderFilterPtr>::iterator |
205 | | ActiveMessage::commonEncodePrefix(ActiveMessageEncoderFilter* filter, |
206 | 0 | FilterIterationStartState state) { |
207 | | // Only do base state setting on the initial call. Subsequent calls for filtering do not touch |
208 | | // the base state. |
209 | 0 | if (filter == nullptr) { |
210 | | // ASSERT(!state_.local_complete_); |
211 | | // state_.local_complete_ = end_stream; |
212 | 0 | return encoder_filters_.begin(); |
213 | 0 | } |
214 | | |
215 | 0 | if (state == FilterIterationStartState::CanStartFromCurrent) { |
216 | | // The filter iteration has been stopped for all frame types, and now the iteration continues. |
217 | | // The current filter's encoding callback has not been called. Call it now. |
218 | 0 | return filter->entry(); |
219 | 0 | } |
220 | 0 | return std::next(filter->entry()); |
221 | 0 | } |
222 | | |
// Chooses where decoder filter iteration starts.
// @param filter the filter resuming iteration, or nullptr for a fresh run.
// @param state whether the resuming filter itself may be invoked again.
// @return begin() of decoder_filters_ for a null filter; the filter's own
//         list entry for CanStartFromCurrent; otherwise the entry after it.
223 | | std::list<ActiveMessageDecoderFilterPtr>::iterator |
224 | | ActiveMessage::commonDecodePrefix(ActiveMessageDecoderFilter* filter, |
225 | 0 | FilterIterationStartState state) { |
226 | 0 | if (!filter) { |
227 | 0 | return decoder_filters_.begin(); |
228 | 0 | } |
229 | 0 | if (state == FilterIterationStartState::CanStartFromCurrent) { |
230 | | // The filter iteration has been stopped for all frame types, and now the iteration continues. |
231 | | // The current filter's callback function has not been called. Call it now. |
232 | 0 | return filter->entry(); |
233 | 0 | } |
234 | 0 | return std::next(filter->entry()); |
235 | 0 | } |
236 | | |
// Handles a fully decoded downstream request.
// Records the metadata and context, installs a filter action that calls
// onMessageDecoded(metadata, ctx) on each decoder filter, and runs the decoder
// chain from its first filter. The outcome decides what happens next:
//   StopIteration  -> remember that finalization is pending and wait;
//   AbortIteration -> hand this message to the manager via deferredMessage();
//   Continue       -> finalize the request immediately.
// Any other status value is treated as a corrupt enum.
237 | 0 | void ActiveMessage::onStreamDecoded(MessageMetadataSharedPtr metadata, ContextSharedPtr ctx) { |
238 | 0 | parent_.stats().request_decoding_success_.inc(); |
239 | |
|
240 | 0 | metadata_ = metadata; |
241 | 0 | context_ = ctx; |
242 | 0 | filter_action_ = [metadata, ctx](DubboFilters::DecoderFilter* filter) -> FilterStatus { |
243 | 0 | return filter->onMessageDecoded(metadata, ctx); |
244 | 0 | }; |
245 | |
|
246 | 0 | auto status = applyDecoderFilters(nullptr, FilterIterationStartState::CanStartFromCurrent); |
247 | 0 | switch (status) { |
248 | 0 | case FilterStatus::StopIteration: |
249 | 0 | ENVOY_LOG(debug, "dubbo request: pause calling decoder filter, id is {}", |
250 | 0 | metadata->requestId()); |
251 | 0 | pending_stream_decoded_ = true; |
252 | 0 | return; |
253 | 0 | case FilterStatus::AbortIteration: |
254 | 0 | ENVOY_LOG(debug, "dubbo request: abort calling decoder filter, id is {}", |
255 | 0 | metadata->requestId()); |
256 | 0 | parent_.deferredMessage(*this); |
257 | 0 | return; |
258 | 0 | case FilterStatus::Continue: |
259 | 0 | ENVOY_LOG(debug, "dubbo request: complete processing of downstream request messages, id is {}", |
260 | 0 | metadata->requestId()); |
261 | 0 | finalizeRequest(); |
262 | 0 | return; |
263 | 0 | } |
264 | 0 | PANIC_DUE_TO_CORRUPT_ENUM |
265 | 0 | } |
266 | | |
// Completes request-side bookkeeping once the decoder chain has finished.
// Clears the pending flag, bumps the request counters (total plus two-way or
// one-way depending on the message type), and hands the message to the
// manager via deferredMessage() when a local reply was already sent or the
// request is one-way.
// NOTE(review): presumably those are the cases where no upstream response
// will follow — confirm against ConnectionManager::deferredMessage().
267 | 0 | void ActiveMessage::finalizeRequest() { |
268 | 0 | pending_stream_decoded_ = false; |
269 | 0 | parent_.stats().request_.inc(); |
270 | 0 | bool is_one_way = false; |
271 | 0 | switch (metadata_->messageType()) { |
272 | 0 | case MessageType::Request: |
273 | 0 | parent_.stats().request_twoway_.inc(); |
274 | 0 | break; |
275 | 0 | case MessageType::Oneway: |
276 | 0 | parent_.stats().request_oneway_.inc(); |
277 | 0 | is_one_way = true; |
278 | 0 | break; |
279 | 0 | default: |
280 | 0 | break; |
281 | 0 | } |
282 | | |
283 | 0 | if (local_response_sent_ || is_one_way) { |
284 | 0 | parent_.deferredMessage(*this); |
285 | 0 | } |
286 | 0 | } |
287 | | |
// Asks the configured filter factory to build the filter chain, passing this
// message as the argument the factory adds filters to.
288 | 0 | void ActiveMessage::createFilterChain() { |
289 | 0 | parent_.config().filterFactory().createFilterChain(*this); |
290 | 0 | } |
291 | | |
// Returns the route for this message, resolving it at most once.
// A previously cached result is returned as-is. Otherwise, if metadata is
// available, the router config is queried with the metadata and stream id and
// the result is cached (whatever pointer the router returned). Without
// metadata nothing is cached and nullptr is returned.
292 | 0 | DubboProxy::Router::RouteConstSharedPtr ActiveMessage::route() { |
293 | 0 | if (cached_route_) { |
294 | 0 | return cached_route_.value(); |
295 | 0 | } |
296 | | |
297 | 0 | if (metadata_ != nullptr) { |
298 | 0 | DubboProxy::Router::RouteConstSharedPtr route = |
299 | 0 | parent_.config().routerConfig().route(*metadata_, stream_id_); |
300 | 0 | cached_route_ = route; |
301 | 0 | return cached_route_.value(); |
302 | 0 | } |
303 | | |
304 | 0 | return nullptr; |
305 | 0 | } |
306 | | |
// Runs the stored decoder filter action over the decoder filter list.
// @param filter the filter resuming iteration, or nullptr to start at the front.
// @param state start-position hint forwarded to commonDecodePrefix().
// @return the first non-Continue status returned by a filter (filter_action_
//         is left installed in that case); otherwise Continue, after
//         filter_action_ has been cleared.
// Iteration is skipped entirely, or abandoned mid-way, once a local response
// has been sent; that path also clears filter_action_ and returns Continue.
307 | | FilterStatus ActiveMessage::applyDecoderFilters(ActiveMessageDecoderFilter* filter, |
308 | 0 | FilterIterationStartState state) { |
309 | 0 | ASSERT(filter_action_ != nullptr); |
310 | 0 | if (!local_response_sent_) { |
311 | 0 | for (auto entry = commonDecodePrefix(filter, state); entry != decoder_filters_.end(); entry++) { |
312 | 0 | const FilterStatus status = filter_action_((*entry)->handler().get()); |
// A filter may have sent a local reply while it ran; stop iterating then.
313 | 0 | if (local_response_sent_) { |
314 | 0 | break; |
315 | 0 | } |
316 | | |
317 | 0 | if (status != FilterStatus::Continue) { |
318 | 0 | return status; |
319 | 0 | } |
320 | 0 | } |
321 | 0 | } |
322 | | |
323 | 0 | filter_action_ = nullptr; |
324 | |
|
325 | 0 | return FilterStatus::Continue; |
326 | 0 | } |
327 | | |
// Runs the stored encoder filter action over the encoder filter list.
// @param filter the filter resuming iteration, or nullptr to start at the front.
// @param state start-position hint forwarded to commonEncodePrefix().
// @return the first non-Continue status returned by a filter
//         (encoder_filter_action_ is left installed in that case); otherwise
//         Continue, after encoder_filter_action_ has been cleared.
// Iteration is skipped entirely, or abandoned mid-way, once a local response
// has been sent; that path also clears the action and returns Continue.
328 | | FilterStatus ActiveMessage::applyEncoderFilters(ActiveMessageEncoderFilter* filter, |
329 | 0 | FilterIterationStartState state) { |
330 | 0 | ASSERT(encoder_filter_action_ != nullptr); |
331 | | |
332 | 0 | if (!local_response_sent_) { |
333 | 0 | for (auto entry = commonEncodePrefix(filter, state); entry != encoder_filters_.end(); entry++) { |
334 | 0 | const FilterStatus status = encoder_filter_action_((*entry)->handler().get()); |
// A filter may have sent a local reply while it ran; stop iterating then.
335 | 0 | if (local_response_sent_) { |
336 | 0 | break; |
337 | 0 | } |
338 | | |
339 | 0 | if (status != FilterStatus::Continue) { |
340 | 0 | return status; |
341 | 0 | } |
342 | 0 | } |
343 | 0 | } |
344 | | |
345 | 0 | encoder_filter_action_ = nullptr; |
346 | |
|
347 | 0 | return FilterStatus::Continue; |
348 | 0 | } |
349 | | |
// Sends a locally generated reply through the connection manager.
// @param response the direct response to send; requires metadata_ to be set.
// @param end_stream forwarded to the manager. When true this function returns
//        right after sending and does NOT set local_response_sent_; the flag
//        is only set when end_stream is false.
// NOTE(review): presumably the manager tears the stream down itself when
// end_stream is true — confirm against ConnectionManager::sendLocalReply().
350 | 0 | void ActiveMessage::sendLocalReply(const DubboFilters::DirectResponse& response, bool end_stream) { |
351 | 0 | ASSERT(metadata_); |
352 | 0 | parent_.sendLocalReply(*metadata_, response, end_stream); |
353 | |
|
354 | 0 | if (end_stream) { |
355 | 0 | return; |
356 | 0 | } |
357 | | |
358 | 0 | local_response_sent_ = true; |
359 | 0 | } |
360 | | |
// Prepares decoding of the upstream response for this message.
// Asserts that no response decoder exists yet, creates a protocol object from
// the factory registered for this message's protocol type (using its
// serialization type), and builds the ActiveResponseDecoder that writes to the
// manager's connection and updates the manager's stats.
361 | 0 | void ActiveMessage::startUpstreamResponse() { |
362 | 0 | ENVOY_LOG(debug, "dubbo response: start upstream"); |
363 | |
|
364 | 0 | ASSERT(response_decoder_ == nullptr); |
365 | | |
366 | 0 | auto protocol = |
367 | 0 | NamedProtocolConfigFactory::getFactory(protocolType()).createProtocol(serializationType()); |
368 | | |
369 | | // Create a response message decoder. |
370 | 0 | response_decoder_ = std::make_unique<ActiveResponseDecoder>( |
371 | 0 | *this, parent_.stats(), parent_.connection(), std::move(protocol)); |
372 | 0 | } |
373 | | |
// Feeds upstream bytes to the response decoder and reacts to the outcome.
// @param buffer bytes received from upstream; requires startUpstreamResponse()
//        to have created response_decoder_ (asserted).
// @return the decoder's status, or Reset when an exception was handled here.
// Outcomes:
//   Complete -> the response's request id must equal this message's request
//               id (otherwise an EnvoyException is thrown and handled below),
//               then the message is handed to deferredMessage();
//   Retry    -> the response decoder is destroyed;
//   DownstreamConnectionCloseException -> onReset() plus the
//               response_error_caused_connection_close_ stat;
//   EnvoyException -> response_decoding_error_ stat plus onError().
374 | 0 | DubboFilters::UpstreamResponseStatus ActiveMessage::upstreamData(Buffer::Instance& buffer) { |
375 | 0 | ASSERT(response_decoder_ != nullptr); |
376 | | |
377 | 0 | TRY_NEEDS_AUDIT { |
378 | 0 | auto status = response_decoder_->onData(buffer); |
379 | 0 | if (status == DubboFilters::UpstreamResponseStatus::Complete) { |
380 | 0 | if (requestId() != response_decoder_->requestId()) { |
381 | 0 | throw EnvoyException(fmt::format("dubbo response: request ID is not equal, {}:{}", |
382 | 0 | requestId(), response_decoder_->requestId())); |
383 | 0 | } |
384 | | |
385 | | // Completed upstream response. |
386 | 0 | parent_.deferredMessage(*this); |
387 | 0 | } else if (status == DubboFilters::UpstreamResponseStatus::Retry) { |
388 | 0 | response_decoder_.reset(); |
389 | 0 | } |
390 | | |
391 | 0 | return status; |
392 | 0 | } |
// The more specific downstream-close exception is caught first; all other
// EnvoyExceptions are handled as decoding errors by the second handler.
393 | 0 | END_TRY catch (const DownstreamConnectionCloseException& ex) { |
394 | 0 | ENVOY_CONN_LOG(error, "dubbo response: exception ({})", parent_.connection(), ex.what()); |
395 | 0 | onReset(); |
396 | 0 | parent_.stats().response_error_caused_connection_close_.inc(); |
397 | 0 | return DubboFilters::UpstreamResponseStatus::Reset; |
398 | 0 | } |
399 | 0 | catch (const EnvoyException& ex) { |
400 | 0 | ENVOY_CONN_LOG(error, "dubbo response: exception ({})", parent_.connection(), ex.what()); |
401 | 0 | parent_.stats().response_decoding_error_.inc(); |
402 | |
|
403 | 0 | onError(ex.what()); |
404 | 0 | return DubboFilters::UpstreamResponseStatus::Reset; |
405 | 0 | } |
406 | 0 | } |
407 | | |
// Closes the connection manager's connection with the NoFlush close type.
408 | 0 | void ActiveMessage::resetDownstreamConnection() { |
409 | 0 | parent_.connection().close(Network::ConnectionCloseType::NoFlush); |
410 | 0 | } |
411 | | |
// Resets the stream by handing this message to the manager's deferredMessage().
412 | 0 | void ActiveMessage::resetStream() { parent_.deferredMessage(*this); } |
413 | | |
// Returns the request id from the stored metadata, or 0 when no metadata has
// been recorded yet.
414 | 0 | uint64_t ActiveMessage::requestId() const { |
415 | 0 | return metadata_ != nullptr ? metadata_->requestId() : 0; |
416 | 0 | } |
417 | | |
// Returns the stream id drawn from the random generator at construction.
418 | 0 | uint64_t ActiveMessage::streamId() const { return stream_id_; } |
419 | | |
// Returns the downstream serialization type reported by the connection manager.
420 | 0 | SerializationType ActiveMessage::serializationType() const { |
421 | 0 | return parent_.downstreamSerializationType(); |
422 | 0 | } |
423 | | |
// Returns the downstream protocol type reported by the connection manager.
424 | 0 | ProtocolType ActiveMessage::protocolType() const { return parent_.downstreamProtocolType(); } |
425 | | |
// Returns the StreamInfo object owned by this message.
426 | 0 | StreamInfo::StreamInfo& ActiveMessage::streamInfo() { return stream_info_; } |
427 | | |
// Returns the dispatcher of the connection manager's connection.
428 | 0 | Event::Dispatcher& ActiveMessage::dispatcher() { return parent_.connection().dispatcher(); } |
429 | | |
// Returns a pointer to the connection manager's connection.
430 | 0 | const Network::Connection* ActiveMessage::connection() const { return &parent_.connection(); } |
431 | | |
// Registers a decoder-only filter (dual_filter = false).
432 | 0 | void ActiveMessage::addDecoderFilter(DubboFilters::DecoderFilterSharedPtr filter) { |
433 | 0 | addDecoderFilterWorker(filter, false); |
434 | 0 | } |
435 | | |
// Registers an encoder-only filter (dual_filter = false).
436 | 0 | void ActiveMessage::addEncoderFilter(DubboFilters::EncoderFilterSharedPtr filter) { |
437 | 0 | addEncoderFilterWorker(filter, false); |
438 | 0 | } |
439 | | |
// Registers a codec filter on both chains, flagged as a dual filter so the
// destructor calls its onDestroy() only once (from the decoder list).
440 | 0 | void ActiveMessage::addFilter(DubboFilters::CodecFilterSharedPtr filter) { |
441 | 0 | addDecoderFilterWorker(filter, true); |
442 | 0 | addEncoderFilterWorker(filter, true); |
443 | 0 | } |
444 | | |
// Wraps `filter` in an ActiveMessageDecoderFilter, hands the wrapper to the
// filter as its decoder callbacks, and appends the wrapper to decoder_filters_.
// @param filter the decoder filter to register.
// @param dual_filter true when the same filter is also registered as an encoder.
445 | | void ActiveMessage::addDecoderFilterWorker(DubboFilters::DecoderFilterSharedPtr filter, |
446 | 0 | bool dual_filter) { |
447 | 0 | ActiveMessageDecoderFilterPtr wrapper = |
448 | 0 | std::make_unique<ActiveMessageDecoderFilter>(*this, filter, dual_filter); |
// The filter receives a reference to the wrapper object before ownership of
// the wrapper pointer moves into the list.
449 | 0 | filter->setDecoderFilterCallbacks(*wrapper); |
450 | 0 | LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_); |
451 | 0 | } |
// Wraps `filter` in an ActiveMessageEncoderFilter, hands the wrapper to the
// filter as its encoder callbacks, and appends the wrapper to encoder_filters_.
// @param filter the encoder filter to register.
// @param dual_filter true when the same filter is also registered as a decoder.
452 | | void ActiveMessage::addEncoderFilterWorker(DubboFilters::EncoderFilterSharedPtr filter, |
453 | 0 | bool dual_filter) { |
454 | 0 | ActiveMessageEncoderFilterPtr wrapper = |
455 | 0 | std::make_unique<ActiveMessageEncoderFilter>(*this, filter, dual_filter); |
// The filter receives a reference to the wrapper object before ownership of
// the wrapper pointer moves into the list.
456 | 0 | filter->setEncoderFilterCallbacks(*wrapper); |
457 | 0 | LinkedList::moveIntoListBack(std::move(wrapper), encoder_filters_); |
458 | 0 | } |
459 | | |
// Handles a reset by handing this message to the manager's deferredMessage().
460 | 0 | void ActiveMessage::onReset() { parent_.deferredMessage(*this); } |
461 | | |
// Reports a response-side error to the downstream and retires the message.
// Ensures a metadata object exists, logs the error, sends a BadResponse
// AppException as a local reply (end_stream = false, so local_response_sent_
// gets set), then hands this message to the manager's deferredMessage().
// @param what human-readable description of the failure; used as the
//        exception text of the local reply.
462 | 0 | void ActiveMessage::onError(const std::string& what) { |
463 | 0 | if (!metadata_) { |
464 | | // It's possible that an error occurred before the decoder generated metadata, |
465 | | // and a metadata object needs to be created in order to generate a local reply. |
466 | 0 | metadata_ = std::make_shared<MessageMetadata>(); |
467 | 0 | } |
468 | |
|
469 | 0 | ASSERT(metadata_); |
470 | 0 | ENVOY_LOG(error, "Bad response: {}", what); |
471 | 0 | sendLocalReply(AppException(ResponseStatus::BadResponse, what), false); |
472 | 0 | parent_.deferredMessage(*this); |
473 | 0 | } |
474 | | |
475 | | } // namespace DubboProxy |
476 | | } // namespace NetworkFilters |
477 | | } // namespace Extensions |
478 | | } // namespace Envoy |