Skip to content

Commit

Permalink
HELLODATA-980 - request/reply connection check for NatsHealthIndicator (
Browse files Browse the repository at this point in the history
#9)

Co-authored-by: Slawomir Wieczorek <[email protected]>
  • Loading branch information
wieczorslawo and Slawomir Wieczorek authored Jan 19, 2024
1 parent ca0d943 commit 1a88bca
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import ch.bedag.dap.hellodata.commons.nats.actuator.NatsHealthIndicator;
import ch.bedag.dap.hellodata.commons.nats.bean.NatsConfigBeanPostProcessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.spring.boot.autoconfigure.NatsAutoConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,90 @@
*/
package ch.bedag.dap.hellodata.commons.nats.actuator;

import ch.bedag.dap.hellodata.commons.SlugifyUtil;
import ch.bedag.dap.hellodata.commons.sidecars.events.RequestReplySubject;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Message;
import jakarta.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.util.StringUtils;

@Log4j2
public class NatsHealthIndicator extends AbstractHealthIndicator {
private final Connection natsConnection;
@Value("${spring.application.name}")
private String appName;
@Value("${hello-data.instance.name:}")
private String instanceName;
private String subject;
private String subjectBase64;

/**
* Generate subject and listen for connection messages within the request/reply pattern
*/
@PostConstruct
public void listenForConnectionCheckRequests() {
String subjectBase = SlugifyUtil.slugify(RequestReplySubject.NATS_CONNECTION_HEALTH_CHECK.getSubject());
subject = this.appName + "-" + subjectBase + (StringUtils.hasText(instanceName) ? "-" + instanceName : "");
subjectBase64 = new String(Base64.getEncoder().encode(subject.getBytes(StandardCharsets.UTF_8)));
log.debug("[NATS connection check] Listening for messages on subject {}", subject);
Dispatcher dispatcher = natsConnection.createDispatcher((msg) -> {
String message = new String(msg.getData());
log.debug("[NATS connection check] Received request for NATS connection check", message);
natsConnection.publish(msg.getReplyTo(), "OK".getBytes(StandardCharsets.UTF_8));
msg.ack();
});
dispatcher.subscribe(subjectBase64);
}

public NatsHealthIndicator(Connection natsConnection) {
this.natsConnection = natsConnection;
}

@Override
protected void doHealthCheck(Health.Builder builder) {
//builder.withDetail("connection", natsConnection.getConnectedUrl()).withDetail("servers", natsConnection.getOptions().getServers());
String connectionName = natsConnection.getOptions().getConnectionName();
if (connectionName != null) {
builder.withDetail("name", connectionName);
}

builder.withDetail("status", natsConnection.getStatus());
switch (natsConnection.getStatus()) {
case CONNECTED -> builder.up();
case CONNECTED -> checkRequestReplyConnection(builder);
case CONNECTING, RECONNECTING -> builder.outOfService();
default -> // CLOSED, DISCONNECTED
builder.down();
}
}

/**
* Utilize the connection to be sure if request/reply pattern works (even tho the connection is up)
* @param builder
* @return builder status
*/
private Health.Builder checkRequestReplyConnection(Health.Builder builder) {
String subjectBase64 = new String(Base64.getEncoder().encode(subject.getBytes(StandardCharsets.UTF_8)));
log.debug("[NATS connection check] Sending request to subjectBase: {}", subject);
Message reply = null;
try {
reply = natsConnection.request(subjectBase64, subject.getBytes(StandardCharsets.UTF_8), Duration.ofSeconds(10));
} catch (Exception exception) {
log.error("[NATS connection check] Could not connect to NATS", exception);
return builder.down();
}
if (reply == null) {
log.warn("[NATS connection check] Reply is null, please verify NATS connection");
return builder.down();
}
reply.ack();
log.debug("[NATS connection check] Reply received: " + new String(reply.getData()));
return builder.up();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
@Getter
public enum RequestReplySubject {
UPDATE_DASHBOARD_ROLES_FOR_USER("-update_dashboard_roles_for_user"),
NATS_CONNECTION_HEALTH_CHECK("nats_connection_health_check"),
;
private final String subject;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
package ch.bedag.dap.hellodata.portal.user;

import ch.bedag.dap.hellodata.commons.nats.actuator.NatsHealthIndicator;
import ch.bedag.dap.hellodata.commons.nats.service.NatsSenderService;
import ch.bedag.dap.hellodata.portal.initialize.service.RolesInitializer;
import ch.bedag.dap.hellodata.portal.monitoring.service.StorageSizeService;
Expand Down Expand Up @@ -75,6 +76,8 @@ public abstract class KeycloakTestContainerTest {
private RoleService roleService;
@MockBean
private StorageSizeService storageSizeService;
@MockBean
private NatsHealthIndicator natsHealthIndicator;

@DynamicPropertySource
static void registerResourceServerIssuerProperty(DynamicPropertyRegistry registry) {
Expand Down

0 comments on commit 1a88bca

Please sign in to comment.