extract a class for the UDP sending, making it easier to play with batching implementations for issue #15

This commit is contained in:
scarytom 2014-08-26 18:02:42 +01:00
parent 26765c9c13
commit be0b4fd1f0
2 changed files with 82 additions and 59 deletions

View File

@ -1,15 +1,8 @@
package com.timgroup.statsd;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* A simple StatsD client implementation facilitating metrics recording.
@ -46,18 +39,7 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta
};
private final String prefix;
private final DatagramChannel clientSocket;
private final StatsDClientErrorHandler handler;
private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
result.setName("StatsD-" + result.getName());
result.setDaemon(true);
return result;
}
});
private final NonBlockingUdpSender sender;
/**
* Create a new StatsD client communicating with a StatsD instance on the
@ -106,11 +88,9 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta
*/
public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDClientErrorHandler errorHandler) throws StatsDClientException {
this.prefix = (prefix == null || prefix.trim().isEmpty()) ? "" : (prefix.trim() + ".");
this.handler = errorHandler;
try {
this.clientSocket = DatagramChannel.open();
this.clientSocket.connect(new InetSocketAddress(hostname, port));
this.sender = new NonBlockingUdpSender(hostname, port, STATS_D_ENCODING, errorHandler);
} catch (Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
@ -122,23 +102,7 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta
*/
@Override
public void stop() {
try {
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
catch (Exception e) {
handler.handle(e);
}
finally {
if (clientSocket != null) {
try {
clientSocket.close();
}
catch (Exception e) {
handler.handle(e);
}
}
}
sender.stop();
}
/**
@ -239,25 +203,7 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta
}
private void send(final String message) {
try {
executor.execute(new Runnable() {
@Override public void run() {
blockingSend(message);
}
});
}
catch (Exception e) {
handler.handle(e);
}
}
private void blockingSend(String message) {
try {
final byte[] sendData = message.getBytes(STATS_D_ENCODING);
clientSocket.write(ByteBuffer.wrap(sendData));
} catch (Exception e) {
handler.handle(e);
}
sender.send(message);
}
private String stringValueOf(double value) {

View File

@ -0,0 +1,77 @@
package com.timgroup.statsd;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public final class NonBlockingUdpSender {
private final Charset encoding;
private final DatagramChannel clientSocket;
private final ExecutorService executor;
private StatsDClientErrorHandler handler;
public NonBlockingUdpSender(String hostname, int port, Charset encoding, StatsDClientErrorHandler handler) throws IOException {
this.encoding = encoding;
this.handler = handler;
this.clientSocket = DatagramChannel.open();
this.clientSocket.connect(new InetSocketAddress(hostname, port));
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
result.setName("StatsD-" + result.getName());
result.setDaemon(true);
return result;
}
});
}
public void stop() {
try {
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
catch (Exception e) {
handler.handle(e);
}
finally {
if (clientSocket != null) {
try {
clientSocket.close();
}
catch (Exception e) {
handler.handle(e);
}
}
}
}
public void send(final String message) {
try {
executor.execute(new Runnable() {
@Override public void run() {
blockingSend(message);
}
});
}
catch (Exception e) {
handler.handle(e);
}
}
private void blockingSend(String message) {
try {
final byte[] sendData = message.getBytes(encoding);
clientSocket.write(ByteBuffer.wrap(sendData));
} catch (Exception e) {
handler.handle(e);
}
}
}