Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug for beacon checker #917

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.ServicesUtil;
import com.ctrip.xpipe.utils.StringUtil;
import com.ctrip.xpipe.utils.VisibleForTesting;

import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -42,6 +43,11 @@ protected void doExecute() throws Throwable {
}
}

@VisibleForTesting
public void setMetricProxy(MetricProxy metricProxy) {
this.metricProxy = metricProxy;
}

private void doJob() {
Map<String, Set<String>> allClusters = new HashMap<>();
clustersByBeaconSystemOrg.forEach(((beaconSystem, clustersByOrg) -> {
Expand All @@ -59,16 +65,21 @@ private void doJob() {
Set<String> inconsistencyClusters = new HashSet<>();
Set<String> notfoundCluster = new HashSet<>();

Set<String> supportDcs = getSupportDcs();

Map<String, DcMeta> dcs = metaCache.getXpipeMeta().getDcs();
Set<String> supportZones = config.getBeaconSupportZones();
for (DcMeta dcMeta : dcs.values()) {
if(!supportZones.isEmpty() && supportZones.stream().noneMatch(zone -> StringUtil.trimEquals(dcMeta.getZone(), zone, true))) {
if(!supportDcs.contains(dcMeta.getId().toUpperCase())) {
continue;
}
Map<String, ClusterMeta> clusterMetaMap = dcMeta.getClusters();
for (ClusterMeta cluster : clusterMetaMap.values()) {
String clusterName = cluster.getId();
String activeDc = cluster.getActiveDc();
if(StringUtil.trimEquals(ClusterType.ONE_WAY.toString(), cluster.getType(), true)) {
if(!StringUtil.trimEquals(dcMeta.getId(), activeDc, true)) {
continue;
}
String dc = cluster.getActiveDc().toUpperCase();
if(!allClusters.containsKey(clusterName)) {
notfoundCluster.add(clusterName);
Expand All @@ -87,6 +98,7 @@ private void doJob() {
Set<String> beaconDcSet = getBeaconClusterDcSet(allClusters, clusterName);
Set<String> loaclDcSet = Arrays.stream(cluster.getDcs().split(","))
.map(String::toUpperCase)
.filter(dc -> supportDcs.contains(dc))
.collect(Collectors.toSet());
if(!beaconDcSet.equals(loaclDcSet)) {
inconsistencyClusters.add(clusterName);
Expand All @@ -105,6 +117,10 @@ private void doJob() {
MetricData metricData = getMetricData(clusterName, "NOTFOUND");
sendMetricData(metricData);
}
if(inconsistencyClusters.isEmpty() && notfoundCluster.isEmpty()) {
MetricData metricData = getMetricData("", "CONSISTENT");
sendMetricData(metricData);
}

}

Expand Down Expand Up @@ -132,6 +148,18 @@ private void sendMetricData(MetricData metricData) {
}
}

private Set<String> getSupportDcs() {
Set<String> dcs = metaCache.getXpipeMeta().getDcs().keySet();
Set<String> supportZones = config.getBeaconSupportZones();
Set<String> supportDcs = new HashSet<>();
for(String dc : dcs) {
if(supportZones.stream().anyMatch(zone -> metaCache.isDcInRegion(dc, zone))) {
supportDcs.add(dc.toUpperCase());
}
}
return supportDcs;
}

@Override
protected void doReset() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package com.ctrip.xpipe.redis.console.healthcheck.nonredis.beacon;

import com.ctrip.xpipe.api.migration.auto.MonitorService;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.metric.MetricData;
import com.ctrip.xpipe.metric.MetricProxy;
import com.ctrip.xpipe.redis.console.AbstractConsoleTest;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.config.model.BeaconOrgRoute;
import com.ctrip.xpipe.redis.console.migration.auto.DefaultMonitorManager;
import com.ctrip.xpipe.redis.console.resources.DefaultMetaCache;
import com.ctrip.xpipe.redis.core.config.ConsoleCommonConfig;
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.entity.XpipeMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.redis.core.meta.XpipeMetaManager;
import com.ctrip.xpipe.redis.core.meta.impl.DefaultXpipeMetaManager;
import com.ctrip.xpipe.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.class)
public class BeaconConsistencyCheckTest extends AbstractConsoleTest {

@Mock
ConsoleCommonConfig config;
@Mock
ConsoleConfig consoleConfig;
@Mock
MonitorService monitorService;
@Mock
MetricProxy metricProxy;

private BeaconConsistencyCheckJob consistencyCheckJob;

private Map<Long, List<MonitorService>> buildServices() {
HashMap result = new HashMap<>();
result.put(1L, Collections.singletonList(monitorService));
return result;
}

private ClusterMeta buildClusterMeta() {
ClusterMeta clusterMeta = new ClusterMeta("oneway_test");
clusterMeta.setType(ClusterType.ONE_WAY.toString());
clusterMeta.setActiveDc("AWS");
clusterMeta.setDcs("AWS,PTJQ");
clusterMeta.setOrgId(1);
return clusterMeta;
}

private ClusterMeta buildBiClusterMeta() {
ClusterMeta clusterMeta = new ClusterMeta("bi_test");
clusterMeta.setType(ClusterType.BI_DIRECTION.toString());
clusterMeta.setDcs("AWS,PTJQ");
clusterMeta.setOrgId(1);
return clusterMeta;
}

private ClusterMeta buildOneWaySHA() {
ClusterMeta clusterMeta = new ClusterMeta("oneway_sha");
clusterMeta.setType(ClusterType.ONE_WAY.toString());
clusterMeta.setActiveDc("PTJQ");
clusterMeta.setDcs("AWS,PTJQ");
clusterMeta.setOrgId(1);
return clusterMeta;
}

private MetaCache buildMetaCache() {
ClusterMeta oneWay = buildClusterMeta();
ClusterMeta bi = buildBiClusterMeta();
ClusterMeta oneWaySHA = buildOneWaySHA();

XpipeMeta currentMeta = new XpipeMeta();
DcMeta jqMeta = new DcMeta("PTJQ");
jqMeta.setZone("SHA");
jqMeta.addCluster(oneWay);
jqMeta.addCluster(bi);
jqMeta.addCluster(oneWaySHA);

DcMeta awsMeta = new DcMeta("AWS");
awsMeta.setZone("AWS");
awsMeta.addCluster(oneWay);
awsMeta.addCluster(bi);
awsMeta.addCluster(oneWaySHA);

currentMeta.addDc(jqMeta)
.addDc(awsMeta);
DefaultMetaCache metaCache = new DefaultMetaCache();
Pair<XpipeMeta, XpipeMetaManager> meta = new Pair<>(currentMeta, new DefaultXpipeMetaManager(currentMeta));
metaCache.setMeta(meta);
return metaCache;
}

@Before
public void beforeBeaconConsistencyCheckTest() {

Mockito.when(config.getBeaconSupportZones()).thenReturn(Stream.of("SHA").collect(Collectors.toSet()));
Mockito.when(consoleConfig.getServerMode()).thenReturn("CONSOLE");
Mockito.when(consoleConfig.getClusterHealthCheckInterval()).thenReturn(10000);
List<BeaconOrgRoute> routes = new ArrayList<>();
routes.add(new BeaconOrgRoute(1L, new ArrayList<>(), 1));
Mockito.when(consoleConfig.getBeaconOrgRoutes()).thenReturn(routes);

DefaultMonitorManager defaultMonitorManager = new DefaultMonitorManager(buildMetaCache(), consoleConfig, config);

consistencyCheckJob = new BeaconConsistencyCheckJob(defaultMonitorManager.clustersByBeaconSystemOrg(),
buildServices(),
buildMetaCache(),
config);

}

@Test
public void testCheck() throws Throwable {
consistencyCheckJob.setMetricProxy(metricProxy);
HashMap<String, Set<String>> oneWays = new HashMap<>();
oneWays.put("oneway_sha", Collections.singleton("PTJQ"));
Mockito.when(monitorService.getAllClusterWithDc("xpipe")).thenReturn(oneWays);
HashMap<String, Set<String>> allCluster = new HashMap<>();
allCluster.put("bi_test", Collections.singletonList("PTJQ").stream().collect(Collectors.toSet()));

Mockito.when(monitorService.getAllClusterWithDc("xpipe-bi")).thenReturn(allCluster);
consistencyCheckJob.doExecute();

verify(metricProxy, times(1))
.writeBinMultiDataPoint(
argThat(new ArgumentMatcher<MetricData>() {
@Override
public boolean matches(MetricData argument) {
return argument.getTags().get("consistency").equals("CONSISTENT");
}
}));

}

@Test
public void testCheckInConsistency() throws Throwable {
consistencyCheckJob.setMetricProxy(metricProxy);
HashMap<String, Set<String>> oneWays = new HashMap<>();
oneWays.put("oneway_sha", Collections.singleton("PTJQ"));
Mockito.when(monitorService.getAllClusterWithDc("xpipe")).thenReturn(oneWays);
HashMap<String, Set<String>> allCluster = new HashMap<>();
allCluster.put("bi_test", Arrays.asList("PTJQ", "AWS").stream().collect(Collectors.toSet()));

Mockito.when(monitorService.getAllClusterWithDc("xpipe-bi")).thenReturn(allCluster);
consistencyCheckJob.doExecute();

verify(metricProxy, times(1))
.writeBinMultiDataPoint(
argThat(new ArgumentMatcher<MetricData>() {
@Override
public boolean matches(MetricData argument) {
return argument.getTags().get("consistency").equals("INCONSISTENT")
&& argument.getClusterName().equals("bi_test");
}
}));
}

@Test
public void testCheckInNotFound() throws Throwable {
consistencyCheckJob.setMetricProxy(metricProxy);
HashMap<String, Set<String>> oneWays = new HashMap<>();
Mockito.when(monitorService.getAllClusterWithDc("xpipe")).thenReturn(oneWays);
HashMap<String, Set<String>> allCluster = new HashMap<>();
allCluster.put("bi_test", Collections.singletonList("PTJQ").stream().collect(Collectors.toSet()));

Mockito.when(monitorService.getAllClusterWithDc("xpipe-bi")).thenReturn(allCluster);
consistencyCheckJob.doExecute();

verify(metricProxy, times(1))
.writeBinMultiDataPoint(
argThat(new ArgumentMatcher<MetricData>() {
@Override
public boolean matches(MetricData argument) {
return argument.getTags().get("consistency").equals("NOTFOUND")
&& argument.getClusterName().equals("oneway_sha");
}
}));

}


}
Loading