LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/filesystem - filesystem_subscription_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 37 115 32.2 %
Date: 2024-01-05 06:35:25 Functions: 4 11 36.4 %

          Line data    Source code
       1             : #include "source/extensions/config_subscription/filesystem/filesystem_subscription_impl.h"
       2             : 
       3             : #include "envoy/service/discovery/v3/discovery.pb.h"
       4             : 
       5             : #include "source/common/common/macros.h"
       6             : #include "source/common/common/utility.h"
       7             : #include "source/common/config/decoded_resource_impl.h"
       8             : #include "source/common/config/utility.h"
       9             : #include "source/common/protobuf/protobuf.h"
      10             : #include "source/common/protobuf/utility.h"
      11             : 
      12             : namespace Envoy {
      13             : namespace Config {
      14             : 
      15           0 : envoy::config::core::v3::PathConfigSource makePathConfigSource(const std::string& path) {
      16           0 :   envoy::config::core::v3::PathConfigSource path_config_source;
      17           0 :   path_config_source.set_path(path);
      18           0 :   return path_config_source;
      19           0 : }
      20             : 
      21             : FilesystemSubscriptionImpl::FilesystemSubscriptionImpl(
      22             :     Event::Dispatcher& dispatcher,
      23             :     const envoy::config::core::v3::PathConfigSource& path_config_source,
      24             :     SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
      25             :     SubscriptionStats stats, ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api)
      26             :     : path_(path_config_source.path()), callbacks_(callbacks), resource_decoder_(resource_decoder),
      27          70 :       stats_(stats), api_(api), validation_visitor_(validation_visitor) {
      28          70 :   if (!path_config_source.has_watched_directory()) {
      29          70 :     file_watcher_ = dispatcher.createFilesystemWatcher();
      30          70 :     file_watcher_->addWatch(path_, Filesystem::Watcher::Events::MovedTo, [this](uint32_t) {
      31           0 :       if (started_) {
      32           0 :         refresh();
      33           0 :       }
      34           0 :     });
      35          70 :   } else {
      36           0 :     directory_watcher_ =
      37           0 :         std::make_unique<WatchedDirectory>(path_config_source.watched_directory(), dispatcher);
      38           0 :     directory_watcher_->setCallback([this]() {
      39           0 :       if (started_) {
      40           0 :         refresh();
      41           0 :       }
      42           0 :     });
      43           0 :   }
      44          70 : }
      45             : 
      46             : // Config::Subscription
      47          70 : void FilesystemSubscriptionImpl::start(const absl::flat_hash_set<std::string>&) {
      48          70 :   started_ = true;
      49             :   // Attempt to read in case there is a file there already.
      50          70 :   refresh();
      51          70 : }
      52             : 
      53           0 : void FilesystemSubscriptionImpl::updateResourceInterest(const absl::flat_hash_set<std::string>&) {
      54             :   // Bump stats for consistent behavior with other xDS.
      55           0 :   stats_.update_attempt_.inc();
      56           0 : }
      57             : 
      58             : void FilesystemSubscriptionImpl::configRejected(const EnvoyException& e,
      59           0 :                                                 const std::string& message) {
      60           0 :   ENVOY_LOG(warn, "Filesystem config update rejected: {}", e.what());
      61           0 :   ENVOY_LOG(debug, "Failed configuration:\n{}", message);
      62           0 :   stats_.update_rejected_.inc();
      63           0 :   callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
      64           0 : }
      65             : 
      66          70 : std::string FilesystemSubscriptionImpl::refreshInternal(ProtobufTypes::MessagePtr* config_update) {
      67          70 :   auto owned_message = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
      68          70 :   auto& message = *owned_message;
      69          70 :   MessageUtil::loadFromFile(path_, message, validation_visitor_, api_);
      70          70 :   *config_update = std::move(owned_message);
      71          70 :   const auto decoded_resources =
      72          70 :       DecodedResourcesWrapper(*resource_decoder_, message.resources(), message.version_info());
      73          70 :   THROW_IF_NOT_OK(callbacks_.onConfigUpdate(decoded_resources.refvec_, message.version_info()));
      74          70 :   return message.version_info();
      75          70 : }
      76             : 
      77          70 : void FilesystemSubscriptionImpl::refresh() {
      78          70 :   ENVOY_LOG(debug, "Filesystem config refresh for {}", path_);
      79          70 :   stats_.update_attempt_.inc();
      80          70 :   ProtobufTypes::MessagePtr config_update;
      81          70 :   TRY_ASSERT_MAIN_THREAD {
      82          70 :     const std::string version = refreshInternal(&config_update);
      83          70 :     stats_.update_time_.set(DateUtil::nowToMilliseconds(api_.timeSource()));
      84          70 :     stats_.version_.set(HashUtil::xxHash64(version));
      85          70 :     stats_.version_text_.set(version);
      86          70 :     stats_.update_success_.inc();
      87          70 :     ENVOY_LOG(debug, "Filesystem config update accepted for {}: {}", path_,
      88          70 :               config_update->DebugString());
      89          70 :   }
      90          70 :   END_TRY
      91          70 :   catch (const ProtobufMessage::UnknownProtoFieldException& e) {
      92           0 :     configRejected(e, config_update == nullptr ? "" : config_update->DebugString());
      93           0 :   }
      94          70 :   catch (const EnvoyException& e) {
      95           0 :     if (config_update != nullptr) {
      96           0 :       configRejected(e, config_update->DebugString());
      97           0 :     } else {
      98           0 :       ENVOY_LOG(warn, "Filesystem config update failure: in {}, {}", path_, e.what());
      99           0 :       stats_.update_failure_.inc();
     100             :       // This could happen due to filesystem issues or a bad configuration (e.g. proto validation).
     101             :       // Since the latter is more likely, for now we will treat it as rejection.
     102           0 :       callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
     103           0 :     }
     104           0 :   }
     105          70 : }
     106             : 
     107             : FilesystemCollectionSubscriptionImpl::FilesystemCollectionSubscriptionImpl(
     108             :     Event::Dispatcher& dispatcher,
     109             :     const envoy::config::core::v3::PathConfigSource& path_config_source,
     110             :     SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
     111             :     SubscriptionStats stats, ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api)
     112             :     : FilesystemSubscriptionImpl(dispatcher, path_config_source, callbacks, resource_decoder, stats,
     113           0 :                                  validation_visitor, api) {}
     114             : 
     115             : std::string
     116           0 : FilesystemCollectionSubscriptionImpl::refreshInternal(ProtobufTypes::MessagePtr* config_update) {
     117           0 :   auto owned_resource_message = std::make_unique<envoy::service::discovery::v3::Resource>();
     118           0 :   auto& resource_message = *owned_resource_message;
     119           0 :   MessageUtil::loadFromFile(path_, resource_message, validation_visitor_, api_);
     120             :   // Dynamically load the collection message.
     121           0 :   const std::string collection_type =
     122           0 :       std::string(TypeUtil::typeUrlToDescriptorFullName(resource_message.resource().type_url()));
     123           0 :   const Protobuf::Descriptor* collection_descriptor =
     124           0 :       Protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(collection_type);
     125           0 :   if (collection_descriptor == nullptr) {
     126           0 :     throw EnvoyException(fmt::format("Unknown collection type {}", collection_type));
     127           0 :   }
     128           0 :   Protobuf::DynamicMessageFactory dmf;
     129           0 :   ProtobufTypes::MessagePtr collection_message;
     130           0 :   collection_message.reset(dmf.GetPrototype(collection_descriptor)->New());
     131           0 :   MessageUtil::unpackTo(resource_message.resource(), *collection_message);
     132           0 :   const auto* collection_entries_field_descriptor = collection_descriptor->field(0);
     133             :   // Verify collection message type structure.
     134           0 :   if (collection_entries_field_descriptor == nullptr ||
     135           0 :       collection_entries_field_descriptor->type() != Protobuf::FieldDescriptor::TYPE_MESSAGE ||
     136           0 :       collection_entries_field_descriptor->message_type()->full_name() !=
     137           0 :           "xds.core.v3.CollectionEntry" ||
     138           0 :       !collection_entries_field_descriptor->is_repeated()) {
     139           0 :     throw EnvoyException(fmt::format("Invalid structure for collection type {} in {}",
     140           0 :                                      collection_type, resource_message.DebugString()));
     141           0 :   }
     142           0 :   const auto* reflection = collection_message->GetReflection();
     143           0 :   const uint32_t num_entries =
     144           0 :       reflection->FieldSize(*collection_message, collection_entries_field_descriptor);
     145           0 :   DecodedResourcesWrapper decoded_resources;
     146           0 :   for (uint32_t i = 0; i < num_entries; ++i) {
     147           0 :     xds::core::v3::CollectionEntry collection_entry;
     148           0 :     collection_entry.MergeFrom(reflection->GetRepeatedMessage(
     149           0 :         *collection_message, collection_entries_field_descriptor, i));
     150             :     // TODO(htuch): implement indirect collection entries.
     151           0 :     if (collection_entry.has_inline_entry()) {
     152           0 :       decoded_resources.pushBack(std::make_unique<DecodedResourceImpl>(
     153           0 :           *resource_decoder_, collection_entry.inline_entry()));
     154           0 :     }
     155           0 :   }
     156           0 :   *config_update = std::move(owned_resource_message);
     157           0 :   THROW_IF_NOT_OK(callbacks_.onConfigUpdate(decoded_resources.refvec_, resource_message.version()));
     158           0 :   return resource_message.version();
     159           0 : }
     160             : 
     161             : REGISTER_FACTORY(FilesystemSubscriptionFactory, ConfigSubscriptionFactory);
     162             : REGISTER_FACTORY(FilesystemCollectionSubscriptionFactory, ConfigSubscriptionFactory);
     163             : 
     164             : } // namespace Config
     165             : } // namespace Envoy

Generated by: LCOV version 1.15