Skip to content

Commit

Permalink
Merge feature branch (#2647)
Browse files Browse the repository at this point in the history
* MetaClientCache Part 1 - API's, configs, and builders (#2612)

MetaClientCache Part 1 - API's, configs, and builders

---------

Co-authored-by: mapeng <[email protected]>

* Skip one time listener re-register for exists for ZkClient - MetaClient usage. (#2637)

* Lattice cache - caching just data implementation (#2619)

Lattice cache - caching just data implementation
---------

Co-authored-by: mapeng <[email protected]>

* Add recursiveCreate functionality to metaclient (#2607)



Co-authored-by: Grant Palau Spencer <[email protected]>

* Lattice Children Cache Implementation(#2623)

Co-authored-by: mapeng <[email protected]>

---------

Co-authored-by: Marcos Rico Peng <[email protected]>
Co-authored-by: mapeng <[email protected]>
Co-authored-by: Grant Paláu Spencer <[email protected]>
Co-authored-by: Grant Palau Spencer <[email protected]>
  • Loading branch information
5 people authored Oct 16, 2023
1 parent 564574f commit 5f1a3f7
Show file tree
Hide file tree
Showing 12 changed files with 791 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.apache.helix.metaclient.api;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.HashMap;
import java.util.Map;

public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> {

/**
* TrieNode class to store the children of the entries to be cached.
*/
class TrieNode {
// A mapping between trie key and children nodes.
private Map<String, TrieNode> _children;
// the complete path/prefix leading to the current node.
private final String _path;
private final String _nodeKey;

public TrieNode(String path, String nodeKey) {
_path = path;
_nodeKey = nodeKey;
_children = new HashMap<>();
}

public Map<String, TrieNode> getChildren() {
return _children;
}

public String getPath() {
return _path;
}

public String getNodeKey() {
return _nodeKey;
}

public void addChild(String key, TrieNode node) {
_children.put(key, node);
}

public TrieNode processPath(String path, boolean isCreate) {
String[] pathComponents = path.split("/");
TrieNode currentNode = this;
TrieNode previousNode = null;

for (int i = 1; i < pathComponents.length; i++) {
String component = pathComponents[i];
if (component.equals(_nodeKey)) {
// Skip the root node
} else if (!currentNode.getChildren().containsKey(component)) {
if (isCreate) {
TrieNode newNode = new TrieNode(currentNode.getPath() + "/" + component, component);
currentNode.addChild(component, newNode);
previousNode = currentNode;
currentNode = newNode;
} else {
return currentNode;
}
} else {
previousNode = currentNode;
currentNode = currentNode.getChildren().get(component);
}
}

if (!isCreate && previousNode != null) {
previousNode.getChildren().remove(currentNode.getNodeKey());
}

return currentNode;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,21 @@ public Stat (EntryMode mode, int version, long ctime, long mtime, long etime) {
*/
void create(final String key, final T data, final EntryMode mode);

/**
* Create an entry of given EntryMode with given key and data. If any parent node in the node
* hierarchy does not exist, then the parent node will attempt to be created. The entry will not
* be created if there is an existing entry with the same full key. Ephemeral nodes cannot have
* children, so only the final child in the created path will be ephemeral.
*/
void recursiveCreate(final String key, final T Data, final EntryMode mode);

/**
* Create a TTL entry with given key, data, and expiry time (ttl). If any parent node in the node
* hierarchy does not exist, then the parent node will attempt to be created. The entry will not be created if
* there is an existing entry with the same full key.
*/
void recursiveCreateWithTTL(String key, T data, long ttl);

/**
* Create an entry of given EntryMode with given key, data, and expiry time (ttl).
* The entry will automatically purge when reached expiry time and has no children.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.apache.helix.metaclient.factories;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/



public class MetaClientCacheConfig {
private final String _rootEntry;
private final boolean _cacheData;
private final boolean _cacheChildren;

public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren) {
_rootEntry = rootEntry;
_cacheData = cacheData;
_cacheChildren = cacheChildren;
}

public String getRootEntry() {
return _rootEntry;
}

public boolean getCacheData() {
return _cacheData;
}

public boolean getCacheChildren() {
return _cacheChildren;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/


import org.apache.helix.metaclient.api.MetaClientCacheInterface;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
Expand All @@ -39,14 +40,27 @@ public MetaClientInterface getMetaClient(MetaClientConfig config) {
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
return new ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config));
}
return null;
}

public MetaClientCacheInterface getMetaClientCache(MetaClientConfig config, MetaClientCacheConfig cacheConfig) {
if (config == null) {
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
return new ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config), cacheConfig);
}
return null;
}

private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config) {
return new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
setConnectionAddress(config.getConnectionAddress())
.setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
.setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
.setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
Expand All @@ -63,6 +64,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.separateIntoUniqueNodePaths;
import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;


Expand Down Expand Up @@ -104,7 +106,7 @@ public void create(String key, Object data) {
}

@Override
public void create(String key, Object data, MetaClientInterface.EntryMode mode) {
public void create(String key, Object data, EntryMode mode) {

try {
_zkClient.create(key, data, ZkMetaClientUtil.convertMetaClientMode(mode));
Expand All @@ -115,6 +117,67 @@ public void create(String key, Object data, MetaClientInterface.EntryMode mode)
}
}

@Override
public void recursiveCreate(String key, T data, EntryMode mode) {
// Function named recursiveCreate to match naming scheme, but actual work is iterative
iterativeCreate(key, data, mode, -1);
}

@Override
public void recursiveCreateWithTTL(String key, T data, long ttl) {
iterativeCreate(key, data, EntryMode.TTL, ttl);
}

private void iterativeCreate(String key, T data, EntryMode mode, long ttl) {
List<String> nodePaths = separateIntoUniqueNodePaths(key);
int i = 0;
// Ephemeral nodes cant have children, so change mode when creating parents
EntryMode parentMode = (EntryMode.EPHEMERAL.equals(mode) ?
EntryMode.PERSISTENT : mode);

// Iterate over paths, starting with full key then attempting each successive parent
// Try /a/b/c, if parent /a/b, does not exist, then try to create parent, etc..
while (i < nodePaths.size()) {
// If parent exists or there is no parent node, then try to create the node
// and break out of loop on successful create
if (i == nodePaths.size() - 1 || _zkClient.exists(nodePaths.get(i+1))) {
try {
if (EntryMode.TTL.equals(mode)) {
createWithTTL(nodePaths.get(i), data, ttl);
} else {
create(nodePaths.get(i), data, i == 0 ? mode : parentMode);
}
// Race condition may occur where a node is created by another thread in between loops.
// We should not throw error if this occurs for parent nodes, only for the full node path.
} catch (MetaClientNodeExistsException e) {
if (i == 0) {
throw e;
}
}
break;
// Else try to create parent in next loop iteration
} else {
i++;
}
}

// Reattempt creation of children that failed due to parent not existing
while (--i >= 0) {
try {
if (EntryMode.TTL.equals(mode)) {
createWithTTL(nodePaths.get(i), data, ttl);
} else {
create(nodePaths.get(i), data, i == 0 ? mode : parentMode);
}
// Catch same race condition as above
} catch (MetaClientNodeExistsException e) {
if (i == 0) {
throw e;
}
}
}
}

@Override
public void createWithTTL(String key, T data, long ttl) {
try {
Expand Down
Loading

0 comments on commit 5f1a3f7

Please sign in to comment.