diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index c9065736b93..702b485c5b5 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -67,9 +67,11 @@ public class PostgresConnectorTask extends BaseSourceTask start(Configuration config) { final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config); @@ -257,6 +259,10 @@ public ChangeEventSourceCoordinator st signalProcessor, notificationService); + if (TEST_THROW_ERROR_BEFORE_COORDINATOR_STARTUP) { + throw new RuntimeException("[Test Only] Throwing error before starting coordinator"); + } + coordinator.start(taskContext, this.queue, metadataProvider); return coordinator; @@ -268,7 +274,7 @@ public ChangeEventSourceCoordinator st LOGGER.warn("Received exception, will be setting producer throwable", exception); errorHandler.setProducerThrowable(new RetriableException(exception)); - if (errorHandler.getRetries() == connectorConfig.getMaxRetriesOnError()) { + if (!errorHandler.isRetryRemaining()) { throw new ConnectException("Maximum number of retries attempted, manually restart " + "the connector after fixing the error", exception); } else { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresErrorHandler.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresErrorHandler.java index cedccbf43c7..ea246f4c0f8 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresErrorHandler.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresErrorHandler.java @@ -19,9 +19,12 @@ * @author Gunnar Morling */ public class PostgresErrorHandler extends ErrorHandler { + protected final PostgresConnectorConfig connectorConfig; + public PostgresErrorHandler(PostgresConnectorConfig connectorConfig, ChangeEventQueue queue, ErrorHandler replacedErrorHandler) { super(YugabyteDBConnector.class, connectorConfig, queue, replacedErrorHandler); + this.connectorConfig = connectorConfig; } @Override @@ -40,4 +43,8 @@ protected boolean isCustomRetriable(Throwable throwable) { // YB Note: Yes, all the errors are custom retriable. return true; } + + protected boolean isRetryRemaining() { + return (connectorConfig.getMaxRetriesOnError() == -1) || getRetries() <= connectorConfig.getMaxRetriesOnError(); + } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 1cf09d41eb7..b3c93c99a34 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -46,6 +46,7 @@ import javax.management.InstanceNotFoundException; +import io.debezium.embedded.EmbeddedEngineConfig; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; @@ -1143,6 +1144,29 @@ public void shouldFailIfParallelSnapshotRunWithMultipleTables() throws Exception }); } + + @Test + public void shouldFailAfterConfiguredRetries() throws Exception { + // We will intentionally not let the connector start and see if it retries. + PostgresConnectorTask.TEST_THROW_ERROR_BEFORE_COORDINATOR_STARTUP = true; + + // We have to set the errors max retries for the embedded engine as well otherwise it would + // keep retrying indefinitely. Specifying a 0 means it will never retry on any error. + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test") + .with(EmbeddedEngineConfig.ERRORS_MAX_RETRIES, 0) + .with(PostgresConnectorConfig.ERRORS_MAX_RETRIES, 0) + .with(PostgresConnectorConfig.RETRIABLE_RESTART_WAIT, 12000); + + start(YugabyteDBConnector.class, configBuilder.build()); + + TestHelper.waitFor(Duration.ofSeconds(80)); + + assertConnectorNotRunning(); + + PostgresConnectorTask.TEST_THROW_ERROR_BEFORE_COORDINATOR_STARTUP = false; + } + @Test public void shouldFailWithSnapshotModeParallelIfNoTableIncludeListProvided() throws Exception { Configuration.Builder configBuilder = TestHelper.defaultConfig()