Skip to content

Commit

Permalink
DBZ-7358 Use relational table primary key by default
Browse files Browse the repository at this point in the history
This fix uses the relational table primary key by default; however, as a
user can define `message.key.columns` to even override the primary key
configuration or to handle keyless tables, the user can override the
`reselect.use.event.key` option with `true` so use the event key fields
instead for the re-select so that keyless tables can also participate
with the column reselection process.
  • Loading branch information
Naros authored and jpechane committed Jan 18, 2024
1 parent e8ca0e3 commit c1b7e68
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ public class ReselectColumnsPostProcessor implements PostProcessor, BeanRegistry
private static final String RESELECT_COLUMNS_EXCLUDE_LIST = "reselect.columns.exclude.list";
private static final String RESELECT_UNAVAILABLE_VALUES = "reselect.unavailable.values";
private static final String RESELECT_NULL_VALUES = "reselect.null.values";
private static final String RESELECT_USE_EVENT_KEY = "reselect.use.event.key";

private Predicate<String> selector;
private boolean reselectUnavailableValues;
private boolean reselectNullValues;
private boolean reselectUseEventKeyFields;
private JdbcConnection jdbcConnection;
private ValueConverterProvider valueConverterProvider;
private String unavailableValuePlaceholder;
Expand All @@ -68,6 +70,7 @@ public void configure(Map<String, ?> properties) {
final Configuration config = Configuration.from(properties);
this.reselectUnavailableValues = config.getBoolean(RESELECT_UNAVAILABLE_VALUES, true);
this.reselectNullValues = config.getBoolean(RESELECT_NULL_VALUES, true);
this.reselectUseEventKeyFields = config.getBoolean(RESELECT_USE_EVENT_KEY, false);
this.selector = new ReselectColumnsPredicateBuilder()
.includeColumns(config.getString(RESELECT_COLUMNS_INCLUDE_LIST))
.excludeColumns(config.getString(RESELECT_COLUMNS_EXCLUDE_LIST))
Expand Down Expand Up @@ -127,9 +130,17 @@ public void apply(Object messageKey, Struct value) {

final List<String> keyColumns = new ArrayList<>();
final List<Object> keyValues = new ArrayList<>();
for (org.apache.kafka.connect.data.Field field : key.schema().fields()) {
keyColumns.add(field.name());
keyValues.add(key.get(field));
if (reselectUseEventKeyFields) {
for (org.apache.kafka.connect.data.Field field : key.schema().fields()) {
keyColumns.add(field.name());
keyValues.add(key.get(field));
}
}
else {
for (Column column : table.primaryKeyColumns()) {
keyColumns.add(column.name());
keyValues.add(key.get(key.schema().field(column.name())));
}
}

Map<String, Object> selections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ You can configure the post processor to re-select the following column types:

Configuring a `PostProcessor` is similar to configuring a `CustomConverter` or `Transformation`.

== Keyless tables

The `ReselectColumnsPostProcessor` requires that the table have some unique combination of columns that can be used to generate a re-select query that returns a single row.
By default, the `PostProcessor` will use the relational table model to construct a where-clause based on the table's primary key columns or the unique index that is defined on the table.
However, if a table has no primary key or unique index, effectively keyless, then you can use the `message.key.columns` configuration to define a combination of columns that uniquely identifies a single row.
When using `message.key.columns` for keyless tables, it is important to set the `reselect.use.event.key` configuration property to `true` that the event's key fields are used as the basis for the selection criteria since the relational table model would have no primary key columns.

[NOTE]
====
The `ReselectColumnsPostProcessor` tolerates a re-select query that returns more than one row.
In such circumstances, only the first row will be used and that entry is entirely random and database driven.
It's recommended that if you use `reselect.use.event.key` set to `true`, your connector configuration and data model guarantees that the columns that participate in the event's key uniquely identify a single database row so that the re-select is always deterministic.
====

== Configuration example

Configure a `PostProcessor` much in the same way that you would configure a `CustomConverter` or `Transformation`.
Expand All @@ -37,12 +51,14 @@ To enable the connector to use the `ReselectColumnsPostProcessor`, add the follo
"reselector.reselect.columns.include.list": "<schema_name>.<table_name>:<colA>,<schema_name>.<table_name>:<colB>", // <3>
"reselector.reselect.unavailable.values": "true", // <4>
"reselector.reselect.null.values": "true" // <5>
"reselector.reselect.use.event.key": "false" // <6>
----
<1> Comma-separated list of post-processor prefixes.
<2> The fully-qualified class type name for the post-processor.
<3> Comma-separated list of column names specified by using the following format: `<schema>.<table>:<column>`.
<4> Enables or disables the re-selection of columns that contain the `unavailable.value.placeholder` sentinel value.
<5> Enables or disables the re-selection of columns that are `null`.
<6> Enables or disables the re-selection based event key field names.

== Configuration options

Expand Down Expand Up @@ -77,4 +93,11 @@ Do not set this property if you set the `reselect.columns.include.list` property
|`true`
|Specifies whether the post processor reselects a column that matches the `reselect.columns.include.list` filter if the column value is `null`.

|[[reselect-columns-post-processor-property-reselect-use-event-key]]<<reselect-columns-post-processor-property-reselect-use-event-key, `+reselect.use.event.key+`>>
|`false`
|Specifies whether the post processor reselects based on the event's key field names or uses the relational table's primary key column names. +
+
By default, the reselect is based on the relational table's primary key columns or unique key index.
Setting this to `true` can be useful if the table has no primary key and the connector is configured to use `message.key.columns` to create events with a key.
This will then use the key field names as the primary key in the SQL reselection query.
|===

0 comments on commit c1b7e68

Please sign in to comment.