1
#pragma once
2

            
3
#include <atomic>
4
#include <memory>
5
#include <string>
6
#include <vector>
7

            
8
#include "envoy/common/callback.h"
9
#include "envoy/common/optref.h"
10
#include "envoy/config/cluster/v3/cluster.pb.h"
11
#include "envoy/extensions/clusters/dynamic_modules/v3/cluster.pb.h"
12
#include "envoy/extensions/clusters/dynamic_modules/v3/cluster.pb.validate.h"
13
#include "envoy/http/async_client.h"
14
#include "envoy/server/lifecycle_notifier.h"
15
#include "envoy/stats/scope.h"
16
#include "envoy/stats/stats.h"
17
#include "envoy/upstream/upstream.h"
18

            
19
#include "source/common/common/logger.h"
20
#include "source/common/http/message_impl.h"
21
#include "source/common/stats/utility.h"
22
#include "source/common/upstream/cluster_factory_impl.h"
23
#include "source/common/upstream/upstream_impl.h"
24
#include "source/extensions/dynamic_modules/abi/abi.h"
25
#include "source/extensions/dynamic_modules/dynamic_modules.h"
26

            
27
#include "absl/container/flat_hash_map.h"
28
#include "absl/container/flat_hash_set.h"
29

            
30
namespace Envoy {
31
namespace Extensions {
32
namespace Clusters {
33
namespace DynamicModules {
34

            
35
// Forward declaration; the class is defined later in this header.
class DynamicModuleCluster;
36
// Forward declaration; the class is defined later in this header.
class DynamicModuleClusterScheduler;
37
// Forward declaration of a class befriended by DynamicModuleCluster. It is not defined in this
// header. NOTE(review): presumably a test-only accessor defined in test code -- confirm.
class DynamicModuleClusterTestPeer;
38

            
39
// Function pointer types for the cluster ABI event hooks.
40
using OnClusterConfigNewType = decltype(&envoy_dynamic_module_on_cluster_config_new);
41
using OnClusterConfigDestroyType = decltype(&envoy_dynamic_module_on_cluster_config_destroy);
42
using OnClusterNewType = decltype(&envoy_dynamic_module_on_cluster_new);
43
using OnClusterInitType = decltype(&envoy_dynamic_module_on_cluster_init);
44
using OnClusterDestroyType = decltype(&envoy_dynamic_module_on_cluster_destroy);
45
using OnClusterLbNewType = decltype(&envoy_dynamic_module_on_cluster_lb_new);
46
using OnClusterLbDestroyType = decltype(&envoy_dynamic_module_on_cluster_lb_destroy);
47
using OnClusterLbChooseHostType = decltype(&envoy_dynamic_module_on_cluster_lb_choose_host);
48
using OnClusterLbCancelHostSelectionType =
49
    decltype(&envoy_dynamic_module_on_cluster_lb_cancel_host_selection);
50
using OnClusterScheduledType = decltype(&envoy_dynamic_module_on_cluster_scheduled);
51
using OnClusterServerInitializedType =
52
    decltype(&envoy_dynamic_module_on_cluster_server_initialized);
53
using OnClusterDrainStartedType = decltype(&envoy_dynamic_module_on_cluster_drain_started);
54
using OnClusterShutdownType = decltype(&envoy_dynamic_module_on_cluster_shutdown);
55
using OnClusterHttpCalloutDoneType = decltype(&envoy_dynamic_module_on_cluster_http_callout_done);
56
using OnClusterLbOnHostMembershipUpdateType =
57
    decltype(&envoy_dynamic_module_on_cluster_lb_on_host_membership_update);
58

            
59
/**
60
 * Configuration for a dynamic module cluster. This holds the loaded dynamic module, resolved
61
 * function pointers, the in-module configuration, and metrics storage.
62
 */
63
class DynamicModuleClusterConfig {
64
public:
65
  /**
66
   * Creates a new DynamicModuleClusterConfig.
67
   *
68
   * @param cluster_name the name identifying the cluster implementation in the module.
69
   * @param cluster_config the configuration bytes to pass to the module.
70
   * @param module the loaded dynamic module.
71
   * @param stats_scope the stats scope for creating custom metrics.
72
   * @return a shared pointer to the config, or an error status.
73
   */
74
  static absl::StatusOr<std::shared_ptr<DynamicModuleClusterConfig>>
75
  create(const std::string& cluster_name, const std::string& cluster_config,
76
         Envoy::Extensions::DynamicModules::DynamicModulePtr module, Stats::Scope& stats_scope);
77

            
78
  ~DynamicModuleClusterConfig();
79

            
80
  // Function pointers resolved from the dynamic module.
81
  OnClusterConfigNewType on_cluster_config_new_ = nullptr;
82
  OnClusterConfigDestroyType on_cluster_config_destroy_ = nullptr;
83
  OnClusterNewType on_cluster_new_ = nullptr;
84
  OnClusterInitType on_cluster_init_ = nullptr;
85
  OnClusterDestroyType on_cluster_destroy_ = nullptr;
86
  OnClusterLbNewType on_cluster_lb_new_ = nullptr;
87
  OnClusterLbDestroyType on_cluster_lb_destroy_ = nullptr;
88
  OnClusterLbChooseHostType on_cluster_lb_choose_host_ = nullptr;
89
  OnClusterLbCancelHostSelectionType on_cluster_lb_cancel_host_selection_ = nullptr;
90
  OnClusterScheduledType on_cluster_scheduled_ = nullptr;
91
  OnClusterServerInitializedType on_cluster_server_initialized_ = nullptr;
92
  OnClusterDrainStartedType on_cluster_drain_started_ = nullptr;
93
  OnClusterShutdownType on_cluster_shutdown_ = nullptr;
94
  OnClusterHttpCalloutDoneType on_cluster_http_callout_done_ = nullptr;
95
  OnClusterLbOnHostMembershipUpdateType on_cluster_lb_on_host_membership_update_ = nullptr;
96

            
97
  // The in-module configuration pointer.
98
  envoy_dynamic_module_type_cluster_config_module_ptr in_module_config_ = nullptr;
99

            
100
  // ----------------------------- Metrics Support -----------------------------
101

            
102
  class ModuleCounterHandle {
103
  public:
104
3
    ModuleCounterHandle(Stats::Counter& counter) : counter_(counter) {}
105
5
    void add(uint64_t value) const { counter_.add(value); }
106

            
107
  private:
108
    Stats::Counter& counter_;
109
  };
110

            
111
  class ModuleCounterVecHandle {
112
  public:
113
    ModuleCounterVecHandle(Stats::StatName name, Stats::StatNameVec label_names)
114
2
        : name_(name), label_names_(label_names) {}
115
3
    const Stats::StatNameVec& getLabelNames() const { return label_names_; }
116
1
    void add(Stats::Scope& scope, Stats::StatNameTagVectorOptConstRef tags, uint64_t amount) const {
117
1
      ASSERT(tags.has_value());
118
1
      Stats::Utility::counterFromElements(scope, {name_}, tags).add(amount);
119
1
    }
120

            
121
  private:
122
    Stats::StatName name_;
123
    Stats::StatNameVec label_names_;
124
  };
125

            
126
  class ModuleGaugeHandle {
127
  public:
128
1
    ModuleGaugeHandle(Stats::Gauge& gauge) : gauge_(gauge) {}
129
1
    void add(uint64_t value) const { gauge_.add(value); }
130
1
    void sub(uint64_t value) const { gauge_.sub(value); }
131
1
    void set(uint64_t value) const { gauge_.set(value); }
132

            
133
  private:
134
    Stats::Gauge& gauge_;
135
  };
136

            
137
  class ModuleGaugeVecHandle {
138
  public:
139
    ModuleGaugeVecHandle(Stats::StatName name, Stats::StatNameVec label_names,
140
                         Stats::Gauge::ImportMode import_mode)
141
3
        : name_(name), label_names_(label_names), import_mode_(import_mode) {}
142
9
    const Stats::StatNameVec& getLabelNames() const { return label_names_; }
143
1
    void add(Stats::Scope& scope, Stats::StatNameTagVectorOptConstRef tags, uint64_t amount) const {
144
1
      ASSERT(tags.has_value());
145
1
      Stats::Utility::gaugeFromElements(scope, {name_}, import_mode_, tags).add(amount);
146
1
    }
147
1
    void sub(Stats::Scope& scope, Stats::StatNameTagVectorOptConstRef tags, uint64_t amount) const {
148
1
      ASSERT(tags.has_value());
149
1
      Stats::Utility::gaugeFromElements(scope, {name_}, import_mode_, tags).sub(amount);
150
1
    }
151
1
    void set(Stats::Scope& scope, Stats::StatNameTagVectorOptConstRef tags, uint64_t amount) const {
152
1
      ASSERT(tags.has_value());
153
1
      Stats::Utility::gaugeFromElements(scope, {name_}, import_mode_, tags).set(amount);
154
1
    }
155

            
156
  private:
157
    Stats::StatName name_;
158
    Stats::StatNameVec label_names_;
159
    Stats::Gauge::ImportMode import_mode_;
160
  };
161

            
162
  class ModuleHistogramHandle {
163
  public:
164
1
    ModuleHistogramHandle(Stats::Histogram& histogram) : histogram_(histogram) {}
165
1
    void recordValue(uint64_t value) const { histogram_.recordValue(value); }
166

            
167
  private:
168
    Stats::Histogram& histogram_;
169
  };
170

            
171
  class ModuleHistogramVecHandle {
172
  public:
173
    ModuleHistogramVecHandle(Stats::StatName name, Stats::StatNameVec label_names,
174
                             Stats::Histogram::Unit unit)
175
3
        : name_(name), label_names_(label_names), unit_(unit) {}
176
3
    const Stats::StatNameVec& getLabelNames() const { return label_names_; }
177
    void recordValue(Stats::Scope& scope, Stats::StatNameTagVectorOptConstRef tags,
178
1
                     uint64_t value) const {
179
1
      ASSERT(tags.has_value());
180
1
      Stats::Utility::histogramFromElements(scope, {name_}, unit_, tags).recordValue(value);
181
1
    }
182

            
183
  private:
184
    Stats::StatName name_;
185
    Stats::StatNameVec label_names_;
186
    Stats::Histogram::Unit unit_;
187
  };
188

            
189
// We use 1-based IDs for the metrics in the ABI, so we need to convert them to 0-based indices
190
// for our internal storage. These helper functions do that conversion.
191
26
#define ID_TO_INDEX(id) ((id) - 1)
192

            
193
3
  size_t addCounter(ModuleCounterHandle&& counter) {
194
3
    counters_.push_back(std::move(counter));
195
3
    return counters_.size();
196
3
  }
197
2
  size_t addCounterVec(ModuleCounterVecHandle&& counter) {
198
2
    counter_vecs_.push_back(std::move(counter));
199
2
    return counter_vecs_.size();
200
2
  }
201

            
202
1
  size_t addGauge(ModuleGaugeHandle&& gauge) {
203
1
    gauges_.push_back(std::move(gauge));
204
1
    return gauges_.size();
205
1
  }
206
3
  size_t addGaugeVec(ModuleGaugeVecHandle&& gauge) {
207
3
    gauge_vecs_.push_back(std::move(gauge));
208
3
    return gauge_vecs_.size();
209
3
  }
210

            
211
1
  size_t addHistogram(ModuleHistogramHandle&& histogram) {
212
1
    histograms_.push_back(std::move(histogram));
213
1
    return histograms_.size();
214
1
  }
215
3
  size_t addHistogramVec(ModuleHistogramVecHandle&& histogram) {
216
3
    histogram_vecs_.push_back(std::move(histogram));
217
3
    return histogram_vecs_.size();
218
3
  }
219

            
220
8
  OptRef<const ModuleCounterHandle> getCounterById(size_t id) const {
221
8
    if (id == 0 || id > counters_.size()) {
222
3
      return {};
223
3
    }
224
5
    return counters_[ID_TO_INDEX(id)];
225
8
  }
226
6
  OptRef<const ModuleCounterVecHandle> getCounterVecById(size_t id) const {
227
6
    if (id == 0 || id > counter_vecs_.size()) {
228
2
      return {};
229
2
    }
230
4
    return counter_vecs_[ID_TO_INDEX(id)];
231
6
  }
232

            
233
9
  OptRef<const ModuleGaugeHandle> getGaugeById(size_t id) const {
234
9
    if (id == 0 || id > gauges_.size()) {
235
6
      return {};
236
6
    }
237
3
    return gauges_[ID_TO_INDEX(id)];
238
9
  }
239
15
  OptRef<const ModuleGaugeVecHandle> getGaugeVecById(size_t id) const {
240
15
    if (id == 0 || id > gauge_vecs_.size()) {
241
6
      return {};
242
6
    }
243
9
    return gauge_vecs_[ID_TO_INDEX(id)];
244
15
  }
245

            
246
4
  OptRef<const ModuleHistogramHandle> getHistogramById(size_t id) const {
247
4
    if (id == 0 || id > histograms_.size()) {
248
3
      return {};
249
3
    }
250
1
    return histograms_[ID_TO_INDEX(id)];
251
4
  }
252
6
  OptRef<const ModuleHistogramVecHandle> getHistogramVecById(size_t id) const {
253
6
    if (id == 0 || id > histogram_vecs_.size()) {
254
2
      return {};
255
2
    }
256
4
    return histogram_vecs_[ID_TO_INDEX(id)];
257
6
  }
258

            
259
#undef ID_TO_INDEX
260

            
261
  const Stats::ScopeSharedPtr stats_scope_;
262
  Stats::StatNamePool stat_name_pool_;
263

            
264
private:
265
  DynamicModuleClusterConfig(const std::string& cluster_name, const std::string& cluster_config,
266
                             Envoy::Extensions::DynamicModules::DynamicModulePtr module,
267
                             Stats::Scope& stats_scope);
268

            
269
  const std::string cluster_name_;
270
  const std::string cluster_config_;
271
  Envoy::Extensions::DynamicModules::DynamicModulePtr dynamic_module_;
272

            
273
  std::vector<ModuleCounterHandle> counters_;
274
  std::vector<ModuleCounterVecHandle> counter_vecs_;
275
  std::vector<ModuleGaugeHandle> gauges_;
276
  std::vector<ModuleGaugeVecHandle> gauge_vecs_;
277
  std::vector<ModuleHistogramHandle> histograms_;
278
  std::vector<ModuleHistogramVecHandle> histogram_vecs_;
279
};
280

            
281
// Shared-ownership handle to a DynamicModuleClusterConfig.
using DynamicModuleClusterConfigSharedPtr = std::shared_ptr<DynamicModuleClusterConfig>;
282

            
283
/**
284
 * Handle object to ensure that the destructor of DynamicModuleCluster is called on the main
285
 * thread.
286
 */
287
class DynamicModuleClusterHandle {
288
public:
289
  DynamicModuleClusterHandle(std::shared_ptr<DynamicModuleCluster> cluster)
290
97
      : cluster_(std::move(cluster)) {}
291
  ~DynamicModuleClusterHandle();
292

            
293
  // Access the cluster for host lookup during async completion.
294
3
  DynamicModuleCluster* cluster() const { return cluster_.get(); }
295

            
296
private:
297
  std::shared_ptr<DynamicModuleCluster> cluster_;
298
  friend class DynamicModuleCluster;
299
  friend class DynamicModuleLoadBalancer;
300
};
301

            
302
// Shared-ownership pointer to a DynamicModuleClusterHandle.
using DynamicModuleClusterHandleSharedPtr = std::shared_ptr<DynamicModuleClusterHandle>;
303

            
304
/**
305
 * The DynamicModuleCluster delegates host discovery and load balancing to a dynamic module.
306
 * The module manages hosts via add/remove callbacks and provides its own load balancer.
307
 */
308
class DynamicModuleCluster : public Upstream::ClusterImplBase,
309
                             public std::enable_shared_from_this<DynamicModuleCluster> {
310
public:
311
  ~DynamicModuleCluster() override;
312

            
313
  // Upstream::Cluster
314
12
  Upstream::Cluster::InitializePhase initializePhase() const override {
315
12
    return Upstream::Cluster::InitializePhase::Primary;
316
12
  }
317

            
318
  // Methods called by the dynamic module via ABI callbacks.
319
  bool addHosts(
320
      const std::vector<std::string>& addresses, const std::vector<uint32_t>& weights,
321
      const std::vector<std::string>& regions, const std::vector<std::string>& zones,
322
      const std::vector<std::string>& sub_zones,
323
      const std::vector<std::vector<std::tuple<std::string, std::string, std::string>>>& metadata,
324
      std::vector<Upstream::HostSharedPtr>& result_hosts, uint32_t priority = 0);
325
  size_t removeHosts(const std::vector<Upstream::HostSharedPtr>& hosts);
326
  bool updateHostHealth(Upstream::HostSharedPtr host,
327
                        envoy_dynamic_module_type_host_health health_status);
328
  Upstream::HostSharedPtr findHost(void* raw_host_ptr);
329
  Upstream::HostSharedPtr findHostByAddress(const std::string& address);
330
  void preInitComplete();
331

            
332
  /**
333
   * Called when an event is scheduled via DynamicModuleClusterScheduler::commit.
334
   */
335
  void onScheduled(uint64_t event_id);
336

            
337
  /**
338
   * Sends an HTTP callout to the specified cluster with the given message.
339
   * This must be called on the main thread.
340
   *
341
   * @param callout_id_out is a pointer to a variable where the callout ID will be stored.
342
   * @param cluster_name is the name of the cluster to which the callout is sent.
343
   * @param message is the HTTP request message to send.
344
   * @param timeout_milliseconds is the timeout for the callout in milliseconds.
345
   * @return the result of the callout initialization.
346
   */
347
  envoy_dynamic_module_type_http_callout_init_result
348
  sendHttpCallout(uint64_t* callout_id_out, absl::string_view cluster_name,
349
                  Http::RequestMessagePtr&& message, uint64_t timeout_milliseconds);
350

            
351
  // Accessors.
352
210
  const DynamicModuleClusterConfigSharedPtr& config() const { return config_; }
353
45
  envoy_dynamic_module_type_cluster_module_ptr inModuleCluster() const {
354
45
    return in_module_cluster_;
355
45
  }
356

            
357
protected:
358
  DynamicModuleCluster(const envoy::config::cluster::v3::Cluster& cluster,
359
                       DynamicModuleClusterConfigSharedPtr config,
360
                       Upstream::ClusterFactoryContext& context, absl::Status& creation_status);
361

            
362
  // Upstream::ClusterImplBase.
363
  void startPreInit() override;
364

            
365
private:
366
  /**
367
   * Registers server lifecycle callbacks (server_initialized, drain, shutdown).
368
   */
369
  void registerLifecycleCallbacks();
370

            
371
  friend class DynamicModuleClusterFactory;
372
  friend class DynamicModuleClusterScheduler;
373
  friend class DynamicModuleClusterTestPeer;
374
  friend class DynamicModuleClusterHandle;
375
  friend class DynamicModuleLoadBalancer;
376

            
377
  /**
378
   * This implementation of the AsyncClient::Callbacks handles the response from the HTTP callout.
379
   */
380
  class HttpCalloutCallback : public Http::AsyncClient::Callbacks {
381
  public:
382
    HttpCalloutCallback(std::shared_ptr<DynamicModuleCluster> cluster, uint64_t id)
383
7
        : cluster_(std::move(cluster)), callout_id_(id) {}
384
7
    ~HttpCalloutCallback() override = default;
385

            
386
    void onSuccess(const Http::AsyncClient::Request& request,
387
                   Http::ResponseMessagePtr&& response) override;
388
    void onFailure(const Http::AsyncClient::Request& request,
389
                   Http::AsyncClient::FailureReason reason) override;
390
    void onBeforeFinalizeUpstreamSpan(Envoy::Tracing::Span&,
391
                                      const Http::ResponseHeaderMap*) override {};
392

            
393
    Http::AsyncClient::Request* request_ = nullptr;
394

            
395
  private:
396
    const std::shared_ptr<DynamicModuleCluster> cluster_;
397
    const uint64_t callout_id_{};
398
  };
399

            
400
7
  uint64_t getNextCalloutId() { return next_callout_id_++; }
401

            
402
  DynamicModuleClusterConfigSharedPtr config_;
403
  envoy_dynamic_module_type_cluster_module_ptr in_module_cluster_;
404
  Event::Dispatcher& dispatcher_;
405
  Server::Configuration::ServerFactoryContext& server_context_;
406

            
407
  // Map from raw host pointer to shared pointer for lookup in chooseHost.
408
  absl::Mutex host_map_lock_;
409
  absl::flat_hash_map<void*, Upstream::HostSharedPtr> host_map_ ABSL_GUARDED_BY(host_map_lock_);
410

            
411
  // Handle for the drain close callback registration. Dropped on destruction to unregister.
412
  Envoy::Common::CallbackHandlePtr drain_handle_;
413

            
414
  // Handle for the shutdown lifecycle callback registration.
415
  Server::ServerLifecycleNotifier::HandlePtr shutdown_handle_;
416

            
417
  // Handle for the server initialized lifecycle callback registration.
418
  Server::ServerLifecycleNotifier::HandlePtr server_initialized_handle_;
419

            
420
  // HTTP callout tracking.
421
  uint64_t next_callout_id_ = 1; // 0 is reserved as an invalid id.
422
  absl::flat_hash_map<uint64_t, std::unique_ptr<HttpCalloutCallback>> http_callouts_;
423
};
424

            
425
/**
426
 * This class is used to schedule a cluster event hook from a different thread than the main thread.
427
 * This is created via envoy_dynamic_module_callback_cluster_scheduler_new and deleted via
428
 * envoy_dynamic_module_callback_cluster_scheduler_delete.
429
 */
430
class DynamicModuleClusterScheduler {
431
public:
432
  /**
433
   * Creates a new scheduler for the given cluster.
434
   */
435
5
  static DynamicModuleClusterScheduler* create(DynamicModuleCluster* cluster) {
436
5
    return new DynamicModuleClusterScheduler(cluster->weak_from_this(), cluster->dispatcher_);
437
5
  }
438

            
439
4
  void commit(uint64_t event_id) {
440
4
    dispatcher_.post([cluster = cluster_, event_id]() {
441
4
      if (std::shared_ptr<DynamicModuleCluster> cluster_shared = cluster.lock()) {
442
3
        cluster_shared->onScheduled(event_id);
443
3
      }
444
4
    });
445
4
  }
446

            
447
private:
448
  DynamicModuleClusterScheduler(std::weak_ptr<DynamicModuleCluster> cluster,
449
                                Event::Dispatcher& dispatcher)
450
5
      : cluster_(std::move(cluster)), dispatcher_(dispatcher) {}
451

            
452
  // Using a weak pointer to avoid unnecessarily extending the lifetime of the cluster.
453
  std::weak_ptr<DynamicModuleCluster> cluster_;
454
  Event::Dispatcher& dispatcher_;
455
};
456

            
457
/**
458
 * Async host selection handle that bridges the dynamic module's async host selection to Envoy's
459
 * LoadBalancerContext::onAsyncHostSelection. This is created when the module returns an async
460
 * pending result from choose_host, and destroyed after the module delivers the result or the
461
 * selection is canceled.
462
 */
463
class DynamicModuleAsyncHostSelectionHandle : public Upstream::AsyncHostSelectionHandle {
464
public:
465
  DynamicModuleAsyncHostSelectionHandle(
466
      envoy_dynamic_module_type_cluster_lb_async_handle_module_ptr async_handle,
467
      envoy_dynamic_module_type_cluster_lb_module_ptr in_module_lb,
468
      OnClusterLbCancelHostSelectionType cancel_fn, std::shared_ptr<std::atomic<bool>> cancelled)
469
3
      : async_handle_(async_handle), in_module_lb_(in_module_lb), cancel_fn_(cancel_fn),
470
3
        cancelled_(std::move(cancelled)) {}
471

            
472
  ~DynamicModuleAsyncHostSelectionHandle() override;
473

            
474
  void cancel() override;
475

            
476
private:
477
  envoy_dynamic_module_type_cluster_lb_async_handle_module_ptr async_handle_;
478
  envoy_dynamic_module_type_cluster_lb_module_ptr in_module_lb_;
479
  OnClusterLbCancelHostSelectionType cancel_fn_;
480
  std::shared_ptr<std::atomic<bool>> cancelled_;
481
};
482

            
483
/**
484
 * Load balancer that delegates to the dynamic module.
485
 */
486
class DynamicModuleLoadBalancer : public Upstream::LoadBalancer {
487
public:
488
  DynamicModuleLoadBalancer(const DynamicModuleClusterHandleSharedPtr& handle);
489
  ~DynamicModuleLoadBalancer() override;
490

            
491
  // Upstream::LoadBalancer.
492
  Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
493
1
  Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
494
1
    return nullptr;
495
1
  }
496
  absl::optional<Upstream::SelectedPoolAndConnection>
497
  selectExistingConnection(Upstream::LoadBalancerContext*, const Upstream::Host&,
498
1
                           std::vector<uint8_t>&) override {
499
1
    return absl::nullopt;
500
1
  }
501
1
  OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
502
1
    return {};
503
1
  }
504

            
505
  // Access the priority set for lb callbacks.
506
  const Upstream::PrioritySet& prioritySet() const;
507

            
508
  // Access the handle for async host selection completion.
509
3
  const DynamicModuleClusterHandleSharedPtr& handle() const { return handle_; }
510

            
511
  /**
512
   * Returns the shared cancellation flag for the current async host selection. When the router
513
   * cancels the selection (e.g., stream timeout), the flag is set so the posted completion
514
   * callback becomes a no-op. Returns nullptr when there is no active async selection.
515
   */
516
4
  std::shared_ptr<std::atomic<bool>> activeAsyncCancelled() const {
517
4
    return active_async_cancelled_;
518
4
  }
519

            
520
  /**
521
   * Returns the worker thread's dispatcher captured during chooseHost. Used by the async
522
   * completion callback in abi_impl.cc to post to the correct worker thread without accessing
523
   * the LoadBalancerContext from a background thread.
524
   */
525
4
  Event::Dispatcher* activeAsyncDispatcher() const { return active_async_dispatcher_; }
526

            
527
  // Per-host custom data storage.
528
  bool setHostData(uint32_t priority, size_t index, uintptr_t data);
529
  bool getHostData(uint32_t priority, size_t index, uintptr_t* data) const;
530

            
531
  // Accessors for hosts added/removed during the on_host_membership_update callback.
532
15
  const Upstream::HostVector* hostsAdded() const { return hosts_added_; }
533
3
  const Upstream::HostVector* hostsRemoved() const { return hosts_removed_; }
534

            
535
private:
536
  const DynamicModuleClusterHandleSharedPtr handle_;
537
  envoy_dynamic_module_type_cluster_lb_module_ptr in_module_lb_;
538

            
539
  // Shared cancellation flag for the active async host selection. Set in chooseHost when the
540
  // module returns AsyncPending, and read by the posted completion callback in abi_impl.cc.
541
  std::shared_ptr<std::atomic<bool>> active_async_cancelled_;
542

            
543
  // Worker thread dispatcher captured during chooseHost for async completion posting.
544
  Event::Dispatcher* active_async_dispatcher_{nullptr};
545

            
546
  // Per-host data storage keyed by (priority, index). This is per-LB-instance (per-worker).
547
  absl::flat_hash_map<std::pair<uint32_t, size_t>, uintptr_t> per_host_data_;
548

            
549
  // Temporary pointers to host vectors, valid only during on_host_membership_update callback.
550
  const Upstream::HostVector* hosts_added_{};
551
  const Upstream::HostVector* hosts_removed_{};
552

            
553
  // Membership update callback handle.
554
  Envoy::Common::CallbackHandlePtr member_update_cb_;
555
};
556

            
557
/**
558
 * Factory for creating DynamicModuleCluster instances.
559
 */
560
class DynamicModuleClusterFactory
561
    : public Upstream::ConfigurableClusterFactoryBase<
562
          envoy::extensions::clusters::dynamic_modules::v3::ClusterConfig> {
563
public:
564
  DynamicModuleClusterFactory()
565
85
      : ConfigurableClusterFactoryBase("envoy.clusters.dynamic_modules") {}
566

            
567
private:
568
  friend class DynamicModuleClusterFactoryTestPeer;
569
  absl::StatusOr<
570
      std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
571
  createClusterWithConfig(
572
      const envoy::config::cluster::v3::Cluster& cluster,
573
      const envoy::extensions::clusters::dynamic_modules::v3::ClusterConfig& proto_config,
574
      Upstream::ClusterFactoryContext& context) override;
575
};
576

            
577
// Declares the factory via the project's DECLARE_FACTORY macro.
// NOTE(review): presumably paired with a REGISTER_FACTORY in the implementation file -- confirm.
DECLARE_FACTORY(DynamicModuleClusterFactory);
578

            
579
} // namespace DynamicModules
580
} // namespace Clusters
581
} // namespace Extensions
582
} // namespace Envoy