From ff026d3eea85ad65c157b1f5936353696087278b Mon Sep 17 00:00:00 2001 From: Dale Hamel Date: Wed, 21 Jul 2021 12:49:26 -0400 Subject: [PATCH] feat: Support generic tracers, bcc, target namespace This adds support for generic tracers, fixing #125 The new "--tracer" flag allows specifying arbitrary generic tracers. The bcc suite of tools is added to take advantage of this, as well as a "fake" tracer for integration testing this functionality, fixing #112 The primary original author of this functionality was Zeeshan Qureshi. In order to pull this change in, it was necessary to also pull in functionality that refactored how trace job targets are specified. This introduces a new flag which allows specifying the target namespace separately from the namespace where the trace is created, which fixes #147. This functionality was co-authored with Aaron Olson. Co-authored-by: Aaron Olson <934893+honkfestival@users.noreply.github.com> Co-authored-by: Zeeshan Qureshi --- build/Dockerfile.tracerunner | 11 ++ build/test/fake/success | 3 + go.mod | 1 + go.sum | 6 + integration/cmd_run_test.go | 30 ++++ pkg/cmd/get.go | 2 +- pkg/cmd/run.go | 197 ++++++++++--------------- pkg/cmd/tracerunner.go | 189 +++++++++++++++--------- pkg/errors/errors.go | 45 ++++++ pkg/procfs/procfs.go | 184 +++++++++++++++++++++++ pkg/procfs/procfs_test.go | 121 +++++++++++++++ pkg/tracejob/job.go | 107 +++++++++----- pkg/tracejob/selected_target.go | 253 ++++++++++++++++++++++++++++++++ 13 files changed, 921 insertions(+), 228 deletions(-) create mode 100755 build/test/fake/success create mode 100644 pkg/errors/errors.go create mode 100644 pkg/procfs/procfs.go create mode 100644 pkg/procfs/procfs_test.go create mode 100644 pkg/tracejob/selected_target.go diff --git a/build/Dockerfile.tracerunner b/build/Dockerfile.tracerunner index 69b53227..8d15596d 100644 --- a/build/Dockerfile.tracerunner +++ b/build/Dockerfile.tracerunner @@ -1,6 +1,8 @@ # syntax = docker/dockerfile:1.2 ARG bpftraceversion=v0.13.0 
+ARG bccversion=v0.21.0-focal-release FROM quay.io/iovisor/bpftrace:$bpftraceversion as bpftrace +FROM quay.io/iovisor/bcc:$bccversion as bcc FROM golang:1.15-buster as gobuilder ARG GIT_ORG=iovisor @@ -26,12 +28,21 @@ RUN --mount=type=cache,target=/root/.cache/go-build make _output/bin/trace-runne FROM ubuntu:20.04 +# Install bcc by copying apt packages from docker image +COPY --from=bcc /root/bcc /tmp/bcc +RUN apt-get update && \ + DEBIAN_FRONTEND=noninteractive apt-get install -y python python3 binutils libelf1 kmod && apt-get clean && \ + dpkg -i /tmp/bcc/*.deb && rm -rf /tmp/bcc + # Install CA certificates RUN apt-get update && apt-get install -y ca-certificates && update-ca-certificates && apt-get clean COPY --from=bpftrace /usr/bin/bpftrace /usr/bin/bpftrace COPY --from=gobuilder /go/src/github.com/iovisor/kubectl-trace/_output/bin/trace-runner /bin/trace-runner +# Inject some fake tracer 'programs' for integration testing. +COPY /build/test/fake/success /usr/share/fake/success + COPY /build/hooks/prestop /bin/hooks/prestop ENTRYPOINT ["/bin/trace-runner"] diff --git a/build/test/fake/success b/build/test/fake/success new file mode 100755 index 00000000..8c3cbfc3 --- /dev/null +++ b/build/test/fake/success @@ -0,0 +1,3 @@ +#!/bin/bash + +exit 0 diff --git a/go.mod b/go.mod index 15bc900c..203910e9 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/evanphx/json-patch v4.9.0+incompatible github.com/fntlnz/mountinfo v0.0.0-20171106231217-40cb42681fad github.com/kr/pretty v0.2.1 // indirect + github.com/spf13/afero v1.3.1 github.com/spf13/cobra v1.1.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 4813ab3d..e5e0d2ea 100644 --- a/go.sum +++ b/go.sum @@ -240,6 +240,7 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= 
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -306,6 +307,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= @@ -343,7 +345,10 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= +github.com/spf13/afero v1.3.1 h1:GPTpEAuNr98px18yNQ66JllNil98wfRZ/5Ukny8FeQA= +github.com/spf13/afero v1.3.1/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4= 
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE= github.com/spf13/cobra v1.1.1 h1:KfztREH0tPxJJ+geloSLaAkaPkr4ki2Er5quFV1TDo4= @@ -384,6 +389,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= diff --git a/integration/cmd_run_test.go b/integration/cmd_run_test.go index ee4bf70d..3ad3504e 100644 --- a/integration/cmd_run_test.go +++ b/integration/cmd_run_test.go @@ -44,3 +44,33 @@ func (k *KubectlTraceSuite) TestReturnErrOnErr() { assert.Equal(k.T(), int32(0), job.Status.Succeeded, "No jobs in the batch should have succeeded") assert.Greater(k.T(), job.Status.Failed, int32(1), "There should be at least one failed job") } + +func (k *KubectlTraceSuite) TestGenericTracer() { + nodeName := k.GetTestNode() + + out := k.KubectlTraceCmd( + "run", + "node/"+nodeName, + "--tracer=fake", + "--program=success", + "--deadline=5", + "--imagename="+k.RunnerImage()) + assert.Regexp(k.T(), regexp.MustCompile("trace [a-f0-9-]{36} created"), out) + + var job batchv1.Job + + for { + jobs := k.GetJobs().Items + assert.Equal(k.T(), 1, len(jobs)) + + job = jobs[0] + if 
len(job.Status.Conditions) > 0 { + break // on the first condition + } + + time.Sleep(1 * time.Second) + } + + assert.Equal(k.T(), 1, len(job.Status.Conditions)) + assert.Equal(k.T(), "Complete", string(job.Status.Conditions[0].Type)) +} diff --git a/pkg/cmd/get.go b/pkg/cmd/get.go index 55200658..1c436c47 100644 --- a/pkg/cmd/get.go +++ b/pkg/cmd/get.go @@ -193,7 +193,7 @@ func jobsTablePrint(o io.Writer, jobs []tracejob.TraceJob) { if status == "" { status = tracejob.TraceJobUnknown } - fmt.Fprintf(w, "\n"+format, j.Namespace, j.Hostname, j.Name, status, translateTimestampSince(j.StartTime)) + fmt.Fprintf(w, "\n"+format, j.Namespace, j.Target.Node, j.Name, status, translateTimestampSince(j.StartTime)) } fmt.Fprintf(w, "\n") } diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 8dd4ad93..0d632893 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -10,19 +10,16 @@ import ( "github.com/iovisor/kubectl-trace/pkg/signals" "github.com/iovisor/kubectl-trace/pkg/tracejob" "github.com/spf13/cobra" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/cli-runtime/pkg/genericclioptions" - "k8s.io/client-go/kubernetes/scheme" - batchv1client "k8s.io/client-go/kubernetes/typed/batch/v1" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" cmdutil "k8s.io/kubectl/pkg/cmd/util" ) var ( // ImageName represents the default tracerunner image - ImageName = "quay.io/iovisor/kubectl-trace-bpftrace" + ImageName = "quay.io/iovisor/kubectl-trace-runner" // ImageTag represents the tag to fetch for ImageName ImageTag = "latest" // InitImageName represents the default init container image @@ -67,6 +64,7 @@ var ( bpftraceEmptyErrString = "the bpftrace programm cannot be empty" bpftracePatchWithoutTypeErrString = "to use --patch you must also specify the --patch-type argument" bpftracePatchTypeWithoutPatchErrString = "to use --patch-type you must specify the --patch argument" + tracerNotFound = "unknown tracer %s" 
) // RunOptions ... @@ -77,9 +75,18 @@ type RunOptions struct { explicitNamespace bool // Flags local to this command - container string - eval string - program string + eval string + filename string + + // Flags for generic interface + // See TraceRunnerOptions for definitions. + // TODO: clean this up + tracer string + targetNamespace string + program string + programArgs []string + tracerDefined bool + serviceAccount string imageName string initImageName string @@ -88,13 +95,11 @@ type RunOptions struct { deadlineGracePeriod int64 resourceArg string - attach bool - isPod bool - podUID string - nodeName string + container string patch string patchType string + attach bool clientConfig *rest.Config } @@ -137,10 +142,19 @@ func NewRunCommand(factory cmdutil.Factory, streams genericclioptions.IOStreams) }, } + // flags for existing usage cmd.Flags().StringVarP(&o.container, "container", "c", o.container, "Specify the container") - cmd.Flags().BoolVarP(&o.attach, "attach", "a", o.attach, "Whether or not to attach to the trace program once it is created") cmd.Flags().StringVarP(&o.eval, "eval", "e", o.eval, "Literal string to be evaluated as a bpftrace program") - cmd.Flags().StringVarP(&o.program, "filename", "f", o.program, "File containing a bpftrace program") + cmd.Flags().StringVarP(&o.filename, "filename", "f", o.filename, "File containing a bpftrace program") + + // flags for new generic interface + cmd.Flags().StringVar(&o.tracer, "tracer", "bpftrace", "Tracing system to use") + cmd.Flags().StringVar(&o.targetNamespace, "target-namespace", "", "Namespace in which the target pod exists (if applicable). 
Defaults to the namespace argument passed to kubectl.") + cmd.Flags().StringVar(&o.program, "program", o.program, "Program to execute") + cmd.Flags().StringArrayVar(&o.programArgs, "args", o.programArgs, "Additional arguments to pass on to program, repeat flag for multiple arguments") + + // global flags + cmd.Flags().BoolVarP(&o.attach, "attach", "a", o.attach, "Whether or not to attach to the trace program once it is created") cmd.Flags().StringVar(&o.serviceAccount, "serviceaccount", o.serviceAccount, "Service account to use to set in the pod spec of the kubectl-trace job") cmd.Flags().StringVar(&o.imageName, "imagename", o.imageName, "Custom image for the tracerunner") cmd.Flags().StringVar(&o.initImageName, "init-imagename", o.initImageName, "Custom image for the init container responsible to fetch and prepare linux headers") @@ -155,7 +169,17 @@ func NewRunCommand(factory cmdutil.Factory, streams genericclioptions.IOStreams) // Validate validates the arguments and flags populating RunOptions accordingly. func (o *RunOptions) Validate(cmd *cobra.Command, args []string) error { + // Selector can only be used in conjunction with tracer. 
+ o.tracerDefined = cmd.Flag("tracer").Changed + + switch o.tracer { + case bpftrace, bcc, fake: + default: + return fmt.Errorf(tracerNotFound, o.tracer) + } + containerFlagDefined := cmd.Flag("container").Changed + switch len(args) { case 1: o.resourceArg = args[0] @@ -163,7 +187,7 @@ func (o *RunOptions) Validate(cmd *cobra.Command, args []string) error { // 2nd argument interpreted as container when provided case 2: o.resourceArg = args[0] - o.container = args[1] + o.container = args[1] // NOTE: this should actually be -c, to be consistent with the rest of kubectl if containerFlagDefined { return fmt.Errorf(containerAsArgOrFlagErrString) } @@ -172,16 +196,6 @@ func (o *RunOptions) Validate(cmd *cobra.Command, args []string) error { return fmt.Errorf(requiredArgErrString) } - if !cmd.Flag("eval").Changed && !cmd.Flag("filename").Changed { - return fmt.Errorf(bpftraceMissingErrString) - } - if cmd.Flag("eval").Changed == cmd.Flag("filename").Changed { - return fmt.Errorf(bpftraceDoubleErrString) - } - if (cmd.Flag("eval").Changed && len(o.eval) == 0) || (cmd.Flag("filename").Changed && len(o.program) == 0) { - return fmt.Errorf(bpftraceEmptyErrString) - } - havePatch := cmd.Flag("patch").Changed havePatchType := cmd.Flag("patch-type").Changed @@ -193,106 +207,51 @@ func (o *RunOptions) Validate(cmd *cobra.Command, args []string) error { return fmt.Errorf(bpftracePatchTypeWithoutPatchErrString) } + switch o.tracer { + case bpftrace, bcc: + evalDefined, filenameDefined, programDefined := cmd.Flag("eval").Changed, cmd.Flag("filename").Changed, cmd.Flag("program").Changed + if !evalDefined && !filenameDefined && !programDefined { + return fmt.Errorf(bpftraceMissingErrString) + } + if (evalDefined && filenameDefined) || (evalDefined && programDefined) || (filenameDefined && programDefined) { + return fmt.Errorf(bpftraceDoubleErrString) + } + if (evalDefined && len(o.eval) == 0) || (filenameDefined && len(o.program) == 0) || (programDefined && len(o.program) == 0) { + 
return fmt.Errorf(bpftraceEmptyErrString) + } + default: + } + return nil } // Complete completes the setup of the command. func (o *RunOptions) Complete(factory cmdutil.Factory, cmd *cobra.Command, args []string) error { // Prepare program - if len(o.program) > 0 { - b, err := ioutil.ReadFile(o.program) - if err != nil { - return fmt.Errorf("error opening program file") + if len(o.program) == 0 { + if len(o.filename) > 0 { + b, err := ioutil.ReadFile(o.filename) + if err != nil { + return fmt.Errorf("error opening program file") + } + o.program = string(b) + } else { + o.program = o.eval } - o.program = string(b) - } else { - o.program = o.eval } - // Prepare namespace + // Prepare namespaces var err error o.namespace, o.explicitNamespace, err = factory.ToRawKubeConfigLoader().Namespace() if err != nil { return err } - // Look for the target object - x := factory. - NewBuilder(). - WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...). - NamespaceParam(o.namespace). - SingleResourceType(). - ResourceNames("nodes", o.resourceArg). // Search nodes by default - Do() - - obj, err := x.Object() - if err != nil { - return err - } - - // Check we got a pod or a node - o.isPod = false - - var node *v1.Node - - switch v := obj.(type) { - case *v1.Pod: - if len(v.Spec.NodeName) == 0 { - return fmt.Errorf("cannot attach a trace program to a pod that is not currently scheduled on a node") - } - o.isPod = true - found := false - o.podUID = string(v.UID) - for _, c := range v.Spec.Containers { - // default if no container provided - if len(o.container) == 0 { - o.container = c.Name - found = true - break - } - // check if the provided one exists - if c.Name == o.container { - found = true - break - } - } - - if !found { - return fmt.Errorf("no containers found for the provided pod/container combination") - } - - obj, err = factory. - NewBuilder(). - WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...). 
- ResourceNames("nodes", v.Spec.NodeName). - Do().Object() - - if err != nil { - return err - } - - if n, ok := obj.(*v1.Node); ok { - node = n - } - - break - case *v1.Node: - node = v - break - default: - return fmt.Errorf("first argument must be %s", usageString) - } - - if node == nil { - return fmt.Errorf("could not determine on which node to run the trace program") - } - - labels := node.GetLabels() - val, ok := labels["kubernetes.io/hostname"] - if !ok { - return fmt.Errorf("label kubernetes.io/hostname not found in node") + // If a target namespace is not specified explicitly, we want to reuse the + // namespace as understood by kubectl. + if o.targetNamespace == "" { + o.targetNamespace = o.namespace } - o.nodeName = val // Prepare client o.clientConfig, err = factory.ToRESTConfig() @@ -306,31 +265,29 @@ func (o *RunOptions) Complete(factory cmdutil.Factory, cmd *cobra.Command, args // Run executes the run command. func (o *RunOptions) Run() error { juid := uuid.NewUUID() - jobsClient, err := batchv1client.NewForConfig(o.clientConfig) + + clientset, err := kubernetes.NewForConfig(o.clientConfig) if err != nil { return err } - coreClient, err := corev1client.NewForConfig(o.clientConfig) + target, err := tracejob.ResolveTraceJobTarget(clientset, o.resourceArg, o.container, o.targetNamespace) + if err != nil { return err } - tc := &tracejob.TraceJobClient{ - JobClient: jobsClient.Jobs(o.namespace), - ConfigClient: coreClient.ConfigMaps(o.namespace), - } + tc := tracejob.NewTraceJobClient(clientset, o.namespace) tj := tracejob.TraceJob{ Name: fmt.Sprintf("%s%s", meta.ObjectNamePrefix, string(juid)), Namespace: o.namespace, ServiceAccount: o.serviceAccount, ID: juid, - Hostname: o.nodeName, + Target: *target, + Tracer: o.tracer, Program: o.program, - PodUID: o.podUID, - ContainerName: o.container, - IsPod: o.isPod, + ProgramArgs: o.programArgs, ImageNameTag: o.imageName, InitImageNameTag: o.initImageName, FetchHeaders: o.fetchHeaders, @@ -350,7 +307,7 @@ func 
(o *RunOptions) Run() error { if o.attach { ctx := context.Background() ctx = signals.WithStandardSignals(ctx) - a := attacher.NewAttacher(coreClient, o.clientConfig, o.IOStreams) + a := attacher.NewAttacher(clientset.CoreV1(), o.clientConfig, o.IOStreams) a.WithContext(ctx) a.AttachJob(tj.ID, job.Namespace) } diff --git a/pkg/cmd/tracerunner.go b/pkg/cmd/tracerunner.go index d25f4f07..0bd64d2d 100644 --- a/pkg/cmd/tracerunner.go +++ b/pkg/cmd/tracerunner.go @@ -3,7 +3,6 @@ package cmd import ( "context" "fmt" - "io" "io/ioutil" "os" "os/exec" @@ -12,16 +11,41 @@ import ( "strings" "syscall" - "github.com/fntlnz/mountinfo" + "github.com/iovisor/kubectl-trace/pkg/procfs" "github.com/spf13/cobra" ) +const ( + // MetadataDir is where trace-runner will output traces and metadata + MetadataDir = "/tmp/kubectl-trace" + + bpftrace = "bpftrace" + bcc = "bcc" + fake = "fake" +) + +var ( + bpfTraceBinaryPath = "/usr/bin/bpftrace" + bccToolsDir = "/usr/share/bcc/tools/" + fakeToolsDir = "/usr/share/fake/" +) + type TraceRunnerOptions struct { - podUID string - containerName string - inPod bool - programPath string - bpftraceBinaryPath string + // The tracing system to use. + // tracer = bpftrace | bcc | fake + tracer string + + podUID string + + containerID string + + // In the case of bcc the name of the bcc program to execute. + // In the case of bpftrace the path to contents of the user provided expression or program. + program string + + // In the case of bcc the user provided arguments to pass on to program. + // Not used for bpftrace. 
+ programArgs []string } func NewTraceRunnerOptions() *TraceRunnerOptions { @@ -46,19 +70,21 @@ func NewTraceRunnerCommand() *cobra.Command { }, } - cmd.Flags().StringVarP(&o.containerName, "container", "c", o.containerName, "Specify the container") - cmd.Flags().StringVarP(&o.podUID, "poduid", "p", o.podUID, "Specify the pod UID") - cmd.Flags().StringVarP(&o.programPath, "program", "f", "program.bt", "Specify the bpftrace program path") - cmd.Flags().StringVarP(&o.bpftraceBinaryPath, "bpftracebinary", "b", "/usr/bin/bpftrace", "Specify the bpftrace binary path") - cmd.Flags().BoolVar(&o.inPod, "inpod", false, "Whether or not run this bpftrace in a pod's container process namespace") + cmd.Flags().StringVar(&o.tracer, "tracer", "bpftrace", "Tracing system to use") + cmd.Flags().StringVar(&o.podUID, "pod-uid", "", "UID of target pod") + cmd.Flags().StringVar(&o.containerID, "container-id", "", "ID of target container") + cmd.Flags().StringVar(&o.program, "program", "/programs/program.bt", "Tracer input script or executable") + cmd.Flags().StringArrayVar(&o.programArgs, "args", o.programArgs, "Arguments to pass through to executable in --program") return cmd } func (o *TraceRunnerOptions) Validate(cmd *cobra.Command, args []string) error { - // TODO(fntlnz): do some more meaningful validation here, for now just checking if they are there - if o.inPod == true && (len(o.containerName) == 0 || len(o.podUID) == 0) { - return fmt.Errorf("poduid and container must be specified when inpod=true") + switch o.tracer { + case bpftrace, bcc, fake: + default: + return fmt.Errorf("unknown tracer %s", o.tracer) } + return nil } @@ -68,29 +94,24 @@ func (o *TraceRunnerOptions) Complete(cmd *cobra.Command, args []string) error { } func (o *TraceRunnerOptions) Run() error { - programPath := o.programPath - if o.inPod == true { - pid, err := findPidByPodContainer(o.podUID, o.containerName) - if err != nil { - return err - } - if pid == nil { - return fmt.Errorf("pid not found") - } - 
if len(*pid) == 0 { - return fmt.Errorf("invalid pid found") - } - f, err := ioutil.ReadFile(programPath) - if err != nil { - return err - } - programPath = path.Join(os.TempDir(), "program-container.bt") - r := strings.Replace(string(f), "$container_pid", *pid, -1) - if err := ioutil.WriteFile(programPath, []byte(r), 0755); err != nil { - return err - } + var err error + var binary *string + var args []string + + switch o.tracer { + case bpftrace: + binary, args, err = o.prepBpfTraceCommand() + case bcc: + binary, args, err = o.prepBccCommand() + case fake: + binary, args, err = o.prepFakeCommand() } + if err != nil { + return err + } + + // Assume output is stdout until other backends are implemented. fmt.Println("if your program has maps to print, send a SIGINT using Ctrl-C, if you want to interrupt the execution send SIGINT two times") ctx, cancel := context.WithCancel(context.Background()) sigCh := make(chan os.Signal, 1) @@ -116,53 +137,85 @@ func (o *TraceRunnerOptions) Run() error { } }() - c := exec.CommandContext(ctx, o.bpftraceBinaryPath, programPath) - c.Stdout = os.Stdout + var c *exec.Cmd + if len(args) == 0 { + c = exec.CommandContext(ctx, *binary) + } else { + c = exec.CommandContext(ctx, *binary, args...) + } + + return runTraceCommand(c) +} + +func runTraceCommand(c *exec.Cmd) error { c.Stdin = os.Stdin + c.Stdout = os.Stdout c.Stderr = os.Stderr return c.Run() } -func findPidByPodContainer(podUID, containerName string) (*string, error) { - d, err := os.Open("/proc") +func (o *TraceRunnerOptions) prepBpfTraceCommand() (*string, []string, error) { + programPath := o.program - if err != nil { - return nil, err + // Render $container_pid to actual process pid if scoped to container. 
+ if o.podUID != "" && o.containerID != "" { + pid, err := procfs.FindPidByPodContainer(o.podUID, o.containerID) + if err != nil { + return nil, nil, err + } + f, err := ioutil.ReadFile(programPath) + if err != nil { + return nil, nil, err + } + programPath = path.Join(os.TempDir(), "program-container.bt") + r := strings.Replace(string(f), "$container_pid", pid, -1) + if err := ioutil.WriteFile(programPath, []byte(r), 0755); err != nil { + return nil, nil, err + } } - defer d.Close() + return &bpfTraceBinaryPath, []string{programPath}, nil +} - for { - dirs, err := d.Readdir(10) - if err == io.EOF { - break - } +func (o *TraceRunnerOptions) prepBccCommand() (*string, []string, error) { + // Sanitize o.program by removing common prefix/suffixes. + name := o.program + name = strings.TrimPrefix(name, "/usr/bin/") + name = strings.TrimPrefix(name, "/usr/sbin/") + name = strings.TrimSuffix(name, "-bpfcc") + + program := bccToolsDir + name + args := append([]string{}, o.programArgs...) + + if o.podUID != "" && o.containerID != "" { + pid, err := procfs.FindPidByPodContainer(o.podUID, o.containerID) if err != nil { - return nil, err + return nil, nil, err } - for _, di := range dirs { - if !di.IsDir() { - continue - } - dname := di.Name() - if dname[0] < '0' || dname[0] > '9' { - continue - } + for i, arg := range args { + args[i] = strings.Replace(arg, "$container_pid", pid, -1) + } + } - mi, err := mountinfo.GetMountInfo(path.Join("/proc", dname, "mountinfo")) - if err != nil { - continue - } + return &program, args, nil +} - for _, m := range mi { - root := m.Root - if strings.Contains(root, podUID) && strings.Contains(root, containerName) { - return &dname, nil - } - } +func (o *TraceRunnerOptions) prepFakeCommand() (*string, []string, error) { + name := path.Base(o.program) + program := fakeToolsDir + name + args := append([]string{}, o.programArgs...) 
+ + if o.podUID != "" && o.containerID != "" { + pid, err := procfs.FindPidByPodContainer(o.podUID, o.containerID) + if err != nil { + return nil, nil, err + } + + for i, arg := range args { + args[i] = strings.Replace(arg, "$container_pid", pid, -1) } } - return nil, fmt.Errorf("no process found for specified pod and container") + return &program, args, nil } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go new file mode 100644 index 00000000..c5b7eb55 --- /dev/null +++ b/pkg/errors/errors.go @@ -0,0 +1,45 @@ +package errors + +const ( + ErrorInvalid string = "Invalid" + ErrorUnallocatable string = "Unallocatable" +) + +type TargetSelectionError struct { + ErrorMessage string + ErrorType string +} + +func (e TargetSelectionError) Error() string { + return e.ErrorMessage +} + +func NewErrorInvalid(message string) error { + return TargetSelectionError{ + ErrorMessage: message, + ErrorType: ErrorInvalid, + } +} + +func NewErrorUnallocatable(message string) error { + return TargetSelectionError{ + ErrorMessage: message, + ErrorType: ErrorUnallocatable, + } +} + +func IsInvalidTargetError(err error) bool { + return reasonForError(err) == ErrorInvalid +} + +func IsUnallocatableTargetError(err error) bool { + return reasonForError(err) == ErrorUnallocatable +} + +func reasonForError(err error) string { + myErr, ok := err.(TargetSelectionError) + if ok { + return myErr.ErrorType + } + return "" +} diff --git a/pkg/procfs/procfs.go b/pkg/procfs/procfs.go new file mode 100644 index 00000000..d297a11d --- /dev/null +++ b/pkg/procfs/procfs.go @@ -0,0 +1,184 @@ +package procfs + +import ( + "bufio" + "fmt" + "io" + "os" + "path" + "strings" + + "github.com/fntlnz/mountinfo" + "github.com/spf13/afero" +) + +var ProcFs = afero.NewOsFs() + +func FindPidByPodContainer(podUID, containerID string) (string, error) { + d, err := ProcFs.Open("/proc") + + if err != nil { + return "", err + } + + defer d.Close() + + for { + dirs, err := d.Readdir(10) + if err == io.EOF { + break 
+ } + if err != nil { + return "", err + } + + for _, di := range dirs { + if !di.IsDir() { + continue + } + dname := di.Name() + if dname[0] < '0' || dname[0] > '9' { + continue + } + + mi, err := getMountInfo(path.Join("/proc", dname, "mountinfo")) + if err != nil { + continue + } + + for _, m := range mi { + root := m.Root + // See https://github.com/kubernetes/kubernetes/blob/2f3a4ec9cb96e8e2414834991d63c59988c3c866/pkg/kubelet/cm/cgroup_manager_linux.go#L81-L85 + // Note that these identifiers are currently specific to systemd, however, this mounting approach is what allows us to find the containerized + // process. + // + // EG: /kubepods/burstable/pod31dd0274-bb43-4975-bdbc-7e10047a23f8/851c75dad6ad8ce6a5d9b9129a4eb1645f7c6e5ba8406b12d50377b665737072 + // /kubepods/burstable/pod{POD_ID}/{CONTAINER_ID} + // + // This "needle" that we look for in the mountinfo haystack should match one and only one container. + needle := path.Join(podUID, containerID) + if strings.Contains(root, needle) { + return dname, nil + } + } + } + } + + return "", fmt.Errorf("no process found for specified pod and container") +} + +func FindPidsForContainer(pid string) ([]string, error) { + d, err := ProcFs.Open("/proc") + + if err != nil { + return nil, err + } + + defer d.Close() + + pids := []string{} + + ns, err := readlink(path.Join("/proc", pid, "ns", "pid")) + if err != nil { + return nil, err + } + + for { + dirs, err := d.Readdir(10) + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + + for _, di := range dirs { + if !di.IsDir() { + continue + } + dname := di.Name() + if dname[0] < '0' || dname[0] > '9' { + continue + } + + cns, err := readlink(path.Join("/proc", dname, "ns", "pid")) + if err != nil { + return nil, err + } + + if cns == ns { + pids = append(pids, dname) + } + } + } + + return pids, nil +} + +func GetFinalNamespacePid(pid string) (string, error) { + status, err := ProcFs.Open(path.Join("/proc", pid, "status")) + if err != nil { + return 
"", err + } + + scanner := bufio.NewScanner(status) + var line string + for scanner.Scan() { + line = scanner.Text() + if strings.HasPrefix(line, "NSpid") { + break + } + } + + if err := scanner.Err(); err != nil { + return "", err + } + + fields := strings.Fields(line) + return fields[len(fields)-1], nil +} + +func GetProcExe(pid string) (string, error) { + exe, err := readlink(path.Join("/proc", pid, "exe")) + if err != nil { + return "", err + } + + return exe, nil +} + +func GetProcComm(pid string) (string, error) { + comm, err := afero.ReadFile(ProcFs, path.Join("/proc", pid, "comm")) + if err != nil { + return "", err + } + + return string(comm), nil +} + +func GetProcCmdline(pid string) (string, error) { + cmdline, err := afero.ReadFile(ProcFs, path.Join("/proc", pid, "cmdline")) + if err != nil { + return "", err + } + + return string(cmdline), nil +} + +func getMountInfo(fd string) ([]mountinfo.Mountinfo, error) { + file, err := ProcFs.Open(fd) + if err != nil { + return nil, err + } + defer file.Close() + + return mountinfo.ParseMountInfo(file) +} + +func readlink(name string) (string, error) { + if r, ok := ProcFs.(afero.LinkReader); ok { + return r.ReadlinkIfPossible(name) + } + + return "", &os.PathError{Op: "readlink", Path: name, Err: afero.ErrNoReadlink} +} diff --git a/pkg/procfs/procfs_test.go b/pkg/procfs/procfs_test.go new file mode 100644 index 00000000..193c39a9 --- /dev/null +++ b/pkg/procfs/procfs_test.go @@ -0,0 +1,121 @@ +package procfs + +import ( + "io/ioutil" + "log" + "os" + "path" + "sort" + "testing" + + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" +) + +func TestFindPidsForContainerFindsTheContainer(t *testing.T) { + _ = setupBasePath(t) + + assert.Nil(t, ProcFs.MkdirAll("/proc", 0755)) + assert.Nil(t, ProcFs.MkdirAll("/proc/1/ns", 0755)) + assert.Nil(t, symlink(ProcFs, "/pid:[0000000001]", "/proc/1/ns/pid")) + + pids, err := FindPidsForContainer("1") + + assert.Nil(t, err) + assert.Equal(t, []string{"1"}, pids) 
+} + +func TestFindPidsForContainerFindsTheCorrectPids(t *testing.T) { + _ = setupBasePath(t) + + assert.Nil(t, ProcFs.MkdirAll("/proc/1/ns", 0755)) + assert.Nil(t, symlink(ProcFs, "/pid:[0000000001]", "/proc/1/ns/pid")) + + assert.Nil(t, ProcFs.MkdirAll("/proc/2/ns", 0755)) + assert.Nil(t, symlink(ProcFs, "/pid:[0000000001]", "/proc/2/ns/pid")) + + assert.Nil(t, ProcFs.MkdirAll("/proc/3/ns", 0755)) + assert.Nil(t, symlink(ProcFs, "/pid:[0000000001]", "/proc/3/ns/pid")) + + assert.Nil(t, ProcFs.MkdirAll("/proc/10/ns", 0755)) + assert.Nil(t, symlink(ProcFs, "/pid:[1010101010]", "/proc/10/ns/pid")) + + pids, err := FindPidsForContainer("1") + assert.Nil(t, err) + + sort.Strings(pids) + assert.Equal(t, []string{"1", "2", "3"}, pids) +} + +func TestGetFinalNamespacePid(t *testing.T) { + _ = setupBasePath(t) + + assert.Nil(t, ProcFs.MkdirAll("/proc/47/ns", 0755)) + data := []byte("Name: testprogram\n NStgid: 47 1\n NSpid: 47 1\n NSpgid: 47 1\n NSsid: 47 1\n") + assert.Nil(t, afero.WriteFile(ProcFs, "/proc/47/status", data, 0444)) + + pid, err := GetFinalNamespacePid("47") + assert.Nil(t, err) + + assert.Equal(t, "1", pid) +} + +func TestGetProcComm(t *testing.T) { + _ = setupBasePath(t) + + assert.Nil(t, ProcFs.MkdirAll("/proc/42", 0755)) + data := []byte("ruby") + assert.Nil(t, afero.WriteFile(ProcFs, "/proc/42/comm", data, 0444)) + + comm, err := GetProcComm("42") + assert.Nil(t, err) + + assert.Equal(t, "ruby", comm) +} + +func TestGetProcCmdline(t *testing.T) { + _ = setupBasePath(t) + + assert.Nil(t, ProcFs.MkdirAll("/proc/42", 0755)) + data := []byte("Rails uri_path=/foo/bar request_id=1234") + assert.Nil(t, afero.WriteFile(ProcFs, "/proc/42/cmdline", data, 0444)) + + cmdline, err := GetProcCmdline("42") + assert.Nil(t, err) + + assert.Equal(t, "Rails uri_path=/foo/bar request_id=1234", cmdline) +} + +func TestGetProcExe(t *testing.T) { + basePath := setupBasePath(t) + + assert.Nil(t, ProcFs.MkdirAll("/proc/42", 0755)) + assert.Nil(t, symlink(ProcFs, 
"/usr/local/bin/ruby", "/proc/42/exe")) + + exe, err := GetProcExe("42") + assert.Nil(t, err) + + expected := path.Join(basePath, "/usr/local/bin/ruby") + assert.Equal(t, expected, exe) +} + +func setupBasePath(t *testing.T) string { + tempDir, err := ioutil.TempDir("", "example") + if err != nil { + log.Fatal(err) + } + defer os.RemoveAll(tempDir) // clean up + + ProcFs = afero.NewBasePathFs(afero.NewOsFs(), tempDir) + assert.Nil(t, ProcFs.MkdirAll("/proc", 0755)) + + return tempDir +} + +func symlink(fs afero.Fs, oldname string, newname string) error { + if r, ok := fs.(afero.Linker); ok { + return r.SymlinkIfPossible(oldname, newname) + } + + return &os.LinkError{Op: "symlink", Old: oldname, New: newname, Err: afero.ErrNoSymlink} +} diff --git a/pkg/tracejob/job.go b/pkg/tracejob/job.go index 3710c4c0..937061a9 100644 --- a/pkg/tracejob/job.go +++ b/pkg/tracejob/job.go @@ -17,6 +17,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" yamlutil "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/kubernetes" batchv1typed "k8s.io/client-go/kubernetes/typed/batch/v1" corev1typed "k8s.io/client-go/kubernetes/typed/core/v1" "sigs.k8s.io/yaml" @@ -34,11 +35,12 @@ type TraceJob struct { ID types.UID Namespace string ServiceAccount string - Hostname string + Tracer string + Target TraceJobTarget Program string + ProgramArgs []string PodUID string ContainerName string - IsPod bool ImageNameTag string InitImageNameTag string FetchHeaders bool @@ -50,6 +52,13 @@ type TraceJob struct { PatchType string } +func NewTraceJobClient(clientset kubernetes.Interface, namespace string) *TraceJobClient { + return &TraceJobClient{ + JobClient: clientset.BatchV1().Jobs(namespace), + ConfigClient: clientset.CoreV1().ConfigMaps(namespace), + } +} + // WithOutStream setup a file stream to output trace job operation information func (t *TraceJobClient) WithOutStream(o io.Writer) { if o == nil { @@ -141,7 +150,9 @@ func (t *TraceJobClient) GetJob(nf 
TraceJobFilter) ([]TraceJob, error) { Name: name, ID: types.UID(id), Namespace: j.Namespace, - Hostname: hostname, + Target: TraceJobTarget{ + Node: hostname, + }, StartTime: j.Status.StartTime, Status: jobStatus(j), } @@ -193,42 +204,48 @@ func (t *TraceJobClient) DeleteJobs(nf TraceJobFilter) error { } func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) { + job, cm := nj.Job(), nj.ConfigMap() + + // Optionally patch the job before creating it + if nj.PatchType != "" && nj.Patch != "" { + newJob, err := patchJobFile(job, nj.PatchType, nj.Patch) + if err != nil { + return nil, err + } + job = newJob + } + + if _, err := t.ConfigClient.Create(context.Background(), cm, metav1.CreateOptions{}); err != nil { + return nil, err + } + return t.JobClient.Create(context.Background(), job, metav1.CreateOptions{}) +} - bpfTraceCmd := []string{ +func (nj *TraceJob) Job() *batchv1.Job { + traceCmd := []string{ "/bin/timeout", "--preserve-status", "--signal", "INT", strconv.FormatInt(nj.Deadline, 10), "/bin/trace-runner", - "--program=/programs/program.bt", + "--tracer=" + nj.Tracer, + "--pod-uid=" + nj.Target.PodUID, + "--container-id=" + nj.Target.ContainerID, } - if nj.IsPod { - bpfTraceCmd = append(bpfTraceCmd, "--inpod") - bpfTraceCmd = append(bpfTraceCmd, "--container="+nj.ContainerName) - bpfTraceCmd = append(bpfTraceCmd, "--poduid="+nj.PodUID) + if nj.Tracer == "bpftrace" { + traceCmd = append(traceCmd, "--program=/programs/program.bt") + } else { + traceCmd = append(traceCmd, "--program="+nj.Program) } - commonMeta := metav1.ObjectMeta{ - Name: nj.Name, - Namespace: nj.Namespace, - Labels: map[string]string{ - meta.TraceLabelKey: nj.Name, - meta.TraceIDLabelKey: string(nj.ID), - }, - Annotations: map[string]string{ - meta.TraceLabelKey: nj.Name, - meta.TraceIDLabelKey: string(nj.ID), - }, + for _, arg := range nj.ProgramArgs { + traceCmd = append(traceCmd, "--args="+arg) } - cm := &apiv1.ConfigMap{ - ObjectMeta: commonMeta, - Data: map[string]string{ - 
"program.bt": nj.Program, - }, - } + commonMeta := *nj.Meta() + cm := nj.ConfigMap() job := &batchv1.Job{ ObjectMeta: commonMeta, @@ -283,7 +300,7 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) { apiv1.Container{ Name: nj.Name, Image: nj.ImageNameTag, - Command: bpfTraceCmd, + Command: traceCmd, TTY: true, Stdin: true, Resources: apiv1.ResourceRequirements{ @@ -337,7 +354,7 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) { apiv1.NodeSelectorRequirement{ Key: "kubernetes.io/hostname", Operator: apiv1.NodeSelectorOpIn, - Values: []string{nj.Hostname}, + Values: []string{nj.Target.Node}, }, }, }, @@ -477,20 +494,32 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) { ReadOnly: true, }) } - if _, err := t.ConfigClient.Create(context.Background(), cm, metav1.CreateOptions{}); err != nil { - return nil, err - } - // Optionally patch the job before creating it - if nj.PatchType != "" && nj.Patch != "" { - newJob, err := patchJobFile(job, nj.PatchType, nj.Patch) - if err != nil { - return nil, err - } - job = newJob + return job +} + +func (nj *TraceJob) ConfigMap() *apiv1.ConfigMap { + return &apiv1.ConfigMap{ + ObjectMeta: *nj.Meta(), + Data: map[string]string{ + "program.bt": nj.Program, + }, } +} - return t.JobClient.Create(context.Background(), job, metav1.CreateOptions{}) +func (nj *TraceJob) Meta() *metav1.ObjectMeta { + return &metav1.ObjectMeta{ + Name: nj.Name, + Namespace: nj.Namespace, + Labels: map[string]string{ + meta.TraceLabelKey: nj.Name, + meta.TraceIDLabelKey: string(nj.ID), + }, + Annotations: map[string]string{ + meta.TraceLabelKey: nj.Name, + meta.TraceIDLabelKey: string(nj.ID), + }, + } } func int32Ptr(i int32) *int32 { return &i } diff --git a/pkg/tracejob/selected_target.go b/pkg/tracejob/selected_target.go new file mode 100644 index 00000000..abb86de4 --- /dev/null +++ b/pkg/tracejob/selected_target.go @@ -0,0 +1,253 @@ +/* + +Currently, we are doing a bunch of crazy stuff 
in order to build our trace job. + +We have two ways of specifying the thing we actually want to trace: + +- From a literal target resource ( run.go 's o.resourceArg, taken from arg[0]) + - resourceArg is overloaded, it can be either the node, or the pod which we would like to trace + - In the future, it may be other types, eg, deployment/NAME, or statefulset/NAME, etc. + - additionally, o.container is specified further with arg[1], and may be required for cases where resourceArg is not a Node. + +Complicating this even further, we also permit for the target to be given by a Selector string + +Because of this, the selector string is parsed into a struct with a mutable hash, and we end up using +the selector struct as a way to store our actual target. + +To simplify this, we should separate the **desired target** from the **selected target**. + +If the pod / container are specified with positional arguments, then the selector **must not** +include a selector for the pod or container, but is permitted to have a process selector. + +It is also valid to have no positional arguments at all, and rely entirely on the +selector to specify the pod / container to target. + +1. Resolve the Target + - Look at any positional arguments (from CLI) + - Look at the Selector + - Perform validations + - Resolve to: + - At minimum, always a node to run the tracejob on (inferred through pod, and perhaps through deployment → pod) + - If a pod is being targeted, it may also have a Pod target and a Container +2. 
Create the TraceJob + - The fully specify target (the output from 1, above) is provided as a parameter + - The expectation is that a TraceJob is an entity that is scheduled on a given node: + - Other logic (1) is responsible for determining the correct node to run on + - Only necessary selector arguments are passed to the trace-runner, as an argument within the trace job + +*/ + +package tracejob + +import ( + "context" + "fmt" + "strings" + + v1 "k8s.io/api/core/v1" + + "github.com/iovisor/kubectl-trace/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" +) + +/* +This struct will be used when creating the TraceJob +*/ + +type TraceJobTarget struct { + Node string // Used for tracejob NodeSelector + PodUID string // passed as argument to trace-runner + ContainerID string // passed as argument to trace-runner +} + +/* +ResolveTraceJobTarget will: + +- take a kubernetes resource, possibly namespaced, and resolve it to a concrete node that the tracejob can run on + - if the resource is a node, the scope of the trace will be the entire node + - if the resource is a pod, or something that can be resolved to a pod: + - resource is namespaced, the targetNamespace is required + - if the pod has multiple containers, a container name is required, otherwise the only container will be used + +clientset - a kubernetes clientset, used for resolving the target resource +resource - a string, indicating the kubernetes resource to be traced. 
Currently supported: node, pod
+container - (optional) the container to trace, required only if needed to disambiguate
+targetNamespace - (optional) the target namespace of a given resource, required if the resource is namespaced
+
+*/
+
+func ResolveTraceJobTarget(clientset kubernetes.Interface, resource, container, targetNamespace string) (*TraceJobTarget, error) {
+
+	target := TraceJobTarget{}
+
+	var resourceType string
+	var resourceID string
+
+	resourceParts := strings.Split(resource, "/")
+
+	if len(resourceParts) < 1 || len(resourceParts) > 2 {
+		return nil, fmt.Errorf("Invalid resource string %s", resource)
+	} else if len(resourceParts) == 1 {
+		resourceType = "node"
+		resourceID = resourceParts[0]
+	} else if len(resourceParts) == 2 {
+		resourceType = resourceParts[0]
+		resourceID = resourceParts[1]
+	}
+
+	switch resourceType {
+	case "node":
+		nodeClient := clientset.CoreV1().Nodes()
+		allocatable, err := NodeIsAllocatable(clientset, resourceID)
+		if err != nil {
+			return nil, err
+		}
+		if !allocatable {
+			return nil, errors.NewErrorUnallocatable(fmt.Sprintf("Node %s is not allocatable", resourceID))
+		}
+		node, err := nodeClient.Get(context.TODO(), resourceID, metav1.GetOptions{})
+		if err != nil {
+			return nil, errors.NewErrorInvalid(fmt.Sprintf("Failed to locate a node for %s %v", resourceID, err))
+		}
+
+		labels := node.GetLabels()
+		val, ok := labels["kubernetes.io/hostname"]
+		if !ok {
+			return nil, errors.NewErrorInvalid("label kubernetes.io/hostname not found in node") // constant message: Sprintf with no verbs was a go-vet finding
+		}
+		target.Node = val
+
+		return &target, nil
+	case "pod":
+		podClient := clientset.CoreV1().Pods(targetNamespace)
+		pod, err := podClient.Get(context.TODO(), resourceID, metav1.GetOptions{})
+		if err != nil {
+			return nil, errors.NewErrorInvalid(fmt.Sprintf("Could not locate pod %s: %v", resourceID, err))
+		}
+
+		allocatable, err := NodeIsAllocatable(clientset, pod.Spec.NodeName)
+		if err != nil || !allocatable {
+			return nil, 
errors.NewErrorUnallocatable(fmt.Sprintf("Pod %s is not scheduled on an allocatable node", resourceID))
+		}
+
+		err = resolvePodToTarget(podClient, resourceID, container, targetNamespace, &target)
+		if err != nil {
+			return nil, err
+		}
+		return &target, nil
+	default:
+		return nil, errors.NewErrorInvalid(fmt.Sprintf("Unsupported resource type %s for %s", resourceType, resourceID))
+	}
+
+	// unreachable: every branch of the switch above returns
+}
+
+func NodeIsAllocatable(clientset kubernetes.Interface, hostname string) (bool, error) {
+	// if the job is neither Successful nor Failed, here we need to check to make sure that it is at least
+	// schedulable on the node. To do that
+	//
+	// retrieve a list of all pods currently scheduled on the node
+	// check the number of pods available on the node
+	// make sure that there are "enough" pods available
+	//   - if not, mark as "Unschedulable"
+	// account for Unschedulable elsewhere in the operator logic?
+
+	allPods, err := allPodsForNode(clientset, hostname)
+	if err != nil {
+		// the listing error is propagated to the caller below
+		return false, err
+	}
+
+	var nonTerminalPods []*v1.Pod
+	for _, pod := range allPods.Items {
+		if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
+			continue
+		}
+
+		nonTerminalPods = append(nonTerminalPods, pod.DeepCopy()) // copy: &pod would alias the shared range variable
+	}
+
+	fmt.Printf("Got %d pods, %d non-terminal pods\n", len(allPods.Items), len(nonTerminalPods))
+
+	nodeClient := clientset.CoreV1().Nodes()
+	node, err := nodeClient.Get(context.TODO(), hostname, metav1.GetOptions{})
+
+	if err != nil {
+		// the node lookup error is propagated to the caller below
+		return false, err
+	}
+
+	maxPods, ok := node.Status.Allocatable.Pods().AsInt64()
+	if !ok {
+		err = fmt.Errorf("quantity was not an integer: %s", node.Status.Allocatable.Pods().String())
+		// the parse error constructed above is returned to the caller
+		return false, err
+	}
+
+	if len(nonTerminalPods) >= int(maxPods) {
+		return false, nil
+	}
+
+	// FIXME check resources, check for cordoned before 
giving the all-clear + + return true, nil +} + +func allPodsForNode(clientset kubernetes.Interface, nodeName string) (*v1.PodList, error) { + // https://github.com/kubernetes/kubectl/blob/1199011a44e83ded0d4f1d582e731bc4212aa9f5/pkg/describe/describe.go#L3473 + fieldSelector, err := fields.ParseSelector("spec.nodeName=" + nodeName + ",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed)) + if err != nil { + return nil, err + } + + listOptions := metav1.ListOptions{ + FieldSelector: fieldSelector.String(), + // TODO should we chunk this? RE https://github.com/kubernetes/kubectl/blob/1199011a44e83ded0d4f1d582e731bc4212aa9f5/pkg/describe/describe.go#L3484 + } + + podList, err := clientset.CoreV1().Pods("").List(context.TODO(), listOptions) + if err != nil { + return nil, err + } + + return podList, nil +} + +func resolvePodToTarget(podClient corev1.PodInterface, resourceID, container, targetNamespace string, target *TraceJobTarget) error { + pod, err := podClient.Get(context.TODO(), resourceID, metav1.GetOptions{}) + + if err != nil { + return fmt.Errorf("failed to locate a pod for %s; error was %s", resourceID, err.Error()) + } + if pod.Spec.NodeName == "" { + return fmt.Errorf("cannot attach a trace program to a pod that is not currently scheduled on a node") + } + + var targetContainer string + target.Node = pod.Spec.NodeName + target.PodUID = string(pod.UID) + + if len(pod.Spec.Containers) == 1 { + targetContainer = pod.Spec.Containers[0].Name + } else { + // FIXME verify container is not empty + targetContainer = container + } + + for _, s := range pod.Status.ContainerStatuses { + if s.Name == targetContainer { + containerID := strings.TrimPrefix(s.ContainerID, "docker://") + containerID = strings.TrimPrefix(containerID, "containerd://") + target.ContainerID = containerID + break + } + } + + if target.ContainerID == "" { + return fmt.Errorf("no containers found for the provided pod %s and container %s combination", pod.Name, 
targetContainer) + } + return nil +}