Skip to content

Commit

Permalink
Deprecates Sender for much simpler BytesMessageSender (#244)
Browse files Browse the repository at this point in the history
async reporters never actually use the async code of the senders. The async side was only used by zipkin-server, as the async reporter design intentionally uses a blocking loop to flush the span backlog. So, this makes things simpler by only requiring a blocking implementation: `BytesMessageSender.send` (chosen to not create symbol collisions and similar to `BytesMessageEncoder`.

The side effect is that new senders can use this interface and completely avoid the complicated `Call` then `execute` chain, deferring to whatever the library-specific blocking path is.

As older spring libraries like sleuth may not upgrade, this maintains the old types until reporter 4. However, new senders can simply implement the new type and be done with it.

Signed-off-by: Adrian Cole <[email protected]>
  • Loading branch information
codefromthecrypt authored Jan 14, 2024
1 parent 2dbac71 commit 22b0e37
Show file tree
Hide file tree
Showing 63 changed files with 931 additions and 503 deletions.
55 changes: 55 additions & 0 deletions RATIONALE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# zipkin-reporter rationale

## Sending an empty list is permitted

Historically, we had a `Sender.check()` function for fail fast reasons, but it was rarely used and
rarely implemented correctly. In some cases, people returned `OK` having no knowledge of if the
health was good or not. In one case, Stackdriver, a seemingly good implementation was avoided for
directly sending an empty list of spans, until `check()` was changed to do the same. Rather than
define a poorly implementable `Sender.check()` which would likely still require sending an empty
list, we decided to document a call to send no spans should pass through.

Two known examples of using `check()` were in server modules that forward spans with zipkin reporter
and finagle. `zipkin-finagle` is no longer maintained, so we'll focus on the server modules.

zipkin-stackdriver (now zipkin-gcp) was both important to verify and difficult to implement a
meaningful `check()`. First attempts looked good, but would pass even when users had no permission
to write spans. For this reason, people ignored the check and did out-of-band sending zero spans to
the POST endpoint. Later, this logic was made the primary impl of `check()`.

In HTTP senders a `check()` would be invalid for non-intuitive reasons unless you also just posted
no spans. For example, while zipkin has a `/health` endpoint, most clones do not implement that or
put it at a different path. So, you can't check with `/health` and are left with either falsely
returning `OK` or sending an empty list of spans.

Note that zipkin server does obviate calls to storage when incoming lists are empty. This is not
just for things like this, but 3rd party instrumentation which bugged out and sent no spans.

Messaging senders came close to implementing health except would suffer similar problems as
Stackdriver did. For example, verifying broker connectivity doesn't mean the queue or topic works.
While you can dig around and solve this for some brokers, it ends up the same situation.

Another way could be to catch an exception from a prior "POST", and if that failed, return a
corresponding status. This could not be for fail-fast because the caller wouldn't have any spans to
send, yet. It is complicated code for a function uncommon in instrumentation and the impl would be
hard to reason with concurrently.

The main problem is that we used the same `Component` type in reporter as we did for zipkin server,
which defined `check()` in a hardly used and hardly implementable way except sending no spans.

We had the following choices:

* force implementation of `check()` knowing its problems and that it is usual in instrumentation
* document that implementors can skip `send(empty)` even though call sites use this today
* document that you should not skip `send(empty)`, so that the few callers can use it for fail-fast

The main driving points were how niche this function is (not called by many, or on interval),
and how much work it is to implement a `check()` vs allowing an empty send to proceed. In the
current code base, the only work required for the latter was documentation, as all senders would
pass an empty list. Secondary driving force was that the `BytesMessageSender` main goal is easier
implementation and re-introducing a bad `check()` api gets in the way of this.

Due to the complexity of this problem, we decided that rather to leave empty undefined, document
sending empty is ok. This allows a couple users to implement a fail-fast in a portable way, without
burdening implementers of `BytesMessageSender` with an unimplementable or wrong `check()` function
for most platforms.
20 changes: 13 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Reporter.CONSOLE.report(span);

## AsyncReporter
AsyncReporter is how you actually get spans to zipkin. By default, it waits up to a second
before flushes any pending spans out of process via a Sender.
before flushes any pending spans out of process via a BytesMessageSender.

```java
reporter = AsyncReporter.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans"));
Expand Down Expand Up @@ -67,7 +67,7 @@ Here are the most important properties to understand when tuning.
Property | Description
--- | ---
`queuedMaxBytes` | Maximum backlog of span bytes reported vs sent. Corresponds to `ReporterMetrics.updateQueuedBytes`. Default 1% of heap
`messageMaxBytes` | Maximum bytes sendable per message including overhead. Default `500,000` bytes (`500KB`). Defined by `Sender.messageMaxBytes`
`messageMaxBytes` | Maximum bytes sendable per message including overhead. Default `500,000` bytes (`500KB`). Defined by `BytesMessageSender.messageMaxBytes`
`messageTimeout` | Maximum time to wait for messageMaxBytes to accumulate before sending. Default 1 second
`closeTimeout` | Maximum time to block for in-flight spans to send on close. Default 1 second

Expand All @@ -84,11 +84,12 @@ by a large `messageTimeout` or `messageMaxBytes`. Consider lowering the
`messageMaxBytes` if this occurs, as it will result in less work per
message.

## Sender
## BytesMessageSender
The sender component handles the last step of sending a list of encoded spans onto a transport.
This involves I/O, so you can call `Sender.check()` to check its health on a given frequency.
This involves I/O, so you can call `sender.send(Collections.emptyList())` to check it works before
using.

Sender is used by AsyncReporter, but you can also create your own if you need to.
BytesMessageSender is used by AsyncReporter, but you can also create your own if you need to.
```java
class CustomReporter implements Flushable {

Expand All @@ -99,7 +100,12 @@ class CustomReporter implements Flushable {

// Is the connection healthy?
public boolean ok() {
return sender.check().ok();
try {
sender.send(Collections.emptyList());
return true;
} catch (Exception e) {
return false;
}
}

public void report(Span span) {
Expand All @@ -113,7 +119,7 @@ class CustomReporter implements Flushable {
pending.drainTo(drained);
if (drained.isEmpty()) return;

sender.sendSpans(drained, callback).execute();
sender.send(drained);
}
```

Expand Down
2 changes: 1 addition & 1 deletion activemq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-reporter-parent</artifactId>
<groupId>io.zipkin.reporter2</groupId>
<version>3.1.2-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import javax.jms.JMSException;
import javax.jms.QueueSender;
import org.apache.activemq.ActiveMQConnectionFactory;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
import zipkin2.reporter.CheckResult;
Expand All @@ -32,7 +34,7 @@
*
* <h3>Usage</h3>
* <p>
* This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async
* This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async
* reporter}.
*
* <p>Here's a simple configuration, configured for json:
Expand Down Expand Up @@ -150,13 +152,33 @@ public final ActiveMQSender build() {
return encoding.listSizeInBytes(encodedSpans);
}

@Override public Call<Void> sendSpans(List<byte[]> encodedSpans) {
/** {@inheritDoc} */
@Override @Deprecated public Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (closeCalled) throw new ClosedSenderException();
byte[] message = encoder.encode(encodedSpans);
return new ActiveMQCall(message);
}

@Override public CheckResult check() {
/** {@inheritDoc} */
@Override public void send(List<byte[]> encodedSpans) throws IOException {
if (closeCalled) throw new ClosedSenderException();
send(encoder.encode(encodedSpans));
}

void send(byte[] message) throws IOException {
try {
ActiveMQConn conn = lazyInit.get();
QueueSender sender = conn.sender;
BytesMessage bytesMessage = conn.session.createBytesMessage();
bytesMessage.writeBytes(message);
sender.send(bytesMessage);
} catch (JMSException e) {
throw ioException("Unable to send message: ", e);
}
}

/** {@inheritDoc} */
@Override @Deprecated public CheckResult check() {
try {
lazyInit.get();
} catch (Throwable t) {
Expand All @@ -171,7 +193,7 @@ public final ActiveMQSender build() {
lazyInit.close();
}

@Override public final String toString() {
@Override public String toString() {
return "ActiveMQSender{"
+ "brokerURL=" + lazyInit.connectionFactory.getBrokerURL()
+ ", queue=" + lazyInit.queue
Expand All @@ -186,29 +208,17 @@ final class ActiveMQCall extends Call.Base<Void> { // ActiveMQCall is not cancel
}

@Override protected Void doExecute() throws IOException {
send();
send(message);
return null;
}

void send() throws IOException {
try {
ActiveMQConn conn = lazyInit.get();
QueueSender sender = conn.sender;
BytesMessage bytesMessage = conn.session.createBytesMessage();
bytesMessage.writeBytes(message);
sender.send(bytesMessage);
} catch (JMSException e) {
throw ioException("Unable to send message: ", e);
}
}

@Override public Call<Void> clone() {
return new ActiveMQCall(message);
}

@Override protected void doEnqueue(Callback<Void> callback) {
try {
send();
send(message);
callback.onSuccess(null);
} catch (Throwable t) {
Call.propagateIfFatal(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package zipkin2.reporter.activemq;

import java.io.IOException;
import java.util.Collections;
import java.util.stream.Stream;
import javax.jms.BytesMessage;
import javax.jms.MessageConsumer;
Expand All @@ -25,12 +26,10 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.reporter.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Call;
import zipkin2.reporter.CheckResult;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.Sender;
import zipkin2.reporter.SpanBytesEncoder;

import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -43,55 +42,46 @@
class ITActiveMQSender {
@Container ActiveMQContainer activemq = new ActiveMQContainer();

@Test void checkPasses() {
@Test void emptyOk() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("checkPasses").build()) {
assertThat(sender.check().ok()).isTrue();
}
}

@Test void checkFalseWhenBrokerIsDown() {
// we can be pretty certain ActiveMQ isn't running on localhost port 80
try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) {
CheckResult check = sender.check();
assertThat(check.ok()).isFalse();
assertThat(check.error()).isInstanceOf(IOException.class);
sender.send(Collections.emptyList());
}
}

@Test void sendFailsWithInvalidActiveMqServer() {
// we can be pretty certain ActiveMQ isn't running on localhost port 80
try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) {
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf(
IOException.class)
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN))
.isInstanceOf(IOException.class)
.hasMessageContaining("Unable to establish connection to ActiveMQ broker");
}
}

@Test void sendsSpans() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans").build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN).execute();
@Test void send() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("send").build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN);

assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN, CLIENT_SPAN);
}
}

@Test void sendsSpans_PROTO3() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_PROTO3")
@Test void send_PROTO3() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("send_PROTO3")
.encoding(Encoding.PROTO3)
.build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN).execute();
send(sender, CLIENT_SPAN, CLIENT_SPAN);

assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN, CLIENT_SPAN);
}
}

@Test void sendsSpans_THRIFT() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_THRIFT")
@Test void send_THRIFT() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("send_THRIFT")
.encoding(Encoding.THRIFT)
.build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN).execute();
send(sender, CLIENT_SPAN, CLIENT_SPAN);

assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN, CLIENT_SPAN);
Expand All @@ -101,16 +91,16 @@ class ITActiveMQSender {
@Test void illegalToSendWhenClosed() {
try (ActiveMQSender sender = activemq.newSenderBuilder("illegalToSendWhenClosed").build()) {
sender.close();
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf(
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN)).isInstanceOf(
IllegalStateException.class);
}
}

/**
* The output of toString() on {@link Sender} implementations appears in thread names created by
* {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring
* tools, care should be taken to ensure the toString() output is a reasonable length and does not
* contain sensitive information.
* The output of toString() on {@link BytesMessageSender} implementations appears in thread names
* created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other
* monitoring tools, care should be taken to ensure the toString() output is a reasonable length
* and does not contain sensitive information.
*/
@Test void toStringContainsOnlySummaryInformation() {
try (ActiveMQSender sender = activemq.newSenderBuilder("toString").build()) {
Expand All @@ -119,7 +109,7 @@ class ITActiveMQSender {
}
}

Call<Void> send(ActiveMQSender sender, Span... spans) {
void send(ActiveMQSender sender, Span... spans) throws IOException {
SpanBytesEncoder bytesEncoder;
switch (sender.encoding()) {
case JSON:
Expand All @@ -134,7 +124,7 @@ Call<Void> send(ActiveMQSender sender, Span... spans) {
default:
throw new UnsupportedOperationException("encoding: " + sender.encoding());
}
return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList()));
sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList()));
}

byte[] readMessage(ActiveMQSender sender) throws Exception {
Expand Down
2 changes: 1 addition & 1 deletion amqp-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-parent</artifactId>
<version>3.1.2-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-sender-amqp-client</artifactId>
Expand Down
Loading

0 comments on commit 22b0e37

Please sign in to comment.