1
#include "source/extensions/clusters/dynamic_modules/cluster.h"
2

            
3
#include "envoy/config/core/v3/base.pb.h"
4
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
5
#include "envoy/network/connection.h"
6
#include "envoy/network/drain_decision.h"
7
#include "envoy/upstream/locality.h"
8

            
9
#include "source/common/buffer/buffer_impl.h"
10
#include "source/common/network/address_impl.h"
11
#include "source/common/network/utility.h"
12
#include "source/common/protobuf/protobuf.h"
13
#include "source/common/protobuf/utility.h"
14
#include "source/common/upstream/upstream_impl.h"
15
#include "source/extensions/dynamic_modules/dynamic_modules.h"
16

            
17
namespace Envoy {
18
namespace Extensions {
19
namespace Clusters {
20
namespace DynamicModules {
21

            
22
namespace {
23

            
24
/**
25
 * Thread-aware load balancer that creates DynamicModuleLoadBalancer instances per worker.
26
 */
27
struct DynamicModuleThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
28
  DynamicModuleThreadAwareLoadBalancer(DynamicModuleClusterHandleSharedPtr handle)
29
80
      : handle_(std::move(handle)) {}
30

            
31
  struct LoadBalancerFactory : public Upstream::LoadBalancerFactory {
32
14
    LoadBalancerFactory(DynamicModuleClusterHandleSharedPtr handle) : handle_(std::move(handle)) {}
33

            
34
29
    Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override {
35
29
      return std::make_unique<DynamicModuleLoadBalancer>(handle_);
36
29
    }
37

            
38
    DynamicModuleClusterHandleSharedPtr handle_;
39
  };
40

            
41
14
  Upstream::LoadBalancerFactorySharedPtr factory() override {
42
14
    return std::make_shared<LoadBalancerFactory>(handle_);
43
14
  }
44

            
45
14
  absl::Status initialize() override { return absl::OkStatus(); }
46

            
47
  DynamicModuleClusterHandleSharedPtr handle_;
48
};
49

            
50
} // namespace
51

            
52
// =================================================================================================
53
// DynamicModuleClusterConfig
54
// =================================================================================================
55

            
56
absl::StatusOr<std::shared_ptr<DynamicModuleClusterConfig>> DynamicModuleClusterConfig::create(
57
    const std::string& cluster_name, const std::string& cluster_config,
58
83
    Envoy::Extensions::DynamicModules::DynamicModulePtr module, Stats::Scope& stats_scope) {
59
83
  auto config = std::shared_ptr<DynamicModuleClusterConfig>(
60
83
      new DynamicModuleClusterConfig(cluster_name, cluster_config, std::move(module), stats_scope));
61

            
62
  // Resolve all required function pointers from the dynamic module.
63
83
#define RESOLVE_SYMBOL(name, type, member)                                                         \
64
657
  {                                                                                                \
65
657
    auto symbol_or_error = config->dynamic_module_->getFunctionPointer<type>(name);                \
66
657
    if (!symbol_or_error.ok()) {                                                                   \
67
1
      return symbol_or_error.status();                                                             \
68
1
    }                                                                                              \
69
657
    config->member = symbol_or_error.value();                                                      \
70
656
  }
71

            
72
83
  RESOLVE_SYMBOL("envoy_dynamic_module_on_cluster_config_new", OnClusterConfigNewType,
73
82
                 on_cluster_config_new_);
74
82
  RESOLVE_SYMBOL("envoy_dynamic_module_on_cluster_config_destroy", OnClusterConfigDestroyType,
75
82
                 on_cluster_config_destroy_);
76
82
  RESOLVE_SYMBOL("envoy_dynamic_module_on_cluster_new", OnClusterNewType, on_cluster_new_);
77
82
  RESOLVE_SYMBOL("envoy_dynamic_module_on_cluster_init", OnClusterInitType, on_cluster_init_);
78
82
  RESOLVE_SYMBOL("envoy_dynamic_module_on_cluster_destroy", OnClusterDestroyType,
79
82
                 on_cluster_destroy_);
80
82
  RESOLVE_SYMBOL("envoy_dynamic_module_on_cluster_lb_new", OnClusterLbNewType, on_cluster_lb_new_);
81
82
  RESOLVE_SYMBOL("envoy_dynamic_module_on_cluster_lb_destroy", OnClusterLbDestroyType,
82
82
                 on_cluster_lb_destroy_);
83
82
  RESOLVE_SYMBOL("envoy_dynamic_module_on_cluster_lb_choose_host", OnClusterLbChooseHostType,
84
82
                 on_cluster_lb_choose_host_);
85

            
86
82
#undef RESOLVE_SYMBOL
87

            
88
  // Optional hooks. Modules that don't need async host selection or scheduling don't need to
89
  // implement these.
90
82
  auto on_cancel = config->dynamic_module_->getFunctionPointer<OnClusterLbCancelHostSelectionType>(
91
82
      "envoy_dynamic_module_on_cluster_lb_cancel_host_selection");
92
82
  config->on_cluster_lb_cancel_host_selection_ = on_cancel.ok() ? on_cancel.value() : nullptr;
93

            
94
82
  auto on_scheduled = config->dynamic_module_->getFunctionPointer<OnClusterScheduledType>(
95
82
      "envoy_dynamic_module_on_cluster_scheduled");
96
82
  config->on_cluster_scheduled_ = on_scheduled.ok() ? on_scheduled.value() : nullptr;
97

            
98
  // Lifecycle hooks are optional. Modules that don't need them don't need to implement them.
99
82
  auto on_server_initialized =
100
82
      config->dynamic_module_->getFunctionPointer<OnClusterServerInitializedType>(
101
82
          "envoy_dynamic_module_on_cluster_server_initialized");
102
82
  config->on_cluster_server_initialized_ =
103
82
      on_server_initialized.ok() ? on_server_initialized.value() : nullptr;
104

            
105
82
  auto on_drain_started = config->dynamic_module_->getFunctionPointer<OnClusterDrainStartedType>(
106
82
      "envoy_dynamic_module_on_cluster_drain_started");
107
82
  config->on_cluster_drain_started_ = on_drain_started.ok() ? on_drain_started.value() : nullptr;
108

            
109
82
  auto on_shutdown = config->dynamic_module_->getFunctionPointer<OnClusterShutdownType>(
110
82
      "envoy_dynamic_module_on_cluster_shutdown");
111
82
  config->on_cluster_shutdown_ = on_shutdown.ok() ? on_shutdown.value() : nullptr;
112

            
113
82
  auto on_http_callout_done =
114
82
      config->dynamic_module_->getFunctionPointer<OnClusterHttpCalloutDoneType>(
115
82
          "envoy_dynamic_module_on_cluster_http_callout_done");
116
82
  config->on_cluster_http_callout_done_ =
117
82
      on_http_callout_done.ok() ? on_http_callout_done.value() : nullptr;
118

            
119
82
  auto on_lb_membership_update =
120
82
      config->dynamic_module_->getFunctionPointer<OnClusterLbOnHostMembershipUpdateType>(
121
82
          "envoy_dynamic_module_on_cluster_lb_on_host_membership_update");
122
82
  config->on_cluster_lb_on_host_membership_update_ =
123
82
      on_lb_membership_update.ok() ? on_lb_membership_update.value() : nullptr;
124

            
125
  // Call on_cluster_config_new to get the in-module configuration.
126
82
  envoy_dynamic_module_type_envoy_buffer name_buffer = {config->cluster_name_.data(),
127
82
                                                        config->cluster_name_.size()};
128
82
  envoy_dynamic_module_type_envoy_buffer config_buffer = {config->cluster_config_.data(),
129
82
                                                          config->cluster_config_.size()};
130

            
131
82
  config->in_module_config_ =
132
82
      config->on_cluster_config_new_(config.get(), name_buffer, config_buffer);
133
82
  if (config->in_module_config_ == nullptr) {
134
1
    return absl::InvalidArgumentError("Failed to create in-module cluster configuration");
135
1
  }
136

            
137
81
  return config;
138
82
}
139

            
140
DynamicModuleClusterConfig::DynamicModuleClusterConfig(
141
    const std::string& cluster_name, const std::string& cluster_config,
142
    Envoy::Extensions::DynamicModules::DynamicModulePtr module, Stats::Scope& stats_scope)
143
83
    : stats_scope_(stats_scope.createScope("dynamicmodulescustom.")),
144
83
      stat_name_pool_(stats_scope_->symbolTable()), cluster_name_(cluster_name),
145
83
      cluster_config_(cluster_config), dynamic_module_(std::move(module)) {}
146

            
147
83
DynamicModuleClusterConfig::~DynamicModuleClusterConfig() {
148
83
  if (in_module_config_ != nullptr && on_cluster_config_destroy_ != nullptr) {
149
81
    on_cluster_config_destroy_(in_module_config_);
150
81
  }
151
83
}
152

            
153
// =================================================================================================
154
// DynamicModuleClusterHandle
155
// =================================================================================================
156

            
157
97
DynamicModuleClusterHandle::~DynamicModuleClusterHandle() {
158
97
  std::shared_ptr<DynamicModuleCluster> cluster = std::move(cluster_);
159
97
  cluster_.reset();
160
  // Release lifecycle handles eagerly while the lifecycle notifier is still valid. When the
161
  // dispatcher destructor clears pending callbacks, the cluster destructor would otherwise try to
162
  // unregister from already-destroyed lifecycle notifier lists.
163
97
  cluster->server_initialized_handle_.reset();
164
97
  cluster->shutdown_handle_.reset();
165
97
  cluster->drain_handle_.reset();
166
97
  Event::Dispatcher& dispatcher = cluster->dispatcher_;
167
97
  dispatcher.post([cluster = std::move(cluster)]() mutable { cluster.reset(); });
168
97
}
169

            
170
// =================================================================================================
171
// DynamicModuleCluster
172
// =================================================================================================
173

            
174
DynamicModuleCluster::DynamicModuleCluster(const envoy::config::cluster::v3::Cluster& cluster,
175
                                           DynamicModuleClusterConfigSharedPtr config,
176
                                           Upstream::ClusterFactoryContext& context,
177
                                           absl::Status& creation_status)
178
81
    : ClusterImplBase(cluster, context, creation_status), config_(std::move(config)),
179
81
      in_module_cluster_(nullptr),
180
81
      dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
181
81
      server_context_(context.serverFactoryContext()) {
182

            
183
  // Create the in-module cluster instance.
184
81
  in_module_cluster_ = config_->on_cluster_new_(config_->in_module_config_, this);
185
81
  if (in_module_cluster_ == nullptr) {
186
1
    creation_status = absl::InvalidArgumentError("Failed to create in-module cluster instance");
187
1
    return;
188
1
  }
189

            
190
  // Initialize the priority set with an empty host set at priority 0.
191
80
  priority_set_.getOrCreateHostSet(0);
192

            
193
80
  registerLifecycleCallbacks();
194
80
}
195

            
196
81
DynamicModuleCluster::~DynamicModuleCluster() {
197
81
  ASSERT_IS_MAIN_OR_TEST_THREAD();
198
  // Cancel any pending HTTP callouts before destroying the cluster.
199
81
  for (auto& callout : http_callouts_) {
200
    if (callout.second->request_ != nullptr) {
201
      callout.second->request_->cancel();
202
    }
203
  }
204
81
  http_callouts_.clear();
205

            
206
81
  if (in_module_cluster_ != nullptr && config_->on_cluster_destroy_ != nullptr) {
207
78
    config_->on_cluster_destroy_(in_module_cluster_);
208
78
  }
209
81
}
210

            
211
80
void DynamicModuleCluster::registerLifecycleCallbacks() {
212
80
  if (config_->on_cluster_server_initialized_ != nullptr) {
213
80
    server_initialized_handle_ = server_context_.lifecycleNotifier().registerCallback(
214
80
        Server::ServerLifecycleNotifier::Stage::PostInit, [this]() {
215
7
          if (in_module_cluster_ != nullptr) {
216
7
            ENVOY_LOG(debug, "dynamic module cluster server initialized");
217
7
            config_->on_cluster_server_initialized_(this, in_module_cluster_);
218
7
          }
219
7
        });
220
80
  }
221

            
222
80
  if (config_->on_cluster_drain_started_ != nullptr) {
223
80
    drain_handle_ = server_context_.drainManager().addOnDrainCloseCb(
224
80
        Network::DrainDirection::All, [this](std::chrono::milliseconds) -> absl::Status {
225
2
          if (in_module_cluster_ != nullptr) {
226
2
            ENVOY_LOG(debug, "dynamic module cluster drain started");
227
2
            config_->on_cluster_drain_started_(this, in_module_cluster_);
228
2
          }
229
2
          return absl::OkStatus();
230
2
        });
231
80
  }
232

            
233
80
  if (config_->on_cluster_shutdown_ != nullptr) {
234
80
    shutdown_handle_ = server_context_.lifecycleNotifier().registerCallback(
235
80
        Server::ServerLifecycleNotifier::Stage::ShutdownExit, [this](Event::PostCb completion_cb) {
236
8
          if (in_module_cluster_ != nullptr) {
237
8
            ENVOY_LOG(debug, "dynamic module cluster shutdown started");
238
8
            auto* completion = new Event::PostCb(std::move(completion_cb));
239
8
            config_->on_cluster_shutdown_(
240
8
                this, in_module_cluster_,
241
8
                [](void* context) {
242
8
                  auto* cb = static_cast<Event::PostCb*>(context);
243
8
                  (*cb)();
244
8
                  delete cb;
245
8
                },
246
8
                static_cast<void*>(completion));
247
8
          } else {
248
            completion_cb();
249
          }
250
8
        });
251
80
  }
252
80
}
253

            
254
15
void DynamicModuleCluster::startPreInit() {
255
  // Call the module's init function. The module is expected to call
256
  // envoy_dynamic_module_callback_cluster_pre_init_complete when ready.
257
15
  config_->on_cluster_init_(this, in_module_cluster_);
258
15
}
259

            
260
15
void DynamicModuleCluster::preInitComplete() { onPreInitComplete(); }
261

            
262
4
void DynamicModuleCluster::onScheduled(uint64_t event_id) {
263
4
  if (in_module_cluster_ != nullptr && config_->on_cluster_scheduled_ != nullptr) {
264
1
    config_->on_cluster_scheduled_(this, in_module_cluster_, event_id);
265
1
  }
266
4
}
267

            
268
namespace {
269
// Builds hosts-per-locality from a host vector using value-based locality comparison.
270
55
Upstream::HostsPerLocalityConstSharedPtr buildHostsPerLocality(const Upstream::HostVector& hosts) {
271
55
  absl::node_hash_map<envoy::config::core::v3::Locality, Upstream::HostVector,
272
55
                      Upstream::LocalityHash, Upstream::LocalityEqualTo>
273
55
      per_locality_hosts;
274
76
  for (const auto& host : hosts) {
275
76
    per_locality_hosts[host->locality()].push_back(host);
276
76
  }
277
55
  std::vector<Upstream::HostVector> locality_hosts;
278
55
  for (auto& [_, h] : per_locality_hosts) {
279
53
    locality_hosts.push_back(std::move(h));
280
53
  }
281
55
  return std::make_shared<Upstream::HostsPerLocalityImpl>(std::move(locality_hosts), false);
282
55
}
283
} // namespace
284

            
285
bool DynamicModuleCluster::addHosts(
286
    const std::vector<std::string>& addresses, const std::vector<uint32_t>& weights,
287
    const std::vector<std::string>& regions, const std::vector<std::string>& zones,
288
    const std::vector<std::string>& sub_zones,
289
    const std::vector<std::vector<std::tuple<std::string, std::string, std::string>>>& metadata,
290
47
    std::vector<Upstream::HostSharedPtr>& result_hosts, uint32_t priority) {
291
47
  ASSERT(addresses.size() == weights.size());
292
47
  ASSERT(addresses.size() == regions.size());
293
47
  ASSERT(addresses.size() == zones.size());
294
47
  ASSERT(addresses.size() == sub_zones.size());
295
47
  ASSERT(metadata.empty() || metadata.size() == addresses.size());
296
47
  result_hosts.clear();
297
47
  result_hosts.reserve(addresses.size());
298

            
299
47
  auto cluster_info = info();
300

            
301
109
  for (size_t i = 0; i < addresses.size(); ++i) {
302
70
    if (weights[i] == 0 || weights[i] > 128) {
303
4
      ENVOY_LOG(error, "Invalid weight {} for host {}.", weights[i], addresses[i]);
304
4
      return false;
305
4
    }
306

            
307
66
    Network::Address::InstanceConstSharedPtr resolved_address =
308
66
        Network::Utility::parseInternetAddressAndPortNoThrow(addresses[i], false);
309
66
    if (resolved_address == nullptr) {
310
4
      ENVOY_LOG(error, "Invalid address: {}.", addresses[i]);
311
4
      return false;
312
4
    }
313

            
314
62
    auto locality = std::make_shared<envoy::config::core::v3::Locality>();
315
62
    if (!regions[i].empty()) {
316
10
      locality->set_region(regions[i]);
317
10
    }
318
62
    if (!zones[i].empty()) {
319
10
      locality->set_zone(zones[i]);
320
10
    }
321
62
    if (!sub_zones[i].empty()) {
322
2
      locality->set_sub_zone(sub_zones[i]);
323
2
    }
324

            
325
    // Build endpoint metadata if provided.
326
62
    Upstream::MetadataConstSharedPtr endpoint_metadata = nullptr;
327
62
    if (!metadata.empty() && !metadata[i].empty()) {
328
2
      auto md = std::make_shared<envoy::config::core::v3::Metadata>();
329
3
      for (const auto& [filter_name, key, value] : metadata[i]) {
330
3
        auto& fields = (*md->mutable_filter_metadata())[filter_name];
331
3
        (*fields.mutable_fields())[key].set_string_value(value);
332
3
      }
333
2
      endpoint_metadata = std::move(md);
334
2
    }
335

            
336
62
    auto host_result = Upstream::HostImpl::create(
337
62
        cluster_info, cluster_info->name() + addresses[i], std::move(resolved_address),
338
62
        std::move(endpoint_metadata), nullptr, weights[i], std::move(locality),
339
62
        envoy::config::endpoint::v3::Endpoint::HealthCheckConfig().default_instance(), 0,
340
62
        envoy::config::core::v3::UNKNOWN);
341
62
    if (!host_result.ok()) {
342
      ENVOY_LOG(error, "Failed to create host for address: {}.", addresses[i]); // LCOV_EXCL_LINE
343
      return false;                                                             // LCOV_EXCL_LINE
344
    }
345
62
    result_hosts.emplace_back(std::move(host_result.value()));
346
62
  }
347

            
348
39
  {
349
39
    absl::WriterMutexLock lock(&host_map_lock_);
350
60
    for (const auto& host : result_hosts) {
351
60
      host_map_[host.get()] = host;
352
60
    }
353
39
  }
354

            
355
39
  const auto& host_set = priority_set_.getOrCreateHostSet(priority);
356
39
  Upstream::HostVectorSharedPtr all_hosts(new Upstream::HostVector(host_set.hosts()));
357
39
  Upstream::HostVector added_hosts;
358
60
  for (const auto& host : result_hosts) {
359
60
    all_hosts->emplace_back(host);
360
60
    added_hosts.emplace_back(host);
361
60
  }
362

            
363
39
  auto hosts_per_locality = buildHostsPerLocality(*all_hosts);
364

            
365
39
  priority_set_.updateHosts(
366
39
      priority, Upstream::HostSetImpl::partitionHosts(all_hosts, std::move(hosts_per_locality)), {},
367
39
      added_hosts, {}, absl::nullopt, absl::nullopt);
368

            
369
39
  ENVOY_LOG(debug, "Added {} hosts to dynamic module cluster at priority {}.", result_hosts.size(),
370
39
            priority);
371
39
  return true;
372
47
}
373

            
374
bool DynamicModuleCluster::updateHostHealth(Upstream::HostSharedPtr host,
375
12
                                            envoy_dynamic_module_type_host_health health_status) {
376
12
  if (host == nullptr) {
377
3
    return false;
378
3
  }
379

            
380
  // Clear existing EDS health flags and set the new status.
381
9
  host->healthFlagClear(Upstream::Host::HealthFlag::FAILED_EDS_HEALTH);
382
9
  host->healthFlagClear(Upstream::Host::HealthFlag::DEGRADED_EDS_HEALTH);
383

            
384
9
  switch (health_status) {
385
5
  case envoy_dynamic_module_type_host_health_Unhealthy:
386
5
    host->healthFlagSet(Upstream::Host::HealthFlag::FAILED_EDS_HEALTH);
387
5
    break;
388
1
  case envoy_dynamic_module_type_host_health_Degraded:
389
1
    host->healthFlagSet(Upstream::Host::HealthFlag::DEGRADED_EDS_HEALTH);
390
1
    break;
391
3
  case envoy_dynamic_module_type_host_health_Healthy:
392
3
    break;
393
9
  }
394

            
395
  // Find the priority level that contains this host and trigger a priority set update to
396
  // propagate the health change to load balancers.
397
9
  const auto& host_sets = priority_set_.hostSetsPerPriority();
398
11
  for (uint32_t p = 0; p < host_sets.size(); ++p) {
399
11
    const auto& hosts = host_sets[p]->hosts();
400
11
    for (const auto& h : hosts) {
401
11
      if (h.get() == host.get()) {
402
9
        auto all_hosts = std::make_shared<Upstream::HostVector>(hosts);
403
9
        auto hosts_per_locality = buildHostsPerLocality(*all_hosts);
404
9
        priority_set_.updateHosts(
405
9
            p, Upstream::HostSetImpl::partitionHosts(all_hosts, std::move(hosts_per_locality)), {},
406
9
            {}, {}, absl::nullopt, absl::nullopt);
407
9
        ENVOY_LOG(debug, "Updated health status for host to {} at priority {}.",
408
9
                  static_cast<int>(health_status), p);
409
9
        return true;
410
9
      }
411
11
    }
412
11
  }
413

            
414
  ENVOY_LOG(error, "Host not found in any priority level during health update.");
415
  return false;
416
9
}
417

            
418
6
Upstream::HostSharedPtr DynamicModuleCluster::findHostByAddress(const std::string& address) {
419
6
  const auto host_map = prioritySet().crossPriorityHostMap();
420
6
  if (host_map == nullptr) {
421
    return nullptr;
422
  }
423
6
  const auto it = host_map->find(address);
424
6
  if (it == host_map->end()) {
425
3
    return nullptr;
426
3
  }
427
3
  return it->second;
428
6
}
429

            
430
18
Upstream::HostSharedPtr DynamicModuleCluster::findHost(void* raw_host_ptr) {
431
18
  absl::ReaderMutexLock lock(&host_map_lock_);
432
18
  auto it = host_map_.find(raw_host_ptr);
433
18
  if (it == host_map_.end()) {
434
5
    return nullptr;
435
5
  }
436
13
  return it->second;
437
18
}
438

            
439
10
size_t DynamicModuleCluster::removeHosts(const std::vector<Upstream::HostSharedPtr>& hosts) {
440
10
  Upstream::HostVector removed_hosts;
441
10
  removed_hosts.reserve(hosts.size());
442

            
443
  // Remove all valid hosts from the map.
444
10
  {
445
10
    absl::WriterMutexLock lock(&host_map_lock_);
446
14
    for (const auto& host : hosts) {
447
14
      if (host == nullptr) {
448
3
        continue;
449
3
      }
450
11
      auto it = host_map_.find(host.get());
451
11
      if (it != host_map_.end()) {
452
10
        removed_hosts.emplace_back(host);
453
10
        host_map_.erase(it);
454
10
      }
455
11
    }
456
10
  }
457

            
458
10
  if (removed_hosts.empty()) {
459
3
    return 0;
460
3
  }
461

            
462
  // Build the remaining host list and update the priority set once.
463
7
  ASSERT(priority_set_.hostSetsPerPriority().size() >= 1);
464
7
  const auto& first_host_set = priority_set_.getOrCreateHostSet(0);
465

            
466
  // Build a set of removed host pointers for O(1) lookup.
467
7
  absl::flat_hash_set<Upstream::Host*> removed_set;
468
7
  removed_set.reserve(removed_hosts.size());
469
10
  for (const auto& h : removed_hosts) {
470
10
    removed_set.insert(h.get());
471
10
  }
472

            
473
7
  Upstream::HostVectorSharedPtr remaining_hosts(new Upstream::HostVector());
474
12
  for (const auto& h : first_host_set.hosts()) {
475
12
    if (removed_set.find(h.get()) == removed_set.end()) {
476
2
      remaining_hosts->emplace_back(h);
477
2
    }
478
12
  }
479

            
480
7
  auto hosts_per_locality = buildHostsPerLocality(*remaining_hosts);
481

            
482
7
  priority_set_.updateHosts(
483
7
      0, Upstream::HostSetImpl::partitionHosts(remaining_hosts, std::move(hosts_per_locality)), {},
484
7
      {}, removed_hosts, absl::nullopt, absl::nullopt);
485

            
486
7
  ENVOY_LOG(debug, "Removed {} hosts from dynamic module cluster.", removed_hosts.size());
487
7
  return removed_hosts.size();
488
10
}
489

            
490
envoy_dynamic_module_type_http_callout_init_result
491
DynamicModuleCluster::sendHttpCallout(uint64_t* callout_id_out, absl::string_view cluster_name,
492
                                      Http::RequestMessagePtr&& message,
493
8
                                      uint64_t timeout_milliseconds) {
494
8
  if (config_->on_cluster_http_callout_done_ == nullptr) {
495
    ENVOY_LOG(debug, "dynamic module cluster: HTTP callout requested but "
496
                     "on_cluster_http_callout_done is not implemented.");
497
    return envoy_dynamic_module_type_http_callout_init_result_CannotCreateRequest;
498
  }
499

            
500
8
  Upstream::ThreadLocalCluster* cluster =
501
8
      server_context_.clusterManager().getThreadLocalCluster(cluster_name);
502
8
  if (!cluster) {
503
1
    return envoy_dynamic_module_type_http_callout_init_result_ClusterNotFound;
504
1
  }
505
7
  Http::AsyncClient::RequestOptions options;
506
7
  options.setTimeout(std::chrono::milliseconds(timeout_milliseconds));
507

            
508
7
  const uint64_t callout_id = getNextCalloutId();
509
7
  auto http_callout_callback =
510
7
      std::make_unique<DynamicModuleCluster::HttpCalloutCallback>(shared_from_this(), callout_id);
511
7
  DynamicModuleCluster::HttpCalloutCallback& callback = *http_callout_callback;
512

            
513
7
  auto request = cluster->httpAsyncClient().send(std::move(message), callback, options);
514
7
  if (!request) {
515
1
    return envoy_dynamic_module_type_http_callout_init_result_CannotCreateRequest;
516
1
  }
517

            
518
6
  callback.request_ = request;
519
6
  http_callouts_.emplace(callout_id, std::move(http_callout_callback));
520
6
  *callout_id_out = callout_id;
521

            
522
6
  return envoy_dynamic_module_type_http_callout_init_result_Success;
523
7
}
524

            
525
void DynamicModuleCluster::HttpCalloutCallback::onSuccess(const Http::AsyncClient::Request&,
526
3
                                                          Http::ResponseMessagePtr&& response) {
527
  // Move the cluster and callout id to local scope since on_cluster_http_callout_done_ might
528
  // result in operations that affect this callback's lifetime.
529
3
  std::shared_ptr<DynamicModuleCluster> cluster = std::move(cluster_);
530
3
  uint64_t callout_id = callout_id_;
531

            
532
3
  if (!cluster->in_module_cluster_) {
533
1
    cluster->http_callouts_.erase(callout_id);
534
1
    return;
535
1
  }
536

            
537
2
  absl::InlinedVector<envoy_dynamic_module_type_envoy_http_header, 16> headers_vector;
538
2
  headers_vector.reserve(response->headers().size());
539
2
  response->headers().iterate([&headers_vector](
540
2
                                  const Http::HeaderEntry& header) -> Http::HeaderMap::Iterate {
541
2
    headers_vector.emplace_back(envoy_dynamic_module_type_envoy_http_header{
542
2
        const_cast<char*>(header.key().getStringView().data()), header.key().getStringView().size(),
543
2
        const_cast<char*>(header.value().getStringView().data()),
544
2
        header.value().getStringView().size()});
545
2
    return Http::HeaderMap::Iterate::Continue;
546
2
  });
547

            
548
2
  Envoy::Buffer::RawSliceVector body = response->body().getRawSlices(std::nullopt);
549
2
  cluster->config_->on_cluster_http_callout_done_(
550
2
      cluster.get(), cluster->in_module_cluster_, callout_id,
551
2
      envoy_dynamic_module_type_http_callout_result_Success, headers_vector.data(),
552
2
      headers_vector.size(), reinterpret_cast<envoy_dynamic_module_type_envoy_buffer*>(body.data()),
553
2
      body.size());
554
2
  cluster->http_callouts_.erase(callout_id);
555
2
}
556

            
557
void DynamicModuleCluster::HttpCalloutCallback::onFailure(const Http::AsyncClient::Request&,
558
3
                                                          Http::AsyncClient::FailureReason reason) {
559
  // Move the cluster and callout id to local scope since on_cluster_http_callout_done_ might
560
  // result in operations that affect this callback's lifetime.
561
3
  std::shared_ptr<DynamicModuleCluster> cluster = std::move(cluster_);
562
3
  const uint64_t callout_id = callout_id_;
563

            
564
3
  if (!cluster->in_module_cluster_) {
565
1
    cluster->http_callouts_.erase(callout_id);
566
1
    return;
567
1
  }
568

            
569
  // request_ is not null if the callout is actually sent to the upstream cluster.
570
  // This allows us to avoid inlined calls to onFailure() method (which results in a reentrant to
571
  // the modules) when the async client immediately fails the callout.
572
2
  if (request_) {
573
2
    envoy_dynamic_module_type_http_callout_result result;
574
2
    switch (reason) {
575
1
    case Http::AsyncClient::FailureReason::Reset:
576
1
      result = envoy_dynamic_module_type_http_callout_result_Reset;
577
1
      break;
578
1
    case Http::AsyncClient::FailureReason::ExceedResponseBufferLimit:
579
1
      result = envoy_dynamic_module_type_http_callout_result_ExceedResponseBufferLimit;
580
1
      break;
581
2
    }
582
2
    cluster->config_->on_cluster_http_callout_done_(cluster.get(), cluster->in_module_cluster_,
583
2
                                                    callout_id, result, nullptr, 0, nullptr, 0);
584
2
  }
585

            
586
2
  cluster->http_callouts_.erase(callout_id);
587
2
}
588

            
589
// =================================================================================================
590
// DynamicModuleLoadBalancer
591
// =================================================================================================
592

            
593
DynamicModuleLoadBalancer::DynamicModuleLoadBalancer(
594
    const DynamicModuleClusterHandleSharedPtr& handle)
595
45
    : handle_(handle), in_module_lb_(nullptr) {
596
45
  in_module_lb_ =
597
45
      handle_->cluster_->config()->on_cluster_lb_new_(handle_->cluster_->inModuleCluster(), this);
598

            
599
  // Register for host membership updates if the module implements the hook.
600
45
  if (handle_->cluster_->config()->on_cluster_lb_on_host_membership_update_ != nullptr) {
601
45
    member_update_cb_ = handle_->cluster_->prioritySet().addMemberUpdateCb(
602
45
        [this](const Upstream::HostVector& hosts_added, const Upstream::HostVector& hosts_removed) {
603
11
          hosts_added_ = &hosts_added;
604
11
          hosts_removed_ = &hosts_removed;
605
11
          handle_->cluster_->config()->on_cluster_lb_on_host_membership_update_(
606
11
              this, in_module_lb_, hosts_added.size(), hosts_removed.size());
607
11
          hosts_added_ = nullptr;
608
11
          hosts_removed_ = nullptr;
609
11
        });
610
45
  }
611
45
}
612

            
613
45
DynamicModuleLoadBalancer::~DynamicModuleLoadBalancer() {
614
45
  if (in_module_lb_ != nullptr && handle_->cluster_->config()->on_cluster_lb_destroy_ != nullptr) {
615
45
    handle_->cluster_->config()->on_cluster_lb_destroy_(in_module_lb_);
616
45
  }
617
45
}
618

            
619
Upstream::HostSelectionResponse
620
8
DynamicModuleLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
621
8
  if (in_module_lb_ == nullptr) {
622
    return {nullptr};
623
  }
624

            
625
  // Pre-capture the worker dispatcher and prepare the cancellation flag before calling into the
626
  // module. The module's choose_host may spawn a background thread that calls
627
  // async_host_selection_complete, which reads these fields. Setting them beforehand establishes
628
  // a happens-before relationship via the thread::spawn synchronization in the module.
629
8
  const auto* connection = context != nullptr ? context->downstreamConnection() : nullptr;
630
8
  active_async_dispatcher_ = connection != nullptr ? &connection->dispatcher() : nullptr;
631
8
  active_async_cancelled_ = std::make_shared<std::atomic<bool>>(false);
632

            
633
8
  envoy_dynamic_module_type_cluster_host_envoy_ptr host_ptr = nullptr;
634
8
  envoy_dynamic_module_type_cluster_lb_async_handle_module_ptr async_handle = nullptr;
635
8
  handle_->cluster_->config()->on_cluster_lb_choose_host_(in_module_lb_, context, &host_ptr,
636
8
                                                          &async_handle);
637

            
638
8
  if (async_handle != nullptr) {
639
    // Async pending: the module will call the completion callback later.
640
1
    auto cancelable = std::make_unique<DynamicModuleAsyncHostSelectionHandle>(
641
1
        async_handle, in_module_lb_,
642
1
        handle_->cluster_->config()->on_cluster_lb_cancel_host_selection_, active_async_cancelled_);
643
1
    return Upstream::HostSelectionResponse{nullptr, std::move(cancelable)};
644
1
  }
645

            
646
  // Synchronous result or no host. Clear the async state.
647
7
  active_async_dispatcher_ = nullptr;
648
7
  active_async_cancelled_ = nullptr;
649

            
650
7
  if (host_ptr == nullptr) {
651
1
    return {nullptr};
652
1
  }
653

            
654
  // Look up the host shared pointer from the raw pointer.
655
6
  auto host = handle_->cluster_->findHost(host_ptr);
656
6
  return {host};
657
7
}
658

            
659
3
DynamicModuleAsyncHostSelectionHandle::~DynamicModuleAsyncHostSelectionHandle() {
660
  // Free the module-side async handle. The cancel function takes ownership of the handle and
661
  // drops it, so this works for both cancellation and normal completion paths.
662
3
  if (async_handle_ != nullptr && cancel_fn_ != nullptr) {
663
1
    cancel_fn_(in_module_lb_, async_handle_);
664
1
    async_handle_ = nullptr;
665
1
  }
666
3
}
667

            
668
2
void DynamicModuleAsyncHostSelectionHandle::cancel() {
669
2
  cancelled_->store(true, std::memory_order_release);
670
2
}
671

            
672
132
const Upstream::PrioritySet& DynamicModuleLoadBalancer::prioritySet() const {
673
132
  return handle_->cluster_->prioritySet();
674
132
}
675

            
676
3
bool DynamicModuleLoadBalancer::setHostData(uint32_t priority, size_t index, uintptr_t data) {
677
3
  const auto& host_sets = prioritySet().hostSetsPerPriority();
678
3
  if (priority >= host_sets.size()) {
679
    return false;
680
  }
681
3
  const auto& hosts = host_sets[priority]->hosts();
682
3
  if (index >= hosts.size()) {
683
1
    return false;
684
1
  }
685
2
  if (data == 0) {
686
1
    per_host_data_.erase({priority, index});
687
1
  } else {
688
1
    per_host_data_[{priority, index}] = data;
689
1
  }
690
2
  return true;
691
3
}
692

            
693
bool DynamicModuleLoadBalancer::getHostData(uint32_t priority, size_t index,
694
4
                                            uintptr_t* data) const {
695
4
  const auto& host_sets = prioritySet().hostSetsPerPriority();
696
4
  if (priority >= host_sets.size()) {
697
    return false;
698
  }
699
4
  const auto& hosts = host_sets[priority]->hosts();
700
4
  if (index >= hosts.size()) {
701
1
    return false;
702
1
  }
703
3
  auto it = per_host_data_.find({priority, index});
704
3
  if (it != per_host_data_.end()) {
705
1
    *data = it->second;
706
2
  } else {
707
2
    *data = 0;
708
2
  }
709
3
  return true;
710
4
}
711

            
712
// =================================================================================================
713
// DynamicModuleClusterFactory
714
// =================================================================================================
715

            
716
absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
717
DynamicModuleClusterFactory::createClusterWithConfig(
718
    const envoy::config::cluster::v3::Cluster& cluster,
719
    const envoy::extensions::clusters::dynamic_modules::v3::ClusterConfig& proto_config,
720
85
    Upstream::ClusterFactoryContext& context) {
721

            
722
  // Validate that CLUSTER_PROVIDED LB policy is used.
723
85
  if (cluster.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
724
1
    return absl::InvalidArgumentError(
725
1
        fmt::format("cluster: LB policy {} is not valid for cluster type "
726
1
                    "'envoy.clusters.dynamic_modules'. Only 'CLUSTER_PROVIDED' is allowed.",
727
1
                    envoy::config::cluster::v3::Cluster::LbPolicy_Name(cluster.lb_policy())));
728
1
  }
729

            
730
  // Extract cluster_config from the Any field.
731
84
  std::string cluster_config_bytes;
732
84
  if (proto_config.has_cluster_config()) {
733
9
    auto config_or_error = MessageUtil::knownAnyToBytes(proto_config.cluster_config());
734
9
    RETURN_IF_NOT_OK_REF(config_or_error.status());
735
9
    cluster_config_bytes = std::move(config_or_error.value());
736
9
  }
737

            
738
  // Load the dynamic module.
739
84
  const auto& module_config = proto_config.dynamic_module_config();
740
84
  auto module_or_error = Envoy::Extensions::DynamicModules::newDynamicModuleByName(
741
84
      module_config.name(), module_config.do_not_close(), module_config.load_globally());
742
84
  if (!module_or_error.ok()) {
743
1
    return absl::InvalidArgumentError(fmt::format("Failed to load dynamic module '{}': {}",
744
1
                                                  module_config.name(),
745
1
                                                  module_or_error.status().message()));
746
1
  }
747

            
748
  // Create the cluster configuration.
749
83
  auto config_or_error = DynamicModuleClusterConfig::create(
750
83
      proto_config.cluster_name(), cluster_config_bytes, std::move(module_or_error.value()),
751
83
      context.serverFactoryContext().serverScope());
752
83
  if (!config_or_error.ok()) {
753
2
    return config_or_error.status();
754
2
  }
755

            
756
  // Create the cluster.
757
81
  absl::Status creation_status = absl::OkStatus();
758
81
  auto new_cluster = std::shared_ptr<DynamicModuleCluster>(new DynamicModuleCluster(
759
81
      cluster, std::move(config_or_error.value()), context, creation_status));
760
81
  RETURN_IF_NOT_OK(creation_status);
761

            
762
  // Create the thread-aware load balancer.
763
80
  auto handle = std::make_shared<DynamicModuleClusterHandle>(new_cluster);
764
80
  auto lb = std::make_unique<DynamicModuleThreadAwareLoadBalancer>(handle);
765

            
766
80
  return std::make_pair(std::move(new_cluster), std::move(lb));
767
81
}
768

            
769
REGISTER_FACTORY(DynamicModuleClusterFactory, Upstream::ClusterFactory);
770

            
771
} // namespace DynamicModules
772
} // namespace Clusters
773
} // namespace Extensions
774
} // namespace Envoy