Skip to content

Commit

Permalink
[DBZ-PGYB] Bug fix for retry limit and test addition (#167)
Browse files Browse the repository at this point in the history
This PR adds a bug fix which was causing one less number of retries, for
example, if `errors.max.retries` was set to 5, the connector was only
retrying for 4 times.

Additionally, this PR adds a test to verify the logic.
  • Loading branch information
vaibhav-yb authored Nov 29, 2024
1 parent 922a5c9 commit 2066314
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ public class PostgresConnectorTask extends BaseSourceTask<PostgresPartition, Pos
private volatile PostgresConnection beanRegistryJdbcConnection;
private volatile ReplicationConnection replicationConnection = null;

private volatile ErrorHandler errorHandler;
private volatile PostgresErrorHandler errorHandler;
private volatile PostgresSchema schema;

public static boolean TEST_THROW_ERROR_BEFORE_COORDINATOR_STARTUP = false;

@Override
public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> start(Configuration config) {
final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config);
Expand Down Expand Up @@ -257,6 +259,10 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
signalProcessor,
notificationService);

if (TEST_THROW_ERROR_BEFORE_COORDINATOR_STARTUP) {
throw new RuntimeException("[Test Only] Throwing error before starting coordinator");
}

coordinator.start(taskContext, this.queue, metadataProvider);

return coordinator;
Expand All @@ -268,7 +274,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
LOGGER.warn("Received exception, will be setting producer throwable", exception);
errorHandler.setProducerThrowable(new RetriableException(exception));

if (errorHandler.getRetries() == connectorConfig.getMaxRetriesOnError()) {
if (!errorHandler.isRetryRemaining()) {
throw new ConnectException("Maximum number of retries attempted, manually restart "
+ "the connector after fixing the error", exception);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
* @author Gunnar Morling
*/
public class PostgresErrorHandler extends ErrorHandler {
protected final PostgresConnectorConfig connectorConfig;


public PostgresErrorHandler(PostgresConnectorConfig connectorConfig, ChangeEventQueue<?> queue, ErrorHandler replacedErrorHandler) {
super(YugabyteDBConnector.class, connectorConfig, queue, replacedErrorHandler);
this.connectorConfig = connectorConfig;
}

@Override
Expand All @@ -40,4 +43,8 @@ protected boolean isCustomRetriable(Throwable throwable) {
// YB Note: Yes, all the errors are custom retriable.
return true;
}

protected boolean isRetryRemaining() {
return (connectorConfig.getMaxRetriesOnError() == -1) || getRetries() <= connectorConfig.getMaxRetriesOnError();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import javax.management.InstanceNotFoundException;

import io.debezium.embedded.EmbeddedEngineConfig;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
Expand Down Expand Up @@ -1143,6 +1144,29 @@ public void shouldFailIfParallelSnapshotRunWithMultipleTables() throws Exception
});
}


@Test
public void shouldFailAfterConfiguredRetries() throws Exception {
// We will intentionally not let the connector start and see if it retries.
PostgresConnectorTask.TEST_THROW_ERROR_BEFORE_COORDINATOR_STARTUP = true;

// We have to set the errors max retries for the embedded engine as well otherwise it would
// keep retrying indefinitely. Specifying a 0 means it will never retry on any error.
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test")
.with(EmbeddedEngineConfig.ERRORS_MAX_RETRIES, 0)
.with(PostgresConnectorConfig.ERRORS_MAX_RETRIES, 0)
.with(PostgresConnectorConfig.RETRIABLE_RESTART_WAIT, 12000);

start(YugabyteDBConnector.class, configBuilder.build());

TestHelper.waitFor(Duration.ofSeconds(80));

assertConnectorNotRunning();

PostgresConnectorTask.TEST_THROW_ERROR_BEFORE_COORDINATOR_STARTUP = false;
}

@Test
public void shouldFailWithSnapshotModeParallelIfNoTableIncludeListProvided() throws Exception {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
Expand Down

0 comments on commit 2066314

Please sign in to comment.