Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement PscSource, PscSink, and Table API's to close the gap on Flink 1.15.1 API's #45

Merged
merged 97 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
1ebd6a5
WIP full 1.15 upgrade
jeffxiang Aug 8, 2024
1bdb150
WIP sink API's done
jeffxiang Aug 9, 2024
82342af
WIP source API's
jeffxiang Aug 13, 2024
8a728e4
Abstracting out TransactionManager reflections and direct field acces…
jeffxiang Aug 13, 2024
5986f99
WIP refactored FlinkPscInternalProducer (sink) to use abstracted logic
jeffxiang Aug 13, 2024
08cb02c
Revert irrelevant changes
jeffxiang Aug 13, 2024
a3afbf8
Add javadocs
jeffxiang Aug 13, 2024
b9f8789
Improve javadocs
jeffxiang Aug 13, 2024
1f48ae6
Small spacing update
jeffxiang Aug 13, 2024
c826136
Quick javadoc update
jeffxiang Aug 13, 2024
76b7900
Add graceful handling of null transactionManager
jeffxiang Aug 14, 2024
600db21
Update package name
jeffxiang Aug 15, 2024
d899bef
Bump version to 3.2.1-SNAPSHOT
jeffxiang Aug 19, 2024
c88ab13
Make flink API's use TransactionManagerUtils
jeffxiang Aug 19, 2024
ae4b690
Disallow >1 backend producer upon send() when PscProducer is transact…
jeffxiang Aug 26, 2024
baba790
Merge branch 'txn_manager_utils' into full_1_15_upgrade
jeffxiang Aug 26, 2024
5927f7a
Merge branch '3.2' into full_1_15_upgrade
jeffxiang Aug 27, 2024
77ced73
Merge branch '3.2' into full_1_15_upgrade
jeffxiang Sep 11, 2024
55cab2d
Implement creation of backend producer immediately upon init of Flink…
jeffxiang Sep 11, 2024
af04165
WIP source changes
jeffxiang Sep 12, 2024
768ea46
WIP flink metrics
jeffxiang Sep 12, 2024
6ec696f
WIP PscTopicUriPartitionSplitReader
jeffxiang Sep 12, 2024
8eac82f
Finish code changes to source and sink API's
jeffxiang Sep 13, 2024
3ad84d8
WIP finished sink test refactoring
jeffxiang Sep 16, 2024
317464c
WIP finished table API source code refactor
jeffxiang Sep 18, 2024
9ac82ad
WIP source test changes
jeffxiang Sep 18, 2024
27529b9
WIP source/sink API's mainly done first round refactor
jeffxiang Sep 18, 2024
7def619
Finish table test imports
jeffxiang Sep 19, 2024
b6f5b43
WIP compiles
jeffxiang Sep 19, 2024
cf31969
WIP FlinkPscInternalProducerITCase
jeffxiang Sep 19, 2024
9e8c618
WIP PscCommitterTest
jeffxiang Sep 23, 2024
85c68ba
WIP finished PscCommitterTest
jeffxiang Sep 23, 2024
864ea72
WIP fixed Test Sink with lower parallelism in PscSinkITCase integrati…
jeffxiang Sep 24, 2024
424b901
WIP finished PscSinkITCase integration tests
jeffxiang Sep 24, 2024
602c345
WIP finish PscSinkITCase
jeffxiang Sep 26, 2024
a2cfd2e
WIP finish PscTransactionLogITCase
jeffxiang Sep 26, 2024
bd41598
WIP finish PscWriterITCase
jeffxiang Sep 26, 2024
addce63
WIP all sink tests pass
jeffxiang Sep 26, 2024
c978fdd
WIP finished OffsetInitializerTest
jeffxiang Oct 3, 2024
0d8b0be
WIP finished PscSubscriberTest
jeffxiang Oct 3, 2024
f3d8e9b
WIP finish PscEnumeratorTest
jeffxiang Oct 7, 2024
e0fada1
WIP finish PscSourceReaderMetricsTest
jeffxiang Oct 7, 2024
37661bc
WIP finish PscRecordDeserializationSchemaTest
jeffxiang Oct 7, 2024
c42d001
WIP in the middle of fixing PscSourceReaderTest
jeffxiang Oct 8, 2024
18cba80
WIP finish PscSourceReaderTest
jeffxiang Oct 9, 2024
7fa8184
WIP finished PscTopicUriPartitionSplitReaderTest
jeffxiang Oct 9, 2024
4a3a57b
WIP finish PscSourceBuilderTest
jeffxiang Oct 9, 2024
19c20c4
WIP fixing PscSourceeITCase integration tests
jeffxiang Oct 10, 2024
cd5a930
WIP finish PscSourceITCase, need to look into BaseTopicUri.equals() a…
jeffxiang Oct 10, 2024
e873e57
WIP finish Source and Sink tests
jeffxiang Oct 11, 2024
d781fec
Fix most producer tests in streaming.connectors.psc by calling super.…
jeffxiang Oct 15, 2024
88f402f
WIP fixing FlinkPscProducerMigrationOperatorTest
jeffxiang Oct 16, 2024
fb57a25
Revert "WIP fixing FlinkPscProducerMigrationOperatorTest"
jeffxiang Oct 16, 2024
e713312
WIP finish PscChangelogTableITCase; FlinkPscProducerMigrationOperator…
jeffxiang Oct 16, 2024
2cba3b1
WIP finish PscDynamicTableFactoryTest
jeffxiang Oct 16, 2024
e4bbc9b
WIP finish PscTableITCase
jeffxiang Oct 17, 2024
b7f30bc
Finish table tests
jeffxiang Oct 22, 2024
e2190ef
Minor fixes
jeffxiang Oct 23, 2024
f85ae75
Catch IOException
jeffxiang Oct 23, 2024
3621561
Skip config logging in tests
jeffxiang Oct 23, 2024
548b3f6
Exclude kafka-schema-registry-client from flink-avro-confluent-regist…
jeffxiang Oct 23, 2024
ec408c8
Revert "Exclude kafka-schema-registry-client from flink-avro-confluen…
jeffxiang Oct 23, 2024
2295af3
Include ITCase tests in mvn surefire plugin
jeffxiang Oct 23, 2024
642adfd
Convert hashmap to concurrenthashmap in TransactionManagerUtils
jeffxiang Oct 23, 2024
ccbf61d
Remove ITCase from surefire
jeffxiang Oct 24, 2024
1849158
Disable Kafka log segment rotation for flink tests
jeffxiang Oct 24, 2024
29adc49
Add retention.ms=Long.MAX_VALUE to prevent topic cleanup during test
jeffxiang Oct 25, 2024
6627948
Refactor to remove Kafka references
jeffxiang Oct 28, 2024
c444f4b
Add step to run ITCase tests in build
jeffxiang Oct 28, 2024
9cbe09d
Fix mvn clean test in yaml
jeffxiang Oct 28, 2024
044809e
Fix build yaml
jeffxiang Oct 28, 2024
b4c11af
Set default psc.config.logging.enabled=false for OSS
jeffxiang Oct 29, 2024
b00f8b9
Revert "Set default psc.config.logging.enabled=false for OSS"
jeffxiang Oct 29, 2024
e42732c
Add default psc.conf in flink test resources
jeffxiang Oct 29, 2024
0a1bded
Make PscMetadataClient convert BaseTopicUri to backend-specific topicUri
jeffxiang Oct 29, 2024
46054b5
Make metadataClient always convert to plaintext protocol
jeffxiang Oct 29, 2024
e982187
Add logs to debug
jeffxiang Oct 29, 2024
560ebf4
Revert "Make metadataClient always convert to plaintext protocol"
jeffxiang Oct 30, 2024
11feb61
Make metadata client preserve protocol in describeTopicUris()
jeffxiang Oct 30, 2024
8be3077
Add null check to prevent initialization NPE for byteOutMetric in Psc…
jeffxiang Oct 30, 2024
8c9ebbe
Add NPE check for updateNumBytesInCounter
jeffxiang Oct 30, 2024
5b19053
Surround NPE in PscSourceReaderMetrics with try/catch
jeffxiang Oct 30, 2024
c406077
Add logs to debug producer thread leak
jeffxiang Oct 31, 2024
a44f0b6
Add more debug logs to see why producerPool doesn't get added
jeffxiang Oct 31, 2024
5ab0954
Add even more logs to debug producerLeak
jeffxiang Nov 1, 2024
235c608
Add even more logs to debug producerleak
jeffxiang Nov 2, 2024
2db978d
Revert "Add even more logs to debug producerleak"
jeffxiang Nov 4, 2024
eba87a0
Revert "Add even more logs to debug producerLeak"
jeffxiang Nov 4, 2024
374515a
Revert "Add more debug logs to see why producerPool doesn't get added"
jeffxiang Nov 4, 2024
446ae45
Revert "Add logs to debug producer thread leak"
jeffxiang Nov 4, 2024
a8a60a2
Make metrics() return ConcurrentHashMap in PscConsumer
jeffxiang Nov 4, 2024
8c838d5
Surround NPE in PscWriter registerMetricSync with try/catch
jeffxiang Nov 5, 2024
49ed9d5
Remove commented pom content
jeffxiang Nov 6, 2024
a6efb43
Address comments
jeffxiang Nov 7, 2024
20535c0
Update javadocs for PscSink and PscSource to highlight difference com…
jeffxiang Nov 7, 2024
6e91af6
Add integration test for batch committed + javadocs
jeffxiang Nov 8, 2024
b33e479
Fix javadoc
jeffxiang Nov 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ jobs:
with:
java-version: 1.8
- name: Build with Maven
run: mvn -B package --file pom.xml
run: mvn -B install --file pom.xml
- name: Run psc-flink *ITCase tests
run: mvn clean test -pl psc-flink -Dtest=*ITCase
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ buildNumber.properties

# vscode
.vscode

#test files
psc-flink/orgapacheflinkutilNetUtils*
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
<gson.version>2.8.6</gson.version>
<guava.version>29.0-jre</guava.version>
<junit.jupiter.version>5.6.2</junit.jupiter.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<log4j2.version>2.17.1</log4j2.version>
<mockito.version>3.4.6</mockito.version>
<reflections.version>0.9.9</reflections.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import com.pinterest.psc.serde.IntegerSerializer;
import com.pinterest.psc.serde.StringSerializer;

import java.io.IOException;

public class ExamplePscProducer {

private static final PscLogger logger = PscLogger.getLogger(ExamplePscProducer.class);
private static final int NUM_MESSAGES = 10;

public static void main(String[] args) throws ConfigurationException, ProducerException {
public static void main(String[] args) throws ConfigurationException, ProducerException, IOException {
if (args.length < 1) {
logger.error("ExamplePscProducer needs one argument: topicUri");
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.pinterest.psc.example.migration.producer;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import com.pinterest.psc.config.PscConfiguration;
Expand All @@ -11,7 +12,7 @@

public class Psc {

static public void main(String[] args) throws ConfigurationException, ProducerException {
static public void main(String[] args) throws ConfigurationException, ProducerException, IOException {
String topicUri = "plaintext:/rn:kafka:dev:local-cloud_local-region::local-cluster:my_test_topic";

PscConfiguration pscConfiguration = new PscConfiguration();
Expand Down
142 changes: 94 additions & 48 deletions psc-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
<artifactId>psc-flink</artifactId>
<properties>
<flink.version>1.15.1</flink.version>
<flink.old.version>1.11.1</flink.old.version>
<kafka.version>2.8.1</kafka.version>
<zookeeper.version>3.4.10</zookeeper.version>
<curator.version>2.12.0</curator.version>
<scala.version>2.11.12</scala.version>
<scala.version>2.12.7</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<powermock.version>2.0.0-RC.4</powermock.version>
<hamcrest.version>1.3</hamcrest.version>
Expand All @@ -41,10 +40,6 @@
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>force-shading</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand All @@ -53,19 +48,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>force-shading</artifactId>
<version>${flink.old.version}</version>
<scope>provided</scope>
</dependency>

<!-- Table ecosystem -->
<!-- Projects depending on this project won't depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.old.version}</version>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<optional>true</optional>
<exclusions>
Expand All @@ -88,20 +76,6 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.old.version}</version>
<scope>provided</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.pinterest.psc</groupId>
<artifactId>psc</artifactId>
Expand All @@ -122,9 +96,40 @@

<!-- test dependencies -->

<dependency>
<groupId>com.pinterest.psc</groupId>
<artifactId>psc-logging</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>apache-log4j-extras</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.logstash.log4j</groupId>
<artifactId>jsonevent-layout</artifactId>
<version>1.7</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
<exclusions>
Expand Down Expand Up @@ -239,8 +244,15 @@

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>${flink.old.version}</version>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -304,29 +316,23 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.old.version}</version>
<type>test-jar</type>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.old.version}</version>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -373,6 +379,20 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.16.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.16.2</version>
<scope>test</scope>
</dependency>

<!-- Kafka table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -382,6 +402,32 @@
<scope>test</scope>
</dependency>

<!-- Kafka SQL IT test with formats -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pinterest.flink.connector.psc;

import com.pinterest.psc.metrics.Metric;
import com.pinterest.psc.metrics.MetricName;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;

import java.util.Map;
import java.util.function.Predicate;

/** Collection of methods to interact with PSC's client metric system. */
@Internal
public class MetricUtil {

/**
* Tries to find the PSC {@link Metric} in the provided metrics.
*
* @return {@link Metric} which exposes continuous updates
* @throws IllegalStateException if the metric is not part of the provided metrics
*/
public static Metric getPscMetric(
Map<MetricName, ? extends Metric> metrics, String metricGroup, String metricName) {
return getPscMetric(
metrics,
e ->
e.getKey().group().equals(metricGroup)
&& e.getKey().name().equals(metricName));
}

/**
* Tries to find the PSC {@link Metric} in the provided metrics matching a given filter.
*
* @return {@link Metric} which exposes continuous updates
* @throws IllegalStateException if no metric matches the given filter
*/
public static Metric getPscMetric(
Map<MetricName, ? extends Metric> metrics,
Predicate<Map.Entry<MetricName, ? extends Metric>> filter) {
return metrics.entrySet().stream()
.filter(filter)
.map(Map.Entry::getValue)
.findFirst()
.orElseThrow(
() ->
new IllegalStateException(
"Cannot find PSC metric matching current filter."));
}

/**
* Ensures that the counter has the same value as the given Psc metric.
*
* <p>Do not use this method for every record because {@link Metric#metricValue()} is an
* expensive operation.
*
* @param from PSC's {@link Metric} to query
* @param to {@link Counter} to write the value to
*/
public static void sync(Metric from, Counter to) {
to.inc(((Number) from.metricValue()).longValue() - to.getCount());
}
}
Loading
Loading