Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change instance operation orthogonal to instance enable #2615

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ public enum InstanceDisabledType {
public enum InstanceOperation {
EVACUATE, // Node will be removed after a period of time
SWAP_IN, // New node joining for swap operation
SWAP_OUT, // Existing Node to be removed for swap operation
ENABLE, // Backward compatible field for HELIX_ENABLED. Set when changing from disabled to enabled.
DISABLE // Backward compatible field for HELIX_ENABLED. Set when changing from enabled to disabled.
SWAP_OUT // Existing Node to be removed for swap operation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@
import java.util.Optional;
import java.util.Set;

import java.util.stream.Collectors;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
Expand All @@ -53,6 +56,7 @@
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
private static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE = Set.of("EVACUATE", "SWAP_IN");

@Override
public IdealState computeNewIdealState(String resourceName,
Expand Down Expand Up @@ -109,14 +113,12 @@ public IdealState computeNewIdealState(String resourceName,
allNodes = clusterData.getAllInstances();
}

Set<String> activeNodes = liveEnabledNodes;
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);

Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
Expand Down Expand Up @@ -157,15 +159,20 @@ public IdealState computeNewIdealState(String resourceName,

// sort node lists to ensure consistent preferred assignments
List<String> allNodeList = new ArrayList<>(allNodes);
List<String> liveEnabledNodeList = new ArrayList<>(liveEnabledNodes);
// We will not assign partition to instances with evacuation and wap-out tag.
// TODO: Currently we have 2 groups of instances and compute preference list twice and merge.
// Eventually we want to have exclusive groups of instance for different instance tag.
List<String> liveEnabledAssignableNodeList = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
liveEnabledNodes);
Collections.sort(allNodeList);
Collections.sort(liveEnabledNodeList);
Collections.sort(liveEnabledAssignableNodeList);

ZNRecord newIdealMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, liveEnabledNodeList, currentMapping, clusterData);
.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, currentMapping, clusterData);
ZNRecord finalMapping = newIdealMapping;

if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)) {
if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)
|| liveEnabledAssignableNodeList.size()!= activeNodes.size()) {
List<String> activeNodeList = new ArrayList<>(activeNodes);
Collections.sort(activeNodeList);
int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
Expand Down Expand Up @@ -194,6 +201,14 @@ public IdealState computeNewIdealState(String resourceName,
return idealState;
}

private static List<String> filterOutOnOperationInstances(Map<String, InstanceConfig> instanceConfigMap,
Set<String> nodes) {
return nodes.stream()
.filter(
instance -> !INSTANCE_OPERATION_TO_EXCLUDE.contains(instanceConfigMap.get(instance).getInstanceOperation()))
.collect(Collectors.toList());
}

private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState,
ZNRecord newMapping) {
IdealState newIdealState = new IdealState(resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public static Set<String> getActiveNodes(Set<String> allNodes, IdealState idealS
private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap,
long delay, ClusterConfig clusterConfig) {
Set<String> activeNodes = filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
Set<String> activeNodes = new HashSet<>(liveEnabledNodes);
Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
long currentTime = System.currentTimeMillis();
Expand All @@ -126,10 +126,10 @@ private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> live
activeNodes.add(ins);
}
}
return activeNodes;
return filterOutEvacuatingInstances(instanceConfigMap, activeNodes);
}

private static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
public static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
Set<String> nodes) {
return nodes.stream()
.filter(instance -> !instanceConfigMap.get(instance).getInstanceOperation().equals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,6 @@ public boolean getInstanceEnabled() {
*/
public void setInstanceEnabled(boolean enabled) {
// set instance operation only when we need to change InstanceEnabled value.
// When enabling an instance where current HELIX_ENABLED is false, we update INSTANCE_OPERATION to 'ENABLE'
// When disabling and instance where current HELIX_ENABLED is false, we overwrite what current operation and
// update INSTANCE_OPERATION to 'DISABLE'.
String instanceOperationKey = InstanceConfigProperty.INSTANCE_OPERATION.toString();
if (enabled != getInstanceEnabled()) {
_record.setSimpleField(instanceOperationKey,
enabled ? InstanceConstants.InstanceOperation.ENABLE.name()
: InstanceConstants.InstanceOperation.DISABLE.name());
}
setInstanceEnabledHelper(enabled);
}

Expand Down Expand Up @@ -344,17 +335,8 @@ public long getInstanceEnabledTime() {
}

public void setInstanceOperation(InstanceConstants.InstanceOperation operation) {
if (operation != InstanceConstants.InstanceOperation.DISABLE
&& operation != InstanceConstants.InstanceOperation.ENABLE) {
if (!getInstanceEnabled()) {
throw new HelixException(
"setting non enable/disable operation (e.g. evacuate, swap) to helix disabled instance is not allowed");
}
} else {
setInstanceEnabledHelper(operation == InstanceConstants.InstanceOperation.ENABLE);
}

_record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.toString(), operation.toString());
_record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
operation == null ? "" : operation.name());
}

public String getInstanceOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.participant.StateMachineEngine;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class TestInstanceOperation extends ZkTestBase {
private Set<String> _allDBs = new HashSet<>();
private ZkHelixClusterVerifier _clusterVerifier;
private ConfigAccessor _configAccessor;
private long _stateModelDelay = 30L;
private long _stateModelDelay = 3L;
protected AssignmentMetadataStore _assignmentMetadataStore;
HelixDataAccessor _dataAccessor;

Expand Down Expand Up @@ -105,6 +106,7 @@ public void testEvacuate() throws Exception {
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);

System.out.println("123");
Assert.assertTrue(_clusterVerifier.verifyByPolling());

// New ev should contain all instances but the evacuated one
Expand All @@ -125,7 +127,7 @@ public void testRevertEvacuation() throws Exception {
// revert an evacuate instance
String instanceToEvacuate = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);

Assert.assertTrue(_clusterVerifier.verifyByPolling());

Expand All @@ -138,6 +140,55 @@ public void testRevertEvacuation() throws Exception {
}

@Test(dependsOnMethods = "testRevertEvacuation")
public void testAddingNodeWithEvacuationTag() throws Exception {
// first disable and instance, and wait for all replicas to be moved out
String mockNewInstance = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, mockNewInstance, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
//ev should contain all instances but the disabled one
Map<String, ExternalView> assignment = getEV();
List<String> currentActiveInstances =
_participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList());
for (String resource : _allDBs) {
validateAssignmentInEv(assignment.get(resource), REPLICA-1);
Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource));
Assert.assertFalse(newPAssignedParticipants.contains(mockNewInstance));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

// add evacuate tag and enable instance
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, mockNewInstance, InstanceConstants.InstanceOperation.EVACUATE);
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, mockNewInstance, true);
//ev should be the same
assignment = getEV();
currentActiveInstances =
_participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList());
for (String resource : _allDBs) {
validateAssignmentInEv(assignment.get(resource), REPLICA-1);
Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource));
Assert.assertFalse(newPAssignedParticipants.contains(mockNewInstance));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

// now remove operation tag
String instanceToEvacuate = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);

Assert.assertTrue(_clusterVerifier.verifyByPolling());

// EV should contain all participants, check resources one by one
assignment = getEV();
for (String resource : _allDBs) {
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
validateAssignmentInEv(assignment.get(resource));
}
}

@Test(dependsOnMethods = "testAddingNodeWithEvacuationTag")
public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
// add a resource where downward state transition is slow
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
Expand All @@ -151,7 +202,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
Assert.assertTrue(_clusterVerifier.verifyByPolling());

// set bootstrap ST delay to a large number
_stateModelDelay = -300000L;
_stateModelDelay = -10000L;
// evacuate an instance
String instanceToEvacuate = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
Expand All @@ -174,9 +225,9 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
validateAssignmentInEv(assignment.get(resource));
}

// cancel the evacuation by setting instance operation back to `ENABLE`
// cancel the evacuation
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);

assignment = getEV();
for (String resource : _allDBs) {
Expand All @@ -200,7 +251,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
public void testEvacuateAndCancelBeforeDropFinish() throws Exception {

// set DROP ST delay to a large number
_stateModelDelay = 300000L;
_stateModelDelay = 10000L;

// evacuate an instance
String instanceToEvacuate = _participants.get(0).getInstanceName();
Expand All @@ -211,8 +262,9 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);

// cancel evacuation
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
// check every replica has >= 3 active replicas, even before cluster converge
Map<String, ExternalView> assignment = getEV();
for (String resource : _allDBs) {
Expand Down Expand Up @@ -274,6 +326,27 @@ public void testMarkEvacuationAfterEMM() throws Exception {

}

@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
_participants.get(2).syncStop();
_participants.get(3).syncStop();
// wait for converge, and set evacuate on instance 0
Assert.assertTrue(_clusterVerifier.verifyByPolling());

String evacuateInstanceName = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, InstanceConstants.InstanceOperation.EVACUATE);

Map<String, IdealState> assignment;
List<String> currentActiveInstances =
_participantNames.stream().filter(n -> (!n.equals(evacuateInstanceName) && !n.equals(_participants.get(3).getInstanceName()))).collect(Collectors.toList());
TestHelper.verify( ()-> {return verifyIS(evacuateInstanceName);}, TestHelper.WAIT_DURATION);

_participants.get(3).syncStart();
_participants.get(2).syncStart();
}


private void addParticipant(String participantName) {
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, participantName);

Expand All @@ -290,8 +363,12 @@ private void addParticipant(String participantName) {
}

private void createTestDBs(long delayTime) throws InterruptedException {
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED",
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, -1,
CrushEdRebalanceStrategy.class.getName());
_allDBs.add("TEST_DB0_CRUSHED");
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB1_CRUSHED",
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, 200,
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, 2000000,
CrushEdRebalanceStrategy.class.getName());
_allDBs.add("TEST_DB1_CRUSHED");
createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB2_WAGED", BuiltInStateModelDefinitions.LeaderStandby.name(),
Expand All @@ -310,14 +387,39 @@ private Map<String, ExternalView> getEV() {
return externalViews;
}

private boolean verifyIS(String evacuateInstanceName) {
for (String db : _allDBs) {
IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
for (String partition : is.getPartitionSet()) {
List<String> newPAssignedParticipants = is.getPreferenceList(partition);
if (newPAssignedParticipants.contains(evacuateInstanceName)) {
System.out.println("partition " + partition + " assignment " + newPAssignedParticipants + " ev " + evacuateInstanceName);
return false;
}
}
}
return true;
}

private Set<String> getParticipantsInEv(ExternalView ev) {
Set<String> assignedParticipants = new HashSet<>();
ev.getPartitionSet().forEach(partition -> assignedParticipants.addAll(ev.getStateMap(partition).keySet()));
for (String partition : ev.getPartitionSet()) {
ev.getStateMap(partition)
.keySet()
.stream()
.filter(k -> !ev.getStateMap(partition).get(k).equals("OFFLINE"))
.forEach(assignedParticipants::add);
}
return assignedParticipants;
}

// verify that each partition has >=REPLICA (3 in this case) replicas

private void validateAssignmentInEv(ExternalView ev) {
validateAssignmentInEv(ev, REPLICA);
}

private void validateAssignmentInEv(ExternalView ev, int expectedNumber) {
Set<String> partitionSet = ev.getPartitionSet();
for (String partition : partitionSet) {
AtomicInteger activeReplicaCount = new AtomicInteger();
Expand All @@ -326,8 +428,7 @@ private void validateAssignmentInEv(ExternalView ev) {
.stream()
.filter(v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals("STANDBY"))
.forEach(v -> activeReplicaCount.getAndIncrement());
Assert.assertTrue(activeReplicaCount.get() >=REPLICA);

Assert.assertTrue(activeReplicaCount.get() >=expectedNumber);
}
}

Expand Down Expand Up @@ -375,7 +476,7 @@ public StDelayMSStateModel() {
private void sleepWhileNotCanceled(long sleepTime) throws InterruptedException{
while(sleepTime >0 && !isCancelled()) {
Thread.sleep(5000);
sleepTime =- 5000;
sleepTime = sleepTime - 5000;
}
if (isCancelled()) {
_cancelled = false;
Expand Down
Loading