From 3ca8afdc4f892417ad5f240f7973041e315a40ed Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha <34186745+vaibhav-yb@users.noreply.github.com> Date: Tue, 17 Sep 2024 12:05:47 +0530 Subject: [PATCH 1/3] [DBZ-PGYB] Add plugin to package connector in a ZIP file (#157) This PR changes the following: 1. Adds a plugin to the pom.xml file to package the connector according to Confluent specifications 2. Adds a GitHub pipeline to automate the above packaging process --- .github/workflows/yb-confluent-package.yml | 56 +++++++++++++++ .../logos/yugabytedb.png | Bin 0 -> 10321 bytes debezium-connector-postgres/pom.xml | 64 ++++++++++++++++++ 3 files changed, 120 insertions(+) create mode 100644 .github/workflows/yb-confluent-package.yml create mode 100644 debezium-connector-postgres/logos/yugabytedb.png diff --git a/.github/workflows/yb-confluent-package.yml b/.github/workflows/yb-confluent-package.yml new file mode 100644 index 00000000000..2b8fc6fe5e0 --- /dev/null +++ b/.github/workflows/yb-confluent-package.yml @@ -0,0 +1,56 @@ +name: Create package for Confluent + +on: + workflow_dispatch: + inputs: + version: + description: "Version of the connector to be packaged" + required: true + type: string + isSnapshotBuild: + description: "Snapshot build?" 
+ required: true + type: boolean + default: false + +permissions: write-all + +jobs: + build: + name: "Create YugabyteDBConnector package for" + runs-on: ubuntu-latest + steps: + - name: Checkout Action + uses: actions/checkout@v4 + + - name: Set up Java 17 + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: 17 + + - name: Set version for release + run: ./mvnw versions:set -DnewVersion=${{ inputs.version }} + + - name: Compile jar file + run: ./mvnw clean install -Dquick -pl debezium-connector-postgres -pl debezium-bom -pl support/ide-configs -am + - name: Create GitHub release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ inputs.version }} + release_name: Release ${{ inputs.version }} + draft: true + prerelease: ${{ inputs.isSnapshotBuild }} + - name: Upload zip package GitHub release + id: upload-zip-package + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: ./debezium-connector-postgres/target/components/packages/yugabyte-debezium-connector-yugabytedb-${{ inputs.version }}.zip + asset_name: yugabyte-debezium-connector-yugabytedb-${{ inputs.version }}.zip + asset_content_type: application/zip diff --git a/debezium-connector-postgres/logos/yugabytedb.png b/debezium-connector-postgres/logos/yugabytedb.png new file mode 100644 index 0000000000000000000000000000000000000000..e7cd9dedd1d60cd08555199c5f6af4445d01512a GIT binary patch literal 10321 zcmeHMc~q0vwm(ULfI9I-DsHuT6hvk**RRQlSHm&kJYI7#4 zsbZJ^;tR3^es>`VpS9*9vx_>wIeri>0$FoFY>e-yh ziPtVu!}QX}BeJp9oTB29;iGaLfel}Ldn&@ec)UZ(%Cye8D$c(7T|T)ctuklwiFM*t zac-Gc-Sg)h`gqZ!&?sC^*!z5fa=4R+<51MHj)C1d?g7m6-IdjkLUz0j^T@TS>>fNa z^&*-+71iGqzL$BG#5q_J-d`1dY@liXq^bsGPS+<{NiIjacMM~W85*8I`h*GL5)!Fk70k8Ai=E4J;TokgAQdUhO?={St%~; zor0S9g0cQyr?ijs<&xyFXM=CoJJxvGb({Y=)^xw-?0mPWe#2V&P}7YS_~fYC2OatJ 
z_4@o0Db8yB}-2+s9=%ggN@bEDEYkA=DGV86-RM*7KZ2;Fx%^N^NMPe zd3K+R{tyjx8>S>qIdZA3QN<&kb3>C2;y;K|XFU8E5Fw!P4@kF5h9@1lhC$qP8vngC z#G-YU-cpIro8x;ebN8QP(|_C$Gv<~>UrET6QjB7ZUQ>B?i6mi5lpEKqc3J;@LgS;U zn+BCkH@*!gRoD9xL%2(^L$9X6Gg5I#=x(FBGbd&X<=~7guQ0z=rrQKHDdFpWKVs;6 zyW}J70;%!d=VXI8PkV)%_>!jHVu_Ef+fR2w%JGSfpPycEp*|As)@ik3vJ}pzn>|do z2x_htg!^mbtE=ZaCtUTs9T03*efLHUnP=ycwNW%$tIf{rqH46Rr=JvK>77&8mr4YRx6WvcORoHLy1pxoGLlxG&gW$U}lWhy_hYk?9SauWwcF7i1jY{ zq6&1y9_3V0s|lIkY)mE5SJwQa8X1&TX~mY!u!t>NZV+3>It#?;ylgYEyuA?CY`%fR zQ=nkM!ze6dIn>?0VS#EKwP8?ZR|#Y5mh^m@U46LM?pqy2mpq zNZ^4_2CMy|P@iaRHsVJ8umPsQ_!8Fh>@cejc%*_DrA0GeWBxOKWCoI~mY^RXyTNugOG)%BB>lOzjN9LyK#-kXV?Or7Tg*p`unzGJOlBjAfV{;D=esW zn=g)jhfssl`=+^Uy+slq@uLv9L7iKxpP-_~ZeaNd;gD&BcitBnLaWL9D#$x^^IR;7 ziqrCwY>3GFhsZaX@)Ht}l^EQz0wL3iss(yWBtDn@`Vi{9Pqw)UOJ?x;8Z;1hojJ#D zx*O;&FIPeW6P+Ku6Bt6bJZl_!C>!G|?+3;=3i{_DZ6q+u%ddevQ0U8oRcxCVdb0vbIya~5;Y&>|5bIh{M0=*H0eyLNDO{rt_cd55?}r4_TKyT!=UU-F zAg>N~6VlQBW2PlSgZo-L)cH3F_+)Fc4iQm3`mBX>{_5MWBUVM7+t%><1P z?pd1eVkKOQPo8w%OhmtVDUwKP?6Cvd6lCZU>GUAnj;~}9c^|}FOt{xyN+E&nfshs^=`Y&}j_XHHPrmn-;ed2-Rng*#%FCmBWoB z?RD|MV=`!`f~S!97im38DA!8iR>h=gb_M(Jxx^pMGy}OWXC}}t7 zX3j2c!K0nRmN0*+K1OkqB&NUZhJ6;4S&HeaL@V%=^Y^6`#MDzHS2Ag#=NSCf2Uy=V z=*^FjGQ9gDNjq&l-K3+U5isy5{(d3PP4u@4%9f{2^3U8r$fltGuyJanJ@>sD+c_=e zOo8GM4lxba(*yhT?Eu4KOS$<4SnpK9-R8I>0SkE~VY=*<#u^-=6~eHR+M&THp$1o>NkDV(1)= zucgL@iZowm)_tg05%+C|{98qGXN43W3yQD5dZ{l-2T`{wu3jHoW#8(<{oXv9WS`D+ z=R{5p_3U;34!rhNci7ZFzK^g#Ngb*}2|&zwc3^(Nc-0OSe+(QA^^bNjKvN`Zw@?k#Uw7r1kU%R=pmN z)OgU0g6h|0c52!PZ19;MkZ9eq~xaX#Oh+O=xvW)OjS6JJB9jS9EWrpfSU$U(NB9{ofORR||reN#!3_V|$<5L*%Yg zV|k}A_>S0cG?nkK%Jxo%;jr1A5(%J#CARUpp6;o?KhrNTlHg$V#(>`DXqlXU){Dx}>Jojw)uxzqIbz%%hg2NPimHD~ z=iv1hNO<)9G2t4Y_`|kuIglDyvpYq`N*1~MBYAgHb;Y)CC;abakdW_|1j5nTU7`=& ze+;{L%rA01v9cD&qy#PeT&;Ni;DPm4T%DnWrOEcBeuW`T zX(Wcr&V$4)izU3kTHSbYN#N#r$WOCVq@-j)UFGx;M|H~HK`=6s@O<<&6w^ESBPraX z>^#@~+HWM^GYsNw2b+mb;o124P7eGQiIT8@N2=nlXt?d*;g9QtW!J*8qk)?nO^fRBtallv z0w{wM@^Aj?F20jEZmmA2iT8v$qTzj-L2L;4^mAT^c~Et=;S$4SDiN|OFRxbj(HL-# 
zs`$3c*R*Nlj z@IgWt%KQ|%*=1r%o4L*CJ|SzDfRJ=J3V&emmn7ml8hvf56lM-gc2d8B~(Ca$j>J?`xCCsyxBAgFH<5Y#=h0iC#x z;G5AhwnrR4lf``s02fe_$0z%vj6YvEE>3?F#|b3)J_ATM!?9a9cVIE;%7f#i#s?8z z1J~xvd5bghRHiU}w~EtOJ4y4?N)EP;cn|!d5uoQuq8jr^)VL0Fg|1jv2Bsa^;Gh`e z6#AFun`qNswoA_%n5b-8~1)1B;HYYt2Go#W4ZL@rAtxICXH6UEsX@~OwC z6~f&WhQ7O$>y}jE76Q3akVA0{dt&8*;uMEg_Mze}qmM#dBbBnC-Z^WaHW7)_+#n#_ zGS};IIQOX_!(E(7pLlX5zI2tfgWKHFsWb2+R>NDo>ePQ!@uDtsZRh@(~)B6m7djR0je#0u<;ob0-Ty-t;HM+h4 zhC0hZPrKmg5X*C+C8GutW8q1XD8pOx#f+2{9w}>xf-op>4bOoTH;mBulsV%K(24TE zdANWA<(0LqM^XRtc71yQ4q`P&yAzG`uqxcRDAc{8C3CLh%VnC+Ihf9w$T`DwmeJ6F|mT0x$T)gcP+M_&7H=2gD8)Rnx)F9dLy7ua!4+B z7v6tE<&4_mxP#aeU?SLLEcQyCbu`T3I+e&&0ugPdL2WK>Yg!@9F;`+Dwc+d&F2YhH zR-PmgRP>0OL5gae8ZfMqU;Xpl4>)eR8YJywV7!{xLXjZ#6qL*ik8TH_SyP2qil?+X z(I#xf0D=jF21sFdLu(gFaNJRd5M1nD%q%%& zjx<1kXdK*_3#Bz;T7$U@JOSTtORI2Q*wy&B8Vgx~V!10J+zITlDds{;lnfsOM8Pme zTMTo(0U*5yd;-;cJEDQk!vS-BRAKTGNGJ!RW10XXIR#Fdc>rsjhXcgQ7DS_al8_8? z2KNDm5c4aPxE5rnz|n9OumJfS$gsyr;sFAq>TDf3P z%uHx}%65;-`|M`?IpUc<%G)L&FmfOqahJhj;ftzphQBa~TzA+ox&a&ZetW#Xv1Jx) zjT-<2Sja_?c!}JrfIh(lbZQbh_J`Cl7%{Jc?5b;k8_TMQwYUl=fNY9756YkZIIoaO zz{oD%_y{_Q*h@&)^#l3W^bXdMIsa!Vg&x($v`cBH6#9koQFa@a)K zw`3A(+ljUAg#!R^Z*F_76h3WMV*=DOWeCYI0WmPfWr1pwQm0#h?O$5(D1o>ez?Z6D zbekiQA;|OcQo!b@An!p0rdDzcgV^>l815A)2DC&=d_j-HLMCF8F`Bm#dw>SKn8}JM z!y$F3n{v&|+R=1Osc$Ni&qSyUqPg#F|6`o7TL@8C!ln<;hk633LrTiM7GA7}P&DAm z5)TC}R8bE&!mR<)T7HxrJcfJ5&iyQ|atg@X5 z)2%l4ZqBNXjvTmz4uC|v;c3XOz+Pj{f$Jrv`VUETV1vi2HZ}woCj8@GmbM3|{1*V3lJG$YPz--vJ=Le zbr{pME$Sa|Mp4u{O~jmi+|wwx0GKe{)X@2ZK1lWbeKvd zF4)%t91Z();2Qb>i30y&NBZ*D9e5;SW9E5fRj~tVgL^vn{TWp!Yp&$NSQeT9fPI@@Jwfh-J>b4TZ{N3HClY-Q6W9 z%gr8V;kXX4RHk(B{z6L3_xbsWJxHVmBTMIWQDb`ybPx^lYuiDC@iauJ4=Y8Y?bu{y zW7FO7r=zv2-%ZuM0--=!C1UQqUXk z&lIM&YO&^_dMHJiEHmp(7W(30!gR5lbWW8=|Fami>9>sZGN3?f;Owg?ZcHl*briVh zwyf-^lcjxxaNL#!;AP-q#)YkM_lIqGeXbG1hSjB}nj|g@VyL>8{ON`}t>T8Ia_( zDepBjEu;bf$SHe-hZF@*dkj`R8-Mm3k8EH+v9b@BH)mnj4aiLiY2hW~71HO9Lmfo4 z1y~a7G={?6i?g%dz_(J}DR(I+J9SB>JT~2bH>bC?HC*B4rkyhtIkz+Dxjr%i2JHJz 
z#FwA{G+2BM0e`~~@So*@uSfg!X#XxXfM^O|bkM+|W>6V!M+j}-Y_} + + io.confluent + 0.12.0 + kafka-connect-maven-plugin + + + + kafka-connect + + + Kafka Connect YugabyteDB + https://docs.yugabyte.com/preview/explore/change-data-capture/using-logical-replication/yugabytedb-connector/ + + The YugabyteDB Connector is based on the Debezium API, and captures row-level changes in the schemas of a YugabyteDB database using the PostgreSQL replication protocol. + + The first time it connects to a YugabyteDB server, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content, and that were committed to a YugabyteDB database. + + The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic. + + logos/yugabytedb.png + + Yugabyte Inc. + Yugabyte supports the YugabyteDB source connector. + http://support.yugabyte.com/ + logos/yugabytedb.png + + yugabyte + organization + Yugabyte Inc. 
+ https://www.yugabyte.com// + logos/yugabytedb.png + + quay.io/yugabyte + ybdb-debezium + ${project.version} + + https://github.com/yugabyte/debezium/tree/ybdb-debezium-2.5.2/debezium-connector-postgres + + + source + + + + Yugabyte + yugabytedb + source + cdc + wal + replication + + + + YugabyteDB 2024.1.x + + + + atLeastOnce + + + true + + + + org.apache.maven.plugins maven-assembly-plugin From 2cda9b77544ab63f71335df37d6116d945815d45 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha <34186745+vaibhav-yb@users.noreply.github.com> Date: Thu, 17 Oct 2024 21:56:22 +0530 Subject: [PATCH 2/3] [DBZ-PGYB][yugabyte/yugabyte-db#24200] Execute snapshot in chunks (#161) ## Problem For very large tables, the default `SELECT *` query can take a very long time to complete, leading to long snapshot times. ## Solution This PR aims to implement snapshotting the table in parallel using the built-in function `yb_hash_code` to only run the query for a given hash range. The following 2 configuration properties are introduced with this PR: 1. A new `snapshot.mode` called `parallel` - this will behave exactly like `initial_only` but we will have the ability to launch multiple tasks. 2. `primary.key.hash.columns` - this config takes a comma-separated list of the columns that form the hash component of the table's primary key. > **Note:** When `snapshot.mode` is set to `parallel`, a regex is not supported in the property `table.include.list`; the user must specify the full name of the table in the property. Additionally, we will only allow one table in the `table.include.list` if `snapshot.mode` is `parallel`. 
--- .../PostgresChangeEventSourceCoordinator.java | 6 +- .../postgresql/PostgresConnectorConfig.java | 36 ++++++++++ .../postgresql/PostgresConnectorTask.java | 2 +- .../postgresql/PostgresTaskContext.java | 12 ++++ .../postgresql/YugabyteDBConnector.java | 72 +++++++++++++++++++ .../snapshot/ParallelSnapshotter.java | 46 ++++++++++++ .../postgresql/PostgresConnectorIT.java | 56 +++++++++++++++ 7 files changed, 227 insertions(+), 3 deletions(-) create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java index 7d25a68fa42..708a6b234fb 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java @@ -73,7 +73,8 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps final PostgresPartition partition = previousOffsets.getTheOnlyPartition(); final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset(); - previousLogContext.set(taskContext.configureLoggingContext("snapshot", partition)); + previousLogContext.set(taskContext.configureLoggingContext( + String.format("snapshot|%s", taskContext.getTaskId()), partition)); SnapshotResult snapshotResult = doSnapshot(snapshotSource, context, partition, previousOffset); getSignalProcessor(previousOffsets).ifPresent(s -> s.setContext(snapshotResult.getOffset())); @@ -94,7 +95,8 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps } } LOGGER.info("Transitioning to streaming"); - previousLogContext.set(taskContext.configureLoggingContext("streaming", 
partition)); + previousLogContext.set(taskContext.configureLoggingContext( + String.format("streaming|%s", taskContext.getTaskId()), partition)); streamEvents(context, partition, snapshotResult.getOffset()); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index f342426dce3..7d70a05260a 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -14,6 +14,7 @@ import java.util.regex.Pattern; import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.snapshot.ParallelSnapshotter; import io.debezium.data.Envelope; import io.debezium.heartbeat.Heartbeat; import io.debezium.heartbeat.HeartbeatConnectionProvider; @@ -212,6 +213,11 @@ public enum SnapshotMode implements EnumeratedValue { */ INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()), + /** + * Perform a snapshot using parallel tasks. + */ + PARALLEL("parallel", (c) -> new ParallelSnapshotter()), + /** * Inject a custom snapshotter, which allows for more control over snapshots. 
*/ @@ -983,6 +989,27 @@ public static AutoCreateMode parse(String value, String defaultValue) { public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER .withDefault(PostgresSourceInfoStructMaker.class.getName()); + public static final Field TASK_ID = Field.create("task.id") + .withDisplayName("ID of the connector task") + .withType(Type.INT) + .withDefault(0) + .withImportance(Importance.LOW) + .withDescription("Internal use only"); + + public static final Field PRIMARY_KEY_HASH_COLUMNS = Field.create("primary.key.hash.columns") + .withDisplayName("Comma separated primary key fields") + .withType(Type.STRING) + .withImportance(Importance.LOW) + .withDescription("A comma separated value having all the hash components of the primary key") + .withValidation((config, field, output) -> { + if (config.getString(SNAPSHOT_MODE).equalsIgnoreCase("parallel") && config.getString(field, "").isEmpty()) { + output.accept(field, "", "primary.key.hash.columns cannot be empty when snapshot.mode is 'parallel'"); + return 1; + } + + return 0; + }); + private final LogicalDecodingMessageFilter logicalDecodingMessageFilter; private final HStoreHandlingMode hStoreHandlingMode; private final IntervalHandlingMode intervalHandlingMode; @@ -1108,6 +1135,14 @@ public boolean isFlushLsnOnSource() { return flushLsnOnSource; } + public int taskId() { + return getConfig().getInteger(TASK_ID); + } + + public String primaryKeyHashColumns() { + return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS); + } + @Override public byte[] getUnavailableValuePlaceholder() { String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER); @@ -1181,6 +1216,7 @@ protected SourceInfoStructMaker getSourceInfoStruc SNAPSHOT_MODE, SNAPSHOT_MODE_CLASS, YB_CONSISTENT_SNAPSHOT, + PRIMARY_KEY_HASH_COLUMNS, HSTORE_HANDLING_MODE, BINARY_HANDLING_MODE, SCHEMA_NAME_ADJUSTMENT_MODE, diff --git 
a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 38d961b8b0d..99eaca22b00 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -109,7 +109,7 @@ public ChangeEventSourceCoordinator st final PostgresValueConverter valueConverter = valueConverterBuilder.build(typeRegistry); schema = new PostgresSchema(connectorConfig, defaultValueConverter, topicNamingStrategy, valueConverter); - this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy); + this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.taskId()); final Offsets previousOffsets = getPreviousOffsets( new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig)); final Clock clock = Clock.system(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java index f9b1f711202..721c408681d 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -52,6 +52,18 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch this.schema = schema; } + protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy topicNamingStrategy, int taskId) { + super(config.getContextName(), config.getLogicalName(), String.valueOf(taskId), config.getCustomMetricTags(), Collections::emptySet); + + 
this.config = config; + if (config.xminFetchInterval().toMillis() > 0) { + this.refreshXmin = ElapsedTimeStrategy.constant(Clock.SYSTEM, config.xminFetchInterval().toMillis()); + } + this.topicNamingStrategy = topicNamingStrategy; + assert schema != null; + this.schema = schema; + } + protected TopicNamingStrategy topicNamingStrategy() { return topicNamingStrategy; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java index 335d73bfb53..2491e98d8c0 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java @@ -7,6 +7,7 @@ package io.debezium.connector.postgresql; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -61,10 +62,81 @@ public void start(Map props) { @Override public List> taskConfigs(int maxTasks) { + if (props == null) { + return Collections.emptyList(); + } + + if (props.containsKey(PostgresConnectorConfig.SNAPSHOT_MODE.name()) + && props.get(PostgresConnectorConfig.SNAPSHOT_MODE.name()) + .equalsIgnoreCase(PostgresConnectorConfig.SnapshotMode.PARALLEL.getValue())) { + LOGGER.info("Initialising parallel snapshot consumption"); + + final String tableIncludeList = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()); + // Perform basic validations. + validateSingleTableProvidedForParallelSnapshot(tableIncludeList); + + // Publication auto create mode should not be for all tables. 
+ if (props.containsKey(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name()) + && props.get(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name()) + .equalsIgnoreCase(PostgresConnectorConfig.AutoCreateMode.ALL_TABLES.getValue())) { + throw new DebeziumException("Snapshot mode parallel is not supported with publication.autocreate.mode all_tables, " + + "use publication.autocreate.mode=filtered"); + } + + // Add configuration for select override. + props.put(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name(), tableIncludeList); + + return getConfigForParallelSnapshotConsumption(maxTasks); + } + + // YB Note: Only applicable when snapshot mode is not parallel. // this will always have just one task with the given list of properties return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<>(props)); } + protected void validateSingleTableProvidedForParallelSnapshot(String tableIncludeList) throws DebeziumException { + if (tableIncludeList == null) { + throw new DebeziumException("No table provided, provide a table in the table.include.list"); + } else if (tableIncludeList.contains(",")) { + // This might indicate the presence of multiple tables in the include list, we do not want that. + throw new DebeziumException("parallel snapshot consumption is only supported with one table at a time"); + } + } + + protected List> getConfigForParallelSnapshotConsumption(int maxTasks) { + List> taskConfigs = new ArrayList<>(); + + final long upperBoundExclusive = 64 * 1024; + final long rangeSize = upperBoundExclusive / maxTasks; + + for (int i = 0; i < maxTasks; ++i) { + Map taskProps = new HashMap<>(this.props); + + taskProps.put(PostgresConnectorConfig.TASK_ID.name(), String.valueOf(i)); + + long lowerBound = i * rangeSize; + long upperBound = (i == maxTasks - 1) ? 
upperBoundExclusive - 1 : (lowerBound + rangeSize - 1); + + LOGGER.info("Using query for task {}: {}", i, getQueryForParallelSnapshotSelect(lowerBound, upperBound)); + + taskProps.put( + PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + "." + taskProps.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()), + getQueryForParallelSnapshotSelect(lowerBound, upperBound) + ); + + taskConfigs.add(taskProps); + } + + return taskConfigs; + } + + protected String getQueryForParallelSnapshotSelect(long lowerBound, long upperBound) { + return String.format("SELECT * FROM %s WHERE yb_hash_code(%s) >= %d AND yb_hash_code(%s) <= %d", + props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()), + props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), lowerBound, + props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), upperBound); + } + @Override public void stop() { this.props = null; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java new file mode 100644 index 00000000000..e4fdabf23a4 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/ParallelSnapshotter.java @@ -0,0 +1,46 @@ +package io.debezium.connector.postgresql.snapshot; + +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.spi.OffsetState; +import io.debezium.connector.postgresql.spi.SlotState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Snapshotter class to take snapshot using parallel tasks. 
+ * + * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) + */ +public class ParallelSnapshotter extends QueryingSnapshotter { + private final static Logger LOGGER = LoggerFactory.getLogger(ParallelSnapshotter.class); + private OffsetState sourceInfo; + + @Override + public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { + super.init(config, sourceInfo, slotState); + this.sourceInfo = sourceInfo; + + LOGGER.info("Initialised ParallelSnapshotter for task {}", config.taskId()); + } + + @Override + public boolean shouldStream() { + return false; + } + + @Override + public boolean shouldSnapshot() { + if (sourceInfo == null) { + LOGGER.info("Taking parallel snapshot for new datasource"); + return true; + } + else if (sourceInfo.snapshotInEffect()) { + LOGGER.info("Found previous incomplete snapshot"); + return true; + } + else { + LOGGER.info("Previous snapshot completed, no snapshot will be performed"); + return false; + } + } +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 0a743c99fd0..1cf09d41eb7 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1116,6 +1116,62 @@ public void shouldHaveBeforeImageOfUpdatedRow() throws InterruptedException { assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("aa").getInt32("value")).isEqualTo(404); } + @Test + public void shouldFailIfNoPrimaryKeyHashColumnSpecifiedWithSnapshotModeParallel() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test") + 
.with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, ""); + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + assertThat(message.contains("primary.key.hash.columns cannot be empty when snapshot.mode is 'parallel'")).isTrue(); + }); + } + + @Test + public void shouldFailIfParallelSnapshotRunWithMultipleTables() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test,public.test2") + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id"); + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + + assertThat(error.getMessage().contains("parallel snapshot consumption is only supported with one table at a time")).isTrue(); + }); + } + + @Test + public void shouldFailWithSnapshotModeParallelIfNoTableIncludeListProvided() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "") + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id"); + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + + assertThat(error.getMessage().contains("No table provided, provide a table in the table.include.list")).isTrue(); + }); + } + + @Test + public void shouldFailIfSnapshotModeParallelHasPublicationAutoCreateModeAllTables() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue()) + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test") + .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, 
PostgresConnectorConfig.AutoCreateMode.ALL_TABLES) + .with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id");; + + start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> { + assertFalse(success); + + assertThat(error.getMessage().contains("Snapshot mode parallel is not supported with publication.autocreate.mode all_tables")).isTrue(); + }); + } + @Test public void shouldResumeSnapshotIfFailingMidstream() throws Exception { // insert another set of rows so we can stop at certain point From 4e55ebdf132352456d376b27846ea14ce20d6eec Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha <34186745+vaibhav-yb@users.noreply.github.com> Date: Thu, 24 Oct 2024 14:57:03 +0530 Subject: [PATCH 3/3] [DBZ-PGYB][yugabyte/yugabyte-db#24555] Add task ID to PostgresPartition (#163) ## Problem With the introduction of the parallel snapshot model, we can have multiple tasks when the snapshot mode is set to `parallel`. This introduces a problem at the underlying layer when the connector stores the sourceInfo for its partitions i.e. `PostgresPartition` objects in Kafka. The `PostgresPartition` is identified by a map which has a structure `{"server", topicPrefix}` - currently this is the same for all the `PostgresPartition` objects which are created by the tasks when `snapshot.mode` is `parallel` and hence they all end up referring to the same source partition in the Kafka topic. Subsequently, what happens is that (assume that we have 2 tasks i.e. 0 and 1): 1. One task (task_0) completes the snapshot while the other is yet to start. a. After completion, `task_0` updates the `sourceInfo` saying that its snapshot is completed. 2. When task_1 starts up, it reads the same `sourceInfo` object and concludes that the snapshot is completed so it skips its snapshot. The above situation will cause a data loss since task_1 will never actually take a snapshot. 
## Solution This PR implements a short-term solution where we simply add the task ID to the partition so that each `PostgresPartition` can identify a sourcePartition uniquely; the identifying map will now become `{"server", topicPrefix_taskId}`. **Note:** This solution is a quick fix for the problem, provided that the number of tasks in the connector remains the same. This partially fixes yugabyte/yugabyte-db#24555 --- .../connector/postgresql/PostgresPartition.java | 13 ++++++++++--- .../connector/postgresql/PostgresPartitionTest.java | 4 ++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java index 4e28d658bd8..d1ee3609de1 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java @@ -21,15 +21,17 @@ public class PostgresPartition extends AbstractPartition implements Partition { private static final String SERVER_PARTITION_KEY = "server"; private final String serverName; + private final int taskId; - public PostgresPartition(String serverName, String databaseName) { + public PostgresPartition(String serverName, String databaseName, int taskId) { super(databaseName); this.serverName = serverName; + this.taskId = taskId; } @Override public Map getSourcePartition() { - return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName); + return Collect.hashMapOf(SERVER_PARTITION_KEY, getPartitionIdentificationKey()); } @Override @@ -54,6 +56,10 @@ public String toString() { return "PostgresPartition [sourcePartition=" + getSourcePartition() + "]"; } + public String getPartitionIdentificationKey() { + return String.format("%s_%d", serverName, taskId); + } + static class Provider implements Partition.Provider { private 
final PostgresConnectorConfig connectorConfig; private final Configuration taskConfig; @@ -66,7 +72,8 @@ static class Provider implements Partition.Provider { @Override public Set getPartitions() { return Collections.singleton(new PostgresPartition( - connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()))); + connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()), + connectorConfig.taskId())); } } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java index 1a12573ab91..201f32792ef 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java @@ -11,11 +11,11 @@ public class PostgresPartitionTest extends AbstractPartitionTest