From c650cf60500184543757006f51cc2dbf5340faf4 Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Wed, 6 Dec 2023 17:43:43 -0800 Subject: [PATCH] Make logic to determine state of replicas on SWAP_IN instance simpler and more predictable during an in-flight node swap. (#2706) --- .../controller/rebalancer/AbstractRebalancer.java | 4 ++-- .../controller/stages/BestPossibleStateCalcStage.java | 10 +++++++--- .../helix/controller/stages/MessageSelectionStage.java | 4 ++-- .../java/org/apache/helix/examples/Quickstart.java | 2 +- .../java/org/apache/helix/manager/zk/ZKHelixAdmin.java | 5 +++-- .../java/org/apache/helix/model/LeaderStandbySMD.java | 5 +++-- .../java/org/apache/helix/model/MasterSlaveSMD.java | 5 +++-- .../java/org/apache/helix/model/OnlineOfflineSMD.java | 5 +++-- .../helix/model/OnlineOfflineWithBootstrapSMD.java | 3 ++- .../org/apache/helix/model/StateModelDefinition.java | 10 ++++++---- .../org/apache/helix/model/StorageSchemataSMD.java | 5 +++-- .../model/util/StateModelDefinitionValidator.java | 3 ++- .../main/java/org/apache/helix/util/RebalanceUtil.java | 5 +++-- .../src/test/java/org/apache/helix/TestHelper.java | 2 +- .../TestPartitionLevelTransitionConstraint.java | 2 +- .../helix/integration/TestPreferenceListAsQueue.java | 3 ++- .../integration/messaging/TestMessageThrottle2.java | 2 +- .../org/apache/helix/model/TestStateModelValidity.java | 2 +- 18 files changed, 46 insertions(+), 31 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java index 7a23b8f280..51158cb911 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java @@ -329,9 +329,9 @@ public static int getStateCount(String state, StateModelDefinition stateModelDef int preferenceListSize) { String num = 
stateModelDef.getNumInstancesPerState(state); int stateCount = -1; - if ("N".equals(num)) { + if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(num)) { stateCount = liveAndEnabledSize; - } else if ("R".equals(num)) { + } else if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) { stateCount = preferenceListSize; } else { try { diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 8ec4b44757..05652e222d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -37,7 +37,6 @@ import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; -import org.apache.helix.controller.rebalancer.AbstractRebalancer; import org.apache.helix.controller.rebalancer.CustomRebalancer; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.MaintenanceRebalancer; @@ -150,8 +149,13 @@ private void addSwapInInstancesToBestPossibleState(Map resourc commonInstances.forEach(swapOutInstance -> { if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState())) { - if (AbstractRebalancer.getStateCount(stateModelDef.getTopState(), stateModelDef, - stateMap.size() + 1, stateMap.size() + 1) > stateMap.size()) { + + String topStateCount = + stateModelDef.getNumInstancesPerState(stateModelDef.getTopState()); + if (topStateCount.equals( + StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES) + || topStateCount.equals( + StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) { // If the swap-out instance's replica is a topState and the StateModel allows for 
// another replica with the topState to be added, set the swap-in instance's replica // to the topState. diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java index 09894263f0..2751f1b26a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java @@ -262,9 +262,9 @@ private Map computeStateConstraints(StateModelDefinition stateMo for (String state : statePriorityList) { String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state); int max = -1; - if ("N".equals(numInstancesPerState)) { + if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(numInstancesPerState)) { max = cache.getLiveInstances().size(); - } else if ("R".equals(numInstancesPerState)) { + } else if (StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS.equals(numInstancesPerState)) { // idealState is null when resource has been dropped, // R can't be evaluated and ignore state constraints //if (idealState != null) { diff --git a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java index 5d1df0a02c..9cc14b6039 100644 --- a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java +++ b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java @@ -124,7 +124,7 @@ private static StateModelDefinition defineStateModel() { builder.upperBound(LEADER, 1); // dynamic constraint, R means it should be derived based on the replication // factor. 
- builder.dynamicUpperBound(STANDBY, "R"); + builder.dynamicUpperBound(STANDBY, StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); StateModelDefinition statemodelDefinition = builder.build(); return statemodelDefinition; diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 7a0fe6377e..8a8d13b7c0 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -2103,12 +2103,13 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP throw new HelixException("Invalid or unsupported state model definition"); } masterStateValue = state; - } else if (count.equalsIgnoreCase("R")) { + } else if (count.equalsIgnoreCase(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) { if (slaveStateValue != null) { throw new HelixException("Invalid or unsupported state model definition"); } slaveStateValue = state; - } else if (count.equalsIgnoreCase("N")) { + } else if (count.equalsIgnoreCase( + StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)) { if (!(masterStateValue == null && slaveStateValue == null)) { throw new HelixException("Invalid or unsupported state model definition"); } diff --git a/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java b/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java index e7c92a9eab..0d400817cd 100644 --- a/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java +++ b/helix-core/src/main/java/org/apache/helix/model/LeaderStandbySMD.java @@ -68,7 +68,8 @@ public static StateModelDefinition build() { // bounds builder.upperBound(States.LEADER.name(), 1); - builder.dynamicUpperBound(States.STANDBY.name(), "R"); + builder.dynamicUpperBound(States.STANDBY.name(), + StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); return builder.build(); } @@ 
-97,7 +98,7 @@ public static ZNRecord generateConfigForLeaderStandby() { record.setMapField(key, metadata); } if (state.equals("STANDBY")) { - metadata.put("count", "R"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); record.setMapField(key, metadata); } if (state.equals("OFFLINE")) { diff --git a/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java b/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java index 02900a27ac..09b06b27a7 100644 --- a/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java +++ b/helix-core/src/main/java/org/apache/helix/model/MasterSlaveSMD.java @@ -69,7 +69,8 @@ public static StateModelDefinition build() { // bounds builder.upperBound(States.MASTER.name(), 1); - builder.dynamicUpperBound(States.SLAVE.name(), "R"); + builder.dynamicUpperBound(States.SLAVE.name(), + StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); return builder.build(); } @@ -98,7 +99,7 @@ public static ZNRecord generateConfigForMasterSlave() { metadata.put("count", "1"); record.setMapField(key, metadata); } else if (state.equals("SLAVE")) { - metadata.put("count", "R"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); record.setMapField(key, metadata); } else if (state.equals("OFFLINE")) { metadata.put("count", "-1"); diff --git a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java index 3f3759d8d6..fd97c7ba9f 100644 --- a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java +++ b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineSMD.java @@ -63,7 +63,8 @@ public static StateModelDefinition build() { builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name()); // bounds - builder.dynamicUpperBound(States.ONLINE.name(), "R"); + builder.dynamicUpperBound(States.ONLINE.name(), + 
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); return builder.build(); } @@ -87,7 +88,7 @@ public static ZNRecord generateConfigForOnlineOffline() { String key = state + ".meta"; Map metadata = new HashMap(); if (state.equals("ONLINE")) { - metadata.put("count", "R"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); record.setMapField(key, metadata); } if (state.equals("OFFLINE")) { diff --git a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java index 90ccbde4ae..58acf02a22 100644 --- a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java +++ b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java @@ -67,7 +67,8 @@ public static OnlineOfflineWithBootstrapSMD build() { builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name()); // bounds - builder.dynamicUpperBound(States.ONLINE.name(), "R"); + builder.dynamicUpperBound(States.ONLINE.name(), + StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); return new OnlineOfflineWithBootstrapSMD(builder.build().getRecord()); } diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java index 9570dfeb3e..fcf24fb305 100644 --- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java +++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java @@ -49,6 +49,8 @@ public enum StateModelDefinitionProperty { } public static final int TOP_STATE_PRIORITY = 1; + public static final String STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES = "N"; + public static final String STATE_REPLICA_COUNT_ALL_REPLICAS = "R"; /** * state model's initial state @@ -200,7 +202,7 @@ public String getInitialState() { /** * Number of instances that can be in each state * @param state the state 
name - * @return maximum instance count per state, can be "N" or "R" + * @return maximum instance count per state, can be STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES or STATE_REPLICA_COUNT_ALL_REPLICAS */ public String getNumInstancesPerState(String state) { return _statesCountMap.get(state); @@ -449,11 +451,11 @@ public LinkedHashMap getStateCountMap(int candidateNodeNum, int if (candidateNodeNum <= 0) { break; } - if ("N".equals(num)) { + if (STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES.equals(num)) { stateCountMap.put(state, candidateNodeNum); replicas -= candidateNodeNum; break; - } else if ("R".equals(num)) { + } else if (STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) { // wait until we get the counts for all other states continue; } else { @@ -475,7 +477,7 @@ public LinkedHashMap getStateCountMap(int candidateNodeNum, int // get state count for R for (String state : statesPriorityList) { String num = getNumInstancesPerState(state); - if ("R".equals(num)) { + if (STATE_REPLICA_COUNT_ALL_REPLICAS.equals(num)) { if (candidateNodeNum > 0 && replicas > 0) { stateCountMap.put(state, replicas < candidateNodeNum ? 
replicas : candidateNodeNum); } diff --git a/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java b/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java index ea3fb4d9ff..c19e3c44d0 100644 --- a/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java +++ b/helix-core/src/main/java/org/apache/helix/model/StorageSchemataSMD.java @@ -63,7 +63,8 @@ public static StateModelDefinition build() { builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name()); // bounds - builder.dynamicUpperBound(States.MASTER.name(), "N"); + builder.dynamicUpperBound(States.MASTER.name(), + StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES); return builder.build(); } @@ -88,7 +89,7 @@ public static ZNRecord generateConfigForStorageSchemata() { String key = state + ".meta"; Map metadata = new HashMap(); if (state.equals("MASTER")) { - metadata.put("count", "N"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES); record.setMapField(key, metadata); } else if (state.equals("OFFLINE")) { metadata.put("count", "-1"); diff --git a/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java b/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java index b208efa693..7eb2047cc3 100644 --- a/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java +++ b/helix-core/src/main/java/org/apache/helix/model/util/StateModelDefinitionValidator.java @@ -122,7 +122,8 @@ private boolean areStateCountsValid() { try { Integer.parseInt(count); } catch (NumberFormatException e) { - if (!count.equals("N") && !count.equals("R")) { + if (!count.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES) + && !count.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) { _logger.error("State " + state + " has invalid count " + count + ", state model: " + _stateModelDef.getId()); return false; diff 
--git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java index 868e0cf577..5c7effb6f4 100644 --- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java @@ -125,12 +125,13 @@ public static String[] parseStates(String clusterName, StateModelDefinition stat throw new HelixException("Invalid or unsupported state model definition"); } masterStateValue = state; - } else if (count.equalsIgnoreCase("R")) { + } else if (count.equalsIgnoreCase(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) { if (slaveStateValue != null) { throw new HelixException("Invalid or unsupported state model definition"); } slaveStateValue = state; - } else if (count.equalsIgnoreCase("N")) { + } else if (count.equalsIgnoreCase( + StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)) { if (!(masterStateValue == null && slaveStateValue == null)) { throw new HelixException("Invalid or unsupported state model definition"); } diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java index 79f238da77..9dbba34769 100644 --- a/helix-core/src/test/java/org/apache/helix/TestHelper.java +++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java @@ -693,7 +693,7 @@ public static StateModelDefinition generateStateModelDefForBootstrap() { String key = state + ".meta"; Map metadata = new HashMap(); if (state.equals("ONLINE")) { - metadata.put("count", "R"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); record.setMapField(key, metadata); } else if (state.equals("BOOTSTRAP")) { metadata.put("count", "-1"); diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java index 
c4c37fe386..9805a8c086 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionLevelTransitionConstraint.java @@ -230,7 +230,7 @@ private static StateModelDefinition defineStateModel() { // static constraint builder.upperBound("MASTER", 1); // dynamic constraint, R means it should be derived based on the replication factor. - builder.dynamicUpperBound("SLAVE", "R"); + builder.dynamicUpperBound("SLAVE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); StateModelDefinition statemodelDefinition = builder.build(); diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java index 2b32c219e8..178b37a9cf 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestPreferenceListAsQueue.java @@ -266,7 +266,8 @@ private StateModelDefinition createReprioritizedStateModelDef(String stateModelN .addState("ONLINE", 1).addState("OFFLINE").addState("DROPPED").addState("ERROR") .initialState("OFFLINE").addTransition("ERROR", "OFFLINE", 1) .addTransition("ONLINE", "OFFLINE", 2).addTransition("OFFLINE", "DROPPED", 3) - .addTransition("OFFLINE", "ONLINE", 4).dynamicUpperBound("ONLINE", "R") + .addTransition("OFFLINE", "ONLINE", 4) + .dynamicUpperBound("ONLINE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS) .upperBound("OFFLINE", -1).upperBound("DROPPED", -1).upperBound("ERROR", -1); return builder.build(); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java index b37493101e..b11e6350e5 100644 --- 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java @@ -186,7 +186,7 @@ private ZNRecord generateConfigForMasterSlave() { record.setMapField(key, metadata); break; case "SLAVE": - metadata.put("count", "R"); + metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS); record.setMapField(key, metadata); break; case "OFFLINE": diff --git a/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java b/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java index f8955abbd8..724c3315db 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestStateModelValidity.java @@ -230,7 +230,7 @@ public void testBasic() { .upperBound("MASTER", 1) // R indicates an upper bound of number of replicas for each partition - .dynamicUpperBound("SLAVE", "R") + .dynamicUpperBound("SLAVE", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS) // Add some high-priority transitions .addTransition("SLAVE", "MASTER", 1).addTransition("OFFLINE", "SLAVE", 2)