1
#include "source/extensions/config_subscription/filesystem/filesystem_subscription_impl.h"
2

            
3
#include "envoy/service/discovery/v3/discovery.pb.h"
4

            
5
#include "source/common/common/macros.h"
6
#include "source/common/common/utility.h"
7
#include "source/common/config/decoded_resource_impl.h"
8
#include "source/common/config/utility.h"
9
#include "source/common/protobuf/protobuf.h"
10
#include "source/common/protobuf/utility.h"
11

            
12
namespace Envoy {
13
namespace Config {
14

            
15
22
// Builds a PathConfigSource proto whose `path` field is set to `path`. No other field of the
// returned message is populated.
envoy::config::core::v3::PathConfigSource makePathConfigSource(const std::string& path) {
  envoy::config::core::v3::PathConfigSource source;
  source.set_path(path);
  return source;
}
20

            
21
// Stores the subscription dependencies and installs exactly one change notifier:
//  - when the config source names a watched directory, a WatchedDirectory is created and its
//    callback is used;
//  - otherwise a filesystem watcher is created on the dispatcher and a MovedTo watch is added on
//    the configured path.
// Either notifier only calls refresh() once start() has set started_. Failures reported while
// creating the WatchedDirectory or adding the watch are surfaced through the THROW_* macros.
FilesystemSubscriptionImpl::FilesystemSubscriptionImpl(
    Event::Dispatcher& dispatcher,
    const envoy::config::core::v3::PathConfigSource& path_config_source,
    SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
    SubscriptionStats stats, ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api)
    : path_(path_config_source.path()), callbacks_(callbacks), resource_decoder_(resource_decoder),
      stats_(stats), api_(api), validation_visitor_(validation_visitor) {
  if (path_config_source.has_watched_directory()) {
    // Directory-level notifications were requested explicitly.
    directory_watcher_ = THROW_OR_RETURN_VALUE(
        WatchedDirectory::create(path_config_source.watched_directory(), dispatcher),
        std::unique_ptr<WatchedDirectory>);
    directory_watcher_->setCallback([this]() {
      if (!started_) {
        return absl::OkStatus();
      }
      refresh();
      return absl::OkStatus();
    });
    return;
  }

  // No watched directory configured: watch the config file path itself for MovedTo events.
  file_watcher_ = dispatcher.createFilesystemWatcher();
  THROW_IF_NOT_OK(
      file_watcher_->addWatch(path_, Filesystem::Watcher::Events::MovedTo, [this](uint32_t) {
        if (!started_) {
          return absl::OkStatus();
        }
        refresh();
        return absl::OkStatus();
      }));
}
49

            
50
// Config::Subscription
51
10210
// Marks the subscription as started and performs an initial load. The resource-name set is
// ignored by this implementation.
void FilesystemSubscriptionImpl::start(const absl::flat_hash_set<std::string>&) {
  // From this point on the watcher callbacks are allowed to trigger refresh().
  started_ = true;
  // The file may already be present, so try to load it immediately rather than waiting for an
  // event.
  refresh();
}
56

            
57
1
// The set of requested resources is ignored here; only the update-attempt counter is
// incremented so that stats behave consistently with other xDS subscription types.
void FilesystemSubscriptionImpl::updateResourceInterest(const absl::flat_hash_set<std::string>&) {
  stats_.update_attempt_.inc();
}
61

            
62
void FilesystemSubscriptionImpl::configRejected(const EnvoyException& e,
63
24
                                                const std::string& message) {
64
24
  ENVOY_LOG(warn, "Filesystem config update rejected: {}", e.what());
65
24
  ENVOY_LOG(debug, "Failed configuration:\n{}", message);
66
24
  stats_.update_rejected_.inc();
67
24
  callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
68
24
}
69

            
70
10463
// Loads a DiscoveryResponse from path_, decodes its resources and hands them to the subscription
// callbacks.
//
// @param config_update out-parameter that takes ownership of the parsed DiscoveryResponse as soon
//                      as the file has been loaded, i.e. before decoding and before the callback
//                      runs. refresh() relies on this to tell a load failure (still null) apart
//                      from a later rejection (non-null).
// @return the version_info of the loaded response.
// Errors from loading, decoding or the callback are raised through the THROW_* macros.
std::string FilesystemSubscriptionImpl::refreshInternal(ProtobufTypes::MessagePtr* config_update) {
  auto response_owner = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
  // Keep a typed reference: ownership is transferred to *config_update below, but the object
  // itself stays alive and is still needed with its concrete type.
  envoy::service::discovery::v3::DiscoveryResponse& response = *response_owner;

  THROW_IF_NOT_OK(MessageUtil::loadFromFile(path_, response, validation_visitor_, api_));
  *config_update = std::move(response_owner);

  const std::string& version = response.version_info();
  const auto decoded = THROW_OR_RETURN_VALUE(
      DecodedResourcesWrapper::create(*resource_decoder_, response.resources(), version),
      std::unique_ptr<DecodedResourcesWrapper>);
  THROW_IF_NOT_OK(callbacks_.onConfigUpdate(decoded->refvec_, version));
  return version;
}
82

            
83
10475
void FilesystemSubscriptionImpl::refresh() {
84
10475
  ENVOY_LOG(debug, "Filesystem config refresh for {}", path_);
85
10475
  stats_.update_attempt_.inc();
86
10475
  ProtobufTypes::MessagePtr config_update;
87
10475
  TRY_ASSERT_MAIN_THREAD {
88
10475
    const std::string version = refreshInternal(&config_update);
89
10475
    stats_.update_time_.set(DateUtil::nowToMilliseconds(api_.timeSource()));
90
10475
    stats_.version_.set(HashUtil::xxHash64(version));
91
10475
    stats_.version_text_.set(version);
92
10475
    stats_.update_success_.inc();
93
10475
    ENVOY_LOG(debug, "Filesystem config update accepted for {}: {}", path_,
94
10475
              config_update->DebugString());
95
10475
  }
96
10475
  END_TRY
97
10475
  catch (const EnvoyException& e) {
98
121
    if (config_update != nullptr) {
99
17
      configRejected(e, config_update->DebugString());
100
112
    } else if (absl::EndsWith(e.what(), "has unknown fields")) {
101
7
      configRejected(e, "");
102
103
    } else {
103
97
      ENVOY_LOG(warn, "Filesystem config update failure: in {}, {}", path_, e.what());
104
97
      stats_.update_failure_.inc();
105
      // This could happen due to filesystem issues or a bad configuration (e.g. proto validation).
106
      // Since the latter is more likely, for now we will treat it as rejection.
107
97
      callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
108
97
    }
109
121
  }
110
10475
}
111

            
112
// Forwards every argument unchanged to the FilesystemSubscriptionImpl constructor; this class adds
// no construction-time behavior of its own.
FilesystemCollectionSubscriptionImpl::FilesystemCollectionSubscriptionImpl(
    Event::Dispatcher& dispatcher,
    const envoy::config::core::v3::PathConfigSource& path_config_source,
    SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
    SubscriptionStats stats, ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api)
    : FilesystemSubscriptionImpl(dispatcher, path_config_source, callbacks, resource_decoder, stats,
                                 validation_visitor, api) {
}
119

            
120
std::string
121
12
FilesystemCollectionSubscriptionImpl::refreshInternal(ProtobufTypes::MessagePtr* config_update) {
122
12
  auto owned_resource_message = std::make_unique<envoy::service::discovery::v3::Resource>();
123
12
  auto& resource_message = *owned_resource_message;
124
12
  THROW_IF_NOT_OK(MessageUtil::loadFromFile(path_, resource_message, validation_visitor_, api_));
125
  // Dynamically load the collection message.
126
8
  const std::string collection_type =
127
8
      std::string(TypeUtil::typeUrlToDescriptorFullName(resource_message.resource().type_url()));
128
8
  const Protobuf::Descriptor* collection_descriptor =
129
8
      Protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(collection_type);
130
8
  if (collection_descriptor == nullptr) {
131
1
    throw EnvoyException(fmt::format("Unknown collection type {}", collection_type));
132
1
  }
133
7
  Protobuf::DynamicMessageFactory dmf;
134
7
  ProtobufTypes::MessagePtr collection_message;
135
7
  collection_message.reset(dmf.GetPrototype(collection_descriptor)->New());
136
7
  THROW_IF_NOT_OK(MessageUtil::unpackTo(resource_message.resource(), *collection_message));
137
7
  const auto* collection_entries_field_descriptor = collection_descriptor->field(0);
138
  // Verify collection message type structure.
139
7
  if (collection_entries_field_descriptor == nullptr ||
140
7
      collection_entries_field_descriptor->type() != Protobuf::FieldDescriptor::TYPE_MESSAGE ||
141
7
      collection_entries_field_descriptor->message_type()->full_name() !=
142
4
          "xds.core.v3.CollectionEntry" ||
143
7
      !collection_entries_field_descriptor->is_repeated()) {
144
1
    throw EnvoyException(fmt::format("Invalid structure for collection type {} in {}",
145
1
                                     collection_type, resource_message.DebugString()));
146
1
  }
147
6
  const auto* reflection = collection_message->GetReflection();
148
6
  const uint32_t num_entries =
149
6
      reflection->FieldSize(*collection_message, collection_entries_field_descriptor);
150
6
  DecodedResourcesWrapper decoded_resources;
151
10
  for (uint32_t i = 0; i < num_entries; ++i) {
152
4
    xds::core::v3::CollectionEntry collection_entry;
153
4
    collection_entry.MergeFrom(reflection->GetRepeatedMessage(
154
4
        *collection_message, collection_entries_field_descriptor, i));
155
    // TODO(htuch): implement indirect collection entries.
156
4
    if (collection_entry.has_inline_entry()) {
157
4
      decoded_resources.pushBack(std::make_unique<DecodedResourceImpl>(
158
4
          *resource_decoder_, collection_entry.inline_entry()));
159
4
    }
160
4
  }
161
6
  *config_update = std::move(owned_resource_message);
162
6
  THROW_IF_NOT_OK(callbacks_.onConfigUpdate(decoded_resources.refvec_, resource_message.version()));
163
6
  return resource_message.version();
164
6
}
165

            
166
// Static registration of FilesystemSubscriptionFactory as a ConfigSubscriptionFactory.
REGISTER_FACTORY(FilesystemSubscriptionFactory, ConfigSubscriptionFactory);
167
// Static registration of FilesystemCollectionSubscriptionFactory as a ConfigSubscriptionFactory.
REGISTER_FACTORY(FilesystemCollectionSubscriptionFactory, ConfigSubscriptionFactory);
168

            
169
} // namespace Config
170
} // namespace Envoy