Writer.java
package org.honton.chas.datadog.apm.sender;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.NotSupportedException;
import javax.ws.rs.client.WebTarget;
import lombok.extern.slf4j.Slf4j;
import org.honton.chas.datadog.apm.TraceConfiguration;
import org.honton.chas.datadog.apm.api.ApmApi;
import org.honton.chas.datadog.apm.api.ApmApi0_2;
import org.honton.chas.datadog.apm.api.ApmApi0_3;
import org.honton.chas.datadog.apm.api.Span;
import org.honton.chas.datadog.apm.api.Trace;
import org.honton.chas.datadog.apm.jackson.MsgPackProvider;
import org.jboss.resteasy.client.jaxrs.ResteasyClient;
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget;
/**
 * Queues finished spans and sends them to the APM collector from a background daemon thread.
 *
 * <p>The writer is only active when a non-empty collector URL is configured. It starts out
 * using the 0.3 API and switches to the 0.2 API if the collector answers a report with
 * 404 or 415. After a failed send, every batch taken from the queue until the backoff period
 * has elapsed is discarded instead of being sent.
 */
@Slf4j
public class Writer {

  /** {@code null} signals that the worker is not running (shut down, or never started). */
  private volatile TraceQueue queue;
  private Thread worker;
  private ApmApi apmApi;
  /** Wall-clock time (epoch millis) after which sending is attempted again. */
  long backoffExpiration;
  private long backoffDuration;
  private String apmUri;

  /**
   * Receives the injected configuration and copies the values this writer needs.
   *
   * @param configuration source of the backoff duration and the collector URL
   */
  @Inject
  void setTraceConfiguration(TraceConfiguration configuration) {
    backoffDuration = configuration.getBackoffDuration();
    apmUri = configuration.getCollectorUrl();
  }

  /**
   * Starts the writer. With no collector URL configured the writer stays disabled:
   * no queue is created and no worker thread is started.
   */
  @PostConstruct
  void initialize() {
    if (apmUri == null || apmUri.isEmpty()) {
      log.info("trace writer disabled");
      return;
    }
    initializeWith0_3();
    queue = new TraceQueue();
    startWorker();
  }

  /**
   * Queue a span for sending. The span is silently dropped when the worker is not running.
   *
   * @param span The span to send to the APM collector
   */
  public void queue(Span span) {
    // Read the volatile field once; the worker may set it to null concurrently.
    TraceQueue q = queue;
    if (q != null) {
      q.supply(span);
    }
  }

  /**
   * Stop the worker thread by interrupting it. Does nothing if the worker was never started
   * (for example because the writer is disabled).
   */
  void stop() {
    Thread t = worker;
    if (t != null) {
      t.interrupt();
    }
  }

  /**
   * Has the worker been stopped (or never been started)?
   *
   * @return true if there is no active queue
   */
  boolean isStopped() {
    return queue == null;
  }

  /**
   * Single round of consuming spans from the queue.
   *
   * @return the consumed spans, or {@code null} if the consuming thread was interrupted,
   *         in which case the queue is also cleared to mark the writer as stopped
   */
  List<Span> deQueue() {
    try {
      return queue.consume();
    } catch (InterruptedException ie) {
      // The interrupt is the shutdown request issued by stop(); it is consumed here and
      // reported to the caller through the null return value.
      queue = null;
      return null;
    }
  }

  /**
   * Sends the spans unless a backoff period is in effect, in which case they are discarded.
   * Any runtime failure while sending is logged and starts a new backoff period.
   */
  private void trySend(List<Span> spans) {
    if (System.currentTimeMillis() <= backoffExpiration) {
      return;
    }
    try {
      Collection<Trace> traces = toTraces(spans);
      log.debug("traces: {}", traces);
      send(traces);
    } catch (RuntimeException re) {
      log.info("writer worker problem {} sending to {} ", re.getMessage(), apmUri);
      backoffExpiration = System.currentTimeMillis() + backoffDuration;
    }
  }

  /** Wraps every span in its own {@link Trace}, preserving order. */
  private static Collection<Trace> toTraces(List<Span> spans) {
    List<Trace> traces = new ArrayList<>(spans.size());
    for (Span span : spans) {
      traces.add(new Trace(span));
    }
    return traces;
  }

  /**
   * Reports the traces through the current API proxy.
   *
   * <p>On 404/415 while using the 0.3 API, switches to the 0.2 API and retries once. If the
   * 0.2 API is already in use, the exception is propagated so the caller can log it and back
   * off, rather than losing the traces without any trace of the failure. A 400 response is
   * logged together with the rejected payload and not propagated.
   */
  private void send(Collection<Trace> traces) {
    try {
      apmApi.reportTraces(traces);
    } catch (NotFoundException | NotSupportedException cee) {
      // 404, 415
      if (!(apmApi instanceof ApmApi0_3)) {
        // Nothing left to fall back to; let trySend() handle the failure.
        throw cee;
      }
      log.info("falling back to json");
      fallbackTo0_2();
      send(traces);
    } catch (BadRequestException bre) {
      log.error("{}: {}", bre.getMessage(), traces);
    }
  }

  /** Replaces the API proxy with one for the 0.2 API. */
  private void fallbackTo0_2() {
    apmApi = getProxy(ApmApi0_2.class, new JacksonShim());
  }

  /** Sets the API proxy to one for the 0.3 API. */
  private void initializeWith0_3() {
    apmApi = getProxy(ApmApi0_3.class, new MsgPackProvider());
  }

  /**
   * Creates and starts the daemon thread that repeatedly takes spans from the queue and
   * sends them, until {@link #deQueue()} reports shutdown by returning {@code null}.
   */
  private void startWorker() {
    Runnable loop = new Runnable() {
      @Override
      public void run() {
        for (;;) {
          List<Span> spans = deQueue();
          if (spans == null) {
            log.error("writer worker shutdown");
            return;
          }
          trySend(spans);
        }
      }
    };
    worker = new Thread(loop, "APM writer");
    worker.setDaemon(true);
    worker.start();
  }

  /**
   * Builds a RESTEasy client proxy of the given interface that targets the collector URL.
   *
   * @param proxyType the API interface to proxy
   * @param provider the provider instance to register with the client
   */
  private <T> T getProxy(Class<T> proxyType, Object provider) {
    ResteasyClientBuilder clientBuilder = new ResteasyClientBuilder();
    clientBuilder.register(provider);
    ResteasyClient client = clientBuilder.build();
    WebTarget target = client.target(apmUri);
    ResteasyWebTarget rtarget = (ResteasyWebTarget) target;
    return rtarget.proxy(proxyType);
  }
}