1
#pragma once
2

            
3
#include <memory>
4

            
5
#include "envoy/api/api.h"
6
#include "envoy/common/random_generator.h"
7
#include "envoy/config/core/v3/base.pb.h"
8
#include "envoy/event/deferred_deletable.h"
9
#include "envoy/init/manager.h"
10
#include "envoy/singleton/instance.h"
11
#include "envoy/thread_local/thread_local.h"
12
#include "envoy/upstream/cluster_manager.h"
13

            
14
#include "source/common/common/backoff_strategy.h"
15
#include "source/common/common/empty_string.h"
16
#include "source/common/common/enum_to_int.h"
17
#include "source/common/config/datasource.h"
18
#include "source/common/config/remote_data_fetcher.h"
19
#include "source/common/config/watched_directory.h"
20
#include "source/common/init/target_impl.h"
21

            
22
#include "absl/types/optional.h"
23

            
24
namespace Envoy {
25
namespace Config {
26
namespace DataSource {
27

            
28
template <class DataType> class DataSourceProvider;
29

            
30
using ProtoDataSource = envoy::config::core::v3::DataSource;
31
using ProtoWatchedDirectory = envoy::config::core::v3::WatchedDirectory;
32
template <class DataType>
33
using DataSourceProviderPtr = std::unique_ptr<DataSourceProvider<DataType>>;
34
template <class DataType>
35
using DataSourceProviderSharedPtr = std::shared_ptr<DataSourceProvider<DataType>>;
36

            
37
/**
38
 * Read contents of the file.
39
 * @param path file path.
40
 * @param api reference to the Api.
41
 * @param allow_empty return an empty string if the file is empty.
42
 * @param max_size max size limit of file to read, default 0 means no limit, and if the file data
43
 * would exceed the limit, it will return an error status.
44
 * @return std::string with file contents. or an error status if the file does not exist or
45
 * cannot be read.
46
 */
47
absl::StatusOr<std::string> readFile(const std::string& path, Api::Api& api, bool allow_empty,
48
                                     uint64_t max_size = 0);
49

            
50
/**
51
 * Read contents of the DataSource.
52
 * @param source data source.
53
 * @param allow_empty return an empty string if no DataSource case is specified.
54
 * @param api reference to the Api.
55
 * @param max_size max size limit of file to read, default 0 means no limit, and if the file data
56
 * would exceed the limit, it will return an error status.
57
 * @return std::string with DataSource contents. or an error status if no DataSource case is
58
 * specified and !allow_empty.
59
 */
60
absl::StatusOr<std::string> read(const envoy::config::core::v3::DataSource& source,
61
                                 bool allow_empty, Api::Api& api, uint64_t max_size = 0);
62

            
63
/**
64
 * @param source data source.
65
 * @return absl::optional<std::string> path to DataSource if a filename, otherwise absl::nullopt.
66
 */
67
absl::optional<std::string> getPath(const envoy::config::core::v3::DataSource& source);
68

            
69
template <class DataType>
70
using DataTransform = std::function<absl::StatusOr<std::shared_ptr<DataType>>(absl::string_view)>;
71

            
72
/**
 * Options controlling how a DataSourceProvider reads and watches its source.
 * Field order matters: callers build this with designated initializers.
 */
struct ProviderOptions {
  // If no DataSource case is specified, yield an empty string instead of failing.
  bool allow_empty = false;
  // Maximum size of the file to read; 0 disables the limit.
  uint64_t max_size = 0;
  // Re-read the file whenever it is modified.
  bool modify_watch = false;
  // Hash the raw content first and skip the transform when the hash has not changed.
  bool hash_content = false;
};

// DynamicData registers the file watches and a thread local slot. This class
84
// must be created and deleted on the dispatcher thread.
85
template <class DataType> class DynamicData {
86
public:
87
  struct ThreadLocalData : public ThreadLocal::ThreadLocalObject {
88
19
    ThreadLocalData(std::shared_ptr<DataType> data) : data_(std::move(data)) {}
89
    std::shared_ptr<DataType> data_;
90
  };
91

            
92
  DynamicData(const ProtoDataSource& source, Event::Dispatcher& main_dispatcher,
93
              ThreadLocal::SlotAllocator& tls, Api::Api& api,
94
              DataTransform<DataType> data_transform_cb, const ProviderOptions& options,
95
              std::shared_ptr<DataType> initial_data, uint64_t initial_hash,
96
              absl::AnyInvocable<void()> cleanup, absl::Status& creation_status)
97
17
      : dispatcher_(main_dispatcher), api_(api), options_(options), filename_(source.filename()),
98
17
        data_transform_(data_transform_cb), hash_(initial_hash), cleanup_(std::move(cleanup)) {
99
17
    slot_ =
100
17
        ThreadLocal::TypedSlot<typename DynamicData<DataType>::ThreadLocalData>::makeUnique(tls);
101
19
    slot_->set([initial_data = std::move(initial_data)](Event::Dispatcher&) {
102
19
      return std::make_shared<typename DynamicData<DataType>::ThreadLocalData>(initial_data);
103
19
    });
104

            
105
17
    if (source.has_watched_directory()) {
106
11
      auto directory_watcher_or_error =
107
11
          WatchedDirectory::create(source.watched_directory(), main_dispatcher);
108
11
      SET_AND_RETURN_IF_NOT_OK(directory_watcher_or_error.status(), creation_status);
109
10
      watcher_ = *std::move(directory_watcher_or_error);
110
10
      watcher_->setCallback([this]() { return onWatchUpdate(); });
111
10
    }
112

            
113
16
    if (options.modify_watch) {
114
6
      modify_watcher_ = main_dispatcher.createFilesystemWatcher();
115
6
      SET_AND_RETURN_IF_NOT_OK(
116
6
          modify_watcher_->addWatch(filename_, Filesystem::Watcher::Events::Modified,
117
6
                                    [this](uint32_t) { return onWatchUpdate(); }),
118
6
          creation_status);
119
6
    }
120
16
  }
121

            
122
17
  ~DynamicData() {
123
17
    if (cleanup_) {
124
5
      cleanup_();
125
5
    }
126
17
  }
127

            
128
62
  std::shared_ptr<DataType> data() const {
129
62
    const auto thread_local_data = slot_->get();
130
62
    return thread_local_data.has_value() ? thread_local_data->data_ : nullptr;
131
62
  }
132

            
133
17
  Event::Dispatcher& dispatcher() { return dispatcher_; }
134

            
135
private:
136
10
  absl::Status onWatchUpdate() {
137
10
    auto new_data_or_error = readFile(filename_, api_, options_.allow_empty, options_.max_size);
138
10
    if (!new_data_or_error.ok()) {
139
      // Log an error but don't fail the watch to avoid throwing EnvoyException at runtime.
140
2
      ENVOY_LOG_TO_LOGGER(Logger::Registry::getLog(Logger::Id::config), error,
141
2
                          "Failed to read file: {}", new_data_or_error.status().message());
142
2
      return absl::OkStatus();
143
2
    }
144
8
    uint64_t new_hash;
145
8
    if (options_.hash_content) {
146
2
      new_hash = HashUtil::xxHash64(*new_data_or_error);
147
2
      if (new_hash == hash_) {
148
1
        return absl::OkStatus();
149
1
      }
150
2
    }
151
7
    auto transformed_new_data_or_error = data_transform_(new_data_or_error.value());
152
7
    if (!transformed_new_data_or_error.ok()) {
153
      // Log an error but don't fail the watch to avoid throwing EnvoyException at runtime.
154
1
      ENVOY_LOG_TO_LOGGER(Logger::Registry::getLog(Logger::Id::config), error,
155
1
                          "Failed to transform data from file '{}': {}", filename_,
156
1
                          transformed_new_data_or_error.status().message());
157
1
      return absl::OkStatus();
158
1
    }
159
6
    if (options_.hash_content) {
160
1
      hash_ = new_hash;
161
1
    }
162

            
163
6
    slot_->runOnAllThreads([new_data = std::move(transformed_new_data_or_error.value())](
164
7
                               OptRef<typename DynamicData<DataType>::ThreadLocalData> obj) {
165
7
      if (obj.has_value()) {
166
7
        obj->data_ = new_data;
167
7
      }
168
7
    });
169
6
    return absl::OkStatus();
170
7
  }
171

            
172
  Event::Dispatcher& dispatcher_;
173
  Api::Api& api_;
174
  const ProviderOptions options_;
175
  const std::string filename_;
176
  DataTransform<DataType> data_transform_;
177
  uint64_t hash_;
178
  absl::AnyInvocable<void()> cleanup_;
179
  ThreadLocal::TypedSlotPtr<ThreadLocalData> slot_;
180
  WatchedDirectoryPtr watcher_;
181
  Filesystem::WatcherPtr modify_watcher_;
182
};
183

            
184
/** Checks whether data source uses file watching. */
185
bool usesFileWatching(const ProtoDataSource& source, const ProviderOptions& options);
186

            
187
/**
188
 * DataSourceProvider provides a way to get the DataSource contents and watch the possible
189
 * content changes. The watch only works for filename-based DataSource and watched directory
190
 * is provided explicitly or with modify-watch enabled.
191
 *
192
 * NOTE: This should only be used when the envoy.config.core.v3.DataSource is necessary and
193
 * file watch is required.
194
 */
195
template <class DataType> class DataSourceProvider {
196
public:
197
  /**
198
   * Create a DataSourceProvider from a DataSource.
199
   * @param source data source.
200
   * @param main_dispatcher reference to the main dispatcher.
201
   * @param tls reference to the thread local slot allocator.
202
   * @param api reference to the Api.
203
   * @param allow_empty return an empty string if no DataSource case is specified.
204
   * @param data_transform_cb transforms content of the DataSource (type std::string)
205
   *        to the desired `DataType` type.
206
   * @param max_size max size limit of file to read, default 0 means no limit.
207
   * @return absl::StatusOr<DataSourceProvider> with DataSource contents. or an error
208
   * status if any error occurs.
209
   * NOTE: If file watch is enabled and the new file content does not meet the
210
   * requirements (allow_empty, max_size), the provider will keep the old content.
211
   */
212
  static absl::StatusOr<DataSourceProviderPtr<DataType>>
213
  create(const ProtoDataSource& source, Event::Dispatcher& main_dispatcher,
214
         ThreadLocal::SlotAllocator& tls, Api::Api& api, bool allow_empty,
215
615
         DataTransform<DataType> data_transform_cb, uint64_t max_size) {
216
615
    return create(source, main_dispatcher, tls, api, data_transform_cb,
217
615
                  {.allow_empty = allow_empty, .max_size = max_size});
218
615
  }
219

            
220
  static absl::StatusOr<DataSourceProviderPtr<DataType>>
221
  create(const ProtoDataSource& source, Event::Dispatcher& main_dispatcher,
222
         ThreadLocal::SlotAllocator& tls, Api::Api& api, DataTransform<DataType> data_transform_cb,
223
633
         const ProviderOptions& options, absl::AnyInvocable<void()> cleanup = {}) {
224
633
    uint64_t max_size = options.max_size;
225
633
    auto initial_data_or_error = read(source, options.allow_empty, api, max_size);
226
633
    RETURN_IF_NOT_OK_REF(initial_data_or_error.status());
227

            
228
    // read() only validates the size of the file and does not check the size of inline data.
229
    // We check the size of inline data here.
230
    // TODO(wbpcode): consider moving this check to read() to avoid duplicate checks.
231
599
    if (max_size > 0 && initial_data_or_error.value().length() > max_size) {
232
2
      return absl::InvalidArgumentError(fmt::format("response body size is {} bytes; maximum is {}",
233
2
                                                    initial_data_or_error.value().length(),
234
2
                                                    max_size));
235
2
    }
236
597
    auto transformed_data_or_error = data_transform_cb(initial_data_or_error.value());
237
597
    RETURN_IF_NOT_OK_REF(transformed_data_or_error.status());
238

            
239
589
    if (!usesFileWatching(source, options)) {
240
572
      return std::unique_ptr<DataSourceProvider<DataType>>(
241
572
          new DataSourceProvider<DataType>(std::move(*transformed_data_or_error.value())));
242
572
    }
243

            
244
17
    absl::Status creation_status = absl::OkStatus();
245
17
    const uint64_t hash = options.hash_content ? HashUtil::xxHash64(*initial_data_or_error) : 0;
246
17
    auto ret = std::unique_ptr<DataSourceProvider>(new DataSourceProvider<DataType>(
247
17
        std::make_unique<DynamicData<DataType>>(source, main_dispatcher, tls, api,
248
17
                                                data_transform_cb, options,
249
17
                                                std::move(transformed_data_or_error).value(), hash,
250
17
                                                std::move(cleanup), creation_status)));
251
17
    RETURN_IF_NOT_OK(creation_status);
252
16
    return std::move(ret);
253
17
  }
254

            
255
633
  std::shared_ptr<DataType> data() const {
256
633
    if (absl::holds_alternative<std::shared_ptr<DataType>>(data_)) {
257
571
      return absl::get<std::shared_ptr<DataType>>(data_);
258
571
    }
259
62
    return absl::get<std::unique_ptr<DynamicData<DataType>>>(data_)->data();
260
633
  }
261

            
262
589
  ~DataSourceProvider() {
263
589
    if (absl::holds_alternative<std::unique_ptr<DynamicData<DataType>>>(data_)) {
264
      // Schedule destruction on the dispatcher thread. This ensures that close()
265
      // stops any inotify events on the same thread.
266
17
      std::unique_ptr<DynamicData<DataType>> data =
267
17
          std::move(absl::get<std::unique_ptr<DynamicData<DataType>>>(data_));
268
17
      Event::Dispatcher& dispatcher = data->dispatcher();
269
17
      if (!dispatcher.isThreadSafe()) {
270
        dispatcher.post([to_delete = std::move(data)] {});
271
      }
272
17
    }
273
589
  }
274

            
275
private:
276
572
  DataSourceProvider(DataType&& data) : data_(std::make_shared<DataType>(std::move(data))) {}
277
17
  DataSourceProvider(std::unique_ptr<DynamicData<DataType>> data) : data_(std::move(data)) {}
278

            
279
  absl::variant<std::shared_ptr<DataType>, std::unique_ptr<DynamicData<DataType>>> data_;
280
};
281

            
282
/**
283
 * ProviderSingleton allows sharing of dynamic DataSourceProviders using a process-wide
284
 * singleton instance. Only providers that rely on the file watching are shared, the rest
285
 * are created on-demand. This singleton reduces the resource pressure on the file watchers
286
 * and storage needed by the data type.
287
 */
288
template <class DataType>
289
class ProviderSingleton : public Singleton::Instance,
290
                          public std::enable_shared_from_this<ProviderSingleton<DataType>> {
291
public:
292
  ProviderSingleton(Event::Dispatcher& main_dispatcher, ThreadLocal::SlotAllocator& tls,
293
                    Api::Api& api, DataTransform<DataType> data_transform_cb,
294
                    const ProviderOptions& options)
295
6
      : dispatcher_(main_dispatcher), tls_(tls), api_(api), data_transform_(data_transform_cb),
296
6
        options_(options) {}
297

            
298
17
  absl::StatusOr<DataSourceProviderSharedPtr<DataType>> getOrCreate(const ProtoDataSource& source) {
299
17
    ASSERT_IS_MAIN_OR_TEST_THREAD();
300
17
    if (!usesFileWatching(source, options_)) {
301
3
      return DataSourceProvider<DataType>::create(source, dispatcher_, tls_, api_, data_transform_,
302
3
                                                  options_);
303
3
    }
304
14
    const size_t config_hash = MessageUtil::hash(source);
305
14
    auto it = dynamic_providers_.find(config_hash);
306
14
    if (it != dynamic_providers_.end()) {
307
2
      auto locked_provider = it->second.lock();
308
2
      if (locked_provider) {
309
2
        return locked_provider;
310
2
      }
311
2
    }
312

            
313
    // Cleanup is guaranteed to execute on the main dispatcher during destruction but may happen
314
    // after the singleton is released.
315
12
    auto provider_or_error = DataSourceProvider<DataType>::create(
316
12
        source, dispatcher_, tls_, api_, data_transform_, options_,
317
12
        [weak_this = this->weak_from_this(), config_hash] {
318
5
          if (auto locked_this = weak_this.lock(); locked_this) {
319
4
            locked_this->cleanup(config_hash);
320
4
          }
321
5
        });
322
12
    RETURN_IF_NOT_OK(provider_or_error.status());
323
5
    DataSourceProviderSharedPtr<DataType> new_provider = *std::move(provider_or_error);
324
5
    dynamic_providers_[config_hash] = new_provider;
325
5
    return new_provider;
326
12
  }
327

            
328
private:
329
4
  void cleanup(size_t key) {
330
4
    auto it = dynamic_providers_.find(key);
331
4
    if (it != dynamic_providers_.end() && it->second.expired()) {
332
4
      dynamic_providers_.erase(it);
333
4
    }
334
4
  }
335
  Event::Dispatcher& dispatcher_;
336
  ThreadLocal::SlotAllocator& tls_;
337
  Api::Api& api_;
338
  DataTransform<DataType> data_transform_;
339
  const ProviderOptions options_;
340
  absl::flat_hash_map<size_t, std::weak_ptr<DataSourceProvider<DataType>>> dynamic_providers_;
341
};
342

            
343
} // namespace DataSource
344
} // namespace Config
345
} // namespace Envoy