TraceRequestTracker.java
/*
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.instrument.cassandra;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.docs.AssertingSpan;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import static org.springframework.cloud.sleuth.instrument.cassandra.SleuthCassandraSpan.CASSANDRA_SPAN;
import static org.springframework.cloud.sleuth.instrument.cassandra.SleuthCassandraSpan.Events.NODE_ERROR;
import static org.springframework.cloud.sleuth.instrument.cassandra.SleuthCassandraSpan.Events.NODE_SUCCESS;
import static org.springframework.cloud.sleuth.instrument.cassandra.SleuthCassandraSpan.Tags.NODE_ERROR_TAG;
/**
* Trace implementation of the {@link RequestTracker}.
*
* @author Mark Paluch
* @author Marcin Grzejszczak
* @since 3.1.0
*/
enum TraceRequestTracker implements RequestTracker {
INSTANCE;
private static final Log log = LogFactory.getLog(TraceRequestTracker.class);
@Override
public void onSuccess(@NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile,
@NonNull Node node, @NonNull String requestLogPrefix) {
if (request instanceof CassandraSpanSupplier) {
Span span = ((CassandraSpanSupplier) request).getSpan();
if (log.isDebugEnabled()) {
log.debug("Closing span [" + span + "]");
}
span.end();
}
}
@Override
public void onError(@NonNull Request request, @NonNull Throwable error, long latencyNanos,
@NonNull DriverExecutionProfile executionProfile, @Nullable Node node, @NonNull String requestLogPrefix) {
if (request instanceof CassandraSpanSupplier) {
Span span = ((CassandraSpanSupplier) request).getSpan();
span.error(error);
if (log.isDebugEnabled()) {
log.debug("Closing span [" + span + "]");
}
span.end();
}
}
@Override
public void onNodeError(@NonNull Request request, @NonNull Throwable error, long latencyNanos,
@NonNull DriverExecutionProfile executionProfile, @NonNull Node node, @NonNull String requestLogPrefix) {
if (request instanceof CassandraSpanSupplier) {
AssertingSpan span = AssertingSpan.of(CASSANDRA_SPAN, ((CassandraSpanSupplier) request).getSpan());
span.event(NODE_ERROR);
span.tag(String.format(NODE_ERROR_TAG.getKey(), node.getEndPoint()), error.toString());
tryAddingRemoteIpAndPort(node, span);
if (log.isDebugEnabled()) {
log.debug("Marking node error for [" + span + "]");
}
}
}
@Override
public void onNodeSuccess(@NonNull Request request, long latencyNanos,
@NonNull DriverExecutionProfile executionProfile, @NonNull Node node, @NonNull String requestLogPrefix) {
if (request instanceof CassandraSpanSupplier) {
AssertingSpan span = AssertingSpan.of(CASSANDRA_SPAN, ((CassandraSpanSupplier) request).getSpan());
span.event(NODE_SUCCESS);
tryAddingRemoteIpAndPort(node, span);
if (log.isDebugEnabled()) {
log.debug("Marking node success for [" + span + "]");
}
}
}
@Override
public void onSessionReady(@NonNull Session session) {
}
@Override
public void close() throws Exception {
}
private void tryAddingRemoteIpAndPort(Node node, Span span) {
try {
SocketAddress socketAddress = node.getEndPoint().resolve();
String host;
int port;
if (socketAddress instanceof InetSocketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
host = inetSocketAddress.getHostString();
port = inetSocketAddress.getPort();
}
else {
host = socketAddress.toString();
port = 0;
}
span.remoteIpAndPort(host, port);
}
catch (Exception e) {
log.debug("Exception occurred while trying to set ip and port", e);
}
}
}