From e90f4a5ec773236ac9177aad1d5f0dd7986968a0 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Wed, 20 Sep 2023 10:58:08 -0700 Subject: [PATCH 1/4] Exclude on-operation instance from computing min active replica in WAGED. (#2621) Exclude on-operation instance from computing min active replica in WAGED. --- .../rebalancer/waged/WagedRebalancer.java | 15 ++++++- .../rebalancer/TestInstanceOperation.java | 42 ++++++++++++++----- 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 6c1c4d74d9..e717aa9962 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -47,6 +47,7 @@ import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; @@ -395,7 +396,8 @@ private Map handleDelayedRebalanceMinActiveReplica( Map currentResourceAssignment, RebalanceAlgorithm algorithm) throws HelixRebalanceException { // the "real" live nodes at the time - final Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); + // TODO: this is a hacky way to filter our on operation instance. We should consider redesign `getEnabledLiveInstances()`. + final Set enabledLiveInstances = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances()); if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) { // no need for additional process, return the current resource assignment return currentResourceAssignment; @@ -424,6 +426,14 @@ private Map handleDelayedRebalanceMinActiveReplica( } } + private static Set filterOutOnOperationInstances(Map instanceConfigMap, + Set nodes) { + return nodes.stream() + .filter( + instance -> !DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation())) + .collect(Collectors.toSet()); + } + /** * Emergency rebalance is scheduled to quickly handle urgent cases like reassigning partitions from inactive nodes * and addressing for partitions failing to meet minActiveReplicas. @@ -608,7 +618,8 @@ protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clust bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> { String resourceName = resourceAssignment.getResourceName(); IdealState currentIdealState = clusterData.getIdealState(resourceName); - Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); + Set enabledLiveInstances = + filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances()); int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size()); int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName), diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 6c51d58bbc..c9c9c75979 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -15,6 +15,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixRollbackException; import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyPathBuilder; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.constants.InstanceConstants; @@ -89,9 +90,11 @@ public void beforeClass() throws Exception { ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); clusterConfig.stateTransitionCancelEnabled(true); + clusterConfig.setDelayRebalaceEnabled(true); + clusterConfig.setRebalanceDelayTime(1800000L); _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); - createTestDBs(200); + createTestDBs(1800000L); setUpWagedBaseline(); @@ -199,7 +202,7 @@ public void testAddingNodeWithEvacuationTag() throws Exception { public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { // add a resource where downward state transition is slow createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA, - REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName()); + REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName()); _allDBs.add("TEST_DB3_DELAYED_CRUSHED"); // add a resource where downward state transition is slow createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave", @@ -338,21 +341,38 @@ public void testMarkEvacuationAfterEMM() throws Exception { @Test(dependsOnMethods = "testMarkEvacuationAfterEMM") public void testEvacuationWithOfflineInstancesInCluster() throws Exception { + _participants.get(1).syncStop(); _participants.get(2).syncStop(); - _participants.get(3).syncStop(); - // wait for converge, and set evacuate on instance 0 - Assert.assertTrue(_clusterVerifier.verifyByPolling()); - String evacuateInstanceName = _participants.get(0).getInstanceName(); + String evacuateInstanceName = _participants.get(_participants.size()-2).getInstanceName(); _gSetupTool.getClusterManagementTool() .setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, InstanceConstants.InstanceOperation.EVACUATE); - Map assignment; - List currentActiveInstances = - _participantNames.stream().filter(n -> (!n.equals(evacuateInstanceName) && !n.equals(_participants.get(3).getInstanceName()))).collect(Collectors.toList()); - TestHelper.verify( ()-> {return verifyIS(evacuateInstanceName);}, TestHelper.WAIT_DURATION); + Map assignment; + // EV should contain all participants, check resources one by one + assignment = getEV(); + for (String resource : _allDBs) { + ExternalView ev = assignment.get(resource); + for (String partition : ev.getPartitionSet()) { + AtomicInteger activeReplicaCount = new AtomicInteger(); + ev.getStateMap(partition) + .values() + .stream() + .filter( + v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals( + "STANDBY")) + .forEach(v -> activeReplicaCount.getAndIncrement()); + Assert.assertTrue(activeReplicaCount.get() >= REPLICA-1); + Assert.assertFalse(ev.getStateMap(partition).containsKey(evacuateInstanceName) && ev.getStateMap(partition) + .get(evacuateInstanceName) + .equals("MASTER") && ev.getStateMap(partition) + .get(evacuateInstanceName) + .equals("LEADER")); + + } + } - _participants.get(3).syncStart(); + _participants.get(1).syncStart(); _participants.get(2).syncStart(); } From 0e064ddd62036ecd309361817ab9fff604ad7bd3 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Thu, 12 Oct 2023 11:57:40 -0700 Subject: [PATCH 2/4] property --- .../HelixRestDataProviderManager.java | 68 ++++++++++++++ .../dataprovider/PerClusterDataProvider.java | 91 +++++++++++++++++++ .../dataprovider/RestCurrentStateCache.java | 42 +++++++++ .../dataprovider/RestPropertyCache.java | 82 +++++++++++++++++ .../dataprovider/RestServiceDataProvider.java | 43 +++++++++ .../helix/rest/server/ServerContext.java | 15 +++ .../api/client/RealmAwareZkClient.java | 13 +++ .../impl/client/DedicatedZkClient.java | 11 +++ .../impl/client/FederatedZkClient.java | 11 +++ .../helix/zookeeper/impl/client/ZkClient.java | 1 + 10 files changed, 377 insertions(+) create mode 100644 helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/HelixRestDataProviderManager.java create mode 100644 helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/PerClusterDataProvider.java create mode 100644 helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestCurrentStateCache.java create mode 100644 helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestPropertyCache.java create mode 100644 helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestServiceDataProvider.java diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/HelixRestDataProviderManager.java b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/HelixRestDataProviderManager.java new file mode 100644 index 0000000000..58d59fc87d --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/HelixRestDataProviderManager.java @@ -0,0 +1,68 @@ +package org.apache.helix.rest.common.dataprovider; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** Init, register listener and manager callback handler for different + * clusters manage the providers lifecycle + * + */ + +import java.util.List; +import org.apache.helix.HelixAdmin; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; + + +public class HelixRestDataProviderManager { + + protected RealmAwareZkClient _zkclient; + + private HelixAdmin _helixAdmin; + + private RestServiceDataProvider _restServiceDataProvider; + // list of callback handlers + + //TODO: create own zk client + public HelixRestDataProviderManager(RealmAwareZkClient zkclient, HelixAdmin helixAdmin) { + _zkclient = zkclient; + _helixAdmin = helixAdmin; + _restServiceDataProvider = new RestServiceDataProvider(); + init(); + } + + public RestServiceDataProvider getRestServiceDataProvider() { + return _restServiceDataProvider; + } + + private void init() { + List clusters = _helixAdmin.getClusters(); + for (String cluster : clusters) { + PerClusterDataProvider clusterDataProvider = + new PerClusterDataProvider(cluster, _zkclient, new ZkBaseDataAccessor(_zkclient)); + clusterDataProvider.initCache(); + // register callback handler for each provider + _restServiceDataProvider.addClusterDataProvider(cluster, clusterDataProvider); + } + } + + public void close() { + } +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/PerClusterDataProvider.java b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/PerClusterDataProvider.java new file mode 100644 index 0000000000..adcde81f28 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/PerClusterDataProvider.java @@ -0,0 +1,91 @@ +package org.apache.helix.rest.common.dataprovider; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.common.caches.PropertyCache; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ClusterConstraints; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; + + +/** + * Dara cache for each Helix cluster. Configs, ideal stats and current states are read from ZK and updated + * using event changes. External view are consolidated using current state. + */ +public class PerClusterDataProvider { + + private HelixDataAccessor _accessor; + + private RealmAwareZkClient _zkclient; + + private final String _clusterName; + + // Simple caches + private final RestPropertyCache _instanceConfigCache; + private final RestPropertyCache _clusterConfigCache; + private final RestPropertyCache _resourceConfigCache; + private final RestPropertyCache _liveInstanceCache; + private final RestPropertyCache _idealStateCache; + private final RestPropertyCache _stateModelDefinitionCache; + + // special caches + private final RestCurrentStateCache _currentStateCache; + + // TODO: add external view caches + + public PerClusterDataProvider(String clusterName, RealmAwareZkClient zkClient, BaseDataAccessor baseDataAccessor) { + _clusterName = clusterName; + _accessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor); + + _zkclient = zkClient; + _instanceConfigCache = null; + _clusterConfigCache = null; + _resourceConfigCache = null; + _liveInstanceCache = null; + _idealStateCache = null; + _stateModelDefinitionCache = null; + _currentStateCache = null; + } + // TODO: consolidate EV from CSs + public Map consolidateExternalViews() { + return null; + } + + // Used for dummy cache. Remove later + public void initCache(final HelixDataAccessor accessor) { + + } + + public void initCache() { + + } +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestCurrentStateCache.java b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestCurrentStateCache.java new file mode 100644 index 0000000000..c92f4ce5b7 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestCurrentStateCache.java @@ -0,0 +1,42 @@ +package org.apache.helix.rest.common.dataprovider; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.model.CurrentState; + + +/** + * Special cache for instances current states. + * + */ +public class RestCurrentStateCache { + + //Map> + private ConcurrentHashMap> _objCache; + + public RestCurrentStateCache() { + _objCache = new ConcurrentHashMap<>(); + } + + public void init(final HelixDataAccessor accessor) { + } +} \ No newline at end of file diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestPropertyCache.java b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestPropertyCache.java new file mode 100644 index 0000000000..7cc129f136 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestPropertyCache.java @@ -0,0 +1,82 @@ +package org.apache.helix.rest.common.dataprovider; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.apache.helix.common.caches.PropertyCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class for caching simple HelixProperty objects. + * @param + */ +public class RestPropertyCache { + private static final Logger LOG = LoggerFactory.getLogger(RestPropertyCache.class); + + private ConcurrentHashMap _objCache; + private final String _propertyDescription; + + private final RestPropertyCache.PropertyCacheKeyFuncs _keyFuncs; + + public interface PropertyCacheKeyFuncs { + /** + * Get PropertyKey for the root of this type of object, used for LIST all objects + * @return property key to object root + */ + PropertyKey getRootKey(HelixDataAccessor accessor); + + /** + * Get PropertyKey for a single object of this type, used for GET single instance of the type + * @param objName object name + * @return property key to the object instance + */ + PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName); + + /** + * Get the string to identify the object when we actually use them. It's not necessarily the + * "id" field of HelixProperty, but could have more semantic meanings of that object type + * @param obj object instance + * @return object identifier + */ + String getObjName(O obj); + } + + public RestPropertyCache(String propertyDescription, RestPropertyCache.PropertyCacheKeyFuncs keyFuncs) { + _keyFuncs = keyFuncs; + _propertyDescription = propertyDescription; + } + + public void init(final HelixDataAccessor accessor) { + _objCache = new ConcurrentHashMap<>(accessor.getChildValuesMap(_keyFuncs.getRootKey(accessor), true)); + LOG.info("Init RestPropertyCache for {}. ", _propertyDescription); + } + + public Map getPropertyMap() { + return Collections.unmodifiableMap(_objCache); + } + +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestServiceDataProvider.java b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestServiceDataProvider.java new file mode 100644 index 0000000000..d880dbf7ce --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestServiceDataProvider.java @@ -0,0 +1,43 @@ +package org.apache.helix.rest.common.dataprovider; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.Map; + + +public class RestServiceDataProvider { + protected Map _clusterDataProviders ; + + + public RestServiceDataProvider() { + _clusterDataProviders = new HashMap<>(); + } + + public PerClusterDataProvider getClusterData(String clusterName) { + return _clusterDataProviders.get(clusterName); + } + + public void addClusterDataProvider(String clusterName, PerClusterDataProvider clusterData) { + _clusterDataProviders.put(clusterName, clusterData); + } + + +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java index d355db0fe8..3ab0b97f58 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java @@ -35,6 +35,8 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkBucketDataAccessor; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; +import org.apache.helix.rest.common.dataprovider.HelixRestDataProviderManager; +import org.apache.helix.rest.common.dataprovider.RestServiceDataProvider; import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory; import org.apache.helix.task.TaskDriver; import org.apache.helix.tools.ClusterSetup; @@ -80,6 +82,8 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat // Create ZkBucketDataAccessor for ReadOnlyWagedRebalancer. private volatile ZkBucketDataAccessor _zkBucketDataAccessor; + private volatile HelixRestDataProviderManager _helixRestDataProviderManager; + /** * Multi-ZK support */ @@ -292,6 +296,17 @@ public ZkBucketDataAccessor getZkBucketDataAccessor() { return _zkBucketDataAccessor; } + public HelixRestDataProviderManager getHelixRestDataProviderManager() { + if (_helixRestDataProviderManager == null) { + synchronized (this) { + if (_helixRestDataProviderManager == null) { + _helixRestDataProviderManager = new HelixRestDataProviderManager(getRealmAwareZkClient(), getHelixAdmin()); + } + } + } + return _helixRestDataProviderManager; + } + public void close() { if (_zkClient != null) { _zkClient.close(); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java index ac0240d5f5..7abbd23599 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java @@ -31,6 +31,7 @@ import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.helix.zookeeper.zkclient.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.RecursivePersistListener; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException; import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; @@ -164,6 +165,18 @@ void subscribeStateChanges( void unsubscribeStateChanges( org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener); + default void subscribePersistRecursiveListener(String path, + RecursivePersistListener recursivePersistListener) { + throw new UnsupportedOperationException( + "subscribePersistRecursiveListener() is not supported!"); + } + + default void unsubscribePersistRecursiveListener(String path, + RecursivePersistListener recursivePersistListener) { + throw new UnsupportedOperationException( + "subscribePersistRecursiveListener() is not supported!"); + } + void unsubscribeAll(); // data access diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 8dba7d81ef..143300d3b1 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -35,6 +35,7 @@ import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkConnection; import org.apache.helix.zookeeper.zkclient.IZkDataListener; +import org.apache.helix.zookeeper.zkclient.RecursivePersistListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener; @@ -156,6 +157,16 @@ public void unsubscribeStateChanges(IZkStateListener listener) { _rawZkClient.unsubscribeStateChanges(listener); } + @Override + public void subscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { + + } + + @Override + public void unsubscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { + + } + @Override public void unsubscribeAll() { _rawZkClient.unsubscribeAll(); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java index 8069985e3f..d0ddae18f4 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java @@ -38,6 +38,7 @@ import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.helix.zookeeper.zkclient.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.RecursivePersistListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; @@ -158,6 +159,16 @@ public void unsubscribeStateChanges( throwUnsupportedOperationException(); } + @Override + public void subscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { + + } + + @Override + public void unsubscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { + + } + @Override public void unsubscribeAll() { _zkRealmToZkClientMap.values().forEach(ZkClient::unsubscribeAll); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java index 1dd601c21e..3ff9dc0581 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/ZkClient.java @@ -22,6 +22,7 @@ import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.exception.ZkClientException; import org.apache.helix.zookeeper.zkclient.IZkConnection; +import org.apache.helix.zookeeper.zkclient.RecursivePersistListener; import org.apache.helix.zookeeper.zkclient.ZkConnection; import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; From cca528a7b787d8131f0cc064621b63111e0b8b6a Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Thu, 12 Oct 2023 13:46:54 -0700 Subject: [PATCH 3/4] client change --- .../HelixRestDataProviderManager.java | 68 -------------- .../dataprovider/PerClusterDataProvider.java | 91 ------------------- .../dataprovider/RestCurrentStateCache.java | 42 --------- .../dataprovider/RestPropertyCache.java | 82 ----------------- .../dataprovider/RestServiceDataProvider.java | 43 --------- .../helix/rest/server/ServerContext.java | 13 --- 6 files changed, 339 deletions(-) delete mode 100644 helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/HelixRestDataProviderManager.java delete mode 100644 helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/PerClusterDataProvider.java delete mode 100644 helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestCurrentStateCache.java delete mode 100644 helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestPropertyCache.java delete mode 100644 helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestServiceDataProvider.java diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/HelixRestDataProviderManager.java b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/HelixRestDataProviderManager.java deleted file mode 100644 index 58d59fc87d..0000000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/HelixRestDataProviderManager.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.apache.helix.rest.common.dataprovider; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** Init, register listener and manager callback handler for different - * clusters manage the providers lifecycle - * - */ - -import java.util.List; -import org.apache.helix.HelixAdmin; -import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; - - -public class HelixRestDataProviderManager { - - protected RealmAwareZkClient _zkclient; - - private HelixAdmin _helixAdmin; - - private RestServiceDataProvider _restServiceDataProvider; - // list of callback handlers - - //TODO: create own zk client - public HelixRestDataProviderManager(RealmAwareZkClient zkclient, HelixAdmin helixAdmin) { - _zkclient = zkclient; - _helixAdmin = helixAdmin; - _restServiceDataProvider = new RestServiceDataProvider(); - init(); - } - - public RestServiceDataProvider getRestServiceDataProvider() { - return _restServiceDataProvider; - } - - private void init() { - List clusters = _helixAdmin.getClusters(); - for (String cluster : clusters) { - PerClusterDataProvider clusterDataProvider = - new PerClusterDataProvider(cluster, _zkclient, new ZkBaseDataAccessor(_zkclient)); - clusterDataProvider.initCache(); - // register callback handler for each provider - _restServiceDataProvider.addClusterDataProvider(cluster, clusterDataProvider); - } - } - - public void close() { - } -} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/PerClusterDataProvider.java b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/PerClusterDataProvider.java deleted file mode 100644 index adcde81f28..0000000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/PerClusterDataProvider.java +++ /dev/null @@ -1,91 +0,0 @@ -package org.apache.helix.rest.common.dataprovider; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Map; -import org.apache.helix.BaseDataAccessor; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.PropertyKey; -import org.apache.helix.common.caches.PropertyCache; -import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.ClusterConstraints; -import org.apache.helix.model.CurrentState; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.ResourceConfig; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; - - -/** - * Dara cache for each Helix cluster. Configs, ideal stats and current states are read from ZK and updated - * using event changes. External view are consolidated using current state. - */ -public class PerClusterDataProvider { - - private HelixDataAccessor _accessor; - - private RealmAwareZkClient _zkclient; - - private final String _clusterName; - - // Simple caches - private final RestPropertyCache _instanceConfigCache; - private final RestPropertyCache _clusterConfigCache; - private final RestPropertyCache _resourceConfigCache; - private final RestPropertyCache _liveInstanceCache; - private final RestPropertyCache _idealStateCache; - private final RestPropertyCache _stateModelDefinitionCache; - - // special caches - private final RestCurrentStateCache _currentStateCache; - - // TODO: add external view caches - - public PerClusterDataProvider(String clusterName, RealmAwareZkClient zkClient, BaseDataAccessor baseDataAccessor) { - _clusterName = clusterName; - _accessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor); - - _zkclient = zkClient; - _instanceConfigCache = null; - _clusterConfigCache = null; - _resourceConfigCache = null; - _liveInstanceCache = null; - _idealStateCache = null; - _stateModelDefinitionCache = null; - _currentStateCache = null; - } - // TODO: consolidate EV from CSs - public Map consolidateExternalViews() { - return null; - } - - // Used for dummy cache. Remove later - public void initCache(final HelixDataAccessor accessor) { - - } - - public void initCache() { - - } -} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestCurrentStateCache.java b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestCurrentStateCache.java deleted file mode 100644 index c92f4ce5b7..0000000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestCurrentStateCache.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.apache.helix.rest.common.dataprovider; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.concurrent.ConcurrentHashMap; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.model.CurrentState; - - -/** - * Special cache for instances current states. - * - */ -public class RestCurrentStateCache { - - //Map> - private ConcurrentHashMap> _objCache; - - public RestCurrentStateCache() { - _objCache = new ConcurrentHashMap<>(); - } - - public void init(final HelixDataAccessor accessor) { - } -} \ No newline at end of file diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestPropertyCache.java b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestPropertyCache.java deleted file mode 100644 index 7cc129f136..0000000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestPropertyCache.java +++ /dev/null @@ -1,82 +0,0 @@ -package org.apache.helix.rest.common.dataprovider; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixProperty; -import org.apache.helix.PropertyKey; -import org.apache.helix.common.caches.PropertyCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Class for caching simple HelixProperty objects. - * @param - */ -public class RestPropertyCache { - private static final Logger LOG = LoggerFactory.getLogger(RestPropertyCache.class); - - private ConcurrentHashMap _objCache; - private final String _propertyDescription; - - private final RestPropertyCache.PropertyCacheKeyFuncs _keyFuncs; - - public interface PropertyCacheKeyFuncs { - /** - * Get PropertyKey for the root of this type of object, used for LIST all objects - * @return property key to object root - */ - PropertyKey getRootKey(HelixDataAccessor accessor); - - /** - * Get PropertyKey for a single object of this type, used for GET single instance of the type - * @param objName object name - * @return property key to the object instance - */ - PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName); - - /** - * Get the string to identify the object when we actually use them. It's not necessarily the - * "id" field of HelixProperty, but could have more semantic meanings of that object type - * @param obj object instance - * @return object identifier - */ - String getObjName(O obj); - } - - public RestPropertyCache(String propertyDescription, RestPropertyCache.PropertyCacheKeyFuncs keyFuncs) { - _keyFuncs = keyFuncs; - _propertyDescription = propertyDescription; - } - - public void init(final HelixDataAccessor accessor) { - _objCache = new ConcurrentHashMap<>(accessor.getChildValuesMap(_keyFuncs.getRootKey(accessor), true)); - LOG.info("Init RestPropertyCache for {}. ", _propertyDescription); - } - - public Map getPropertyMap() { - return Collections.unmodifiableMap(_objCache); - } - -} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestServiceDataProvider.java b/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestServiceDataProvider.java deleted file mode 100644 index d880dbf7ce..0000000000 --- a/helix-rest/src/main/java/org/apache/helix/rest/common/dataprovider/RestServiceDataProvider.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.apache.helix.rest.common.dataprovider; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.HashMap; -import java.util.Map; - - -public class RestServiceDataProvider { - protected Map _clusterDataProviders ; - - - public RestServiceDataProvider() { - _clusterDataProviders = new HashMap<>(); - } - - public PerClusterDataProvider getClusterData(String clusterName) { - return _clusterDataProviders.get(clusterName); - } - - public void addClusterDataProvider(String clusterName, PerClusterDataProvider clusterData) { - _clusterDataProviders.put(clusterName, clusterData); - } - - -} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java index 3ab0b97f58..6b2bfff370 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java @@ -35,8 +35,6 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkBucketDataAccessor; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; -import org.apache.helix.rest.common.dataprovider.HelixRestDataProviderManager; -import org.apache.helix.rest.common.dataprovider.RestServiceDataProvider; import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory; import org.apache.helix.task.TaskDriver; import org.apache.helix.tools.ClusterSetup; @@ -82,7 +80,6 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat // Create ZkBucketDataAccessor for ReadOnlyWagedRebalancer. private volatile ZkBucketDataAccessor _zkBucketDataAccessor; - private volatile HelixRestDataProviderManager _helixRestDataProviderManager; /** * Multi-ZK support @@ -296,16 +293,6 @@ public ZkBucketDataAccessor getZkBucketDataAccessor() { return _zkBucketDataAccessor; } - public HelixRestDataProviderManager getHelixRestDataProviderManager() { - if (_helixRestDataProviderManager == null) { - synchronized (this) { - if (_helixRestDataProviderManager == null) { - _helixRestDataProviderManager = new HelixRestDataProviderManager(getRealmAwareZkClient(), getHelixAdmin()); - } - } - } - return _helixRestDataProviderManager; - } public void close() { if (_zkClient != null) { From e6eb423aaa0414f681f11a1688838b7d54d89c75 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Thu, 12 Oct 2023 22:24:03 -0700 Subject: [PATCH 4/4] add test --- .../api/client/RealmAwareZkClient.java | 11 ++++ .../impl/client/DedicatedZkClient.java | 13 ++-- .../impl/client/FederatedZkClient.java | 10 +-- .../impl/client/TestFederatedZkClient.java | 66 +++++++++++++++++++ 4 files changed, 89 insertions(+), 11 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java index 7abbd23599..5961b96415 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java @@ -577,6 +577,8 @@ class RealmAwareZkClientConfig { protected String _monitorInstanceName = null; protected boolean _monitorRootPathOnly = true; + protected boolean _usePersistWatcher = false; + public RealmAwareZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) { this._zkSerializer = zkSerializer; return this; @@ -632,6 +634,11 @@ public RealmAwareZkClientConfig setConnectInitTimeout(long _connectInitTimeout) return this; } + public RealmAwareZkClientConfig setUsePersistWatcher(boolean usePersistWatcher) { + this._usePersistWatcher = usePersistWatcher; + return this; + } + public PathBasedZkSerializer getZkSerializer() { if (_zkSerializer == null) { _zkSerializer = new BasicZkSerializer(new SerializableSerializer()); @@ -663,6 +670,10 @@ public long getConnectInitTimeout() { return _connectInitTimeout; } + public boolean isUsePersistWatcher() { + return _usePersistWatcher; + } + /** * Create HelixZkClient.ZkClientConfig based on RealmAwareZkClientConfig. * @return diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index 143300d3b1..ffa07ebd83 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -105,10 +105,11 @@ public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connect new ZkConnection(zkRealmAddress, connectionConfig.getSessionTimeout()); // Create a ZkClient - _rawZkClient = new ZkClient(zkConnection, (int) clientConfig.getConnectInitTimeout(), - clientConfig.getOperationRetryTimeout(), clientConfig.getZkSerializer(), - clientConfig.getMonitorType(), clientConfig.getMonitorKey(), - clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly()); + _rawZkClient = + new ZkClient(zkConnection, (int) clientConfig.getConnectInitTimeout(), clientConfig.getOperationRetryTimeout(), + clientConfig.getZkSerializer(), clientConfig.getMonitorType(), clientConfig.getMonitorKey(), + clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly(), true, + clientConfig.isUsePersistWatcher()); } @Override @@ -159,12 +160,12 @@ public void unsubscribeStateChanges(IZkStateListener listener) { @Override public void subscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { - + _rawZkClient.subscribePersistRecursiveListener(path, recursivePersistListener); } @Override public void unsubscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { - + _rawZkClient.unsubscribePersistRecursiveListener(path, recursivePersistListener); } @Override diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java index d0ddae18f4..ca4551f804 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java @@ -161,12 +161,12 @@ public void unsubscribeStateChanges( @Override public void subscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { - + getZkClient(path).subscribePersistRecursiveListener(path, recursivePersistListener); } @Override public void unsubscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) { - + getZkClient(path).unsubscribePersistRecursiveListener(path, recursivePersistListener); } @Override @@ -711,9 +711,9 @@ private String updateRoutingDataOnCacheMiss(String path) throws InvalidRoutingDa private ZkClient createZkClient(String zkAddress) { LOG.debug("Creating ZkClient for realm: {}.", zkAddress); return new ZkClient(new ZkConnection(zkAddress), (int) _clientConfig.getConnectInitTimeout(), - _clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer, - _clientConfig.getMonitorType(), _clientConfig.getMonitorKey(), - _clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly()); + _clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer, _clientConfig.getMonitorType(), + _clientConfig.getMonitorKey(), _clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly(), + true, _clientConfig.isUsePersistWatcher()); } private void checkClosedState() { diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java index 30335efb13..65bc2a7f0e 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java @@ -26,8 +26,10 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants; import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; @@ -39,7 +41,9 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; import org.apache.helix.zookeeper.routing.RoutingDataManager; +import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkStateListener; +import org.apache.helix.zookeeper.zkclient.RecursivePersistListener; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; import org.testng.Assert; @@ -648,6 +652,68 @@ public void testClose() { } } + @Test(dependsOnMethods = "testClose") + public void testFederatedZkClientWithPersistListener() throws InvalidRoutingDataException, InterruptedException { + RealmAwareZkClient realmAwareZkClient = + new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), + new RealmAwareZkClient.RealmAwareZkClientConfig().setUsePersistWatcher(true)); + int count = 100; + final AtomicInteger[] event_count = {new AtomicInteger(0)}; + final AtomicInteger[] event_count2 = {new AtomicInteger(0)}; + // for each iteration, we will edit a node, create a child, create a grand child, and + // delete child. Expect 4 event per iteration. -> total event should be count*4 + CountDownLatch countDownLatch1 = new CountDownLatch(count * 4); + CountDownLatch countDownLatch2 = new CountDownLatch(count); + realmAwareZkClient.createPersistent(TEST_VALID_PATH, true); + String path = TEST_VALID_PATH + "/testFederatedZkClientWithPersistListener"; + RecursivePersistListener rcListener = new RecursivePersistListener() { + @Override + public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType) throws Exception { + countDownLatch1.countDown(); + event_count[0].incrementAndGet(); + } + }; + realmAwareZkClient.create(path, "datat", CreateMode.PERSISTENT); + realmAwareZkClient.subscribePersistRecursiveListener(path, rcListener); + for (int i = 0; i < count; ++i) { + realmAwareZkClient.writeData(path, "data7" + i, -1); + realmAwareZkClient.create(path + "/c1_" + i, "datat", CreateMode.PERSISTENT); + realmAwareZkClient.create(path + "/c1_" + i + "/c2", "datat", CreateMode.PERSISTENT); + realmAwareZkClient.delete(path + "/c1_" + i + "/c2"); + } + Assert.assertTrue(countDownLatch1.await(50000000, TimeUnit.MILLISECONDS)); + + // subscribe a persist child watch, it should throw exception + IZkChildListener childListener2 = new IZkChildListener() { + @Override + public void handleChildChange(String parentPath, List currentChilds) throws Exception { + countDownLatch2.countDown(); + event_count2[0].incrementAndGet(); + } + }; + try { + realmAwareZkClient.subscribeChildChanges(path, childListener2, false); + } catch (Exception ex) { + Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException"); + } + + // unsubscribe recursive persist watcher, and subscribe persist watcher should success. + realmAwareZkClient.unsubscribePersistRecursiveListener(path, rcListener); + realmAwareZkClient.subscribeChildChanges(path, childListener2, false); + // we should only get 100 event since only 100 direct child change. + for (int i = 0; i < count; ++i) { + realmAwareZkClient.writeData(path, "data7" + i, -1); + realmAwareZkClient.create(path + "/c2_" + i, "datat", CreateMode.PERSISTENT); + realmAwareZkClient.create(path + "/c2_" + i + "/c3", "datat", CreateMode.PERSISTENT); + realmAwareZkClient.delete(path + "/c2_" + i + "/c3"); + } + Assert.assertTrue(countDownLatch2.await(50000000, TimeUnit.MILLISECONDS)); + + realmAwareZkClient.deleteRecursively(TEST_VALID_PATH); + realmAwareZkClient.close(); + } + + @Override public void testMultiSetup() throws InvalidRoutingDataException { super.testMultiSetup();