Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the cross-zone-based stoppable check #2680

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -254,6 +255,28 @@ public static boolean hasErrorPartitions(HelixDataAccessor dataAccessor, String
public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalView> externalViews,
Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
HelixDataAccessor dataAccessor) {
return perPartitionHealthCheck(externalViews, globalPartitionHealthStatus, instanceToBeStop,
dataAccessor, Collections.emptySet());
}

/**
* Get the problematic partitions on the to-be-stop instance
* Requirement:
* If the instance and the toBeStoppedInstances are stopped and the partitions on them are OFFLINE,
* the cluster still have enough "healthy" replicas on other sibling instances
*
* - sibling instances mean those who share the same partition (replicas) of the to-be-stop instance
*
* @param globalPartitionHealthStatus (instance => (partition name, health status))
* @param instanceToBeStop The instance to be stopped
* @param dataAccessor The data accessor
* @param toBeStoppedInstances A set of instances presumed to be are already stopped. And it
* shouldn't contain the `instanceToBeStop`
* @return A list of problematic partitions if the instance is stopped
*/
public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalView> externalViews,
Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
HelixDataAccessor dataAccessor, Set<String> toBeStoppedInstances) {
Map<String, List<String>> unhealthyPartitions = new HashMap<>();

for (ExternalView externalView : externalViews) {
Expand All @@ -273,7 +296,8 @@ public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalVie
&& stateMap.get(instanceToBeStop).equals(stateModelDefinition.getTopState())) {
for (String siblingInstance : stateMap.keySet()) {
// Skip this self check
if (siblingInstance.equals(instanceToBeStop)) {
if (siblingInstance.equals(instanceToBeStop) || (toBeStoppedInstances != null
&& toBeStoppedInstances.contains(siblingInstance))) {
continue;
}

Expand Down Expand Up @@ -366,11 +390,32 @@ public static boolean isInstanceStable(HelixDataAccessor dataAccessor, String in
*
* TODO: Use in memory cache and query instance's currentStates
*
* @param dataAccessor
* @param instanceName
* @param dataAccessor A helper class to access the Helix data.
* @param instanceName An instance to be evaluated against this check.
* @return
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
String instanceName) {
return siblingNodesActiveReplicaCheck(dataAccessor, instanceName, Collections.emptySet());
}

/**
* Check if sibling nodes of the instance meet min active replicas constraint
* Two instances are sibling of each other if they host the same partition. And sibling nodes
* that are in toBeStoppableInstances will be presumed to be stopped.
* WARNING: The check uses ExternalView to reduce network traffic but suffer from accuracy
* due to external view propagation latency
*
* TODO: Use in memory cache and query instance's currentStates
*
* @param dataAccessor A helper class to access the Helix data.
* @param instanceName An instance to be evaluated against this check.
* @param toBeStoppedInstances A set of instances presumed to be are already stopped. And it
* shouldn't contain the `instanceName`
* @return
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor, String instanceName) {
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
String instanceName, Set<String> toBeStoppedInstances) {
MarkGaox marked this conversation as resolved.
Show resolved Hide resolved
PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
List<String> resources = dataAccessor.getChildNames(propertyKeyBuilder.idealStates());

Expand Down Expand Up @@ -406,8 +451,9 @@ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAcces
if (stateByInstanceMap.containsKey(instanceName)) {
int numHealthySiblings = 0;
for (Map.Entry<String, String> entry : stateByInstanceMap.entrySet()) {
if (!entry.getKey().equals(instanceName)
&& !unhealthyStates.contains(entry.getValue())) {
if (!entry.getKey().equals(instanceName) && (toBeStoppedInstances == null
|| !toBeStoppedInstances.contains(entry.getKey())) && !unhealthyStates.contains(
entry.getValue())) {
numHealthySiblings++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -401,6 +403,81 @@ public void TestSiblingNodesActiveReplicaCheck_success() {
Assert.assertTrue(result);
}

@Test
public void TestSiblingNodesActiveReplicaCheckSuccessWithToBeStoppedInstances() {
String resource = "resource";
Mock mock = new Mock();
doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
// set ideal state
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
when(idealState.isValid()).thenReturn(true);
when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

// set external view
ExternalView externalView = mock(ExternalView.class);
when(externalView.getMinActiveReplicas()).thenReturn(2);
when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master",
"instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
doReturn(externalView).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance3");
toBeStoppedInstances.add("invalidInstances"); // include an invalid instance.
boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances);
Assert.assertTrue(result);

result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, null);
Assert.assertTrue(result);
}

@Test
public void TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() {
String resource = "resource";
Mock mock = new Mock();
doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
// set ideal state
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
when(idealState.isValid()).thenReturn(true);
when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

// set external view
ExternalView externalView = mock(ExternalView.class);
when(externalView.getMinActiveReplicas()).thenReturn(2);
when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master",
"instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
doReturn(externalView).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance1");
toBeStoppedInstances.add("instance2");
boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances);

Assert.assertFalse(result);
}

@Test
public void TestSiblingNodesActiveReplicaCheck_fail() {
String resource = "resource";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,17 +339,23 @@ private List<OperationInterface> getAllOperationClasses(List<String> operations)
*/
public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
String jsonContent) throws IOException {
return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent)
.get(instanceName);
return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName),
jsonContent).get(instanceName);
}


public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent) throws IOException {
return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent,
Collections.emptySet());
}

public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent, Set<String> toBeStoppedInstances) throws IOException {
MarkGaox marked this conversation as resolved.
Show resolved Hide resolved
Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
// helix instance check.
List<String> instancesForCustomInstanceLevelChecks =
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks);
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks,
toBeStoppedInstances);
// custom check, includes partition check.
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
finalStoppableChecks, getMapFromJsonPayload(jsonContent));
Expand Down Expand Up @@ -441,10 +447,11 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl
}

private List<String> batchHelixInstanceStoppableCheck(String clusterId,
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks) {
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(Collectors
.toMap(Function.identity(),
instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance))));
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks,
Set<String> toBeStoppedInstances) {
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));
// finalStoppableChecks contains instances that does not pass this health check
return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
}
Expand Down Expand Up @@ -512,7 +519,8 @@ private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(
if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) {
// this is helix own check
instancesForNext =
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks);
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks,
Collections.emptySet());
} else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
// custom check, includes custom Instance check and partition check.
instancesForNext =
Expand Down Expand Up @@ -601,10 +609,12 @@ private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) {
return true;
}

private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) {
private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName,
Set<String> toBeStoppedInstances) {
LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
Map<String, Boolean> helixStoppableCheck =
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST);
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST,
toBeStoppedInstances);

return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
}
Expand Down Expand Up @@ -698,6 +708,12 @@ public static boolean getBooleanFromJsonPayload(String jsonString)
@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks) {
return getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet());
}

@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Map<String, Boolean> healthStatus = new HashMap<>();
for (HealthCheck healthCheck : healthChecks) {
switch (healthCheck) {
Expand Down Expand Up @@ -745,7 +761,7 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String
break;
case MIN_ACTIVE_REPLICA_CHECK_FAILED:
healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName, toBeStoppedInstances));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not necessarily a part of the change, but I think we should re-evaluate whether the EMPTY_RESOURCE_ASSIGNMENT should keep an instance from being stoppable. There could be legitimate reason there is no resource assignment. Why should that block a deployment or other operation?

break;
default:
LOG.error("Unsupported health check: {}", healthCheck);
Expand Down
Loading
Loading