1
#include "source/extensions/access_loggers/grpc/grpc_access_log_impl.h"
2

            
3
#include "envoy/data/accesslog/v3/accesslog.pb.h"
4
#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h"
5
#include "envoy/grpc/async_client_manager.h"
6
#include "envoy/local_info/local_info.h"
7

            
8
#include "source/common/config/utility.h"
9
#include "source/common/grpc/typed_async_client.h"
10
#include "source/extensions/access_loggers/common/grpc_access_logger_clients.h"
11

            
12
const char GRPC_LOG_STATS_PREFIX[] = "access_logs.grpc_access_log.";
13

            
14
namespace Envoy {
15
namespace Extensions {
16
namespace AccessLoggers {
17
namespace GrpcCommon {
18

            
19
GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
20
    const Grpc::RawAsyncClientSharedPtr& client,
21
    const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
22
    Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope)
23
70
    : GrpcAccessLogger(config, dispatcher, scope, GRPC_LOG_STATS_PREFIX,
24
70
                       std::make_unique<Common::StreamingGrpcAccessLogClient<
25
70
                           envoy::service::accesslog::v3::StreamAccessLogsMessage,
26
70
                           envoy::service::accesslog::v3::StreamAccessLogsResponse>>(
27
70
                           client,
28
70
                           *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
29
70
                               "envoy.service.accesslog.v3.AccessLogService.StreamAccessLogs"),
30
70
                           GrpcCommon::optionalRetryPolicy(config))),
31
70
      log_name_(config.log_name()), local_info_(local_info) {}
32

            
33
20
void GrpcAccessLoggerImpl::addEntry(envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry) {
34
20
  message_.mutable_http_logs()->mutable_log_entry()->Add(std::move(entry));
35
20
}
36

            
37
26
void GrpcAccessLoggerImpl::addEntry(envoy::data::accesslog::v3::TCPAccessLogEntry&& entry) {
38
26
  message_.mutable_tcp_logs()->mutable_log_entry()->Add(std::move(entry));
39
26
}
40

            
41
71
bool GrpcAccessLoggerImpl::isEmpty() {
42
71
  return !message_.has_http_logs() && !message_.has_tcp_logs();
43
71
}
44

            
45
33
void GrpcAccessLoggerImpl::initMessage() {
46
33
  auto* identifier = message_.mutable_identifier();
47
33
  *identifier->mutable_node() = local_info_.node();
48
33
  identifier->set_log_name(log_name_);
49
33
}
50

            
51
GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager,
52
                                                     Stats::Scope& scope,
53
                                                     ThreadLocal::SlotAllocator& tls,
54
                                                     const LocalInfo::LocalInfo& local_info)
55
32
    : GrpcAccessLoggerCache(async_client_manager, scope, tls), local_info_(local_info) {}
56

            
57
GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
58
    const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
59
68
    Event::Dispatcher& dispatcher) {
60
  // We pass skip_cluster_check=true to factoryForGrpcService in order to avoid throwing
61
  // exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster
62
  // availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in
63
  // the main thread if necessary.
64
68
  auto factory_or_error =
65
68
      async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, true);
66
68
  THROW_IF_NOT_OK_REF(factory_or_error.status());
67
68
  return std::make_shared<GrpcAccessLoggerImpl>(
68
68
      THROW_OR_RETURN_VALUE(factory_or_error.value()->createUncachedRawAsyncClient(),
69
68
                            Grpc::RawAsyncClientPtr),
70
68
      config, dispatcher, local_info_, scope_);
71
68
}
72

            
73
} // namespace GrpcCommon
74
} // namespace AccessLoggers
75
} // namespace Extensions
76
} // namespace Envoy