diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java index 92aeac77..b426aa59 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java @@ -102,6 +102,8 @@ public class SingerConfigDef { public static final String PROCESS_BATCH_SIZE = "batchSize"; public static final String SKIP_DRAINING = "skipDraining"; + public static final String TEXT_READER_FILTER_MESSAGE_REGEX = "filterMessageRegex"; + public static final String BUCKET = "bucket"; public static final String KEY_FORMAT = "keyFormat"; public static final String MAX_FILE_SIZE_MB = "maxFileSizeMB"; diff --git a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java index 239a0873..e8a379a2 100644 --- a/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java +++ b/singer/src/main/java/com/pinterest/singer/processor/DefaultLogStreamProcessor.java @@ -26,6 +26,7 @@ import com.pinterest.singer.common.SingerSettings; import com.pinterest.singer.config.Decider; import com.pinterest.singer.metrics.OpenTsdbMetricConverter; +import com.pinterest.singer.reader.LogFileReader; import com.pinterest.singer.thrift.LogFile; import com.pinterest.singer.thrift.LogFileAndPath; import com.pinterest.singer.thrift.LogMessage; @@ -312,8 +313,8 @@ int getDeciderValue() { */ protected boolean shouldSkipMessage(LogMessageAndPosition logMessageAndPosition) { return logMessageAndPosition != null && logMessageAndPosition.getInjectedHeaders() != null - && logMessageAndPosition.getInjectedHeaders().containsKey("skipMessage") - && logMessageAndPosition.getInjectedHeaders().get("skipMessage").array().length == 0; + && logMessageAndPosition.getInjectedHeaders().containsKey(LogFileReader.SKIP_MESSAGE_HEADER_KEY) + && logMessageAndPosition.getInjectedHeaders().get(LogFileReader.SKIP_MESSAGE_HEADER_KEY).array().length == 0; } @Override diff --git a/singer/src/main/java/com/pinterest/singer/reader/LogFileReader.java b/singer/src/main/java/com/pinterest/singer/reader/LogFileReader.java index c03b9822..23f1630c 100644 --- a/singer/src/main/java/com/pinterest/singer/reader/LogFileReader.java +++ b/singer/src/main/java/com/pinterest/singer/reader/LogFileReader.java @@ -25,6 +25,11 @@ */ public interface LogFileReader extends Closeable { + /** + * Readers should inject this header to signal processor to skip a message + */ + String SKIP_MESSAGE_HEADER_KEY = "skipMessage"; + /** * Read a LogMessage with its position. * diff --git a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java index c3157520..1c663c81 100644 --- a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java +++ b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java @@ -199,11 +199,11 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc // Get the next message's byte offset LogPosition position = new LogPosition(logFile, textMessageReader.getByteOffset()); LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, position); - // Inject an immutable map with a single skipMessage header so that processors can skip this message + // Inject an immutable map with a single "skipMessage" header so that processors can skip this message // we initialize it here in case environmentVariableInjection is disabled if (skipLogMessage) { logMessageAndPosition.setInjectedHeaders( - ImmutableMap.of("skipMessage", ByteBuffer.wrap(new byte[0]))); + ImmutableMap.of(LogFileReader.SKIP_MESSAGE_HEADER_KEY, ByteBuffer.wrap(new byte[0]))); OpenTsdbMetricConverter.incr("singer.reader.text.filtered_messages", "logName=" + logStream.getSingerLog().getSingerLogConfig().getName()); } else { diff --git a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java index 3de37b73..280dab44 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java @@ -1203,15 +1203,18 @@ protected static TextReaderConfig parseTextReaderConfig(AbstractConfiguration te } } - if (textReaderConfiguration.containsKey("filterMessageRegex")) { + if (textReaderConfiguration.containsKey(SingerConfigDef.TEXT_READER_FILTER_MESSAGE_REGEX)) { try { - String filterMessageRegex = textReaderConfiguration.getString("filterMessageRegex"); + String + filterMessageRegex = + textReaderConfiguration.getString(SingerConfigDef.TEXT_READER_FILTER_MESSAGE_REGEX); if (!filterMessageRegex.isEmpty()) { Pattern.compile(filterMessageRegex); config.setFilterMessageRegex(filterMessageRegex); } } catch (PatternSyntaxException ex) { - throw new ConfigurationException("Bad filterMessageRegex", ex); + throw new ConfigurationException( + "Failed to compile " + SingerConfigDef.TEXT_READER_FILTER_MESSAGE_REGEX, ex); } } diff --git a/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java b/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java index eff4aa08..56691b12 100644 --- a/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java +++ b/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap; import com.pinterest.singer.SingerTestBase; import com.pinterest.singer.common.LogStream; +import com.pinterest.singer.common.SingerConfigDef; import com.pinterest.singer.common.SingerLog; import com.pinterest.singer.thrift.LogFile; import com.pinterest.singer.thrift.LogMessageAndPosition; @@ -126,8 +127,8 @@ public void testReadMessagesWithFilterRegexEnabled() throws Exception { LogMessageAndPosition log = reader.readLogMessageAndPosition(); if (i % 2 == 0) { assertEquals(customInfoMessage, new String(log.getLogMessage().getMessage())); - assertTrue(log.getInjectedHeaders().containsKey("skipMessage")); - assertEquals(0, log.getInjectedHeaders().get("skipMessage").array().length); + assertTrue(log.getInjectedHeaders().containsKey(LogFileReader.SKIP_MESSAGE_HEADER_KEY)); + assertEquals(0, log.getInjectedHeaders().get(LogFileReader.SKIP_MESSAGE_HEADER_KEY).array().length); } else { assertEquals(customErrorMessage, new String(log.getLogMessage().getMessage())); assertEquals(null, log.getInjectedHeaders());