From 1a88bca865a6f0935e1a840c1c638a527f2a1c77 Mon Sep 17 00:00:00 2001 From: Slawomir Wieczorek Date: Fri, 19 Jan 2024 15:47:22 +0100 Subject: [PATCH] HELLODATA-980 - request/reply connection check for NatsHealthIndicator (#9) Co-authored-by: Slawomir Wieczorek --- .../commons/nats/NatsConfiguration.java | 1 + .../nats/actuator/NatsHealthIndicator.java | 63 ++++++++++++++++++- .../sidecars/events/RequestReplySubject.java | 1 + .../user/KeycloakTestContainerTest.java | 3 + 4 files changed, 66 insertions(+), 2 deletions(-) diff --git a/hello-data-commons/hello-data-nats-spring/src/main/java/ch/bedag/dap/hellodata/commons/nats/NatsConfiguration.java b/hello-data-commons/hello-data-nats-spring/src/main/java/ch/bedag/dap/hellodata/commons/nats/NatsConfiguration.java index bbf7a0fc..567fd501 100644 --- a/hello-data-commons/hello-data-nats-spring/src/main/java/ch/bedag/dap/hellodata/commons/nats/NatsConfiguration.java +++ b/hello-data-commons/hello-data-nats-spring/src/main/java/ch/bedag/dap/hellodata/commons/nats/NatsConfiguration.java @@ -28,6 +28,7 @@ import ch.bedag.dap.hellodata.commons.nats.actuator.NatsHealthIndicator; import ch.bedag.dap.hellodata.commons.nats.bean.NatsConfigBeanPostProcessor; +import com.fasterxml.jackson.databind.ObjectMapper; import io.nats.client.Connection; import io.nats.client.ConnectionListener; import io.nats.spring.boot.autoconfigure.NatsAutoConfiguration; diff --git a/hello-data-commons/hello-data-nats-spring/src/main/java/ch/bedag/dap/hellodata/commons/nats/actuator/NatsHealthIndicator.java b/hello-data-commons/hello-data-nats-spring/src/main/java/ch/bedag/dap/hellodata/commons/nats/actuator/NatsHealthIndicator.java index bc553895..47ae97cd 100644 --- a/hello-data-commons/hello-data-nats-spring/src/main/java/ch/bedag/dap/hellodata/commons/nats/actuator/NatsHealthIndicator.java +++ b/hello-data-commons/hello-data-nats-spring/src/main/java/ch/bedag/dap/hellodata/commons/nats/actuator/NatsHealthIndicator.java @@ -26,12 +26,48 @@ */ package ch.bedag.dap.hellodata.commons.nats.actuator; +import ch.bedag.dap.hellodata.commons.SlugifyUtil; +import ch.bedag.dap.hellodata.commons.sidecars.events.RequestReplySubject; import io.nats.client.Connection; +import io.nats.client.Dispatcher; +import io.nats.client.Message; +import jakarta.annotation.PostConstruct; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Base64; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; +import org.springframework.util.StringUtils; +@Log4j2 public class NatsHealthIndicator extends AbstractHealthIndicator { private final Connection natsConnection; + @Value("${spring.application.name}") + private String appName; + @Value("${hello-data.instance.name:}") + private String instanceName; + private String subject; + private String subjectBase64; + + /** + * Generate subject and listen for connection messages within the request/reply pattern + */ + @PostConstruct + public void listenForConnectionCheckRequests() { + String subjectBase = SlugifyUtil.slugify(RequestReplySubject.NATS_CONNECTION_HEALTH_CHECK.getSubject()); + subject = this.appName + "-" + subjectBase + (StringUtils.hasText(instanceName) ? "-" + instanceName : ""); + subjectBase64 = new String(Base64.getEncoder().encode(subject.getBytes(StandardCharsets.UTF_8))); + log.debug("[NATS connection check] Listening for messages on subject {}", subject); + Dispatcher dispatcher = natsConnection.createDispatcher((msg) -> { + String message = new String(msg.getData()); + log.debug("[NATS connection check] Received request for NATS connection check", message); + natsConnection.publish(msg.getReplyTo(), "OK".getBytes(StandardCharsets.UTF_8)); + msg.ack(); + }); + dispatcher.subscribe(subjectBase64); + } public NatsHealthIndicator(Connection natsConnection) { this.natsConnection = natsConnection; @@ -39,7 +75,6 @@ public NatsHealthIndicator(Connection natsConnection) { @Override protected void doHealthCheck(Health.Builder builder) { - //builder.withDetail("connection", natsConnection.getConnectedUrl()).withDetail("servers", natsConnection.getOptions().getServers()); String connectionName = natsConnection.getOptions().getConnectionName(); if (connectionName != null) { builder.withDetail("name", connectionName); @@ -47,10 +82,34 @@ protected void doHealthCheck(Health.Builder builder) { builder.withDetail("status", natsConnection.getStatus()); switch (natsConnection.getStatus()) { - case CONNECTED -> builder.up(); + case CONNECTED -> checkRequestReplyConnection(builder); case CONNECTING, RECONNECTING -> builder.outOfService(); default -> // CLOSED, DISCONNECTED builder.down(); } } + + /** + * Utilize the connection to be sure if request/reply pattern works (even tho the connection is up) + * @param builder + * @return builder status + */ + private Health.Builder checkRequestReplyConnection(Health.Builder builder) { + String subjectBase64 = new String(Base64.getEncoder().encode(subject.getBytes(StandardCharsets.UTF_8))); + log.debug("[NATS connection check] Sending request to subjectBase: {}", subject); + Message reply = null; + try { + reply = natsConnection.request(subjectBase64, subject.getBytes(StandardCharsets.UTF_8), Duration.ofSeconds(10)); + } catch (Exception exception) { + log.error("[NATS connection check] Could not connect to NATS", exception); + return builder.down(); + } + if (reply == null) { + log.warn("[NATS connection check] Reply is null, please verify NATS connection"); + return builder.down(); + } + reply.ack(); + log.debug("[NATS connection check] Reply received: " + new String(reply.getData())); + return builder.up(); + } } diff --git a/hello-data-commons/hello-data-sidecar-common/src/main/java/ch/bedag/dap/hellodata/commons/sidecars/events/RequestReplySubject.java b/hello-data-commons/hello-data-sidecar-common/src/main/java/ch/bedag/dap/hellodata/commons/sidecars/events/RequestReplySubject.java index ef1f182a..a402ce46 100644 --- a/hello-data-commons/hello-data-sidecar-common/src/main/java/ch/bedag/dap/hellodata/commons/sidecars/events/RequestReplySubject.java +++ b/hello-data-commons/hello-data-sidecar-common/src/main/java/ch/bedag/dap/hellodata/commons/sidecars/events/RequestReplySubject.java @@ -34,6 +34,7 @@ @Getter public enum RequestReplySubject { UPDATE_DASHBOARD_ROLES_FOR_USER("-update_dashboard_roles_for_user"), + NATS_CONNECTION_HEALTH_CHECK("nats_connection_health_check"), ; private final String subject; diff --git a/hello-data-portal/hello-data-portal-api/src/test/java/ch/bedag/dap/hellodata/portal/user/KeycloakTestContainerTest.java b/hello-data-portal/hello-data-portal-api/src/test/java/ch/bedag/dap/hellodata/portal/user/KeycloakTestContainerTest.java index a3884c18..cc6a52ff 100644 --- a/hello-data-portal/hello-data-portal-api/src/test/java/ch/bedag/dap/hellodata/portal/user/KeycloakTestContainerTest.java +++ b/hello-data-portal/hello-data-portal-api/src/test/java/ch/bedag/dap/hellodata/portal/user/KeycloakTestContainerTest.java @@ -26,6 +26,7 @@ */ package ch.bedag.dap.hellodata.portal.user; +import ch.bedag.dap.hellodata.commons.nats.actuator.NatsHealthIndicator; import ch.bedag.dap.hellodata.commons.nats.service.NatsSenderService; import ch.bedag.dap.hellodata.portal.initialize.service.RolesInitializer; import ch.bedag.dap.hellodata.portal.monitoring.service.StorageSizeService; @@ -75,6 +76,8 @@ public abstract class KeycloakTestContainerTest { private RoleService roleService; @MockBean private StorageSizeService storageSizeService; + @MockBean + private NatsHealthIndicator natsHealthIndicator; @DynamicPropertySource static void registerResourceServerIssuerProperty(DynamicPropertyRegistry registry) {