Skip to content

Commit

Permalink
Implement PscMetadataClient (#44)
Browse files Browse the repository at this point in the history
* WIP metadataClient impl

* WIP metadataClient API impl; finished listOffsets

* Minor code cleanups

* Add test for listConsumerGroupOffsets

* Try to fix test

* Add javadocs

* Address comments
  • Loading branch information
jeffxiang authored Sep 11, 2024
1 parent a9055ab commit ce370c3
Show file tree
Hide file tree
Showing 15 changed files with 1,115 additions and 2 deletions.

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions psc/src/main/java/com/pinterest/psc/common/TopicRn.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.pinterest.psc.logging.PscLogger;

import java.io.IOException;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -111,6 +112,10 @@ public String getTopicRnPrefixString() {
return topicRnPrefixString;
}

public String getStandard() {
return standard;
}

public String getService() {
return service;
}
Expand Down Expand Up @@ -167,6 +172,22 @@ private static String upgradeTopicRnToCurrentVersion(String topicRnAsStr, byte s
throw new TopicRnSyntaxException(String.format("Unsupported topic RN version %d", serializedVersion));
}

@Override
public int hashCode() {
return Objects.hash(
topicRnString,
topicRnPrefixString,
standard,
service,
environment,
cloud,
region,
classifier,
cluster,
topic
);
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public TopicUriPartition(String topicUriStr, int partition) {
}
}

protected TopicUriPartition(TopicUri topicUri, int partition) {
public TopicUriPartition(TopicUri topicUri, int partition) {
this.backendTopicUri = topicUri;
this.topicUriStr = topicUri.getTopicUriAsString();
this.partition = partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,12 @@ public class PscConfiguration extends PropertiesConfiguration {
public static final String PSC_PRODUCER_SSL_TRUSTSTORE_TYPE = PSC_PRODUCER + "." + SSL_TRUSTSTORE_TYPE;
*/

// **********************
// MetadataClient Configuration
// **********************

protected static final String PSC_METADATA = "psc.metadata";
public static final String PSC_METADATA_CLIENT_ID = PSC_METADATA + "." + CLIENT_ID;

// **********************
// Metrics Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public class PscConfigurationInternal {
private final static String PSC_CLIENT_TYPE = "psc.client.type";
public final static String PSC_CLIENT_TYPE_CONSUMER = "consumer";
public final static String PSC_CLIENT_TYPE_PRODUCER = "producer";
private final static String[] PSC_VALID_CLIENT_TYPES = {PSC_CLIENT_TYPE_CONSUMER, PSC_CLIENT_TYPE_PRODUCER};
public final static String PSC_CLIENT_TYPE_METADATA = "metadata";
private final static String[] PSC_VALID_CLIENT_TYPES = {PSC_CLIENT_TYPE_CONSUMER, PSC_CLIENT_TYPE_PRODUCER, PSC_CLIENT_TYPE_METADATA};

private PscConfiguration pscConfiguration;
private Deserializer keyDeserializer, valueDeserializer;
Expand Down Expand Up @@ -140,6 +141,9 @@ protected void validate(boolean isLenient, boolean isLogConfiguration) throws Co
case PSC_CLIENT_TYPE_PRODUCER:
validateProducerConfiguration(isLenient, isLogConfiguration);
break;
case PSC_CLIENT_TYPE_METADATA:
validateMetadataClientConfiguration(isLenient, isLogConfiguration);
break;
default:
throw new ConfigurationException("Valid client type expected: " + String.join(", ", PSC_VALID_CLIENT_TYPES));
}
Expand Down Expand Up @@ -483,6 +487,24 @@ private <T> T verifyConfigHasValue(
return configuration.get(expectedType, configKey);
}

private void validateMetadataClientConfiguration(boolean isLenient, boolean isLogConfiguration) throws ConfigurationException {
PscConfiguration metadataConfiguration = new PscConfiguration();
metadataConfiguration.copy(pscConfiguration.subset(PscConfiguration.PSC_METADATA));
Map<String, Exception> invalidConfigs = new HashMap<>();
verifyConfigHasValue(metadataConfiguration, PscConfiguration.CLIENT_ID, String.class, invalidConfigs);
if (isLogConfiguration)
logConfiguration();

if (invalidConfigs.isEmpty() || isLenient)
return;

StringBuilder stringBuilder = new StringBuilder();
invalidConfigs.forEach((error, exception) ->
stringBuilder.append(String.format("\t%s: %s\n", error, exception == null ? "" : exception.getMessage()))
);
throw new ConfigurationException("Invalid metadataClient configuration\n" + stringBuilder.toString());
}

private void validateProducerConfiguration(boolean isLenient, boolean isLogConfiguration) throws ConfigurationException {
PscConfiguration producerConfiguration = new PscConfiguration();
producerConfiguration.copy(pscConfiguration.subset(PscConfiguration.PSC_PRODUCER));
Expand Down Expand Up @@ -749,4 +771,8 @@ public int getAutoResolutionRetryCount() {
public MetricsReporterConfiguration getMetricsReporterConfiguration() {
return metricsReporterConfiguration;
}

public String getMetadataClientId() {
return pscConfiguration.getString(PscConfiguration.PSC_METADATA_CLIENT_ID);
}
}
27 changes: 27 additions & 0 deletions psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.pinterest.psc.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;

/**
* Utility class for common metadata logic
*/
public class MetadataUtils {

@VisibleForTesting
public static TopicRn createTopicRn(TopicUri topicUri, String topicName) {
return new TopicRn(
topicUri.getTopicRn().getTopicRnPrefixString() + topicName,
topicUri.getTopicRn().getTopicRnPrefixString(),
topicUri.getTopicRn().getStandard(),
topicUri.getTopicRn().getService(),
topicUri.getTopicRn().getEnvironment(),
topicUri.getTopicRn().getCloud(),
topicUri.getTopicRn().getRegion(),
topicUri.getTopicRn().getClassifier(),
topicUri.getTopicRn().getCluster(),
topicName
);
}
}
28 changes: 28 additions & 0 deletions psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.pinterest.psc.metadata;

import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUriPartition;

import java.util.List;

/**
* Metadata for a {@link TopicRn}, including the list of its partitions
*/
public class TopicRnMetadata {

private final TopicRn topicRn;
private final List<TopicUriPartition> topicUriPartitions;

public TopicRnMetadata(TopicRn topicRn, List<TopicUriPartition> topicUriPartitions) {
this.topicRn = topicRn;
this.topicUriPartitions = topicUriPartitions;
}

public TopicRn getTopicRn() {
return topicRn;
}

public List<TopicUriPartition> getTopicUriPartitions() {
return topicUriPartitions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.pinterest.psc.metadata.client;

import com.google.common.annotations.VisibleForTesting;
import com.pinterest.psc.common.MessageId;
import com.pinterest.psc.common.ServiceDiscoveryConfig;
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.config.PscConfigurationInternal;
import com.pinterest.psc.discovery.ServiceDiscoveryManager;
import com.pinterest.psc.environment.Environment;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.metadata.TopicRnMetadata;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/**
* An abstract class that defines the interface for metadata queries and operations. Specific implementations
* of this class should be created for each backend, such as Kafka, MemQ, etc.
*/
public abstract class PscBackendMetadataClient implements AutoCloseable {

protected TopicUri topicUri;
protected PscConfigurationInternal pscConfigurationInternal;
protected ServiceDiscoveryConfig discoveryConfig;

public void initialize(TopicUri topicUri, Environment env, PscConfigurationInternal pscConfigurationInternal) throws ConfigurationException {
this.topicUri = topicUri;
this.pscConfigurationInternal = pscConfigurationInternal;
this.discoveryConfig =
ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri);
}

public abstract List<TopicRn> listTopicRns(Duration duration)
throws ExecutionException, InterruptedException, TimeoutException;

public abstract Map<TopicRn, TopicRnMetadata> describeTopicRns(
Collection<TopicRn> topicRns,
Duration duration
) throws ExecutionException, InterruptedException, TimeoutException;

public abstract Map<TopicUriPartition, Long> listOffsets(
Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> topicRnsAndOptions,
Duration duration
) throws ExecutionException, InterruptedException, TimeoutException;

public abstract Map<TopicUriPartition, Long> listOffsetsForConsumerGroup(
String consumerGroupId,
Collection<TopicUriPartition> topicUriPartitions,
Duration duration
) throws ExecutionException, InterruptedException, TimeoutException;

public abstract void close() throws Exception;

@VisibleForTesting
protected TopicUri getTopicUri() {
return topicUri;
}
}
Loading

0 comments on commit ce370c3

Please sign in to comment.