Skip to content

Commit

Permalink
Capture pod logs in parallel and ignore errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Marin Dzhigarov committed Dec 23, 2024
1 parent c3feff7 commit 126156f
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 17 deletions.
2 changes: 1 addition & 1 deletion k8s/container_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c ContainerLogsImpl) Fetch(ctx context.Context, restApi rest.Interface) (i
req := restApi.Get().Namespace(c.namespace).Name(c.podName).Resource("pods").SubResource("log").VersionedParams(opts, scheme.ParameterCodec)
stream, err := req.Stream(ctx)
if err != nil {
err = errors.Wrap(err, "failed to create container log stream")
err = errors.Wrap(err, "failed to create container log stream for container with name "+c.container.Name)
}
return stream, err
}
Expand Down
34 changes: 25 additions & 9 deletions k8s/result_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"os"
"path/filepath"
"sync"

"github.com/sirupsen/logrus"
"k8s.io/cli-runtime/pkg/printers"
"k8s.io/client-go/rest"
)
Expand Down Expand Up @@ -50,6 +52,10 @@ func (w *ResultWriter) Write(ctx context.Context, searchResults []SearchResult)

// each result represents a list of searched item
// write each list in a namespaced location in working dir
var wg sync.WaitGroup
concurrencyLimit := 10
semaphore := make(chan int, concurrencyLimit)

for _, result := range searchResults {
objWriter := ObjectWriter{
writeDir: w.workdir,
Expand All @@ -72,21 +78,31 @@ func (w *ResultWriter) Write(ctx context.Context, searchResults []SearchResult)

containers, err := GetContainers(podItem)
if err != nil {
return err
logrus.Errorf("Failed to get containers for pod %s: %s", podItem.GetName(), err)
continue
}
for _, containerLogger := range containers {
reader, err := containerLogger.Fetch(ctx, w.restApi)
if err != nil {
return err
}
err = containerLogger.Write(reader, logDir)
if err != nil {
return err
}
semaphore <- 1 // Acquire a slot
wg.Add(1)
go func(logger Container) {
defer wg.Done()
defer func() { <-semaphore }() // Release the slot
reader, e := logger.Fetch(ctx, w.restApi)
if e != nil {
logrus.Errorf("Failed to fetch container logs for pod %s: %s", podItem.GetName(), e)

Check failure on line 92 in k8s/result_writer.go

View workflow job for this annotation

GitHub Actions / Build-Test-Binary

loopclosure: loop variable podItem captured by func literal (govet)
return
}
e = logger.Write(reader, logDir)
if e != nil {
logrus.Errorf("Failed to write container logs for pod %s: %s", podItem.GetName(), e)

Check failure on line 97 in k8s/result_writer.go

View workflow job for this annotation

GitHub Actions / Build-Test-Binary

loopclosure: loop variable podItem captured by func literal (govet)
return
}
}(containerLogger)
}
}
}
}
wg.Wait()

return nil
}
43 changes: 43 additions & 0 deletions starlark/kube_capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,49 @@ kube_data = kube_capture(what="logs", namespaces=["kube-system"], containers=["e
}
},
},
{
name: "capture logs does not fail even when we have a terminating pod",
script: fmt.Sprintf(`
crashd_config(workdir="%s")
set_defaults(kube_config(path="%s"))
kube_data = kube_capture(what="logs", namespaces=["default"])`, workdir, k8sconfig),
eval: func(t *testing.T, script string) {
err := testSupport.SimulateTerminatingPod()
if err != nil {
t.Error("Unexpected error while simulating terminating pod", err)
return
}
data := execute(t, script)

fileVal, err := data.Attr("file")
if err != nil {
t.Error(err)
}

fileValStr, ok := fileVal.(starlark.String)
if !ok {
t.Fatalf("unexpected type for starlark value")
}
captureDir := fileValStr.GoString()
if _, err := os.Stat(captureDir); err != nil {
t.Fatalf("stat(%s) failed: %s", captureDir, err)
}
defer os.RemoveAll(captureDir)

path := filepath.Join(captureDir, "core_v1", "default")
if _, err := os.Stat(path); err != nil {
t.Fatalf("expecting %s to be a directory", path)
}

files, err := os.ReadDir(path)
if err != nil {
t.Fatalf("ReadeDir(%s) failed: %s", path, err)
}
if len(files) == 0 {
t.Error("unexpected number of log files for namespace default:", len(files))
}
},
},
}

for _, test := range tests {
Expand Down
98 changes: 93 additions & 5 deletions testing/kindcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"os"
"strings"
"time"

"github.com/sirupsen/logrus"
"github.com/vladimirvivien/gexe"
Expand All @@ -18,13 +19,14 @@ var (
)

type KindCluster struct {
name string
config string
e *gexe.Echo
name string
config string
tmpRootDir string
e *gexe.Echo
}

func NewKindCluster(config, name string) *KindCluster {
return &KindCluster{name: name, config: config, e: gexe.New()}
func NewKindCluster(config, name, tmpRootDir string) *KindCluster {
return &KindCluster{name: name, config: config, tmpRootDir: tmpRootDir, e: gexe.New()}
}

func (k *KindCluster) Create() error {
Expand Down Expand Up @@ -74,6 +76,92 @@ func (k *KindCluster) MakeKubeConfigFile(path string) error {
return nil
}

func (k *KindCluster) SimulateTerminatingPod() error {
logrus.Infof("Simulating terminating pod in kind cluster %s", k.name)
podConfig := `
apiVersion: v1
kind: Pod
metadata:
name: stuck-pod
namespace: default
labels:
app: test
finalizers:
- example.com/finalizer
spec:
containers:
- name: busybox
image: busybox
command:
- sh
- -c
- while true; do echo "Simulating a stuck pod"; sleep 5; done
---
apiVersion: v1
kind: Pod
metadata:
name: non-stuck-pod
namespace: default
labels:
app: test
spec:
containers:
- name: busybox
image: busybox
command:
- sh
- -c
- while true; do echo "Simulating a non-stuck pod"; sleep 5; done
`
// Write pod configuration to a temporary file in the directory k.tmpRootDir
filePath := fmt.Sprintf("%s/stuck-pod.yaml", k.tmpRootDir)
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to create file for pod configuration: %s", err)
}
defer file.Close()

if _, err := file.WriteString(podConfig); err != nil {
return fmt.Errorf("failed to write pod configuration to file: %s", err)
}
p := k.e.RunProc(fmt.Sprintf(`kubectl --context kind-%s apply -f %s`, k.name, filePath))
if p.Err() != nil {
return fmt.Errorf("failed to apply pod configuration: %s: %s", p.Err(), p.Result())
}

p = k.e.RunProc(fmt.Sprintf("kubectl --context kind-%s wait --for=condition=Ready pod -l app=test --timeout=60s", k.name))
if p.Err() != nil {
return fmt.Errorf("failed to simulate terminating pod: %s: %s", p.Err(), p.Result())
}

p = k.e.RunProc(fmt.Sprintf(`kubectl --context kind-%s delete pod stuck-pod --wait=false --grace-period=0 --force`, k.name))
if p.Err() != nil {
return fmt.Errorf("failed to simulate terminating pod: %s: %s", p.Err(), p.Result())
}

// Wait until pod is in Error state or max 10 seconds
timeout := time.After(10 * time.Second)
tick := time.Tick(500 * time.Millisecond)

Check failure on line 144 in testing/kindcluster.go

View workflow job for this annotation

GitHub Actions / Build-Test-Binary

SA1015: using time.Tick leaks the underlying ticker, consider using it only in endless functions, tests and the main package, and use time.NewTicker here (staticcheck)

for {
select {
case <-timeout:
return fmt.Errorf("timed out waiting for pod to be in Terminating state")
case <-tick:
p = k.e.RunProc(fmt.Sprintf(`kubectl --context kind-%s get pod stuck-pod -o jsonpath='{.status.phase}'`, k.name))
if p.Err() != nil {
return fmt.Errorf("failed to check pod status: %s: %s", p.Err(), p.Result())
}
if strings.Contains(p.Result(), "Failed") {
logrus.Infof("Pod is in Error state: %s", p.Result())
return nil
}
}
}

return nil

Check failure on line 162 in testing/kindcluster.go

View workflow job for this annotation

GitHub Actions / Build-Test-Binary

unreachable: unreachable code (govet)
}

func (k *KindCluster) GetKubeCtlContext() string {
return fmt.Sprintf("kind-%s", k.name)
}
Expand Down
2 changes: 1 addition & 1 deletion testing/kindcluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestCreateKindCluster(t *testing.T) {
k := NewKindCluster("./kind-cluster-docker.yaml", "testing-test-cluster")
k := NewKindCluster("./kind-cluster-docker.yaml", "testing-test-cluster", "")
if err := k.Create(); err != nil {
t.Error(err)
}
Expand Down
6 changes: 5 additions & 1 deletion testing/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (t *TestSupport) SetupKindCluster() error {
return err
}

kind := NewKindCluster(yamlPath, t.resourceName)
kind := NewKindCluster(yamlPath, t.resourceName, t.tmpDirRoot)
if err := kind.Create(); err != nil {
return err
}
Expand Down Expand Up @@ -197,6 +197,10 @@ func (t *TestSupport) KindClusterContextName() string {
return t.kindCluster.GetKubeCtlContext()
}

func (t *TestSupport) SimulateTerminatingPod() error {
return t.kindCluster.SimulateTerminatingPod()
}

func (t *TestSupport) TearDown() error {
var errs []error

Expand Down

0 comments on commit 126156f

Please sign in to comment.