Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AWS MCS (Amazon Keyspaces) #56

Open
wants to merge 5 commits into
base: master_v4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException;
import org.cognitor.cassandra.migration.cql.SimpleCQLLexer;
import org.cognitor.cassandra.migration.executors.Executor;
import org.cognitor.cassandra.migration.executors.ExecutorDetector;
import org.cognitor.cassandra.migration.keyspace.Keyspace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -99,12 +100,8 @@ public class Database implements Closeable {
private final String leaderTableName;
private final String keyspaceName;
private final Keyspace keyspace;
private final CqlSession session;
private Executor executor;
private String executionProfileName;
private ConsistencyLevel consistencyLevel = DefaultConsistencyLevel.QUORUM;
private final PreparedStatement logMigrationStatement;
private final PreparedStatement takeMigrationLeadStatement;
private final PreparedStatement releaseMigrationLeadStatement;
private boolean tookLead = false;

public Database(CqlSession session, Keyspace keyspace) {
Expand All @@ -130,17 +127,17 @@ public Database(CqlSession session, String keyspaceName, String tablePrefix) {
}

private Database(CqlSession session, Keyspace keyspace, String keyspaceName, String tablePrefix) {
this.session = notNull(session, "session");
notNull(session, "session");
this.keyspace = keyspace;
this.keyspaceName = Optional.ofNullable(keyspace).map(Keyspace::getKeyspaceName).orElse(keyspaceName);
this.tableName = createTableName(tablePrefix, SCHEMA_CF);
this.leaderTableName = createTableName(tablePrefix, SCHEMA_LEADER_CF);
executor = new ExecutorDetector(session).getExecutor();

createKeyspaceIfRequired();
useKeyspace();
ensureSchemaTable();
this.logMigrationStatement = this.session.prepare(format(INSERT_MIGRATION, getTableName()));
this.takeMigrationLeadStatement = session.prepare(format(TAKE_LEAD_QUERY, getLeaderTableName(), LEAD_TTL));
this.releaseMigrationLeadStatement = session.prepare(format(RELEASE_LEAD_QUERY, getLeaderTableName()));

String tmpInstanceAddress;
try {
tmpInstanceAddress = InetAddress.getLocalHost().getHostAddress();
Expand All @@ -153,7 +150,7 @@ private Database(CqlSession session, Keyspace keyspace, String keyspaceName, Str

private void useKeyspace() {
LOGGER.info("Changing keyspace of the session to '{}'", keyspaceName);
session.execute("USE " + keyspaceName);
executor.execute("USE " + keyspaceName);
}

private static String createTableName(String tablePrefix, String tableName) {
Expand All @@ -164,27 +161,23 @@ private static String createTableName(String tablePrefix, String tableName) {
}

private void createKeyspaceIfRequired() {
if (keyspace == null || keyspaceExists()) {
if (keyspace == null || executor.keyspaceExists(keyspaceName)) {
return;
}
try {
session.execute(this.keyspace.getCqlStatement());
executor.execute(this.keyspace.getCqlStatement());
} catch (DriverException exception) {
throw new MigrationException(format("Unable to create keyspace %s.", keyspaceName), exception);
}
}

private boolean keyspaceExists() {
return session.getMetadata().getKeyspace(keyspaceName).isPresent();
}

/**
* Closes the underlying session object. The cluster will not be touched
* and will stay open. Call this after all migrations are done.
* After calling this, this database instance can no longer be used.
*/
public void close() {
this.session.close();
executor.close();
}

/**
Expand All @@ -194,9 +187,7 @@ public void close() {
* @return the current schema version
*/
public int getVersion() {
SimpleStatement getVersionQuery = SimpleStatement.newInstance(format(VERSION_QUERY, getTableName()))
.setConsistencyLevel(this.consistencyLevel);
ResultSet resultSet = session.execute(getVersionQuery);
ResultSet resultSet = executor.execute(format(VERSION_QUERY, getTableName()));
Row result = resultSet.one();
if (result == null) {
return 0;
Expand Down Expand Up @@ -232,23 +223,13 @@ private void ensureSchemaTable() {
}

private boolean schemaTablesIsExisting() {
Metadata metadata = session.getMetadata();

return isTableExisting(metadata, getTableName())
&& isTableExisting(metadata, getLeaderTableName());
return executor.tableExists(keyspaceName, getTableName())
&& executor.tableExists(keyspaceName, getLeaderTableName());
}

private boolean isTableExisting(Metadata metadata, String tableName) {
return metadata
.getKeyspace(keyspaceName)
.map(keyspaceMetadata -> keyspaceMetadata.getTable(tableName).isPresent())
.orElse(false);
}


private void createSchemaTable() {
session.execute(format(CREATE_MIGRATION_CF, getTableName()));
session.execute(format(CREATE_LEADER_CF, getLeaderTableName()));
executor.execute(format(CREATE_MIGRATION_CF, getTableName()));
executor.execute(format(CREATE_LEADER_CF, getLeaderTableName()));
}

/**
Expand All @@ -262,9 +243,9 @@ boolean takeLeadOnMigrations(int repositoryLatestVersion) {
while (repositoryLatestVersion > getVersion()) {
try {
LOGGER.debug("Trying to take lead on schema migrations");
BoundStatement boundStatement = takeMigrationLeadStatement.bind(getKeyspaceName(), this.instanceId,

ResultSet lwtResult = executor.execute(format(TAKE_LEAD_QUERY, getLeaderTableName(), LEAD_TTL), getKeyspaceName(), this.instanceId,
this.instanceAddress);
ResultSet lwtResult = session.execute(boundStatement);

if (lwtResult.wasApplied()) {
LOGGER.debug("Took lead on schema migrations");
Expand Down Expand Up @@ -301,8 +282,7 @@ void removeLeadOnMigrations() {
if (tookLead) {
LOGGER.debug("Trying to release lead on schema migrations");

BoundStatement boundStatement = releaseMigrationLeadStatement.bind(getKeyspaceName(), this.instanceId);
ResultSet lwtResult = session.execute(boundStatement);
ResultSet lwtResult = executor.execute(format(RELEASE_LEAD_QUERY, getLeaderTableName()), getKeyspaceName(), this.instanceId);

if (lwtResult.wasApplied()) {
LOGGER.debug("Released lead on schema migrations");
Expand Down Expand Up @@ -346,10 +326,8 @@ public void execute(DbMigration migration) {

private void executeStatement(String statement, DbMigration migration) {
if (!statement.isEmpty()) {
SimpleStatement simpleStatement = SimpleStatement.newInstance(statement)
.setExecutionProfileName(executionProfileName)
.setConsistencyLevel(consistencyLevel);
ResultSet resultSet = session.execute(simpleStatement);
ResultSet resultSet = executor.execute(statement, executionProfileName);

if (!resultSet.getExecutionInfo().isSchemaInAgreement()) {
throw new MigrationException("Schema agreement could not be reached. " +
"You might consider increasing 'maxSchemaAgreementWaitSeconds'.",
Expand All @@ -365,13 +343,12 @@ private void executeStatement(String statement, DbMigration migration) {
* @param wasSuccessful indicates if the migration was successful or not
*/
private void logMigration(DbMigration migration, boolean wasSuccessful) {
BoundStatement boundStatement = logMigrationStatement.bind(wasSuccessful, migration.getVersion(),
executor.execute(format(INSERT_MIGRATION, getTableName()), wasSuccessful, migration.getVersion(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logMigration query should include keyspace name. When using AWS Keyspaces, having multiple tables with the same name (even if they're in different keyspaces), sometimes makes the tool insert log entries into improper tables (or executing the query ends with an exception about insufficient access rights, depending on IAM policy configuration). Consider the following scenario:

Keyspaces: A, B, C
Tables: A.schema_migration, B.schema_migration, C.schema_migration
Query in the BoundStatement: "insert into schema_migration (applied_successful, version, script_name, script, executed_at) values(?, ?, ?, ?, ?)"

The above statement executed in the context of keyspace A will sometimes end up modifying the contents of the table B.schema_migration or C.schema_migration.

I was able to avoid this problem by modifying the query and explicitly stating the keyspace name:
"insert into %s.%s (applied_successful, version, script_name, script, executed_at) values(?, ?, ?, ?, ?)"

Not sure what is the root cause - perhaps it's because I'm using datastax driver 3.7.2 and it's not properly taking into account the context in which the query is executed? (ignoring USE keyspace?). Then again, modifying the statement here would avoid any ambiguity (related to the driver etc.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Nihilum. I will include this in the refactoring.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is included in my last commit.

migration.getScriptName(), migration.getMigrationScript(), Instant.now());
session.execute(boundStatement);
}

public ConsistencyLevel getConsistencyLevel() {
return consistencyLevel;
return executor.getConsistencyLevel();
}

/**
Expand All @@ -381,7 +358,7 @@ public ConsistencyLevel getConsistencyLevel() {
* @return the current database instance
*/
public Database setConsistencyLevel(ConsistencyLevel consistencyLevel) {
this.consistencyLevel = notNull(consistencyLevel, "consistencyLevel");
this.executor.setConsistencyLevel(notNull(consistencyLevel, "consistencyLevel"));
return this;
}

Expand All @@ -395,4 +372,24 @@ public Database setExecutionProfileName(@Nullable String executionProfileName) {
this.executionProfileName = executionProfileName;
return this;
}

/**
* Can be a {@link org.cognitor.cassandra.migration.executors.SimpleExecutor} which only execute cqls
* against the CqlSession or a {@link org.cognitor.cassandra.migration.executors.AwsMcsExecutor} which
* wait for ddl completion (which are asycronous in aws).
* {@link org.cognitor.cassandra.migration.executors.AwsMcsExecutor} is auto selected when running
* in AWS MCS.
*/
public Executor getExecutor() {
return executor;
}

/**
* Allow custom implementation of cql executors.
* {@link org.cognitor.cassandra.migration.executors.AwsMcsExecutor} is auto selected when running
* in AWS MCS.It waits for ddl completion because they are executed asynchronously on AWS.
*/
public void setExecutor(Executor executor) {
this.executor = executor;
}
}
Loading