Skip to content

Commit

Permalink
add simple regex-based message filtering to TextLogFileReader
Browse files Browse the repository at this point in the history
  • Loading branch information
jfzunigac committed Nov 15, 2024
1 parent 6e2771e commit 8f640ec
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 11 deletions.
4 changes: 3 additions & 1 deletion singer-commons/src/main/thrift/config.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ struct TextReaderConfig {
// ability to trim trailing new line character
9: optional bool trimTailingNewlineCharacter = false;
// custom environment variables to be injected into text logs
10:optional map<string, binary> environmentVariables;
10: optional map<string, binary> environmentVariables;
// Regex used to filter out messaqges that do not match the regex.
11: optional string filterMessageRegex;
}

struct LogStreamReaderConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,20 @@ int getDeciderValue() {
return result;
}

/**
* Check if message should be skipped based on injected headers. If the message contains
* the header "skipMessage", the message should be skipped. Note that the value is irrelevant
* since the readers should only inject the header if the message should be skipped.
*
* @param logMessageAndPosition
* @return true if logMessageAndPosition contains skipMessageHeader, else otherwise.
*/
protected boolean shouldSkipMessage(LogMessageAndPosition logMessageAndPosition) {
return logMessageAndPosition != null && logMessageAndPosition.getInjectedHeaders() != null
&& logMessageAndPosition.getInjectedHeaders().containsKey("skipMessage")
&& logMessageAndPosition.getInjectedHeaders().get("skipMessage").array().length == 0;
}

@Override
public void run() {
long logMessagesProcessed = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,20 @@ protected int processLogMessageBatch() throws IOException, LogStreamWriterExcept
writer.startCommit(isDraining);
}
emitMessageSizeMetrics(logStream, logMessageAndPosition.getLogMessage());

if (enableDeciderBasedSampling && deciderValue != FULL_THROUGHPUT
&& deciderValue <= ThreadLocalRandom.current().nextInt(FULL_THROUGHPUT)) {

// We skip the message if sampling is enabled or if the message is marked as skip by the reader
if ((enableDeciderBasedSampling && deciderValue != FULL_THROUGHPUT
&& deciderValue <= ThreadLocalRandom.current().nextInt(FULL_THROUGHPUT))
|| shouldSkipMessage(logMessageAndPosition)) {
logMessagesSkipped++;
continue;
}
writer.writeLogMessageToCommit(logMessageAndPosition, isDraining);
}

if (logMessagesRead > 0) {
LOG.debug("Number of log messages skipped: {} in logStream: {}",
logMessagesSkipped, logStream);
// Subtract the number of skipped messages from the total number of messages read
if (logMessagesRead >= logMessagesSkipped) {
logMessagesToWrite = logMessagesRead - logMessagesSkipped;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.pinterest.singer.reader;

import com.pinterest.singer.common.LogStream;
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import com.pinterest.singer.thrift.LogFile;
import com.pinterest.singer.thrift.LogMessage;
import com.pinterest.singer.thrift.LogMessageAndPosition;
Expand All @@ -26,6 +27,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import org.apache.thrift.TSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -54,6 +56,7 @@ public class TextLogFileReader implements LogFileReader {
private final String prependFieldDelimiter;
private final TSerializer serializer;
private final TextMessageReader textMessageReader;
private final Pattern filterMessageRegex;
private ByteBuffer maxBuffer;

// The text log message format, can be TextMessage, or String;
Expand All @@ -74,6 +77,7 @@ public TextLogFileReader(
int maxMessageSize,
int numMessagesPerLogMessage,
Pattern messageStartPattern,
Pattern filterMessageRegex,
TextLogMessageType messageType,
boolean prependTimestamp,
boolean prependHostName,
Expand Down Expand Up @@ -109,6 +113,7 @@ public TextLogFileReader(
int capacity = (maxMessageSize * numMessagesPerLogMessage) + MAX_BUFFER_HEADROOM;
this.maxBuffer = ByteBuffer.allocate(capacity);
this.trimTailingNewlineCharacter = trimTailingNewlineCharacter;
this.filterMessageRegex = filterMessageRegex;

// Make sure the path is still associated with the LogFile.
// This can happen when the path is reused for another LogFile during log
Expand All @@ -135,13 +140,18 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc

try {
TextMessageReader.resetByteBuffer(maxBuffer);

boolean skipLogMessage = false;
for (int i = 0; i < numMessagesPerLogMessage; ++i) {
ByteBuffer message = textMessageReader.readMessage(true);
// If no message in the file, break.
if (message == null) {
break;
}
// If the message does not match the filter regex, mark it to be skipped.
if (filterMessageRegex != null && !filterMessageRegex.matcher(
TextMessageReader.bufToString(message)).matches()) {
skipLogMessage = true;
}
String prependStr = "";
if (prependTimestamp) {
prependStr += System.currentTimeMillis() + prependFieldDelimiter;
Expand Down Expand Up @@ -189,7 +199,16 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc
// Get the next message's byte offset
LogPosition position = new LogPosition(logFile, textMessageReader.getByteOffset());
LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, position);
logMessageAndPosition.setInjectedHeaders(headers);
// Inject an immutable map with a single skipMessage header so that processors can skip this message
// we initialize it here in case environmentVariableInjection is disabled
if (skipLogMessage) {
logMessageAndPosition.setInjectedHeaders(
ImmutableMap.of("skipMessage", ByteBuffer.wrap(new byte[0])));
OpenTsdbMetricConverter.incr("singer.reader.text.filtered_messages",
"logName=" + logStream.getSingerLog().getSingerLogConfig().getName());
} else {
logMessageAndPosition.setInjectedHeaders(headers);
}
return logMessageAndPosition;
} catch (Exception e) {
LOG.error("Caught exception when read a log message from log file: " + logFile, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public LogFileReader getLogFileReader(
readerConfig.getMaxMessageSize(),
readerConfig.getNumMessagesPerLogMessage(),
Pattern.compile(readerConfig.getMessageStartRegex(), Pattern.UNIX_LINES),
readerConfig.getFilterMessageRegex() != null ? Pattern.compile(
readerConfig.getFilterMessageRegex(), Pattern.DOTALL) : null,
readerConfig.getTextLogMessageType(),
readerConfig.isPrependTimestamp(),
readerConfig.isPrependHostname(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,18 @@ protected static TextReaderConfig parseTextReaderConfig(AbstractConfiguration te
}
}

if (textReaderConfiguration.containsKey("filterMessageRegex")) {
try {
String filterMessageRegex = textReaderConfiguration.getString("filterMessageRegex");
if (!filterMessageRegex.isEmpty()) {
Pattern.compile(filterMessageRegex);
config.setFilterMessageRegex(filterMessageRegex);
}
} catch (PatternSyntaxException ex) {
throw new ConfigurationException("Bad filterMessageRegex", ex);
}
}

config.setPrependTimestamp(false);
if (textReaderConfiguration.containsKey("prependTimestamp")) {
String prependTimestampStr = textReaderConfiguration.getString("prependTimestamp");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,21 @@
import com.pinterest.singer.config.Decider;
import com.pinterest.singer.monitor.LogStreamManager;
import com.pinterest.singer.reader.DefaultLogStreamReader;
import com.pinterest.singer.reader.TextLogFileReaderFactory;
import com.pinterest.singer.reader.ThriftLogFileReaderFactory;
import com.pinterest.singer.thrift.LogFile;
import com.pinterest.singer.thrift.LogMessage;
import com.pinterest.singer.thrift.LogMessageAndPosition;
import com.pinterest.singer.thrift.LogPosition;
import com.pinterest.singer.thrift.configuration.FileNameMatchMode;
import com.pinterest.singer.thrift.configuration.SingerConfig;
import com.pinterest.singer.thrift.configuration.SingerLogConfig;
import com.pinterest.singer.thrift.configuration.TextLogMessageType;
import com.pinterest.singer.thrift.configuration.TextReaderConfig;
import com.pinterest.singer.thrift.configuration.ThriftReaderConfig;
import com.pinterest.singer.utils.SimpleThriftLogger;
import com.pinterest.singer.utils.SingerUtils;
import com.pinterest.singer.utils.TextLogger;
import com.pinterest.singer.utils.WatermarkUtils;
import com.pinterest.singer.writer.kafka.CommittableKafkaWriter;
import com.google.common.collect.ImmutableMap;
Expand All @@ -48,6 +54,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -435,6 +442,84 @@ public void testProcessLogStreamWithDecider() throws Exception {
}
}

@Test
public void testProcessLogStreamWithMessagesSkipped() throws Exception {
String tempPath = getTempPath();
String logStreamHeadFileName = "text.log";
String path = FilenameUtils.concat(tempPath, logStreamHeadFileName);
String infoMessage = "This is a sample INFO message\n";
String errorMessage = "This is a sample ERROR message\n";

int readerBufferSize = 16000;
int maxMessageSize = 16000;
int processorBatchSize = 200;

long processingIntervalInMillisMin = 1;
long processingIntervalInMillisMax = 1;
long processingTimeSliceInMilliseconds = 3600;
int logRetentionInSecs = 15;

// initialize a singer log config
SingerLogConfig
logConfig =
new SingerLogConfig("test", tempPath, logStreamHeadFileName, null, null, null);
SingerLog singerLog = new SingerLog(logConfig);
singerLog.getSingerLogConfig().setFilenameMatchMode(FileNameMatchMode.PREFIX);

// initialize global variables in SingerSettings
try {
SingerConfig
singerConfig =
initializeSingerConfig(1, 1, Collections.singletonList(logConfig));
SingerSettings.initialize(singerConfig);
} catch (Exception e) {
e.printStackTrace();
fail("got exception in test: " + e);
}

// initialize log stream
LogStream logStream = new LogStream(singerLog, logStreamHeadFileName);
LogStreamManager.addLogStream(logStream);
TextLogger textLogger = new TextLogger(path);

// initialize reader, writer & processor
NoOpLogStreamWriter writer = new NoOpLogStreamWriter();
TextReaderConfig
textReaderConfig =
new TextReaderConfig(readerBufferSize, maxMessageSize, 1, "^.*$");
// Regex to filter messages out
textReaderConfig.setFilterMessageRegex(".*\\bERROR\\b.*");
textReaderConfig.setTextLogMessageType(TextLogMessageType.PLAIN_TEXT);
LogStreamReader logStreamReader = new DefaultLogStreamReader(
logStream,
new TextLogFileReaderFactory(textReaderConfig));
MemoryEfficientLogStreamProcessor processor = new MemoryEfficientLogStreamProcessor(logStream,
null, logStreamReader, writer, processorBatchSize, processingIntervalInMillisMin,
processingIntervalInMillisMax, processingTimeSliceInMilliseconds, logRetentionInSecs, false);

for (int i = 0; i < 100; ++i) {
textLogger.logText(infoMessage);
textLogger.logText(errorMessage);
}

// Save start position to watermark file.
LogPosition
startPosition =
new LogPosition(new LogFile(SingerUtils.getFileInode(path)), 0);
WatermarkUtils.saveCommittedPositionToWatermark(DefaultLogStreamProcessor
.getWatermarkFilename(logStream), startPosition);

// Process all message written so far.
Thread.sleep(FILE_EVENT_WAIT_TIME_MS);
long numOfMessageProcessed = processor.processLogStream();
assertEquals("Should process all messages written ", processorBatchSize, numOfMessageProcessed);

assertEquals("Should have processed only half of the messages written", 100, writer.getLogMessages().size());
for (int i = 0; i < 100; i++) {
assertEquals(errorMessage, new String(writer.getLogMessages().get(i).getMessage()));
}
}

private static List<LogMessage> getMessages(List<LogMessageAndPosition> messageAndPositions) {
List<LogMessage> messages = Lists.newArrayListWithExpectedSize(messageAndPositions.size());
for (LogMessageAndPosition messageAndPosition : messageAndPositions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testReadLogMessageAndPosition() throws Exception {
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, null, null,
Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, false, true, null, null,
null, null);
for (int i = 0; i < 100; i++) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
Expand All @@ -70,7 +70,7 @@ public void testReadLogMessageAndPositionWithHostname() throws Exception {
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, true, false, hostname, "n/a",
Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, true, false, hostname, "n/a",
delimiter, null);
for (int i = 0; i < 100; i++) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
Expand All @@ -91,7 +91,7 @@ public void testReadLogMessageAndPositionMultiRead() throws Exception {
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, null, "n/a",
Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, false, true, null, "n/a",
null, null);
for (int i = 0; i < 100; i = i + 2) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
Expand All @@ -102,6 +102,52 @@ public void testReadLogMessageAndPositionMultiRead() throws Exception {
reader.close();
}

@Test
public void testReadMessagesWithFilterRegexEnabled() throws Exception {
String path = FilenameUtils.concat(getTempPath(), "test_filtered.log");
String customInfoMessage = "2024-09-26 00:00:00,000 [Thread-1] (com.pinterest.singer.TestClass:120) INFO Sample info message\n";
String customErrorMessage = "2024-09-26 00:00:00,000 [Thread-1] (com.pinterest.singer.TestClass:120) ERROR Sample error message\n";
String filterRegex = ".*\\bERROR\\b.*";

TextLogger logger = new TextLogger(path);
for (int i = 0; i < 100; i++) {
logger.logText(customInfoMessage);
logger.logText(customErrorMessage);
}

long inode = SingerUtils.getFileInode(SingerUtils.getPath(path));
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1,
Pattern.compile("^.*$"), Pattern.compile(filterRegex, Pattern.DOTALL),
TextLogMessageType.PLAIN_TEXT, false, false, false, null, null,
null, null);
for (int i = 0; i < 100; i++) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
if (i % 2 == 0) {
assertEquals(customInfoMessage, new String(log.getLogMessage().getMessage()));
assertTrue(log.getInjectedHeaders().containsKey("skipMessage"));
assertEquals(0, log.getInjectedHeaders().get("skipMessage").array().length);
} else {
assertEquals(customErrorMessage, new String(log.getLogMessage().getMessage()));
assertEquals(null, log.getInjectedHeaders());
}
}
reader.close();

filterRegex = ".*\\bThread-1\\b.*";
reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 1,
Pattern.compile("^.*$"), Pattern.compile(filterRegex, Pattern.DOTALL),
TextLogMessageType.PLAIN_TEXT, false, false, false, "test", "test-az",
null, new HashMap<>());
// No messages should have skipMessageHeader
for (int i = 0; i < 100; i++) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
assertFalse(log.getInjectedHeaders().containsKey("skipMessage"));
}
reader.close();
}

@Test
public void testEnvironmentVariableInjection() throws Exception {
String path = FilenameUtils.concat(getTempPath(), "test3.log");
Expand All @@ -111,7 +157,7 @@ public void testEnvironmentVariableInjection() throws Exception {
LogFile logFile = new LogFile(inode);
LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig()), "test");
LogFileReader reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, "host", "n/a", null,
Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, false, true, "host", "n/a", null,
new HashMap<>(ImmutableMap.of("test", ByteBuffer.wrap("value".getBytes()))));
for (int i = 0; i < 100; i = i + 2) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
Expand All @@ -127,7 +173,7 @@ public void testEnvironmentVariableInjection() throws Exception {
reader.close();

reader = new TextLogFileReader(logStream, logFile, path, 0, 8192, 102400, 2,
Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, true, "host", "n/a", null,
Pattern.compile("^.*$"), null, TextLogMessageType.PLAIN_TEXT, false, false, true, "host", "n/a", null,
null);
for (int i = 0; i < 100; i = i + 2) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
Expand Down

0 comments on commit 8f640ec

Please sign in to comment.