Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add an option for retrying conflict errors #54

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ libraryDependencies += "io.tarantool" %% "spark-tarantool-connector" % "0.7.0"
| tarantool.requestTimeout | request completion timeout, in milliseconds | 2000 |
| tarantool.connections | number of connections established with each host | 1 |
| tarantool.cursorBatchSize | default limit for prefetching tuples in RDD iterator | 1000 |
| tarantool.retries.errorType | configures automatic retry of requests to Tarantool cluster. Possible values: "network", "none" | none |
| tarantool.retries.maxAttempts | maximum number of retry attempts for each request. Mandatory if errorType is set to "network" | |
| tarantool.retries.delay | delay between subsequent retries of each request (in milliseconds). Mandatory if errorType is set to "network" | |
| tarantool.retries.errorType | configures automatic retry of requests to Tarantool cluster. Possible values: "network", "conflict", "all", "none" | none |
| tarantool.retries.maxAttempts | maximum number of retry attempts for each request. Mandatory if errorType is not "none" | |
| tarantool.retries.delay | delay between subsequent retries of each request (in milliseconds). Mandatory if errorType is not "none" | |

### Dataset API request options

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ case class Timeouts(connect: Option[Int], read: Option[Int], request: Option[Int
object ErrorTypes extends Enumeration {
type ErrorType = Value

val NONE, NETWORK = Value
val NONE, NETWORK, CONFLICT, ALL = Value
}

case class Retries(errorType: ErrorType, retryAttempts: Option[Int], delay: Option[Int]) extends Serializable
Expand Down Expand Up @@ -110,7 +110,7 @@ object TarantoolConfig {
if (strErrorType.isDefined) {
val errorType = ErrorTypes.withName(strErrorType.get.toUpperCase)

if (errorType == ErrorTypes.NETWORK) {
if (errorType != ErrorTypes.NONE) {
if (retryAttempts.isEmpty) {
throw new IllegalArgumentException("Number of retry attempts must be specified")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import io.tarantool.driver.api.{TarantoolClient, TarantoolClientConfig, Tarantoo
import io.tarantool.driver.auth.SimpleTarantoolCredentials
import io.tarantool.driver.api.TarantoolClientFactory
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies.AttemptsBoundRetryPolicyFactory
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies.retryNetworkErrors
import io.tarantool.driver.exceptions.TarantoolInternalException
import io.tarantool.driver.protocol.Packable
import io.tarantool.spark.connector.Logging
import io.tarantool.spark.connector.config.{ErrorTypes, TarantoolConfig}
import io.tarantool.spark.connector.util.ScalaToJavaHelper.toJavaUnaryOperator
import io.tarantool.spark.connector.util.ScalaToJavaHelper.{toJavaPredicate, toJavaUnaryOperator}

import java.io.{Closeable, Serializable}
import java.util
Expand All @@ -24,6 +26,12 @@ object TarantoolConnection {
def apply(): TarantoolConnection[TarantoolTuple, TarantoolResult[TarantoolTuple]] =
TarantoolConnection(defaultClient)

private def isConflictError(e: Throwable): Boolean =
e.isInstanceOf[TarantoolInternalException] &&
e.getMessage.indexOf("Transaction has been aborted by conflict") > 0

private def retryConflictErrors(): Predicate[Throwable] = toJavaPredicate(isConflictError)

private def defaultClient(
clientConfig: TarantoolConfig
): TarantoolClient[TarantoolTuple, TarantoolResult[TarantoolTuple]] = {
Expand All @@ -36,9 +44,15 @@ object TarantoolConnection {

if (clientConfig.retries.isDefined) {
val retries = clientConfig.retries.get
if (retries.errorType == ErrorTypes.NETWORK) {
if (retries.errorType != ErrorTypes.NONE) {
val predicate = retries.errorType match {
case ErrorTypes.NETWORK => retryNetworkErrors
case ErrorTypes.CONFLICT => retryConflictErrors
case _ => retryNetworkErrors.and(retryConflictErrors)
}
clientFactory = clientFactory.withRetryingByNumberOfAttempts(
retries.retryAttempts.get,
predicate,
toJavaUnaryOperator { policyBuilder: AttemptsBoundRetryPolicyFactory.Builder[Predicate[Throwable]] =>
policyBuilder.withDelay(retries.delay.get)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.function.{
BiFunction => JBiFunction,
Consumer => JConsumer,
Function => JFunction,
Predicate => JPredicate,
Supplier => JSupplier,
UnaryOperator => JUnaryOperator
}
Expand Down Expand Up @@ -33,6 +34,13 @@ object ScalaToJavaHelper {
override def apply(t: T1): R = f.apply(t)
}

/**
* Converts a Scala {@link Function1} to a Java {@link java.util.function.Predicate}
*/
def toJavaPredicate[T1](f: T1 => Boolean): JPredicate[T1] = new JPredicate[T1] {
override def test(t: T1): Boolean = f.apply(t)
}

/**
* Converts a Scala {@link Function1} to a Java {@link java.util.function.UnaryOperator}
*/
Expand Down
Loading