From 4abdaff95f528adac9f68683701d3620a1dce7ed Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Fri, 31 May 2024 13:50:46 +0200 Subject: [PATCH 01/11] test: Filestream tests should not be in one-shot mode. We handle their shutdown with fs.Stop(). --- internal/tailer/logstream/filestream_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/tailer/logstream/filestream_test.go b/internal/tailer/logstream/filestream_test.go index f672a236c..19ff7db15 100644 --- a/internal/tailer/logstream/filestream_test.go +++ b/internal/tailer/logstream/filestream_test.go @@ -27,7 +27,7 @@ func TestFileStreamRead(t *testing.T) { lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) @@ -62,7 +62,7 @@ func TestFileStreamReadNonSingleByteEnd(t *testing.T) { lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) @@ -103,7 +103,7 @@ func TestStreamDoesntBreakOnCorruptRune(t *testing.T) { lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) @@ -149,7 +149,7 @@ func TestFileStreamTruncation(t *testing.T) { lines := make(chan *logline.LogLine, 3) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) // fs.Stop() is also called explicitly further down but a failed test // and early return would lead to the handle staying open defer fs.Stop() @@ -197,7 +197,7 @@ func TestFileStreamFinishedBecauseCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) // Synchronise past first read after seekToEnd @@ -232,7 +232,7 @@ func TestFileStreamPartialRead(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) From cf9ad9d689508e30a38c71a70b696328e1464126 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Fri, 31 May 2024 13:55:08 +0200 Subject: [PATCH 02/11] chore: Lint fixes --- .../tailer/logstream/dgramstream_unix_test.go | 4 ++-- internal/tailer/logstream/filestream_test.go | 16 ++++++++-------- .../tailer/logstream/socketstream_unix_test.go | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/tailer/logstream/dgramstream_unix_test.go b/internal/tailer/logstream/dgramstream_unix_test.go index 29cf41b1e..6fefa3d91 100644 --- a/internal/tailer/logstream/dgramstream_unix_test.go +++ b/internal/tailer/logstream/dgramstream_unix_test.go @@ -65,7 +65,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), addr, "1"}, + {Context: context.TODO(), Filename: addr, Line: "1"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -118,7 +118,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) { received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), addr, "1"}, + {Context: context.TODO(), Filename: addr, Line: "1"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) diff --git a/internal/tailer/logstream/filestream_test.go b/internal/tailer/logstream/filestream_test.go index 19ff7db15..6950630b8 100644 --- a/internal/tailer/logstream/filestream_test.go +++ b/internal/tailer/logstream/filestream_test.go @@ -39,7 +39,7 @@ func TestFileStreamRead(t *testing.T) { close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "yo"}, + {Context: context.TODO(), Filename: name, Line: "yo"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -80,7 +80,7 @@ func TestFileStreamReadNonSingleByteEnd(t *testing.T) { close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, s}, + {Context: context.TODO(), Filename: name, Line: s}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -126,7 +126,7 @@ func TestStreamDoesntBreakOnCorruptRune(t *testing.T) { close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, s[1:]}, + {Context: context.TODO(), Filename: name, Line: s[1:]}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -174,9 +174,9 @@ func TestFileStreamTruncation(t *testing.T) { received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "1"}, - {context.TODO(), name, "2"}, - {context.TODO(), name, "3"}, + {Context: context.TODO(), Filename: name, Line: "1"}, + {Context: context.TODO(), Filename: name, Line: "2"}, + {Context: context.TODO(), Filename: name, Line: "3"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -210,7 +210,7 @@ func TestFileStreamFinishedBecauseCancel(t *testing.T) { received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "yo"}, + {Context: context.TODO(), Filename: name, Line: "yo"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -251,7 +251,7 @@ func TestFileStreamPartialRead(t *testing.T) { close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "yo"}, + {Context: context.TODO(), Filename: name, Line: "yo"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) diff --git a/internal/tailer/logstream/socketstream_unix_test.go b/internal/tailer/logstream/socketstream_unix_test.go index e2b6c9a20..06e02bc7a 100644 --- a/internal/tailer/logstream/socketstream_unix_test.go +++ b/internal/tailer/logstream/socketstream_unix_test.go @@ -62,7 +62,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), addr, "1"}, + {Context: context.TODO(), Filename: addr, Line: "1"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) @@ -114,7 +114,7 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) { received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), addr, "1"}, + {Context: context.TODO(), Filename: addr, Line: "1"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) From 3e813d5b218aee6fcb1385e05a98749229c20cb0 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Sat, 1 Jun 2024 16:14:32 +0200 Subject: [PATCH 03/11] chore: homogenise error messages --- internal/tailer/logstream/filestream.go | 56 ++++++++++++------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index 708e062dc..709cf194e 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -69,17 +69,17 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake return err } logOpens.Add(fs.pathname, 1) - glog.V(2).Infof("%v: opened new file", fd) if !streamFromStart { + glog.V(2).Infof("stream(%s): opened new file", fs.pathname) if _, err := fd.Seek(0, io.SeekEnd); err != nil { logErrors.Add(fs.pathname, 1) if err := fd.Close(); err != nil { logErrors.Add(fs.pathname, 1) - glog.Info(err) + glog.Infof("stream(%s): closing file: %v", fs.pathname, err) } return err } - glog.V(2).Infof("%v: seeked to end", fd) + glog.V(2).Infof("stream(%s): seeked to end", fs.pathname) } b := make([]byte, defaultReadBufferSize) var lastBytes []byte @@ -90,11 +90,11 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake go func() { defer wg.Done() defer func() { - glog.V(2).Infof("%v: read total %d bytes from %s", fd, total, fs.pathname) - glog.V(2).Infof("%v: closing file descriptor", fd) + glog.V(2).Infof("stream(%s): read total %d bytes", fs.pathname, total) + glog.V(2).Infof("stream(%s): closing file descriptor", fs.pathname) if err := fd.Close(); err != nil { logErrors.Add(fs.pathname, 1) - glog.Info(err) + glog.Infof("stream(%s): closing file: %v", fs.pathname, err) } logCloses.Add(fs.pathname, 1) }() @@ -102,11 +102,11 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake for { // Blocking read but regular files will return EOF straight away. count, err := fd.Read(b) - glog.V(2).Infof("%v: read %d bytes, err is %v", fd, count, err) + glog.V(2).Infof("stream(%s): read %d bytes, err is %v", fs.pathname, count, err) if count > 0 { total += count - glog.V(2).Infof("%v: decode and send", fd) + glog.V(2).Infof("stream(%s): decode and send", fs.pathname) needSend := lastBytes needSend = append(needSend, b[:count]...) sendCount := decodeAndSend(ctx, fs.lines, fs.pathname, len(needSend), needSend, partial) @@ -126,25 +126,25 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // errors, and end on unretriables; e.g. ESTALE looks // retryable. if errors.Is(err, syscall.ESTALE) { - glog.Infof("%v: reopening stream due to %s", fd, err) + glog.Infof("stream(%s): reopening stream due to %s", fs.pathname, err) if nerr := fs.stream(ctx, wg, waker, fi, true); nerr != nil { - glog.Info(nerr) + glog.Infof("stream(%s): new stream: %v", fs.pathname, nerr) } // Close this stream. return } - glog.Info(err) + glog.Infof("stream(%s): read error: %v", fs.pathname, err) } // If we have read no bytes and are at EOF, check for truncation and rotation. if err == io.EOF && count == 0 { - glog.V(2).Infof("%v: eof an no bytes", fd) + glog.V(2).Infof("stream(%s): eof an no bytes", fs.pathname) // Both rotation and truncation need to stat, so check for // rotation first. It is assumed that rotation is the more // common change pattern anyway. newfi, serr := os.Stat(fs.pathname) if serr != nil { - glog.Info(serr) + glog.Infof("stream(%s): stat error: %v", serr) // If this is a NotExist error, then we should wrap up this // goroutine. The Tailer will create a new logstream if the // file is in the middle of a rotation and gets recreated @@ -153,7 +153,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // Stop, which ends up causing us to race here against // detection of IsCompleted. if os.IsNotExist(serr) { - glog.V(2).Infof("%v: source no longer exists, exiting", fd) + glog.V(2).Infof("stream(%s): source no longer exists, exiting", fs.pathname) if partial.Len() > 0 { sendLine(ctx, fs.pathname, partial, fs.lines) } @@ -166,9 +166,9 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake goto Sleep } if !os.SameFile(fi, newfi) { - glog.V(2).Infof("%v: adding a new file routine", fd) + glog.V(2).Infof("stream(%s): adding a new file routine", fs.pathname) if err := fs.stream(ctx, wg, waker, newfi, true); err != nil { - glog.Info(err) + glog.Info("stream(%s): new stream: %v", fs.pathname, err) } // We're at EOF so there's nothing left to read here. return @@ -179,8 +179,8 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake glog.Info(serr) continue } - glog.V(2).Infof("%v: current seek is %d", fd, currentOffset) - glog.V(2).Infof("%v: new size is %d", fd, newfi.Size()) + glog.V(2).Infof("stream(%s): current seek is %d", fs.pathname, currentOffset) + glog.V(2).Infof("stream(%s): new size is %d", fs.pathname, newfi.Size()) // We know that newfi is from the current file. Truncation can // only be detected if the new file is currently shorter than // the current seek offset. In test this can be a race, but in @@ -188,7 +188,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // than the previous after rotation in the time it takes for // mtail to notice. if newfi.Size() < currentOffset { - glog.V(2).Infof("%v: truncate? currentoffset is %d and size is %d", fd, currentOffset, newfi.Size()) + glog.V(2).Infof("stream(%s): truncate? currentoffset is %d and size is %d", fs.pathname, currentOffset, newfi.Size()) // About to lose all remaining data because of the truncate so flush the accumulator. if partial.Len() > 0 { sendLine(ctx, fs.pathname, partial, fs.lines) @@ -196,9 +196,9 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake p, serr := fd.Seek(0, io.SeekStart) if serr != nil { logErrors.Add(fs.pathname, 1) - glog.Info(serr) + glog.Infof("stream(%s): seek: %v", fs.pathname, serr) } - glog.V(2).Infof("%v: Seeked to %d", fd, p) + glog.V(2).Infof("stream(%s): Seeked to %d", fs.pathname, p) fileTruncates.Add(fs.pathname, 1) continue } @@ -216,7 +216,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if err == io.EOF || ctx.Err() != nil { select { case <-fs.stopChan: - glog.V(2).Infof("%v: stream has been stopped, exiting", fd) + glog.V(2).Infof("stream(%s): stream has been sopped, exiting", fs.pathname) if partial.Len() > 0 { sendLine(ctx, fs.pathname, partial, fs.lines) } @@ -225,7 +225,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake fs.mu.Unlock() return case <-ctx.Done(): - glog.V(2).Infof("%v: stream has been cancelled, exiting", fd) + glog.V(2).Infof("stream(%s): stream has been cancelled, exiting", fs.pathname) if partial.Len() > 0 { sendLine(ctx, fs.pathname, partial, fs.lines) } @@ -240,7 +240,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // Don't exit, instead yield and wait for a termination signal or // wakeup. - glog.V(2).Infof("%v: waiting", fd) + glog.V(2).Infof("stream(%s): waiting", fs.pathname) select { case <-fs.stopChan: // We may have started waiting here when the stop signal @@ -248,16 +248,16 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // written to. The file is not technically yet at EOF so // we need to go back and try one more read. We'll exit // the stream in the select stanza above. - glog.V(2).Infof("%v: Stopping after next read", fd) + glog.V(2).Infof("stream(%s): Stopping after next read", fs.pathname) case <-ctx.Done(): // Same for cancellation; this makes tests stable, but // could argue exiting immediately is less surprising. // Assumption is that this doesn't make a difference in // production. - glog.V(2).Infof("%v: Cancelled after next read", fd) + glog.V(2).Infof("stream(%s): Cancelled after next read", fs.pathname) case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("%v: Wake received", fd) + glog.V(2).Infof("stream(%s): Wake received", fs.pathname) } } }() @@ -275,7 +275,7 @@ func (fs *fileStream) IsComplete() bool { // Stop implements the LogStream interface. func (fs *fileStream) Stop() { fs.stopOnce.Do(func() { - glog.Info("signalling stop at next EOF") + glog.Infof("stream(%s): signalling stop at next EOF", fs.pathname) close(fs.stopChan) }) } From aa618305323ed39e1919b6e95bde34218eaebc49 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Sat, 1 Jun 2024 17:00:59 +0200 Subject: [PATCH 04/11] fix: Exit a logstream at EOF when OneShotEnabled. This means a stream will shut itself down and does not need the tailer to instruct it to do so. It thus avoids a race when the tailer doesn't know if the stream has performed enough reads. We need to separate the meaning of oneShot mode and streamFromStart as the former tells us how to exit a stream, and the latter tells us how to start reading a new file after log rotation. --- internal/tailer/logstream/filestream.go | 31 ++++++++++++++----- internal/tailer/logstream/filestream_test.go | 31 +++++++++++++++++++ .../tailer/logstream/filestream_unix_test.go | 5 +-- 3 files changed, 58 insertions(+), 9 deletions(-) diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index 709cf194e..c0eecafe6 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -48,9 +48,11 @@ type fileStream struct { } // newFileStream creates a new log stream from a regular file. -func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine, streamFromStart OneShotMode) (LogStream, error) { +func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { fs := &fileStream{ctx: ctx, pathname: pathname, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})} - if err := fs.stream(ctx, wg, waker, fi, streamFromStart); err != nil { + // Stream from the start of the file when in one shot mode. + streamFromStart := oneShot == OneShotEnabled + if err := fs.stream(ctx, wg, waker, fi, oneShot, streamFromStart); err != nil { return nil, err } return fs, nil @@ -62,15 +64,17 @@ func (fs *fileStream) LastReadTime() time.Time { return fs.lastReadTime } -func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, fi os.FileInfo, streamFromStart OneShotMode) error { +func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, fi os.FileInfo, oneShot OneShotMode, streamFromStart bool) error { fd, err := os.OpenFile(fs.pathname, os.O_RDONLY, 0o600) if err != nil { logErrors.Add(fs.pathname, 1) return err } logOpens.Add(fs.pathname, 1) - if !streamFromStart { glog.V(2).Infof("stream(%s): opened new file", fs.pathname) + if !streamFromStart { + // Normal operation for first stream is to ignore the past, and seek to + // EOF immediately to start tailing. if _, err := fd.Seek(0, io.SeekEnd); err != nil { logErrors.Add(fs.pathname, 1) if err := fd.Close(); err != nil { @@ -127,7 +131,8 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // retryable. if errors.Is(err, syscall.ESTALE) { glog.Infof("stream(%s): reopening stream due to %s", fs.pathname, err) - if nerr := fs.stream(ctx, wg, waker, fi, true); nerr != nil { + // streamFromStart always true on a stream reopen + if nerr := fs.stream(ctx, wg, waker, fi, oneShot, true); nerr != nil { glog.Infof("stream(%s): new stream: %v", fs.pathname, nerr) } // Close this stream. @@ -167,7 +172,8 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } if !os.SameFile(fi, newfi) { glog.V(2).Infof("stream(%s): adding a new file routine", fs.pathname) - if err := fs.stream(ctx, wg, waker, newfi, true); err != nil { + // Stream from start always true on a stream reopen + if err := fs.stream(ctx, wg, waker, newfi, oneShot, true); err != nil { glog.Info("stream(%s): new stream: %v", fs.pathname, err) } // We're at EOF so there's nothing left to read here. @@ -213,7 +219,18 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake Sleep: // If we get here it's because we've stalled. First test to see if it's // time to exit. - if err == io.EOF || ctx.Err() != nil { + if err == io.EOF { + if oneShot == OneShotEnabled { + // Exit now, because oneShot means read only to EOF. + glog.V(2).Infof("stream(%s): EOF in one shot mode, exiting", fs.pathname) + if partial.Len() > 0 { + sendLine(ctx, fs.pathname, partial, fs.lines) + } + fs.mu.Lock() + fs.completed = true + fs.mu.Unlock() + return + } select { case <-fs.stopChan: glog.V(2).Infof("stream(%s): stream has been sopped, exiting", fs.pathname) diff --git a/internal/tailer/logstream/filestream_test.go b/internal/tailer/logstream/filestream_test.go index 6950630b8..c37379df1 100644 --- a/internal/tailer/logstream/filestream_test.go +++ b/internal/tailer/logstream/filestream_test.go @@ -50,6 +50,37 @@ func TestFileStreamRead(t *testing.T) { wg.Wait() } +func TestFileStreamReadOneShot(t *testing.T) { + var wg sync.WaitGroup + + tmpDir := testutil.TestTempDir(t) + + name := filepath.Join(tmpDir, "log") + f := testutil.TestOpenFile(t, name) + defer f.Close() + testutil.WriteString(t, f, "yo\n") + + lines := make(chan *logline.LogLine, 1) + ctx, cancel := context.WithCancel(context.Background()) + waker := waker.NewTestAlways() + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + testutil.FatalIfErr(t, err) + + wg.Wait() + close(lines) + received := testutil.LinesReceived(lines) + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "yo"}, + } + testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + + if !fs.IsComplete() { + t.Errorf("expecting filestream to be complete because stopped") + } + cancel() + wg.Wait() +} + func TestFileStreamReadNonSingleByteEnd(t *testing.T) { var wg sync.WaitGroup diff --git a/internal/tailer/logstream/filestream_unix_test.go b/internal/tailer/logstream/filestream_unix_test.go index 9b6cc9ae3..1e8e1b976 100644 --- a/internal/tailer/logstream/filestream_unix_test.go +++ b/internal/tailer/logstream/filestream_unix_test.go @@ -37,7 +37,8 @@ func TestFileStreamRotation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + // OneShotDisabled because we hit EOF and need to wait. + fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) // fs.Stop() is also called explicitly further down but a failed test // and early return would lead to the handle staying open defer fs.Stop() @@ -88,7 +89,7 @@ func TestFileStreamURL(t *testing.T) { lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, "file://"+name, lines, logstream.OneShotEnabled) + fs, err := logstream.New(ctx, &wg, waker, "file://"+name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) From 6ab8d9c476e3e9cfa3673df22778eb0b9b4ee70d Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Sat, 8 Jun 2024 18:31:07 +0200 Subject: [PATCH 05/11] fix: Handle `oneShot` mode in the `dgramStream`. In this mode we merely tell datagrams to exit when a zero byte read occurs in oneshot mode. This is the same as before except we must also explicitly tell the stream to do it in this mode. --- internal/tailer/logstream/dgramstream.go | 13 ++++++++++--- internal/tailer/logstream/logstream.go | 4 ++-- internal/tailer/tail.go | 4 ---- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index 56819e281..776f51680 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -30,12 +30,12 @@ type dgramStream struct { stopChan chan struct{} // Close to start graceful shutdown. } -func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine) (LogStream, error) { +func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { if address == "" { return nil, ErrEmptySocketAddress } ss := &dgramStream{ctx: ctx, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})} - if err := ss.stream(ctx, wg, waker); err != nil { + if err := ss.stream(ctx, wg, waker, oneShot); err != nil { return nil, err } return ss, nil @@ -50,7 +50,7 @@ func (ss *dgramStream) LastReadTime() time.Time { // The read buffer size for datagrams. const datagramReadBufferSize = 131072 -func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker) error { +func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error { c, err := net.ListenPacket(ss.scheme, ss.address) if err != nil { logErrors.Add(ss.address, 1) @@ -89,6 +89,13 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak // logstream in graceful shutdown, then a zero-byte read is // equivalent to an "EOF" in connection and file oriented streams. if n == 0 { + if oneShot { + glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ss.scheme, ss.address) + if partial.Len() > 0 { + sendLine(ctx, ss.address, partial, ss.lines) + } + return + } select { case <-ss.stopChan: glog.V(2).Infof("%v: exiting because zero byte read after Stop", c) diff --git a/internal/tailer/logstream/logstream.go b/internal/tailer/logstream/logstream.go index 54b926257..a60a9f416 100644 --- a/internal/tailer/logstream/logstream.go +++ b/internal/tailer/logstream/logstream.go @@ -80,13 +80,13 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st default: glog.V(2).Infof("%v: %q in path pattern %q, treating as path", ErrUnsupportedURLScheme, u.Scheme, pathname) case "unixgram": - return newDgramStream(ctx, wg, waker, u.Scheme, u.Path, lines) + return newDgramStream(ctx, wg, waker, u.Scheme, u.Path, lines, oneShot) case "unix": return newSocketStream(ctx, wg, waker, u.Scheme, u.Path, lines, oneShot) case "tcp": return newSocketStream(ctx, wg, waker, u.Scheme, u.Host, lines, oneShot) case "udp": - return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, lines) + return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, lines, oneShot) case "", "file": path = u.Path } diff --git a/internal/tailer/tail.go b/internal/tailer/tail.go index f1e59c2e4..f60e69694 100644 --- a/internal/tailer/tail.go +++ b/internal/tailer/tail.go @@ -287,10 +287,6 @@ func (t *Tailer) TailPath(pathname string) error { if err != nil { return err } - if t.oneShot { - glog.V(2).Infof("Starting oneshot read at startup of %q", pathname) - l.Stop() - } t.logstreams[pathname] = l glog.Infof("Tailing %s", pathname) logCount.Add(1) From 38ab690f5c9a6537d1c6c6c3f13999563fc4faf8 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Sat, 8 Jun 2024 18:46:17 +0200 Subject: [PATCH 06/11] chore: Make log messages more readable in the logstreams. --- internal/tailer/logstream/dgramstream.go | 20 +++++++++---------- internal/tailer/logstream/pipestream.go | 14 ++++++------- internal/tailer/logstream/socketstream.go | 24 +++++++++++------------ 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index 776f51680..c006773ee 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -56,7 +56,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak logErrors.Add(ss.address, 1) return err } - glog.V(2).Infof("opened new datagram socket %v", c) + glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ss.scheme, ss.address, c) b := make([]byte, datagramReadBufferSize) partial := bytes.NewBufferString("") var total int @@ -64,8 +64,8 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak go func() { defer wg.Done() defer func() { - glog.V(2).Infof("%v: read total %d bytes from %s", c, total, ss.address) - glog.V(2).Infof("%v: closing connection", c) + glog.V(2).Infof("stream(%s:%s): read total %d bytes", ss.scheme, ss.address, total) + glog.V(2).Infof("stream(%s:%s): closing connection", ss.scheme, ss.address) err := c.Close() if err != nil { logErrors.Add(ss.address, 1) @@ -83,7 +83,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak for { n, _, err := c.ReadFrom(b) - glog.V(2).Infof("%v: read %d bytes, err is %v", c, n, err) + glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ss.scheme, ss.address, n, err) // This is a test-only trick that says if we've already put this // logstream in graceful shutdown, then a zero-byte read is @@ -98,7 +98,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak } select { case <-ss.stopChan: - glog.V(2).Infof("%v: exiting because zero byte read after Stop", c) + glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after Stop", ss.scheme, ss.address) return default: } @@ -117,12 +117,12 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if partial.Len() > 0 { sendLine(ctx, ss.address, partial, ss.lines) } - glog.V(2).Infof("%v: exiting, stream has error %s", c, err) + glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ss.scheme, ss.address, err) return } // Yield and wait - glog.V(2).Infof("%v: waiting", c) + glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address) select { case <-ss.stopChan: // We may have started waiting here when the stop signal @@ -130,7 +130,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak // written to. The file is not technically yet at EOF so // we need to go back and try one more read. We'll exit // the stream in the zero byte handler above. - glog.V(2).Infof("%v: Stopping after next zero byte read", c) + glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ss.scheme, ss.address) case <-ctx.Done(): // Exit immediately; a cancelled context will set an immediate // deadline on the next read which will cause us to exit then, @@ -138,7 +138,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak return case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("%v: Wake received", c) + glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address) } } }() @@ -152,7 +152,7 @@ func (ss *dgramStream) IsComplete() bool { } func (ss *dgramStream) Stop() { - glog.V(2).Infof("Stop received on datagram stream.") + glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ss.scheme, ss.address) ss.stopOnce.Do(func() { close(ss.stopChan) }) diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index f9493bfb8..f362c2d80 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -69,7 +69,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake logErrors.Add(ps.pathname, 1) return err } - glog.V(2).Infof("opened new pipe %v", fd) + glog.V(2).Infof("stream(%s): opened new pipe %v", ps.pathname, fd) b := make([]byte, defaultPipeReadBufferSize) partial := bytes.NewBufferString("") var total int @@ -77,8 +77,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake go func() { defer wg.Done() defer func() { - glog.V(2).Infof("%v: read total %d bytes from %s", fd, total, ps.pathname) - glog.V(2).Infof("%v: closing file descriptor", fd) + glog.V(2).Infof("stream(%s): read total %d bytes", ps.pathname, fd, total) + glog.V(2).Infof("stream(%s): closing file descriptor %v", ps.pathname, fd) err := fd.Close() if err != nil { logErrors.Add(ps.pathname, 1) @@ -95,7 +95,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake for { n, err := fd.Read(b) - glog.V(2).Infof("%v: read %d bytes, err is %v", fd, n, err) + glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ps.pathname, n, err) if n > 0 { total += n @@ -111,12 +111,12 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake if partial.Len() > 0 { sendLine(ctx, ps.pathname, partial, ps.lines) } - glog.V(2).Infof("%v: exiting, stream has error %s", fd, err) + glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.pathname, err) return } // Wait for wakeup or termination. - glog.V(2).Infof("%v: waiting", fd) + glog.V(2).Infof("stream(%s): waiting", ps.pathname) select { case <-ctx.Done(): // Exit immediately; cancelled context is going to cause the @@ -125,7 +125,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake return case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("%v: Wake received", fd) + glog.V(2).Infof("stream(%s): Wake received", ps.pathname) } } }() diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index 2e7f860d3..2ec867042 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -55,7 +55,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa logErrors.Add(ss.address, 1) return err } - glog.V(2).Infof("opened new socket listener %v", l) + glog.V(2).Infof("stream(%s:%s): opened new socket listener %v", ss.scheme, ss.address, l) initDone := make(chan struct{}) // Set up for shutdown @@ -70,7 +70,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa case <-ss.stopChan: } } - glog.V(2).Infof("%v: closing listener", l) + glog.V(2).Infof("stream(%s:%s): closing listener", ss.scheme, ss.address, l) err := l.Close() if err != nil { glog.Info(err) @@ -86,7 +86,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa glog.Info(err) return err } - glog.V(2).Infof("%v: got new conn %v", l, c) + glog.V(2).Infof("stream(%s:%s): got new conn %v", ss.scheme, ss.address, c) wg.Add(1) go ss.handleConn(ctx, wg, waker, c) return nil @@ -99,7 +99,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa if err := acceptConn(); err != nil { glog.Info(err) } - glog.Info("oneshot mode, retuning") + glog.Info("stream(%s:%s): oneshot mode, returning", ss.scheme, ss.address) close(initDone) }() return nil @@ -124,8 +124,8 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake partial := bytes.NewBufferString("") var total int defer func() { - glog.V(2).Infof("%v: read total %d bytes from %s", c, total, ss.address) - glog.V(2).Infof("%v: closing connection", c) + glog.V(2).Infof("stream(%s:%s): read total %d bytes from %s", ss.scheme, ss.address, c, total) + glog.V(2).Infof("stream(%s:%s): closing connection, %v", ss.scheme, ss.address, c) err := c.Close() if err != nil { logErrors.Add(ss.address, 1) @@ -139,7 +139,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake for { n, err := c.Read(b) - glog.V(2).Infof("%v: read %d bytes, err is %v", c, n, err) + glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ss.scheme, ss.address, n, err) if n > 0 { total += n @@ -154,23 +154,23 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake if partial.Len() > 0 { sendLine(ctx, ss.address, partial, ss.lines) } - glog.V(2).Infof("%v: exiting, conn has error %s", c, err) + glog.V(2).Infof("stream(%s:%s): exiting, conn has error %s", ss.scheme, ss.address, err) return } // Yield and wait - glog.V(2).Infof("%v: waiting", c) + glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address) select { case <-ctx.Done(): // Exit immediately; cancelled context will cause the next read to be interrupted and exit anyway, so no point waiting to loop. return case <-ss.stopChan: // Stop after connection is closed. - glog.V(2).Infof("%v: stopchan closed, exiting after next read timeout", c) + glog.V(2).Infof("stream(%s:%s): stopchan closed, exiting after next read timeout", ss.scheme, ss.address) case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("%v: Wake received", c) + glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address) } } } @@ -185,7 +185,7 @@ func (ss *socketStream) IsComplete() bool { // Stop will close the listener so no new connections will be accepted, and close all current connections once they have been closed by their peers. func (ss *socketStream) Stop() { ss.stopOnce.Do(func() { - glog.Info("signalling stop at next EOF") + glog.Infof("stream(%s:%s): signalling stop at next EOF", ss.scheme, ss.address) close(ss.stopChan) }) } From 3e8abbf3a1d3725b29eca7d82a01977754cb9eec Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Mon, 10 Jun 2024 17:04:59 +0200 Subject: [PATCH 07/11] fix: Use context cancellation to stop a dgramstream. Remove the stop channel, use a local context. Fix up one test. --- internal/tailer/logstream/dgramstream.go | 30 ++++++++----------- .../tailer/logstream/dgramstream_unix_test.go | 6 ++-- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index c006773ee..f7cbbd957 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -16,7 +16,9 @@ import ( ) type dgramStream struct { - ctx context.Context + ctx context.Context + cancel context.CancelFunc + lines chan<- *logline.LogLine scheme string // Datagram scheme, either "unixgram" or "udp". @@ -25,17 +27,15 @@ type dgramStream struct { mu sync.RWMutex // protects following fields completed bool // This pipestream is completed and can no longer be used. lastReadTime time.Time // Last time a log line was read from this named pipe - - stopOnce sync.Once // Ensure stopChan only closed once. - stopChan chan struct{} // Close to start graceful shutdown. } func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { if address == "" { return nil, ErrEmptySocketAddress } - ss := &dgramStream{ctx: ctx, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})} - if err := ss.stream(ctx, wg, waker, oneShot); err != nil { + ss := &dgramStream{scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines} + ss.ctx, ss.cancel = context.WithCancel(ctx) + if err := ss.stream(ss.ctx, wg, waker, oneShot); err != nil { return nil, err } return ss, nil @@ -97,8 +97,11 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak return } select { - case <-ss.stopChan: - glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after Stop", ss.scheme, ss.address) + case <-ctx.Done(): + glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ss.scheme, ss.address) + if partial.Len() > 0 { + sendLine(ctx, ss.address, partial, ss.lines) + } return default: } @@ -124,18 +127,13 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak // Yield and wait glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address) select { - case <-ss.stopChan: + case <-ctx.Done(): // We may have started waiting here when the stop signal // arrives, but since that wait the file may have been // written to. The file is not technically yet at EOF so // we need to go back and try one more read. We'll exit // the stream in the zero byte handler above. glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ss.scheme, ss.address) - case <-ctx.Done(): - // Exit immediately; a cancelled context will set an immediate - // deadline on the next read which will cause us to exit then, - // so don't bother going around the loop again. - return case <-waker.Wake(): // sleep until next Wake() glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address) @@ -153,7 +151,5 @@ func (ss *dgramStream) IsComplete() bool { func (ss *dgramStream) Stop() { glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ss.scheme, ss.address) - ss.stopOnce.Do(func() { - close(ss.stopChan) - }) + ss.cancel() } diff --git a/internal/tailer/logstream/dgramstream_unix_test.go b/internal/tailer/logstream/dgramstream_unix_test.go index 6fefa3d91..3bb89abc2 100644 --- a/internal/tailer/logstream/dgramstream_unix_test.go +++ b/internal/tailer/logstream/dgramstream_unix_test.go @@ -43,7 +43,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { waker, awaken := waker.NewTest(ctx, 1, "stream") sockName := scheme + "://" + addr - ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotDisabled) + ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotEnabled) testutil.FatalIfErr(t, err) s, err := net.Dial(scheme, addr) @@ -54,9 +54,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { awaken(0, 0) // sync past read - ss.Stop() - - // "Close" the socket by sending zero bytes, which after Stop tells the stream to act as if we're done. + // "Close" the socket by sending zero bytes, which in oneshot mode tells the stream to act as if we're done. _, err = s.Write([]byte{}) testutil.FatalIfErr(t, err) From 35ce3f58a00c70ec41db4f7d738a8b1f4325b51f Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Mon, 10 Jun 2024 17:10:40 +0200 Subject: [PATCH 08/11] fix: Use context cancellation to shut down socketstream. Fix up the test to not need cancellation for oneshot mode. --- internal/tailer/logstream/socketstream.go | 26 +++++++------------ .../logstream/socketstream_unix_test.go | 12 ++++----- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index 2ec867042..d35e0bd3d 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -16,8 +16,9 @@ import ( ) type socketStream struct { - ctx context.Context - lines chan<- *logline.LogLine + ctx context.Context + cancel context.CancelFunc + lines chan<- *logline.LogLine oneShot OneShotMode scheme string // URL Scheme to listen with, either tcp or unix @@ -26,17 +27,15 @@ type socketStream struct { mu sync.RWMutex // protects following fields completed bool // This socketStream is completed and can no longer be used. lastReadTime time.Time // Last time a log line was read from this socket - - stopOnce sync.Once // Ensure stopChan only closed once. - stopChan chan struct{} // Close to start graceful shutdown. } func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { if address == "" { return nil, ErrEmptySocketAddress } - ss := &socketStream{ctx: ctx, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})} - if err := ss.stream(ctx, wg, waker); err != nil { + ss := &socketStream{ctx: ctx, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines} + ss.ctx, ss.cancel = context.WithCancel(ctx) + if err := ss.stream(ss.ctx, wg, waker); err != nil { return nil, err } return ss, nil @@ -67,7 +66,6 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa if !ss.oneShot { select { case <-ctx.Done(): - case <-ss.stopChan: } } glog.V(2).Infof("stream(%s:%s): closing listener", ss.scheme, ss.address, l) @@ -163,11 +161,8 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address) select { case <-ctx.Done(): - // Exit immediately; cancelled context will cause the next read to be interrupted and exit anyway, so no point waiting to loop. - return - case <-ss.stopChan: - // Stop after connection is closed. - glog.V(2).Infof("stream(%s:%s): stopchan closed, exiting after next read timeout", ss.scheme, ss.address) + // Cancelled context will cause the next read to be interrupted and exit. + glog.V(2).Infof("stream(%s:%s): context cancelled, exiting after next read timeout", ss.scheme, ss.address) case <-waker.Wake(): // sleep until next Wake() glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address) @@ -184,8 +179,5 @@ func (ss *socketStream) IsComplete() bool { // Stop implements the Logstream interface. // Stop will close the listener so no new connections will be accepted, and close all current connections once they have been closed by their peers. func (ss *socketStream) Stop() { - ss.stopOnce.Do(func() { - glog.Infof("stream(%s:%s): signalling stop at next EOF", ss.scheme, ss.address) - close(ss.stopChan) - }) + ss.cancel() } diff --git a/internal/tailer/logstream/socketstream_unix_test.go b/internal/tailer/logstream/socketstream_unix_test.go index 06e02bc7a..9517d87b0 100644 --- a/internal/tailer/logstream/socketstream_unix_test.go +++ b/internal/tailer/logstream/socketstream_unix_test.go @@ -41,7 +41,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { waker, awaken := waker.NewTest(ctx, 1, "stream") sockName := scheme + "://" + addr - ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotDisabled) + ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotEnabled) testutil.FatalIfErr(t, err) s, err := net.Dial(scheme, addr) @@ -55,9 +55,8 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { // Close the socket to signal to the socketStream to shut down. testutil.FatalIfErr(t, s.Close()) - ss.Stop() // stop after connection closes - wg.Wait() + close(lines) received := testutil.LinesReceived(lines) @@ -66,11 +65,12 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) - cancel() - if !ss.IsComplete() { t.Errorf("expecting socketstream to be complete because socket closed") } + + cancel() // stop after connection closes + })) } } @@ -108,8 +108,8 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) { awaken(0, 0) // Sync past read to ensure we read cancel() // This cancellation should cause the stream to shut down immediately. - wg.Wait() + close(lines) received := testutil.LinesReceived(lines) From f35753ed529af83eb1aad4a218ce63694da7921f Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Tue, 11 Jun 2024 07:05:01 +0200 Subject: [PATCH 09/11] fix: Use context cancellation to shut down a filestream. --- internal/tailer/logstream/filestream.go | 40 +++++------------ internal/tailer/logstream/filestream_test.go | 45 +++++++++---------- .../tailer/logstream/filestream_unix_test.go | 25 +++++------ 3 files changed, 43 insertions(+), 67 deletions(-) diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index c0eecafe6..c65e913af 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -34,25 +34,24 @@ var fileTruncates = expvar.NewMap("file_truncates_total") // a new goroutine and closes itself down. The shared context is used for // cancellation. type fileStream struct { - ctx context.Context - lines chan<- *logline.LogLine + ctx context.Context + cancel context.CancelFunc + lines chan<- *logline.LogLine pathname string // Given name for the underlying file on the filesystem mu sync.RWMutex // protects following fields. lastReadTime time.Time // Last time a log line was read from this file completed bool // The filestream is completed and can no longer be used. - - stopOnce sync.Once // Ensure stopChan only closed once. - stopChan chan struct{} // Close to start graceful shutdown. } // newFileStream creates a new log stream from a regular file. func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { - fs := &fileStream{ctx: ctx, pathname: pathname, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})} + fs := &fileStream{pathname: pathname, lastReadTime: time.Now(), lines: lines} + fs.ctx, fs.cancel = context.WithCancel(ctx) // Stream from the start of the file when in one shot mode. streamFromStart := oneShot == OneShotEnabled - if err := fs.stream(ctx, wg, waker, fi, oneShot, streamFromStart); err != nil { + if err := fs.stream(fs.ctx, wg, waker, fi, oneShot, streamFromStart); err != nil { return nil, err } return fs, nil @@ -155,7 +154,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // file is in the middle of a rotation and gets recreated // in the next moment. We can't rely on the Tailer to tell // us we're deleted because the tailer can only tell us to - // Stop, which ends up causing us to race here against + // cancel, which ends up causing us to race here against // detection of IsCompleted. if os.IsNotExist(serr) { glog.V(2).Infof("stream(%s): source no longer exists, exiting", fs.pathname) @@ -232,17 +231,8 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake return } select { - case <-fs.stopChan: - glog.V(2).Infof("stream(%s): stream has been sopped, exiting", fs.pathname) - if partial.Len() > 0 { - sendLine(ctx, fs.pathname, partial, fs.lines) - } - fs.mu.Lock() - fs.completed = true - fs.mu.Unlock() - return case <-ctx.Done(): - glog.V(2).Infof("stream(%s): stream has been cancelled, exiting", fs.pathname) + glog.V(2).Infof("stream(%s): context has been cancelled, exiting", fs.pathname) if partial.Len() > 0 { sendLine(ctx, fs.pathname, partial, fs.lines) } @@ -259,15 +249,12 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // wakeup. glog.V(2).Infof("stream(%s): waiting", fs.pathname) select { - case <-fs.stopChan: - // We may have started waiting here when the stop signal + case <-ctx.Done(): + // We may have started waiting here when the cancellation // arrives, but since that wait the file may have been // written to. The file is not technically yet at EOF so // we need to go back and try one more read. We'll exit - // the stream in the select stanza above. - glog.V(2).Infof("stream(%s): Stopping after next read", fs.pathname) - case <-ctx.Done(): - // Same for cancellation; this makes tests stable, but + // the stream in the select stanza above. This makes tests stable, but // could argue exiting immediately is less surprising. // Assumption is that this doesn't make a difference in // production. @@ -291,8 +278,5 @@ func (fs *fileStream) IsComplete() bool { // Stop implements the LogStream interface. func (fs *fileStream) Stop() { - fs.stopOnce.Do(func() { - glog.Infof("stream(%s): signalling stop at next EOF", fs.pathname) - close(fs.stopChan) - }) + fs.cancel() } diff --git a/internal/tailer/logstream/filestream_test.go b/internal/tailer/logstream/filestream_test.go index c37379df1..f3ab76d98 100644 --- a/internal/tailer/logstream/filestream_test.go +++ b/internal/tailer/logstream/filestream_test.go @@ -29,12 +29,12 @@ func TestFileStreamRead(t *testing.T) { waker, awaken := waker.NewTest(ctx, 1, "stream") fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) - awaken(1, 1) + awaken(1, 1) // synchronise past first read testutil.WriteString(t, f, "yo\n") awaken(1, 1) - fs.Stop() + cancel() wg.Wait() close(lines) received := testutil.LinesReceived(lines) @@ -46,8 +46,6 @@ func TestFileStreamRead(t *testing.T) { if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because stopped") } - cancel() - wg.Wait() } func TestFileStreamReadOneShot(t *testing.T) { @@ -215,7 +213,7 @@ func TestFileStreamTruncation(t *testing.T) { wg.Wait() } -func TestFileStreamFinishedBecauseCancel(t *testing.T) { +func TestFileStreamPartialRead(t *testing.T) { var wg sync.WaitGroup tmpDir := testutil.TestTempDir(t) @@ -230,15 +228,18 @@ func TestFileStreamFinishedBecauseCancel(t *testing.T) { fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) - awaken(1, 1) // Synchronise past first read after seekToEnd + awaken(1, 1) - testutil.WriteString(t, f, "yo\n") + testutil.WriteString(t, f, "yo") + awaken(1, 1) + + testutil.WriteString(t, f, "\n") awaken(1, 1) cancel() wg.Wait() - close(lines) // Signal it's time to go. + close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ {Context: context.TODO(), Filename: name, Line: "yo"}, @@ -246,12 +247,13 @@ func TestFileStreamFinishedBecauseCancel(t *testing.T) { testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) if !fs.IsComplete() { - t.Errorf("expecting filestream to be complete because stream was cancelled") + t.Errorf("expecting filestream to be complete because cancellation") } } -func TestFileStreamPartialRead(t *testing.T) { +func TestFileStreamReadToEOFOnCancel(t *testing.T) { var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) tmpDir := testutil.TestTempDir(t) @@ -259,37 +261,30 @@ func TestFileStreamPartialRead(t *testing.T) { f := testutil.TestOpenFile(t, name) defer f.Close() - lines := make(chan *logline.LogLine, 1) - ctx, cancel := context.WithCancel(context.Background()) + lines := make(chan *logline.LogLine, 2) waker, awaken := waker.NewTest(ctx, 1, "stream") fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) awaken(1, 1) - testutil.WriteString(t, f, "yo") + testutil.WriteString(t, f, "line 1\n") awaken(1, 1) - // received := testutil.LinesReceived(lines) - // expected := []*logline.LogLine{} - // testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) - - testutil.WriteString(t, f, "\n") - awaken(1, 1) + testutil.WriteString(t, f, "line 2\n") + cancel() // cancel wakes the stream - fs.Stop() wg.Wait() - close(lines) + + close(lines) // Signal it's time to go. received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: "yo"}, + {Context: context.TODO(), Filename: name, Line: "line 1"}, + {Context: context.TODO(), Filename: name, Line: "line 2"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because cancellation") } - - cancel() - wg.Wait() } diff --git a/internal/tailer/logstream/filestream_unix_test.go b/internal/tailer/logstream/filestream_unix_test.go index 1e8e1b976..ae63173e5 100644 --- a/internal/tailer/logstream/filestream_unix_test.go +++ b/internal/tailer/logstream/filestream_unix_test.go @@ -39,12 +39,9 @@ func TestFileStreamRotation(t *testing.T) { // OneShotDisabled because we hit EOF and need to wait. fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) - // fs.Stop() is also called explicitly further down but a failed test - // and early return would lead to the handle staying open - defer fs.Stop() testutil.FatalIfErr(t, err) - awaken(1, 1) + awaken(1, 1) // sync to eof glog.Info("write 1") testutil.WriteString(t, f, "1\n") @@ -62,19 +59,20 @@ func TestFileStreamRotation(t *testing.T) { testutil.WriteString(t, f, "2\n") awaken(1, 1) - fs.Stop() + cancel() wg.Wait() - close(lines) + close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "1"}, - {context.TODO(), name, "2"}, + {Context: context.TODO(), Filename: name, Line: "1"}, + {Context: context.TODO(), Filename: name, Line: "2"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) - cancel() - wg.Wait() + if !fs.IsComplete() { + t.Errorf("expecting filestream to be complete because stopped") + } } func TestFileStreamURL(t *testing.T) { @@ -96,20 +94,19 @@ func TestFileStreamURL(t *testing.T) { testutil.WriteString(t, f, "yo\n") awaken(1, 1) - fs.Stop() + cancel() wg.Wait() + close(lines) received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ - {context.TODO(), name, "yo"}, + {Context: context.TODO(), Filename: name, Line: "yo"}, } testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because stopped") } - cancel() - wg.Wait() } // TestFileStreamOpenFailure is a unix-specific test because on Windows, it is not possible to create a file From c967ed5c3db30c79efb23cee83f87b86f5174cc4 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Tue, 11 Jun 2024 07:59:03 +0200 Subject: [PATCH 10/11] chore: Pass contexts instead of storing them in the logstreams. We don't use the contexts in struct anyway, so this is neater. Fixes a lint warning. --- internal/tailer/logstream/dgramstream.go | 9 ++++----- internal/tailer/logstream/filestream.go | 10 +++++----- internal/tailer/logstream/socketstream.go | 12 ++++++------ 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index f7cbbd957..acce398f6 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -16,7 +16,6 @@ import ( ) type dgramStream struct { - ctx context.Context cancel context.CancelFunc lines chan<- *logline.LogLine @@ -33,9 +32,9 @@ func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, if address == "" { return nil, ErrEmptySocketAddress } - ss := &dgramStream{scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines} - ss.ctx, ss.cancel = context.WithCancel(ctx) - if err := ss.stream(ss.ctx, wg, waker, oneShot); err != nil { + ctx, cancel := context.WithCancel(ctx) + ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines} + if err := ss.stream(ctx, wg, waker, oneShot); err != nil { return nil, err } return ss, nil @@ -110,7 +109,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if n > 0 { total += n //nolint:contextcheck - decodeAndSend(ss.ctx, ss.lines, ss.address, n, b[:n], partial) + decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial) ss.mu.Lock() ss.lastReadTime = time.Now() ss.mu.Unlock() diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index c65e913af..0a2ab9317 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -34,9 +34,9 @@ var fileTruncates = expvar.NewMap("file_truncates_total") // a new goroutine and closes itself down. The shared context is used for // cancellation. type fileStream struct { - ctx context.Context cancel context.CancelFunc - lines chan<- *logline.LogLine + + lines chan<- *logline.LogLine pathname string // Given name for the underlying file on the filesystem @@ -47,11 +47,11 @@ type fileStream struct { // newFileStream creates a new log stream from a regular file. func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { - fs := &fileStream{pathname: pathname, lastReadTime: time.Now(), lines: lines} - fs.ctx, fs.cancel = context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) + fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: lines} // Stream from the start of the file when in one shot mode. streamFromStart := oneShot == OneShotEnabled - if err := fs.stream(fs.ctx, wg, waker, fi, oneShot, streamFromStart); err != nil { + if err := fs.stream(ctx, wg, waker, fi, oneShot, streamFromStart); err != nil { return nil, err } return fs, nil diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index d35e0bd3d..f50d1ef05 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -16,9 +16,9 @@ import ( ) type socketStream struct { - ctx context.Context cancel context.CancelFunc - lines chan<- *logline.LogLine + + lines chan<- *logline.LogLine oneShot OneShotMode scheme string // URL Scheme to listen with, either tcp or unix @@ -33,9 +33,9 @@ func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, if address == "" { return nil, ErrEmptySocketAddress } - ss := &socketStream{ctx: ctx, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines} - ss.ctx, ss.cancel = context.WithCancel(ctx) - if err := ss.stream(ss.ctx, wg, waker); err != nil { + ctx, cancel := context.WithCancel(ctx) + ss := &socketStream{cancel: cancel, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines} + if err := ss.stream(ctx, wg, waker); err != nil { return nil, err } return ss, nil @@ -142,7 +142,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake if n > 0 { total += n //nolint:contextcheck - decodeAndSend(ss.ctx, ss.lines, ss.address, n, b[:n], partial) + decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial) ss.mu.Lock() ss.lastReadTime = time.Now() ss.mu.Unlock() From 3d4fdba1122a7b7ed2226f8cb147f8300756c0de Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Tue, 11 Jun 2024 07:59:47 +0200 Subject: [PATCH 11/11] chore: Fix a lint warning --- internal/tailer/logstream/socketstream_unix_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/tailer/logstream/socketstream_unix_test.go b/internal/tailer/logstream/socketstream_unix_test.go index 9517d87b0..d6f5f3c24 100644 --- a/internal/tailer/logstream/socketstream_unix_test.go +++ b/internal/tailer/logstream/socketstream_unix_test.go @@ -70,7 +70,6 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { } cancel() // stop after connection closes - })) } }