diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 6c1c4d74d9..e717aa9962 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -47,6 +47,7 @@ import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; @@ -395,7 +396,8 @@ private Map handleDelayedRebalanceMinActiveReplica( Map currentResourceAssignment, RebalanceAlgorithm algorithm) throws HelixRebalanceException { // the "real" live nodes at the time - final Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); + // TODO: this is a hacky way to filter out on-operation instances. We should consider redesigning `getEnabledLiveInstances()`. 
+ final Set enabledLiveInstances = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances()); if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) { // no need for additional process, return the current resource assignment return currentResourceAssignment; @@ -424,6 +426,14 @@ private Map handleDelayedRebalanceMinActiveReplica( } } + private static Set filterOutOnOperationInstances(Map instanceConfigMap, + Set nodes) { + return nodes.stream() + .filter( + instance -> !DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation())) + .collect(Collectors.toSet()); + } + /** * Emergency rebalance is scheduled to quickly handle urgent cases like reassigning partitions from inactive nodes * and addressing for partitions failing to meet minActiveReplicas. @@ -608,7 +618,8 @@ protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clust bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> { String resourceName = resourceAssignment.getResourceName(); IdealState currentIdealState = clusterData.getIdealState(resourceName); - Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); + Set enabledLiveInstances = + filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances()); int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size()); int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName), diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 6c51d58bbc..10cd662cb2 100644 --- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -15,6 +15,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixRollbackException; import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyPathBuilder; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.constants.InstanceConstants; @@ -89,9 +90,11 @@ public void beforeClass() throws Exception { ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); clusterConfig.stateTransitionCancelEnabled(true); + clusterConfig.setDelayRebalaceEnabled(true); + clusterConfig.setRebalanceDelayTime(1800000L); _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); - createTestDBs(200); + createTestDBs(1800000L); setUpWagedBaseline(); @@ -100,6 +103,7 @@ public void beforeClass() throws Exception { @Test public void testEvacuate() throws Exception { + System.out.println("START TestInstanceOperation.testEvacuate() at " + new Date(System.currentTimeMillis())); // EV should contain all participants, check resources one by one Map assignment = getEV(); for (String resource : _allDBs) { @@ -130,7 +134,7 @@ public void testEvacuate() throws Exception { @Test(dependsOnMethods = "testEvacuate") public void testRevertEvacuation() throws Exception { - + System.out.println("START TestInstanceOperation.testRevertEvacuation() at " + new Date(System.currentTimeMillis())); // revert an evacuate instance String instanceToEvacuate = _participants.get(0).getInstanceName(); _gSetupTool.getClusterManagementTool() @@ -148,6 +152,7 @@ public void testRevertEvacuation() throws Exception { @Test(dependsOnMethods = "testRevertEvacuation") public void testAddingNodeWithEvacuationTag() throws Exception { + System.out.println("START TestInstanceOperation.testAddingNodeWithEvacuationTag() at " + 
new Date(System.currentTimeMillis())); // first disable and instance, and wait for all replicas to be moved out String mockNewInstance = _participants.get(0).getInstanceName(); _gSetupTool.getClusterManagementTool() @@ -197,9 +202,10 @@ public void testAddingNodeWithEvacuationTag() throws Exception { @Test(dependsOnMethods = "testAddingNodeWithEvacuationTag") public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { + System.out.println("START TestInstanceOperation.testEvacuateAndCancelBeforeBootstrapFinish() at " + new Date(System.currentTimeMillis())); // add a resource where downward state transition is slow createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA, - REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName()); + REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName()); _allDBs.add("TEST_DB3_DELAYED_CRUSHED"); // add a resource where downward state transition is slow createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave", @@ -258,6 +264,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { @Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish") public void testEvacuateAndCancelBeforeDropFinish() throws Exception { + System.out.println("START TestInstanceOperation.testEvacuateAndCancelBeforeDropFinish() at " + new Date(System.currentTimeMillis())); // set DROP ST delay to a large number _stateModelDelay = 10000L; @@ -294,6 +301,7 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception { @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish") public void testMarkEvacuationAfterEMM() throws Exception { + System.out.println("START TestInstanceOperation.testMarkEvacuationAfterEMM() at " + new Date(System.currentTimeMillis())); _stateModelDelay = 1000L; Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME)); 
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, @@ -338,21 +346,40 @@ public void testMarkEvacuationAfterEMM() throws Exception { @Test(dependsOnMethods = "testMarkEvacuationAfterEMM") public void testEvacuationWithOfflineInstancesInCluster() throws Exception { + System.out.println("START TestInstanceOperation.testEvacuationWithOfflineInstancesInCluster() at " + new Date(System.currentTimeMillis())); + _participants.get(1).syncStop(); _participants.get(2).syncStop(); - _participants.get(3).syncStop(); - // wait for converge, and set evacuate on instance 0 - Assert.assertTrue(_clusterVerifier.verifyByPolling()); - String evacuateInstanceName = _participants.get(0).getInstanceName(); + String evacuateInstanceName = _participants.get(_participants.size()-2).getInstanceName(); _gSetupTool.getClusterManagementTool() .setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, InstanceConstants.InstanceOperation.EVACUATE); - Map assignment; - List currentActiveInstances = - _participantNames.stream().filter(n -> (!n.equals(evacuateInstanceName) && !n.equals(_participants.get(3).getInstanceName()))).collect(Collectors.toList()); - TestHelper.verify( ()-> {return verifyIS(evacuateInstanceName);}, TestHelper.WAIT_DURATION); + // For every resource, wait until each partition keeps at least REPLICA - 1 active replicas + // and the evacuated instance no longer holds the top state (MASTER or LEADER). + for (String resource : _allDBs) { + Assert.assertTrue(TestHelper.verify(() -> { + // Re-read the EV on every poll: a snapshot taken once cannot change between retries. + ExternalView ev = getEV().get(resource); + for (String partition : ev.getPartitionSet()) { + AtomicInteger activeReplicaCount = new AtomicInteger(); + ev.getStateMap(partition) + .values() + .stream() + .filter(v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") + || v.equals("STANDBY")) + .forEach(v -> activeReplicaCount.getAndIncrement()); + if (activeReplicaCount.get() < REPLICA - 1 || (ev.getStateMap(partition).containsKey(evacuateInstanceName) + && 
(ev.getStateMap(partition).get(evacuateInstanceName).equals("MASTER") || ev.getStateMap(partition) + .get(evacuateInstanceName) + .equals("LEADER")))) { + return false; + } + } + return true; + }, 30000)); + } - _participants.get(3).syncStart(); + _participants.get(1).syncStart(); _participants.get(2).syncStart(); }