diff --git a/waltz-client/src/main/java/com/wepay/waltz/client/internal/Partition.java b/waltz-client/src/main/java/com/wepay/waltz/client/internal/Partition.java index b1d7312d..6cfc3745 100644 --- a/waltz-client/src/main/java/com/wepay/waltz/client/internal/Partition.java +++ b/waltz-client/src/main/java/com/wepay/waltz/client/internal/Partition.java @@ -17,6 +17,7 @@ import com.wepay.waltz.exception.ClientClosedException; import com.wepay.waltz.exception.DataChecksumException; import com.wepay.waltz.exception.PartitionInactiveException; +import com.wepay.waltz.exception.ClientTimeoutException; import com.wepay.zktools.clustermgr.Endpoint; import org.slf4j.Logger; @@ -39,6 +40,7 @@ public class Partition { private static final Long[] EMPTY_LONG_ARRAY = new Long[0]; private static final int MAX_DATA_ATTEMPTS = 5; + private static final int ENSURE_MOUNTED_TIMEOUT = 10000; private enum PartitionState { ACTIVE, INACTIVE, CLOSED @@ -260,26 +262,32 @@ public void partitionAhead() { } /** - * Ensures that this partition is mounted. - * Waits until interrupted, or a PartitionInactiveException to occur, for the partition to be mounted. + * Ensures that this partition is mounted or closed. + * Waits for a maximum of {@link #ENSURE_MOUNTED_TIMEOUT} millis for the partition to be mounted. * * @throws PartitionInactiveException if this partition is not active. * @throws IllegalStateException if client's high watermark is ahead of server's high watermark. + * @throws ClientTimeoutException if wait time for partition to be mounted exceeds {@link #ENSURE_MOUNTED_TIMEOUT} millis */ public void ensureMounted() { if (!mounted) { synchronized (lock) { + final long due = System.currentTimeMillis() + ENSURE_MOUNTED_TIMEOUT; while (state != PartitionState.CLOSED && !mounted) { if (transactionMonitor.isStopped()) { throw new PartitionInactiveException(partitionId); } else if (clientHighWaterMarkAhead) { throw new IllegalStateException(String.format("client is ahead of store for partition: %d", partitionId)); } - - try { - lock.wait(); - } catch (InterruptedException ex) { - Thread.interrupted(); + long remaining = due - System.currentTimeMillis(); + if (remaining > 0) { + try { + lock.wait(remaining); + } catch (InterruptedException ex) { + Thread.interrupted(); + } + } else { + throw new ClientTimeoutException(String.format("Partition ensure mounted timeout: partitionId=%d elapsed=%d ms", partitionId, ENSURE_MOUNTED_TIMEOUT)); } } } diff --git a/waltz-common/src/main/java/com/wepay/waltz/exception/ClientTimeoutException.java b/waltz-common/src/main/java/com/wepay/waltz/exception/ClientTimeoutException.java new file mode 100644 index 00000000..8cec6361 --- /dev/null +++ b/waltz-common/src/main/java/com/wepay/waltz/exception/ClientTimeoutException.java @@ -0,0 +1,7 @@ +package com.wepay.waltz.exception; + +public class ClientTimeoutException extends ClientException { + public ClientTimeoutException(String message) { + super(message); + } +}