Skip to content

Commit

Permalink
Merge pull request wildfly#18506 from rhusar/WFLY-20076
Browse files Browse the repository at this point in the history
WFLY-20076 Intermittent failures in OpenTelemetry integration test cases
  • Loading branch information
bstansberry authored Dec 11, 2024
2 parents ea4909a + 0cc6c10 commit eb5fe83
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.wildfly.test.integration.microprofile.faulttolerance.opentelemetry;

import java.net.URL;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -76,34 +75,32 @@ public void test() throws Exception {
HttpRequest.get(url.toString() + "app/timeout", 10, TimeUnit.SECONDS);
}

// Pick a random metric to make sure that metrics are already collected and available for inspection
// Subsequently we will use concrete tags to lookup specific counters
List<PrometheusMetric> metrics = otelCollector.fetchMetrics("ft_invocations_total");

// Uncomment the following lines for debugging:
//System.out.println("Metrics collected:");
//metrics.forEach(metric -> System.out.println(metric.toString()));

// First verify total invocation count for the method + value returned + fallback applied
Optional<PrometheusMetric> prometheusMetric = metrics.stream().filter(metric -> metric.getKey().equals("ft_invocations_total")).findFirst();
Assert.assertTrue(prometheusMetric.isPresent());
Assert.assertEquals(INVOCATION_COUNT, Integer.parseInt(prometheusMetric.get().getValue()), 0);

// Verify the number of timeouts being equal to the number of invocations
prometheusMetric = metrics.stream()
.filter(metric -> metric.getKey().equals("ft_timeout_calls_total"))
.filter(metric -> Boolean.TRUE.toString().equalsIgnoreCase(metric.getTags().get("timedOut")))
.findFirst();
Assert.assertTrue(prometheusMetric.isPresent());
Assert.assertEquals(INVOCATION_COUNT, Integer.parseInt(prometheusMetric.get().getValue()), 0);

// Verify the number of successful invocations to be none, since it always fails
prometheusMetric = metrics.stream()
.filter(metric -> metric.getKey().equals("ft_timeout_calls_total"))
.filter(metric -> Boolean.FALSE.toString().equalsIgnoreCase(metric.getTags().get("timedOut")))
.findFirst();
Assert.assertTrue(prometheusMetric.isPresent());
Assert.assertEquals(0, Integer.parseInt(prometheusMetric.get().getValue()), 0);
otelCollector.assertMetrics(metrics -> {
Optional<PrometheusMetric> prometheusMetric;

// First verify total invocation count for the method + value returned + fallback applied
prometheusMetric = metrics.stream().filter(metric -> metric.getKey().equals("ft_invocations_total")).findFirst();
Assert.assertTrue(prometheusMetric.isPresent());
Assert.assertEquals(INVOCATION_COUNT, Integer.parseInt(prometheusMetric.get().getValue()), 0);


// Verify the number of timeouts being equal to the number of invocations
prometheusMetric = metrics.stream()
.filter(metric -> metric.getKey().equals("ft_timeout_calls_total"))
.filter(metric -> Boolean.TRUE.toString().equalsIgnoreCase(metric.getTags().get("timedOut")))
.findFirst();
Assert.assertTrue(prometheusMetric.isPresent());
Assert.assertEquals(INVOCATION_COUNT, Integer.parseInt(prometheusMetric.get().getValue()), 0);


// Verify the number of successful invocations to be none, since it always fails
prometheusMetric = metrics.stream()
.filter(metric -> metric.getKey().equals("ft_timeout_calls_total"))
.filter(metric -> Boolean.FALSE.toString().equalsIgnoreCase(metric.getTags().get("timedOut")))
.findFirst();
Assert.assertTrue(prometheusMetric.isPresent());
Assert.assertEquals(0, Integer.parseInt(prometheusMetric.get().getValue()), 0);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;

import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.arquillian.container.test.api.RunAsClient;
import org.jboss.arquillian.junit.Arquillian;
Expand Down Expand Up @@ -94,9 +95,8 @@ public void getMetrics() throws InterruptedException {
"undertow_bytes_received"
);

final List<PrometheusMetric> metrics = otelCollector.fetchMetrics(metricsToTest.get(0));
metricsToTest.forEach(n -> Assert.assertTrue("Missing metric: " + n,
metrics.stream().anyMatch(m -> m.getKey().startsWith(n))));
otelCollector.assertMetrics(prometheusMetrics -> metricsToTest.forEach(n -> Assert.assertTrue("Missing metric: " + n,
prometheusMetrics.stream().anyMatch(m -> m.getKey().startsWith(n)))));
}

@Test
Expand All @@ -116,19 +116,19 @@ public void testApplicationModelMetrics() throws InterruptedException {
"undertow_session_max_alive_time_seconds",
"undertow_sessions_created_total"
);
final List<PrometheusMetric> metrics = otelCollector.fetchMetrics(metricsToTest.get(0));

Map<String, PrometheusMetric> appMetrics =
metrics.stream().filter(m -> m.getTags().entrySet().stream()
.anyMatch(t -> "app".equals(t.getKey()) && DEPLOYMENT_NAME.equals(t.getValue()))
)
.collect(Collectors.toMap(PrometheusMetric::getKey, i -> i));
otelCollector.assertMetrics(prometheusMetrics -> {
Map<String, PrometheusMetric> appMetrics =
prometheusMetrics.stream().filter(m -> m.getTags().entrySet().stream()
.anyMatch(t -> "app".equals(t.getKey()) && DEPLOYMENT_NAME.equals(t.getValue()))
)
.collect(Collectors.toMap(PrometheusMetric::getKey, i -> i));

metricsToTest.forEach(m -> Assert.assertTrue("Missing app metric: " + m, appMetrics.containsKey(m)));
metricsToTest.forEach(m -> Assert.assertTrue("Missing app metric: " + m, appMetrics.containsKey(m)));
});
}



@Test
@InSequence(5)
public void testJmxMetrics() throws InterruptedException {
Expand All @@ -142,14 +142,15 @@ public void testJmxMetrics() throws InterruptedException {
"thread_daemon_count",
"cpu_available_processors"
);
final List<PrometheusMetric> metrics = otelCollector.fetchMetrics(metricsToTest.get(0));

metricsToTest.forEach(m -> {
Assert.assertNotEquals("Metric value should be non-zero: " + m,
"0", metrics.stream().filter(e -> e.getKey().startsWith(m))
.findFirst()
.orElseThrow()
.getValue()); // Add the metrics tags to complete the key

otelCollector.assertMetrics(prometheusMetrics -> {
metricsToTest.forEach(m -> {
Assert.assertNotEquals("Metric value should be non-zero: " + m,
"0", prometheusMetrics.stream().filter(e -> e.getKey().startsWith(m))
.findFirst()
.orElseThrow()
.getValue()); // Add the metrics tags to complete the key
});
});
}

Expand All @@ -162,7 +163,7 @@ private Map<String, String> getMetricsMap(String response) {

private String[] splitMetric(String entry) {
int index = entry.lastIndexOf(" ");
return new String[]{
return new String[] {
entry.substring(0, index),
entry.substring(index + 1)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

import org.jboss.arquillian.container.test.api.Deployment;
Expand Down Expand Up @@ -55,26 +54,13 @@ public void dataTest(@ArquillianResource @OperateOnDeployment(ENTERPRISE_APP) UR
makeRequests(new URI(String.format("%s/%s/%s/%s", earUrl, ENTERPRISE_APP, SERVICE_ONE, DuplicateMetricResource1.TAG)));
makeRequests(new URI(String.format("%s/%s/%s/%s", earUrl, ENTERPRISE_APP, SERVICE_TWO, DuplicateMetricResource2.TAG)));

List<PrometheusMetric> results = Collections.emptyList();

int attemptCount = 0;

while (attemptCount < 10) {
attemptCount++;

results = getMetricsByName(
otelCollector.fetchMetrics(DuplicateMetricResource1.METER_NAME),
otelCollector.assertMetrics(prometheusMetrics -> {
List<PrometheusMetric> results = getMetricsByName(prometheusMetrics,
DuplicateMetricResource1.METER_NAME + "_total"); // Adjust for Prometheus naming conventions
// On occasion, it seems that the test will grab the metrics between updates, so we get the metric for app 1,
// but app 2 has not been published yet. We loop here, then, for a short while to give the server time to
// publish the metrics it has. After that short while, we break out and fail.
if (results.size() == 2) {
break;
} else {
Thread.sleep(500);
}
}
Assert.assertEquals(2, results.size());
results.forEach(r -> Assert.assertEquals("" + REQUEST_COUNT, r.getValue()));

Assert.assertEquals(2, results.size());
results.forEach(r -> Assert.assertEquals("" + REQUEST_COUNT, r.getValue()));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/
package org.wildfly.test.integration.observability.micrometer.multiple;

import static org.junit.Assert.*;

import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
Expand All @@ -17,7 +19,6 @@
import org.jboss.as.test.shared.observability.signals.PrometheusMetric;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.WebArchive;
import org.junit.Assert;
import org.junit.Test;
import org.wildfly.test.integration.observability.JaxRsActivator;
import org.wildfly.test.integration.observability.micrometer.multiple.application.DuplicateMetricResource1;
Expand Down Expand Up @@ -47,11 +48,12 @@ public void checkPingCount(@ArquillianResource @OperateOnDeployment(SERVICE_ONE)
makeRequests(new URI(String.format("%s/%s", serviceOne, DuplicateMetricResource1.TAG)));
makeRequests(new URI(String.format("%s/%s", serviceTwo, DuplicateMetricResource2.TAG)));

List<PrometheusMetric> results = getMetricsByName(
otelCollector.fetchMetrics(DuplicateMetricResource1.METER_NAME),
DuplicateMetricResource1.METER_NAME + "_total"); // Adjust for Prometheus naming conventions
otelCollector.assertMetrics(prometheusMetrics -> {
List<PrometheusMetric> results = getMetricsByName(prometheusMetrics,
DuplicateMetricResource1.METER_NAME + "_total"); // Adjust for Prometheus naming conventions

Assert.assertEquals(2, results.size());
results.forEach(r -> Assert.assertEquals("" + REQUEST_COUNT, r.getValue()));
assertEquals(2, results.size());
results.forEach(r -> assertEquals("" + REQUEST_COUNT, r.getValue()));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.jboss.arquillian.test.api.ArquillianResource;
import org.jboss.as.arquillian.api.ServerSetup;
import org.jboss.as.test.shared.observability.setuptasks.OpenTelemetryWithCollectorSetupTask;
import org.jboss.as.test.shared.observability.signals.PrometheusMetric;
import org.jboss.shrinkwrap.api.Archive;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -57,16 +56,9 @@ public void makeRequests() throws MalformedURLException {
@Test
@InSequence(2)
public void getMetrics() throws InterruptedException {
List<String> metricsToTest = List.of(
OtelMetricResource.COUNTER_NAME
);
List<String> metricsToTest = List.of(OtelMetricResource.COUNTER_NAME);

final List<PrometheusMetric> metrics = otelCollector.fetchMetrics(metricsToTest.get(0));
System.err.println("**************\nPrometheus metrics:");
System.err.println(metrics);
System.err.println("**************");

metricsToTest.forEach(n -> Assert.assertTrue("Missing metric: " + n,
metrics.stream().anyMatch(m -> m.getKey().startsWith(n))));
otelCollector.assertMetrics(prometheusMetrics -> metricsToTest.forEach(n -> Assert.assertTrue("Missing metric: " + n,
prometheusMetrics.stream().anyMatch(m -> m.getKey().startsWith(n)))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,34 @@
*/
package org.jboss.as.test.shared.observability.containers;

import static org.junit.Assert.assertTrue;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;

import org.jboss.as.test.shared.TimeoutUtil;
import org.jboss.as.test.shared.observability.signals.PrometheusMetric;
import org.jboss.as.test.shared.observability.signals.jaeger.JaegerTrace;
import org.junit.Assert;
import org.testcontainers.utility.MountableFile;

/**
* @author Jason Lee
* @author Radoslav Husar
*/
public class OpenTelemetryCollectorContainer extends BaseContainer<OpenTelemetryCollectorContainer> {
public static final String IMAGE_NAME = "otel/opentelemetry-collector";
public static final String IMAGE_VERSION = "0.103.1";
Expand All @@ -28,7 +41,7 @@ public class OpenTelemetryCollectorContainer extends BaseContainer<OpenTelemetry
public static final int HEALTH_CHECK_PORT = 13133;
public static final String OTEL_COLLECTOR_CONFIG_YAML = "/etc/otel-collector-config.yaml";

private JaegerContainer jaegerContainer;
private final JaegerContainer jaegerContainer;

public OpenTelemetryCollectorContainer() {
super("OpenTelemetryCollector", IMAGE_NAME, IMAGE_VERSION,
Expand Down Expand Up @@ -76,26 +89,76 @@ public List<JaegerTrace> getTraces(String serviceName) throws InterruptedExcepti
return (jaegerContainer != null ? jaegerContainer.getTraces(serviceName) : Collections.emptyList());
}

public List<PrometheusMetric> fetchMetrics(String nameToMonitor) throws InterruptedException {
String body = "";
try (Client client = ClientBuilder.newClient()) {
WebTarget target = client.target(getPrometheusUrl());
/**
 * Default time budget used by {@code assertMetrics} when the caller does not supply one:
 * 30 seconds, passed through {@code TimeoutUtil.adjust}.
 * Declared {@code static final} because it is a shared constant rather than per-container state;
 * visibility is intentionally left package-private so existing references keep compiling.
 */
static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(TimeoutUtil.adjust(30));

/**
 * Repeatedly evaluates the supplied assertions against the metrics read from the Prometheus endpoint,
 * using the default timeout ({@code DEFAULT_TIMEOUT}).
 * Delegates to the two-argument overload, which waits one second between failed attempts.
 *
 * @param assertionConsumer callback holding the {@link Assert} checks; it should throw an
 *                          {@link AssertionError} while the metrics do not yet match the expected state
 * @return the list of Prometheus metrics that satisfied the assertions; callers typically ignore it
 * @throws AssertionError if the assertions still fail once the timeout has elapsed
 * @throws InterruptedException if the calling thread is interrupted while waiting between attempts
 */
public List<PrometheusMetric> assertMetrics(Consumer<List<PrometheusMetric>> assertionConsumer) throws AssertionError, InterruptedException {
    return assertMetrics(assertionConsumer, DEFAULT_TIMEOUT);
}

int attemptCount = 0;
boolean found = false;
/**
* Variant of {@link OpenTelemetryCollectorContainer#assertMetrics(Consumer)} that can be configured with a timeout duration.
*/
public List<PrometheusMetric> assertMetrics(Consumer<List<PrometheusMetric>> assertionConsumer, Duration timeout) throws AssertionError, InterruptedException {
debugLog("assertMetrics(..) validation starting.");

// Request counts can vary. Setting high to help ensure test stability
while (!found && attemptCount < 30) {
// Wait to give Micrometer time to export
Thread.sleep(1000);
Instant endTime = Instant.now().plus(timeout);

AssertionError lastAssertionError = null;

while (Instant.now().isBefore(endTime)) {
try {
List<PrometheusMetric> prometheusMetrics = fetchMetrics();

assertionConsumer.accept(prometheusMetrics);

body = target.request().get().readEntity(String.class);
found = body.contains(nameToMonitor);
attemptCount++;
debugLog("assertMetrics(..) validation passed.");

return prometheusMetrics;
} catch (AssertionError assertionError) {
debugLog("assertMetrics(..) validation failed - retrying.");
lastAssertionError = assertionError;
Thread.sleep(1000);
}
}

return buildPrometheusMetrics(body);
throw Objects.requireNonNullElseGet(lastAssertionError, AssertionError::new);
}

/**
 * Reads the Prometheus endpoint once and parses the response body into metrics.
 * No waiting or retrying happens here; the result reflects whatever the endpoint returned for this request.
 *
 * @return list of prometheus metrics parsed from the response body
 */
public List<PrometheusMetric> fetchMetrics() {
    try (Client restClient = ClientBuilder.newClient()) {
        // The client is closed by try-with-resources once the body has been read and parsed.
        String responseBody = restClient.target(getPrometheusUrl())
                .request()
                .get()
                .readEntity(String.class);
        return buildPrometheusMetrics(responseBody);
    }
}

/**
 * Waits, via {@code assertMetrics}, until some metric key contains the given name and then returns
 * the metrics snapshot in which it was seen.
 *
 * @param nameToMonitor text that must appear in the key of at least one metric
 * @return the list of prometheus metrics that contained a matching key
 * @throws InterruptedException if interrupted while waiting
 * @deprecated Use {@link OpenTelemetryCollectorContainer#assertMetrics(Consumer)} instead.
 */
@Deprecated
public List<PrometheusMetric> fetchMetrics(String nameToMonitor) throws InterruptedException {
    final String notSeenMessage = String.format("Metric %s not seen in Prometheus within timeout.", nameToMonitor);

    final Consumer<List<PrometheusMetric>> metricIsPresent = snapshot -> {
        boolean seen = false;
        for (PrometheusMetric candidate : snapshot) {
            if (candidate.getKey().contains(nameToMonitor)) {
                seen = true;
                break;
            }
        }
        assertTrue(notSeenMessage, seen);
    };

    return assertMetrics(metricIsPresent);
}

private List<PrometheusMetric> buildPrometheusMetrics(String body) {
Expand Down

0 comments on commit eb5fe83

Please sign in to comment.