Skip to content

Commit

Permalink
Add an option for retrying conflict errors
Browse files Browse the repository at this point in the history
Make it possible to specify retrying of "Transaction aborted by conflict"
errors.
  • Loading branch information
akudiyar committed Feb 6, 2024
1 parent 09a275e commit 8336bb3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ case class Timeouts(connect: Option[Int], read: Option[Int], request: Option[Int
object ErrorTypes extends Enumeration {
type ErrorType = Value

val NONE, NETWORK = Value
val NONE, NETWORK, CONFLICT, ALL = Value
}

case class Retries(errorType: ErrorType, retryAttempts: Option[Int], delay: Option[Int]) extends Serializable
Expand Down Expand Up @@ -110,7 +110,7 @@ object TarantoolConfig {
if (strErrorType.isDefined) {
val errorType = ErrorTypes.withName(strErrorType.get.toUpperCase)

if (errorType == ErrorTypes.NETWORK) {
if (errorType != ErrorTypes.NONE) {
if (retryAttempts.isEmpty) {
throw new IllegalArgumentException("Number of retry attempts must be specified")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import io.tarantool.driver.api.{TarantoolClient, TarantoolClientConfig, Tarantoo
import io.tarantool.driver.auth.SimpleTarantoolCredentials
import io.tarantool.driver.api.TarantoolClientFactory
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies.AttemptsBoundRetryPolicyFactory
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies.retryNetworkErrors
import io.tarantool.driver.exceptions.TarantoolInternalException
import io.tarantool.driver.protocol.Packable
import io.tarantool.spark.connector.Logging
import io.tarantool.spark.connector.config.{ErrorTypes, TarantoolConfig}
import io.tarantool.spark.connector.util.ScalaToJavaHelper.toJavaUnaryOperator
import io.tarantool.spark.connector.util.ScalaToJavaHelper.{toJavaPredicate, toJavaUnaryOperator}

import java.io.{Closeable, Serializable}
import java.util
Expand All @@ -24,6 +26,13 @@ object TarantoolConnection {
def apply(): TarantoolConnection[TarantoolTuple, TarantoolResult[TarantoolTuple]] =
TarantoolConnection(defaultClient)

private def isConflictError(e: Throwable): Boolean = {
e.isInstanceOf[TarantoolInternalException] &&
e.getMessage.indexOf("Transaction has been aborted by conflict") > 0
}

private def retryConflictErrors(): Predicate[Throwable] = toJavaPredicate(isConflictError)

private def defaultClient(
clientConfig: TarantoolConfig
): TarantoolClient[TarantoolTuple, TarantoolResult[TarantoolTuple]] = {
Expand All @@ -36,9 +45,15 @@ object TarantoolConnection {

if (clientConfig.retries.isDefined) {
val retries = clientConfig.retries.get
if (retries.errorType == ErrorTypes.NETWORK) {
if (retries.errorType != ErrorTypes.NONE) {
val predicate = retries.errorType match {
case ErrorTypes.NETWORK => retryNetworkErrors
case ErrorTypes.CONFLICT => retryConflictErrors
case _ => retryNetworkErrors.and(retryConflictErrors)
}
clientFactory = clientFactory.withRetryingByNumberOfAttempts(
retries.retryAttempts.get,
predicate,
toJavaUnaryOperator { policyBuilder: AttemptsBoundRetryPolicyFactory.Builder[Predicate[Throwable]] =>
policyBuilder.withDelay(retries.delay.get)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.function.{
BiFunction => JBiFunction,
Consumer => JConsumer,
Function => JFunction,
Predicate => JPredicate,
Supplier => JSupplier,
UnaryOperator => JUnaryOperator
}
Expand Down Expand Up @@ -33,6 +34,13 @@ object ScalaToJavaHelper {
override def apply(t: T1): R = f.apply(t)
}

/**
* Converts a Scala {@link Function1} to a Java {@link java.util.function.Predicate}
*/
def toJavaPredicate[T1](f: T1 => Boolean): JPredicate[T1] = new JPredicate[T1] {
override def test(t: T1): Boolean = f.apply(t)
}

/**
* Converts a Scala {@link Function1} to a Java {@link java.util.function.UnaryOperator}
*/
Expand Down

0 comments on commit 8336bb3

Please sign in to comment.