Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to C* Java Driver 4.7.2 #129

Closed
wants to merge 1 commit (base and head branch names not captured in this page extract)
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ dependencies {
// https://mvnrepository.com/artifact/org.apache.commons/commons-text
compile group: 'org.apache.commons', name: 'commons-text', version: '1.3'

// https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-core
compile group: 'com.datastax.cassandra', name: 'cassandra-driver-core', version: '3.6.0'
// https://mvnrepository.com/artifact/com.datastax.oss/java-driver-core
compile group: 'com.datastax.oss', name: 'java-driver-core', version: '4.7.2'
compile "com.fasterxml.jackson.module:jackson-module-kotlin:2.9.+"

// https://mvnrepository.com/artifact/org.reflections/reflections
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.thelastpickle.tlpstress

import com.datastax.driver.core.Host
import com.datastax.oss.driver.api.core.metadata.Node
import com.google.common.base.Predicate

class CoordinatorHostPredicate : Predicate<Host> {
override fun apply(input: Host?): Boolean {
class CoordinatorHostPredicate : Predicate<Node> {
override fun apply(input: Node?): Boolean {
if(input == null)
return false
return input.tokens == null || input.tokens.size == 0
return true
}
}

11 changes: 4 additions & 7 deletions src/main/kotlin/com/thelastpickle/tlpstress/OperationCallback.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.thelastpickle.tlpstress

import com.datastax.driver.core.ResultSet
import com.google.common.util.concurrent.FutureCallback
import java.util.*
import com.codahale.metrics.Timer
import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import com.thelastpickle.tlpstress.profiles.IStressRunner
import com.thelastpickle.tlpstress.profiles.Operation
import org.apache.logging.log4j.kotlin.logger
Expand All @@ -18,13 +16,13 @@ class OperationCallback(val context: StressContext,
val semaphore: Semaphore,
val startTime: Timer.Context,
val runner: IStressRunner,
val op: Operation) : FutureCallback<ResultSet> {
val op: Operation) {

companion object {
val log = logger()
}

override fun onFailure(t: Throwable?) {
fun onFailure(t: Throwable?) {
semaphore.release()
context.metrics.errors.mark()
startTime.stop()
Expand All @@ -33,7 +31,7 @@ class OperationCallback(val context: StressContext,

}

override fun onSuccess(result: ResultSet?) {
fun onSuccess(result: AsyncResultSet?) {
semaphore.release()
startTime.stop()

Expand All @@ -42,6 +40,5 @@ class OperationCallback(val context: StressContext,
if(op is Operation.Mutation) {
runner.onSuccess(op, result)
}

}
}
23 changes: 18 additions & 5 deletions src/main/kotlin/com/thelastpickle/tlpstress/ProfileRunner.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.thelastpickle.tlpstress

import com.google.common.util.concurrent.Futures
import com.thelastpickle.tlpstress.profiles.IStressProfile
import com.thelastpickle.tlpstress.profiles.IStressRunner
import com.thelastpickle.tlpstress.profiles.Operation
Expand Down Expand Up @@ -131,8 +130,15 @@ class ProfileRunner(val context: StressContext,
is Operation.Deletion -> context.metrics.deletions.time()
}

val future = context.session.executeAsync(op.bound)
Futures.addCallback(future, OperationCallback(context, sem, startTime, runner, op) )
val operationCallback = OperationCallback(context, sem, startTime, runner, op)
context.session.executeAsync(op.bound)
.whenComplete { asyncResultSet, throwable ->
if (null != throwable) {
operationCallback.onFailure(throwable)
} else {
operationCallback.onSuccess(asyncResultSet)
}
}.toCompletableFuture().get()

operations++
}
Expand Down Expand Up @@ -169,8 +175,15 @@ class ProfileRunner(val context: StressContext,
sem.acquire()

val startTime = context.metrics.populate.time()
val future = context.session.executeAsync(op.bound)
Futures.addCallback(future, OperationCallback(context, sem, startTime, runner, op))
val operationCallback = OperationCallback(context, sem, startTime, runner, op)
context.session.executeAsync(op.bound)
.whenComplete { asyncResultSet, throwable ->
if (null != throwable) {
operationCallback.onFailure(throwable)
} else {
operationCallback.onSuccess(asyncResultSet)
}
}.toCompletableFuture().get()
}

when(profile.getPopulateOption(context.mainArguments)) {
Expand Down
6 changes: 2 additions & 4 deletions src/main/kotlin/com/thelastpickle/tlpstress/StressContext.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package com.thelastpickle.tlpstress

import com.datastax.driver.core.ConsistencyLevel
import com.datastax.driver.core.Session
import com.datastax.oss.driver.api.core.CqlSession
import com.google.common.util.concurrent.RateLimiter
import com.thelastpickle.tlpstress.commands.Run
import com.thelastpickle.tlpstress.generators.Registry
import java.util.concurrent.Semaphore

data class StressContext(val session: Session,
data class StressContext(val session: CqlSession,
val mainArguments: Run,
val thread: Int,
val metrics: Metrics,
Expand Down
88 changes: 39 additions & 49 deletions src/main/kotlin/com/thelastpickle/tlpstress/commands/Run.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import com.beust.jcommander.Parameters
import com.beust.jcommander.converters.IParameterSplitter
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.ScheduledReporter
import com.datastax.driver.core.*
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy
import com.datastax.driver.core.policies.HostFilterPolicy
import com.datastax.driver.core.policies.RoundRobinPolicy
import com.datastax.oss.driver.api.core.ConsistencyLevel
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.config.DefaultDriverOption
import com.datastax.oss.driver.api.core.config.DriverConfigLoader
import com.google.common.base.Preconditions
import com.google.common.util.concurrent.RateLimiter
import com.thelastpickle.tlpstress.*
import com.thelastpickle.tlpstress.Metrics
import com.thelastpickle.tlpstress.converters.ConsistencyLevelConverter
import com.thelastpickle.tlpstress.converters.HumanReadableConverter
import com.thelastpickle.tlpstress.converters.HumanReadableTimeConverter
Expand All @@ -22,10 +21,10 @@ import com.thelastpickle.tlpstress.generators.Registry
import me.tongfei.progressbar.ProgressBar
import me.tongfei.progressbar.ProgressBarStyle
import org.apache.logging.log4j.kotlin.logger
import java.io.File
import java.lang.RuntimeException
import java.net.InetSocketAddress
import kotlin.concurrent.fixedRateTimer


class NoSplitter : IParameterSplitter {
override fun split(value: String?): MutableList<String> {
return mutableListOf(value!!)
Expand Down Expand Up @@ -113,7 +112,7 @@ class Run(val command: String) : IStressCommand {
@Parameter(names = ["--partitiongenerator", "--pg"], description = "Method of generating partition keys. Supports random, normal (gaussian), and sequence.")
var partitionKeyGenerator: String = "random"

@Parameter(names = ["--coordinatoronly", "--co"], description = "Coordinator only made. This will cause tlp-stress to round robin between nodes without tokens. Requires using -Djoin_ring=false in cassandra-env.sh. When using this option you must only provide a coordinator to --host.")
@Parameter(names = ["--coordinatoronly", "--co"], description = "Coordinator only mode. This will cause tlp-stress to round robin between nodes without tokens. Requires using -Djoin_ring=false in cassandra-env.sh. When using this option you must only provide a coordinator to --host.")
var coordinatorOnlyMode = false

@Parameter(names = ["--csv"], description = "Write metrics to this file in CSV format.")
Expand Down Expand Up @@ -154,56 +153,47 @@ class Run(val command: String) : IStressCommand {
@Parameter(names = ["--max-requests"], description = "Sets the max requests per connection")
var maxRequestsPerConnection : Int = 32768

/**
* Lazily generate query options
*/
val options by lazy {
val tmp = QueryOptions().setConsistencyLevel(consistencyLevel)
if(paging != null) {
println("Using custom paging size of $paging")
tmp.setFetchSize(paging!!)
}
tmp
}

val session by lazy {

var builder = Cluster.builder()
.addContactPoint(host)
.withPort(cqlPort)
.withCredentials(username, password)
.withQueryOptions(options)
.withPoolingOptions(PoolingOptions()
.setConnectionsPerHost(HostDistance.LOCAL, 4, 8)
.setConnectionsPerHost(HostDistance.REMOTE, 4, 8)
.setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection)
.setMaxRequestsPerConnection(HostDistance.REMOTE, maxRequestsPerConnection))
if(ssl) {
builder = builder.withSSL()
}

if (dc != "") {
builder.withLoadBalancingPolicy(DCAwareRoundRobinPolicy.builder()
.withLocalDc(dc)
.withUsedHostsPerRemoteDc(0)
.build())
}
val configBuilder = DriverConfigLoader.programmaticBuilder()
.withString(DefaultDriverOption.REQUEST_CONSISTENCY, consistencyLevel.name())
.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 8)
.withInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS, maxRequestsPerConnection)
.withString(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS, "DcInferringLoadBalancingPolicy")

if(coordinatorOnlyMode) {
println("Using experimental coordinator only mode.")
val policy = HostFilterPolicy(RoundRobinPolicy(), CoordinatorHostPredicate())
builder = builder.withLoadBalancingPolicy(policy)
if (null != paging) {
println("Using custom paging size of $paging")
configBuilder.withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, paging!!)
}

val cluster = builder.build()

val config = configBuilder.build()

var builder = CqlSession.builder()
.addContactPoint(InetSocketAddress(host, cqlPort))
.withAuthCredentials(username, password)
.withConfigLoader(config)
// if(ssl) {
// builder = builder.withSSL()
// }

// if (dc != "") {
// builder.withLoadBalancingPolicy(DCAwareRoundRobinPolicy.builder()
// .withLocalDc(dc)
// .withUsedHostsPerRemoteDc(0)
// .build())
// }
//
// if(coordinatorOnlyMode) {
// // TODO: we only have info about token ranges after we connected to the cluster
// println("Using experimental coordinator only mode.")
// builder = builder.withNodeFilter(CoordinatorHostPredicate())
// }

val session = builder.build()
// get all the initial schema
println("Creating schema")

println("Executing $iterations operations with consistency level $consistencyLevel")

val session = cluster.connect()

println("Connected")
session
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.thelastpickle.tlpstress.converters

import com.beust.jcommander.IStringConverter
import com.datastax.driver.core.ConsistencyLevel
import com.datastax.oss.driver.api.core.ConsistencyLevel
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel

class ConsistencyLevelConverter : IStringConverter<ConsistencyLevel> {
override fun convert(value: String?): ConsistencyLevel {
return ConsistencyLevel.valueOf(value!!)
return DefaultConsistencyLevel.valueOf(value!!)
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.thelastpickle.tlpstress.profiles

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.PreparedStatement
import com.thelastpickle.tlpstress.PartitionKey
import com.thelastpickle.tlpstress.StressContext
import com.thelastpickle.tlpstress.WorkloadParameter
Expand All @@ -23,7 +23,7 @@ class AllowFiltering : IStressProfile {
lateinit var select: PreparedStatement
lateinit var delete: PreparedStatement

override fun prepare(session: Session) {
override fun prepare(session: CqlSession) {
insert = session.prepare("INSERT INTO allow_filtering (partition_id, row_id, value, payload) values (?, ?, ?, ?)")
select = session.prepare("SELECT * from allow_filtering WHERE partition_id = ? and value = ? ALLOW FILTERING")
delete = session.prepare("DELETE from allow_filtering WHERE partition_id = ? and row_id = ?")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.thelastpickle.tlpstress.profiles

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
import com.datastax.driver.core.VersionNumber
import com.datastax.driver.core.utils.UUIDs
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.Version
import com.datastax.oss.driver.api.core.cql.PreparedStatement
import com.datastax.oss.driver.api.core.uuid.Uuids
import com.thelastpickle.tlpstress.PartitionKey
import com.thelastpickle.tlpstress.StressContext
import com.thelastpickle.tlpstress.WorkloadParameter
Expand Down Expand Up @@ -35,20 +35,21 @@ class BasicTimeSeries : IStressProfile {
lateinit var prepared: PreparedStatement
lateinit var getPartitionHead: PreparedStatement
lateinit var delete: PreparedStatement
lateinit var cassandraVersion: VersionNumber
lateinit var cassandraVersion: Version

@WorkloadParameter("Number of rows to fetch back on SELECT queries")
var limit = 500

@WorkloadParameter("Deletion range in seconds. Range tombstones will cover all rows older than the given value.")
var deleteDepth = 30

override fun prepare(session: Session) {
override fun prepare(session: CqlSession) {
println("Using a limit of $limit for reads and deleting data older than $deleteDepth seconds (if enabled).")
cassandraVersion = session.cluster.metadata.allHosts.map { host -> host.cassandraVersion }.min()!!

cassandraVersion = session.metadata.nodes.values.minBy { n -> n.cassandraVersion!! }?.cassandraVersion!!
prepared = session.prepare("INSERT INTO sensor_data (sensor_id, timestamp, data) VALUES (?, ?, ?)")
getPartitionHead = session.prepare("SELECT * from sensor_data WHERE sensor_id = ? LIMIT ?")
if (cassandraVersion.compareTo(VersionNumber.parse("3.0")) >= 0) {
if (cassandraVersion >= Version.parse("3.0")!!) {
delete = session.prepare("DELETE from sensor_data WHERE sensor_id = ? and timestamp < maxTimeuuid(?)")
} else {
throw UnsupportedOperationException("Cassandra version $cassandraVersion does not support range deletes (only available in 3.0+).")
Expand All @@ -71,7 +72,7 @@ class BasicTimeSeries : IStressProfile {

override fun getNextMutation(partitionKey: PartitionKey) : Operation {
val data = dataField.getText()
val timestamp = UUIDs.timeBased()
val timestamp = Uuids.timeBased()
val bound = prepared.bind(partitionKey.getText(),timestamp, data)
return Operation.Mutation(bound)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.thelastpickle.tlpstress.profiles

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.PreparedStatement
import com.thelastpickle.tlpstress.PartitionKey
import com.thelastpickle.tlpstress.StressContext
import com.thelastpickle.tlpstress.WorkloadParameter
Expand All @@ -18,7 +18,7 @@ class CountersWide : IStressProfile {
@WorkloadParameter("Total rows per partition.")
var rowsPerPartition = 10000

override fun prepare(session: Session) {
override fun prepare(session: CqlSession) {
increment = session.prepare("UPDATE counter_wide SET value = value + 1 WHERE key = ? and cluster = ?")
selectOne = session.prepare("SELECT * from counter_wide WHERE key = ? AND cluster = ?")
selectAll = session.prepare("SELECT * from counter_wide WHERE key = ?")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.thelastpickle.tlpstress.profiles

import com.datastax.driver.core.Session
import com.datastax.driver.core.BoundStatement
import com.datastax.driver.core.ResultSet
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import com.datastax.oss.driver.api.core.cql.BoundStatement
import com.datastax.oss.driver.api.core.cql.ResultSet
import com.thelastpickle.tlpstress.PartitionKey
import com.thelastpickle.tlpstress.PopulateOption
import com.thelastpickle.tlpstress.StressContext
Expand All @@ -19,7 +20,7 @@ interface IStressRunner {
* Will be used for state tracking on things like LWTs as well as provides an avenue for future work
* doing post-workload correctness checks
*/
fun onSuccess(op: Operation.Mutation, result: ResultSet?) { }
fun onSuccess(op: Operation.Mutation, result: AsyncResultSet?) { }

fun customPopulateIter() : Iterator<Operation.Mutation> {
return listOf<Operation.Mutation>().iterator()
Expand All @@ -38,7 +39,7 @@ interface IStressProfile {
* the class should track all prepared statements internally
* and pass them on to the Runner
*/
fun prepare(session: Session)
fun prepare(session: CqlSession)
/**
* returns a bunch of DDL statements
* this can be create table, index, materialized view, etc
Expand Down
Loading