
Commit

Implement PscSource, PscSink, and Table API's to close the gap on Flink 1.15.1 API's (#45)

* WIP full 1.15 upgrade

* WIP sink API's done

* WIP source API's

* Abstracting out TransactionManager reflections and direct field access logic

* WIP refactored FlinkPscInternalProducer (sink) to use abstracted logic

* Revert irrelevant changes

* Add javadocs

* Improve javadocs

* Small spacing update

* Quick javadoc update

* Add graceful handling of null transactionManager

* Update package name

* Bump version to 3.2.1-SNAPSHOT

* Make flink API's use TransactionManagerUtils

* Disallow >1 backend producer upon send() when PscProducer is transactional

* Implement creation of backend producer immediately upon init of FlinkPscInternalProducer

* WIP source changes

* WIP flink metrics

* WIP PscTopicUriPartitionSplitReader

* Finish code changes to source and sink API's

* WIP finished sink test refactoring

* WIP finished table API source code refactor

* WIP source test changes

* WIP source/sink API's mainly done first round refactor

* Finish table test imports

* WIP compiles

* WIP FlinkPscInternalProducerITCase

* WIP PscCommitterTest

* WIP finished PscCommitterTest

* WIP fixed Test Sink with lower parallelism in PscSinkITCase integration tests

* WIP finished PscSinkITCase integration tests

* WIP finish PscSinkITCase

* WIP finish PscTransactionLogITCase

* WIP finish PscWriterITCase

* WIP all sink tests pass

* WIP finished OffsetInitializerTest

* WIP finished PscSubscriberTest

* WIP finish PscEnumeratorTest

* WIP finish PscSourceReaderMetricsTest

* WIP finish PscRecordDeserializationSchemaTest

* WIP in the middle of fixing PscSourceReaderTest

* WIP finish PscSourceReaderTest

* WIP finished PscTopicUriPartitionSplitReaderTest

* WIP finish PscSourceBuilderTest

* WIP fixing PscSourceITCase integration tests

* WIP finish PscSourceITCase, need to look into BaseTopicUri.equals() and whether we can introduce logic in validate() to build the correct subclass of TopicUri

* WIP finish Source and Sink tests

* Fix most producer tests in streaming.connectors.psc by calling super.snapshotState() instead of supersSnapshotState(); table API and some checkpoint migration tests remaining

* WIP fixing FlinkPscProducerMigrationOperatorTest

* Revert "WIP fixing FlinkPscProducerMigrationOperatorTest"

This reverts commit 88f402f.

* WIP finish PscChangelogTableITCase; FlinkPscProducerMigrationOperatorTest still flaky but 011 so ignoring for now

* WIP finish PscDynamicTableFactoryTest

* WIP finish PscTableITCase

* Finish table tests

* Minor fixes

* Catch IOException

* Skip config logging in tests

* Exclude kafka-schema-registry-client from flink-avro-confluent-registry in psc-flink oss

* Revert "Exclude kafka-schema-registry-client from flink-avro-confluent-registry in psc-flink oss"

This reverts commit 548b3f6.

* Include ITCase tests in mvn surefire plugin

* Convert hashmap to concurrenthashmap in TransactionManagerUtils

* Remove ITCase from surefire

* Disable Kafka log segment rotation for flink tests

* Add retention.ms=Long.MAX_VALUE to prevent topic cleanup during test

* Refactor to remove Kafka references

* Add step to run ITCase tests in build

* Fix mvn clean test in yaml

* Fix build yaml

* Set default psc.config.logging.enabled=false for OSS

* Revert "Set default psc.config.logging.enabled=false for OSS"

This reverts commit b4c11af.

* Add default psc.conf in flink test resources

* Make PscMetadataClient convert BaseTopicUri to backend-specific topicUri

* Make metadataClient always convert to plaintext protocol

* Add logs to debug

* Revert "Make metadataClient always convert to plaintext protocol"

This reverts commit 46054b5.

* Make metadata client preserve protocol in describeTopicUris()

* Add null check to prevent initialization NPE for byteOutMetric in PscWriter

* Add NPE check for updateNumBytesInCounter

* Surround NPE in PscSourceReaderMetrics with try/catch

* Add logs to debug producer thread leak

* Add more debug logs to see why producerPool doesn't get added

* Add even more logs to debug producerLeak

* Add even more logs to debug producerleak

* Revert "Add even more logs to debug producerleak"

This reverts commit 235c608.

* Revert "Add even more logs to debug producerLeak"

This reverts commit 5ab0954.

* Revert "Add more debug logs to see why producerPool doesn't get added"

This reverts commit a44f0b6.

* Revert "Add logs to debug producer thread leak"

This reverts commit c406077.

* Make metrics() return ConcurrentHashMap in PscConsumer

* Surround NPE in PscWriter registerMetricSync with try/catch

* Remove commented pom content

* Address comments

* Update javadocs for PscSink and PscSource to highlight difference compared to FlinkPscProducer and Consumer

* Add integration test for batch committed + javadocs

* Fix javadoc
jeffxiang authored Nov 8, 2024
1 parent ce370c3 commit ba0c8f8
Showing 189 changed files with 24,117 additions and 4,913 deletions.
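
For orientation before the file-by-file diff, here is a minimal sketch of how the new PscSource and PscSink introduced by this change might be wired into a Flink 1.15 job. It assumes the builders mirror Flink's KafkaSource/KafkaSink API that this connector ports to PSC; the package paths, builder method names, and the PscRecordSerializationSchema helper are illustrative assumptions rather than signatures confirmed by this commit. The topic URI format follows the example used elsewhere in this repository.

// NOTE: package paths and builder method names below are assumed by analogy with
// Flink's KafkaSource/KafkaSink; they are illustrative only and may not match the
// exact API introduced in this commit.
import com.pinterest.flink.connector.psc.sink.PscRecordSerializationSchema; // assumed path
import com.pinterest.flink.connector.psc.sink.PscSink;                      // assumed path
import com.pinterest.flink.connector.psc.source.PscSource;                  // assumed path
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PscSourceSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String inputTopicUri =
                "plaintext:/rn:kafka:dev:local-cloud_local-region::local-cluster:my_test_topic";
        String outputTopicUri =
                "plaintext:/rn:kafka:dev:local-cloud_local-region::local-cluster:my_output_topic";

        // Source side: consume from a PSC topic URI (analogous to KafkaSource).
        PscSource<String> source = PscSource.<String>builder()
                .setTopicUris(inputTopicUri)                          // assumed method name
                .setGroupId("example-group")                          // assumed method name
                .setValueOnlyDeserializer(new SimpleStringSchema())   // assumed method name
                .build();

        // Sink side: produce to a PSC topic URI (analogous to KafkaSink).
        PscSink<String> sink = PscSink.<String>builder()
                .setRecordSerializer(
                        PscRecordSerializationSchema.<String>builder()         // assumed helper
                                .setTopicUriString(outputTopicUri)             // assumed method name
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "PSC Source");
        stream.sinkTo(sink);
        env.execute("PscSource/PscSink sketch");
    }
}

Unlike FlinkPscConsumer/FlinkPscProducer (the legacy SourceFunction/SinkFunction connectors), PscSource and PscSink plug into the unified Source/Sink API via fromSource() and sinkTo(), which is the Flink 1.15.1 API gap this commit targets.
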
4 changes: 3 additions & 1 deletion .github/workflows/maven.yml
@@ -13,4 +13,6 @@ jobs:
with:
java-version: 1.8
- name: Build with Maven
run: mvn -B package --file pom.xml
run: mvn -B install --file pom.xml
- name: Run psc-flink *ITCase tests
run: mvn clean test -pl psc-flink -Dtest=*ITCase
3 changes: 3 additions & 0 deletions .gitignore
@@ -22,3 +22,6 @@ buildNumber.properties

# vscode
.vscode

#test files
psc-flink/orgapacheflinkutilNetUtils*
2 changes: 2 additions & 0 deletions pom.xml
@@ -23,6 +23,8 @@
<gson.version>2.8.6</gson.version>
<guava.version>29.0-jre</guava.version>
<junit.jupiter.version>5.6.2</junit.jupiter.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<log4j2.version>2.17.1</log4j2.version>
<mockito.version>3.4.6</mockito.version>
<reflections.version>0.9.9</reflections.version>
@@ -9,12 +9,14 @@
import com.pinterest.psc.serde.IntegerSerializer;
import com.pinterest.psc.serde.StringSerializer;

import java.io.IOException;

public class ExamplePscProducer {

private static final PscLogger logger = PscLogger.getLogger(ExamplePscProducer.class);
private static final int NUM_MESSAGES = 10;

public static void main(String[] args) throws ConfigurationException, ProducerException {
public static void main(String[] args) throws ConfigurationException, ProducerException, IOException {
if (args.length < 1) {
logger.error("ExamplePscProducer needs one argument: topicUri");
return;
@@ -1,5 +1,6 @@
package com.pinterest.psc.example.migration.producer;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import com.pinterest.psc.config.PscConfiguration;
@@ -11,7 +12,7 @@

public class Psc {

static public void main(String[] args) throws ConfigurationException, ProducerException {
static public void main(String[] args) throws ConfigurationException, ProducerException, IOException {
String topicUri = "plaintext:/rn:kafka:dev:local-cloud_local-region::local-cluster:my_test_topic";

PscConfiguration pscConfiguration = new PscConfiguration();
142 changes: 94 additions & 48 deletions psc-flink/pom.xml
@@ -11,11 +11,10 @@
<artifactId>psc-flink</artifactId>
<properties>
<flink.version>1.15.1</flink.version>
<flink.old.version>1.11.1</flink.old.version>
<kafka.version>2.8.1</kafka.version>
<zookeeper.version>3.4.10</zookeeper.version>
<curator.version>2.12.0</curator.version>
<scala.version>2.11.12</scala.version>
<scala.version>2.12.7</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<powermock.version>2.0.0-RC.4</powermock.version>
<hamcrest.version>1.3</hamcrest.version>
@@ -41,10 +40,6 @@
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>force-shading</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
@@ -53,19 +48,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>force-shading</artifactId>
<version>${flink.old.version}</version>
<scope>provided</scope>
</dependency>

<!-- Table ecosystem -->
<!-- Projects depending on this project won't depend on flink-table-*. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.old.version}</version>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<optional>true</optional>
<exclusions>
@@ -88,20 +76,6 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.old.version}</version>
<scope>provided</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.pinterest.psc</groupId>
<artifactId>psc</artifactId>
@@ -122,9 +96,40 @@

<!-- test dependencies -->

<dependency>
<groupId>com.pinterest.psc</groupId>
<artifactId>psc-logging</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>apache-log4j-extras</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.logstash.log4j</groupId>
<artifactId>jsonevent-layout</artifactId>
<version>1.7</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
<exclusions>
@@ -239,8 +244,15 @@

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>${flink.old.version}</version>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -304,29 +316,23 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.old.version}</version>
<type>test-jar</type>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.old.version}</version>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
@@ -373,6 +379,20 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.16.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.16.2</version>
<scope>test</scope>
</dependency>

<!-- Kafka table descriptor testing -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -382,6 +402,32 @@
<scope>test</scope>
</dependency>

<!-- Kafka SQL IT test with formats -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pinterest.flink.connector.psc;

import com.pinterest.psc.metrics.Metric;
import com.pinterest.psc.metrics.MetricName;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;

import java.util.Map;
import java.util.function.Predicate;

/** Collection of methods to interact with PSC's client metric system. */
@Internal
public class MetricUtil {

/**
* Tries to find the PSC {@link Metric} in the provided metrics.
*
* @return {@link Metric} which exposes continuous updates
* @throws IllegalStateException if the metric is not part of the provided metrics
*/
public static Metric getPscMetric(
Map<MetricName, ? extends Metric> metrics, String metricGroup, String metricName) {
return getPscMetric(
metrics,
e ->
e.getKey().group().equals(metricGroup)
&& e.getKey().name().equals(metricName));
}

/**
* Tries to find the PSC {@link Metric} in the provided metrics matching a given filter.
*
* @return {@link Metric} which exposes continuous updates
* @throws IllegalStateException if no metric matches the given filter
*/
public static Metric getPscMetric(
Map<MetricName, ? extends Metric> metrics,
Predicate<Map.Entry<MetricName, ? extends Metric>> filter) {
return metrics.entrySet().stream()
.filter(filter)
.map(Map.Entry::getValue)
.findFirst()
.orElseThrow(
() ->
new IllegalStateException(
"Cannot find PSC metric matching current filter."));
}

/**
* Ensures that the counter has the same value as the given Psc metric.
*
* <p>Do not use this method for every record because {@link Metric#metricValue()} is an
* expensive operation.
*
* @param from PSC's {@link Metric} to query
* @param to {@link Counter} to write the value to
*/
public static void sync(Metric from, Counter to) {
to.inc(((Number) from.metricValue()).longValue() - to.getCount());
}
}
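
The class above bridges PSC client metrics into Flink's metric system; below is a hypothetical usage sketch. The metrics map would typically come from PscConsumer#metrics() or PscProducer#metrics() (this commit makes PscConsumer#metrics() return a ConcurrentHashMap); the metric group and name strings here are illustrative assumptions, not identifiers confirmed by this change.

package com.pinterest.flink.connector.psc;

import com.pinterest.psc.metrics.Metric;
import com.pinterest.psc.metrics.MetricName;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;

import java.util.Map;

/** Illustrative caller of MetricUtil; the metric group/name used below are assumptions. */
public class MetricUtilUsageSketch {

    /**
     * Mirrors a PSC byte-count metric into a Flink {@link Counter}.
     *
     * @param pscMetrics metrics map, e.g. the value returned by PscConsumer#metrics()
     */
    public static Counter mirrorBytesConsumed(Map<MetricName, ? extends Metric> pscMetrics) {
        Counter bytesConsumed = new SimpleCounter();
        // Assumed group/name; the real PSC metric identifiers may differ.
        Metric source = MetricUtil.getPscMetric(
                pscMetrics, "consumer-fetch-manager-metrics", "bytes-consumed-total");
        // Sync periodically (e.g. on checkpoint), not per record, because
        // Metric#metricValue() is an expensive call.
        MetricUtil.sync(source, bytesConsumed);
        return bytesConsumed;
    }
}
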