Skip to content

Commit

Permalink
add APIs for evacuation
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Sep 18, 2023
1 parent 2ffb585 commit a32779b
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 1 deletion.
11 changes: 11 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -738,4 +738,15 @@ Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
*/
Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
List<String> instancesNames);

/**
* Return if instance operation 'Evacuate' is finished.
* Only return true if there is no current state on the instance and that instance is still alive.
* @param clusterName
* @param instancesNames
* @return
*/
boolean isEvacuateFinished(String clusterName, String instancesNames);

boolean isPrepopulateReady(String clusterName, String instancesNames);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
Expand Down Expand Up @@ -406,7 +407,41 @@ public ZNRecord update(ZNRecord currentData) {
}
}


@Override
public boolean isEvacuateFinished(String clusterName, String instanceName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
PropertyKey.Builder keyBuilder = accessor.keyBuilder();

// check the instance is alive
LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
if (liveInstance == null) {
logger.warn("instance {} in cluster {} is not alive.", instanceName, clusterName);
return false;
}

BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
String sessionId = liveInstance.getEphemeralOwner();

String path = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId);
List<String> currentStates = baseAccessor.getChildNames(path, 0);
if (currentStates == null) {
logger.warn("instance {} in cluster {} does not have live session.", instanceName,
clusterName);
return false;
}
return currentStates.isEmpty();
}

@Override
public boolean isPrepopulateReady(String clusterName, String instanceName) {
return this.getInstanceConfig(clusterName, instanceName)
.getInstanceOperation()
.equals(InstanceConstants.InstanceOperation.EVACUATE.name());
}

@Override
public void enableResource(final String clusterName, final String resourceName,
final boolean enabled) {
logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
Expand All @@ -22,6 +23,7 @@
import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
Expand Down Expand Up @@ -57,6 +59,8 @@ public class TestInstanceOperation extends ZkTestBase {
private ZkHelixClusterVerifier _clusterVerifier;
private ConfigAccessor _configAccessor;
private long _stateModelDelay = 3L;

private HelixAdmin _admin;
protected AssignmentMetadataStore _assignmentMetadataStore;
HelixDataAccessor _dataAccessor;

Expand Down Expand Up @@ -91,6 +95,8 @@ public void beforeClass() throws Exception {
createTestDBs(200);

setUpWagedBaseline();

_admin = new ZKHelixAdmin(_gZkClient);
}

@Test
Expand Down Expand Up @@ -119,6 +125,9 @@ public void testEvacuate() throws Exception {
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
Assert.assertTrue(_admin.isPrepopulateReady(CLUSTER_NAME, instanceToEvacuate));
}

@Test(dependsOnMethods = "testEvacuate")
Expand Down Expand Up @@ -215,6 +224,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000);
}
Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));

// sleep a bit so ST messages can start executing
Thread.sleep(Math.abs(_stateModelDelay / 100));
Expand Down Expand Up @@ -261,6 +271,7 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
// message should be pending at the to evacuate participant
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);
Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));

// cancel evacuation
_gSetupTool.getClusterManagementTool()
Expand Down Expand Up @@ -323,7 +334,7 @@ public void testMarkEvacuationAfterEMM() throws Exception {
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

Assert.assertTrue(_admin.isPrepopulateReady(CLUSTER_NAME, instanceToEvacuate));
}

@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
Expand Down
10 changes: 10 additions & 0 deletions helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -550,4 +550,14 @@ public Map<String, Boolean> validateInstancesForWagedRebalance(String clusterNam
List<String> instancesNames) {
return null;
}

@Override
public boolean isEvacuateFinished(String clusterName, String instancesNames) {
return false;
}

@Override
public boolean isPrepopulateReady(String clusterName, String instancesNames) {
return false;
}
}

0 comments on commit a32779b

Please sign in to comment.