diff --git a/build/Dockerfile.tracerunner b/build/Dockerfile.tracerunner
index 69b53227..8d15596d 100644
--- a/build/Dockerfile.tracerunner
+++ b/build/Dockerfile.tracerunner
@@ -1,6 +1,8 @@
 # syntax = docker/dockerfile:1.2
 ARG bpftraceversion=v0.13.0
+ARG bccversion=v0.21.0-focal-release
 FROM quay.io/iovisor/bpftrace:$bpftraceversion as bpftrace
+FROM quay.io/iovisor/bcc:$bccversion as bcc
 
 FROM golang:1.15-buster as gobuilder
 ARG GIT_ORG=iovisor
@@ -26,12 +28,21 @@ RUN --mount=type=cache,target=/root/.cache/go-build make _output/bin/trace-runne
 
 FROM ubuntu:20.04
 
+# Install bcc by copying apt packages from docker image
+COPY --from=bcc /root/bcc /tmp/bcc
+RUN apt-get update && \
+    DEBIAN_FRONTEND=noninteractive apt-get install -y python python3 binutils libelf1 kmod && apt-get clean && \
+    dpkg -i /tmp/bcc/*.deb && rm -rf /tmp/bcc
+
 # Install CA certificates
 RUN apt-get update && apt-get install -y ca-certificates && update-ca-certificates && apt-get clean
 
 COPY --from=bpftrace /usr/bin/bpftrace /usr/bin/bpftrace
 COPY --from=gobuilder /go/src/github.com/iovisor/kubectl-trace/_output/bin/trace-runner /bin/trace-runner
 
+# Inject some fake tracer 'programs' for integration testing.
+COPY /build/test/fake/success /usr/share/fake/success
+
 COPY /build/hooks/prestop /bin/hooks/prestop
 
 ENTRYPOINT ["/bin/trace-runner"]
diff --git a/build/test/fake/success b/build/test/fake/success
new file mode 100755
index 00000000..8c3cbfc3
--- /dev/null
+++ b/build/test/fake/success
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+exit 0
diff --git a/go.mod b/go.mod
index 15bc900c..203910e9 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	github.com/evanphx/json-patch v4.9.0+incompatible
 	github.com/fntlnz/mountinfo v0.0.0-20171106231217-40cb42681fad
 	github.com/kr/pretty v0.2.1 // indirect
+	github.com/spf13/afero v1.3.1
 	github.com/spf13/cobra v1.1.1
 	github.com/spf13/pflag v1.0.5
 	github.com/stretchr/testify v1.6.1
diff --git a/go.sum b/go.sum
index 4813ab3d..e5e0d2ea 100644
--- a/go.sum
+++ b/go.sum
@@ -240,6 +240,7 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
 github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
@@ -306,6 +307,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
@@ -343,7 +345,10 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9
 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
+github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
 github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
+github.com/spf13/afero v1.3.1 h1:GPTpEAuNr98px18yNQ66JllNil98wfRZ/5Ukny8FeQA=
+github.com/spf13/afero v1.3.1/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
 github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
 github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
 github.com/spf13/cobra v1.1.1 h1:KfztREH0tPxJJ+geloSLaAkaPkr4ki2Er5quFV1TDo4=
@@ -384,6 +389,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
diff --git a/integration/cmd_run_test.go b/integration/cmd_run_test.go
index ee4bf70d..3ad3504e 100644
--- a/integration/cmd_run_test.go
+++ b/integration/cmd_run_test.go
@@ -44,3 +44,33 @@ func (k *KubectlTraceSuite) TestReturnErrOnErr() {
 	assert.Equal(k.T(), int32(0), job.Status.Succeeded, "No jobs in the batch should have succeeded")
 	assert.Greater(k.T(), job.Status.Failed, int32(1), "There should be at least one failed job")
 }
+
+func (k *KubectlTraceSuite) TestGenericTracer() {
+	nodeName := k.GetTestNode()
+
+	out := k.KubectlTraceCmd(
+		"run",
+		"node/"+nodeName,
+		"--tracer=fake",
+		"--program=success",
+		"--deadline=5",
+		"--imagename="+k.RunnerImage())
+	assert.Regexp(k.T(), regexp.MustCompile("trace [a-f0-9-]{36} created"), out)
+
+	var job batchv1.Job
+
+	for {
+		jobs := k.GetJobs().Items
+		assert.Equal(k.T(), 1, len(jobs))
+
+		job = jobs[0]
+		if len(job.Status.Conditions) > 0 {
+			break // on the first condition
+		}
+
+		time.Sleep(1 * time.Second)
+	}
+
+	assert.Equal(k.T(), 1, len(job.Status.Conditions))
+	assert.Equal(k.T(), "Complete", string(job.Status.Conditions[0].Type))
+}
diff --git a/pkg/cmd/get.go b/pkg/cmd/get.go
index 55200658..1c436c47 100644
--- a/pkg/cmd/get.go
+++ b/pkg/cmd/get.go
@@ -193,7 +193,7 @@ func jobsTablePrint(o io.Writer, jobs []tracejob.TraceJob) {
 		if status == "" {
 			status = tracejob.TraceJobUnknown
 		}
-		fmt.Fprintf(w, "\n"+format, j.Namespace, j.Hostname, j.Name, status, translateTimestampSince(j.StartTime))
+		fmt.Fprintf(w, "\n"+format, j.Namespace, j.Target.Node, j.Name, status, translateTimestampSince(j.StartTime))
 	}
 	fmt.Fprintf(w, "\n")
 }
diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index 8dd4ad93..0d632893 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -10,19 +10,16 @@ import (
 	"github.com/iovisor/kubectl-trace/pkg/signals"
 	"github.com/iovisor/kubectl-trace/pkg/tracejob"
 	"github.com/spf13/cobra"
-	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
-	"k8s.io/client-go/kubernetes/scheme"
-	batchv1client "k8s.io/client-go/kubernetes/typed/batch/v1"
-	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	cmdutil "k8s.io/kubectl/pkg/cmd/util"
 )
 
 var (
 	// ImageName represents the default tracerunner image
-	ImageName = "quay.io/iovisor/kubectl-trace-bpftrace"
+	ImageName = "quay.io/iovisor/kubectl-trace-runner"
 	// ImageTag represents the tag to fetch for ImageName
 	ImageTag = "latest"
 	// InitImageName represents the default init container image
@@ -67,6 +64,7 @@
 	bpftraceEmptyErrString                 = "the bpftrace programm cannot be empty"
 	bpftracePatchWithoutTypeErrString      = "to use --patch you must also specify the --patch-type argument"
 	bpftracePatchTypeWithoutPatchErrString = "to use --patch-type you must specify the --patch argument"
+	tracerNotFound                         = "unknown tracer %s"
 )
 
 // RunOptions ...
@@ -77,9 +75,18 @@ type RunOptions struct {
 	explicitNamespace bool
 
 	// Flags local to this command
-	container string
-	eval      string
-	program   string
+	eval     string
+	filename string
+
+	// Flags for generic interface
+	// See TraceRunnerOptions for definitions.
+	// TODO: clean this up
+	tracer          string
+	targetNamespace string
+	program         string
+	programArgs     []string
+	tracerDefined   bool
 
 	serviceAccount string
 	imageName      string
@@ -88,13 +95,11 @@ type RunOptions struct {
 	deadlineGracePeriod int64
 
 	resourceArg string
-	attach      bool
-	isPod       bool
-	podUID      string
-	nodeName    string
+	container   string
 
 	patch     string
 	patchType string
+	attach    bool
 
 	clientConfig *rest.Config
 }
@@ -137,10 +142,19 @@ func NewRunCommand(factory cmdutil.Factory, streams genericclioptions.IOStreams)
 		},
 	}
 
+	// flags for existing usage
 	cmd.Flags().StringVarP(&o.container, "container", "c", o.container, "Specify the container")
-	cmd.Flags().BoolVarP(&o.attach, "attach", "a", o.attach, "Whether or not to attach to the trace program once it is created")
 	cmd.Flags().StringVarP(&o.eval, "eval", "e", o.eval, "Literal string to be evaluated as a bpftrace program")
-	cmd.Flags().StringVarP(&o.program, "filename", "f", o.program, "File containing a bpftrace program")
+	cmd.Flags().StringVarP(&o.filename, "filename", "f", o.filename, "File containing a bpftrace program")
+
+	// flags for new generic interface
+	cmd.Flags().StringVar(&o.tracer, "tracer", "bpftrace", "Tracing system to use")
+	cmd.Flags().StringVar(&o.targetNamespace, "target-namespace", "", "Namespace in which the target pod exists (if applicable). Defaults to the namespace argument passed to kubectl.")
+	cmd.Flags().StringVar(&o.program, "program", o.program, "Program to execute")
+	cmd.Flags().StringArrayVar(&o.programArgs, "args", o.programArgs, "Additional arguments to pass on to program, repeat flag for multiple arguments")
+
+	// global flags
+	cmd.Flags().BoolVarP(&o.attach, "attach", "a", o.attach, "Whether or not to attach to the trace program once it is created")
 	cmd.Flags().StringVar(&o.serviceAccount, "serviceaccount", o.serviceAccount, "Service account to use to set in the pod spec of the kubectl-trace job")
 	cmd.Flags().StringVar(&o.imageName, "imagename", o.imageName, "Custom image for the tracerunner")
 	cmd.Flags().StringVar(&o.initImageName, "init-imagename", o.initImageName, "Custom image for the init container responsible to fetch and prepare linux headers")
@@ -155,7 +169,17 @@
 
 // Validate validates the arguments and flags populating RunOptions accordingly.
 func (o *RunOptions) Validate(cmd *cobra.Command, args []string) error {
+	// Record whether the tracer flag was set explicitly.
+	o.tracerDefined = cmd.Flag("tracer").Changed
+
+	switch o.tracer {
+	case bpftrace, bcc, fake:
+	default:
+		return fmt.Errorf(tracerNotFound, o.tracer)
+	}
+
 	containerFlagDefined := cmd.Flag("container").Changed
+
 	switch len(args) {
 	case 1:
 		o.resourceArg = args[0]
@@ -163,7 +187,7 @@
 	// 2nd argument interpreted as container when provided
 	case 2:
 		o.resourceArg = args[0]
-		o.container = args[1]
+		o.container = args[1] // NOTE: this should actually be -c, to be consistent with the rest of kubectl
 		if containerFlagDefined {
 			return fmt.Errorf(containerAsArgOrFlagErrString)
 		}
@@ -172,16 +196,6 @@
 		return fmt.Errorf(requiredArgErrString)
 	}
 
-	if !cmd.Flag("eval").Changed && !cmd.Flag("filename").Changed {
-		return fmt.Errorf(bpftraceMissingErrString)
-	}
-	if cmd.Flag("eval").Changed == cmd.Flag("filename").Changed {
-		return fmt.Errorf(bpftraceDoubleErrString)
-	}
-	if (cmd.Flag("eval").Changed && len(o.eval) == 0) || (cmd.Flag("filename").Changed && len(o.program) == 0) {
-		return fmt.Errorf(bpftraceEmptyErrString)
-	}
-
 	havePatch := cmd.Flag("patch").Changed
 	havePatchType := cmd.Flag("patch-type").Changed
@@ -193,106 +207,51 @@
 		return fmt.Errorf(bpftracePatchTypeWithoutPatchErrString)
 	}
 
+	switch o.tracer {
+	case bpftrace, bcc:
+		evalDefined, filenameDefined, programDefined := cmd.Flag("eval").Changed, cmd.Flag("filename").Changed, cmd.Flag("program").Changed
+		if !evalDefined && !filenameDefined && !programDefined {
+			return fmt.Errorf(bpftraceMissingErrString)
+		}
+		if (evalDefined && filenameDefined) || (evalDefined && programDefined) || (filenameDefined && programDefined) {
+			return fmt.Errorf(bpftraceDoubleErrString)
+		}
+		if (evalDefined && len(o.eval) == 0) || (filenameDefined && len(o.filename) == 0) || (programDefined && len(o.program) == 0) {
+			return fmt.Errorf(bpftraceEmptyErrString)
+		}
+	default:
+	}
+
 	return nil
 }
 
 // Complete completes the setup of the command.
 func (o *RunOptions) Complete(factory cmdutil.Factory, cmd *cobra.Command, args []string) error {
 	// Prepare program
-	if len(o.program) > 0 {
-		b, err := ioutil.ReadFile(o.program)
-		if err != nil {
-			return fmt.Errorf("error opening program file")
+	if len(o.program) == 0 {
+		if len(o.filename) > 0 {
+			b, err := ioutil.ReadFile(o.filename)
+			if err != nil {
+				return fmt.Errorf("error opening program file: %v", err)
+			}
+			o.program = string(b)
+		} else {
+			o.program = o.eval
 		}
-		o.program = string(b)
-	} else {
-		o.program = o.eval
 	}
 
-	// Prepare namespace
+	// Prepare namespaces
 	var err error
 	o.namespace, o.explicitNamespace, err = factory.ToRawKubeConfigLoader().Namespace()
 	if err != nil {
 		return err
 	}
 
-	// Look for the target object
-	x := factory.
-		NewBuilder().
-		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
-		NamespaceParam(o.namespace).
-		SingleResourceType().
-		ResourceNames("nodes", o.resourceArg). // Search nodes by default
-		Do()
-
-	obj, err := x.Object()
-	if err != nil {
-		return err
-	}
-
-	// Check we got a pod or a node
-	o.isPod = false
-
-	var node *v1.Node
-
-	switch v := obj.(type) {
-	case *v1.Pod:
-		if len(v.Spec.NodeName) == 0 {
-			return fmt.Errorf("cannot attach a trace program to a pod that is not currently scheduled on a node")
-		}
-		o.isPod = true
-		found := false
-		o.podUID = string(v.UID)
-		for _, c := range v.Spec.Containers {
-			// default if no container provided
-			if len(o.container) == 0 {
-				o.container = c.Name
-				found = true
-				break
-			}
-			// check if the provided one exists
-			if c.Name == o.container {
-				found = true
-				break
-			}
-		}
-
-		if !found {
-			return fmt.Errorf("no containers found for the provided pod/container combination")
-		}
-
-		obj, err = factory.
-			NewBuilder().
-			WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
-			ResourceNames("nodes", v.Spec.NodeName).
-			Do().Object()
-
-		if err != nil {
-			return err
-		}
-
-		if n, ok := obj.(*v1.Node); ok {
-			node = n
-		}
-
-		break
-	case *v1.Node:
-		node = v
-		break
-	default:
-		return fmt.Errorf("first argument must be %s", usageString)
-	}
-
-	if node == nil {
-		return fmt.Errorf("could not determine on which node to run the trace program")
-	}
-
-	labels := node.GetLabels()
-	val, ok := labels["kubernetes.io/hostname"]
-	if !ok {
-		return fmt.Errorf("label kubernetes.io/hostname not found in node")
+	// If a target namespace is not specified explicitly, we want to reuse the
+	// namespace as understood by kubectl.
+	if o.targetNamespace == "" {
+		o.targetNamespace = o.namespace
 	}
-	o.nodeName = val
 
 	// Prepare client
 	o.clientConfig, err = factory.ToRESTConfig()
@@ -306,31 +265,29 @@
 // Run executes the run command.
 func (o *RunOptions) Run() error {
 	juid := uuid.NewUUID()
-	jobsClient, err := batchv1client.NewForConfig(o.clientConfig)
+
+	clientset, err := kubernetes.NewForConfig(o.clientConfig)
 	if err != nil {
 		return err
 	}
 
-	coreClient, err := corev1client.NewForConfig(o.clientConfig)
+	target, err := tracejob.ResolveTraceJobTarget(clientset, o.resourceArg, o.container, o.targetNamespace)
+
 	if err != nil {
 		return err
 	}
 
-	tc := &tracejob.TraceJobClient{
-		JobClient:    jobsClient.Jobs(o.namespace),
-		ConfigClient: coreClient.ConfigMaps(o.namespace),
-	}
+	tc := tracejob.NewTraceJobClient(clientset, o.namespace)
 
 	tj := tracejob.TraceJob{
 		Name:             fmt.Sprintf("%s%s", meta.ObjectNamePrefix, string(juid)),
 		Namespace:        o.namespace,
 		ServiceAccount:   o.serviceAccount,
 		ID:               juid,
-		Hostname:         o.nodeName,
+		Target:           *target,
+		Tracer:           o.tracer,
 		Program:          o.program,
-		PodUID:           o.podUID,
-		ContainerName:    o.container,
-		IsPod:            o.isPod,
+		ProgramArgs:      o.programArgs,
 		ImageNameTag:     o.imageName,
 		InitImageNameTag: o.initImageName,
 		FetchHeaders:     o.fetchHeaders,
@@ -350,7 +307,7 @@
 	if o.attach {
 		ctx := context.Background()
 		ctx = signals.WithStandardSignals(ctx)
-		a := attacher.NewAttacher(coreClient, o.clientConfig, o.IOStreams)
+		a := attacher.NewAttacher(clientset.CoreV1(), o.clientConfig, o.IOStreams)
 		a.WithContext(ctx)
 		a.AttachJob(tj.ID, job.Namespace)
 	}
diff --git a/pkg/cmd/tracerunner.go b/pkg/cmd/tracerunner.go
index d25f4f07..b4d49052 100644
--- a/pkg/cmd/tracerunner.go
+++ b/pkg/cmd/tracerunner.go
@@ -3,7 +3,6 @@ package cmd
 import (
 	"context"
 	"fmt"
-	"io"
 	"io/ioutil"
 	"os"
 	"os/exec"
@@ -12,20 +11,51 @@
 	"strings"
 	"syscall"
 
-	"github.com/fntlnz/mountinfo"
+	"github.com/iovisor/kubectl-trace/pkg/procfs"
 	"github.com/spf13/cobra"
 )
 
+const (
+	// MetadataDir is where trace-runner will output traces and metadata
+	MetadataDir = "/tmp/kubectl-trace"
+
+	bpftrace = "bpftrace"
+	bcc      = "bcc"
+	fake     = "fake"
+)
+
+var (
+	bpfTraceBinaryPath = "/usr/bin/bpftrace"
+	bccToolsDir        = "/usr/share/bcc/tools/"
+	fakeToolsDir       = "/usr/share/fake/"
+)
+
 type TraceRunnerOptions struct {
-	podUID             string
-	containerName      string
-	inPod              bool
-	programPath        string
-	bpftraceBinaryPath string
+	// The tracing system to use.
+	// tracer = bpftrace | bcc | fake
+	tracer string
+
+	podUID string
+
+	containerID string
+
+	// In the case of bcc the name of the bcc program to execute.
+	// In the case of bpftrace the path to the file containing the user provided expression or program.
+	program string
+
+	// In the case of bcc the user provided arguments to pass on to program.
+	// Not used for bpftrace.
+	programArgs []string
+
+	// Used for tracers which have their own ability to stop tracing after a specified time.
+	// A value of 0 is ignored.
+	duration int64
}
 
 func NewTraceRunnerOptions() *TraceRunnerOptions {
-	return &TraceRunnerOptions{}
+	return &TraceRunnerOptions{
+		duration: int64(0),
+	}
 }
 
 func NewTraceRunnerCommand() *cobra.Command {
@@ -46,19 +76,22 @@ func NewTraceRunnerCommand() *cobra.Command {
 		},
 	}
 
-	cmd.Flags().StringVarP(&o.containerName, "container", "c", o.containerName, "Specify the container")
-	cmd.Flags().StringVarP(&o.podUID, "poduid", "p", o.podUID, "Specify the pod UID")
-	cmd.Flags().StringVarP(&o.programPath, "program", "f", "program.bt", "Specify the bpftrace program path")
-	cmd.Flags().StringVarP(&o.bpftraceBinaryPath, "bpftracebinary", "b", "/usr/bin/bpftrace", "Specify the bpftrace binary path")
-	cmd.Flags().BoolVar(&o.inPod, "inpod", false, "Whether or not run this bpftrace in a pod's container process namespace")
+	cmd.Flags().StringVar(&o.tracer, "tracer", "bpftrace", "Tracing system to use")
+	cmd.Flags().StringVar(&o.podUID, "pod-uid", "", "UID of target pod")
+	cmd.Flags().StringVar(&o.containerID, "container-id", "", "ID of target container")
+	cmd.Flags().StringVar(&o.program, "program", "/programs/program.bt", "Tracer input script or executable")
+	cmd.Flags().StringArrayVar(&o.programArgs, "args", o.programArgs, "Arguments to pass through to executable in --program")
+	cmd.Flags().Int64Var(&o.duration, "duration", o.duration, "Duration to pass to trace program, if it supports it")
 
 	return cmd
 }
 
 func (o *TraceRunnerOptions) Validate(cmd *cobra.Command, args []string) error {
-	// TODO(fntlnz): do some more meaningful validation here, for now just checking if they are there
-	if o.inPod == true && (len(o.containerName) == 0 || len(o.podUID) == 0) {
-		return fmt.Errorf("poduid and container must be specified when inpod=true")
+	switch o.tracer {
+	case bpftrace, bcc, fake:
+	default:
+		return fmt.Errorf("unknown tracer %s", o.tracer)
 	}
+
 	return nil
 }
@@ -68,29 +101,24 @@ func (o *TraceRunnerOptions) Complete(cmd *cobra.Command, args []string) error {
 }
 
 func (o *TraceRunnerOptions) Run() error {
-	programPath := o.programPath
-	if o.inPod == true {
-		pid, err := findPidByPodContainer(o.podUID, o.containerName)
-		if err != nil {
-			return err
-		}
-		if pid == nil {
-			return fmt.Errorf("pid not found")
-		}
-		if len(*pid) == 0 {
-			return fmt.Errorf("invalid pid found")
-		}
-		f, err := ioutil.ReadFile(programPath)
-		if err != nil {
-			return err
-		}
-		programPath = path.Join(os.TempDir(), "program-container.bt")
-		r := strings.Replace(string(f), "$container_pid", *pid, -1)
-		if err := ioutil.WriteFile(programPath, []byte(r), 0755); err != nil {
-			return err
-		}
+	var err error
+	var binary *string
+	var args []string
+
+	switch o.tracer {
+	case bpftrace:
+		binary, args, err = o.prepBpfTraceCommand()
+	case bcc:
+		binary, args, err = o.prepBccCommand()
+	case fake:
+		binary, args, err = o.prepFakeCommand()
+	}
+
+	if err != nil {
+		return err
 	}
 
+	// Assume output is stdout until other backends are implemented.
 	fmt.Println("if your program has maps to print, send a SIGINT using Ctrl-C, if you want to interrupt the execution send SIGINT two times")
 	ctx, cancel := context.WithCancel(context.Background())
 	sigCh := make(chan os.Signal, 1)
@@ -116,53 +144,85 @@ func (o *TraceRunnerOptions) Run() error {
 		}
 	}()
 
-	c := exec.CommandContext(ctx, o.bpftraceBinaryPath, programPath)
-	c.Stdout = os.Stdout
+	var c *exec.Cmd
+	if len(args) == 0 {
+		c = exec.CommandContext(ctx, *binary)
+	} else {
+		c = exec.CommandContext(ctx, *binary, args...)
+	}
+
+	return runTraceCommand(c)
+}
+
+func runTraceCommand(c *exec.Cmd) error {
 	c.Stdin = os.Stdin
+	c.Stdout = os.Stdout
 	c.Stderr = os.Stderr
 	return c.Run()
 }
 
-func findPidByPodContainer(podUID, containerName string) (*string, error) {
-	d, err := os.Open("/proc")
+func (o *TraceRunnerOptions) prepBpfTraceCommand() (*string, []string, error) {
+	programPath := o.program
 
-	if err != nil {
-		return nil, err
+	// Render $container_pid to actual process pid if scoped to container.
+	if o.podUID != "" && o.containerID != "" {
+		pid, err := procfs.FindPidByPodContainer(o.podUID, o.containerID)
+		if err != nil {
+			return nil, nil, err
+		}
+		f, err := ioutil.ReadFile(programPath)
+		if err != nil {
+			return nil, nil, err
+		}
+		programPath = path.Join(os.TempDir(), "program-container.bt")
+		r := strings.Replace(string(f), "$container_pid", pid, -1)
+		if err := ioutil.WriteFile(programPath, []byte(r), 0755); err != nil {
+			return nil, nil, err
+		}
 	}
 
-	defer d.Close()
+	return &bpfTraceBinaryPath, []string{programPath}, nil
+}
 
-	for {
-		dirs, err := d.Readdir(10)
-		if err == io.EOF {
-			break
-		}
+func (o *TraceRunnerOptions) prepBccCommand() (*string, []string, error) {
+	// Sanitize o.program by removing common prefix/suffixes.
+	name := o.program
+	name = strings.TrimPrefix(name, "/usr/bin/")
+	name = strings.TrimPrefix(name, "/usr/sbin/")
+	name = strings.TrimSuffix(name, "-bpfcc")
+
+	program := bccToolsDir + name
+	args := append([]string{}, o.programArgs...)
+
+	if o.podUID != "" && o.containerID != "" {
+		pid, err := procfs.FindPidByPodContainer(o.podUID, o.containerID)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
-		for _, di := range dirs {
-			if !di.IsDir() {
-				continue
-			}
-			dname := di.Name()
-			if dname[0] < '0' || dname[0] > '9' {
-				continue
-			}
+		for i, arg := range args {
+			args[i] = strings.Replace(arg, "$container_pid", pid, -1)
+		}
+	}
 
-			mi, err := mountinfo.GetMountInfo(path.Join("/proc", dname, "mountinfo"))
-			if err != nil {
-				continue
-			}
+	return &program, args, nil
+}
 
-			for _, m := range mi {
-				root := m.Root
-				if strings.Contains(root, podUID) && strings.Contains(root, containerName) {
-					return &dname, nil
-				}
-			}
+func (o *TraceRunnerOptions) prepFakeCommand() (*string, []string, error) {
+	name := path.Base(o.program)
+	program := fakeToolsDir + name
+	args := append([]string{}, o.programArgs...)
+
+	if o.podUID != "" && o.containerID != "" {
+		pid, err := procfs.FindPidByPodContainer(o.podUID, o.containerID)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		for i, arg := range args {
+			args[i] = strings.Replace(arg, "$container_pid", pid, -1)
 		}
 	}
 
-	return nil, fmt.Errorf("no process found for specified pod and container")
+	return &program, args, nil
 }
diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go
new file mode 100644
index 00000000..c5b7eb55
--- /dev/null
+++ b/pkg/errors/errors.go
@@ -0,0 +1,45 @@
+package errors
+
+const (
+	ErrorInvalid       string = "Invalid"
+	ErrorUnallocatable string = "Unallocatable"
+)
+
+type TargetSelectionError struct {
+	ErrorMessage string
+	ErrorType    string
+}
+
+func (e TargetSelectionError) Error() string {
+	return e.ErrorMessage
+}
+
+func NewErrorInvalid(message string) error {
+	return TargetSelectionError{
+		ErrorMessage: message,
+		ErrorType:    ErrorInvalid,
+	}
+}
+
+func NewErrorUnallocatable(message string) error {
+	return TargetSelectionError{
+		ErrorMessage: message,
+		ErrorType:    ErrorUnallocatable,
+	}
+}
+
+func IsInvalidTargetError(err error) bool {
+	return reasonForError(err) == ErrorInvalid
+}
+
+func IsUnallocatableTargetError(err error) bool {
+	return reasonForError(err) == ErrorUnallocatable
+}
+
+func reasonForError(err error) string {
+	myErr, ok := err.(TargetSelectionError)
+	if ok {
+		return myErr.ErrorType
+	}
+	return ""
+}
diff --git a/pkg/procfs/procfs.go b/pkg/procfs/procfs.go
new file mode 100644
index 00000000..d297a11d
--- /dev/null
+++ b/pkg/procfs/procfs.go
@@ -0,0 +1,184 @@
+package procfs
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"os"
+	"path"
+	"strings"
+
+	"github.com/fntlnz/mountinfo"
+	"github.com/spf13/afero"
+)
+
+var ProcFs = afero.NewOsFs()
+
+func FindPidByPodContainer(podUID, containerID string) (string, error) {
+	d, err := ProcFs.Open("/proc")
+
+	if err != nil {
+		return "", err
+	}
+
+	defer d.Close()
+
+	for {
+		dirs, err := d.Readdir(10)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return "", err
+		}
+
+		for _, di := range dirs {
+			if !di.IsDir() {
+				continue
+			}
+			dname := di.Name()
+			if dname[0] < '0' || dname[0] > '9' {
+				continue
+			}
+
+			mi, err := getMountInfo(path.Join("/proc", dname, "mountinfo"))
+			if err != nil {
+				continue
+			}
+
+			for _, m := range mi {
+				root := m.Root
+				// See https://github.com/kubernetes/kubernetes/blob/2f3a4ec9cb96e8e2414834991d63c59988c3c866/pkg/kubelet/cm/cgroup_manager_linux.go#L81-L85
+				// Note that these identifiers are currently specific to systemd, however, this mounting approach is what allows us to find the containerized
+				// process.
+				//
+				// EG: /kubepods/burstable/pod31dd0274-bb43-4975-bdbc-7e10047a23f8/851c75dad6ad8ce6a5d9b9129a4eb1645f7c6e5ba8406b12d50377b665737072
+				//     /kubepods/burstable/pod{POD_ID}/{CONTAINER_ID}
+				//
+				// This "needle" that we look for in the mountinfo haystack should match one and only one container.
+	assert.Nil(t, ProcFs.MkdirAll("/proc/10/ns", 0755))
+	assert.Nil(t, symlink(ProcFs, "/pid:[1010101010]", "/proc/10/ns/pid"))
+
+	pids, err := FindPidsForContainer("1")
+	assert.Nil(t, err)
+
+	sort.Strings(pids)
+	assert.Equal(t, []string{"1", "2", "3"}, pids)
+}
+
+func TestGetFinalNamespacePid(t *testing.T) {
+	_ = setupBasePath(t)
+
+	assert.Nil(t, ProcFs.MkdirAll("/proc/47/ns", 0755))
+	data := []byte("Name: testprogram\n NStgid: 47 1\n NSpid: 47 1\n NSpgid: 47 1\n NSsid: 47 1\n")
+	assert.Nil(t, afero.WriteFile(ProcFs, "/proc/47/status", data, 0444))
+
+	pid, err := GetFinalNamespacePid("47")
+	assert.Nil(t, err)
+
+	assert.Equal(t, "1", pid)
+}
+
+func TestGetProcComm(t *testing.T) {
+	_ = setupBasePath(t)
+
+	assert.Nil(t, ProcFs.MkdirAll("/proc/42", 0755))
+	data := []byte("ruby")
+	assert.Nil(t, afero.WriteFile(ProcFs, "/proc/42/comm", data, 0444))
+
+	comm, err := GetProcComm("42")
+	assert.Nil(t, err)
+
+	assert.Equal(t, "ruby", comm)
+}
+
+func TestGetProcCmdline(t *testing.T) {
+	_ = setupBasePath(t)
+
+	assert.Nil(t, ProcFs.MkdirAll("/proc/42", 0755))
+	data := []byte("Rails uri_path=/foo/bar request_id=1234")
+	assert.Nil(t, afero.WriteFile(ProcFs, "/proc/42/cmdline", data, 0444))
+
+	cmdline, err := GetProcCmdline("42")
+	assert.Nil(t, err)
+
+	assert.Equal(t, "Rails uri_path=/foo/bar request_id=1234", cmdline)
+}
+
+func TestGetProcExe(t *testing.T) {
+	basePath := setupBasePath(t)
+
+	assert.Nil(t, ProcFs.MkdirAll("/proc/42", 0755))
+	assert.Nil(t, symlink(ProcFs, "/usr/local/bin/ruby", "/proc/42/exe"))
+
+	exe, err := GetProcExe("42")
+	assert.Nil(t, err)
+
+	expected := path.Join(basePath, "/usr/local/bin/ruby")
+	assert.Equal(t, expected, exe)
+}
+
+func setupBasePath(t *testing.T) string {
+	tempDir, err := ioutil.TempDir("", "example")
+	if err != nil {
+		log.Fatal(err)
+	}
+	t.Cleanup(func() { os.RemoveAll(tempDir) }) // clean up once the test finishes
+
+	ProcFs = afero.NewBasePathFs(afero.NewOsFs(), tempDir)
+	assert.Nil(t, ProcFs.MkdirAll("/proc", 0755))
+
+	return tempDir
+}
+
+func symlink(fs afero.Fs, oldname string, newname string) error {
+	if r, ok := fs.(afero.Linker); ok {
+		return r.SymlinkIfPossible(oldname, newname)
+	}
+
+	return &os.LinkError{Op: "symlink", Old: oldname, New: newname, Err: afero.ErrNoSymlink}
+}
diff --git a/pkg/tracejob/job.go b/pkg/tracejob/job.go
index 3710c4c0..937061a9 100644
--- a/pkg/tracejob/job.go
+++ b/pkg/tracejob/job.go
@@ -17,6 +17,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	yamlutil "k8s.io/apimachinery/pkg/util/yaml"
+	"k8s.io/client-go/kubernetes"
 	batchv1typed "k8s.io/client-go/kubernetes/typed/batch/v1"
 	corev1typed "k8s.io/client-go/kubernetes/typed/core/v1"
 	"sigs.k8s.io/yaml"
@@ -34,11 +35,12 @@ type TraceJob struct {
 	ID             types.UID
 	Namespace      string
 	ServiceAccount string
-	Hostname       string
+	Tracer         string
+	Target         TraceJobTarget
 	Program        string
+	ProgramArgs    []string
 	PodUID         string
 	ContainerName  string
-	IsPod          bool
 	ImageNameTag   string
 	InitImageNameTag string
 	FetchHeaders   bool
@@ -50,6 +52,13 @@ type TraceJob struct {
 	PatchType string
 }
 
+func NewTraceJobClient(clientset kubernetes.Interface, namespace string) *TraceJobClient {
+	return &TraceJobClient{
+		JobClient:    clientset.BatchV1().Jobs(namespace),
+		ConfigClient: clientset.CoreV1().ConfigMaps(namespace),
+	}
+}
+
 // WithOutStream setup a file stream to output trace job operation information
 func (t *TraceJobClient) WithOutStream(o io.Writer) {
 	if o == nil {
@@ -141,7 +150,9 @@ func (t *TraceJobClient) GetJob(nf TraceJobFilter) ([]TraceJob, error) {
 			Name:      name,
 			ID:        types.UID(id),
 			Namespace: j.Namespace,
-			Hostname:  hostname,
+			Target: TraceJobTarget{
+				Node: hostname,
+			},
 			StartTime: j.Status.StartTime,
 			Status:    jobStatus(j),
 		}
@@ -193,42 +204,48 @@ func (t *TraceJobClient) DeleteJobs(nf TraceJobFilter) error {
 }
 
 func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) {
+	job, cm := nj.Job(), nj.ConfigMap()
+
+	// Optionally patch the job before creating it
+	if nj.PatchType != "" && nj.Patch != "" {
+		newJob, err := patchJobFile(job, nj.PatchType, nj.Patch)
+		if err != nil {
+			return nil, err
+		}
+		job = newJob
+	}
+
+	if _, err := t.ConfigClient.Create(context.Background(), cm, metav1.CreateOptions{}); err != nil {
+		return nil, err
+	}
+
+	return t.JobClient.Create(context.Background(), job, metav1.CreateOptions{})
+}
 
-	bpfTraceCmd := []string{
+func (nj *TraceJob) Job() *batchv1.Job {
+	traceCmd := []string{
 		"/bin/timeout",
 		"--preserve-status",
 		"--signal", "INT",
 		strconv.FormatInt(nj.Deadline, 10),
 		"/bin/trace-runner",
-		"--program=/programs/program.bt",
+		"--tracer=" + nj.Tracer,
+		"--pod-uid=" + nj.Target.PodUID,
+		"--container-id=" + nj.Target.ContainerID,
 	}
 
-	if nj.IsPod {
-		bpfTraceCmd = append(bpfTraceCmd, "--inpod")
-		bpfTraceCmd = append(bpfTraceCmd, "--container="+nj.ContainerName)
-		bpfTraceCmd = append(bpfTraceCmd, "--poduid="+nj.PodUID)
+	if nj.Tracer == "bpftrace" {
+		traceCmd = append(traceCmd, "--program=/programs/program.bt")
+	} else {
+		traceCmd = append(traceCmd, "--program="+nj.Program)
 	}
 
-	commonMeta := metav1.ObjectMeta{
-		Name:      nj.Name,
-		Namespace: nj.Namespace,
-		Labels: map[string]string{
-			meta.TraceLabelKey:   nj.Name,
-			meta.TraceIDLabelKey: string(nj.ID),
-		},
-		Annotations: map[string]string{
-			meta.TraceLabelKey:   nj.Name,
-			meta.TraceIDLabelKey: string(nj.ID),
-		},
+	for _, arg := range nj.ProgramArgs {
+		traceCmd = append(traceCmd, "--args="+arg)
 	}
 
-	cm := &apiv1.ConfigMap{
-		ObjectMeta: commonMeta,
-		Data: map[string]string{
-			"program.bt": nj.Program,
-		},
-	}
+	commonMeta := *nj.Meta()
+	cm := nj.ConfigMap()
 
 	job := &batchv1.Job{
 		ObjectMeta: commonMeta,
@@ -283,7 +300,7 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) {
 					apiv1.Container{
 						Name:    nj.Name,
 						Image:   nj.ImageNameTag,
-						Command: bpfTraceCmd,
+						Command: traceCmd,
 						TTY:     true,
 						Stdin:   true,
 						Resources: apiv1.ResourceRequirements{
@@ -337,7 +354,7 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) {
 								apiv1.NodeSelectorRequirement{
 									Key:      "kubernetes.io/hostname",
 									Operator: apiv1.NodeSelectorOpIn,
-									Values:   []string{nj.Hostname},
+									Values:   []string{nj.Target.Node},
 								},
 							},
 						},
@@ -477,20 +494,32 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) {
 			ReadOnly:  true,
 		})
 	}
-	if _, err := t.ConfigClient.Create(context.Background(), cm, metav1.CreateOptions{}); err != nil {
-		return nil, err
-	}
-
-	// Optionally patch the job before creating it
-	if nj.PatchType != "" && nj.Patch != "" {
-		newJob, err := patchJobFile(job, nj.PatchType, nj.Patch)
-		if err != nil {
-			return nil, err
-		}
-		job = newJob
+	return job
+}
+
+func (nj *TraceJob) ConfigMap() *apiv1.ConfigMap {
+	return &apiv1.ConfigMap{
+		ObjectMeta: *nj.Meta(),
+		Data: map[string]string{
+			"program.bt": nj.Program,
+		},
 	}
+}
 
-	return t.JobClient.Create(context.Background(), job, metav1.CreateOptions{})
+func (nj *TraceJob) Meta() *metav1.ObjectMeta {
+	return &metav1.ObjectMeta{
+		Name:      nj.Name,
+		Namespace: nj.Namespace,
+		Labels: map[string]string{
+			meta.TraceLabelKey:   nj.Name,
+			meta.TraceIDLabelKey: string(nj.ID),
+		},
+		Annotations: map[string]string{
+			meta.TraceLabelKey:   nj.Name,
+			meta.TraceIDLabelKey: string(nj.ID),
+		},
+	}
 }
 
 func int32Ptr(i int32) *int32 { return &i }
diff --git a/pkg/tracejob/selected_target.go b/pkg/tracejob/selected_target.go
new file mode 100644
index 00000000..fa136bbb
--- /dev/null
+++ b/pkg/tracejob/selected_target.go
@@ -0,0 +1,293 @@
+/*
+
+Currently, we are doing a bunch of crazy stuff in order to build our trace job.
+
+We have two ways of specifying the thing we actually want to trace:
+
+- From a literal target resource ( run.go 's o.resourceArg, taken from arg[0])
+  - resourceArg is overloaded, it can be either the node, or the pod which we would like to trace
+    - In the future, it may be other types, eg, deployment/NAME, or statefulset/NAME, etc.
+  - additionally, o.container is specified further with arg[1], and may be required for cases where resourceArg is not a Node.
+
+Complicating this even further, we also permit the target to be given by a Selector string.
+
+Because of this, the selector string is parsed into a struct with a mutable hash, and we end up using
+the selector struct as a way to store our actual target.
+
+To simplify this, we should separate the **desired target** from the **selected target**.
+
+If the pod / container are specified with positional arguments, then the selector **must not**
+include a selector for the pod or container, but is permitted to have a process selector.
+
+It is also valid to have no positional arguments at all, and rely entirely on the
+selector to specify the pod / container to target.
+
+1. Resolve the Target
+   - Look at any positional arguments (from CLI)
+   - Look at the Selector
+   - Perform validations
+   - Resolve to:
+     - At minimum, always a node to run the tracejob on (inferred through pod, and perhaps through deployment → pod)
+     - If a pod is being targeted, it may also have a Pod target and a Container
+2. Create the TraceJob
+   - The fully specified target (the output from 1, above) is provided as a parameter
+   - The expectation is that a TraceJob is an entity that is scheduled on a given node:
+     - Other logic (1) is responsible for determining the correct node to run on
+     - Only necessary selector arguments are passed to the trace-runner, as an argument within the trace job
+
+*/
+
+package tracejob
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	v1 "k8s.io/api/core/v1"
+
+	"github.com/iovisor/kubectl-trace/pkg/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/client-go/kubernetes"
+	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+)
+
+/*
+This struct will be used when creating the TraceJob
+*/
+
+type TraceJobTarget struct {
+	Node        string // Used for tracejob NodeSelector
+	PodUID      string // passed as argument to trace-runner
+	ContainerID string // passed as argument to trace-runner
+}
+
+/*
+ResolveTraceJobTarget will:
+
+- take a kubernetes resource, possibly namespaced, and resolve it to a concrete node that the tracejob can run on
+  - if the resource is a node, the scope of the trace will be the entire node
+  - if the resource is a pod, or something that can be resolved to a pod:
+    - if the resource is namespaced, the targetNamespace is required
+    - if the pod has multiple containers, a container name is required, otherwise the only container will be used
+
+clientset       - a kubernetes clientset, used for resolving the target resource
+resource        - a string, indicating the kubernetes resource to be traced. Currently supported: node, pod, deployment
+container       - (optional) the container to trace, required only if needed to disambiguate
+targetNamespace - (optional) the target namespace of a given resource, required if the resource is namespaced
+*/
+
+func ResolveTraceJobTarget(clientset kubernetes.Interface, resource, container, targetNamespace string) (*TraceJobTarget, error) {
+
+	target := TraceJobTarget{}
+
+	var resourceType string
+	var resourceID string
+
+	resourceParts := strings.Split(resource, "/")
+
+	if len(resourceParts) < 1 || len(resourceParts) > 2 {
+		return nil, fmt.Errorf("Invalid resource string %s", resource)
+	} else if len(resourceParts) == 1 {
+		resourceType = "node"
+		resourceID = resourceParts[0]
+	} else if len(resourceParts) == 2 {
+		resourceType = resourceParts[0]
+		resourceID = resourceParts[1]
+	}
+
+	switch resourceType {
+	case "node":
+		nodeClient := clientset.CoreV1().Nodes()
+		allocatable, err := NodeIsAllocatable(clientset, resourceID)
+		if err != nil {
+			return nil, err
+		}
+		if !allocatable {
+			return nil, errors.NewErrorUnallocatable(fmt.Sprintf("Node %s is not allocatable", resourceID))
+		}
+		node, err := nodeClient.Get(context.TODO(), resourceID, metav1.GetOptions{})
+		if err != nil {
+			return nil, errors.NewErrorInvalid(fmt.Sprintf("Failed to locate a node for %s: %v", resourceID, err))
+		}
+
+		labels := node.GetLabels()
+		val, ok := labels["kubernetes.io/hostname"]
+		if !ok {
+			return nil, errors.NewErrorInvalid("label kubernetes.io/hostname not found in node")
+		}
+		target.Node = val
+
+		return &target, nil
+	case "pod":
+		podClient := clientset.CoreV1().Pods(targetNamespace)
+		pod, err := podClient.Get(context.TODO(), resourceID, metav1.GetOptions{})
+		if err != nil {
+			return nil, errors.NewErrorInvalid(fmt.Sprintf("Could not locate pod %s", resourceID))
+		}
+
+		allocatable, err := NodeIsAllocatable(clientset, pod.Spec.NodeName)
+		if err != nil || !allocatable {
+			return nil, errors.NewErrorUnallocatable(fmt.Sprintf("Pod %s is not scheduled on an allocatable node", resourceID))
+		}
+
+		err = resolvePodToTarget(podClient, resourceID, container, targetNamespace, &target)
+		if err != nil {
+			return nil, err
+		}
+		return &target, nil
+	case "deploy", "deployment":
+		deployClient := clientset.AppsV1().Deployments(targetNamespace)
+		deployment, err := deployClient.Get(context.TODO(), resourceID, metav1.GetOptions{})
+		if err != nil {
+			return nil, err
+		}
+		labelMap, err := metav1.LabelSelectorAsMap(deployment.Spec.Selector)
+		if err != nil {
+			return nil, err
+		}
+		podClient := clientset.CoreV1().Pods(targetNamespace)
+		pods, err := podClient.List(context.TODO(), metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labelMap).String()})
+		if err != nil {
+			return nil, err
+		}
+
+		var selectedPod *v1.Pod
+		for i := range pods.Items {
+			pod := &pods.Items[i]
+			allocatable, err := NodeIsAllocatable(clientset, pod.Spec.NodeName)
+			if err != nil {
+				continue
+			}
+
+			if allocatable {
+				selectedPod = pod
+				break
+			}
+		}
+
+		if selectedPod == nil {
+			return nil, errors.NewErrorUnallocatable(fmt.Sprintf("No pods for deployment %s were on allocatable nodes", resourceID))
+		}
+
+		err = resolvePodToTarget(podClient, selectedPod.Name, container, targetNamespace, &target)
+		if err != nil {
+			return nil, err
+		}
+
+		return &target, nil
+	default:
+		return nil, errors.NewErrorInvalid(fmt.Sprintf("Unsupported resource type %s for %s", resourceType, resourceID))
+	}
+}
+
+func NodeIsAllocatable(clientset kubernetes.Interface, hostname string) (bool, error) {
+	// if the job is neither Successful nor Failed, here we need to check to make sure that it is at least
+	// schedulable on the node. To do that:
+	//
+	//   retrieve a list of all pods currently scheduled on the node
+	//   check the number of pods available on the node
+	//   make sure that there are "enough" pods available
+	//     - if not, mark as "Unschedulable"
+	//   account for Unschedulable elsewhere in the operator logic?
+
+	allPods, err := allPodsForNode(clientset, hostname)
+	if err != nil {
+		return false, fmt.Errorf("could not retrieve pods: %v", err)
+	}
+
+	var nonTerminalPods []*v1.Pod
+	for i := range allPods.Items {
+		pod := &allPods.Items[i]
+		if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
+			continue
+		}
+
+		nonTerminalPods = append(nonTerminalPods, pod)
+	}
+
+	nodeClient := clientset.CoreV1().Nodes()
+	node, err := nodeClient.Get(context.TODO(), hostname, metav1.GetOptions{})
+
+	if err != nil {
+		return false, fmt.Errorf("could not get node object: %v", err)
+	}
+
+	maxPods, ok := node.Status.Allocatable.Pods().AsInt64()
+	if !ok {
+		return false, fmt.Errorf("could not parse the number of allocatable pods: quantity was not an integer: %s", node.Status.Allocatable.Pods().String())
+	}
+
+	if len(nonTerminalPods) >= int(maxPods) {
+		return false, nil
+	}
+
+	// FIXME check resources, check for cordoned before giving the all-clear
+
+	return true, nil
+}
+
+func allPodsForNode(clientset kubernetes.Interface, nodeName string) (*v1.PodList, error) {
+	// https://github.com/kubernetes/kubectl/blob/1199011a44e83ded0d4f1d582e731bc4212aa9f5/pkg/describe/describe.go#L3473
+	fieldSelector, err := fields.ParseSelector("spec.nodeName=" + nodeName + ",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed))
+	if err != nil {
+		return nil, err
+	}
+
+	listOptions := metav1.ListOptions{
+		FieldSelector: fieldSelector.String(),
+		// TODO should we chunk this? RE https://github.com/kubernetes/kubectl/blob/1199011a44e83ded0d4f1d582e731bc4212aa9f5/pkg/describe/describe.go#L3484
+	}
+
+	podList, err := clientset.CoreV1().Pods("").List(context.TODO(), listOptions)
+	if err != nil {
+		return nil, err
+	}
+
+	return podList, nil
+}
+
+func resolvePodToTarget(podClient corev1.PodInterface, resourceID, container, targetNamespace string, target *TraceJobTarget) error {
+	pod, err := podClient.Get(context.TODO(), resourceID, metav1.GetOptions{})
+
+	if err != nil {
+		return fmt.Errorf("failed to locate a pod for %s; error was %s", resourceID, err.Error())
+	}
+
+	if pod.Spec.NodeName == "" {
+		return fmt.Errorf("cannot attach a trace program to a pod that is not currently scheduled on a node")
+	}
+
+	var targetContainer string
+	target.Node = pod.Spec.NodeName
+	target.PodUID = string(pod.UID)
+
+	if len(pod.Spec.Containers) == 1 {
+		targetContainer = pod.Spec.Containers[0].Name
+	} else {
+		// FIXME verify container is not empty
+		targetContainer = container
+	}
+
+	for _, s := range pod.Status.ContainerStatuses {
+		if s.Name == targetContainer {
+			containerID := strings.TrimPrefix(s.ContainerID, "docker://")
+			containerID = strings.TrimPrefix(containerID, "containerd://")
+			target.ContainerID = containerID
+			break
+		}
+	}
+
+	if target.ContainerID == "" {
+		return fmt.Errorf("no containers found for the provided pod %s and container %s combination", pod.Name, targetContainer)
+	}
+
+	return nil
+}
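
For reviewers who want to exercise the new target-resolution entry point outside of kubectl-trace itself, here is a minimal, hypothetical sketch of driving tracejob.ResolveTraceJobTarget from client code. Only ResolveTraceJobTarget and TraceJobTarget come from this patch; the kubeconfig loading, the pod/mypod name, and the default namespace are illustrative assumptions.

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/iovisor/kubectl-trace/pkg/tracejob"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/tools/clientcmd"
    )

    func main() {
    	// Illustrative kubeconfig loading; any source of a *rest.Config works.
    	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    	if err != nil {
    		log.Fatal(err)
    	}
    	clientset, err := kubernetes.NewForConfig(config)
    	if err != nil {
    		log.Fatal(err)
    	}

    	// "pod/mypod" and "default" are made-up values. An empty container
    	// name is allowed when the pod runs a single container.
    	target, err := tracejob.ResolveTraceJobTarget(clientset, "pod/mypod", "", "default")
    	if err != nil {
    		log.Fatal(err)
    	}
    	fmt.Printf("node=%s podUID=%s containerID=%s\n", target.Node, target.PodUID, target.ContainerID)
    }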
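
Splitting CreateJob into Job(), ConfigMap(), and Meta() also makes the generated job spec inspectable without talking to a cluster. A sketch under that assumption — all field values are invented for illustration (the pod UID reuses the example from the procfs comment; run.go normally derives Name/ID from meta.ObjectNamePrefix plus a generated UUID):

    package main

    import (
    	"fmt"

    	"github.com/iovisor/kubectl-trace/pkg/tracejob"
    	"k8s.io/apimachinery/pkg/types"
    )

    func main() {
    	tj := tracejob.TraceJob{
    		Name:      "kubectl-trace-example",
    		Namespace: "default",
    		ID:        types.UID("00000000-0000-0000-0000-000000000000"),
    		Tracer:    "bcc",
    		Target: tracejob.TraceJobTarget{
    			Node:        "worker-1",
    			PodUID:      "31dd0274-bb43-4975-bdbc-7e10047a23f8",
    			ContainerID: "851c75dad6ad8ce6a5d9b9129a4eb1645f7c6e5ba8406b12d50377b665737072",
    		},
    		Program:     "execsnoop",
    		ProgramArgs: []string{"--pid", "$container_pid"},
    		Deadline:    60,
    	}

    	job := tj.Job()      // batch/v1 Job pinned to Target.Node via node affinity
    	cm := tj.ConfigMap() // carries the program body for bpftrace runs
    	fmt.Println(job.Name, cm.Name, job.Spec.Template.Spec.Containers[0].Command)
    }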
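
The pkg/errors taxonomy is meant to be consumed through its two exported predicates; reasonForError stays private. A small sketch of how a caller might branch on the error kind (the messages are made up):

    package main

    import (
    	"fmt"

    	"github.com/iovisor/kubectl-trace/pkg/errors"
    )

    func main() {
    	err := errors.NewErrorUnallocatable("Node worker-1 is not allocatable")

    	switch {
    	case errors.IsInvalidTargetError(err):
    		fmt.Println("bad target spec, fail fast")
    	case errors.IsUnallocatableTargetError(err):
    		fmt.Println("target exists but cannot host the trace job right now")
    	default:
    		fmt.Println("unexpected error:", err)
    	}
    }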
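
Because pkg/procfs reads everything through the package-level ProcFs afero filesystem, the /proc helpers can be exercised without a real /proc, as the unit tests do with a BasePathFs rooted in a temp dir. A sketch using an in-memory filesystem instead — this assumes afero's MemMapFs (fine for plain file reads, but it cannot serve the symlink-based helpers like GetProcExe) and a status file whose NSpid line has no leading whitespace, since GetFinalNamespacePid matches the literal "NSpid" prefix:

    package main

    import (
    	"fmt"

    	"github.com/iovisor/kubectl-trace/pkg/procfs"
    	"github.com/spf13/afero"
    )

    func main() {
    	// Swap the package-level filesystem for an in-memory one.
    	procfs.ProcFs = afero.NewMemMapFs()

    	status := []byte("Name:\ttestprogram\nNSpid:\t47\t1\n")
    	if err := afero.WriteFile(procfs.ProcFs, "/proc/47/status", status, 0444); err != nil {
    		panic(err)
    	}

    	pid, err := procfs.GetFinalNamespacePid("47")
    	fmt.Println(pid, err) // expected: "1" <nil>
    }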