Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pipeline to statsd client #49

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
language: java
jdk:
- oraclejdk7
- openjdk7
- openjdk6
- oraclejdk8
- oraclejdk9
- openjdk8
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,5 @@ public final void recordExecutionTime(String aspect, long timeInMs) {
public void recordExecutionTimeToNow(String aspect, long systemTimeMillisAtStart) {
time(aspect, Math.max(0, System.currentTimeMillis() - systemTimeMillisAtStart));
}

}
48 changes: 48 additions & 0 deletions src/main/java/com/timgroup/statsd/MessageUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.timgroup.statsd;

import java.text.NumberFormat;
import java.util.Locale;

/**
* Created by c6s on 18-9-1
*/
public class MessageUtil {
private static String messageFor(String prefix, String aspect, String value, String type) {
return messageFor(prefix, aspect, value, type, 1.0);
}

private static String messageFor(String prefix, String aspect, String value, String type, double sampleRate) {
final String message = prefix + aspect + ':' + value + '|' + type;
return (sampleRate == 1.0)
? message
: (message + "|@" + stringValueOf(sampleRate));
}

static String stringValueOf(double value) {
NumberFormat formatter = NumberFormat.getInstance(Locale.US);
formatter.setGroupingUsed(false);
formatter.setMaximumFractionDigits(19);
return formatter.format(value);
}

static String makeCountMessage(String prefix, String aspect, long delta, double sampleRate) {
return messageFor(prefix, aspect, Long.toString(delta), "c", sampleRate);
}

static String makeGaugeMessage(String prefix, String aspect, String value, boolean negative, boolean delta) {
StringBuilder message = new StringBuilder();
if (!delta && negative) {
message.append(messageFor(prefix, aspect, "0", "g")).append('\n');
}
message.append(messageFor(prefix, aspect, (delta && !negative) ? ("+" + value) : value, "g"));
return message.toString();
}

static String makeRecordSetEventMessage(String prefix, String aspect, String eventName) {
return messageFor(prefix, aspect, eventName, "s");
}

static String makeRecordExecutionTimeMessage(String prefix, String aspect, long timeInMs, double sampleRate) {
return messageFor(prefix, aspect, Long.toString(timeInMs), "ms", sampleRate);
}
}
26 changes: 26 additions & 0 deletions src/main/java/com/timgroup/statsd/NoOpPipeline.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.timgroup.statsd;

/**
* Created by c6s on 18-9-1
*/
public class NoOpPipeline extends PipelineAdapter {
Pipeline recordGaugeCommon(String aspect, String value, boolean negative, boolean delta) {
return this;
}

public Pipeline count(String aspect, long delta, double sampleRate) {
return this;
}

public Pipeline recordSetEvent(String aspect, String eventName) {
return this;
}

public Pipeline recordExecutionTime(String aspect, long timeInMs, double sampleRate) {
return this;
}

public void flush() {

}
}
1 change: 1 addition & 0 deletions src/main/java/com/timgroup/statsd/NoOpStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ public final class NoOpStatsDClient extends ConvenienceMethodProvidingStatsDClie
@Override public void recordGaugeDelta(String aspect, double delta) { }
@Override public void recordSetEvent(String aspect, String value) { }
@Override public void recordExecutionTime(String aspect, long timeInMs, double sampleRate) { }
@Override public Pipeline pipeline() {return new NoOpPipeline();}
}
91 changes: 91 additions & 0 deletions src/main/java/com/timgroup/statsd/NonBlockingPipeline.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.timgroup.statsd;

import java.util.ArrayList;
import java.util.List;

/**
* Created by c6s on 18-9-1
*/
public class NonBlockingPipeline extends PipelineAdapter {

private NonBlockingStatsDClient nonBlockingStatsDClient;
private String prefix;
private List<String> messageList = new ArrayList<String>();
private int MTU = 512;
private boolean closed = false;

public NonBlockingPipeline(NonBlockingStatsDClient nonBlockingStatsDClient) {
this.nonBlockingStatsDClient = nonBlockingStatsDClient;
prefix = nonBlockingStatsDClient.getPrefix();
}

public NonBlockingPipeline(NonBlockingStatsDClient nonBlockingStatsDClient, int MTU) {
this(nonBlockingStatsDClient);
this.MTU = MTU;
}

public Pipeline count(String aspect, long delta, double sampleRate) {
if (!closed) {
messageList.add(MessageUtil.makeCountMessage(prefix, aspect, delta, sampleRate));
}
return this;
}

Pipeline recordGaugeCommon(String aspect, String value, boolean negative, boolean delta) {
if (!closed) {
messageList.add(MessageUtil.makeGaugeMessage(prefix, aspect, value, negative, delta));
}
return this;
}

public Pipeline recordSetEvent(String aspect, String eventName) {
if (!closed) {
messageList.add(MessageUtil.makeRecordSetEventMessage(prefix, aspect, eventName));
}
return this;
}

public Pipeline recordExecutionTime(String aspect, long timeInMs, double sampleRate) {
if (!closed) {
messageList.add(MessageUtil.makeRecordExecutionTimeMessage(prefix, aspect, timeInMs, sampleRate));
}
return this;
}

public void flush() {
if (closed || messageList.isEmpty()) {
closed = true;
return;
}
List<String> packets = concatenate();
for (String p : packets) {
nonBlockingStatsDClient.send(p);
}
closed = true;
}

private List<String> concatenate() {
List<String> result = new ArrayList<String>();
StringBuilder builder = new StringBuilder(messageList.get(0));
int sz = builder.length();

for (int i = 1; i < messageList.size(); i++) {
String s = messageList.get(i);
if (sz + 1 + s.length() <= MTU) {
sz += 1 + s.length();
builder.append('\n').append(s);
} else {
result.add(builder.toString());
builder = new StringBuilder(s);
sz = builder.length();
}
}


if (builder.length() != 0) {
result.add(builder.toString());
}

return result;
}
}
68 changes: 27 additions & 41 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.timgroup.statsd;

import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.Locale;

import static com.timgroup.statsd.MessageUtil.*;

/**
* A simple StatsD client implementation facilitating metrics recording.
*
*
* <p>Upon instantiation, this client will establish a socket connection to a StatsD instance
* running on the specified host and port. Metrics are then sent over this connection as they are
* received by the client.
* </p>
*
*
* <p>Three key methods are provided for the submission of data-points for the application under
* scrutiny:
* <ul>
Expand All @@ -23,10 +23,10 @@
* IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed
* not to throw an exception which may disrupt application execution.
* </p>
*
*
* <p>As part of a clean system shutdown, the {@link #stop()} method should be invoked
* on any StatsD clients.</p>
*
*
* @author Tom Denley
*
*/
Expand All @@ -50,7 +50,7 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta
* be established. Once a client has been instantiated in this way, all
* exceptions thrown during subsequent usage are consumed, guaranteeing
* that failures in metrics will not affect normal code execution.
*
*
* @param prefix
* the prefix to apply to keys sent via this client (can be null or empty for no prefix)
* @param hostname
Expand All @@ -74,7 +74,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws
* exceptions thrown during subsequent usage are passed to the specified
* handler and then consumed, guaranteeing that failures in metrics will
* not affect normal code execution.
*
*
* @param prefix
* the prefix to apply to keys sent via this client (can be null or empty for no prefix)
* @param hostname
Expand All @@ -96,6 +96,10 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC
}
}

String getPrefix() {
return prefix;
}

/**
* Cleanly shut down this StatsD client. This method may throw an exception if
* the socket cannot be closed.
Expand All @@ -107,9 +111,9 @@ public void stop() {

/**
* Adjusts the specified counter by a given delta.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the counter to adjust
* @param delta
Expand All @@ -120,14 +124,14 @@ public void stop() {
*/
@Override
public void count(String aspect, long delta, double sampleRate) {
send(messageFor(aspect, Long.toString(delta), "c", sampleRate));
send(makeCountMessage(prefix, aspect, delta, sampleRate));
}

/**
* Records the latest fixed value for the specified named gauge.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the gauge
* @param value
Expand All @@ -154,64 +158,46 @@ public void recordGaugeDelta(String aspect, double value) {
}

private void recordGaugeCommon(String aspect, String value, boolean negative, boolean delta) {
final StringBuilder message = new StringBuilder();
if (!delta && negative) {
message.append(messageFor(aspect, "0", "g")).append('\n');
}
message.append(messageFor(aspect, (delta && !negative) ? ("+" + value) : value, "g"));
send(message.toString());
send(makeGaugeMessage(prefix, aspect, value, negative, delta));
}

/**
* StatsD supports counting unique occurrences of events between flushes, Call this method to records an occurrence
* of the specified named event.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the set
* @param eventName
* the value to be added to the set
*/
@Override
public void recordSetEvent(String aspect, String eventName) {
send(messageFor(aspect, eventName, "s"));
send(makeRecordSetEventMessage(prefix, aspect, eventName));
}

/**
* Records an execution time in milliseconds for the specified named operation.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the timed operation
* @param timeInMs
* the time in milliseconds
*/
@Override
public void recordExecutionTime(String aspect, long timeInMs, double sampleRate) {
send(messageFor(aspect, Long.toString(timeInMs), "ms", sampleRate));
send(makeRecordExecutionTimeMessage(prefix, aspect, timeInMs, sampleRate));
}

private String messageFor(String aspect, String value, String type) {
return messageFor(aspect, value, type, 1.0);
public Pipeline pipeline() {
return new NonBlockingPipeline(this);
}

private String messageFor(String aspect, String value, String type, double sampleRate) {
final String message = prefix + aspect + ':' + value + '|' + type;
return (sampleRate == 1.0)
? message
: (message + "|@" + stringValueOf(sampleRate));
}

private void send(final String message) {
void send(final String message) {
sender.send(message);
}

private String stringValueOf(double value) {
NumberFormat formatter = NumberFormat.getInstance(Locale.US);
formatter.setGroupingUsed(false);
formatter.setMaximumFractionDigits(19);
return formatter.format(value);
}
}
Loading