Skip to content

Commit

Permalink
Deprecates Sender for much simpler BytesMessageSender
Browse files Browse the repository at this point in the history
On yesterday's call with @shakuzen @anuraaga and @kojilin, we chatted
about how the async reporters never actually use the async code of the
senders. The async side was only used by zipkin-server, as the async
reporter design intentionally uses a blocking loop to flush the span
backlog. So, this makes things simpler by only requiring a blocking
implementation: `BytesMessageSender.send` (chosen to not create symbol
collisions and similar to `BytesMessageEncoder`.

The side effect is that new senders can use this interface and
completely avoid the complicated `Call` then `execute` chain, deferring
to whatever the library-specific blocking path is. I have implemented
this in armeria as a test, and it is absolutely easier.

As older spring libraries like sleuth may not upgrade, this maintains
the old types until reporter 4. However, new senders can simply
implement the new type and be done with it.

Signed-off-by: Adrian Cole <[email protected]>
  • Loading branch information
Adrian Cole committed Jan 13, 2024
1 parent 2dbac71 commit 26e3eb1
Show file tree
Hide file tree
Showing 47 changed files with 617 additions and 483 deletions.
20 changes: 13 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Reporter.CONSOLE.report(span);

## AsyncReporter
AsyncReporter is how you actually get spans to zipkin. By default, it waits up to a second
before flushes any pending spans out of process via a Sender.
before flushes any pending spans out of process via a BytesMessageSender.

```java
reporter = AsyncReporter.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans"));
Expand Down Expand Up @@ -67,7 +67,7 @@ Here are the most important properties to understand when tuning.
Property | Description
--- | ---
`queuedMaxBytes` | Maximum backlog of span bytes reported vs sent. Corresponds to `ReporterMetrics.updateQueuedBytes`. Default 1% of heap
`messageMaxBytes` | Maximum bytes sendable per message including overhead. Default `500,000` bytes (`500KB`). Defined by `Sender.messageMaxBytes`
`messageMaxBytes` | Maximum bytes sendable per message including overhead. Default `500,000` bytes (`500KB`). Defined by `BytesMessageSender.messageMaxBytes`
`messageTimeout` | Maximum time to wait for messageMaxBytes to accumulate before sending. Default 1 second
`closeTimeout` | Maximum time to block for in-flight spans to send on close. Default 1 second

Expand All @@ -84,11 +84,12 @@ by a large `messageTimeout` or `messageMaxBytes`. Consider lowering the
`messageMaxBytes` if this occurs, as it will result in less work per
message.

## Sender
## BytesMessageSender
The sender component handles the last step of sending a list of encoded spans onto a transport.
This involves I/O, so you can call `Sender.check()` to check its health on a given frequency.
This involves I/O, so you can call `sender.send(Collections.emptyList())` to check it works before
using.

Sender is used by AsyncReporter, but you can also create your own if you need to.
BytesMessageSender is used by AsyncReporter, but you can also create your own if you need to.
```java
class CustomReporter implements Flushable {

Expand All @@ -99,7 +100,12 @@ class CustomReporter implements Flushable {

// Is the connection healthy?
public boolean ok() {
return sender.check().ok();
try {
sender.send(Collections.emptyList());
return true;
} catch (Exception e) {
return false;
}
}

public void report(Span span) {
Expand All @@ -113,7 +119,7 @@ class CustomReporter implements Flushable {
pending.drainTo(drained);
if (drained.isEmpty()) return;

sender.sendSpans(drained, callback).execute();
sender.send(drained);
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import javax.jms.JMSException;
import javax.jms.QueueSender;
import org.apache.activemq.ActiveMQConnectionFactory;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
import zipkin2.reporter.CheckResult;
Expand All @@ -32,7 +34,7 @@
*
* <h3>Usage</h3>
* <p>
* This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async
* This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async
* reporter}.
*
* <p>Here's a simple configuration, configured for json:
Expand Down Expand Up @@ -150,13 +152,33 @@ public final ActiveMQSender build() {
return encoding.listSizeInBytes(encodedSpans);
}

@Override public Call<Void> sendSpans(List<byte[]> encodedSpans) {
/** {@inheritDoc} */
@Override @Deprecated public Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (closeCalled) throw new ClosedSenderException();
byte[] message = encoder.encode(encodedSpans);
return new ActiveMQCall(message);
}

@Override public CheckResult check() {
/** {@inheritDoc} */
@Override public void send(List<byte[]> encodedSpans) throws IOException {
if (closeCalled) throw new ClosedSenderException();
send(encoder.encode(encodedSpans));
}

void send(byte[] message) throws IOException {
try {
ActiveMQConn conn = lazyInit.get();
QueueSender sender = conn.sender;
BytesMessage bytesMessage = conn.session.createBytesMessage();
bytesMessage.writeBytes(message);
sender.send(bytesMessage);
} catch (JMSException e) {
throw ioException("Unable to send message: ", e);
}
}

/** {@inheritDoc} */
@Override @Deprecated public CheckResult check() {
try {
lazyInit.get();
} catch (Throwable t) {
Expand All @@ -171,7 +193,7 @@ public final ActiveMQSender build() {
lazyInit.close();
}

@Override public final String toString() {
@Override public String toString() {
return "ActiveMQSender{"
+ "brokerURL=" + lazyInit.connectionFactory.getBrokerURL()
+ ", queue=" + lazyInit.queue
Expand All @@ -186,29 +208,17 @@ final class ActiveMQCall extends Call.Base<Void> { // ActiveMQCall is not cancel
}

@Override protected Void doExecute() throws IOException {
send();
send(message);
return null;
}

void send() throws IOException {
try {
ActiveMQConn conn = lazyInit.get();
QueueSender sender = conn.sender;
BytesMessage bytesMessage = conn.session.createBytesMessage();
bytesMessage.writeBytes(message);
sender.send(bytesMessage);
} catch (JMSException e) {
throw ioException("Unable to send message: ", e);
}
}

@Override public Call<Void> clone() {
return new ActiveMQCall(message);
}

@Override protected void doEnqueue(Callback<Void> callback) {
try {
send();
send(message);
callback.onSuccess(null);
} catch (Throwable t) {
Call.propagateIfFatal(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package zipkin2.reporter.activemq;

import java.io.IOException;
import java.util.Collections;
import java.util.stream.Stream;
import javax.jms.BytesMessage;
import javax.jms.MessageConsumer;
Expand All @@ -25,12 +26,10 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.reporter.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Call;
import zipkin2.reporter.CheckResult;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.Sender;
import zipkin2.reporter.SpanBytesEncoder;

import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -43,55 +42,46 @@
class ITActiveMQSender {
@Container ActiveMQContainer activemq = new ActiveMQContainer();

@Test void checkPasses() {
@Test void emptyOk() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("checkPasses").build()) {
assertThat(sender.check().ok()).isTrue();
}
}

@Test void checkFalseWhenBrokerIsDown() {
// we can be pretty certain ActiveMQ isn't running on localhost port 80
try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) {
CheckResult check = sender.check();
assertThat(check.ok()).isFalse();
assertThat(check.error()).isInstanceOf(IOException.class);
sender.send(Collections.emptyList());
}
}

@Test void sendFailsWithInvalidActiveMqServer() {
// we can be pretty certain ActiveMQ isn't running on localhost port 80
try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) {
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf(
IOException.class)
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN))
.isInstanceOf(IOException.class)
.hasMessageContaining("Unable to establish connection to ActiveMQ broker");
}
}

@Test void sendsSpans() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans").build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN).execute();
@Test void send() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("send").build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN);

assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN, CLIENT_SPAN);
}
}

@Test void sendsSpans_PROTO3() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_PROTO3")
@Test void send_PROTO3() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("send_PROTO3")
.encoding(Encoding.PROTO3)
.build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN).execute();
send(sender, CLIENT_SPAN, CLIENT_SPAN);

assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN, CLIENT_SPAN);
}
}

@Test void sendsSpans_THRIFT() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_THRIFT")
@Test void send_THRIFT() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("send_THRIFT")
.encoding(Encoding.THRIFT)
.build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN).execute();
send(sender, CLIENT_SPAN, CLIENT_SPAN);

assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN, CLIENT_SPAN);
Expand All @@ -101,16 +91,16 @@ class ITActiveMQSender {
@Test void illegalToSendWhenClosed() {
try (ActiveMQSender sender = activemq.newSenderBuilder("illegalToSendWhenClosed").build()) {
sender.close();
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf(
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN)).isInstanceOf(
IllegalStateException.class);
}
}

/**
* The output of toString() on {@link Sender} implementations appears in thread names created by
* {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring
* tools, care should be taken to ensure the toString() output is a reasonable length and does not
* contain sensitive information.
* The output of toString() on {@link BytesMessageSender} implementations appears in thread names
* created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other
* monitoring tools, care should be taken to ensure the toString() output is a reasonable length
* and does not contain sensitive information.
*/
@Test void toStringContainsOnlySummaryInformation() {
try (ActiveMQSender sender = activemq.newSenderBuilder("toString").build()) {
Expand All @@ -119,7 +109,7 @@ class ITActiveMQSender {
}
}

Call<Void> send(ActiveMQSender sender, Span... spans) {
void send(ActiveMQSender sender, Span... spans) throws IOException {
SpanBytesEncoder bytesEncoder;
switch (sender.encoding()) {
case JSON:
Expand All @@ -134,7 +124,7 @@ Call<Void> send(ActiveMQSender sender, Span... spans) {
default:
throw new UnsupportedOperationException("encoding: " + sender.encoding());
}
return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList()));
sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList()));
}

byte[] readMessage(ActiveMQSender sender) throws Exception {
Expand Down
34 changes: 21 additions & 13 deletions amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
import zipkin2.reporter.CheckResult;
Expand All @@ -36,7 +38,7 @@
*
* <h3>Usage</h3>
* <p>
* This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async
* This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async
* reporter}.
*
* <p>Here's a simple configuration, configured for json:
Expand Down Expand Up @@ -69,7 +71,7 @@
* RabbitMQ failure.
*
* <p>This sender is thread-safe: a channel is created for each thread that calls
* {@link #sendSpans(List)}.
* {@link #send(List)}.
*/
public final class RabbitMQSender extends Sender {
/** Creates a sender that sends {@link Encoding#JSON} messages. */
Expand Down Expand Up @@ -213,15 +215,25 @@ public Builder toBuilder() {
return encoding.listSizeInBytes(encodedSizeInBytes);
}

/** This sends all of the spans as a single message. */
@Override public Call<Void> sendSpans(List<byte[]> encodedSpans) {
/** {@inheritDoc} */
@Override @Deprecated public Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (closeCalled) throw new ClosedSenderException();
byte[] message = encoder.encode(encodedSpans);
return new RabbitMQCall(message);
}

/** Ensures there are no connection issues. */
@Override public CheckResult check() {
/** {@inheritDoc} */
@Override public void send(List<byte[]> encodedSpans) throws IOException {
if (closeCalled) throw new ClosedSenderException();
publish(encoder.encode(encodedSpans));
}

void publish(byte[] message) throws IOException {
localChannel().basicPublish("", queue, null, message);
}

/** {@inheritDoc} */
@Override @Deprecated public CheckResult check() {
try {
if (localChannel().isOpen()) return CheckResult.OK;
throw new IllegalStateException("Not Open");
Expand Down Expand Up @@ -266,7 +278,7 @@ Connection newConnection() {
final ThreadLocal<Channel> CHANNEL = new ThreadLocal<Channel>();

/**
* In most circumstances there will only be one thread calling {@link #sendSpans(List)}, the
* In most circumstances there will only be one thread calling {@link #send(List)}, the
* {@link AsyncReporter}. Just in case someone is flushing manually, we use a thread-local. All of
* this is to avoid recreating a channel for each publish, as that costs two additional network
* roundtrips.
Expand All @@ -288,17 +300,13 @@ class RabbitMQCall extends Call.Base<Void> { // RabbitMQFuture is not cancelable
}

@Override protected Void doExecute() throws IOException {
publish();
publish(message);
return null;
}

void publish() throws IOException {
localChannel().basicPublish("", queue, null, message);
}

@Override protected void doEnqueue(Callback<Void> callback) {
try {
publish();
publish(message);
callback.onSuccess(null);
} catch (Throwable t) {
Call.propagateIfFatal(t);
Expand Down
Loading

0 comments on commit 26e3eb1

Please sign in to comment.