Skip to content

Commit

Permalink
Merge branch 'ybdb-debezium-2.5.2' into have-perf-test
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhav-yb committed Nov 6, 2024
2 parents 2d557f0 + 4e55ebd commit 4c506c0
Show file tree
Hide file tree
Showing 12 changed files with 359 additions and 8 deletions.
56 changes: 56 additions & 0 deletions .github/workflows/yb-confluent-package.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Manually triggered workflow: sets the requested version, builds the connector
# modules with Maven and attaches the resulting zip package to a draft GitHub release.
name: Create package for Confluent

on:
  workflow_dispatch:
    inputs:
      version:
        description: "Version of the connector to be packaged"
        required: true
        type: string
      isSnapshotBuild:
        description: "Snapshot build?"
        required: true
        type: boolean
        default: false

# Least privilege: the release steps below only need write access to repository contents.
permissions:
  contents: write

jobs:
  build:
    name: "Create YugabyteDBConnector package for Confluent"
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Action
        uses: actions/checkout@v4

      - name: Set up Java 17
        uses: actions/setup-java@v4
        with:
          distribution: 'temurin'
          java-version: 17

      - name: Set version for release
        # The user-supplied version is passed through an environment variable so that
        # it is never interpolated into the text of the shell script itself.
        env:
          RELEASE_VERSION: ${{ inputs.version }}
        run: ./mvnw versions:set -DnewVersion="$RELEASE_VERSION"

      - name: Compile jar file
        run: ./mvnw clean install -Dquick -pl debezium-connector-postgres -pl debezium-bom -pl support/ide-configs -am

      - name: Create GitHub release
        id: create_release
        uses: actions/create-release@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        with:
          tag_name: ${{ inputs.version }}
          release_name: Release ${{ inputs.version }}
          # Created as a draft; snapshot builds are additionally flagged as pre-releases.
          draft: true
          prerelease: ${{ inputs.isSnapshotBuild }}

      - name: Upload zip package GitHub release
        id: upload-zip-package
        uses: actions/upload-release-asset@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        with:
          upload_url: ${{ steps.create_release.outputs.upload_url }}
          asset_path: ./debezium-connector-postgres/target/components/packages/yugabyte-debezium-connector-yugabytedb-${{ inputs.version }}.zip
          asset_name: yugabyte-debezium-connector-yugabytedb-${{ inputs.version }}.zip
          asset_content_type: application/zip
Binary file added debezium-connector-postgres/logos/yugabytedb.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
64 changes: 64 additions & 0 deletions debezium-connector-postgres/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,70 @@

<build>
<plugins>
<!-- Confluent kafka-connect-maven-plugin: runs the kafka-connect goal with the
     connector's descriptive metadata (title, owner, support, docker image, tags). -->
<plugin>
    <groupId>io.confluent</groupId>
    <version>0.12.0</version>
    <artifactId>kafka-connect-maven-plugin</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>kafka-connect</goal>
            </goals>
            <configuration>
                <title>Kafka Connect YugabyteDB</title>
                <documentationUrl>https://docs.yugabyte.com/preview/explore/change-data-capture/using-logical-replication/yugabytedb-connector/</documentationUrl>
                <description>
The YugabyteDB Connector is based on the Debezium API, and captures row-level changes in the schemas of a YugabyteDB database using the PostgreSQL replication protocol.

The first time it connects to a YugabyteDB server, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content, and that were committed to a YugabyteDB database.

The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.
                </description>
                <logo>logos/yugabytedb.png</logo>

                <!-- Support contact details -->
                <supportProviderName>Yugabyte Inc.</supportProviderName>
                <supportSummary>Yugabyte supports the YugabyteDB source connector.</supportSummary>
                <supportUrl>http://support.yugabyte.com/</supportUrl>
                <supportLogo>logos/yugabytedb.png</supportLogo>

                <!-- Owner details -->
                <ownerUsername>yugabyte</ownerUsername>
                <ownerType>organization</ownerType>
                <ownerName>Yugabyte Inc.</ownerName>
                <ownerUrl>https://www.yugabyte.com/</ownerUrl>
                <ownerLogo>logos/yugabytedb.png</ownerLogo>

                <!-- Docker image coordinates; the tag follows the Maven project version -->
                <dockerNamespace>quay.io/yugabyte</dockerNamespace>
                <dockerName>ybdb-debezium</dockerName>
                <dockerTag>${project.version}</dockerTag>

                <sourceUrl>https://github.com/yugabyte/debezium/tree/ybdb-debezium-2.5.2/debezium-connector-postgres</sourceUrl>

                <componentTypes>
                    <componentType>source</componentType>
                </componentTypes>

                <tags>
                    <tag>Yugabyte</tag>
                    <tag>yugabytedb</tag>
                    <tag>source</tag>
                    <tag>cdc</tag>
                    <tag>wal</tag>
                    <tag>replication</tag>
                </tags>

                <requirements>
                    <requirement>YugabyteDB 2024.1.x</requirement>
                </requirements>

                <deliveryGuarantee>
                    <deliveryGuarantee>atLeastOnce</deliveryGuarantee>
                </deliveryGuarantee>

                <confluentControlCenterIntegration>true</confluentControlCenterIntegration>
            </configuration>
        </execution>
    </executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
final PostgresPartition partition = previousOffsets.getTheOnlyPartition();
final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

previousLogContext.set(taskContext.configureLoggingContext("snapshot", partition));
previousLogContext.set(taskContext.configureLoggingContext(
String.format("snapshot|%s", taskContext.getTaskId()), partition));
SnapshotResult<PostgresOffsetContext> snapshotResult = doSnapshot(snapshotSource, context, partition, previousOffset);

getSignalProcessor(previousOffsets).ifPresent(s -> s.setContext(snapshotResult.getOffset()));
Expand All @@ -94,7 +95,8 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
}
}
LOGGER.info("Transitioning to streaming");
previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
previousLogContext.set(taskContext.configureLoggingContext(
String.format("streaming|%s", taskContext.getTaskId()), partition));
streamEvents(context, partition, snapshotResult.getOffset());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.regex.Pattern;

import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.snapshot.ParallelSnapshotter;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatConnectionProvider;
Expand Down Expand Up @@ -212,6 +213,11 @@ public enum SnapshotMode implements EnumeratedValue {
*/
INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()),

/**
* Perform a snapshot using parallel tasks.
*/
PARALLEL("parallel", (c) -> new ParallelSnapshotter()),

/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/
Expand Down Expand Up @@ -983,6 +989,27 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(PostgresSourceInfoStructMaker.class.getName());

/**
 * Integer ID of the connector task, stored under the {@code task.id} key.
 * Defaults to {@code 0}; the field description marks it as internal use only.
 */
public static final Field TASK_ID = Field.create("task.id")
.withDisplayName("ID of the connector task")
.withType(Type.INT)
.withDefault(0)
.withImportance(Importance.LOW)
.withDescription("Internal use only");

/**
 * Comma separated list of the hash components of the primary key, stored under the
 * {@code primary.key.hash.columns} key.
 *
 * <p>Validation reports one problem when {@code snapshot.mode} is {@code parallel}
 * (compared case-insensitively) and this value is missing, empty or whitespace-only;
 * otherwise it reports none.
 */
public static final Field PRIMARY_KEY_HASH_COLUMNS = Field.create("primary.key.hash.columns")
        .withDisplayName("Comma separated primary key fields")
        .withType(Type.STRING)
        .withImportance(Importance.LOW)
        .withDescription("A comma separated value having all the hash components of the primary key")
        .withValidation((config, field, output) -> {
            // Literal-first comparison stays safe even if the snapshot mode resolves to null.
            if ("parallel".equalsIgnoreCase(config.getString(SNAPSHOT_MODE))) {
                final String hashColumns = config.getString(field, "");
                // trim() so that a whitespace-only value is rejected just like an empty one.
                if (hashColumns.trim().isEmpty()) {
                    output.accept(field, "", "primary.key.hash.columns cannot be empty when snapshot.mode is 'parallel'");
                    return 1;
                }
            }

            return 0;
        });

private final LogicalDecodingMessageFilter logicalDecodingMessageFilter;
private final HStoreHandlingMode hStoreHandlingMode;
private final IntervalHandlingMode intervalHandlingMode;
Expand Down Expand Up @@ -1108,6 +1135,14 @@ public boolean isFlushLsnOnSource() {
return flushLsnOnSource;
}

/**
 * Reads the {@code TASK_ID} field from the connector configuration.
 *
 * @return the configured task ID
 */
public int taskId() {
    final int configuredTaskId = getConfig().getInteger(TASK_ID);
    return configuredTaskId;
}

/**
 * Reads the {@code PRIMARY_KEY_HASH_COLUMNS} field from the connector configuration.
 *
 * @return the configured value as stored, without any parsing or splitting
 */
public String primaryKeyHashColumns() {
    final String hashColumns = getConfig().getString(PRIMARY_KEY_HASH_COLUMNS);
    return hashColumns;
}

@Override
public byte[] getUnavailableValuePlaceholder() {
String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER);
Expand Down Expand Up @@ -1181,6 +1216,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
SNAPSHOT_MODE,
SNAPSHOT_MODE_CLASS,
YB_CONSISTENT_SNAPSHOT,
PRIMARY_KEY_HASH_COLUMNS,
HSTORE_HANDLING_MODE,
BINARY_HANDLING_MODE,
SCHEMA_NAME_ADJUSTMENT_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
final PostgresValueConverter valueConverter = valueConverterBuilder.build(typeRegistry);

schema = new PostgresSchema(connectorConfig, defaultValueConverter, topicNamingStrategy, valueConverter);
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy);
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.taskId());
final Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ public class PostgresPartition extends AbstractPartition implements Partition {
private static final String SERVER_PARTITION_KEY = "server";

private final String serverName;
private final int taskId;

public PostgresPartition(String serverName, String databaseName) {
public PostgresPartition(String serverName, String databaseName, int taskId) {
super(databaseName);
this.serverName = serverName;
this.taskId = taskId;
}

@Override
public Map<String, String> getSourcePartition() {
return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName);
return Collect.hashMapOf(SERVER_PARTITION_KEY, getPartitionIdentificationKey());
}

@Override
Expand All @@ -54,6 +56,10 @@ public String toString() {
return "PostgresPartition [sourcePartition=" + getSourcePartition() + "]";
}

/**
 * Builds the key identifying this partition as {@code <serverName>_<taskId>}.
 *
 * <p>Plain concatenation is used instead of {@code String.format("%d")} because the latter
 * formats digits according to the JVM's default locale, which would make the key
 * depend on where the connector runs.
 *
 * @return the server name and task ID joined by an underscore
 */
public String getPartitionIdentificationKey() {
    return serverName + "_" + taskId;
}

static class Provider implements Partition.Provider<PostgresPartition> {
private final PostgresConnectorConfig connectorConfig;
private final Configuration taskConfig;
Expand All @@ -66,7 +72,8 @@ static class Provider implements Partition.Provider<PostgresPartition> {
@Override
public Set<PostgresPartition> getPartitions() {
return Collections.singleton(new PostgresPartition(
connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name())));
connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()),
connectorConfig.taskId()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch
this.schema = schema;
}

protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy<TableId> topicNamingStrategy, int taskId) {
super(config.getContextName(), config.getLogicalName(), String.valueOf(taskId), config.getCustomMetricTags(), Collections::emptySet);

this.config = config;
if (config.xminFetchInterval().toMillis() > 0) {
this.refreshXmin = ElapsedTimeStrategy.constant(Clock.SYSTEM, config.xminFetchInterval().toMillis());
}
this.topicNamingStrategy = topicNamingStrategy;
assert schema != null;
this.schema = schema;
}

protected TopicNamingStrategy<TableId> topicNamingStrategy() {
return topicNamingStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package io.debezium.connector.postgresql;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -61,10 +62,81 @@ public void start(Map<String, String> props) {

/**
 * Builds the configurations for the connector tasks.
 *
 * <p>When {@code snapshot.mode} is {@code parallel} (case-insensitive), the table include list
 * is validated, the combination with {@code publication.autocreate.mode=all_tables} is
 * rejected, the select-override entry for the table is added to the connector properties,
 * and one configuration per task is returned. In every other mode a single task
 * configuration holding a copy of the connector properties is returned.
 *
 * @param maxTasks upper bound on the number of tasks; only used in parallel snapshot mode
 * @return the task configurations, or an empty list when the connector has no properties
 * @throws DebeziumException if the parallel snapshot configuration is invalid
 */
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    if (props == null) {
        return Collections.emptyList();
    }

    // Constant-first comparisons: a missing key or a null value simply does not match.
    final String snapshotMode = props.get(PostgresConnectorConfig.SNAPSHOT_MODE.name());
    if (!PostgresConnectorConfig.SnapshotMode.PARALLEL.getValue().equalsIgnoreCase(snapshotMode)) {
        // YB Note: Only applicable when snapshot mode is not parallel.
        // this will always have just one task with the given list of properties
        return Collections.singletonList(new HashMap<>(props));
    }

    LOGGER.info("Initialising parallel snapshot consumption");

    final String tableIncludeList = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name());
    // Perform basic validations.
    validateSingleTableProvidedForParallelSnapshot(tableIncludeList);

    // Publication auto create mode should not be for all tables.
    final String autoCreateMode = props.get(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name());
    if (PostgresConnectorConfig.AutoCreateMode.ALL_TABLES.getValue().equalsIgnoreCase(autoCreateMode)) {
        throw new DebeziumException("Snapshot mode parallel is not supported with publication.autocreate.mode all_tables, " +
                "use publication.autocreate.mode=filtered");
    }

    // Add configuration for select override.
    props.put(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name(), tableIncludeList);

    return getConfigForParallelSnapshotConsumption(maxTasks);
}

/**
 * Verifies that exactly one table was configured for parallel snapshot consumption.
 *
 * @param tableIncludeList raw value of {@code table.include.list}; may be {@code null}
 * @throws DebeziumException if the value is missing, blank, or contains a comma
 */
protected void validateSingleTableProvidedForParallelSnapshot(String tableIncludeList) throws DebeziumException {
    // A blank value is as unusable as a missing one: it would end up as an empty table name in the query.
    if (tableIncludeList == null || tableIncludeList.trim().isEmpty()) {
        throw new DebeziumException("No table provided, provide a table in the table.include.list");
    }

    if (tableIncludeList.contains(",")) {
        // This might indicate the presence of multiple tables in the include list, we do not want that.
        throw new DebeziumException("parallel snapshot consumption is only supported with one table at a time");
    }
}

/**
 * Creates one task configuration per task for parallel snapshot consumption.
 *
 * <p>The range {@code [0, 64 * 1024)} is split into {@code maxTasks} contiguous slices of
 * equal size, with the last task also covering any remainder. Each configuration is a copy
 * of the connector properties plus the task ID and a select-override query limited to that
 * task's slice.
 *
 * @param maxTasks number of task configurations to create; must be at least 1
 * @return the task configurations, ordered by task ID
 * @throws IllegalArgumentException if {@code maxTasks} is less than 1
 */
protected List<Map<String, String>> getConfigForParallelSnapshotConsumption(int maxTasks) {
    if (maxTasks < 1) {
        // Guards the division below against a zero divisor.
        throw new IllegalArgumentException("maxTasks must be at least 1 for parallel snapshot consumption, got " + maxTasks);
    }

    List<Map<String, String>> taskConfigs = new ArrayList<>();

    final long upperBoundExclusive = 64 * 1024;
    // When maxTasks exceeds the range size this becomes 0: every task but the last
    // then gets an empty slice and the last task covers the whole range.
    final long rangeSize = upperBoundExclusive / maxTasks;

    // The override key is the same for every task, so it is computed once.
    final String selectOverrideKey = PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name()
            + "." + this.props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name());

    for (int i = 0; i < maxTasks; ++i) {
        Map<String, String> taskProps = new HashMap<>(this.props);

        taskProps.put(PostgresConnectorConfig.TASK_ID.name(), String.valueOf(i));

        long lowerBound = i * rangeSize;
        long upperBound = (i == maxTasks - 1) ? upperBoundExclusive - 1 : (lowerBound + rangeSize - 1);

        final String snapshotQuery = getQueryForParallelSnapshotSelect(lowerBound, upperBound);
        LOGGER.info("Using query for task {}: {}", i, snapshotQuery);

        taskProps.put(selectOverrideKey, snapshotQuery);

        taskConfigs.add(taskProps);
    }

    return taskConfigs;
}

/**
 * Builds the snapshot SELECT for one task, restricted to rows whose
 * {@code yb_hash_code} over the configured hash columns lies in the given range.
 *
 * <p>The table name and hash columns are read from the connector properties and
 * inserted verbatim. Formatting uses {@code Locale.ROOT} so the numeric bounds are
 * written with ASCII digits whatever the JVM's default locale is.
 *
 * @param lowerBound inclusive lower bound of the hash code range
 * @param upperBound inclusive upper bound of the hash code range
 * @return the SELECT statement text
 */
protected String getQueryForParallelSnapshotSelect(long lowerBound, long upperBound) {
    final String tableName = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name());
    final String hashColumns = props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name());

    return String.format(java.util.Locale.ROOT,
            "SELECT * FROM %s WHERE yb_hash_code(%s) >= %d AND yb_hash_code(%s) <= %d",
            tableName,
            hashColumns, lowerBound,
            hashColumns, upperBound);
}

@Override
public void stop() {
this.props = null;
Expand Down
Loading

0 comments on commit 4c506c0

Please sign in to comment.