Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

beacon checker #905

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;

import java.util.Map;
import java.util.Set;

/**
Expand All @@ -27,6 +28,12 @@ public interface MonitorService {

void registerCluster(String system, String clusterName, Set<MonitorGroupMeta> groups);

void updateCluster(String system, String clusterName, Set<MonitorGroupMeta> groups);

void unregisterCluster(String system, String clusterName);

int getBeaconClusterHash(String system, String clusterName);

Map<String,Set<String>> getAllClusterWithDc(String system);

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.ctrip.xpipe.api.migration.auto.data;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.builder.HashCodeBuilder;

import java.util.*;

/**
* @author lishanglin
Expand Down Expand Up @@ -39,4 +39,20 @@ public void setExtra(Map<String, String> extra) {
this.extra = extra;
}

public int generateHashCodeForBeaconCheck () {
List<MonitorGroupMeta> nodeList = new ArrayList<>(nodeGroups);
nodeList.sort(Comparator.comparing(MonitorGroupMeta::getName, Comparator.nullsFirst(Comparator.naturalOrder()))
.thenComparing(MonitorGroupMeta::getIdc, Comparator.nullsFirst(Comparator.naturalOrder()))
.thenComparing(MonitorGroupMeta::isMasterGroup)
.thenComparing(monitorGroupMeta -> monitorGroupMeta.getNodes().toString()));
HashCodeBuilder builder = new HashCodeBuilder();
for(MonitorGroupMeta group : nodeList) {
builder.append(group.getName())
.append(group.getIdc())
.append(group.isMasterGroup())
.append(group.getNodes());
}
return builder.toHashCode();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -57,9 +58,24 @@ public void registerCluster(String system, String clusterName, Set<MonitorGroupM
// do nothing
}

@Override
public void updateCluster(String system, String clusterName, Set<MonitorGroupMeta> groups) {

}

@Override
public void unregisterCluster(String system, String clusterName) {
// do nothing
}

@Override
public int getBeaconClusterHash(String system, String clusterName) {
return 0;
}

@Override
public Map<String, Set<String>> getAllClusterWithDc(String system) {
return Collections.emptyMap();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.ctrip.xpipe.api.migration;

import com.ctrip.xpipe.api.migration.auto.data.MonitorClusterMeta;
import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;
import com.ctrip.xpipe.endpoint.HostPort;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;

import java.util.Set;

public class MonitorClusterMetaTest {

@Test
public void testMonitorClusterMeta(){
wangqifan marked this conversation as resolved.
Show resolved Hide resolved
Set<MonitorGroupMeta> group1 = getMonitorGroupMeta1();
MonitorClusterMeta monitorClusterMeta1 = new MonitorClusterMeta(group1);

Set<MonitorGroupMeta> group2 = getMonitorGroupMeta2();
MonitorClusterMeta monitorClusterMeta2 = new MonitorClusterMeta(group2);

Set<MonitorGroupMeta> group3 = getMonitorGroupMeta3();
MonitorClusterMeta monitorClusterMeta3 = new MonitorClusterMeta(group3);

Set<MonitorGroupMeta> group4 = getMonitorGroupMeta4();
MonitorClusterMeta monitorClusterMeta4 = new MonitorClusterMeta(group4);

Set<MonitorGroupMeta> group5 = getMonitorGroupMeta5();
MonitorClusterMeta monitorClusterMeta5 = new MonitorClusterMeta(group5);

Set<MonitorGroupMeta> group6 = getMonitorGroupMeta6();
MonitorClusterMeta monitorClusterMeta6 = new MonitorClusterMeta(group6);

Assert.assertEquals(monitorClusterMeta1.generateHashCodeForBeaconCheck(), monitorClusterMeta2.generateHashCodeForBeaconCheck());
Assert.assertNotEquals(monitorClusterMeta1.generateHashCodeForBeaconCheck(), monitorClusterMeta3.generateHashCodeForBeaconCheck());
Assert.assertNotEquals(monitorClusterMeta1.generateHashCodeForBeaconCheck(), monitorClusterMeta4.generateHashCodeForBeaconCheck());
Assert.assertNotEquals(monitorClusterMeta4.generateHashCodeForBeaconCheck(), monitorClusterMeta5.generateHashCodeForBeaconCheck());
Assert.assertNotEquals(monitorClusterMeta5.generateHashCodeForBeaconCheck(), monitorClusterMeta6.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta1.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta2.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta3.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta4.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta5.generateHashCodeForBeaconCheck());
System.out.println(monitorClusterMeta6.generateHashCodeForBeaconCheck());
}


private Set<MonitorGroupMeta> getMonitorGroupMeta1() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta2() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta3() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), false),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), false),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), true),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), true)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta4() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta5() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta6() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6382")), false),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6380")), false),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.checker;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon.BeaconCheckStatus;

/**
* @author lishanglin
Expand All @@ -10,4 +11,8 @@ public interface BeaconManager {

void registerCluster(String clusterId, ClusterType clusterType, int orgId);

void updateCluster(String clusterId, ClusterType clusterType, int orgId);

BeaconCheckStatus checkClusterHash(String clusterId, ClusterType clusterType, int orgId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon;

public enum BeaconCheckStatus {
SERVICE_NOT_FOUND("Service not found"),
SYSTEM_NOT_FOUND("System not found"),
CLUSTER_NOT_FOUND("Cluster not found"),
INCONSISTENCY("Inconsistency"),
CONSISTENCY("Consistency"),
UNKNOWN("Unknown"),
ERROR("Error");

private final String description;

BeaconCheckStatus(String description) {
this.description = description;
}

public String getDescription() {
return description;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.metric.MetricData;
import com.ctrip.xpipe.metric.MetricProxy;
import com.ctrip.xpipe.metric.MetricProxyException;
import com.ctrip.xpipe.redis.checker.BeaconManager;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterInstanceInfo;
import com.ctrip.xpipe.redis.checker.healthcheck.leader.AbstractLeaderAwareHealthCheckAction;
import com.ctrip.xpipe.utils.ServicesUtil;
import org.slf4j.Logger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

public class BeaconConsistencyCheckAction extends AbstractLeaderAwareHealthCheckAction<ClusterHealthCheckInstance> {
wangqifan marked this conversation as resolved.
Show resolved Hide resolved

private BeaconManager beaconManager;

private MetricProxy metricProxy = ServicesUtil.getMetricProxy();

private static long lastSendTime = System.currentTimeMillis();

public BeaconConsistencyCheckAction(ScheduledExecutorService scheduled, ClusterHealthCheckInstance instance, ExecutorService executors, BeaconManager beaconManager) {
super(scheduled, instance, executors);
this.beaconManager = beaconManager;
}

@Override
protected void doTask() {
try {
tryDoTask();
} catch (Exception e) {
logger.error("[CheckBeaconConsistency]", e);
}
}

private void tryDoTask() {
ClusterInstanceInfo info = instance.getCheckInfo();
String clusterId = info.getClusterId();
int orgId = info.getOrgId();
checkConsistency(clusterId, info.getClusterType(), orgId);
}

@Override
protected Logger getHealthCheckLogger() {
return logger;
}

@Override
protected int getBaseCheckInterval() {
return getActionInstance().getHealthCheckConfig().clusterCheckIntervalMilli();
}

private void checkConsistency(String clusterId, ClusterType clusterType, int orgId) {
BeaconCheckStatus status;
try {
status = beaconManager.checkClusterHash(clusterId, clusterType, orgId);
} catch (Throwable t) {
// cluster not found in beacon
status = BeaconCheckStatus.ERROR;
logger.error("[checkConsistency]{}:{}:{}", clusterType, orgId, t.getMessage());
}
handleCheckResult(status, clusterId, clusterType, orgId);
}

private void handleCheckResult(BeaconCheckStatus status, String clusterId, ClusterType clusterType, int orgId) {
try {
if(status == BeaconCheckStatus.CONSISTENCY) {
long currentTime = System.currentTimeMillis();
if(currentTime < lastSendTime) {
lastSendTime = currentTime;
}
if(currentTime - lastSendTime <= getBaseCheckInterval()) {
// avoid send many point to hickwall
return;
}
lastSendTime = currentTime;
} else {
beaconManager.registerCluster(clusterId, clusterType, orgId);
}
sendMetricData(clusterId, status);
} catch (Throwable t) {
logger.error("[checkPost]{}:{}:{}", clusterType, orgId, t.getMessage());
}
}

private MetricData getMetricData(String clusterId, BeaconCheckStatus status) {
MetricData metricData = new MetricData("beacon.checker", null, clusterId, null);
metricData.setTimestampMilli(System.currentTimeMillis());
metricData.setValue(1);
metricData.addTag("consistency", status.toString());
return metricData;
}

private void sendMetricData(String trackCluster, BeaconCheckStatus status) {
try {
MetricData metricData = getMetricData(trackCluster, status);
metricProxy.writeBinMultiDataPoint(metricData);
} catch (MetricProxyException e) {
logger.error("[sendMetricData]", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@
import java.util.Collections;
import java.util.List;

/**
* @author lishanglin
* date 2021/1/15
*/
@Component
public class BeaconMetaCheckActionFactory extends AbstractClusterLeaderAwareHealthCheckActionFactory implements OneWaySupport, BiDirectionSupport {
public class BeaconConsistencyCheckActionFactory extends AbstractClusterLeaderAwareHealthCheckActionFactory implements OneWaySupport, BiDirectionSupport {

@Autowired
private BeaconManager beaconManager;
Expand All @@ -28,14 +24,14 @@ public class BeaconMetaCheckActionFactory extends AbstractClusterLeaderAwareHeal

@Override
public SiteLeaderAwareHealthCheckAction create(ClusterHealthCheckInstance instance) {
BeaconMetaCheckAction action = new BeaconMetaCheckAction(scheduled, instance, executors, beaconManager);
BeaconConsistencyCheckAction action = new BeaconConsistencyCheckAction(scheduled, instance, executors, beaconManager);
action.addControllers(controllers);
return action;
}

@Override
public Class<? extends SiteLeaderAwareHealthCheckAction> support() {
return BeaconMetaCheckAction.class;
return BeaconConsistencyCheckAction.class;
}

@Override
Expand Down
Loading
Loading