Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HelixAdmin APIs and pipeline changes to support Helix Node Swap #2661

Merged
merged 18 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.api.topology.ClusterTopology;
Expand Down Expand Up @@ -302,8 +304,15 @@ void enableInstance(String clusterName, String instanceName, boolean enabled,
*/
void enableInstance(String clusterName, List<String> instances, boolean enabled);

void setInstanceOperation(String clusterName, String instance,
InstanceConstants.InstanceOperation instanceOperation);
/**
* Set the instanceOperation field for the named instance.
*
* @param clusterName The cluster name
* @param instanceName The instance name
* @param instanceOperation The instance operation to set. Declared {@code @Nullable};
* NOTE(review): presumably passing null clears the operation - confirm against the
* implementing class.
*/
void setInstanceOperation(String clusterName, String instanceName,
@Nullable InstanceConstants.InstanceOperation instanceOperation);

/**
* Disable or enable a resource
Expand Down Expand Up @@ -747,6 +756,26 @@ Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
*/
boolean isEvacuateFinished(String clusterName, String instancesNames);

/**
* Check to see if swapping between two instances can be completed. Either the swapOut or
* swapIn instance can be passed in.
* @param clusterName The cluster name
* @param instanceName The instance that is being swapped out or swapped in
* @return True if the swap is ready to be completed, false otherwise.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for my lack of understanding, typically swap involves 2 entries, why does the API take only one? Also what does "swap" mean in this context?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless instanceName is like "virtual" and there are 2 entities tied to the same virtual instance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The instance name in this case will be one of the unique instance names (same as the InstanceConfig znode ID) involved in the swap, either the SWAP_OUT or the SWAP_IN one. This API is intended to be used with the PerInstanceAccessor in helix-rest. For that endpoint you pass one instanceName.

Because we can find the matching swap instance for either SWAP_OUT or SWAP_IN node, we can take either as the instance name.

"swap" is referring to the operation between two instances which can be deduced from providing one of the instances involved in the swap.

*/
boolean canCompleteSwap(String clusterName, String instanceName);

/**
* Check to see if swapping between two instances is ready to be completed and complete it if
* possible. Either the swapOut or swapIn instance can be passed in; only one instance name of
* the pair is supplied by the caller.
*
* @param clusterName The cluster name
* @param instanceName The instance that is being swapped out or swapped in
* @return True if the swap is ready to be completed and was completed successfully, false
* otherwise.
*/
boolean completeSwapIfPossible(String clusterName, String instanceName);

/**
* Return if instance is ready for preparing joining cluster. The instance should have no current state,
* no pending message and tagged with operation that exclude the instance from Helix assignment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import org.apache.helix.common.caches.PropertyCache;
import org.apache.helix.common.caches.TaskCurrentStateCache;
import org.apache.helix.common.controllers.ControlContextProvider;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
Expand Down Expand Up @@ -116,6 +118,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
private Map<String, Map<String, String>> _idealStateRuleMap;
private final Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
private final Set<String> _disabledInstanceSet = new HashSet<>();
private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = new HashMap<>();
private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance = new HashMap<>();
Expand Down Expand Up @@ -437,6 +441,8 @@ protected synchronized Set<HelixConstants.ChangeType> doRefresh(HelixDataAccesso

updateIdealRuleMap(getClusterConfig());
updateDisabledInstances(getInstanceConfigMap().values(), getClusterConfig());
updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(),
getClusterConfig());

return refreshedTypes;
}
Expand Down Expand Up @@ -471,6 +477,8 @@ public void setClusterConfig(ClusterConfig clusterConfig) {
refreshAbnormalStateResolverMap(_clusterConfig);
updateIdealRuleMap(_clusterConfig);
updateDisabledInstances(getInstanceConfigMap().values(), _clusterConfig);
updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(),
_clusterConfig);
}

@Override
Expand Down Expand Up @@ -617,6 +625,24 @@ public Set<String> getDisabledInstances() {
return Collections.unmodifiableSet(_disabledInstanceSet);
}

/**
 * Expose the currently cached swap pairs as a read-only view.
 *
 * @return an unmodifiable map keyed by SWAP_OUT instanceName, whose value is the matching
 *         SWAP_IN instanceName.
 */
public Map<String, String> getSwapOutToSwapInInstancePairs() {
  final Map<String, String> readOnlyPairs =
      Collections.unmodifiableMap(_swapOutInstanceNameToSwapInInstanceName);
  return readOnlyPairs;
}

/**
* Get all the enabled and live SWAP_IN instances.
*
* @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance.
*/
public Set<String> getEnabledLiveSwapInInstanceNames() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: when we tag SWAP_IN instanceNames, don't we already check whether there is a pairing SWAP_OUT instance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline.

TLDR: Sanity checks ensure that we can't have a case where there are two instances with the same logicalId and both have InstanceOperation unset. This allows for setting either SWAP_OUT node or SWAP_IN node instance operation first.

return Collections.unmodifiableSet(_enabledLiveSwapInInstanceNames);
}

public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
_liveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances));
_updateInstanceOfflineTime = true;
Expand Down Expand Up @@ -750,6 +776,8 @@ public Map<String, InstanceConfig> getInstanceConfigMap() {
/**
 * Replace the cached instance configs, then recompute the disabled-instance and
 * swapping-instance bookkeeping from the new configs.
 *
 * @param instanceConfigMap the instance configs to store in the cache
 */
public void setInstanceConfigMap(Map<String, InstanceConfig> instanceConfigMap) {
  _instanceConfigCache.setPropertyMap(instanceConfigMap);

  Collection<InstanceConfig> configs = instanceConfigMap.values();
  updateDisabledInstances(configs, getClusterConfig());

  // Looked up only after the disabled instances were refreshed, in the same order as before.
  Set<String> enabledLiveInstances = getEnabledLiveInstances();
  ClusterConfig currentClusterConfig = getClusterConfig();
  updateSwappingInstances(configs, enabledLiveInstances, currentClusterConfig);
}

/**
Expand Down Expand Up @@ -858,6 +886,49 @@ private void updateDisabledInstances(Collection<InstanceConfig> instanceConfigs,
}
}

private void updateSwappingInstances(Collection<InstanceConfig> instanceConfigs,
zpinto marked this conversation as resolved.
Show resolved Hide resolved
Set<String> liveEnabledInstances, ClusterConfig clusterConfig) {
_swapOutInstanceNameToSwapInInstanceName.clear();
_enabledLiveSwapInInstanceNames.clear();

if (clusterConfig == null) {
logger.warn("Skip refreshing swapping instances because clusterConfig is null.");
return;
}

ClusterTopologyConfig clusterTopologyConfig =
ClusterTopologyConfig.createFromClusterConfig(clusterConfig);

Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
instanceConfigs.forEach(instanceConfig -> {
if (instanceConfig == null) {
return;
}
if (instanceConfig.getInstanceOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
swapOutLogicalIdsByInstanceName.put(instanceConfig.getInstanceName(),
instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()));
}
if (instanceConfig.getInstanceOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
swapInInstancesByLogicalId.put(
instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()),
instanceConfig.getInstanceName());
}
});

swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
String swapInInstanceName = swapInInstancesByLogicalId.get(value);
if (swapInInstanceName != null) {
_swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName);
if (liveEnabledInstances.contains(swapInInstanceName)) {
_enabledLiveSwapInInstanceNames.add(swapInInstanceName);
}
}
});
}

/*
* Check if the instance is timed-out during maintenance mode. An instance is timed-out if it has
* been offline for longer than the user defined timeout window.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@
import java.util.Optional;
import java.util.Set;

import java.util.stream.Collectors;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
Expand All @@ -56,7 +56,8 @@
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
public static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = ImmutableSet.of("EVACUATE", "SWAP_IN");
public static ImmutableSet<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT =
ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE.name());

@Override
public IdealState computeNewIdealState(String resourceName,
Expand Down Expand Up @@ -113,9 +114,16 @@ public IdealState computeNewIdealState(String resourceName,
allNodes = clusterData.getAllInstances();
}

Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we could integrate the logic into getEnabledLiveInstances?
Probably with better name when we work on consolidating operation and disable/enable in one enum and we redesign all the getters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a TODO for this. We can do it when we refactor to use states instead of InstanceOperation.

ClusterTopologyConfig.createFromClusterConfig(clusterConfig),
clusterData.getInstanceConfigMap(), allNodes);
// Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
// This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
liveEnabledNodes.retainAll(allNodesDeduped);

long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
.getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
Expand All @@ -127,11 +135,11 @@ public IdealState computeNewIdealState(String resourceName,
clusterConfig, _manager);
}

if (allNodes.isEmpty() || activeNodes.isEmpty()) {
if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) {
LOG.error(String.format(
"No instances or active instances available for resource %s, "
+ "allInstances: %s, liveInstances: %s, activeInstances: %s",
resourceName, allNodes, liveEnabledNodes, activeNodes));
resourceName, allNodesDeduped, liveEnabledNodes, activeNodes));
return generateNewIdealState(resourceName, currentIdealState,
emptyMapping(currentIdealState));
}
Expand All @@ -157,41 +165,58 @@ public IdealState computeNewIdealState(String resourceName,
getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), allPartitions, resourceName,
stateCountMap, maxPartition);

// sort node lists to ensure consistent preferred assignments
List<String> allNodeList = new ArrayList<>(allNodes);
// We will not assign partitions to instances with the evacuation or swap-out tag.
List<String> allNodeList = new ArrayList<>(allNodesDeduped);

// TODO: Currently we have 2 groups of instances and compute preference list twice and merge.
// Eventually we want to have exclusive groups of instance for different instance tag.
List<String> liveEnabledAssignableNodeList = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
liveEnabledNodes);
List<String> liveEnabledAssignableNodeList = new ArrayList<>(
// We will not assign partitions to instances with EVACUATE InstanceOperation.
DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(),
liveEnabledNodes));
// sort node lists to ensure consistent preferred assignments
Collections.sort(allNodeList);
Collections.sort(liveEnabledAssignableNodeList);

ZNRecord newIdealMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, currentMapping, clusterData);
ZNRecord newIdealMapping =
_rebalanceStrategy.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList,
currentMapping, clusterData);
ZNRecord finalMapping = newIdealMapping;

if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)
|| liveEnabledAssignableNodeList.size()!= activeNodes.size()) {
|| liveEnabledAssignableNodeList.size() != activeNodes.size()) {
List<String> activeNodeList = new ArrayList<>(activeNodes);
Collections.sort(activeNodeList);
int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, currentIdealState),
currentIdealState, replicaCount);

ZNRecord newActiveMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
ZNRecord newActiveMapping =
_rebalanceStrategy.computePartitionAssignment(allNodeList, activeNodeList, currentMapping,
clusterData);
finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping,
liveEnabledNodes, replicaCount, minActiveReplicas);
}

finalMapping.getListFields().putAll(userDefinedPreferenceList);

// 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
Map<String, String> swapOutToSwapInInstancePairs =
clusterData.getSwapOutToSwapInInstancePairs();
// 2. Get all enabled and live SWAP_IN instances in the cluster.
Set<String> enabledLiveSwapInInstances = clusterData.getEnabledLiveSwapInInstanceNames();
// 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end.
// Skipping this step when no SWAP_IN instance is ready (enabled and live) reduces computation time when no swap
// is actively occurring.
if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(finalMapping,
swapOutToSwapInInstancePairs, enabledLiveSwapInInstances);
}

LOG.debug("currentMapping: {}", currentMapping);
LOG.debug("stateCountMap: {}", stateCountMap);
LOG.debug("liveEnabledNodes: {}", liveEnabledNodes);
LOG.debug("activeNodes: {}", activeNodes);
LOG.debug("allNodes: {}", allNodes);
LOG.debug("allNodes: {}", allNodesDeduped);
LOG.debug("maxPartition: {}", maxPartition);
LOG.debug("newIdealMapping: {}", newIdealMapping);
LOG.debug("finalMapping: {}", finalMapping);
Expand All @@ -201,14 +226,6 @@ public IdealState computeNewIdealState(String resourceName,
return idealState;
}

/**
 * Keep only the nodes whose InstanceOperation is not contained in
 * INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.
 *
 * @param instanceConfigMap instance configs keyed by instance name; each node is looked up here
 * @param nodes the candidate instance names
 * @return a list of the nodes that are not excluded, in the iteration order of {@code nodes}
 */
private static List<String> filterOutOnOperationInstances(Map<String, InstanceConfig> instanceConfigMap,
    Set<String> nodes) {
  List<String> assignableNodes = new ArrayList<>();
  for (String node : nodes) {
    InstanceConfig nodeConfig = instanceConfigMap.get(node);
    if (INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(nodeConfig.getInstanceOperation())) {
      continue;
    }
    assignableNodes.add(node);
  }
  return assignableNodes;
}

private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState,
ZNRecord newMapping) {
IdealState newIdealState = new IdealState(resourceName);
Expand Down Expand Up @@ -376,7 +393,7 @@ protected Map<String, String> computeBestPossibleStateForPartition(Set<String> l
// if preference list is not empty, and we do have new instanceToAdd, we
// should check if it has capacity to hold the partition.
boolean isWaged = WagedValidationUtil.isWagedEnabled(idealState) && cache != null;
if (isWaged && !isPreferenceListEmpty && instanceToAdd.size() > 0) {
if (isWaged && !isPreferenceListEmpty && !instanceToAdd.isEmpty()) {
// check instanceToAdd instance appears in combinedPreferenceList
for (String instance : instanceToAdd) {
if (combinedPreferenceList.contains(instance)) {
Expand Down Expand Up @@ -409,7 +426,11 @@ protected Map<String, String> computeBestPossibleStateForPartition(Set<String> l
bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name());
// We do not want to drop a SWAP_IN node if it is at the end of the preferenceList,
// because partitions are actively being added on this node to prepare for SWAP completion.
if (cache == null || !cache.getEnabledLiveSwapInInstanceNames().contains(instanceToDrop)) {
bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name());
}
}
}

Expand Down
Loading
Loading