Skip to content

Commit

Permalink
Children Cache Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mapeng committed Sep 28, 2023
1 parent 0555e93 commit e3d38c3
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> {

/**
* TrieNode class to store the children of the entries to be cached.
*/
class TrieNode {
public class TrieNode {
// A mapping between trie key and children nodes.
private Map<String, TrieNode> _children;

Expand All @@ -36,7 +37,7 @@ class TrieNode {

private final String _nodeKey;

TrieNode(String path, String nodeKey) {
public TrieNode(String path, String nodeKey) {
_path = path;
_nodeKey = nodeKey;
_children = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ public class MetaClientCacheConfig {
private final String _rootEntry;
private boolean _cacheData = false;
private boolean _cacheChildren = false;
private boolean _lazyCaching = true;

public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren, boolean lazyCaching) {
_rootEntry = rootEntry;
_cacheData = cacheData;
_cacheChildren = cacheChildren;
_lazyCaching = lazyCaching;
}

public String getRootEntry() {
Expand All @@ -46,7 +44,4 @@ public boolean getCacheChildren() {
return _cacheChildren;
}

public boolean getLazyCaching() {
return _lazyCaching;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,7 @@ public MetaClientInterface getMetaClient(MetaClientConfig config) {
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
return new ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config));
}
return null;
}
Expand All @@ -56,14 +50,17 @@ public MetaClientCacheInterface getMetaClientCache(MetaClientConfig config, Meta
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
return new ZkMetaClientFactory().getMetaClientCache(zkMetaClientConfig, cacheConfig);
return new ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config), cacheConfig);
}
return null;
}

private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config) {
return new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,42 @@

import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.MetaClientCacheInterface;
import org.apache.helix.metaclient.datamodel.DataRecord;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
import org.apache.helix.metaclient.recipes.lock.LockInfoSerializer;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;

public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientCacheInterface<T> {

private Map<String, DataRecord> _dataCacheMap;
private ConcurrentHashMap<String, T> _dataCacheMap;
private final String _rootEntry;
private TrieNode _childrenCacheTree;
private ChildChangeListener _eventListener;
private boolean _cacheData;
private boolean _cacheChildren;
private boolean _lazyCaching;
private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClientCache.class);
private ZkClient _cacheClient;
private ExecutorService executor;

// TODO: Look into using conditional variable instead of latch.
private final CountDownLatch _initializedCache = new CountDownLatch(1);

/**
* Constructor for ZkMetaClientCache.
Expand All @@ -56,39 +67,229 @@ public ZkMetaClientCache(ZkMetaClientConfig config, MetaClientCacheConfig cacheC
super(config);
_cacheClient = getZkClient();
_rootEntry = cacheConfig.getRootEntry();
_lazyCaching = cacheConfig.getLazyCaching();
_cacheData = cacheConfig.getCacheData();
_cacheChildren = cacheConfig.getCacheChildren();

if (_cacheData) {
_dataCacheMap = new ConcurrentHashMap<>();
}
if (_cacheChildren) {
_childrenCacheTree = new TrieNode(_rootEntry, _rootEntry.substring(1));
}
}

/**
* Get data for a given key.
* If datacache is enabled, will fetch for cache. If it doesn't exist
* returns null (for when initial populating cache is in progress).
* @param key key to identify the entry
* @return data for the key
*/
@Override
public Stat exists(String key) {
throw new MetaClientException("Not implemented yet.");
public T get(final String key) {
if (_cacheData) {
T data = getDataCacheMap().get(key);
if (data == null) {
LOG.debug("Data not found in cache for key: {}. This could be because the cache is still being populated.", key);
}
return data;
}
return super.get(key);
}

@Override
public T get(final String key) {
throw new MetaClientException("Not implemented yet.");
public List<T> get(List<String> keys) {
List<T> dataList = new ArrayList<>();
for (String key : keys) {
dataList.add(get(key));
}
return dataList;
}

/**
* Get the direct children for a given key.
* @param key For metadata storage that has hierarchical key space (e.g. ZK), the key would be
* a parent key,
* For metadata storage that has non-hierarchical key space (e.g. etcd), the key would
* be a prefix key.
* @return list of direct children or null if key doesn't exist / cache is not populated yet.
*/
@Override
public List<String> getDirectChildrenKeys(final String key) {
throw new MetaClientException("Not implemented yet.");
if (_cacheChildren) {
TrieNode node = getTree(key);
if (node == null) {
LOG.debug("Children not found in cache for key: {}. This could be because the cache is still being populated.", key);
return null;
}
return new ArrayList<>(node.getChildren().keySet());
}
return super.getDirectChildrenKeys(key);
}

/**
* Get the number of direct children for a given key.
* @param key For metadata storage that has hierarchical key space (e.g. ZK), the key would be
* a parent key,
* For metadata storage that has non-hierarchical key space (e.g. etcd), the key would
* be a prefix key.
* @return number of direct children or 0 if key doesn't exist / has no children / cache is not populated yet.
*/
@Override
public int countDirectChildren(final String key) {
throw new MetaClientException("Not implemented yet.");
if (_cacheChildren) {
TrieNode node = getTree(key);
if (node == null) {
LOG.debug("Children not found in cache for key: {}. This could be because the cache is still being populated.", key);
return 0;
}
return node.getChildren().size();
}
return super.countDirectChildren(key);
}

@Override
public List<T> get(List<String> keys) {
throw new MetaClientException("Not implemented yet.");
private TrieNode getTree(String path) {
String[] pathComponents = path.split("/");
TrieNode currentNode = _childrenCacheTree;
for (int i = 1; i < pathComponents.length; i++) {
String component = pathComponents[i];
if (!currentNode.getChildren().containsKey(component)) {
return currentNode;
} else {
currentNode = currentNode.getChildren().get(component);
}
}
return currentNode;
}

@Override
public List<Stat> exists(List<String> keys) {
throw new MetaClientException("Not implemented yet.");
private void updateCache(String path, boolean isCreate) {
if (_cacheChildren) {
String[] pathComponents = path.split("/");
TrieNode currentNode = _childrenCacheTree;
TrieNode previousNode = null;
for (int i = 1; i < pathComponents.length; i++) {
String component = pathComponents[i];
if (component.equals(_childrenCacheTree.getNodeKey())) {
// Skip the root node
}
else if (!currentNode.getChildren().containsKey(component)) {
if (isCreate) {
TrieNode newNode = new TrieNode(currentNode.getPath() + "/" + component, component);
currentNode.addChild(component, newNode);
previousNode = currentNode;
currentNode = newNode;
} else {
return;
}
} else {
previousNode = currentNode;
currentNode = currentNode.getChildren().get(component);
}
}
if (!isCreate && previousNode != null) {
previousNode.getChildren().remove(currentNode.getNodeKey());
}
}
}

private void populateAllCache() {
// TODO: Concurrently populate children and data cache.
if (!_cacheClient.exists(_rootEntry)) {
LOG.warn("Root entry: {} does not exist.", _rootEntry);
// Let the other threads know that the cache is populated.
_initializedCache.countDown();
return;
}

Queue<String> queue = new ArrayDeque<>();
queue.add(_rootEntry);

while (!queue.isEmpty()) {
String node = queue.poll();
if (_cacheData) {
T dataRecord = _cacheClient.readData(node, true);
_dataCacheMap.put(node, dataRecord);
}
if (_cacheChildren) {
updateCache(node, true);
}
queue.addAll(_cacheClient.getChildren(node));
}
// Let the other threads know that the cache is populated.
_initializedCache.countDown();
}

private class CacheUpdateRunnable implements Runnable {
private final String path;
private final ChildChangeListener.ChangeType changeType;

public CacheUpdateRunnable(String path, ChildChangeListener.ChangeType changeType) {
this.path = path;
this.changeType = changeType;
}

@Override
public void run() {
waitForPopulateAllCache();
// TODO: HANDLE DEDUP EVENT CHANGES
switch (changeType) {
case ENTRY_CREATED:
// Not implemented yet.
updateCache(path, true);
modifyDataInCache(path, false);
break;
case ENTRY_DELETED:
updateCache(path, false);
modifyDataInCache(path, true);
break;
case ENTRY_DATA_CHANGE:
modifyDataInCache(path, false);
break;
default:
LOG.error("Unknown change type: " + changeType);
}
}
}

private void waitForPopulateAllCache() {
try {
_initializedCache.await();
} catch (InterruptedException e) {
throw new MetaClientException("Interrupted while waiting for cache to populate.", e);
}
}

private void modifyDataInCache(String path, Boolean isDelete) {
if (_cacheData) {
if (isDelete) {
getDataCacheMap().remove(path);
} else {
T dataRecord = _cacheClient.readData(path, true);
getDataCacheMap().put(path, dataRecord);
}
}
}

public ConcurrentHashMap<String, T> getDataCacheMap() {
return _dataCacheMap;
}

public TrieNode getChildrenCacheTree() {
return _childrenCacheTree;
}

/**
* Connect to the underlying ZkClient.
*/
@Override
public void connect() {
super.connect();
_eventListener = (path, changeType) -> {
Runnable cacheUpdateRunnable = new CacheUpdateRunnable(path, changeType);
executor.execute(cacheUpdateRunnable);
};
executor = Executors.newSingleThreadExecutor();
_cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener));
populateAllCache();
}
}
Loading

0 comments on commit e3d38c3

Please sign in to comment.