1
#include "accesslog.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <map>
6
#include <memory>
7
#include <string>
8

            
9
#include "envoy/common/time.h"
10
#include "envoy/http/header_map.h"
11
#include "envoy/http/protocol.h"
12
#include "envoy/network/address.h"
13
#include "envoy/stream_info/stream_info.h"
14

            
15
#include "source/common/common/lock_guard.h"
16
#include "source/common/common/logger.h"
17
#include "source/common/common/thread.h"
18
#include "source/common/protobuf/utility.h"
19

            
20
#include "absl/strings/numbers.h"
21
#include "absl/strings/string_view.h"
22
#include "cilium/api/accesslog.pb.h"
23
#include "cilium/uds_client.h"
24

            
25
namespace Envoy {
26
namespace Cilium {
27

            
28
// Mutex taken by AccessLog::open() and AccessLog::~AccessLog() before they touch the static
// 'logs' registry.
Thread::MutexBasicLockable AccessLog::logs_mutex;
29
// Registry of access logs keyed by path. Only weak references are stored here, so an entry
// does not keep its AccessLog alive; entries are added by open() and removed by open()
// (when found expired) and by the destructor.
std::map<std::string, std::weak_ptr<AccessLog>> AccessLog::logs;
30

            
31
170
// Returns the AccessLog registered for 'path', creating and registering a new one when no
// live instance exists. The registry is consulted and modified with 'logs_mutex' held.
AccessLogSharedPtr AccessLog::open(const std::string& path, TimeSource& time_source) {
  Thread::LockGuard registry_lock(logs_mutex);

  const auto found = logs.find(path);
  if (found != logs.end()) {
    if (auto existing = found->second.lock()) {
      // A live instance is already registered for this path; share it.
      return existing;
    }
    // The registered instance has expired; drop the stale entry before re-creating it.
    logs.erase(found);
  }

  // Nothing usable registered: create a new log and remember it as a weak_ptr.
  AccessLogSharedPtr created(new AccessLog(path, time_source));
  logs.emplace(path, created);
  return created;
}
48

            
49
84
// Unregisters this log from the static 'logs' registry when the last reference goes away.
//
// The entry for 'path_' is only removed if its weak_ptr is expired. open() replaces an
// expired entry with a freshly created instance while holding 'logs_mutex'; if that happens
// while this destructor is still waiting for the mutex, the entry now refers to the new,
// live instance and must be left in place. Erasing it unconditionally would unregister the
// live instance and make a later open() create a second log for the same path.
AccessLog::~AccessLog() {
  Thread::LockGuard guard1(logs_mutex);
  const auto it = logs.find(path_);
  if (it != logs.end() && it->second.expired()) {
    logs.erase(it);
  }
}
54

            
55
207
// Stamps 'log_entry' with 'entry_type', serializes its protobuf message and passes the
// serialized bytes to UDSClient::log().
//
// Any entry type other than Response marks the entry as "request logged"; a warning is
// emitted if the entry had already been marked before this call.
void AccessLog::log(AccessLog::Entry& log_entry, ::cilium::EntryType entry_type) {
  ::cilium::LogEntry& message = log_entry.entry_;
  message.set_entry_type(entry_type);

  const bool is_response = (entry_type == ::cilium::EntryType::Response);
  if (!is_response) {
    if (log_entry.request_logged_) {
      ENVOY_LOG_MISC(warn, "cilium.AccessLog: Request is logged twice");
    }
    log_entry.request_logged_ = true;
  }

  // Encode the protobuf message and send it out.
  std::string serialized;
  message.SerializeToString(&serialized);
  UDSClient::log(serialized);
}
72

            
73
#define CONST_STRING_VIEW(NAME, STR) const absl::string_view NAME = {STR, sizeof(STR) - 1}
74

            
75
CONST_STRING_VIEW(pathSV, ":path");
76
CONST_STRING_VIEW(methodSV, ":method");
77
CONST_STRING_VIEW(authoritySV, ":authority");
78
CONST_STRING_VIEW(xForwardedProtoSV, "x-forwarded-proto");
79
CONST_STRING_VIEW(xRequestIdSV, "x-request-id");
80
CONST_STRING_VIEW(statusSV, ":status");
81

            
82
void AccessLog::Entry::initFromConnection(
83
    const std::string& policy_name, uint32_t proxy_id, bool ingress, uint32_t source_identity,
84
    const Network::Address::InstanceConstSharedPtr& source_address, uint32_t destination_identity,
85
221
    const Network::Address::InstanceConstSharedPtr& destination_address, TimeSource* time_source) {
86
221
  request_logged_ = false;
87

            
88
221
  entry_.set_policy_name(policy_name);
89
221
  entry_.set_proxy_id(proxy_id);
90
221
  entry_.set_is_ingress(ingress);
91
221
  entry_.set_source_security_id(source_identity);
92
221
  entry_.set_destination_security_id(destination_identity);
93

            
94
221
  if (source_address != nullptr) {
95
221
    entry_.set_source_address(source_address->asString());
96
221
  }
97

            
98
221
  if (destination_address != nullptr) {
99
221
    entry_.set_destination_address(destination_address->asString());
100
221
  }
101

            
102
221
  if (time_source) {
103
142
    auto time = time_source->systemTime();
104
142
    entry_.set_timestamp(
105
142
        std::chrono::duration_cast<std::chrono::nanoseconds>(time.time_since_epoch()).count());
106
142
  }
107
221
}
108

            
109
bool AccessLog::Entry::updateFromMetadata(const std::string& l7proto,
110
                                          const ProtobufWkt::Struct& metadata) {
111
  bool changed = false;
112

            
113
  auto l7entry = entry_.mutable_generic_l7();
114
  if (l7entry->proto() != l7proto) {
115
    l7entry->set_proto(l7proto);
116
    changed = true;
117
  }
118
  // remove non-existing fields, update existing values
119
  auto* old_fields = l7entry->mutable_fields();
120
  const auto& new_fields = metadata.fields();
121
  for (const auto& pair : *old_fields) {
122
    const auto it = new_fields.find(pair.first);
123
    if (it == new_fields.cend()) {
124
      old_fields->erase(pair.first);
125
      changed = true;
126
    } else {
127
      auto new_value = MessageUtil::getJsonStringFromMessage(it->second, false, true);
128
      if (new_value.ok() && new_value.value() != pair.second) {
129
        (*old_fields)[pair.first] = new_value.value();
130
        changed = true;
131
      }
132
    }
133
  }
134
  // Insert new values
135
  for (const auto& pair : new_fields) {
136
    auto it = old_fields->find(pair.first);
137
    if (it == old_fields->cend()) {
138
      (*old_fields)[pair.first] =
139
          MessageUtil::getJsonStringFromMessageOrError(pair.second, false, true);
140
      changed = true;
141
    }
142
  }
143
  return changed;
144
}
145

            
146
void AccessLog::Entry::initFromRequest(const std::string& policy_name, uint32_t proxy_id,
147
                                       bool ingress, uint32_t source_identity,
148
                                       const Network::Address::InstanceConstSharedPtr& src_address,
149
                                       uint32_t destination_identity,
150
                                       const Network::Address::InstanceConstSharedPtr& dst_address,
151
                                       const StreamInfo::StreamInfo& info,
152
79
                                       const Http::RequestHeaderMap& headers) {
153
79
  initFromConnection(policy_name, proxy_id, ingress, source_identity, src_address,
154
79
                     destination_identity, dst_address, nullptr);
155

            
156
79
  auto time = info.startTime();
157
79
  entry_.set_timestamp(
158
79
      std::chrono::duration_cast<std::chrono::nanoseconds>(time.time_since_epoch()).count());
159

            
160
79
  ::cilium::HttpProtocol proto;
161
79
  switch (info.protocol() ? info.protocol().value() : Http::Protocol::Http11) {
162
  case Http::Protocol::Http10:
163
    proto = ::cilium::HttpProtocol::HTTP10;
164
    break;
165
79
  case Http::Protocol::Http11:
166
79
  default: // Just to make compiler happy
167
79
    proto = ::cilium::HttpProtocol::HTTP11;
168
79
    break;
169
  case Http::Protocol::Http2:
170
    proto = ::cilium::HttpProtocol::HTTP2;
171
    break;
172
79
  }
173
79
  ::cilium::HttpLogEntry* http_entry = entry_.mutable_http();
174
79
  http_entry->set_http_protocol(proto);
175

            
176
79
  updateFromRequest(destination_identity, dst_address, headers);
177
79
}
178

            
179
void AccessLog::Entry::updateFromRequest(
180
    uint32_t destination_identity, const Network::Address::InstanceConstSharedPtr& dst_address,
181
179
    const Http::RequestHeaderMap& headers) {
182
  // Destination may have changed
183
179
  if (destination_identity != 0) {
184
91
    entry_.set_destination_security_id(destination_identity);
185
91
  }
186
179
  if (dst_address != nullptr) {
187
163
    entry_.set_destination_address(dst_address->asString());
188
163
  }
189

            
190
179
  ::cilium::HttpLogEntry* http_entry = entry_.mutable_http();
191
  // Remove headers logged for the request, as they may have changed
192
179
  http_entry->clear_headers();
193

            
194
  // request headers
195
1471
  headers.iterate([http_entry](const Http::HeaderEntry& header) -> Http::HeaderMap::Iterate {
196
1471
    const absl::string_view key = header.key().getStringView();
197
1471
    const absl::string_view value = header.value().getStringView();
198

            
199
1471
    if (key == pathSV) {
200
166
      http_entry->set_path(value.data(), value.size());
201
1305
    } else if (key == methodSV) {
202
179
      http_entry->set_method(value.data(), value.size());
203
1126
    } else if (key == authoritySV) {
204
179
      http_entry->set_host(value.data(), value.size());
205
947
    } else if (key == xForwardedProtoSV) {
206
      // Envoy sets the ":scheme" header later in the router filter
207
      // according to the upstream protocol (TLS vs. clear), but we want to
208
      // get the downstream scheme, which is provided in
209
      // "x-forwarded-proto".
210
159
      http_entry->set_scheme(value.data(), value.size());
211
788
    } else {
212
788
      ::cilium::KeyValue* kv = http_entry->add_headers();
213
788
      kv->set_key(key.data(), key.size());
214
788
      kv->set_value(value.data(), value.size());
215
788
    }
216
1471
    return Http::HeaderMap::Iterate::Continue;
217
1471
  });
218
179
}
219

            
220
// Turns the log entry into a response entry.
//
// The timestamp is reset to the current system time. The headers logged for the request are
// replaced by the response headers, except that a non-empty "x-request-id" logged with the
// request is carried over. The ":status" header is stored in the status field when it parses
// as an integer; a response "x-request-id" equal to the carried-over one is not repeated.
void AccessLog::Entry::updateFromResponse(const Http::ResponseHeaderMap& headers,
                                          TimeSource& time_source) {
  const auto since_epoch = time_source.systemTime().time_since_epoch();
  entry_.set_timestamp(std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch).count());

  ::cilium::HttpLogEntry* http = entry_.mutable_http();

  // Remember the first logged x-request-id, as the logged headers are cleared next.
  std::string saved_request_id;
  for (const auto& logged : http->headers()) {
    if (logged.key() == xRequestIdSV) {
      saved_request_id = logged.value();
      break;
    }
  }

  // Drop the request headers, then restore the x-request-id if there was one.
  http->clear_headers();
  if (!saved_request_id.empty()) {
    ::cilium::KeyValue* restored = http->add_headers();
    restored->set_key(xRequestIdSV.data(), xRequestIdSV.size());
    restored->set_value(saved_request_id);
  }

  const auto record_header =
      [http, &saved_request_id](const Http::HeaderEntry& header) -> Http::HeaderMap::Iterate {
    const absl::string_view name = header.key().getStringView();
    const absl::string_view content = header.value().getStringView();

    if (name == statusSV) {
      uint64_t code = 0;
      if (absl::SimpleAtoi(content, &code)) {
        http->set_status(code);
      }
    } else if (name == xRequestIdSV && content == saved_request_id) {
      // We already have the request id, do not repeat it if the value is still the same
    } else {
      ::cilium::KeyValue* logged = http->add_headers();
      logged->set_key(name.data(), name.size());
      logged->set_value(content.data(), content.size());
    }
    return Http::HeaderMap::Iterate::Continue;
  };
  headers.iterate(record_header);
}
268

            
269
14
// Records 'key'/'value' as a rejected header of the HTTP log entry, unless an identical
// key/value pair has already been recorded.
void AccessLog::Entry::addRejected(absl::string_view key, absl::string_view value) {
  for (const auto& recorded : entry_.http().rejected_headers()) {
    if (recorded.key() != key || recorded.value() != value) {
      continue;
    }
    // Exact duplicate, nothing to add.
    return;
  }

  ::cilium::KeyValue* added = entry_.mutable_http()->add_rejected_headers();
  added->set_key(key.data(), key.size());
  added->set_value(value.data(), value.size());
}
279

            
280
36
// Records 'key'/'value' as a missing header of the HTTP log entry, unless an identical
// key/value pair has already been recorded.
void AccessLog::Entry::addMissing(absl::string_view key, absl::string_view value) {
  for (const auto& recorded : entry_.http().missing_headers()) {
    if (recorded.key() != key || recorded.value() != value) {
      continue;
    }
    // Exact duplicate, nothing to add.
    return;
  }

  ::cilium::KeyValue* added = entry_.mutable_http()->add_missing_headers();
  added->set_key(key.data(), key.size());
  added->set_value(value.data(), value.size());
}
290

            
291
} // namespace Cilium
292
} // namespace Envoy