1
#include "tests/accesslog_server.h"
2

            
3
#include <chrono>
4
#include <functional>
5
#include <string>
6

            
7
#include "source/common/common/logger.h"
8

            
9
#include "absl/base/thread_annotations.h"
10
#include "absl/synchronization/mutex.h"
11
#include "absl/time/time.h"
12
#include "absl/types/optional.h"
13
#include "cilium/api/accesslog.pb.h"
14
#include "tests/uds_server.h"
15

            
16
namespace Envoy {
17

            
18
AccessLogServer::AccessLogServer(const std::string path)
19
123
    : UDSServer(path, std::bind(&AccessLogServer::msgCallback, this, std::placeholders::_1)) {}
20

            
21
123
// Defaulted destructor: this class adds no teardown logic of its own here.
AccessLogServer::~AccessLogServer() = default;
22

            
23
79
void AccessLogServer::clear() {
24
79
  absl::MutexLock lock(&mutex_);
25
79
  messages_.clear();
26
79
}
27

            
28
// Waits until messages_ contains an entry whose entry_type() equals
// `entry_type`, or until `timeout` has elapsed.
//
// @param entry_type the entry type to look for.
// @param timeout    maximum time to wait, in milliseconds.
// @return a copy of the first matching entry (in messages_ iteration order),
//         or absl::nullopt if no matching entry is present when the wait ends.
//         The matching entry is not removed from messages_.
absl::optional<::cilium::LogEntry>
AccessLogServer::waitForMessage(::cilium::EntryType entry_type, std::chrono::milliseconds timeout) {
  absl::MutexLock lock(&mutex_);

  // The condition is kept free of side effects: absl::Mutex may evaluate it any
  // number of times, so it only reports whether a match exists. The (possibly
  // expensive) copy of the entry is made exactly once, after the wait is over.
  auto has_match = [this, entry_type]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {
    mutex_.AssertHeld();
    for (const auto& msg : messages_) {
      if (msg.entry_type() == entry_type) {
        return true;
      }
    }
    return false;
  };

  if (!mutex_.AwaitWithTimeout(absl::Condition(&has_match),
                               absl::Milliseconds(timeout.count()))) {
    // Timed out without a matching entry being present.
    return absl::nullopt;
  }

  // mutex_ is still held here, so messages_ cannot have changed since the
  // condition was last found to be true.
  for (const auto& msg : messages_) {
    if (msg.entry_type() == entry_type) {
      return msg;
    }
  }
  return absl::nullopt;
}
45

            
46
224
void AccessLogServer::msgCallback(const std::string& data) {
47
224
  ::cilium::LogEntry entry;
48
224
  if (!entry.ParseFromString(data)) {
49
    ENVOY_LOG(warn, "Access log parse failed!");
50
224
  } else {
51
224
    ENVOY_LOG(info, "Access log entry: {}", entry.DebugString());
52
224
    absl::MutexLock lock(&mutex_);
53
224
    messages_.emplace_back(entry);
54
224
  }
55
224
}
56

            
57
} // namespace Envoy