Skip to content

Commit

Permalink
change hardcoded strings to static finals
Browse files Browse the repository at this point in the history
  • Loading branch information
jfzunigac committed Nov 15, 2024
1 parent 8f640ec commit 8d087d6
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public class SingerConfigDef {
public static final String PROCESS_BATCH_SIZE = "batchSize";
public static final String SKIP_DRAINING = "skipDraining";

public static final String TEXT_READER_FILTER_MESSAGE_REGEX = "filterMessageRegex";

public static final String BUCKET = "bucket";
public static final String KEY_FORMAT = "keyFormat";
public static final String MAX_FILE_SIZE_MB = "maxFileSizeMB";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.config.Decider;
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import com.pinterest.singer.reader.LogFileReader;
import com.pinterest.singer.thrift.LogFile;
import com.pinterest.singer.thrift.LogFileAndPath;
import com.pinterest.singer.thrift.LogMessage;
Expand Down Expand Up @@ -312,8 +313,8 @@ int getDeciderValue() {
*/
protected boolean shouldSkipMessage(LogMessageAndPosition logMessageAndPosition) {
return logMessageAndPosition != null && logMessageAndPosition.getInjectedHeaders() != null
&& logMessageAndPosition.getInjectedHeaders().containsKey("skipMessage")
&& logMessageAndPosition.getInjectedHeaders().get("skipMessage").array().length == 0;
&& logMessageAndPosition.getInjectedHeaders().containsKey(LogFileReader.SKIP_MESSAGE_HEADER_KEY)
&& logMessageAndPosition.getInjectedHeaders().get(LogFileReader.SKIP_MESSAGE_HEADER_KEY).array().length == 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
*/
public interface LogFileReader extends Closeable {

/**
* Readers should inject this header to signal processor to skip a message
*/
String SKIP_MESSAGE_HEADER_KEY = "skipMessage";

/**
* Read a LogMessage with its position.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc
// Get the next message's byte offset
LogPosition position = new LogPosition(logFile, textMessageReader.getByteOffset());
LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, position);
// Inject an immutable map with a single skipMessage header so that processors can skip this message
// Inject an immutable map with a single "skipMessage" header so that processors can skip this message
// we initialize it here in case environmentVariableInjection is disabled
if (skipLogMessage) {
logMessageAndPosition.setInjectedHeaders(
ImmutableMap.of("skipMessage", ByteBuffer.wrap(new byte[0])));
ImmutableMap.of(LogFileReader.SKIP_MESSAGE_HEADER_KEY, ByteBuffer.wrap(new byte[0])));
OpenTsdbMetricConverter.incr("singer.reader.text.filtered_messages",
"logName=" + logStream.getSingerLog().getSingerLogConfig().getName());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1203,15 +1203,18 @@ protected static TextReaderConfig parseTextReaderConfig(AbstractConfiguration te
}
}

if (textReaderConfiguration.containsKey("filterMessageRegex")) {
if (textReaderConfiguration.containsKey(SingerConfigDef.TEXT_READER_FILTER_MESSAGE_REGEX)) {
try {
String filterMessageRegex = textReaderConfiguration.getString("filterMessageRegex");
String
filterMessageRegex =
textReaderConfiguration.getString(SingerConfigDef.TEXT_READER_FILTER_MESSAGE_REGEX);
if (!filterMessageRegex.isEmpty()) {
Pattern.compile(filterMessageRegex);
config.setFilterMessageRegex(filterMessageRegex);
}
} catch (PatternSyntaxException ex) {
throw new ConfigurationException("Bad filterMessageRegex", ex);
throw new ConfigurationException(
"Failed to compile " + SingerConfigDef.TEXT_READER_FILTER_MESSAGE_REGEX, ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.ImmutableMap;
import com.pinterest.singer.SingerTestBase;
import com.pinterest.singer.common.LogStream;
import com.pinterest.singer.common.SingerConfigDef;
import com.pinterest.singer.common.SingerLog;
import com.pinterest.singer.thrift.LogFile;
import com.pinterest.singer.thrift.LogMessageAndPosition;
Expand Down Expand Up @@ -126,8 +127,8 @@ public void testReadMessagesWithFilterRegexEnabled() throws Exception {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
if (i % 2 == 0) {
assertEquals(customInfoMessage, new String(log.getLogMessage().getMessage()));
assertTrue(log.getInjectedHeaders().containsKey("skipMessage"));
assertEquals(0, log.getInjectedHeaders().get("skipMessage").array().length);
assertTrue(log.getInjectedHeaders().containsKey(LogFileReader.SKIP_MESSAGE_HEADER_KEY));
assertEquals(0, log.getInjectedHeaders().get(LogFileReader.SKIP_MESSAGE_HEADER_KEY).array().length);
} else {
assertEquals(customErrorMessage, new String(log.getLogMessage().getMessage()));
assertEquals(null, log.getInjectedHeaders());
Expand Down

0 comments on commit 8d087d6

Please sign in to comment.