1
#pragma once
2

            
3
#if !defined(__linux__)
4
#error "Linux platform file is part of non-Linux build."
5
#endif
6

            
7
#include <sys/socket.h>
8
#include <sys/types.h>
9

            
10
#include <atomic>
11
#include <cstddef>
12
#include <cstdint>
13
#include <cstring>
14

            
15
#include "envoy/api/os_sys_calls_common.h"
16

            
17
#include "source/common/common/assert.h"
18
#include "source/common/common/lock_guard.h"
19
#include "source/common/common/logger.h"
20
#include "source/common/common/thread.h"
21
#include "source/common/singleton/threadsafe_singleton.h"
22

            
23
#include "absl/base/thread_annotations.h"
24
#include "absl/container/flat_hash_map.h"
25
#include "starter/privileged_service_protocol.h"
26

            
27
namespace Envoy {
28
namespace Cilium {
29

            
30
// Forward declarations of the classes that PrivilegedService::ProtocolClient names as friends.
class Bpf;
31
class SocketMarkOption;
32

            
33
namespace PrivilegedService {
34

            
35
// Size in bytes of the payload buffer ('buf_') that each ProtocolClient::Waiter holds next to
// its Response header.
#define RESPONSE_BUF_SIZE 1024
36

            
37
// ProtocolClient implements the client logic for communicating with the privileged service.
38
class ProtocolClient : public Protocol, Logger::Loggable<Logger::Id::filter> {
39
public:
40
  ProtocolClient();
41

            
42
  // allow access to the classes that need it
43
  friend class Envoy::Cilium::Bpf;
44
  friend class Envoy::Cilium::SocketMarkOption;
45

            
46
  // Set a socket option
47
  Envoy::Api::SysCallIntResult setsockopt(int sockfd, int level, int optname, const void* optval,
48
                                          socklen_t optlen);
49

            
50
protected:
51
  // Read-only bpf syscalls
52
  Envoy::Api::SysCallIntResult bpfOpen(const char* path);
53
  Envoy::Api::SysCallIntResult bpfLookup(int fd, const void* key, uint32_t key_size, void* value,
54
                                         uint32_t value_size);
55

            
56
private:
57
  bool checkPrivilegedService();
58
  bool haveCiliumPrivilegedService() const { return isOpen(); }
59

            
60
  ssize_t transact(MessageHeader& req, size_t req_len, const void* data, size_t datalen, int* fd,
61
                   Response& resp, void* buf = nullptr, size_t buf_size = 0);
62

            
63
  std::atomic<uint32_t> seq_;
64

            
65
  // Waiter has space for a response. While placed in the 'waiters_' map, all access to the
66
  // waiter must happen while holding 'mutex_', except for the designated receiver may
67
  // access it's own waiter without the mutex.
68
  class Waiter {
69
  public:
70
    Waiter() = default;
71

            
72
    // Returns non-zero sequence number after a response has been received.
73
    uint32_t seq() const { return resp_.hdr_.msg_seq_; }
74

            
75
    // Returns received message type
76
    MessageType msgType() const { return static_cast<MessageType>(resp_.hdr_.msg_type_); }
77

            
78
    ssize_t recvFdMsg(ProtocolClient& client) {
79
      size_ = client.recvFdMsg(&resp_, sizeof(resp_), buf_, sizeof(buf_), &fd_);
80
      if (size_ >= 0) {
81
        // Failing release asserts cause an exit and an automated restart. This is the only way
82
        // to recover from privilaged service failures.
83
        RELEASE_ASSERT(size_ != 0, "Cilium privileged service closed pipe");
84
        // Must have enough data to decode the response header
85
        RELEASE_ASSERT(size_t(size_) >= sizeof(Response),
86
                       "Cilium privileged service truncated response");
87
        RELEASE_ASSERT(msgType() == MessageType::TypeResponse,
88
                       "Cilium privileged service unexpected response type");
89
      }
90
      return size_;
91
    }
92

            
93
    ssize_t getResponse(uint32_t expected_seq_n, Response& resp, void* buf, size_t buf_size,
94
                        int* fd) const {
95
      auto received_seq = seq();
96
      RELEASE_ASSERT(
97
          received_seq == 0 && size_ <= 0 || received_seq == expected_seq_n,
98
          fmt::format("waiter: invalid response sequence: {} != {}", received_seq, expected_seq_n));
99

            
100
      ssize_t size = size_;
101
      if (size_t(size) > sizeof(resp)) {
102
        auto copy_size = size_t(size) - sizeof(resp);
103
        if (copy_size > buf_size) {
104
          // truncate response
105
          size -= copy_size - buf_size;
106
          copy_size = buf_size;
107
        }
108
        memcpy(buf, buf_, copy_size); // NOLINT(safe-memcpy)
109
      }
110
      resp = resp_;
111
      if (fd) {
112
        *fd = fd_;
113
      }
114

            
115
      return size;
116
    }
117

            
118
    Waiter& operator=(Waiter& other) {
119
      size_ = other.size_;
120
      fd_ = other.fd_;
121
      resp_ = other.resp_;
122
      if (size_ > ssize_t(sizeof(resp_))) {
123
        size_t copy_size = size_t(size_) - sizeof(resp_);
124
        if (copy_size <= sizeof(buf_)) {
125
          memcpy(buf_, other.buf_, copy_size); // NOLINT(safe-memcpy)
126
        }
127
      }
128
      return *this;
129
    }
130

            
131
    void clear() {
132
      size_ = 0;
133
      fd_ = -1;
134
      resp_ = {};
135
    }
136

            
137
  private:
138
    // 'size_' non-zero after a the response has been received
139
    ssize_t size_{};
140
    int fd_;
141
    Response resp_;
142
    char buf_[RESPONSE_BUF_SIZE];
143
  };
144

            
145
  void insert(uint32_t seq, Waiter* waiter) {
146
    Thread::LockGuard guard(mutex_);
147
    auto ret = waiters_.emplace(seq, waiter);
148
    RELEASE_ASSERT(ret.second, "waiter emplace failed");
149
  }
150

            
151
  void remove(uint32_t seq) {
152
    Thread::LockGuard guard(mutex_);
153
    waiters_.erase(seq);
154
  }
155

            
156
  // receive is declared as noexcept to guarantee it will return normally, rather than via
157
  // an exception, if the program continues running. This allows for safe removal of the Waiter
158
  // from Waiters before the Waiter is destructed.
159
  void receive(Waiter&, uint32_t seq) noexcept;
160

            
161
private:
162
  using WaitersMap = absl::flat_hash_map<uint32_t, Waiter*>;
163

            
164
  Thread::MutexBasicLockable mutex_;
165
  WaitersMap waiters_ ABSL_GUARDED_BY(mutex_);
166
  bool is_receiver_active_ ABSL_GUARDED_BY(mutex_) = false;
167

            
168
  void wait() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { cond_.wait(mutex_); }
169

            
170
  void notifyOne() ABSL_LOCKS_EXCLUDED(mutex_) { cond_.notifyOne(); }
171

            
172
  void notifyAll() ABSL_LOCKS_EXCLUDED(mutex_) { cond_.notifyAll(); }
173

            
174
  Thread::CondVar cond_;
175
};
176

            
177
// Alias for the Envoy::ThreadSafeSingleton specialization that wraps ProtocolClient.
using Singleton = Envoy::ThreadSafeSingleton<ProtocolClient>;
178

            
179
} // namespace PrivilegedService
180
} // namespace Cilium
181
} // namespace Envoy