1
#include "cilium/health_check_sink.h"
2

            
3
#include <map>
4
#include <memory>
5
#include <string>
6

            
7
#include "envoy/common/time.h"
8
#include "envoy/data/core/v3/health_check_event.pb.h"
9
#include "envoy/registry/registry.h"
10
#include "envoy/server/health_checker_config.h"
11
#include "envoy/upstream/health_check_event_sink.h"
12

            
13
#include "source/common/common/lock_guard.h"
14
#include "source/common/common/logger.h"
15
#include "source/common/common/thread.h"
16
#include "source/common/protobuf/protobuf.h" // IWYU pragma: keep
17
#include "source/common/protobuf/utility.h"
18

            
19
#include "cilium/api/health_check_sink.pb.h"
20
#include "cilium/uds_client.h"
21

            
22
namespace Envoy {
23
namespace Cilium {
24

            
25
Thread::MutexBasicLockable HealthCheckEventPipeSink::udss_mutex;
26
std::map<std::string, std::weak_ptr<UDSClient>> HealthCheckEventPipeSink::udss;
27

            
28
HealthCheckEventPipeSink::HealthCheckEventPipeSink(const cilium::HealthCheckEventPipeSink& config,
29
                                                   TimeSource& time_source)
30
4
    : uds_client_(nullptr) {
31
4
  auto path = config.path();
32
4
  Thread::LockGuard guard(udss_mutex);
33
4
  auto it = udss.find(path);
34
4
  if (it != udss.end()) {
35
2
    uds_client_ = it->second.lock();
36
2
    if (!uds_client_) {
37
      // expired, remove
38
1
      udss.erase(path);
39
1
    }
40
2
  }
41
4
  if (!uds_client_) {
42
    // Not found, allocate and store as a weak_ptr
43
3
    uds_client_ = std::make_shared<UDSClient>(path, time_source);
44
3
    udss.emplace(path, uds_client_);
45
3
  }
46
4
}
47

            
48
3
void HealthCheckEventPipeSink::log(envoy::data::core::v3::HealthCheckEvent event) {
49
3
  if (!uds_client_) {
50
    ENVOY_LOG_MISC(warn, "HealthCheckEventPipeSink: no connection, skipping event: {}",
51
                   event.DebugString());
52
    return;
53
  }
54
3
  std::string msg;
55
3
  event.SerializeToString(&msg);
56
3
  uds_client_->log(msg);
57
3
};
58

            
59
Upstream::HealthCheckEventSinkPtr HealthCheckEventPipeSinkFactory::createHealthCheckEventSink(
60
4
    const ProtobufWkt::Any& config, Server::Configuration::HealthCheckerFactoryContext& context) {
61
4
  const auto& validator_config =
62
4
      Envoy::MessageUtil::anyConvertAndValidate<cilium::HealthCheckEventPipeSink>(
63
4
          config, context.messageValidationVisitor());
64
4
  Upstream::HealthCheckEventSinkPtr uds;
65
4
  uds.reset(
66
4
      new HealthCheckEventPipeSink(validator_config, context.serverFactoryContext().timeSource()));
67
4
  return uds;
68
4
}
69

            
70
REGISTER_FACTORY(HealthCheckEventPipeSinkFactory, Upstream::HealthCheckEventSinkFactory);
71

            
72
} // namespace Cilium
73
} // namespace Envoy