From f7987b5bd6bbb47aeb25aa42faef6ccbb951dd4e Mon Sep 17 00:00:00 2001
From: Nov11 <529079634@qq.com>
Date: Sat, 1 Sep 2018 20:13:45 +0800
Subject: [PATCH 1/4] add pipeline to statsd client
---
...onvenienceMethodProvidingStatsDClient.java | 6 +
.../java/com/timgroup/statsd/MessageUtil.java | 48 +++++
.../timgroup/statsd/NonBlockingPipeline.java | 91 +++++++++
.../statsd/NonBlockingStatsDClient.java | 68 +++----
.../java/com/timgroup/statsd/Pipeline.java | 179 ++++++++++++++++++
.../com/timgroup/statsd/PipelineAdapter.java | 99 ++++++++++
.../com/timgroup/statsd/StatsDClient.java | 5 +
7 files changed, 453 insertions(+), 43 deletions(-)
create mode 100644 src/main/java/com/timgroup/statsd/MessageUtil.java
create mode 100644 src/main/java/com/timgroup/statsd/NonBlockingPipeline.java
create mode 100644 src/main/java/com/timgroup/statsd/Pipeline.java
create mode 100644 src/main/java/com/timgroup/statsd/PipelineAdapter.java
diff --git a/src/main/java/com/timgroup/statsd/ConvenienceMethodProvidingStatsDClient.java b/src/main/java/com/timgroup/statsd/ConvenienceMethodProvidingStatsDClient.java
index 60f6aa6..895baed 100644
--- a/src/main/java/com/timgroup/statsd/ConvenienceMethodProvidingStatsDClient.java
+++ b/src/main/java/com/timgroup/statsd/ConvenienceMethodProvidingStatsDClient.java
@@ -84,4 +84,10 @@ public final void recordExecutionTime(String aspect, long timeInMs) {
public void recordExecutionTimeToNow(String aspect, long systemTimeMillisAtStart) {
time(aspect, Math.max(0, System.currentTimeMillis() - systemTimeMillisAtStart));
}
+
+ @Override
+ public Pipeline pipeline() {
+ return new Pipeline(this);
+ }
+
}
\ No newline at end of file
diff --git a/src/main/java/com/timgroup/statsd/MessageUtil.java b/src/main/java/com/timgroup/statsd/MessageUtil.java
new file mode 100644
index 0000000..58f51cb
--- /dev/null
+++ b/src/main/java/com/timgroup/statsd/MessageUtil.java
@@ -0,0 +1,48 @@
+package com.timgroup.statsd;
+
+import java.text.NumberFormat;
+import java.util.Locale;
+
+/**
+ * Created by c6s on 18-9-1
+ */
+public class MessageUtil {
+ private static String messageFor(String prefix, String aspect, String value, String type) {
+ return messageFor(prefix, aspect, value, type, 1.0);
+ }
+
+ private static String messageFor(String prefix, String aspect, String value, String type, double sampleRate) {
+ final String message = prefix + aspect + ':' + value + '|' + type;
+ return (sampleRate == 1.0)
+ ? message
+ : (message + "|@" + stringValueOf(sampleRate));
+ }
+
+ static String stringValueOf(double value) {
+ NumberFormat formatter = NumberFormat.getInstance(Locale.US);
+ formatter.setGroupingUsed(false);
+ formatter.setMaximumFractionDigits(19);
+ return formatter.format(value);
+ }
+
+ static String makeCountMessage(String prefix, String aspect, long delta, double sampleRate) {
+ return messageFor(prefix, aspect, Long.toString(delta), "c", sampleRate);
+ }
+
+ static String makeGaugeMessage(String prefix, String aspect, String value, boolean negative, boolean delta) {
+ StringBuilder message = new StringBuilder();
+ if (!delta && negative) {
+ message.append(messageFor(prefix, aspect, "0", "g")).append('\n');
+ }
+ message.append(messageFor(prefix, aspect, (delta && !negative) ? ("+" + value) : value, "g"));
+ return message.toString();
+ }
+
+ static String makeRecordSetEventMessage(String prefix, String aspect, String eventName) {
+ return messageFor(prefix, aspect, eventName, "s");
+ }
+
+ static String makeRecordExecutionTimeMessage(String prefix, String aspect, long timeInMs, double sampleRate) {
+ return messageFor(prefix, aspect, Long.toString(timeInMs), "ms", sampleRate)
+ }
+}
diff --git a/src/main/java/com/timgroup/statsd/NonBlockingPipeline.java b/src/main/java/com/timgroup/statsd/NonBlockingPipeline.java
new file mode 100644
index 0000000..bc36bef
--- /dev/null
+++ b/src/main/java/com/timgroup/statsd/NonBlockingPipeline.java
@@ -0,0 +1,91 @@
+package com.timgroup.statsd;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by c6s on 18-9-1
+ */
+public class NonBlockingPipeline extends PipelineAdapter {
+
+ private NonBlockingStatsDClient nonBlockingStatsDClient;
+ private String prefix;
+ private List Upon instantiation, this client will establish a socket connection to a StatsD instance
* running on the specified host and port. Metrics are then sent over this connection as they are
* received by the client.
* Three key methods are provided for the submission of data-points for the application under
* scrutiny:
*
@@ -23,10 +23,10 @@
* IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed
* not to throw an exception which may disrupt application execution.
*
As part of a clean system shutdown, the {@link #stop()} method should be invoked * on any StatsD clients.
- * + * * @author Tom Denley * */ @@ -50,7 +50,7 @@ public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingSta * be established. Once a client has been instantiated in this way, all * exceptions thrown during subsequent usage are consumed, guaranteeing * that failures in metrics will not affect normal code execution. - * + * * @param prefix * the prefix to apply to keys sent via this client (can be null or empty for no prefix) * @param hostname @@ -74,7 +74,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws * exceptions thrown during subsequent usage are passed to the specified * handler and then consumed, guaranteeing that failures in metrics will * not affect normal code execution. - * + * * @param prefix * the prefix to apply to keys sent via this client (can be null or empty for no prefix) * @param hostname @@ -96,6 +96,10 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDC } } + String getPrefix() { + return prefix; + } + /** * Cleanly shut down this StatsD client. This method may throw an exception if * the socket cannot be closed. @@ -107,9 +111,9 @@ public void stop() { /** * Adjusts the specified counter by a given delta. - * + * *This method is non-blocking and is guaranteed not to throw an exception.
- * + * * @param aspect * the name of the counter to adjust * @param delta @@ -120,14 +124,14 @@ public void stop() { */ @Override public void count(String aspect, long delta, double sampleRate) { - send(messageFor(aspect, Long.toString(delta), "c", sampleRate)); + send(makeCountMessage(prefix, aspect, delta, sampleRate)); } /** * Records the latest fixed value for the specified named gauge. - * + * *This method is non-blocking and is guaranteed not to throw an exception.
- * + * * @param aspect * the name of the gauge * @param value @@ -154,20 +158,15 @@ public void recordGaugeDelta(String aspect, double value) { } private void recordGaugeCommon(String aspect, String value, boolean negative, boolean delta) { - final StringBuilder message = new StringBuilder(); - if (!delta && negative) { - message.append(messageFor(aspect, "0", "g")).append('\n'); - } - message.append(messageFor(aspect, (delta && !negative) ? ("+" + value) : value, "g")); - send(message.toString()); + send(makeGaugeMessage(prefix, aspect, value, negative, delta)); } /** * StatsD supports counting unique occurrences of events between flushes, Call this method to records an occurrence * of the specified named event. - * + * *This method is non-blocking and is guaranteed not to throw an exception.
- * + * * @param aspect * the name of the set * @param eventName @@ -175,14 +174,14 @@ private void recordGaugeCommon(String aspect, String value, boolean negative, bo */ @Override public void recordSetEvent(String aspect, String eventName) { - send(messageFor(aspect, eventName, "s")); + send(makeRecordSetEventMessage(prefix, aspect, eventName)); } /** * Records an execution time in milliseconds for the specified named operation. - * + * *This method is non-blocking and is guaranteed not to throw an exception.
- * + * * @param aspect * the name of the timed operation * @param timeInMs @@ -190,28 +189,11 @@ public void recordSetEvent(String aspect, String eventName) { */ @Override public void recordExecutionTime(String aspect, long timeInMs, double sampleRate) { - send(messageFor(aspect, Long.toString(timeInMs), "ms", sampleRate)); - } - - private String messageFor(String aspect, String value, String type) { - return messageFor(aspect, value, type, 1.0); + send(makeRecordExecutionTimeMessage(prefix, aspect, timeInMs, sampleRate)); } - private String messageFor(String aspect, String value, String type, double sampleRate) { - final String message = prefix + aspect + ':' + value + '|' + type; - return (sampleRate == 1.0) - ? message - : (message + "|@" + stringValueOf(sampleRate)); - } - - private void send(final String message) { + void send(final String message) { sender.send(message); } - private String stringValueOf(double value) { - NumberFormat formatter = NumberFormat.getInstance(Locale.US); - formatter.setGroupingUsed(false); - formatter.setMaximumFractionDigits(19); - return formatter.format(value); - } } diff --git a/src/main/java/com/timgroup/statsd/Pipeline.java b/src/main/java/com/timgroup/statsd/Pipeline.java new file mode 100644 index 0000000..9ece4ae --- /dev/null +++ b/src/main/java/com/timgroup/statsd/Pipeline.java @@ -0,0 +1,179 @@ +package com.timgroup.statsd; + +/** + * Pipeline supports same functionality with StatsDClient but with different return type of Pipeline but void. + * + * Created by c6s on 18-9-1 + */ +public interface Pipeline { + /** + * Adjusts the specified counter by a given delta. + * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect + * the name of the counter to adjust + * @param delta + * the amount to adjust the counter by + */ + Pipeline count(String aspect, long delta); + + /** + * Adjusts the specified counter by a given delta. + * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect + * the name of the counter to adjust + * @param delta + * the amount to adjust the counter by + * @param sampleRate + * the sampling rate being employed. For example, a rate of 0.1 would tell StatsD that this counter is being sent + * sampled every 1/10th of the time. + */ + Pipeline count(String aspect, long delta, double sampleRate); + + /** + * Increments the specified counter by one. + * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect + * the name of the counter to increment + */ + Pipeline incrementCounter(String aspect); + + /** + * Convenience method equivalent to {@link #incrementCounter(String)}. + */ + Pipeline increment(String aspect); + + /** + * Decrements the specified counter by one. + * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect + * the name of the counter to decrement + */ + Pipeline decrementCounter(String aspect); + + /** + * Convenience method equivalent to {@link #decrementCounter(String)}. + */ + Pipeline decrement(String aspect); + + /** + * Records the latest fixed value for the specified named gauge. + * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect + * the name of the gauge + * @param value + * the new reading of the gauge + */ + Pipeline recordGaugeValue(String aspect, long value); + + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, long)} but for double values. + */ + Pipeline recordGaugeValue(String aspect, double value); + + /** + * Records a change in the value of the specified named gauge. + * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect + * the name of the gauge + * @param delta + * the +/- delta to apply to the gauge + */ + Pipeline recordGaugeDelta(String aspect, long delta); + + /** + * Convenience method equivalent to {@link #recordGaugeDelta(String, long)} but for double deltas. + */ + Pipeline recordGaugeDelta(String aspect, double delta); + + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, long)}. + */ + Pipeline gauge(String aspect, long value); + + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, double)}. + */ + Pipeline gauge(String aspect, double value); + + /** + * StatsD supports counting unique occurrences of events between flushes, Call this method to records an occurrence + * of the specified named event. + * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect + * the name of the set + * @param eventName + * the value to be added to the set + */ + Pipeline recordSetEvent(String aspect, String eventName); + + /** + * Convenience method equivalent to {@link #recordSetEvent(String, String)}. + */ + Pipeline set(String aspect, String eventName); + + /** + * Records an execution time in milliseconds for the specified named operation. + * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect + * the name of the timed operation + * @param timeInMs + * the time in milliseconds + */ + Pipeline recordExecutionTime(String aspect, long timeInMs); + + /** + * Adjusts the specified counter by a given delta. + * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect + * the name of the counter to adjust + * @param delta + * the amount to adjust the counter by + * @param sampleRate + * the sampling rate being employed. For example, a rate of 0.1 would tell StatsD that this timer is being sent + * sampled every 1/10th of the time, so that it updates its timer_counters appropriately. + */ + Pipeline recordExecutionTime(String aspect, long timeInMs, double sampleRate); + + /** + * Records an execution time in milliseconds for the specified named operation. The execution + * time is calculated as the delta between the specified start time and the current system + * time (using {@link System#currentTimeMillis()}) + * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect + * the name of the timed operation + * @param timeInMs + * the system time, in millis, at the start of the operation that has just completed + */ + Pipeline recordExecutionTimeToNow(String aspect, long systemTimeMillisAtStart); + + /** + * Convenience method equivalent to {@link #recordExecutionTime(String, long)}. + */ + Pipeline time(String aspect, long value); + + /** + * Flush buffered metrics. + * A pipeline will not preform any more actions if flushed. + */ + void flush(); +} diff --git a/src/main/java/com/timgroup/statsd/PipelineAdapter.java b/src/main/java/com/timgroup/statsd/PipelineAdapter.java new file mode 100644 index 0000000..cd7e28c --- /dev/null +++ b/src/main/java/com/timgroup/statsd/PipelineAdapter.java @@ -0,0 +1,99 @@ +package com.timgroup.statsd; + +import static com.timgroup.statsd.MessageUtil.stringValueOf; + +/** + * Same as ConvenienceMethodProvidingStatsDClient + * Created by c6s on 18-9-1 + */ +public abstract class PipelineAdapter implements Pipeline { + + public final Pipeline count(String aspect, long delta) { + return count(aspect, delta, 1.0); + } + + /** + * Convenience method equivalent to {@link #count(String, long)} with a value of 1. + */ + public final Pipeline incrementCounter(String aspect) { + return count(aspect, 1); + } + + /** + * Convenience method equivalent to {@link #incrementCounter(String)}. + */ + public final Pipeline increment(String aspect) { + return incrementCounter(aspect); + } + + /** + * Convenience method equivalent to {@link #count(String, long)} with a value of -1. + */ + public final Pipeline decrementCounter(String aspect) { + return count(aspect, -1); + } + + /** + * Convenience method equivalent to {@link #decrementCounter(String)}. + */ + public final Pipeline decrement(String aspect) { + return decrementCounter(aspect); + } + + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, long)}. + */ + public final Pipeline gauge(String aspect, long value) { + return recordGaugeValue(aspect, value); + } + + /** + * Convenience method equivalent to {@link #recordGaugeValue(String, double)}. + */ + + public final Pipeline gauge(String aspect, double value) { + return recordGaugeValue(aspect, value); + } + + /** + * Convenience method equivalent to {@link #recordSetEvent(String, String)}. + */ + public final Pipeline set(String aspect, String eventName) { + return recordSetEvent(aspect, eventName); + } + + /** + * Convenience method equivalent to {@link #recordExecutionTime(String, long)}. + */ + public final Pipeline time(String aspect, long timeInMs) { + return recordExecutionTime(aspect, timeInMs); + } + + + public final Pipeline recordExecutionTime(String aspect, long timeInMs) { + return recordExecutionTime(aspect, timeInMs, 1.0); + } + + + public final Pipeline recordExecutionTimeToNow(String aspect, long systemTimeMillisAtStart) { + return time(aspect, Math.max(0, System.currentTimeMillis() - systemTimeMillisAtStart)); + } + + public final Pipeline recordGaugeValue(String aspect, long value) { + return recordGaugeCommon(aspect, Long.toString(value), value < 0, false); + } + + public final Pipeline recordGaugeValue(String aspect, double value) { + return recordGaugeCommon(aspect, stringValueOf(value), value < 0, false); + } + + public final Pipeline recordGaugeDelta(String aspect, long value) { + return recordGaugeCommon(aspect, Long.toString(value), value < 0, true); + } + + public final Pipeline recordGaugeDelta(String aspect, double value) { + return recordGaugeCommon(aspect, stringValueOf(value), value < 0, true); + } + + abstract Pipeline recordGaugeCommon(String aspect, String value, boolean negative, boolean delta); +} diff --git a/src/main/java/com/timgroup/statsd/StatsDClient.java b/src/main/java/com/timgroup/statsd/StatsDClient.java index c8d4949..72f6efc 100644 --- a/src/main/java/com/timgroup/statsd/StatsDClient.java +++ b/src/main/java/com/timgroup/statsd/StatsDClient.java @@ -189,4 +189,9 @@ public interface StatsDClient { */ void time(String aspect, long value); + /** + * open a pipeline which is capable of sending several metrics in single packet + */ + Pipeline pipeline(); + } \ No newline at end of file From 16fbef4e6ec75ffc0e8b5f4fd17e7ded80e4529c Mon Sep 17 00:00:00 2001 From: Nov11 <529079634@qq.com> Date: Sat, 1 Sep 2018 20:22:40 +0800 Subject: [PATCH 2/4] add NoOpPipeline --- .../com/timgroup/statsd/NoOpPipeline.java | 26 +++++++++++++++++++ .../com/timgroup/statsd/NoOpStatsDClient.java | 1 + 2 files changed, 27 insertions(+) create mode 100644 src/main/java/com/timgroup/statsd/NoOpPipeline.java diff --git a/src/main/java/com/timgroup/statsd/NoOpPipeline.java b/src/main/java/com/timgroup/statsd/NoOpPipeline.java new file mode 100644 index 0000000..063342f --- /dev/null +++ b/src/main/java/com/timgroup/statsd/NoOpPipeline.java @@ -0,0 +1,26 @@ +package com.timgroup.statsd; + +/** + * Created by c6s on 18-9-1 + */ +public class NoOpPipeline extends PipelineAdapter { + Pipeline recordGaugeCommon(String aspect, String value, boolean negative, boolean delta) { + return this; + } + + public Pipeline count(String aspect, long delta, double sampleRate) { + return this; + } + + public Pipeline recordSetEvent(String aspect, String eventName) { + return this; + } + + public Pipeline recordExecutionTime(String aspect, long timeInMs, double sampleRate) { + return this; + } + + public void flush() { + + } +} diff --git a/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java b/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java index cc5f3ea..1443f88 100644 --- a/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NoOpStatsDClient.java @@ -16,4 +16,5 @@ public final class NoOpStatsDClient extends ConvenienceMethodProvidingStatsDClie @Override public void recordGaugeDelta(String aspect, double delta) { } @Override public void recordSetEvent(String aspect, String value) { } @Override public void recordExecutionTime(String aspect, long timeInMs, double sampleRate) { } + @Override public Pipeline pipeline() {return new NoOpPipeline();} } From f403274c1260b7fdab5ea30e7d1234e864a09ba0 Mon Sep 17 00:00:00 2001 From: Nov11 <529079634@qq.com> Date: Sat, 1 Sep 2018 20:22:55 +0800 Subject: [PATCH 3/4] fix compile error --- .../statsd/ConvenienceMethodProvidingStatsDClient.java | 5 ----- src/main/java/com/timgroup/statsd/MessageUtil.java | 2 +- .../java/com/timgroup/statsd/NonBlockingStatsDClient.java | 4 ++++ 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/ConvenienceMethodProvidingStatsDClient.java b/src/main/java/com/timgroup/statsd/ConvenienceMethodProvidingStatsDClient.java index 895baed..e514917 100644 --- a/src/main/java/com/timgroup/statsd/ConvenienceMethodProvidingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/ConvenienceMethodProvidingStatsDClient.java @@ -85,9 +85,4 @@ public void recordExecutionTimeToNow(String aspect, long systemTimeMillisAtStart time(aspect, Math.max(0, System.currentTimeMillis() - systemTimeMillisAtStart)); } - @Override - public Pipeline pipeline() { - return new Pipeline(this); - } - } \ No newline at end of file diff --git a/src/main/java/com/timgroup/statsd/MessageUtil.java b/src/main/java/com/timgroup/statsd/MessageUtil.java index 58f51cb..6e2bdb9 100644 --- a/src/main/java/com/timgroup/statsd/MessageUtil.java +++ b/src/main/java/com/timgroup/statsd/MessageUtil.java @@ -43,6 +43,6 @@ static String makeRecordSetEventMessage(String prefix, String aspect, String eve } static String makeRecordExecutionTimeMessage(String prefix, String aspect, long timeInMs, double sampleRate) { - return messageFor(prefix, aspect, Long.toString(timeInMs), "ms", sampleRate) + return messageFor(prefix, aspect, Long.toString(timeInMs), "ms", sampleRate); } } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index c4f69bf..54348b5 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -192,6 +192,10 @@ public void recordExecutionTime(String aspect, long timeInMs, double sampleRate) send(makeRecordExecutionTimeMessage(prefix, aspect, timeInMs, sampleRate)); } + public Pipeline pipeline() { + return new NonBlockingPipeline(this); + } + void send(final String message) { sender.send(message); } From 86ae4110519107d982b0cb3ad536606c0bfae21a Mon Sep 17 00:00:00 2001 From: Nov11 <529079634@qq.com> Date: Sat, 1 Sep 2018 20:33:55 +0800 Subject: [PATCH 4/4] since this is 2018, why some one still use jdk 6 or jdk 7? --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index d99f43b..d1a5335 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,5 @@ language: java jdk: - - oraclejdk7 - - openjdk7 - - openjdk6 + - oraclejdk8 + - oraclejdk9 + - openjdk8 \ No newline at end of file