Skip to content

Commit

Permalink
beacon checker
Browse files Browse the repository at this point in the history
  • Loading branch information
qifanwang committed Nov 25, 2024
1 parent 97e9741 commit b455d96
Show file tree
Hide file tree
Showing 26 changed files with 738 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;

import java.util.Map;
import java.util.Set;

/**
Expand All @@ -27,6 +28,12 @@ public interface MonitorService {

void registerCluster(String system, String clusterName, Set<MonitorGroupMeta> groups);

void updateCluster(String system, String clusterName, Set<MonitorGroupMeta> groups);

void unregisterCluster(String system, String clusterName);

int getBeaconClusterHash(String system, String clusterName);

Map<String,Set<String>> getAllClusterWithDc(String system);

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,13 @@ public void setExtra(Map<String, String> extra) {
this.extra = extra;
}

@Override
public int hashCode() {
int hash = 0;
for (MonitorGroupMeta meta : nodeGroups) {
hash ^= meta.hashCode();
}
return hash;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ public boolean equals(Object o) {
MonitorGroupMeta that = (MonitorGroupMeta) o;
return Objects.equals(name, that.name) &&
Objects.equals(idc, that.idc) &&
Objects.equals(nodes, that.nodes);
Objects.equals(nodes, that.nodes) &&
Objects.equals(masterGroup, that.masterGroup);
}

@Override
public int hashCode() {
return Objects.hash(name, idc, nodes);
return Objects.hash(name, idc, nodes, masterGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -57,9 +58,24 @@ public void registerCluster(String system, String clusterName, Set<MonitorGroupM
// do nothing
}

@Override
public void updateCluster(String system, String clusterName, Set<MonitorGroupMeta> groups) {

}

@Override
public void unregisterCluster(String system, String clusterName) {
// do nothing
}

@Override
public int getBeaconClusterHash(String system, String clusterName) {
return 0;
}

@Override
public Map<String, Set<String>> getAllClusterWithDc(String system) {
return Collections.emptyMap();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.ctrip.xpipe.api.migration;

import com.ctrip.xpipe.api.migration.auto.data.MonitorClusterMeta;
import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;
import com.ctrip.xpipe.endpoint.HostPort;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;

import java.util.Set;

public class MonitorClusterMetaTest {

@Test
public void testMonitorClusterMeta(){
Set<MonitorGroupMeta> group1 = getMonitorGroupMeta1();
MonitorClusterMeta monitorClusterMeta1 = new MonitorClusterMeta(group1);

Set<MonitorGroupMeta> group2 = getMonitorGroupMeta2();
MonitorClusterMeta monitorClusterMeta2 = new MonitorClusterMeta(group2);

Set<MonitorGroupMeta> group3 = getMonitorGroupMeta3();
MonitorClusterMeta monitorClusterMeta3 = new MonitorClusterMeta(group3);

Set<MonitorGroupMeta> group4 = getMonitorGroupMeta4();
MonitorClusterMeta monitorClusterMeta4 = new MonitorClusterMeta(group4);

Set<MonitorGroupMeta> group5 = getMonitorGroupMeta5();
MonitorClusterMeta monitorClusterMeta5 = new MonitorClusterMeta(group5);

Assert.assertEquals(monitorClusterMeta1.hashCode(), monitorClusterMeta2.hashCode());
Assert.assertNotEquals(monitorClusterMeta1.hashCode(), monitorClusterMeta3.hashCode());
Assert.assertNotEquals(monitorClusterMeta1.hashCode(), monitorClusterMeta4.hashCode());
Assert.assertNotEquals(monitorClusterMeta4.hashCode(), monitorClusterMeta5.hashCode());
System.out.println(monitorClusterMeta1.hashCode());
System.out.println(monitorClusterMeta2.hashCode());
System.out.println(monitorClusterMeta3.hashCode());
System.out.println(monitorClusterMeta4.hashCode());
}


private Set<MonitorGroupMeta> getMonitorGroupMeta1() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta2() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta3() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), false),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), false),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), true),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), true)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta4() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta5() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.checker;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon.BeaconCheckStatus;

/**
* @author lishanglin
Expand All @@ -10,4 +11,8 @@ public interface BeaconManager {

void registerCluster(String clusterId, ClusterType clusterType, int orgId);

void updateCluster(String clusterId, ClusterType clusterType, int orgId);

BeaconCheckStatus checkClusterHash(String clusterId, ClusterType clusterType, int orgId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon;

public enum BeaconCheckStatus {
SERVICE_NOT_FOUND("Service not found"),
SYSTEM_NOT_FOUND("System not found"),
CLUSTER_NOT_FOUND("Cluster not found"),
INCONSISTENCY("Inconsistency"),
CONSISTENCY("Consistency"),
ERROR("Error"),
UNKNOWN("Unknown");

private final String description;

BeaconCheckStatus(String description) {
this.description = description;
}

public String getDescription() {
return description;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.metric.MetricData;
import com.ctrip.xpipe.metric.MetricProxy;
import com.ctrip.xpipe.metric.MetricProxyException;
import com.ctrip.xpipe.redis.checker.BeaconManager;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterInstanceInfo;
import com.ctrip.xpipe.redis.checker.healthcheck.leader.AbstractLeaderAwareHealthCheckAction;
import com.ctrip.xpipe.utils.ServicesUtil;
import org.slf4j.Logger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

public class BeaconConsistencyCheckAction extends AbstractLeaderAwareHealthCheckAction<ClusterHealthCheckInstance> {

private BeaconManager beaconManager;

private MetricProxy metricProxy = ServicesUtil.getMetricProxy();

public BeaconConsistencyCheckAction(ScheduledExecutorService scheduled, ClusterHealthCheckInstance instance, ExecutorService executors, BeaconManager beaconManager) {
super(scheduled, instance, executors);
this.beaconManager = beaconManager;
}

@Override
protected void doTask() {
ClusterInstanceInfo info = instance.getCheckInfo();
String clusterId = info.getClusterId();
int orgId = info.getOrgId();
checkConsistency(clusterId, info.getClusterType(), orgId);
}

@Override
protected Logger getHealthCheckLogger() {
return logger;
}

@Override
protected int getBaseCheckInterval() {
return getActionInstance().getHealthCheckConfig().getRedisConfCheckIntervalMilli();
}

private void checkConsistency(String clusterId, ClusterType clusterType, int orgId) {
BeaconCheckStatus status;
try {
status = beaconManager.checkClusterHash(clusterId, clusterType, orgId);
} catch (Throwable t) {
// cluster not found in beacon
status = BeaconCheckStatus.UNKNOWN;
logger.error("[checkConsistency]{}:{}:{}", clusterType, orgId, t.getMessage());
}
handleCheckResult(status, clusterId, clusterType, orgId);
}

private void handleCheckResult(BeaconCheckStatus status, String clusterId, ClusterType clusterType, int orgId) {
try {
if(status == BeaconCheckStatus.INCONSISTENCY || status == BeaconCheckStatus.CLUSTER_NOT_FOUND) {
beaconManager.registerCluster(clusterId, clusterType, orgId);

} else if(status == BeaconCheckStatus.CONSISTENCY) {
return;
}
sendMetricData(clusterId, status);
} catch (Throwable t) {
logger.error("[checkPost]{}:{}:{}", clusterType, orgId, t.getMessage());
}
}

private MetricData getMetricData(String clusterId, BeaconCheckStatus status) {
MetricData metricData = new MetricData("beacon.checker", null, clusterId, null);
metricData.setTimestampMilli(System.currentTimeMillis());
metricData.setValue(1);
metricData.addTag("consistency", status.toString());
return metricData;
}

private void sendMetricData(String trackCluster, BeaconCheckStatus status) {
try {
MetricData metricData = getMetricData(trackCluster, status);
metricProxy.writeBinMultiDataPoint(metricData);
} catch (MetricProxyException e) {
logger.error("[sendMetricData]", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@
import java.util.Collections;
import java.util.List;

/**
* @author lishanglin
* date 2021/1/15
*/
@Component
public class BeaconMetaCheckActionFactory extends AbstractClusterLeaderAwareHealthCheckActionFactory implements OneWaySupport, BiDirectionSupport {
public class BeaconConsistencyCheckActionFactory extends AbstractClusterLeaderAwareHealthCheckActionFactory implements OneWaySupport, BiDirectionSupport {

@Autowired
private BeaconManager beaconManager;
Expand All @@ -28,14 +24,14 @@ public class BeaconMetaCheckActionFactory extends AbstractClusterLeaderAwareHeal

@Override
public SiteLeaderAwareHealthCheckAction create(ClusterHealthCheckInstance instance) {
BeaconMetaCheckAction action = new BeaconMetaCheckAction(scheduled, instance, executors, beaconManager);
BeaconConsistencyCheckAction action = new BeaconConsistencyCheckAction(scheduled, instance, executors, beaconManager);
action.addControllers(controllers);
return action;
}

@Override
public Class<? extends SiteLeaderAwareHealthCheckAction> support() {
return BeaconMetaCheckAction.class;
return BeaconConsistencyCheckAction.class;
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
import com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.controller.CurrentDcSentinelHelloCheckControllerTest;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.controller.OneWaySentinelHelloCheckControllerTest;
import com.ctrip.xpipe.redis.checker.healthcheck.allleader.DefaultSentinelMonitorsCheckTest;
import com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon.BeaconMetaCheckActionTest;
import com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon.BeaconConsistencyCheckActionTest;
import com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon.DefaultBeaconMetaControllerTest;
import com.ctrip.xpipe.redis.checker.healthcheck.config.DefaultHealthCheckConfigTest;
import com.ctrip.xpipe.redis.checker.healthcheck.factory.DefaultHealthCheckEndpointFactoryTest;
Expand Down Expand Up @@ -154,7 +154,7 @@
CrossDcRedisMasterActionListenerTest.class,
OutClientRedisMasterActionListenerTest.class,

BeaconMetaCheckActionTest.class,
BeaconConsistencyCheckActionTest.class,
DefaultBeaconMetaControllerTest.class,

SentinelHelloTest.class,
Expand Down
Loading

0 comments on commit b455d96

Please sign in to comment.