Skip to content

Commit

Permalink
beacon checker
Browse files Browse the repository at this point in the history
  • Loading branch information
qifanwang committed Nov 28, 2024
1 parent 97e9741 commit 3bff963
Show file tree
Hide file tree
Showing 25 changed files with 755 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;

import java.util.Map;
import java.util.Set;

/**
Expand All @@ -27,6 +28,12 @@ public interface MonitorService {

void registerCluster(String system, String clusterName, Set<MonitorGroupMeta> groups);

void updateCluster(String system, String clusterName, Set<MonitorGroupMeta> groups);

void unregisterCluster(String system, String clusterName);

int getBeaconClusterHash(String system, String clusterName);

Map<String,Set<String>> getAllClusterWithDc(String system);

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.ctrip.xpipe.api.migration.auto.data;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.builder.HashCodeBuilder;

import java.util.*;

/**
* @author lishanglin
Expand Down Expand Up @@ -39,4 +39,20 @@ public void setExtra(Map<String, String> extra) {
this.extra = extra;
}

public int generateHashCodeForBeaconCheck () {
List<MonitorGroupMeta> nodeList = new ArrayList<>(nodeGroups);
nodeList.sort(Comparator.comparing(MonitorGroupMeta::getName, Comparator.nullsFirst(Comparator.naturalOrder()))
.thenComparing(MonitorGroupMeta::getIdc, Comparator.nullsFirst(Comparator.naturalOrder()))
.thenComparing(MonitorGroupMeta::isMasterGroup)
.thenComparing(monitorGroupMeta -> monitorGroupMeta.getNodes().toString()));
HashCodeBuilder builder = new HashCodeBuilder();
for(MonitorGroupMeta group : nodeList) {
builder.append(group.getName())
.append(group.getIdc())
.append(group.isMasterGroup())
.append(group.getNodes());
}
return builder.toHashCode();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -57,9 +58,24 @@ public void registerCluster(String system, String clusterName, Set<MonitorGroupM
// do nothing
}

@Override
public void updateCluster(String system, String clusterName, Set<MonitorGroupMeta> groups) {

}

@Override
public void unregisterCluster(String system, String clusterName) {
// do nothing
}

@Override
public int getBeaconClusterHash(String system, String clusterName) {
return 0;
}

@Override
public Map<String, Set<String>> getAllClusterWithDc(String system) {
return Collections.emptyMap();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.ctrip.xpipe.api.migration;

import com.ctrip.xpipe.api.migration.auto.data.MonitorClusterMeta;
import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;
import com.ctrip.xpipe.endpoint.HostPort;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;

import java.util.Set;

public class MonitorClusterMetaTest {

@Test
public void testMonitorClusterMeta(){
Set<MonitorGroupMeta> group1 = getMonitorGroupMeta1();
MonitorClusterMeta monitorClusterMeta1 = new MonitorClusterMeta(group1);

Set<MonitorGroupMeta> group2 = getMonitorGroupMeta2();
MonitorClusterMeta monitorClusterMeta2 = new MonitorClusterMeta(group2);

Set<MonitorGroupMeta> group3 = getMonitorGroupMeta3();
MonitorClusterMeta monitorClusterMeta3 = new MonitorClusterMeta(group3);

Set<MonitorGroupMeta> group4 = getMonitorGroupMeta4();
MonitorClusterMeta monitorClusterMeta4 = new MonitorClusterMeta(group4);

Set<MonitorGroupMeta> group5 = getMonitorGroupMeta5();
MonitorClusterMeta monitorClusterMeta5 = new MonitorClusterMeta(group5);

Set<MonitorGroupMeta> group6 = getMonitorGroupMeta6();
MonitorClusterMeta monitorClusterMeta6 = new MonitorClusterMeta(group6);

Assert.assertEquals(monitorClusterMeta1.generateHashCodeForBeaconCheck(), monitorClusterMeta2.generateHashCodeForBeaconCheck());
Assert.assertNotEquals(monitorClusterMeta1.generateHashCodeForBeaconCheck(), monitorClusterMeta3.generateHashCodeForBeaconCheck());
Assert.assertNotEquals(monitorClusterMeta1.generateHashCodeForBeaconCheck(), monitorClusterMeta4.generateHashCodeForBeaconCheck());
Assert.assertNotEquals(monitorClusterMeta4.generateHashCodeForBeaconCheck(), monitorClusterMeta5.generateHashCodeForBeaconCheck());
Assert.assertNotEquals(monitorClusterMeta5.generateHashCodeForBeaconCheck(), monitorClusterMeta6.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta1.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta2.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta3.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta4.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta5.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta6.generateHashCodeForBeaconCheck());
}


private Set<MonitorGroupMeta> getMonitorGroupMeta1() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta2() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta3() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), false),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), false),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), true),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), true)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta4() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta5() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta6() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6382")), false),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6380")), false),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.checker;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon.BeaconCheckStatus;

/**
* @author lishanglin
Expand All @@ -10,4 +11,8 @@ public interface BeaconManager {

void registerCluster(String clusterId, ClusterType clusterType, int orgId);

void updateCluster(String clusterId, ClusterType clusterType, int orgId);

BeaconCheckStatus checkClusterHash(String clusterId, ClusterType clusterType, int orgId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon;

public enum BeaconCheckStatus {
SERVICE_NOT_FOUND("Service not found"),
SYSTEM_NOT_FOUND("System not found"),
CLUSTER_NOT_FOUND("Cluster not found"),
INCONSISTENCY("Inconsistency"),
CONSISTENCY("Consistency"),
UNKNOWN("Unknown"),
ERROR("Error");

private final String description;

BeaconCheckStatus(String description) {
this.description = description;
}

public String getDescription() {
return description;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.metric.MetricData;
import com.ctrip.xpipe.metric.MetricProxy;
import com.ctrip.xpipe.metric.MetricProxyException;
import com.ctrip.xpipe.redis.checker.BeaconManager;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterInstanceInfo;
import com.ctrip.xpipe.redis.checker.healthcheck.leader.AbstractLeaderAwareHealthCheckAction;
import com.ctrip.xpipe.utils.ServicesUtil;
import org.slf4j.Logger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

public class BeaconConsistencyCheckAction extends AbstractLeaderAwareHealthCheckAction<ClusterHealthCheckInstance> {

private BeaconManager beaconManager;

private MetricProxy metricProxy = ServicesUtil.getMetricProxy();

private static long lastSendTime = System.currentTimeMillis();

public BeaconConsistencyCheckAction(ScheduledExecutorService scheduled, ClusterHealthCheckInstance instance, ExecutorService executors, BeaconManager beaconManager) {
super(scheduled, instance, executors);
this.beaconManager = beaconManager;
}

@Override
protected void doTask() {
try {
tryDoTask();
} catch (Exception e) {
logger.error("[CheckBeaconConsistency]", e);
}
}

private void tryDoTask() {
ClusterInstanceInfo info = instance.getCheckInfo();
String clusterId = info.getClusterId();
int orgId = info.getOrgId();
checkConsistency(clusterId, info.getClusterType(), orgId);
}

@Override
protected Logger getHealthCheckLogger() {
return logger;
}

@Override
protected int getBaseCheckInterval() {
return getActionInstance().getHealthCheckConfig().clusterCheckIntervalMilli();
}

private void checkConsistency(String clusterId, ClusterType clusterType, int orgId) {
BeaconCheckStatus status;
try {
status = beaconManager.checkClusterHash(clusterId, clusterType, orgId);
} catch (Throwable t) {
// cluster not found in beacon
status = BeaconCheckStatus.ERROR;
logger.error("[checkConsistency]{}:{}:{}", clusterType, orgId, t.getMessage());
}
handleCheckResult(status, clusterId, clusterType, orgId);
}

private void handleCheckResult(BeaconCheckStatus status, String clusterId, ClusterType clusterType, int orgId) {
try {
if(status == BeaconCheckStatus.CONSISTENCY) {
long currentTime = System.currentTimeMillis();
if(currentTime < lastSendTime) {
lastSendTime = currentTime;
}
if(currentTime - lastSendTime <= getBaseCheckInterval()) {
// avoid send many point to hickwall
return;
}
lastSendTime = currentTime;
} else {
beaconManager.registerCluster(clusterId, clusterType, orgId);
}
sendMetricData(clusterId, status);
} catch (Throwable t) {
logger.error("[checkPost]{}:{}:{}", clusterType, orgId, t.getMessage());
}
}

private MetricData getMetricData(String clusterId, BeaconCheckStatus status) {
MetricData metricData = new MetricData("beacon.checker", null, clusterId, null);
metricData.setTimestampMilli(System.currentTimeMillis());
metricData.setValue(1);
metricData.addTag("consistency", status.toString());
return metricData;
}

private void sendMetricData(String trackCluster, BeaconCheckStatus status) {
try {
MetricData metricData = getMetricData(trackCluster, status);
metricProxy.writeBinMultiDataPoint(metricData);
} catch (MetricProxyException e) {
logger.error("[sendMetricData]", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@
import java.util.Collections;
import java.util.List;

/**
* @author lishanglin
* date 2021/1/15
*/
@Component
public class BeaconMetaCheckActionFactory extends AbstractClusterLeaderAwareHealthCheckActionFactory implements OneWaySupport, BiDirectionSupport {
public class BeaconConsistencyCheckActionFactory extends AbstractClusterLeaderAwareHealthCheckActionFactory implements OneWaySupport, BiDirectionSupport {

@Autowired
private BeaconManager beaconManager;
Expand All @@ -28,14 +24,14 @@ public class BeaconMetaCheckActionFactory extends AbstractClusterLeaderAwareHeal

@Override
public SiteLeaderAwareHealthCheckAction create(ClusterHealthCheckInstance instance) {
BeaconMetaCheckAction action = new BeaconMetaCheckAction(scheduled, instance, executors, beaconManager);
BeaconConsistencyCheckAction action = new BeaconConsistencyCheckAction(scheduled, instance, executors, beaconManager);
action.addControllers(controllers);
return action;
}

@Override
public Class<? extends SiteLeaderAwareHealthCheckAction> support() {
return BeaconMetaCheckAction.class;
return BeaconConsistencyCheckAction.class;
}

@Override
Expand Down
Loading

0 comments on commit 3bff963

Please sign in to comment.