diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java index 348bd09292f..c8a1d6aef26 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java @@ -21,13 +21,14 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; public interface MetaClientCacheInterface extends MetaClientInterface { /** * TrieNode class to store the children of the entries to be cached. */ - class TrieNode { + public class TrieNode { // A mapping between trie key and children nodes. private Map _children; @@ -36,7 +37,7 @@ class TrieNode { private final String _nodeKey; - TrieNode(String path, String nodeKey) { + public TrieNode(String path, String nodeKey) { _path = path; _nodeKey = nodeKey; _children = new HashMap<>(); diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java index 9e032360198..07972945a20 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java @@ -25,13 +25,11 @@ public class MetaClientCacheConfig { private final String _rootEntry; private boolean _cacheData = false; private boolean _cacheChildren = false; - private boolean _lazyCaching = true; public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren, boolean lazyCaching) { _rootEntry = rootEntry; _cacheData = cacheData; _cacheChildren = cacheChildren; - _lazyCaching = lazyCaching; } public String getRootEntry() { @@ -46,7 +44,4 @@ public boolean getCacheChildren() { return _cacheChildren; } - public boolean getLazyCaching() { - return _lazyCaching; - } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java index ebb4549dae4..7cc86a8a98e 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java @@ -40,13 +40,7 @@ public MetaClientInterface getMetaClient(MetaClientConfig config) { throw new IllegalArgumentException("MetaClientConfig cannot be null."); } if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) { - ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder(). - setConnectionAddress(config.getConnectionAddress()) - .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy()) - .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis()) - .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis()) - .build(); - return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig); + return new ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config)); } return null; } @@ -56,14 +50,17 @@ public MetaClientCacheInterface getMetaClientCache(MetaClientConfig config, Meta throw new IllegalArgumentException("MetaClientConfig cannot be null."); } if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) { - ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder(). - setConnectionAddress(config.getConnectionAddress()) - .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy()) - .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis()) - .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis()) - .build(); - return new ZkMetaClientFactory().getMetaClientCache(zkMetaClientConfig, cacheConfig); + return new ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config), cacheConfig); } return null; } + + private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config) { + return new ZkMetaClientConfig.ZkMetaClientConfigBuilder(). + setConnectionAddress(config.getConnectionAddress()) + .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy()) + .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis()) + .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis()) + .build(); + } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java index af1c9d79158..724a9f52152 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java @@ -21,31 +21,42 @@ import org.apache.helix.metaclient.api.ChildChangeListener; import org.apache.helix.metaclient.api.MetaClientCacheInterface; -import org.apache.helix.metaclient.datamodel.DataRecord; import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.factories.MetaClientCacheConfig; -import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; -import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory; -import org.apache.helix.metaclient.recipes.lock.LockInfoSerializer; import org.apache.helix.zookeeper.zkclient.ZkClient; +import org.apache.helix.zookeeper.zkclient.exception.ZkException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.Queue; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException; public class ZkMetaClientCache extends ZkMetaClient implements MetaClientCacheInterface { - private Map _dataCacheMap; + private ConcurrentHashMap _dataCacheMap; private final String _rootEntry; private TrieNode _childrenCacheTree; private ChildChangeListener _eventListener; private boolean _cacheData; private boolean _cacheChildren; - private boolean _lazyCaching; private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClientCache.class); private ZkClient _cacheClient; + private ExecutorService executor; + + // TODO: Look into using conditional variable instead of latch. + private final CountDownLatch _initializedCache = new CountDownLatch(1); /** * Constructor for ZkMetaClientCache. @@ -56,39 +67,229 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC super(config); _cacheClient = getZkClient(); _rootEntry = cacheConfig.getRootEntry(); - _lazyCaching = cacheConfig.getLazyCaching(); _cacheData = cacheConfig.getCacheData(); _cacheChildren = cacheConfig.getCacheChildren(); + + if (_cacheData) { + _dataCacheMap = new ConcurrentHashMap<>(); + } + if (_cacheChildren) { + _childrenCacheTree = new TrieNode(_rootEntry, _rootEntry.substring(1)); + } } + /** + * Get data for a given key. + * If datacache is enabled, will fetch for cache. If it doesn't exist + * returns null (for when initial populating cache is in progress). + * @param key key to identify the entry + * @return data for the key + */ @Override - public Stat exists(String key) { - throw new MetaClientException("Not implemented yet."); + public T get(final String key) { + if (_cacheData) { + T data = getDataCacheMap().get(key); + if (data == null) { + LOG.debug("Data not found in cache for key: {}. This could be because the cache is still being populated.", key); + } + return data; + } + return super.get(key); } @Override - public T get(final String key) { - throw new MetaClientException("Not implemented yet."); + public List get(List keys) { + List dataList = new ArrayList<>(); + for (String key : keys) { + dataList.add(get(key)); + } + return dataList; } + /** + * Get the direct children for a given key. + * @param key For metadata storage that has hierarchical key space (e.g. ZK), the key would be + * a parent key, + * For metadata storage that has non-hierarchical key space (e.g. etcd), the key would + * be a prefix key. + * @return list of direct children or null if key doesn't exist / cache is not populated yet. + */ @Override public List getDirectChildrenKeys(final String key) { - throw new MetaClientException("Not implemented yet."); + if (_cacheChildren) { + TrieNode node = getTree(key); + if (node == null) { + LOG.debug("Children not found in cache for key: {}. This could be because the cache is still being populated.", key); + return null; + } + return new ArrayList<>(node.getChildren().keySet()); + } + return super.getDirectChildrenKeys(key); } + /** + * Get the number of direct children for a given key. + * @param key For metadata storage that has hierarchical key space (e.g. ZK), the key would be + * a parent key, + * For metadata storage that has non-hierarchical key space (e.g. etcd), the key would + * be a prefix key. + * @return number of direct children or 0 if key doesn't exist / has no children / cache is not populated yet. + */ @Override public int countDirectChildren(final String key) { - throw new MetaClientException("Not implemented yet."); + if (_cacheChildren) { + TrieNode node = getTree(key); + if (node == null) { + LOG.debug("Children not found in cache for key: {}. This could be because the cache is still being populated.", key); + return 0; + } + return node.getChildren().size(); + } + return super.countDirectChildren(key); } - @Override - public List get(List keys) { - throw new MetaClientException("Not implemented yet."); + private TrieNode getTree(String path) { + String[] pathComponents = path.split("/"); + TrieNode currentNode = _childrenCacheTree; + for (int i = 1; i < pathComponents.length; i++) { + String component = pathComponents[i]; + if (!currentNode.getChildren().containsKey(component)) { + return currentNode; + } else { + currentNode = currentNode.getChildren().get(component); + } + } + return currentNode; } - @Override - public List exists(List keys) { - throw new MetaClientException("Not implemented yet."); + private void updateCache(String path, boolean isCreate) { + if (_cacheChildren) { + String[] pathComponents = path.split("/"); + TrieNode currentNode = _childrenCacheTree; + TrieNode previousNode = null; + for (int i = 1; i < pathComponents.length; i++) { + String component = pathComponents[i]; + if (component.equals(_childrenCacheTree.getNodeKey())) { + // Skip the root node + } + else if (!currentNode.getChildren().containsKey(component)) { + if (isCreate) { + TrieNode newNode = new TrieNode(currentNode.getPath() + "/" + component, component); + currentNode.addChild(component, newNode); + previousNode = currentNode; + currentNode = newNode; + } else { + return; + } + } else { + previousNode = currentNode; + currentNode = currentNode.getChildren().get(component); + } + } + if (!isCreate && previousNode != null) { + previousNode.getChildren().remove(currentNode.getNodeKey()); + } + } + } + + private void populateAllCache() { + // TODO: Concurrently populate children and data cache. + if (!_cacheClient.exists(_rootEntry)) { + LOG.warn("Root entry: {} does not exist.", _rootEntry); + // Let the other threads know that the cache is populated. + _initializedCache.countDown(); + return; + } + + Queue queue = new ArrayDeque<>(); + queue.add(_rootEntry); + + while (!queue.isEmpty()) { + String node = queue.poll(); + if (_cacheData) { + T dataRecord = _cacheClient.readData(node, true); + _dataCacheMap.put(node, dataRecord); + } + if (_cacheChildren) { + updateCache(node, true); + } + queue.addAll(_cacheClient.getChildren(node)); + } + // Let the other threads know that the cache is populated. + _initializedCache.countDown(); + } + + private class CacheUpdateRunnable implements Runnable { + private final String path; + private final ChildChangeListener.ChangeType changeType; + + public CacheUpdateRunnable(String path, ChildChangeListener.ChangeType changeType) { + this.path = path; + this.changeType = changeType; + } + + @Override + public void run() { + waitForPopulateAllCache(); + // TODO: HANDLE DEDUP EVENT CHANGES + switch (changeType) { + case ENTRY_CREATED: + // Not implemented yet. + updateCache(path, true); + modifyDataInCache(path, false); + break; + case ENTRY_DELETED: + updateCache(path, false); + modifyDataInCache(path, true); + break; + case ENTRY_DATA_CHANGE: + modifyDataInCache(path, false); + break; + default: + LOG.error("Unknown change type: " + changeType); + } + } + } + + private void waitForPopulateAllCache() { + try { + _initializedCache.await(); + } catch (InterruptedException e) { + throw new MetaClientException("Interrupted while waiting for cache to populate.", e); + } + } + + private void modifyDataInCache(String path, Boolean isDelete) { + if (_cacheData) { + if (isDelete) { + getDataCacheMap().remove(path); + } else { + T dataRecord = _cacheClient.readData(path, true); + getDataCacheMap().put(path, dataRecord); + } + } + } + + public ConcurrentHashMap getDataCacheMap() { + return _dataCacheMap; + } + + public TrieNode getChildrenCacheTree() { + return _childrenCacheTree; } + /** + * Connect to the underlying ZkClient. + */ + @Override + public void connect() { + super.connect(); + _eventListener = (path, changeType) -> { + Runnable cacheUpdateRunnable = new CacheUpdateRunnable(path, changeType); + executor.execute(cacheUpdateRunnable); + }; + executor = Executors.newSingleThreadExecutor(); + _cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener)); + populateAllCache(); + } } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java index a3a5b4eee3a..c9685bb1346 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java @@ -19,39 +19,151 @@ * under the License. */ - import org.apache.helix.metaclient.factories.MetaClientCacheConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.List; + public class TestZkMetaClientCache extends ZkMetaClientTestBase { - private static final String PATH = "/Cache"; + private static final String DATA_PATH = "/data"; + private static final String DATA_VALUE = "testData"; @Test public void testCreateClient() { - final String key = "/TestZkMetaClientCache_testCreate"; - try (ZkMetaClient zkMetaClientCache = createZkMetaClientCache()) { + final String key = "/testCreate"; + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { zkMetaClientCache.connect(); // Perform some random non-read operation zkMetaClientCache.create(key, ENTRY_STRING_VALUE); + } + } + + @Test + public void testCacheDataUpdates() { + final String key = "/testCacheDataUpdates"; + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + zkMetaClientCache.create(key, "test"); + zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE); + // Get data for DATA_PATH and cache it + String data = zkMetaClientCache.get(key + DATA_PATH); + Assert.assertEquals(data, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH)); + + // Update data for DATA_PATH + String newData = zkMetaClientCache.update(key + DATA_PATH, currentData -> currentData + "1"); + + // Verify that cached data is updated. Might take some time + for (int i = 0; i < 10; i++) { + if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH).equals(newData)) { + break; + } + Thread.sleep(1000); + } + Assert.assertEquals(newData, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH)); + + zkMetaClientCache.delete(key + DATA_PATH); + // Verify that cached data is updated. Might take some time + for (int i = 0; i < 10; i++) { + if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH) == null) { + break; + } + Thread.sleep(1000); + } + + Assert.assertNull(zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testGetDirectChildrenKeys() { + final String key = "/testGetDirectChildrenKeys"; + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + zkMetaClientCache.create(key, ENTRY_STRING_VALUE); + zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE); + zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE); - try { - //Perform some read operation - should fail. - // TODO: Remove this once implemented. - zkMetaClientCache.get(key); - Assert.fail("Should have failed with non implemented yet."); - } catch (Exception ignored) { + for (int i = 0; i < 10; i++) { + if (zkMetaClientCache.getChildrenCacheTree().getChildren().size() == 2) { + break; + } + Thread.sleep(500); } + // Retrieve the direct children keys + List children = zkMetaClientCache.getDirectChildrenKeys(key); + + // Assert that the retrieved children keys match the expected ones + Assert.assertEquals(2, children.size()); + Assert.assertTrue(children.contains("child1")); + Assert.assertTrue(children.contains("child2")); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testCountDirectChildren() { + final String key = "/testCountDirectChildren"; + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + zkMetaClientCache.create(key, ENTRY_STRING_VALUE); + zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE); + zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE); + + for (int i = 0; i < 10; i++) { + if (zkMetaClientCache.getChildrenCacheTree().getChildren().size() == 2) { + break; + } + Thread.sleep(500); + } + + int count = zkMetaClientCache.countDirectChildren(key); + Assert.assertEquals(2, count); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testChildCacheUpdate() { + final String key = "/testChildCacheUpdate"; + try (ZkMetaClientCache zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) { + zkMetaClientCache.connect(); + zkMetaClientCache.create(key, ENTRY_STRING_VALUE); + zkMetaClientCache.create(key + "/child1", ENTRY_STRING_VALUE); + zkMetaClientCache.create(key + "/child2", ENTRY_STRING_VALUE); + for (int i = 0; i < 10; i++) { + if (zkMetaClientCache.getChildrenCacheTree().getChildren().size() == 2) { + break; + } + Thread.sleep(500); + } + Assert.assertEquals(zkMetaClientCache.getChildrenCacheTree().getChildren().size(), 2); + zkMetaClientCache.delete(key + "/child1"); + + for (int i = 0; i < 10; i++) { + if (zkMetaClientCache.getChildrenCacheTree().getChildren().size() == 1) { + break; + } + Thread.sleep(500); + } + Assert.assertEquals(zkMetaClientCache.getChildrenCacheTree().getChildren().size(), 1); + + } catch (InterruptedException e) { + throw new RuntimeException(e); } } - protected static ZkMetaClientCache createZkMetaClientCache() { + public ZkMetaClientCache createZkMetaClientCacheLazyCaching(String rootPath) { ZkMetaClientConfig config = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR) //.setZkSerializer(new TestStringSerializer()) .build(); - MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(PATH, true, true, true); + MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true); return new ZkMetaClientCache<>(config, cacheConfig); } }