diff --git a/pkg/network/usm/sharedlibraries/watcher.go b/pkg/network/usm/sharedlibraries/watcher.go index f23673fc6c24e..f13548414a017 100644 --- a/pkg/network/usm/sharedlibraries/watcher.go +++ b/pkg/network/usm/sharedlibraries/watcher.go @@ -26,9 +26,10 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/log" ) -const ( +var ( // The interval of the periodic scan for terminated processes. Increasing the interval, might cause larger spikes in cpu - // and lowering it might cause constant cpu usage. + // and lowering it might cause constant cpu usage. This is a var instead of a const only because the test code changes + // this value to speed up test execution. scanTerminatedProcessesInterval = 30 * time.Second ) @@ -55,6 +56,7 @@ type Rule struct { // Watcher provides a way to tie callback functions to the lifecycle of shared libraries type Watcher struct { + syncMutex sync.RWMutex wg sync.WaitGroup done chan struct{} procRoot string @@ -64,6 +66,7 @@ type Watcher struct { ebpfProgram *EbpfProgram libset Libset thisPID int + scannedPIDs map[uint32]int // telemetry libHits *telemetry.Counter @@ -90,6 +93,7 @@ func NewWatcher(cfg *config.Config, libset Libset, rules ...Rule) (*Watcher, err processMonitor: monitor.GetProcessMonitor(), ebpfProgram: ebpfProgram, registry: utils.NewFileRegistry(consts.USMModuleName, "shared_libraries"), + scannedPIDs: make(map[uint32]int), libHits: telemetry.NewCounter("usm.so_watcher.hits", telemetry.OptPrometheus), libMatches: telemetry.NewCounter("usm.so_watcher.matches", telemetry.OptPrometheus), @@ -274,11 +278,7 @@ func (w *Watcher) Start() { case <-w.done: return case <-processSync.C: - processSet := w.registry.GetRegisteredProcesses() - deletedPids := findDeletedProcesses(processSet) - for deletedPid := range deletedPids { - _ = w.registry.Unregister(deletedPid) - } + w.sync() } } }() @@ -291,28 +291,60 @@ func (w *Watcher) Start() { utils.AddAttacher(consts.USMModuleName, "native", w) } -// findDeletedProcesses returns the terminated PIDs from the given map. -func findDeletedProcesses[V any](pids map[uint32]V) map[uint32]struct{} { - existingPids := make(map[uint32]struct{}, len(pids)) +// sync unregisters from any terminated processes which we missed the exit +// callback for, and also attempts to register to running processes to ensure +// that we don't miss any process. +func (w *Watcher) sync() { + // The mutex is only used for protection with the test code which reads the + // scannedPIDs map. + w.syncMutex.Lock() + defer w.syncMutex.Unlock() + + deletionCandidates := w.registry.GetRegisteredProcesses() + alivePIDs := make(map[uint32]struct{}) - procIter := func(pid int) error { - if _, exists := pids[uint32(pid)]; exists { - existingPids[uint32(pid)] = struct{}{} + _ = kernel.WithAllProcs(kernel.ProcFSRoot(), func(origPid int) error { + if origPid == w.thisPID { // don't scan ourselves + return nil } + + pid := uint32(origPid) + alivePIDs[pid] = struct{}{} + + if _, ok := deletionCandidates[pid]; ok { + // We have previously hooked into this process and it remains + // active, so we remove it from the deletionCandidates list, and + // move on to the next PID + delete(deletionCandidates, pid) + return nil + } + + scanned := w.scannedPIDs[pid] + + // Try to scan twice. This is because we may happen to scan the process + // just after it has been exec'd and before it has opened its shared + // libraries. Scanning twice with the sync interval reduce this risk of + // missing shared libraries due to this. + if scanned < 2 { + w.scannedPIDs[pid]++ + err := w.AttachPID(pid) + if err == nil { + log.Debugf("watcher attached to %v via periodic scan", pid) + w.scannedPIDs[pid] = 2 + } + } + return nil - } - // Scanning already running processes - if err := kernel.WithAllProcs(kernel.ProcFSRoot(), procIter); err != nil { - return nil - } + }) - res := make(map[uint32]struct{}, len(pids)-len(existingPids)) - for pid := range pids { - if _, exists := existingPids[pid]; exists { - continue + // Clean up dead processes from the list of scanned PIDs + for pid := range w.scannedPIDs { + if _, alive := alivePIDs[pid]; !alive { + delete(w.scannedPIDs, pid) } - res[pid] = struct{}{} } - return res + for pid := range deletionCandidates { + _ = w.registry.Unregister(pid) + } } diff --git a/pkg/network/usm/sharedlibraries/watcher_test.go b/pkg/network/usm/sharedlibraries/watcher_test.go index 8c9864a91af8f..5a0c29e862c10 100644 --- a/pkg/network/usm/sharedlibraries/watcher_test.go +++ b/pkg/network/usm/sharedlibraries/watcher_test.go @@ -151,6 +151,100 @@ func (s *SharedLibrarySuite) TestLongPath() { }, time.Second*10, 100*time.Millisecond) } +// Tests that the periodic scan is able to detect processes which are missed by +// the eBPF-based watcher. +func (s *SharedLibrarySuite) TestSharedLibraryDetectionPeriodic() { + t := s.T() + + // Construct a large path to exceed the limits of the eBPF-based watcher + // (LIB_PATH_MAX_SIZE). 255 is the max filename size of ext4. The path + // size will also include the directories leading up to this filename so the + // total size will be more. + var b strings.Builder + final := "foo-libssl.so" + for i := 0; i < 255-len(final); i++ { + b.WriteByte('x') + } + b.WriteString(final) + filename := b.String() + + // Reduce interval to speed up test + orig := scanTerminatedProcessesInterval + t.Cleanup(func() { scanTerminatedProcessesInterval = orig }) + scanTerminatedProcessesInterval = 10 * time.Millisecond + + fooPath1, fooPathID1 := createTempTestFile(t, filename) + errPath, errorPathID := createTempTestFile(t, strings.Replace(filename, "xfoo", "yfoo", 1)) + + registerRecorder := new(utils.CallbackRecorder) + unregisterRecorder := new(utils.CallbackRecorder) + + registerCallback := registerRecorder.Callback() + + watcher, err := NewWatcher(utils.NewUSMEmptyConfig(), LibsetCrypto, + Rule{ + Re: regexp.MustCompile(`foo-libssl.so`), + RegisterCB: func(fp utils.FilePath) error { + registerCallback(fp) + if fp.ID == errorPathID { + return utils.ErrEnvironment + } + return nil + }, + UnregisterCB: unregisterRecorder.Callback(), + }, + ) + require.NoError(t, err) + watcher.Start() + t.Cleanup(watcher.Stop) + + // create files + command1, err := fileopener.OpenFromAnotherProcess(t, fooPath1) + pid := command1.Process.Pid + require.NoError(t, err) + + command2, err := fileopener.OpenFromAnotherProcess(t, errPath) + pid2 := command2.Process.Pid + require.NoError(t, err) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, registerRecorder.CallsForPathID(fooPathID1), 1) + + // Check that we tried to attach to the process twice. See w.sync() for + // why we do it. We don't actually need to attempt the registration + // twice, we just need to ensure that the maps were scanned twice but we + // don't have a hook for that so this check should be good enough. + assert.Equal(c, registerRecorder.CallsForPathID(errorPathID), 2) + }, time.Second*10, 100*time.Millisecond, "") + + require.EventuallyWithT(t, func(c *assert.CollectT) { + watcher.syncMutex.Lock() + defer watcher.syncMutex.Unlock() + + assert.Contains(c, watcher.scannedPIDs, uint32(pid)) + assert.Contains(c, watcher.scannedPIDs, uint32(pid2)) + }, time.Second*10, 100*time.Millisecond) + + require.NoError(t, command1.Process.Kill()) + require.NoError(t, command2.Process.Kill()) + + command1.Process.Wait() + command2.Process.Wait() + + require.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, unregisterRecorder.CallsForPathID(fooPathID1), 1) + }, time.Second*10, 100*time.Millisecond) + + // Check that clean up of dead processes works. + require.EventuallyWithT(t, func(c *assert.CollectT) { + watcher.syncMutex.Lock() + defer watcher.syncMutex.Unlock() + + assert.NotContains(c, watcher.scannedPIDs, uint32(pid)) + assert.NotContains(c, watcher.scannedPIDs, uint32(pid2)) + }, time.Second*10, 100*time.Millisecond) +} + func (s *SharedLibrarySuite) TestSharedLibraryDetectionWithPIDAndRootNamespace() { t := s.T() _, err := os.Stat("/usr/bin/busybox")