Line data Source code
1 : #include "source/extensions/config_subscription/filesystem/filesystem_subscription_impl.h"
2 :
3 : #include "envoy/service/discovery/v3/discovery.pb.h"
4 :
5 : #include "source/common/common/macros.h"
6 : #include "source/common/common/utility.h"
7 : #include "source/common/config/decoded_resource_impl.h"
8 : #include "source/common/config/utility.h"
9 : #include "source/common/protobuf/protobuf.h"
10 : #include "source/common/protobuf/utility.h"
11 :
12 : namespace Envoy {
13 : namespace Config {
14 :
15 0 : envoy::config::core::v3::PathConfigSource makePathConfigSource(const std::string& path) {
16 0 : envoy::config::core::v3::PathConfigSource path_config_source;
17 0 : path_config_source.set_path(path);
18 0 : return path_config_source;
19 0 : }
20 :
21 : FilesystemSubscriptionImpl::FilesystemSubscriptionImpl(
22 : Event::Dispatcher& dispatcher,
23 : const envoy::config::core::v3::PathConfigSource& path_config_source,
24 : SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
25 : SubscriptionStats stats, ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api)
26 : : path_(path_config_source.path()), callbacks_(callbacks), resource_decoder_(resource_decoder),
27 70 : stats_(stats), api_(api), validation_visitor_(validation_visitor) {
28 70 : if (!path_config_source.has_watched_directory()) {
29 70 : file_watcher_ = dispatcher.createFilesystemWatcher();
30 70 : file_watcher_->addWatch(path_, Filesystem::Watcher::Events::MovedTo, [this](uint32_t) {
31 0 : if (started_) {
32 0 : refresh();
33 0 : }
34 0 : });
35 70 : } else {
36 0 : directory_watcher_ =
37 0 : std::make_unique<WatchedDirectory>(path_config_source.watched_directory(), dispatcher);
38 0 : directory_watcher_->setCallback([this]() {
39 0 : if (started_) {
40 0 : refresh();
41 0 : }
42 0 : });
43 0 : }
44 70 : }
45 :
46 : // Config::Subscription
47 70 : void FilesystemSubscriptionImpl::start(const absl::flat_hash_set<std::string>&) {
48 70 : started_ = true;
49 : // Attempt to read in case there is a file there already.
50 70 : refresh();
51 70 : }
52 :
53 0 : void FilesystemSubscriptionImpl::updateResourceInterest(const absl::flat_hash_set<std::string>&) {
54 : // Bump stats for consistent behavior with other xDS.
55 0 : stats_.update_attempt_.inc();
56 0 : }
57 :
58 : void FilesystemSubscriptionImpl::configRejected(const EnvoyException& e,
59 0 : const std::string& message) {
60 0 : ENVOY_LOG(warn, "Filesystem config update rejected: {}", e.what());
61 0 : ENVOY_LOG(debug, "Failed configuration:\n{}", message);
62 0 : stats_.update_rejected_.inc();
63 0 : callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
64 0 : }
65 :
66 70 : std::string FilesystemSubscriptionImpl::refreshInternal(ProtobufTypes::MessagePtr* config_update) {
67 70 : auto owned_message = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
68 70 : auto& message = *owned_message;
69 70 : MessageUtil::loadFromFile(path_, message, validation_visitor_, api_);
70 70 : *config_update = std::move(owned_message);
71 70 : const auto decoded_resources =
72 70 : DecodedResourcesWrapper(*resource_decoder_, message.resources(), message.version_info());
73 70 : THROW_IF_NOT_OK(callbacks_.onConfigUpdate(decoded_resources.refvec_, message.version_info()));
74 70 : return message.version_info();
75 70 : }
76 :
77 70 : void FilesystemSubscriptionImpl::refresh() {
78 70 : ENVOY_LOG(debug, "Filesystem config refresh for {}", path_);
79 70 : stats_.update_attempt_.inc();
80 70 : ProtobufTypes::MessagePtr config_update;
81 70 : TRY_ASSERT_MAIN_THREAD {
82 70 : const std::string version = refreshInternal(&config_update);
83 70 : stats_.update_time_.set(DateUtil::nowToMilliseconds(api_.timeSource()));
84 70 : stats_.version_.set(HashUtil::xxHash64(version));
85 70 : stats_.version_text_.set(version);
86 70 : stats_.update_success_.inc();
87 70 : ENVOY_LOG(debug, "Filesystem config update accepted for {}: {}", path_,
88 70 : config_update->DebugString());
89 70 : }
90 70 : END_TRY
91 70 : catch (const ProtobufMessage::UnknownProtoFieldException& e) {
92 0 : configRejected(e, config_update == nullptr ? "" : config_update->DebugString());
93 0 : }
94 70 : catch (const EnvoyException& e) {
95 0 : if (config_update != nullptr) {
96 0 : configRejected(e, config_update->DebugString());
97 0 : } else {
98 0 : ENVOY_LOG(warn, "Filesystem config update failure: in {}, {}", path_, e.what());
99 0 : stats_.update_failure_.inc();
100 : // This could happen due to filesystem issues or a bad configuration (e.g. proto validation).
101 : // Since the latter is more likely, for now we will treat it as rejection.
102 0 : callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
103 0 : }
104 0 : }
105 70 : }
106 :
107 : FilesystemCollectionSubscriptionImpl::FilesystemCollectionSubscriptionImpl(
108 : Event::Dispatcher& dispatcher,
109 : const envoy::config::core::v3::PathConfigSource& path_config_source,
110 : SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
111 : SubscriptionStats stats, ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api)
112 : : FilesystemSubscriptionImpl(dispatcher, path_config_source, callbacks, resource_decoder, stats,
113 0 : validation_visitor, api) {}
114 :
115 : std::string
116 0 : FilesystemCollectionSubscriptionImpl::refreshInternal(ProtobufTypes::MessagePtr* config_update) {
117 0 : auto owned_resource_message = std::make_unique<envoy::service::discovery::v3::Resource>();
118 0 : auto& resource_message = *owned_resource_message;
119 0 : MessageUtil::loadFromFile(path_, resource_message, validation_visitor_, api_);
120 : // Dynamically load the collection message.
121 0 : const std::string collection_type =
122 0 : std::string(TypeUtil::typeUrlToDescriptorFullName(resource_message.resource().type_url()));
123 0 : const Protobuf::Descriptor* collection_descriptor =
124 0 : Protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(collection_type);
125 0 : if (collection_descriptor == nullptr) {
126 0 : throw EnvoyException(fmt::format("Unknown collection type {}", collection_type));
127 0 : }
128 0 : Protobuf::DynamicMessageFactory dmf;
129 0 : ProtobufTypes::MessagePtr collection_message;
130 0 : collection_message.reset(dmf.GetPrototype(collection_descriptor)->New());
131 0 : MessageUtil::unpackTo(resource_message.resource(), *collection_message);
132 0 : const auto* collection_entries_field_descriptor = collection_descriptor->field(0);
133 : // Verify collection message type structure.
134 0 : if (collection_entries_field_descriptor == nullptr ||
135 0 : collection_entries_field_descriptor->type() != Protobuf::FieldDescriptor::TYPE_MESSAGE ||
136 0 : collection_entries_field_descriptor->message_type()->full_name() !=
137 0 : "xds.core.v3.CollectionEntry" ||
138 0 : !collection_entries_field_descriptor->is_repeated()) {
139 0 : throw EnvoyException(fmt::format("Invalid structure for collection type {} in {}",
140 0 : collection_type, resource_message.DebugString()));
141 0 : }
142 0 : const auto* reflection = collection_message->GetReflection();
143 0 : const uint32_t num_entries =
144 0 : reflection->FieldSize(*collection_message, collection_entries_field_descriptor);
145 0 : DecodedResourcesWrapper decoded_resources;
146 0 : for (uint32_t i = 0; i < num_entries; ++i) {
147 0 : xds::core::v3::CollectionEntry collection_entry;
148 0 : collection_entry.MergeFrom(reflection->GetRepeatedMessage(
149 0 : *collection_message, collection_entries_field_descriptor, i));
150 : // TODO(htuch): implement indirect collection entries.
151 0 : if (collection_entry.has_inline_entry()) {
152 0 : decoded_resources.pushBack(std::make_unique<DecodedResourceImpl>(
153 0 : *resource_decoder_, collection_entry.inline_entry()));
154 0 : }
155 0 : }
156 0 : *config_update = std::move(owned_resource_message);
157 0 : THROW_IF_NOT_OK(callbacks_.onConfigUpdate(decoded_resources.refvec_, resource_message.version()));
158 0 : return resource_message.version();
159 0 : }
160 :
161 : REGISTER_FACTORY(FilesystemSubscriptionFactory, ConfigSubscriptionFactory);
162 : REGISTER_FACTORY(FilesystemCollectionSubscriptionFactory, ConfigSubscriptionFactory);
163 :
164 : } // namespace Config
165 : } // namespace Envoy
|