1
#include "source/extensions/bootstrap/reverse_tunnel/upstream_socket_interface/upstream_socket_manager.h"
2

            
3
#include <algorithm>
4
#include <string>
5

            
6
#include "source/common/buffer/buffer_impl.h"
7
#include "source/common/common/logger.h"
8
#include "source/common/common/random_generator.h"
9
#include "source/extensions/bootstrap/reverse_tunnel/common/reverse_connection_utility.h"
10
#include "source/extensions/bootstrap/reverse_tunnel/upstream_socket_interface/reverse_tunnel_acceptor_extension.h"
11

            
12
#include "absl/strings/string_view.h"
13

            
14
namespace Envoy {
15
namespace Extensions {
16
namespace Bootstrap {
17
namespace ReverseConnection {
18

            
19
// Dispatcher name that Envoy assigns to the main thread; the constructor uses
// it to skip registering the main-thread manager for data-plane sockets.
constexpr absl::string_view kMainThreadDispatcherName = "main_thread";

// Global registry of per-worker socket managers, consulted when rebalancing
// accepted sockets across workers. Guarded by socket_manager_lock.
std::vector<UpstreamSocketManager*> UpstreamSocketManager::socket_managers_{};
absl::Mutex UpstreamSocketManager::socket_manager_lock{};
23

            
24
// UpstreamSocketManager implementation
25
// UpstreamSocketManager implementation.
// Constructs a per-dispatcher manager and, for worker threads only, registers
// it in the global socket_managers_ list used for cross-worker rebalancing.
UpstreamSocketManager::UpstreamSocketManager(Event::Dispatcher& dispatcher,
                                             ReverseTunnelAcceptorExtension* extension)
    : dispatcher_(dispatcher), random_generator_(std::make_unique<Random::RandomGeneratorImpl>()),
      extension_(extension) {
  ENVOY_LOG(debug, "reverse_tunnel: creating socket manager with stats integration.");

  // Only worker threads should handle data plane connections; skip the main thread.
  const std::string& name = dispatcher_.name();
  if (name == kMainThreadDispatcherName) {
    ENVOY_LOG(debug, "reverse_tunnel: skipping socket manager registration for main thread");
    return;
  }

  absl::WriterMutexLock lock(UpstreamSocketManager::socket_manager_lock);
  UpstreamSocketManager::socket_managers_.push_back(this);
  ENVOY_LOG(debug, "reverse_tunnel: registered socket manager for dispatcher: {}", name);
}
42

            
43
// Selects the socket manager (worker) with the fewest accepted reverse tunnels
// for `node_id`, preferring this worker on ties, and increments the chosen
// manager's per-node count. Holds the global lock for the whole scan.
UpstreamSocketManager&
UpstreamSocketManager::pickLeastLoadedSocketManager(const std::string& node_id,
                                                    const std::string& cluster_id) {
  absl::WriterMutexLock wlock(UpstreamSocketManager::socket_manager_lock);

  // Read a manager's count for `node_id` without operator[]: the original
  // default-inserted a zero entry into *every* manager's map for every node
  // ever looked up, growing the maps without bound.
  const auto count_for = [&node_id](const UpstreamSocketManager* manager) -> int {
    const auto it = manager->node_to_conn_count_map_.find(node_id);
    return it == manager->node_to_conn_count_map_.end() ? 0 : it->second;
  };

  // Assume that this worker is the best candidate for sending the reverse
  // connection socket.
  UpstreamSocketManager* target_socket_manager = this;
  const std::string source_worker = this->dispatcher_.name();

  // Contains the value that we assume to be the minimum value so far.
  int min_node_count = count_for(target_socket_manager);

  // Iterate over UpstreamSocketManager instances of all threads to check
  // if any of them have a lower number of accepted reverse tunnels for
  // the node 'node_id'.
  for (UpstreamSocketManager* socket_manager : socket_managers_) {
    const int node_count = count_for(socket_manager);
    if (node_count < min_node_count) {
      target_socket_manager = socket_manager;
      min_node_count = node_count;
    }
  }

  const std::string dest_worker = target_socket_manager->dispatcher_.name();

  if (source_worker != dest_worker) {
    ENVOY_LOG(info,
              "reverse_tunnel: Rebalancing socket from worker {} to worker {} with min "
              "count {} for node {} cluster {}",
              source_worker, dest_worker, min_node_count, node_id, cluster_id);
  }
  // Increment the reverse connection count of the chosen handler.
  target_socket_manager->node_to_conn_count_map_[node_id]++;
  ENVOY_LOG(debug, "reverse_tunnel: Incremented count for node {}: {}", node_id,
            target_socket_manager->node_to_conn_count_map_[node_id]);
  return *target_socket_manager;
}
83

            
84
void UpstreamSocketManager::handoffSocketToWorker(const std::string& node_id,
85
                                                  const std::string& cluster_id,
86
                                                  Network::ConnectionSocketPtr socket,
87
2
                                                  const std::chrono::seconds& ping_interval) {
88
2
  dispatcher_.post(
89
2
      [this, node_id, cluster_id, ping_interval, socket = std::move(socket)]() mutable -> void {
90
2
        this->addConnectionSocket(node_id, cluster_id, std::move(socket), ping_interval,
91
2
                                  true /* rebalanced */);
92
2
      });
93
2
}
94

            
95
void UpstreamSocketManager::addConnectionSocket(const std::string& node_id,
96
                                                const std::string& cluster_id,
97
                                                Network::ConnectionSocketPtr socket,
98
                                                const std::chrono::seconds& ping_interval,
99
216
                                                bool rebalanced) {
100
  // If not already rebalanced, check if we should move this socket to a different worker thread.
101
216
  if (!rebalanced) {
102
44
    UpstreamSocketManager& target_manager = pickLeastLoadedSocketManager(node_id, cluster_id);
103
44
    if (&target_manager != this) {
104
2
      ENVOY_LOG(debug,
105
2
                "reverse_tunnel: Rebalancing socket to a different worker thread for node: "
106
2
                "{} cluster: {}",
107
2
                node_id, cluster_id);
108
2
      target_manager.handoffSocketToWorker(node_id, cluster_id, std::move(socket), ping_interval);
109
2
      return;
110
2
    }
111
44
  }
112

            
113
214
  ENVOY_LOG(debug, "reverse_tunnel: adding connection for node: {}, cluster: {}.", node_id,
114
214
            cluster_id);
115

            
116
  // Both node_id and cluster_id are mandatory for consistent state management and stats tracking.
117
214
  if (node_id.empty() || cluster_id.empty()) {
118
2
    ENVOY_LOG(error,
119
2
              "reverse_tunnel: node_id or cluster_id cannot be empty. node: '{}', cluster: '{}'.",
120
2
              node_id, cluster_id);
121
2
    return;
122
2
  }
123

            
124
212
  const int fd = socket->ioHandle().fdDoNotUse();
125
212
  const std::string& connectionKey = socket->connectionInfoProvider().localAddress()->asString();
126

            
127
212
  ENVOY_LOG(debug, "reverse_tunnel: adding socket with FD: {} for node: {}, cluster: {}.", fd,
128
212
            node_id, cluster_id);
129

            
130
  // Store node -> cluster mapping.
131
212
  ENVOY_LOG(trace, "reverse_tunnel: adding mapping node {} -> cluster {}.", node_id, cluster_id);
132
212
  if (node_to_cluster_map_.find(node_id) == node_to_cluster_map_.end()) {
133
194
    node_to_cluster_map_[node_id] = cluster_id;
134
194
    cluster_to_node_info_map_[cluster_id].nodes.push_back(node_id);
135
194
  }
136

            
137
212
  fd_to_node_map_[fd] = node_id;
138
212
  fd_to_cluster_map_[fd] = cluster_id;
139
212
  node_to_active_fd_count_[node_id]++;
140

            
141
  // Create per-connection timeout timer for ping responses.
142
212
  fd_to_timer_map_[fd] = dispatcher_.createTimer([this, fd]() { onPingTimeout(fd); });
143

            
144
212
  accepted_reverse_connections_[node_id].push_back(std::move(socket));
145
212
  fd_to_socket_it_map_[fd] = std::prev(accepted_reverse_connections_[node_id].end());
146
212
  Network::ConnectionSocketPtr& socket_ref = accepted_reverse_connections_[node_id].back();
147

            
148
  // Update stats registry.
149
212
  if (auto extension = getUpstreamExtension()) {
150
212
    extension->updateConnectionStats(node_id, cluster_id, true /* increment */,
151
212
                                     tenant_isolation_enabled_);
152
212
    ENVOY_LOG(debug, "reverse_tunnel: updated stats registry for node '{}' cluster '{}'.", node_id,
153
212
              cluster_id);
154
212
  }
155

            
156
  // onPingResponse() expects a ping reply on the socket.
157
212
  fd_to_event_map_[fd] = dispatcher_.createFileEvent(
158
212
      fd,
159
212
      [this, &socket_ref](uint32_t events) {
160
20
        ASSERT(events == Event::FileReadyType::Read);
161
20
        onPingResponse(socket_ref->ioHandle());
162
20
        return absl::OkStatus();
163
20
      },
164
212
      Event::FileTriggerType::Edge, Event::FileReadyType::Read);
165

            
166
  // Store ping_interval_ if not yet set.
167
212
  if (ping_interval_ == std::chrono::seconds::zero()) {
168
80
    ping_interval_ = ping_interval;
169
80
  }
170

            
171
  // Create per-connection send timer with jitter (matching HTTP/2 keepalive pattern).
172
212
  fd_to_ping_send_timer_map_[fd] =
173
212
      dispatcher_.createTimer([this, fd]() { sendPingForConnection(fd); });
174
212
  fd_to_ping_send_timer_map_[fd]->enableTimer(
175
212
      std::chrono::milliseconds(pingIntervalWithJitterMs()));
176

            
177
212
  ENVOY_LOG(debug, "reverse_tunnel: added socket to maps. node: {} connection key: {} fd: {}.",
178
212
            node_id, connectionKey, fd);
179
212
}
180

            
181
// Hands out an idle pooled socket for `node_id`, removing it from the idle
// pool and dropping its ping timers/file event. Returns nullptr when the node
// is unknown or has no pooled sockets.
Network::ConnectionSocketPtr
UpstreamSocketManager::getConnectionSocket(const std::string& node_id) {
  ENVOY_LOG(debug, "reverse_tunnel: getConnectionSocket() called with node_id: {}.", node_id);

  const auto cluster_entry = node_to_cluster_map_.find(node_id);
  if (cluster_entry == node_to_cluster_map_.end()) {
    ENVOY_LOG(error, "reverse_tunnel: cluster to node mapping changed for node: {}.", node_id);
    return nullptr;
  }
  const std::string& cluster_id = cluster_entry->second;

  ENVOY_LOG(debug, "reverse_tunnel: looking for socket. node: {} cluster: {}.", node_id,
            cluster_id);

  // Find first available socket for the node.
  const auto pool_entry = accepted_reverse_connections_.find(node_id);
  if (pool_entry == accepted_reverse_connections_.end() || pool_entry->second.empty()) {
    ENVOY_LOG(debug, "reverse_tunnel: no available sockets for node: {}.", node_id);
    return nullptr;
  }
  auto& idle_sockets = pool_entry->second;

  // Debugging: Print the number of free sockets on this worker thread.
  ENVOY_LOG(trace, "reverse_tunnel: found {} sockets for node: {}.", idle_sockets.size(), node_id);

  // Take the oldest pooled socket out of the idle list.
  Network::ConnectionSocketPtr socket = std::move(idle_sockets.front());
  idle_sockets.pop_front();

  const int fd = socket->ioHandle().fdDoNotUse();
  const std::string remote_connection_key =
      socket->connectionInfoProvider().remoteAddress()->asString();

  ENVOY_LOG(debug,
            "reverse_tunnel: reverse connection socket found. fd: {} connection key: {} "
            "node: {} cluster: {}.",
            fd, remote_connection_key, node_id, cluster_id);

  // The socket is now "used": stop tracking its event, timers and iterator.
  fd_to_event_map_.erase(fd);
  fd_to_timer_map_.erase(fd);
  fd_to_ping_send_timer_map_.erase(fd);
  fd_to_socket_it_map_.erase(fd);

  return socket;
}
227

            
228
187
std::string UpstreamSocketManager::getNodeWithSocket(const std::string& key) {
229
187
  ENVOY_LOG(trace, "reverse_tunnel: getNodeWithSocket() called with key: {}.", key);
230

            
231
  // Check if key exists as a cluster ID by looking at cluster_to_node_info_map_.
232
187
  auto cluster_it = cluster_to_node_info_map_.find(key);
233
187
  if (cluster_it != cluster_to_node_info_map_.end() && !cluster_it->second.nodes.empty()) {
234
    // Key is a cluster ID, use round-robin to select a node.
235
13
    auto& cluster_info = cluster_it->second;
236
13
    const auto& nodes = cluster_info.nodes;
237

            
238
    // Select node at current index and advance for next call.
239
13
    const std::string& selected_node = nodes[cluster_info.round_robin_index % nodes.size()];
240
13
    cluster_info.round_robin_index = (cluster_info.round_robin_index + 1) % nodes.size();
241

            
242
13
    ENVOY_LOG(debug, "reverse_tunnel: key '{}' is a cluster ID; returning node {} via round-robin.",
243
13
              key, selected_node);
244
13
    return selected_node;
245
13
  }
246

            
247
  // Key not found in cluster map, treat it as a node ID and return it directly.
248
174
  ENVOY_LOG(trace, "reverse_tunnel: key '{}' treated as node ID; returning as-is.", key);
249
174
  return key;
250
187
}
251

            
252
108
bool UpstreamSocketManager::hasAnySocketsForNode(const std::string& node_id) {
253
108
  auto it = node_to_active_fd_count_.find(node_id);
254
108
  return it != node_to_active_fd_count_.end() && it->second > 0;
255
108
}
256

            
257
112
// Removes all bookkeeping for `fd` after its connection failed or closed.
// Handles both idle sockets (still pooled in accepted_reverse_connections_)
// and used sockets (owned elsewhere); updates stats and, when the node has no
// sockets left at all, drops its node/cluster mappings.
void UpstreamSocketManager::markSocketDead(const int fd) {
  ENVOY_LOG(trace, "reverse_tunnel: markSocketDead called for fd {}.", fd);

  auto node_it = fd_to_node_map_.find(fd);
  if (node_it == fd_to_node_map_.end()) {
    // Unknown fd: already cleaned up (or never registered); nothing to do.
    ENVOY_LOG(warn, "reverse_tunnel: fd {} not found in fd_to_node_map_.", fd);
    return;
  }
  // Copy by value: the map entry this iterator points at is erased below.
  const std::string node_id = node_it->second;

  // Get cluster_id from fd_to_cluster_map_. We use the fd_to_cluster_map_ to get the cluster_id
  // and not the cluster_to_node_info_map_ because the node might have changed clusters before the
  // socket is marked dead, but the FD will always be tied to the same cluster in
  // fd_to_cluster_map_.
  std::string cluster_id;
  auto cluster_it = fd_to_cluster_map_.find(fd);
  if (cluster_it == fd_to_cluster_map_.end()) {
    ENVOY_LOG(warn, "reverse_tunnel: fd {} not found in fd_to_cluster_map_.", fd);
    // Try to get cluster_id from node_to_cluster_map_ as fallback.
    auto node_cluster_it = node_to_cluster_map_.find(node_id);
    if (node_cluster_it != node_to_cluster_map_.end()) {
      cluster_id = node_cluster_it->second;
    }
  } else {
    cluster_id = cluster_it->second;
  }
  ENVOY_LOG(debug, "reverse_tunnel: found node '{}' cluster '{}' for fd: {}", node_id, cluster_id,
            fd);

  // Remove FD from tracking maps before checking remaining sockets.
  fd_to_node_map_.erase(fd);
  fd_to_cluster_map_.erase(fd);

  // Decrement the active FD counter for the node; drop the entry at zero so
  // hasAnySocketsForNode() below reports correctly.
  auto count_it = node_to_active_fd_count_.find(node_id);
  if (count_it != node_to_active_fd_count_.end()) {
    ASSERT(count_it->second > 0);
    if (--count_it->second == 0) {
      node_to_active_fd_count_.erase(count_it);
    }
  }

  // Determine if this is an idle or used socket via O(1) iterator lookup.
  auto socket_it = fd_to_socket_it_map_.find(fd);
  if (socket_it != fd_to_socket_it_map_.end()) {
    // Found in idle pool — erase from list and clean up timers/events.
    ENVOY_LOG(debug, "reverse_tunnel: marking idle socket dead. node: {} cluster: {} fd: {}.",
              node_id, cluster_id, fd);
    // Shut down both directions; the socket object itself is destroyed when
    // its list element is erased just below.
    ::shutdown(fd, SHUT_RDWR);
    accepted_reverse_connections_[node_id].erase(socket_it->second);
    fd_to_socket_it_map_.erase(socket_it);

    fd_to_event_map_.erase(fd);
    fd_to_timer_map_.erase(fd);
    fd_to_ping_send_timer_map_.erase(fd);
  } else {
    // FD not found in idle pool, this is a used socket.
    // The socket will be closed by the owning UpstreamReverseConnectionIOHandle.
    ENVOY_LOG(debug, "reverse_tunnel: marking used socket dead. node: {} cluster: {} fd: {}.",
              node_id, cluster_id, fd);
  }

  // Update Envoy's stats system.
  if (auto extension = getUpstreamExtension()) {
    extension->updateConnectionStats(node_id, cluster_id, false /* decrement */,
                                     tenant_isolation_enabled_);
    // Report the disconnection to the extension for further action.
    extension->reportDisconnection(node_id, cluster_id);

    ENVOY_LOG(trace, "reverse_tunnel: decremented stats registry for node '{}' cluster '{}'.",
              node_id, cluster_id);
  }

  // Only clean up node-to-cluster mappings if this node has no remaining sockets (idle or used).
  if (!hasAnySocketsForNode(node_id)) {
    ENVOY_LOG(debug,
              "reverse_tunnel: node '{}' has no remaining sockets, cleaning up cluster mappings.",
              node_id);
    cleanStaleNodeEntry(node_id);
  } else {
    ENVOY_LOG(trace, "reverse_tunnel: node '{}' still has remaining sockets, keeping in maps.",
              node_id);
  }
}
341

            
342
91
void UpstreamSocketManager::cleanStaleNodeEntry(const std::string& node_id) {
343
  // Clean the given node ID if there are no active sockets.
344
91
  if (accepted_reverse_connections_.find(node_id) != accepted_reverse_connections_.end() &&
345
91
      accepted_reverse_connections_[node_id].size() > 0) {
346
1
    ENVOY_LOG(trace, "reverse_tunnel: found {} active sockets for node {}.",
347
1
              accepted_reverse_connections_[node_id].size(), node_id);
348
1
    return;
349
1
  }
350
90
  ENVOY_LOG(debug, "reverse_tunnel: cleaning stale node entry for node {}.", node_id);
351

            
352
  // Check if given node-id is present in node_to_cluster_map_. If present,
353
  // fetch the corresponding cluster-id and remove the node from the cluster's node list.
354
90
  const auto& node_itr = node_to_cluster_map_.find(node_id);
355
90
  if (node_itr != node_to_cluster_map_.end()) {
356
90
    const auto& cluster_itr = cluster_to_node_info_map_.find(node_itr->second);
357
90
    if (cluster_itr != cluster_to_node_info_map_.end()) {
358
90
      auto& nodes = cluster_itr->second.nodes;
359
90
      const auto& node_entry_itr = find(nodes.begin(), nodes.end(), node_id);
360

            
361
90
      if (node_entry_itr != nodes.end()) {
362
90
        ENVOY_LOG(trace, "reverse_tunnel: removing stale node {} from cluster {}.", node_id,
363
90
                  cluster_itr->first);
364
90
        nodes.erase(node_entry_itr);
365

            
366
        // If the cluster has no more nodes, remove the entire cluster entry.
367
90
        if (nodes.empty()) {
368
84
          ENVOY_LOG(trace, "reverse_tunnel: removing empty cluster {}.", cluster_itr->first);
369
84
          cluster_to_node_info_map_.erase(cluster_itr);
370
84
        }
371
90
      }
372
90
    }
373
90
    node_to_cluster_map_.erase(node_itr);
374
90
  }
375

            
376
  // Remove empty node entry from accepted_reverse_connections_.
377
90
  accepted_reverse_connections_.erase(node_id);
378
90
}
379

            
380
25
// Handles read readiness on an idle pooled socket, which should carry an RPING
// reply. Read errors and remote close mark the socket dead; a short read waits
// for more data; a non-ping payload counts as a missed ping; a valid ping
// disables the timeout timer and re-arms the send timer.
void UpstreamSocketManager::onPingResponse(Network::IoHandle& io_handle) {
  const int fd = io_handle.fdDoNotUse();

  Buffer::OwnedImpl buffer;
  const auto ping_size =
      ::Envoy::Extensions::Bootstrap::ReverseConnection::ReverseConnectionUtility::PING_MESSAGE
          .size();
  Api::IoCallUint64Result result = io_handle.read(buffer, absl::make_optional(ping_size));
  if (!result.ok()) {
    ENVOY_LOG(debug, "reverse_tunnel: Read error on FD: {}: error - {}", fd,
              result.err_->getErrorDetails());
    markSocketDead(fd);
    return;
  }

  // In this case, there is no read error, but the socket has been closed by the remote
  // peer in a graceful manner, unlike a connection refused, or a reset.
  if (result.return_value_ == 0) {
    ENVOY_LOG(debug, "reverse_tunnel: FD: {}: reverse connection closed", fd);
    markSocketDead(fd);
    return;
  }

  if (result.return_value_ < ping_size) {
    ENVOY_LOG(debug, "reverse_tunnel: FD: {}: no complete ping data yet", fd);
    return;
  }

  const char* data = static_cast<const char*>(buffer.linearize(ping_size));
  absl::string_view view{data, static_cast<size_t>(ping_size)};
  if (!::Envoy::Extensions::Bootstrap::ReverseConnection::ReverseConnectionUtility::isPingMessage(
          view)) {
    ENVOY_LOG(debug, "reverse_tunnel: response is not RPING. fd: {}.", fd);
    // Treat as a miss; do not immediately kill unless threshold crossed.
    onPingTimeout(fd);
    return;
  }
  ENVOY_LOG(trace, "reverse_tunnel: received ping response. fd: {}.", fd);
  // Use find() rather than operator[]: if the fd was already removed from the
  // map, operator[] would default-insert a null TimerPtr and dereferencing it
  // would crash.
  auto timer_it = fd_to_timer_map_.find(fd);
  if (timer_it != fd_to_timer_map_.end()) {
    timer_it->second->disableTimer();
  }
  // Reset miss counter on success.
  fd_to_miss_count_.erase(fd);

  // Re-arm the per-connection send timer with jitter.
  rearmPingSendTimer(fd);
}
425

            
426
6
// Sends an RPING request on the idle pooled socket identified by `fd` and arms
// the per-connection response-timeout timer. Only sockets still in the idle
// pool are pinged; handed-out (used) sockets are skipped.
void UpstreamSocketManager::sendPingForConnection(int fd) {
  auto node_it = fd_to_node_map_.find(fd);
  if (node_it == fd_to_node_map_.end()) {
    // fd no longer tracked; nothing to ping.
    ENVOY_LOG(debug, "reverse_tunnel: sendPingForConnection: fd {} not found in fd_to_node_map_.",
              fd);
    return;
  }
  const std::string& node_id = node_it->second;

  auto socket_it = fd_to_socket_it_map_.find(fd);
  if (socket_it == fd_to_socket_it_map_.end()) {
    // Not in the idle pool: the socket has been handed out or cleaned up.
    ENVOY_LOG(debug, "reverse_tunnel: sendPingForConnection: fd {} not found in idle pool.", fd);
    return;
  }
  Network::ConnectionSocket* socket_ptr = socket_it->second->get();

  auto buffer = ::Envoy::Extensions::Bootstrap::ReverseConnection::ReverseConnectionUtility::
      createPingResponse();

  // Arm the response timeout (half the ping interval) before writing;
  // onPingResponse() disables it when a valid reply arrives.
  // NOTE(review): fd_to_timer_map_[fd] uses operator[] — if the timer entry
  // were ever missing this would insert and dereference a null TimerPtr;
  // confirm the fd is always registered via addConnectionSocket() first.
  auto ping_response_timeout = ping_interval_ / 2;
  fd_to_timer_map_[fd]->enableTimer(ping_response_timeout);

  // Write until the ping payload is fully drained.
  // NOTE(review): on EAGAIN (rc 0, error Again) this loop retries immediately,
  // busy-spinning until the socket becomes writable — confirm this is
  // intentional for the tiny ping payload.
  while (buffer->length() > 0) {
    Api::IoCallUint64Result result = socket_ptr->ioHandle().write(*buffer);
    ENVOY_LOG(trace, "reverse_tunnel: node:{} FD:{}: sending ping request. return_value: {}",
              node_id, fd, result.return_value_);
    if (result.return_value_ == 0) {
      ENVOY_LOG(trace, "reverse_tunnel: node:{} FD:{}: sending ping rc {}, error - {}", node_id, fd,
                result.return_value_, result.err_->getErrorDetails());
      if (result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) {
        // Hard write error: drop the connection.
        ENVOY_LOG(error, "reverse_tunnel: node:{} FD:{}: failed to send ping", node_id, fd);
        markSocketDead(fd);
        return;
      }
    }
  }
}
463

            
464
4
void UpstreamSocketManager::onPingTimeout(const int fd) {
465
4
  ENVOY_LOG(debug, "reverse_tunnel: ping timeout or invalid ping. fd: {}.", fd);
466
  // Increment miss count and evaluate threshold.
467
4
  const uint32_t misses = ++fd_to_miss_count_[fd];
468
4
  ENVOY_LOG(trace, "reverse_tunnel: miss count {}. fd: {}.", misses, fd);
469
4
  if (misses >= miss_threshold_) {
470
2
    ENVOY_LOG(debug, "reverse_tunnel: fd {} exceeded miss threshold {}; marking dead.", fd,
471
2
              miss_threshold_);
472
2
    fd_to_miss_count_.erase(fd);
473
2
    markSocketDead(fd);
474
3
  } else {
475
    // Below threshold: re-arm send timer for the next ping cycle.
476
2
    rearmPingSendTimer(fd);
477
2
  }
478
4
}
479

            
480
215
uint64_t UpstreamSocketManager::pingIntervalWithJitterMs() {
481
215
  uint64_t interval_ms = static_cast<uint64_t>(ping_interval_.count()) * 1000;
482
215
  constexpr uint64_t jitter_percent = 15;
483
215
  uint64_t jitter_mod = jitter_percent * interval_ms / 100;
484
215
  if (jitter_mod > 0) {
485
215
    interval_ms += random_generator_->random() % jitter_mod;
486
215
  }
487
215
  return interval_ms;
488
215
}
489

            
490
3
void UpstreamSocketManager::rearmPingSendTimer(int fd) {
491
3
  auto send_it = fd_to_ping_send_timer_map_.find(fd);
492
3
  if (send_it != fd_to_ping_send_timer_map_.end()) {
493
3
    send_it->second->enableTimer(std::chrono::milliseconds(pingIntervalWithJitterMs()));
494
3
  }
495
3
}
496

            
497
206
// Tears down all per-connection file events and timers, marks remaining
// sockets dead (which updates stats and node/cluster maps), and unregisters
// this manager from the global rebalancing list.
UpstreamSocketManager::~UpstreamSocketManager() {
  ENVOY_LOG(debug, "reverse_tunnel: destructor called.");

  // Clean up all active file events and timers first.
  for (auto& [fd, event] : fd_to_event_map_) {
    ENVOY_LOG(trace, "reverse_tunnel: cleaning up file event. fd: {}.", fd);
    event.reset(); // This will cancel the file event.
  }
  fd_to_event_map_.clear();

  for (auto& [fd, timer] : fd_to_timer_map_) {
    ENVOY_LOG(trace, "reverse_tunnel: cleaning up timeout timer. fd: {}.", fd);
    timer.reset();
  }
  fd_to_timer_map_.clear();

  for (auto& [fd, timer] : fd_to_ping_send_timer_map_) {
    ENVOY_LOG(trace, "reverse_tunnel: cleaning up send timer. fd: {}.", fd);
    timer.reset();
  }
  fd_to_ping_send_timer_map_.clear();

  // Now mark all sockets as dead. Snapshot the fds first because
  // markSocketDead() erases entries from fd_to_node_map_, which would
  // invalidate an in-place iteration.
  std::vector<int> fds_to_cleanup;
  for (const auto& [fd, node_id] : fd_to_node_map_) {
    fds_to_cleanup.push_back(fd);
  }

  for (int fd : fds_to_cleanup) {
    ENVOY_LOG(trace, "reverse_tunnel: marking socket dead in destructor. fd: {}.", fd);
    markSocketDead(fd);
  }

  // Clear any remaining fd mappings.
  fd_to_node_map_.clear();
  fd_to_cluster_map_.clear();
  fd_to_socket_it_map_.clear();

  // Remove this instance from the global socket managers list (no-op for the
  // main-thread manager, which was never registered).
  absl::WriterMutexLock lock(UpstreamSocketManager::socket_manager_lock);
  auto it = std::find(socket_managers_.begin(), socket_managers_.end(), this);
  if (it != socket_managers_.end()) {
    socket_managers_.erase(it);
  }
}
542

            
543
} // namespace ReverseConnection.
544
} // namespace Bootstrap.
545
} // namespace Extensions.
546
} // namespace Envoy.