Skip to content

Commit

Permalink
Add an option for retrying conflict errors
Browse files Browse the repository at this point in the history
Make it possible to specify retrying of "Transaction aborted by conflict"
errors.
  • Loading branch information
akudiyar committed Feb 7, 2024
1 parent 09a275e commit f60bab8
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 7 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ libraryDependencies += "io.tarantool" %% "spark-tarantool-connector" % "0.7.0"
| tarantool.requestTimeout | request completion timeout, in milliseconds | 2000 |
| tarantool.connections | number of connections established with each host | 1 |
| tarantool.cursorBatchSize | default limit for prefetching tuples in RDD iterator | 1000 |
| tarantool.retries.errorType | configures automatic retry of requests to Tarantool cluster. Possible values: "network", "none" | none |
| tarantool.retries.maxAttempts | maximum number of retry attempts for each request. Mandatory if errorType is set to "network" | |
| tarantool.retries.delay | delay between subsequent retries of each request (in milliseconds). Mandatory if errorType is set to "network" | |
| tarantool.retries.errorType | configures automatic retry of requests to Tarantool cluster. Possible values: "network", "conflict", "all", "none" | none |
| tarantool.retries.maxAttempts | maximum number of retry attempts for each request. Mandatory if errorType is not "none" | |
| tarantool.retries.delay | delay between subsequent retries of each request (in milliseconds). Mandatory if errorType is not "none" | |

### Dataset API request options

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ case class Timeouts(connect: Option[Int], read: Option[Int], request: Option[Int
object ErrorTypes extends Enumeration {
type ErrorType = Value

val NONE, NETWORK = Value
val NONE, NETWORK, CONFLICT, ALL = Value
}

case class Retries(errorType: ErrorType, retryAttempts: Option[Int], delay: Option[Int]) extends Serializable
Expand Down Expand Up @@ -110,7 +110,7 @@ object TarantoolConfig {
if (strErrorType.isDefined) {
val errorType = ErrorTypes.withName(strErrorType.get.toUpperCase)

if (errorType == ErrorTypes.NETWORK) {
if (errorType != ErrorTypes.NONE) {
if (retryAttempts.isEmpty) {
throw new IllegalArgumentException("Number of retry attempts must be specified")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import io.tarantool.driver.api.{TarantoolClient, TarantoolClientConfig, Tarantoo
import io.tarantool.driver.auth.SimpleTarantoolCredentials
import io.tarantool.driver.api.TarantoolClientFactory
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies.AttemptsBoundRetryPolicyFactory
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies.retryNetworkErrors
import io.tarantool.driver.exceptions.TarantoolInternalException
import io.tarantool.driver.protocol.Packable
import io.tarantool.spark.connector.Logging
import io.tarantool.spark.connector.config.{ErrorTypes, TarantoolConfig}
import io.tarantool.spark.connector.util.ScalaToJavaHelper.toJavaUnaryOperator
import io.tarantool.spark.connector.util.ScalaToJavaHelper.{toJavaPredicate, toJavaUnaryOperator}

import java.io.{Closeable, Serializable}
import java.util
Expand All @@ -24,6 +26,12 @@ object TarantoolConnection {
def apply(): TarantoolConnection[TarantoolTuple, TarantoolResult[TarantoolTuple]] =
TarantoolConnection(defaultClient)

private def isConflictError(e: Throwable): Boolean =
e.isInstanceOf[TarantoolInternalException] &&
e.getMessage.indexOf("Transaction has been aborted by conflict") > 0

private def retryConflictErrors(): Predicate[Throwable] = toJavaPredicate(isConflictError)

private def defaultClient(
clientConfig: TarantoolConfig
): TarantoolClient[TarantoolTuple, TarantoolResult[TarantoolTuple]] = {
Expand All @@ -36,9 +44,15 @@ object TarantoolConnection {

if (clientConfig.retries.isDefined) {
val retries = clientConfig.retries.get
if (retries.errorType == ErrorTypes.NETWORK) {
if (retries.errorType != ErrorTypes.NONE) {
val predicate = retries.errorType match {
case ErrorTypes.NETWORK => retryNetworkErrors
case ErrorTypes.CONFLICT => retryConflictErrors
case _ => retryNetworkErrors.and(retryConflictErrors)
}
clientFactory = clientFactory.withRetryingByNumberOfAttempts(
retries.retryAttempts.get,
predicate,
toJavaUnaryOperator { policyBuilder: AttemptsBoundRetryPolicyFactory.Builder[Predicate[Throwable]] =>
policyBuilder.withDelay(retries.delay.get)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.function.{
BiFunction => JBiFunction,
Consumer => JConsumer,
Function => JFunction,
Predicate => JPredicate,
Supplier => JSupplier,
UnaryOperator => JUnaryOperator
}
Expand Down Expand Up @@ -33,6 +34,13 @@ object ScalaToJavaHelper {
override def apply(t: T1): R = f.apply(t)
}

/**
* Converts a Scala {@link Function1} to a Java {@link java.util.function.Predicate}
*/
def toJavaPredicate[T1](f: T1 => Boolean): JPredicate[T1] = new JPredicate[T1] {
override def test(t: T1): Boolean = f.apply(t)
}

/**
* Converts a Scala {@link Function1} to a Java {@link java.util.function.UnaryOperator}
*/
Expand Down

0 comments on commit f60bab8

Please sign in to comment.