Skip to content

Commit

Permalink
Add recursiveCreate functionality to metaclient (#2607)
Browse files Browse the repository at this point in the history

Co-authored-by: Grant Palau Spencer <[email protected]>
  • Loading branch information
2 people authored and Xiaoyuan Lu committed Oct 5, 2023
1 parent 827ca82 commit 6bd97a9
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,21 @@ public Stat (EntryMode mode, int version, long ctime, long mtime, long etime) {
*/
void create(final String key, final T data, final EntryMode mode);

/**
* Create an entry of given EntryMode with given key and data. If any parent node in the node
* hierarchy does not exist, then the parent node will attempt to be created. The entry will not
* be created if there is an existing entry with the same full key. Ephemeral nodes cannot have
* children, so only the final child in the created path will be ephemeral.
*/
void recursiveCreate(final String key, final T Data, final EntryMode mode);

/**
* Create a TTL entry with given key, data, and expiry time (ttl). If any parent node in the node
* hierarchy does not exist, then the parent node will attempt to be created. The entry will not be created if
* there is an existing entry with the same full key.
*/
void recursiveCreateWithTTL(String key, T data, long ttl);

/**
* Create an entry of given EntryMode with given key, data, and expiry time (ttl).
* The entry will automatically purge when reached expiry time and has no children.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
Expand All @@ -63,6 +64,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.separateIntoUniqueNodePaths;
import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;


Expand Down Expand Up @@ -104,7 +106,7 @@ public void create(String key, Object data) {
}

@Override
public void create(String key, Object data, MetaClientInterface.EntryMode mode) {
public void create(String key, Object data, EntryMode mode) {

try {
_zkClient.create(key, data, ZkMetaClientUtil.convertMetaClientMode(mode));
Expand All @@ -115,6 +117,67 @@ public void create(String key, Object data, MetaClientInterface.EntryMode mode)
}
}

@Override
public void recursiveCreate(String key, T data, EntryMode mode) {
// Function named recursiveCreate to match naming scheme, but actual work is iterative
iterativeCreate(key, data, mode, -1);
}

@Override
public void recursiveCreateWithTTL(String key, T data, long ttl) {
iterativeCreate(key, data, EntryMode.TTL, ttl);
}

private void iterativeCreate(String key, T data, EntryMode mode, long ttl) {
List<String> nodePaths = separateIntoUniqueNodePaths(key);
int i = 0;
// Ephemeral nodes cant have children, so change mode when creating parents
EntryMode parentMode = (EntryMode.EPHEMERAL.equals(mode) ?
EntryMode.PERSISTENT : mode);

// Iterate over paths, starting with full key then attempting each successive parent
// Try /a/b/c, if parent /a/b, does not exist, then try to create parent, etc..
while (i < nodePaths.size()) {
// If parent exists or there is no parent node, then try to create the node
// and break out of loop on successful create
if (i == nodePaths.size() - 1 || _zkClient.exists(nodePaths.get(i+1))) {
try {
if (EntryMode.TTL.equals(mode)) {
createWithTTL(nodePaths.get(i), data, ttl);
} else {
create(nodePaths.get(i), data, i == 0 ? mode : parentMode);
}
// Race condition may occur where a node is created by another thread in between loops.
// We should not throw error if this occurs for parent nodes, only for the full node path.
} catch (MetaClientNodeExistsException e) {
if (i == 0) {
throw e;
}
}
break;
// Else try to create parent in next loop iteration
} else {
i++;
}
}

// Reattempt creation of children that failed due to parent not existing
while (--i >= 0) {
try {
if (EntryMode.TTL.equals(mode)) {
createWithTTL(nodePaths.get(i), data, ttl);
} else {
create(nodePaths.get(i), data, i == 0 ? mode : parentMode);
}
// Catch same race condition as above
} catch (MetaClientNodeExistsException e) {
if (i == 0) {
throw e;
}
}
}
}

@Override
public void createWithTTL(String key, T data, long ttl) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
Expand Down Expand Up @@ -358,4 +359,28 @@ public static MetaClientException.ReturnCode translateZooKeeperCodeToMetaClientC
return MetaClientException.ReturnCode.DB_USER_ERROR;
}
}

// Returns null if no parent path
public static String getZkParentPath(String path) {
int idx = path.lastIndexOf('/');
return idx == 0 ? null : path.substring(0, idx);
}

// Splits a path into the paths for each node along the way.
// /a/b/c --> /a/b/c, /a/b, /a
public static List<String> separateIntoUniqueNodePaths(String path) {
if (path == null || "/".equals(path)) {
return null;
}

String[] subPath = path.split("/");
String[] nodePaths = new String[subPath.length-1];
StringBuilder tempPath = new StringBuilder();
for (int i = 1; i < subPath.length; i++) {
tempPath.append( "/");
tempPath.append(subPath[i]);
nodePaths[subPath.length - 1 - i] = tempPath.toString();
}
return Arrays.asList(nodePaths);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.helix.metaclient.api.*;
import org.apache.helix.metaclient.datamodel.DataRecord;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.recipes.lock.DataRecordSerializer;
import org.testng.Assert;
Expand Down Expand Up @@ -129,6 +130,35 @@ public void testCreateAndRenewTTL() {
Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
}

@Test
public void testRecursiveCreate() {
final String zkParentKey = "/stressZk_testRecursiveCreate";
_zkMetaClient.create(zkParentKey, ENTRY_STRING_VALUE);

int count = (int) Math.pow(TEST_ITERATION_COUNT, 1/3d);
for (int i = 0; i < count; i++) {

for (int j = 0; j < count; j++) {

for (int k = 0; k < count; k++) {
String key = zkParentKey + "/" + i + "/" + j + "/" + k;
_zkMetaClient.recursiveCreate(key, String.valueOf(k), PERSISTENT);
Assert.assertEquals(String.valueOf(k), _zkMetaClient.get(key));
}
}
try {
_zkMetaClient.recursiveCreate(zkParentKey + "/" + i, "should_fail", PERSISTENT);
Assert.fail("Should have failed due to node existing");
} catch (MetaClientNodeExistsException ignoredException) {
}
}

// cleanup
_zkMetaClient.recursiveDelete(zkParentKey);
Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);

}

@Test
public void testGet() {
final String zkParentKey = "/stressZk_testGet";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.api.DirectChildChangeListener;

import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand All @@ -38,7 +40,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
Expand All @@ -50,6 +51,7 @@

import static org.apache.helix.metaclient.api.DataChangeListener.ChangeType.ENTRY_UPDATE;
import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER;
import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.EPHEMERAL;
import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT;


Expand Down Expand Up @@ -97,6 +99,48 @@ public void testCreateTTL() {
}
}

@Test
public void testRecursiveCreate() {
final String path = "/Test/ZkMetaClient/_fullPath";


try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
zkMetaClient.connect();
MetaClientInterface.EntryMode mode = EPHEMERAL;

// Should succeed even if one of the parent nodes exists
String extendedPath = "/A" + path;
zkMetaClient.create("/A", ENTRY_STRING_VALUE, PERSISTENT);
zkMetaClient.recursiveCreate(extendedPath, ENTRY_STRING_VALUE, mode);
Assert.assertNotNull(zkMetaClient.exists(extendedPath));

// Should succeed if no parent nodes exist
zkMetaClient.recursiveCreate(path, ENTRY_STRING_VALUE, mode);
Assert.assertNotNull(zkMetaClient.exists(path));
Assert.assertEquals(zkMetaClient.getDataAndStat("/Test").getRight().getEntryType(), PERSISTENT);
Assert.assertEquals(zkMetaClient.getDataAndStat(path).getRight().getEntryType(), mode);

// Should throw NodeExistsException if child node exists
zkMetaClient.recursiveCreate(path, ENTRY_STRING_VALUE, mode);
Assert.fail("Should have failed due to node already created");
} catch (MetaClientException e) {
Assert.assertEquals(e.getMessage(), "org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException: org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /Test/ZkMetaClient/_fullPath");
System.out.println(e.getMessage());
}

}

@Test
public void testRecursiveCreateWithTTL() {
final String path = "/Test/ZkMetaClient/_fullPath/withTTL";

try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
zkMetaClient.connect();
zkMetaClient.recursiveCreateWithTTL(path, ENTRY_STRING_VALUE, 1000);
Assert.assertNotNull(zkMetaClient.exists(path));
}
}

@Test
public void testRenewTTL() {
final String key = "/TestZkMetaClient_testRenewTTL_1";
Expand Down

0 comments on commit 6bd97a9

Please sign in to comment.