Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

usm: watcher: Add periodic scan #32400

Merged
merged 6 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 56 additions & 24 deletions pkg/network/usm/sharedlibraries/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
var (
vitkyrka marked this conversation as resolved.
Show resolved Hide resolved
// The interval of the periodic scan for terminated processes. Increasing the interval, might cause larger spikes in cpu
// and lowering it might cause constant cpu usage.
// and lowering it might cause constant cpu usage. This is a var instead of a const only because the test code changes
// this value to speed up test execution.
scanTerminatedProcessesInterval = 30 * time.Second
)

Expand All @@ -55,6 +56,7 @@ type Rule struct {

// Watcher provides a way to tie callback functions to the lifecycle of shared libraries
type Watcher struct {
syncMutex sync.RWMutex
wg sync.WaitGroup
done chan struct{}
procRoot string
Expand All @@ -64,6 +66,7 @@ type Watcher struct {
ebpfProgram *EbpfProgram
libset Libset
thisPID int
scannedPIDs map[uint32]int

// telemetry
libHits *telemetry.Counter
Expand All @@ -90,6 +93,7 @@ func NewWatcher(cfg *config.Config, libset Libset, rules ...Rule) (*Watcher, err
processMonitor: monitor.GetProcessMonitor(),
ebpfProgram: ebpfProgram,
registry: utils.NewFileRegistry(consts.USMModuleName, "shared_libraries"),
scannedPIDs: make(map[uint32]int),

libHits: telemetry.NewCounter("usm.so_watcher.hits", telemetry.OptPrometheus),
libMatches: telemetry.NewCounter("usm.so_watcher.matches", telemetry.OptPrometheus),
Expand Down Expand Up @@ -274,11 +278,7 @@ func (w *Watcher) Start() {
case <-w.done:
return
case <-processSync.C:
processSet := w.registry.GetRegisteredProcesses()
deletedPids := findDeletedProcesses(processSet)
for deletedPid := range deletedPids {
_ = w.registry.Unregister(deletedPid)
}
w.sync()
}
}
}()
Expand All @@ -291,28 +291,60 @@ func (w *Watcher) Start() {
utils.AddAttacher(consts.USMModuleName, "native", w)
}

// findDeletedProcesses returns the terminated PIDs from the given map.
func findDeletedProcesses[V any](pids map[uint32]V) map[uint32]struct{} {
existingPids := make(map[uint32]struct{}, len(pids))
// sync unregisters from any terminated processes which we missed the exit
// callback for, and also attempts to register to running processes to ensure
// that we don't miss any process.
func (w *Watcher) sync() {
vitkyrka marked this conversation as resolved.
Show resolved Hide resolved
// The mutex is only used for protection with the test code which reads the
// scannedPIDs map.
w.syncMutex.Lock()
defer w.syncMutex.Unlock()

deletionCandidates := w.registry.GetRegisteredProcesses()
alivePIDs := make(map[uint32]struct{})

procIter := func(pid int) error {
if _, exists := pids[uint32(pid)]; exists {
existingPids[uint32(pid)] = struct{}{}
_ = kernel.WithAllProcs(kernel.ProcFSRoot(), func(origPid int) error {
if origPid == w.thisPID { // don't scan ourselves
return nil
}

pid := uint32(origPid)
alivePIDs[pid] = struct{}{}

if _, ok := deletionCandidates[pid]; ok {
// We have previously hooked into this process and it remains
// active, so we remove it from the deletionCandidates list, and
// move on to the next PID
delete(deletionCandidates, pid)
return nil
}

scanned := w.scannedPIDs[pid]

// Try to scan twice. This is because we may happen to scan the process
// just after it has been exec'd and before it has opened its shared
// libraries. Scanning twice with the sync interval reduce this risk of
// missing shared libraries due to this.
if scanned < 2 {
w.scannedPIDs[pid]++
err := w.AttachPID(pid)
if err == nil {
log.Debugf("watcher attached to %v via periodic scan", pid)
w.scannedPIDs[pid] = 2
}
}

return nil
}
// Scanning already running processes
if err := kernel.WithAllProcs(kernel.ProcFSRoot(), procIter); err != nil {
return nil
}
})

res := make(map[uint32]struct{}, len(pids)-len(existingPids))
for pid := range pids {
if _, exists := existingPids[pid]; exists {
continue
// Clean up dead processes from the list of scanned PIDs
for pid := range w.scannedPIDs {
if _, alive := alivePIDs[pid]; !alive {
delete(w.scannedPIDs, pid)
}
res[pid] = struct{}{}
}

return res
for pid := range deletionCandidates {
_ = w.registry.Unregister(pid)
}
}
94 changes: 94 additions & 0 deletions pkg/network/usm/sharedlibraries/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,100 @@ func (s *SharedLibrarySuite) TestLongPath() {
}, time.Second*10, 100*time.Millisecond)
}

// Tests that the periodic scan is able to detect processes which are missed by
// the eBPF-based watcher.
func (s *SharedLibrarySuite) TestSharedLibraryDetectionPeriodic() {
t := s.T()

// Construct a large path to exceed the limits of the eBPF-based watcher
// (LIB_PATH_MAX_SIZE). 255 is the max filename size of ext4. The path
// size will also include the directories leading up to this filename so the
// total size will be more.
var b strings.Builder
final := "foo-libssl.so"
for i := 0; i < 255-len(final); i++ {
b.WriteByte('x')
}
b.WriteString(final)
filename := b.String()

// Reduce interval to speed up test
orig := scanTerminatedProcessesInterval
t.Cleanup(func() { scanTerminatedProcessesInterval = orig })
scanTerminatedProcessesInterval = 10 * time.Millisecond

fooPath1, fooPathID1 := createTempTestFile(t, filename)
errPath, errorPathID := createTempTestFile(t, strings.Replace(filename, "xfoo", "yfoo", 1))

registerRecorder := new(utils.CallbackRecorder)
unregisterRecorder := new(utils.CallbackRecorder)

registerCallback := registerRecorder.Callback()

watcher, err := NewWatcher(utils.NewUSMEmptyConfig(), LibsetCrypto,
Rule{
Re: regexp.MustCompile(`foo-libssl.so`),
RegisterCB: func(fp utils.FilePath) error {
registerCallback(fp)
if fp.ID == errorPathID {
return utils.ErrEnvironment
}
return nil
},
UnregisterCB: unregisterRecorder.Callback(),
},
)
require.NoError(t, err)
watcher.Start()
t.Cleanup(watcher.Stop)

// create files
command1, err := fileopener.OpenFromAnotherProcess(t, fooPath1)
pid := command1.Process.Pid
require.NoError(t, err)

command2, err := fileopener.OpenFromAnotherProcess(t, errPath)
pid2 := command2.Process.Pid
require.NoError(t, err)

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, registerRecorder.CallsForPathID(fooPathID1), 1)

// Check that we tried to attach to the process twice. See w.sync() for
// why we do it. We don't actually need to attempt the registration
// twice, we just need to ensure that the maps were scanned twice but we
// don't have a hook for that so this check should be good enough.
assert.Equal(c, registerRecorder.CallsForPathID(errorPathID), 2)
}, time.Second*10, 100*time.Millisecond, "")

require.EventuallyWithT(t, func(c *assert.CollectT) {
watcher.syncMutex.Lock()
defer watcher.syncMutex.Unlock()

assert.Contains(c, watcher.scannedPIDs, uint32(pid))
assert.Contains(c, watcher.scannedPIDs, uint32(pid2))
}, time.Second*10, 100*time.Millisecond)

require.NoError(t, command1.Process.Kill())
require.NoError(t, command2.Process.Kill())

command1.Process.Wait()
command2.Process.Wait()

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, unregisterRecorder.CallsForPathID(fooPathID1), 1)
}, time.Second*10, 100*time.Millisecond)

// Check that clean up of dead processes works.
require.EventuallyWithT(t, func(c *assert.CollectT) {
watcher.syncMutex.Lock()
defer watcher.syncMutex.Unlock()

assert.NotContains(c, watcher.scannedPIDs, uint32(pid))
assert.NotContains(c, watcher.scannedPIDs, uint32(pid2))
}, time.Second*10, 100*time.Millisecond)
}

func (s *SharedLibrarySuite) TestSharedLibraryDetectionWithPIDAndRootNamespace() {
t := s.T()
_, err := os.Stat("/usr/bin/busybox")
Expand Down
Loading