1
#include "cilium/l7policy.h"
2

            
3
#include <fmt/format.h>
4

            
5
#include <cstddef>
6
#include <cstdint>
7
#include <memory>
8
#include <string>
9
#include <utility>
10

            
11
#include "envoy/common/time.h"
12
#include "envoy/http/codes.h"
13
#include "envoy/http/filter.h"
14
#include "envoy/http/filter_factory.h"
15
#include "envoy/http/header_map.h"
16
#include "envoy/network/address.h"
17
#include "envoy/network/socket.h"
18
#include "envoy/registry/registry.h"
19
#include "envoy/server/factory_context.h"
20
#include "envoy/server/filter_config.h"
21
#include "envoy/stats/scope.h"
22
#include "envoy/stats/stats_macros.h"
23
#include "envoy/stream_info/filter_state.h"
24
#include "envoy/stream_info/stream_info.h"
25

            
26
#include "source/common/common/assert.h"
27
#include "source/common/common/logger.h"
28
#include "source/common/common/utility.h"
29
#include "source/common/http/header_utility.h"
30
#include "source/extensions/filters/http/common/factory_base.h"
31

            
32
#include "absl/status/statusor.h"
33
#include "absl/strings/string_view.h"
34
#include "absl/types/optional.h"
35
#include "cilium/accesslog.h"
36
#include "cilium/api/accesslog.pb.h"
37
#include "cilium/api/l7policy.pb.h"
38
#include "cilium/api/l7policy.pb.validate.h" // IWYU pragma: keep
39
#include "cilium/filter_state_cilium_policy.h"
40

            
41
namespace Envoy {
42
namespace Cilium {
43

            
44
class CiliumAccessFilterFactory
45
    : public Extensions::HttpFilters::Common::DualFactoryBase<::cilium::L7Policy> {
46
public:
47
10
  CiliumAccessFilterFactory() : DualFactoryBase("cilium.l7policy") {}
48

            
49
private:
50
  absl::StatusOr<Http::FilterFactoryCb>
51
  createFilterFactoryFromProtoTyped(const ::cilium::L7Policy& proto_config, const std::string&,
52
                                    DualInfo dual_info,
53
113
                                    Server::Configuration::ServerFactoryContext& context) override {
54
113
    auto config = std::make_shared<Cilium::Config>(proto_config, context.timeSource(),
55
113
                                                   dual_info.scope, dual_info.is_upstream);
56
117
    return [config](Http::FilterChainFactoryCallbacks& callbacks) mutable -> void {
57
109
      callbacks.addStreamFilter(std::make_shared<Cilium::AccessFilter>(config));
58
109
    };
59
113
  }
60
};
61

            
62
using UpstreamCiliumAccessFilterFactory = CiliumAccessFilterFactory;
63

            
64
/**
65
 * Static registration for this filter. @see RegisterFactory.
66
 */
67
REGISTER_FACTORY(CiliumAccessFilterFactory, Server::Configuration::NamedHttpFilterConfigFactory);
68
REGISTER_FACTORY(UpstreamCiliumAccessFilterFactory,
69
                 Server::Configuration::UpstreamHttpFilterConfigFactory);
70

            
71
Config::Config(const std::string& access_log_path, const std::string& denied_403_body,
72
               TimeSource& time_source, Stats::Scope& scope, bool is_upstream)
73
113
    : time_source_(time_source), stats_{ALL_CILIUM_STATS(POOL_COUNTER_PREFIX(scope, "cilium"))},
74
113
      denied_403_body_(denied_403_body), is_upstream_(is_upstream), access_log_(nullptr) {
75
113
  if (!access_log_path.empty()) {
76
113
    access_log_ = AccessLog::open(access_log_path, time_source);
77
113
  }
78
113
  if (denied_403_body_.empty()) {
79
113
    denied_403_body_ = "Access denied";
80
113
  }
81
113
  size_t len = denied_403_body_.length();
82
113
  if (len < 2 || denied_403_body_[len - 2] != '\r' || denied_403_body_[len - 1] != '\n') {
83
113
    denied_403_body_.append("\r\n");
84
113
  }
85
113
}
86

            
87
Config::Config(const ::cilium::L7Policy& config, TimeSource& time_source, Stats::Scope& scope,
88
               bool is_upstream)
89
113
    : Config(config.access_log_path(), config.denied_403_body(), time_source, scope, is_upstream) {}
90

            
91
156
void Config::log(AccessLog::Entry& entry, ::cilium::EntryType type) {
92
156
  if (access_log_) {
93
156
    access_log_->log(entry, type);
94
156
  }
95
156
}
96

            
97
109
void AccessFilter::onDestroy() {}
98

            
99
void AccessFilter::sendLocalError(absl::string_view details) {
100
  ENVOY_LOG(warn, details);
101
  callbacks_->sendLocalReply(Http::Code::InternalServerError, "", nullptr, absl::nullopt,
102
                             StringUtil::replaceAllEmptySpace(details));
103
}
104

            
105
109
void AccessFilter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
106
109
  callbacks_ = &callbacks;
107

            
108
  // Create log entry if not already in filter state
109
109
  log_entry_ =
110
109
      callbacks_->streamInfo().filterState()->getDataMutable<AccessLog::Entry>(AccessLogKey);
111
109
  if (log_entry_ == nullptr) {
112
78
    auto log_entry = std::make_unique<AccessLog::Entry>();
113
78
    log_entry_ = log_entry.get();
114
78
    callbacks_->streamInfo().filterState()->setData(AccessLogKey, std::move(log_entry),
115
78
                                                    StreamInfo::FilterState::StateType::Mutable,
116
78
                                                    StreamInfo::FilterState::LifeSpan::Request);
117
78
  }
118

            
119
109
  if (config_->is_upstream_) {
120
31
    callbacks_->upstreamCallbacks()->addUpstreamCallbacks(*this);
121
31
  }
122
109
}
123

            
124
31
void AccessFilter::onUpstreamConnectionEstablished() {
125
31
  if (latched_end_stream_.has_value()) {
126
30
    const bool end_stream = *latched_end_stream_;
127
30
    latched_end_stream_.reset();
128
30
    ENVOY_CONN_LOG(debug,
129
30
                   "cilium.l7policy: RESUMING after upstream connection has been established",
130
30
                   callbacks_->connection().ref());
131
30
    Http::FilterHeadersStatus status = decodeHeaders(*latched_headers_, end_stream);
132
30
    if (status == Http::FilterHeadersStatus::Continue) {
133
18
      callbacks_->continueDecoding();
134
18
    }
135
30
  }
136
31
}
137

            
138
Http::FilterHeadersStatus AccessFilter::decodeHeaders(Http::RequestHeaderMap& headers,
139
139
                                                      bool end_stream) {
140
139
  StreamInfo::StreamInfo& stream_info = callbacks_->streamInfo();
141

            
142
  // Skip enforcement or logging on shadows and ingress direction
143
139
  if (stream_info.isShadow()) {
144
    ENVOY_LOG(debug, "cilium.l7policy: upstream decodeHeaders() - skipping - shadow connection");
145
    return Http::FilterHeadersStatus::Continue;
146
  }
147

            
148
139
  const auto& conn = callbacks_->connection();
149

            
150
139
  if (!conn) {
151
    if (config_->is_upstream_) {
152
      // Skip enforcement on upstream connections originating from Envoy without a relation
153
      // to a downstream connection
154
      ENVOY_LOG(debug,
155
                "cilium.l7policy: upstream decodeHeaders() - skipping - no downstream connection");
156
      return Http::FilterHeadersStatus::Continue;
157
    }
158

            
159
    sendLocalError("cilium.l7policy: downstream decodeHeaders() - no connection");
160
    return Http::FilterHeadersStatus::StopIteration;
161
  }
162

            
163
139
  if (log_entry_ == nullptr) {
164
    sendLocalError("cilium.l7policy: No log entry");
165
    return Http::FilterHeadersStatus::StopIteration;
166
  }
167

            
168
139
  const auto policy_fs =
169
139
      conn->streamInfo().filterState().getDataReadOnly<Cilium::CiliumPolicyFilterState>(
170
139
          Cilium::CiliumPolicyFilterState::key());
171

            
172
139
  if (!policy_fs) {
173
    sendLocalError("cilium.l7policy: Cilium policy filter state not found");
174
    return Http::FilterHeadersStatus::StopIteration;
175
  }
176

            
177
139
  ENVOY_CONN_LOG(debug, "cilium.l7policy: {} decodeHeaders()", conn.ref(),
178
139
                 config_->is_upstream_ ? "upstream" : "downstream");
179

            
180
  // Handle downstream case first
181
139
  if (!config_->is_upstream_) {
182
78
    const auto& dst_address = stream_info.downstreamAddressProvider().localAddress();
183
78
    const auto dip = dst_address->ip();
184
    // destination identity should be reported as 0 for an ingress policy
185
78
    uint32_t destination_identity = policy_fs->ingress_ ? 0 : policy_fs->resolvePolicyId(dip);
186
78
    uint16_t destination_port = dip->port();
187

            
188
    // Initialize log entry in the beginning of downstream processing
189
78
    log_entry_->initFromRequest(
190
78
        policy_fs->pod_ip_, policy_fs->proxy_id_, policy_fs->ingress_, policy_fs->source_identity_,
191
78
        callbacks_->streamInfo().downstreamAddressProvider().remoteAddress(), destination_identity,
192
78
        dst_address, stream_info, headers);
193

            
194
    // Enforce pod policy only for local pods without L7 LB
195
78
    if (!policy_fs->policyUseUpstreamDestinationAddress() && !policy_fs->pod_ip_.empty()) {
196
45
      bool allowed = policy_fs->enforcePodHTTPPolicy(conn.ref(), destination_identity,
197
45
                                                     destination_port, headers, *log_entry_);
198

            
199
      // Update the log entry with current headers, as the policy may have altered them.
200
45
      log_entry_->updateFromRequest(destination_identity, dst_address, headers);
201

            
202
45
      if (!allowed) {
203
17
        config_->log(*log_entry_, ::cilium::EntryType::Denied);
204
17
        callbacks_->sendLocalReply(Http::Code::Forbidden, config_->denied_403_body_, nullptr,
205
17
                                   absl::nullopt, absl::string_view());
206
17
        return Http::FilterHeadersStatus::StopIteration;
207
17
      }
208

            
209
      // Log as a forwarded request
210
28
      config_->log(*log_entry_, ::cilium::EntryType::Request);
211
28
    }
212

            
213
106
  } else { // is_upstream_
214
    // Skip enforcement for non L7LB (which is always egress).
215
    // TODO: Drop and warn when Cilium Agent no longer mistakenly configures upstream enforcement or
216
    // non-L7LB
217
61
    if (!policy_fs->policyUseUpstreamDestinationAddress()) {
218
      ENVOY_CONN_LOG(debug, "cilium.l7policy: upstream enforcement configured for non L7 LB",
219
                     conn.ref());
220
      return Http::FilterHeadersStatus::Continue;
221
    }
222
61
    if (policy_fs->ingress_) {
223
      ENVOY_CONN_LOG(
224
          debug, "cilium.l7policy: upstream enforcement configured for ingress traffic direction",
225
          conn.ref());
226
      return Http::FilterHeadersStatus::Continue;
227
    }
228

            
229
    // must have a policy configured
230
    // This is safe as the upstream filter was introduced at Cilium 1.16 and
231
    // bpf_metadata config has had 'enforce_policy_on_l7lb' set since Cilium 1.15.
232
61
    if (policy_fs->pod_ip_.empty() && policy_fs->ingress_policy_name_.empty()) {
233
      ENVOY_CONN_LOG(warn, "cilium.network: no policy configured", conn.ref());
234
      return Http::FilterHeadersStatus::StopIteration;
235
    }
236

            
237
    // Pause upstream decoding until connection has been established
238
61
    ASSERT(callbacks_->upstreamCallbacks());
239
61
    if (!callbacks_->upstreamCallbacks()->upstream()) {
240
30
      latched_headers_ = headers;
241
30
      latched_end_stream_ = end_stream;
242
30
      ENVOY_CONN_LOG(debug,
243
30
                     "cilium.l7policy: PAUSING until upstream connection has been established",
244
30
                     conn.ref());
245
30
      return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
246
30
    }
247

            
248
31
    const auto& dst_address = stream_info.upstreamInfo()->upstreamHost()->address();
249
31
    if (nullptr == dst_address) {
250
      sendLocalError("cilium.l7policy: No destination address");
251
      return Http::FilterHeadersStatus::StopIteration;
252
    }
253
31
    const auto dip = dst_address->ip();
254
31
    if (!dip) {
255
      sendLocalError(
256
          fmt::format("cilium.l7policy: Non-IP destination address: {}", dst_address->asString()));
257
      return Http::FilterHeadersStatus::StopIteration;
258
    }
259

            
260
31
    uint32_t destination_identity = policy_fs->resolvePolicyId(dip);
261
31
    uint16_t destination_port = dip->port();
262
31
    bool allowed;
263

            
264
    // Is there a pod egress policy?
265
31
    if (!policy_fs->pod_ip_.empty()) {
266
31
      allowed = policy_fs->enforcePodHTTPPolicy(conn.ref(), destination_identity, destination_port,
267
31
                                                headers, *log_entry_);
268

            
269
      // Update the log entry with current headers, as the policy may have altered them.
270
31
      log_entry_->updateFromRequest(destination_identity, dst_address, headers);
271

            
272
31
      if (!allowed) {
273
13
        config_->log(*log_entry_, ::cilium::EntryType::Denied);
274
13
        callbacks_->sendLocalReply(Http::Code::Forbidden, config_->denied_403_body_, nullptr,
275
13
                                   absl::nullopt, absl::string_view());
276
13
        return Http::FilterHeadersStatus::StopIteration;
277
13
      }
278
31
    }
279

            
280
    // Is there an Ingress policy?
281
18
    if (!policy_fs->ingress_policy_name_.empty()) {
282
      allowed = policy_fs->enforceIngressHTTPPolicy(conn.ref(), destination_identity,
283
                                                    destination_port, headers, *log_entry_);
284

            
285
      // Update the log entry with current headers, as the policy may have altered them.
286
      log_entry_->updateFromRequest(destination_identity, dst_address, headers);
287

            
288
      if (!allowed) {
289
        config_->log(*log_entry_, ::cilium::EntryType::Denied);
290
        callbacks_->sendLocalReply(Http::Code::Forbidden, config_->denied_403_body_, nullptr,
291
                                   absl::nullopt, absl::string_view());
292
        return Http::FilterHeadersStatus::StopIteration;
293
      }
294
    }
295
    // Log as a forwarded request
296
18
    config_->log(*log_entry_, ::cilium::EntryType::Request);
297
18
  }
298
79
  return Http::FilterHeadersStatus::Continue;
299
139
}
300

            
301
78
void AccessFilter::onStreamComplete() {
302
  // Request may have been left unlogged due to an error and/or missing local reply
303
78
  if (log_entry_ && !log_entry_->request_logged_) {
304
    config_->log(*log_entry_, ::cilium::EntryType::Request);
305
  }
306
78
}
307

            
308
96
Http::FilterHeadersStatus AccessFilter::encodeHeaders(Http::ResponseHeaderMap& headers, bool) {
309
96
  const auto& stream_info = callbacks_->streamInfo();
310

            
311
  // Skip enforcement or logging on shadows
312
96
  if (stream_info.isShadow()) {
313
    ENVOY_LOG(debug, "cilium.l7policy: upstream encodeHeaders() - skipping - shadow connection");
314
    return Http::FilterHeadersStatus::Continue;
315
  }
316

            
317
96
  const auto& conn = callbacks_->connection();
318

            
319
96
  if (!conn) {
320
    if (config_->is_upstream_) {
321
      // Skip enforcement on upstream connections originating from Envoy without a relation
322
      // to a downstream connection
323
      ENVOY_LOG(debug,
324
                "cilium.l7policy: upstream encodeHeaders() - skipping - no downstream connection");
325
      return Http::FilterHeadersStatus::Continue;
326
    }
327

            
328
    sendLocalError("cilium.l7policy: downstream encodeHeaders() - no connection");
329
    return Http::FilterHeadersStatus::StopIteration;
330
  }
331

            
332
96
  ENVOY_CONN_LOG(debug, "cilium.l7policy: {} encodeHeaders()", conn.ref(),
333
96
                 config_->is_upstream_ ? "upstream" : "downstream");
334

            
335
  // Nothing to do in the upstream filter
336
96
  if (config_->is_upstream_) {
337
18
    return Http::FilterHeadersStatus::Continue;
338
18
  }
339

            
340
  // check it "connection: close" header is present
341
78
  if (stream_info.upstreamInfo().has_value() && stream_info.protocol().has_value() &&
342
78
      Http::HeaderUtility::shouldCloseConnection(stream_info.protocol().value(), headers)) {
343
    const auto& upstream_info = stream_info.upstreamInfo().ref();
344

            
345
    // check if upstream and downstream connections have the same source and destination
346
    // addresses, respectively (note: do not compare pointers!).
347
    if (*upstream_info.upstreamRemoteAddress() ==
348
            *stream_info.downstreamAddressProvider().localAddress() &&
349
        *upstream_info.upstreamLocalAddress() ==
350
            *stream_info.downstreamAddressProvider().remoteAddress()) {
351
      ENVOY_CONN_LOG(debug,
352
                     "cilium.l7policy: Upstream connection with same 5-tuple closed, passing "
353
                     "connection close to downstream response",
354
                     callbacks_->connection().ref());
355
      callbacks_->streamInfo().setShouldDrainConnectionUponCompletion(true);
356
    }
357
  }
358

            
359
78
  if (log_entry_ == nullptr) {
360
    return Http::FilterHeadersStatus::Continue;
361
  }
362

            
363
  // Request may have been left unlogged due to an error or L3/4 deny
364
78
  if (!log_entry_->request_logged_) {
365
    // Default logging local errors as "forwarded".
366
    // The response log will contain the locally generated HTTP error code.
367
2
    auto log_type = ::cilium::EntryType::Request;
368

            
369
2
    if (headers.Status()->value() == "403") {
370
      // Log as a denied request.
371
2
      log_type = ::cilium::EntryType::Denied;
372
2
      config_->stats_.access_denied_.inc();
373
2
    }
374
2
    config_->log(*log_entry_, log_type);
375
2
  }
376

            
377
  // Log the response
378
78
  log_entry_->updateFromResponse(headers, config_->time_source_);
379
78
  config_->log(*log_entry_, ::cilium::EntryType::Response);
380
78
  return Http::FilterHeadersStatus::Continue;
381
78
}
382

            
383
} // namespace Cilium
384
} // namespace Envoy