1
#include "source/extensions/tracers/datadog/agent_http_client.h"
2

            
3
#include <cstddef>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7

            
8
#include "source/common/common/assert.h"
9
#include "source/common/http/message_impl.h"
10
#include "source/common/http/utility.h"
11
#include "source/extensions/tracers/datadog/dict_util.h"
12
#include "source/extensions/tracers/datadog/tracer_stats.h"
13

            
14
#include "absl/strings/str_format.h"
15
#include "datadog/dict_reader.h"
16
#include "datadog/dict_writer.h"
17
#include "datadog/error.h"
18
#include "nlohmann/json.hpp"
19

            
20
namespace Envoy {
21
namespace Extensions {
22
namespace Tracers {
23
namespace Datadog {
24

            
25
AgentHTTPClient::AgentHTTPClient(Upstream::ClusterManager& cluster_manager,
26
                                 const std::string& cluster, const std::string& reference_host,
27
                                 TracerStats& stats, TimeSource& time_source)
28
56
    : collector_cluster_(cluster_manager, cluster), cluster_(cluster),
29
56
      reference_host_(reference_host), stats_(stats), time_source_(time_source) {}
30

            
31
56
AgentHTTPClient::~AgentHTTPClient() {
32
56
  for (const auto& [request_ptr, _] : handlers_) {
33
5
    RELEASE_ASSERT(request_ptr,
34
5
                   "null Http::AsyncClient::Request* in handler map of Datadog::AgentHTTPClient");
35
5
    request_ptr->cancel();
36
5
  }
37
56
}
38

            
39
// datadog::tracing::HTTPClient
40

            
41
datadog::tracing::Expected<void>
42
AgentHTTPClient::post(const URL& url, HeadersSetter set_headers, std::string body,
43
                      ResponseHandler on_response, ErrorHandler on_error,
44
23
                      std::chrono::steady_clock::time_point deadline) {
45
23
  auto message = std::make_unique<Http::RequestMessageImpl>();
46
23
  Http::RequestHeaderMap& headers = message->headers();
47
23
  headers.setReferenceMethod(Http::Headers::get().MethodValues.Post);
48
23
  headers.setReferencePath(url.path);
49
23
  headers.setReferenceHost(reference_host_);
50

            
51
23
  RequestHeaderWriter writer{message->headers()};
52
23
  set_headers(writer);
53

            
54
23
  message->body().add(body);
55

            
56
23
  const auto timeout = std::chrono::duration_cast<std::chrono::milliseconds>(
57
23
      deadline - time_source_.monotonicTime());
58
23
  if (timeout.count() <= 0) {
59
2
    std::string message = "request deadline expired already";
60
2
    if (timeout.count() < 0) {
61
1
      message += ' ';
62
1
      message += std::to_string(-timeout.count());
63
1
      message += " milliseconds ago";
64
1
    }
65
2
    stats_.reports_dropped_.inc();
66
2
    return datadog::tracing::Error{datadog::tracing::Error::ENVOY_HTTP_CLIENT_FAILURE,
67
2
                                   std::move(message)};
68
2
  }
69

            
70
21
  ENVOY_LOG(debug, "submitting data to {} with payload size {} and {} ms timeout", url.path,
71
21
            body.size(), timeout.count());
72

            
73
21
  if (!collector_cluster_.threadLocalCluster().has_value()) {
74
3
    ENVOY_LOG(debug, "collector cluster '{}' does not exist", cluster_);
75
3
    stats_.reports_skipped_no_cluster_.inc();
76
3
    return datadog::tracing::nullopt;
77
3
  }
78

            
79
18
  Http::AsyncClient::Request* request =
80
18
      collector_cluster_.threadLocalCluster()->get().httpAsyncClient().send(
81
18
          std::move(message), *this, Http::AsyncClient::RequestOptions().setTimeout(timeout));
82
18
  if (!request) {
83
6
    stats_.reports_failed_.inc();
84
6
    return datadog::tracing::Error{datadog::tracing::Error::ENVOY_HTTP_CLIENT_FAILURE,
85
6
                                   "Failed to create request."};
86
6
  }
87

            
88
12
  handlers_.emplace(request, Handlers{std::move(on_response), std::move(on_error)});
89
12
  return datadog::tracing::nullopt;
90
18
}
91

            
92
34
void AgentHTTPClient::drain(std::chrono::steady_clock::time_point) {}
93

            
94
34
std::string AgentHTTPClient::config() const { return config_json().dump(); }
95

            
96
35
const nlohmann::json& AgentHTTPClient::config_json() const {
97
35
  static const nlohmann::json config = nlohmann::json::object({
98
35
      {"type", "Envoy::Extensions::Tracers::Datadog::AgentHTTPClient"},
99
35
  });
100
35
  return config;
101
35
}
102

            
103
// Http::AsyncClient::Callbacks
104

            
105
void AgentHTTPClient::onSuccess(const Http::AsyncClient::Request& request,
106
5
                                Http::ResponseMessagePtr&& response) {
107
5
  const std::uint64_t status = Http::Utility::getResponseStatus(response->headers());
108
5
  if (status != std::uint64_t(Http::Code::OK)) {
109
3
    stats_.reports_dropped_.inc();
110
3
  } else {
111
2
    ENVOY_LOG(debug, "traces successfully submitted to datadog agent");
112
2
    stats_.reports_sent_.inc();
113
2
  }
114

            
115
5
  auto found = handlers_.find(const_cast<Http::AsyncClient::Request*>(&request));
116
5
  if (found == handlers_.end()) {
117
2
    ENVOY_LOG(debug, "request at address 0x{} is not in the map of {} Handlers objects",
118
2
              absl::StrFormat("%p", &request), handlers_.size());
119
2
    return;
120
2
  }
121

            
122
3
  Handlers& handlers = found->second;
123
3
  ResponseHeaderReader reader{response->headers()};
124
3
  handlers.on_response(status, reader, response->bodyAsString());
125

            
126
3
  handlers_.erase(found);
127
3
}
128

            
129
void AgentHTTPClient::onFailure(const Http::AsyncClient::Request& request,
130
5
                                Http::AsyncClient::FailureReason reason) {
131
5
  auto found = handlers_.find(const_cast<Http::AsyncClient::Request*>(&request));
132
  // If the request failed to start, then `post` never even registered the request.
133
5
  if (found == handlers_.end()) {
134
1
    return;
135
1
  }
136

            
137
4
  stats_.reports_failed_.inc();
138

            
139
4
  Handlers& handlers = found->second;
140
4
  std::string message = "Failed to send request to Datadog Agent: ";
141
4
  switch (reason) {
142
2
  case Http::AsyncClient::FailureReason::Reset:
143
2
    message += "The stream has been reset.";
144
2
    break;
145
1
  case Http::AsyncClient::FailureReason::ExceedResponseBufferLimit:
146
1
    message += "The stream exceeds the response buffer limit.";
147
1
    break;
148
1
  default:
149
1
    message += "Unknown error.";
150
4
  }
151
4
  handlers.on_error(datadog::tracing::Error{datadog::tracing::Error::ENVOY_HTTP_CLIENT_FAILURE,
152
4
                                            std::move(message)});
153

            
154
4
  handlers_.erase(found);
155
4
}
156

            
157
1
void AgentHTTPClient::onBeforeFinalizeUpstreamSpan(Tracing::Span&, const Http::ResponseHeaderMap*) {
158
1
}
159

            
160
} // namespace Datadog
161
} // namespace Tracers
162
} // namespace Extensions
163
} // namespace Envoy