Skip to content

Commit

Permalink
usm: watcher: Add periodic scan (#32400)
Browse files Browse the repository at this point in the history
Co-authored-by: Guy Arbitman <[email protected]>
  • Loading branch information
vitkyrka and guyarb authored Dec 24, 2024
1 parent dbadf91 commit ba35e2a
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 24 deletions.
80 changes: 56 additions & 24 deletions pkg/network/usm/sharedlibraries/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
var (
// The interval of the periodic scan for terminated processes. Increasing the interval, might cause larger spikes in cpu
// and lowering it might cause constant cpu usage.
// and lowering it might cause constant cpu usage. This is a var instead of a const only because the test code changes
// this value to speed up test execution.
scanTerminatedProcessesInterval = 30 * time.Second
)

Expand All @@ -55,6 +56,7 @@ type Rule struct {

// Watcher provides a way to tie callback functions to the lifecycle of shared libraries
type Watcher struct {
syncMutex sync.RWMutex
wg sync.WaitGroup
done chan struct{}
procRoot string
Expand All @@ -64,6 +66,7 @@ type Watcher struct {
ebpfProgram *EbpfProgram
libset Libset
thisPID int
scannedPIDs map[uint32]int

// telemetry
libHits *telemetry.Counter
Expand All @@ -90,6 +93,7 @@ func NewWatcher(cfg *config.Config, libset Libset, rules ...Rule) (*Watcher, err
processMonitor: monitor.GetProcessMonitor(),
ebpfProgram: ebpfProgram,
registry: utils.NewFileRegistry(consts.USMModuleName, "shared_libraries"),
scannedPIDs: make(map[uint32]int),

libHits: telemetry.NewCounter("usm.so_watcher.hits", telemetry.OptPrometheus),
libMatches: telemetry.NewCounter("usm.so_watcher.matches", telemetry.OptPrometheus),
Expand Down Expand Up @@ -274,11 +278,7 @@ func (w *Watcher) Start() {
case <-w.done:
return
case <-processSync.C:
processSet := w.registry.GetRegisteredProcesses()
deletedPids := findDeletedProcesses(processSet)
for deletedPid := range deletedPids {
_ = w.registry.Unregister(deletedPid)
}
w.sync()
}
}
}()
Expand All @@ -291,28 +291,60 @@ func (w *Watcher) Start() {
utils.AddAttacher(consts.USMModuleName, "native", w)
}

// findDeletedProcesses returns the terminated PIDs from the given map.
func findDeletedProcesses[V any](pids map[uint32]V) map[uint32]struct{} {
existingPids := make(map[uint32]struct{}, len(pids))
// sync unregisters from any terminated processes which we missed the exit
// callback for, and also attempts to register to running processes to ensure
// that we don't miss any process.
func (w *Watcher) sync() {
// The mutex is only used for protection with the test code which reads the
// scannedPIDs map.
w.syncMutex.Lock()
defer w.syncMutex.Unlock()

deletionCandidates := w.registry.GetRegisteredProcesses()
alivePIDs := make(map[uint32]struct{})

procIter := func(pid int) error {
if _, exists := pids[uint32(pid)]; exists {
existingPids[uint32(pid)] = struct{}{}
_ = kernel.WithAllProcs(kernel.ProcFSRoot(), func(origPid int) error {
if origPid == w.thisPID { // don't scan ourselves
return nil
}

pid := uint32(origPid)
alivePIDs[pid] = struct{}{}

if _, ok := deletionCandidates[pid]; ok {
// We have previously hooked into this process and it remains
// active, so we remove it from the deletionCandidates list, and
// move on to the next PID
delete(deletionCandidates, pid)
return nil
}

scanned := w.scannedPIDs[pid]

// Try to scan twice. This is because we may happen to scan the process
// just after it has been exec'd and before it has opened its shared
// libraries. Scanning twice with the sync interval reduce this risk of
// missing shared libraries due to this.
if scanned < 2 {
w.scannedPIDs[pid]++
err := w.AttachPID(pid)
if err == nil {
log.Debugf("watcher attached to %v via periodic scan", pid)
w.scannedPIDs[pid] = 2
}
}

return nil
}
// Scanning already running processes
if err := kernel.WithAllProcs(kernel.ProcFSRoot(), procIter); err != nil {
return nil
}
})

res := make(map[uint32]struct{}, len(pids)-len(existingPids))
for pid := range pids {
if _, exists := existingPids[pid]; exists {
continue
// Clean up dead processes from the list of scanned PIDs
for pid := range w.scannedPIDs {
if _, alive := alivePIDs[pid]; !alive {
delete(w.scannedPIDs, pid)
}
res[pid] = struct{}{}
}

return res
for pid := range deletionCandidates {
_ = w.registry.Unregister(pid)
}
}
94 changes: 94 additions & 0 deletions pkg/network/usm/sharedlibraries/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,100 @@ func (s *SharedLibrarySuite) TestLongPath() {
}, time.Second*10, 100*time.Millisecond)
}

// Tests that the periodic scan is able to detect processes which are missed by
// the eBPF-based watcher.
func (s *SharedLibrarySuite) TestSharedLibraryDetectionPeriodic() {
t := s.T()

// Construct a large path to exceed the limits of the eBPF-based watcher
// (LIB_PATH_MAX_SIZE). 255 is the max filename size of ext4. The path
// size will also include the directories leading up to this filename so the
// total size will be more.
var b strings.Builder
final := "foo-libssl.so"
for i := 0; i < 255-len(final); i++ {
b.WriteByte('x')
}
b.WriteString(final)
filename := b.String()

// Reduce interval to speed up test
orig := scanTerminatedProcessesInterval
t.Cleanup(func() { scanTerminatedProcessesInterval = orig })
scanTerminatedProcessesInterval = 10 * time.Millisecond

fooPath1, fooPathID1 := createTempTestFile(t, filename)
errPath, errorPathID := createTempTestFile(t, strings.Replace(filename, "xfoo", "yfoo", 1))

registerRecorder := new(utils.CallbackRecorder)
unregisterRecorder := new(utils.CallbackRecorder)

registerCallback := registerRecorder.Callback()

watcher, err := NewWatcher(utils.NewUSMEmptyConfig(), LibsetCrypto,
Rule{
Re: regexp.MustCompile(`foo-libssl.so`),
RegisterCB: func(fp utils.FilePath) error {
registerCallback(fp)
if fp.ID == errorPathID {
return utils.ErrEnvironment
}
return nil
},
UnregisterCB: unregisterRecorder.Callback(),
},
)
require.NoError(t, err)
watcher.Start()
t.Cleanup(watcher.Stop)

// create files
command1, err := fileopener.OpenFromAnotherProcess(t, fooPath1)
pid := command1.Process.Pid
require.NoError(t, err)

command2, err := fileopener.OpenFromAnotherProcess(t, errPath)
pid2 := command2.Process.Pid
require.NoError(t, err)

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, registerRecorder.CallsForPathID(fooPathID1), 1)

// Check that we tried to attach to the process twice. See w.sync() for
// why we do it. We don't actually need to attempt the registration
// twice, we just need to ensure that the maps were scanned twice but we
// don't have a hook for that so this check should be good enough.
assert.Equal(c, registerRecorder.CallsForPathID(errorPathID), 2)
}, time.Second*10, 100*time.Millisecond, "")

require.EventuallyWithT(t, func(c *assert.CollectT) {
watcher.syncMutex.Lock()
defer watcher.syncMutex.Unlock()

assert.Contains(c, watcher.scannedPIDs, uint32(pid))
assert.Contains(c, watcher.scannedPIDs, uint32(pid2))
}, time.Second*10, 100*time.Millisecond)

require.NoError(t, command1.Process.Kill())
require.NoError(t, command2.Process.Kill())

command1.Process.Wait()
command2.Process.Wait()

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, unregisterRecorder.CallsForPathID(fooPathID1), 1)
}, time.Second*10, 100*time.Millisecond)

// Check that clean up of dead processes works.
require.EventuallyWithT(t, func(c *assert.CollectT) {
watcher.syncMutex.Lock()
defer watcher.syncMutex.Unlock()

assert.NotContains(c, watcher.scannedPIDs, uint32(pid))
assert.NotContains(c, watcher.scannedPIDs, uint32(pid2))
}, time.Second*10, 100*time.Millisecond)
}

func (s *SharedLibrarySuite) TestSharedLibraryDetectionWithPIDAndRootNamespace() {
t := s.T()
_, err := os.Stat("/usr/bin/busybox")
Expand Down

0 comments on commit ba35e2a

Please sign in to comment.