Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the cross-zone-based stoppable check #2680

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -254,6 +255,27 @@ public static boolean hasErrorPartitions(HelixDataAccessor dataAccessor, String
public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalView> externalViews,
Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
HelixDataAccessor dataAccessor) {
return perPartitionHealthCheck(externalViews, globalPartitionHealthStatus, instanceToBeStop,
dataAccessor, Collections.emptySet());
}

/**
* Get the problematic partitions on the to-be-stop instance
* Requirement:
* If the instance and the toBeStoppedInstances are stopped and the partitions on them are OFFLINE,
* the cluster still have enough "healthy" replicas on other sibling instances
*
* - sibling instances mean those who share the same partition (replicas) of the to-be-stop instance
*
* @param globalPartitionHealthStatus (instance => (partition name, health status))
* @param instanceToBeStop The instance to be stopped
* @param dataAccessor The data accessor
* @param toBeStoppedInstances A set of instances presumed to be are already stopped
MarkGaox marked this conversation as resolved.
Show resolved Hide resolved
* @return A list of problematic partitions if the instance is stopped
*/
public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalView> externalViews,
Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
HelixDataAccessor dataAccessor, Set<String> toBeStoppedInstances) {
Map<String, List<String>> unhealthyPartitions = new HashMap<>();

for (ExternalView externalView : externalViews) {
Expand All @@ -273,7 +295,8 @@ public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalVie
&& stateMap.get(instanceToBeStop).equals(stateModelDefinition.getTopState())) {
for (String siblingInstance : stateMap.keySet()) {
// Skip this self check
if (siblingInstance.equals(instanceToBeStop)) {
if (siblingInstance.equals(instanceToBeStop) || toBeStoppedInstances.contains(
siblingInstance)) {
continue;
}

Expand Down Expand Up @@ -366,8 +389,27 @@ public static boolean isInstanceStable(HelixDataAccessor dataAccessor, String in
*
* TODO: Use in memory cache and query instance's currentStates
*
* @param dataAccessor
* @param instanceName
* @param dataAccessor A helper class to access the Helix data.
* @param instanceName A list of instance to be evaluated against this check.
* @return
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
String instanceName) {
return siblingNodesActiveReplicaCheck(dataAccessor, instanceName, Collections.emptySet());
}

/**
* Check if sibling nodes of the instance meet min active replicas constraint
* Two instances are sibling of each other if they host the same partition. And sibling nodes
* that are in toBeStoppableInstances will be presumed to be stopped.
* WARNING: The check uses ExternalView to reduce network traffic but suffer from accuracy
* due to external view propagation latency
*
* TODO: Use in memory cache and query instance's currentStates
*
* @param dataAccessor A helper class to access the Helix data.
* @param instanceName A list of instance to be evaluated against this check.
* @param toBeStoppedInstances A set of instances presumed to be are already stopped.
* @return
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void TestSiblingNodesActiveReplicaCheck_success() {
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);

Assert.assertTrue(result);
}
Expand Down Expand Up @@ -502,7 +502,7 @@ public void TestSiblingNodesActiveReplicaCheck_fail() {
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);

Assert.assertFalse(result);
}
Expand All @@ -526,7 +526,7 @@ public void TestSiblingNodesActiveReplicaCheck_whenNoMinActiveReplica() {
doReturn(externalView).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));

boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());
boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
Assert.assertTrue(result);
}

Expand All @@ -546,7 +546,7 @@ public void TestSiblingNodesActiveReplicaCheck_exception_whenExternalViewUnavail
doReturn(null).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));

InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
}

private class Mock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public InstanceInfo getInstanceHealthInfo(String clusterId, String instanceName,
}
try {
Map<String, Boolean> healthStatus =
getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet());
getInstanceHealthStatus(clusterId, instanceName, healthChecks);
instanceInfoBuilder.healthStatus(healthStatus);
} catch (HelixException ex) {
LOG.error(
Expand Down Expand Up @@ -328,7 +328,7 @@ private List<OperationInterface> getAllOperationClasses(List<String> operations)
/**
* {@inheritDoc}
* Single instance stoppable check implementation is a special case of
* {@link #batchGetInstancesStoppableChecks(String, List, String, Set)}
* {@link #batchGetInstancesStoppableChecks(String, List, String)}
* <p>
* Step 1: Perform instance level Helix own health checks
* Step 2: Perform instance level client side health checks
Expand All @@ -339,17 +339,23 @@ private List<OperationInterface> getAllOperationClasses(List<String> operations)
*/
public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
String jsonContent) throws IOException {
return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent, Collections.emptySet())
.get(instanceName);
return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName),
jsonContent).get(instanceName);
}

public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent) throws IOException {
return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent,
Collections.emptySet());
}

public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent, Set<String> toBeStoppedInstances) throws IOException {
MarkGaox marked this conversation as resolved.
Show resolved Hide resolved
Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
// helix instance check.
List<String> instancesForCustomInstanceLevelChecks =
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks, toBeStoppedInstances);
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks,
toBeStoppedInstances);
// custom check, includes partition check.
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
finalStoppableChecks, getMapFromJsonPayload(jsonContent));
Expand Down Expand Up @@ -441,10 +447,11 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl
}

private List<String> batchHelixInstanceStoppableCheck(String clusterId,
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks, Set<String> toBeStoppedInstances) {
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(Collectors
.toMap(Function.identity(),
instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks,
Set<String> toBeStoppedInstances) {
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));
// finalStoppableChecks contains instances that does not pass this health check
return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
}
Expand Down Expand Up @@ -512,7 +519,8 @@ private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(
if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) {
// this is helix own check
instancesForNext =
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks, Collections.emptySet());
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks,
Collections.emptySet());
} else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
// custom check, includes custom Instance check and partition check.
instancesForNext =
Expand Down Expand Up @@ -601,10 +609,12 @@ private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) {
return true;
}

private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName, Set<String> toBeStoppedInstances) {
private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName,
Set<String> toBeStoppedInstances) {
LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
Map<String, Boolean> helixStoppableCheck =
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST, toBeStoppedInstances);
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST,
toBeStoppedInstances);

return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
}
Expand Down Expand Up @@ -695,6 +705,12 @@ public static boolean getBooleanFromJsonPayload(String jsonString)
return OBJECT_MAPPER.readTree(jsonString).asBoolean();
}

@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks) {
return getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet());
}

@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Expand Down
Loading
Loading