Skip to content

Commit

Permalink
Merge pull request #442 from buildkite/add-job-watcher
Browse files Browse the repository at this point in the history
Add job watcher
  • Loading branch information
DrJosh9000 authored Dec 5, 2024
2 parents 94ae79d + 1ea1b55 commit c85d911
Show file tree
Hide file tree
Showing 17 changed files with 674 additions and 93 deletions.
6 changes: 6 additions & 0 deletions .buildkite/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ rules:
- pods/eviction
verbs:
- create
- apiGroups:
- ""
resources:
- events
verbs:
- list
---
apiVersion: v1
kind: ServiceAccount
Expand Down
6 changes: 6 additions & 0 deletions charts/agent-stack-k8s/templates/rbac.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ rules:
- pods/eviction
verbs:
- create
- apiGroups:
- ""
resources:
- events
verbs:
- list
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
Expand Down
15 changes: 15 additions & 0 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ func AddConfigFlags(cmd *cobra.Command) {
)
cmd.Flags().String("graphql-endpoint", "", "Buildkite GraphQL endpoint URL")

cmd.Flags().Duration(
"stale-job-data-timeout",
config.DefaultStaleJobDataTimeout,
"Duration after querying jobs in Buildkite that the data is considered valid",
)
cmd.Flags().Int(
"job-creation-concurrency",
config.DefaultJobCreationConcurrency,
"Number of concurrent goroutines to run for converting Buildkite jobs into Kubernetes jobs",
)
cmd.Flags().Duration(
"image-pull-backoff-grace-period",
config.DefaultImagePullBackOffGracePeriod,
Expand All @@ -100,6 +110,11 @@ func AddConfigFlags(cmd *cobra.Command) {
config.DefaultJobCancelCheckerPollInterval,
"Controls the interval between job state queries while a pod is still Pending",
)
cmd.Flags().Duration(
"empty-job-grace-period",
config.DefaultEmptyJobGracePeriod,
"Duration after starting a Kubernetes job that the controller will wait before considering failing the job due to a missing pod (e.g. when the podSpec specifies a missing service account)",
)
cmd.Flags().Bool(
"prohibit-kubernetes-plugin",
false,
Expand Down
1 change: 1 addition & 0 deletions cmd/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestReadAndParseConfig(t *testing.T) {
JobTTL: 300 * time.Second,
ImagePullBackOffGracePeriod: 60 * time.Second,
JobCancelCheckerPollInterval: 10 * time.Second,
EmptyJobGracePeriod: 50 * time.Second,
PollInterval: 5 * time.Second,
StaleJobDataTimeout: 10 * time.Second,
JobCreationConcurrency: 5,
Expand Down
1 change: 1 addition & 0 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ image: my.registry.dev/buildkite-agent:latest
job-ttl: 5m
image-pull-backoff-grace-period: 60s
job-cancel-checker-poll-interval: 10s
empty-job-grace-period: 50s
poll-interval: 5s
stale-job-data-timeout: 10s
job-creation-concurrency: 5
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ const (
BuildURLAnnotation = "buildkite.com/build-url"
JobURLAnnotation = "buildkite.com/job-url"
DefaultNamespace = "default"
DefaultStaleJobDataTimeout = 10 * time.Second
DefaultImagePullBackOffGracePeriod = 30 * time.Second
DefaultJobCancelCheckerPollInterval = 5 * time.Second
DefaultEmptyJobGracePeriod = 30 * time.Second
DefaultJobCreationConcurrency = 5
)

var DefaultAgentImage = "ghcr.io/buildkite/agent:" + version.Version()
Expand Down Expand Up @@ -49,6 +52,7 @@ type Config struct {
PodSpecPatch *corev1.PodSpec `json:"pod-spec-patch" validate:"omitempty"`
ImagePullBackOffGracePeriod time.Duration `json:"image-pull-backoff-grace-period" validate:"omitempty"`
JobCancelCheckerPollInterval time.Duration `json:"job-cancel-checker-poll-interval" validate:"omitempty"`
EmptyJobGracePeriod time.Duration `json:"empty-job-grace-period" validate:"omitempty"`

// WorkspaceVolume allows supplying a volume for /workspace. By default
// an EmptyDir volume is created for it.
Expand Down
13 changes: 13 additions & 0 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,20 @@ func Run(
logger.Fatal("failed to register completions informer", zap.Error(err))
}

// JobWatcher watches for jobs in bad conditions to clean up:
// * Jobs that fail without ever creating a pod
// * Jobs that stall forever without ever creating a pod
jobWatcher := scheduler.NewJobWatcher(
logger.Named("jobWatcher"),
k8sClient,
cfg,
)
if err := jobWatcher.RegisterInformer(ctx, informerFactory); err != nil {
logger.Fatal("failed to register jobWatcher informer", zap.Error(err))
}

// PodWatcher watches for other conditions to clean up pods:
// * Pods where an init container failed for any reason
// * Pods where a container is in ImagePullBackOff for too long
// * Pods that are still pending, but the Buildkite job has been cancelled
podWatcher := scheduler.NewPodWatcher(
Expand Down
5 changes: 3 additions & 2 deletions internal/controller/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Khan/genqlient/graphql"
"github.com/buildkite/agent-stack-k8s/v2/api"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/model"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -46,12 +47,12 @@ func New(logger *zap.Logger, k8s kubernetes.Interface, cfg Config) (*Monitor, er

// Default StaleJobDataTimeout to 10s.
if cfg.StaleJobDataTimeout <= 0 {
cfg.StaleJobDataTimeout = 10 * time.Second
cfg.StaleJobDataTimeout = config.DefaultStaleJobDataTimeout
}

// Default CreationConcurrency to 5.
if cfg.JobCreationConcurrency <= 0 {
cfg.JobCreationConcurrency = 5
cfg.JobCreationConcurrency = config.DefaultJobCreationConcurrency
}

return &Monitor{
Expand Down
23 changes: 15 additions & 8 deletions internal/controller/scheduler/completions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"

"go.uber.org/zap"

v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,9 +17,18 @@ import (
"k8s.io/utils/ptr"
)

const defaultTermGracePeriodSeconds = 60

type completionsWatcher struct {
logger *zap.Logger
k8s kubernetes.Interface

// This is the context passed to RegisterInformer.
// It's being stored here (grrrr!) because the k8s ResourceEventHandler
// interface doesn't have context args. (Working around an interface in a
// library outside of our control is a carve-out from the usual rule.)
// The context is needed to ensure goroutines are cleaned up.
resourceEventHandlerCtx context.Context
}

func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *completionsWatcher {
Expand All @@ -30,14 +40,12 @@ func NewPodCompletionWatcher(logger *zap.Logger, k8s kubernetes.Interface) *comp
}

// Creates a Pods informer and registers the handler on it
func (w *completionsWatcher) RegisterInformer(
ctx context.Context,
factory informers.SharedInformerFactory,
) error {
func (w *completionsWatcher) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
informer := factory.Core().V1().Pods().Informer()
if _, err := informer.AddEventHandler(w); err != nil {
return err
}
w.resourceEventHandlerCtx = ctx // see note on field
go factory.Start(ctx.Done())
return nil
}
Expand All @@ -49,7 +57,7 @@ func (w *completionsWatcher) OnDelete(obj any) {}
func (w *completionsWatcher) OnAdd(obj any, isInInitialList bool) {
completionWatcherOnAddEventCounter.Inc()
pod := obj.(*v1.Pod)
w.cleanupSidecars(pod)
w.cleanupSidecars(w.resourceEventHandlerCtx, pod)
}

func (w *completionsWatcher) OnUpdate(old any, new any) {
Expand All @@ -62,15 +70,15 @@ func (w *completionsWatcher) OnUpdate(old any, new any) {
}

newPod := new.(*v1.Pod)
w.cleanupSidecars(newPod)
w.cleanupSidecars(w.resourceEventHandlerCtx, newPod)
}

// cleanupSidecars first checks if the container status of the agent container
// in the pod is Terminated. If so, it ensures the job is cleaned up by updating
// it with an ActiveDeadlineSeconds value (defaultTermGracePeriodSeconds).
// (So this is not actually sidecar-specific, but is needed because sidecars
// would otherwise cause the pod to continue running.)
func (w *completionsWatcher) cleanupSidecars(pod *v1.Pod) {
func (w *completionsWatcher) cleanupSidecars(ctx context.Context, pod *v1.Pod) {
terminated := getTermination(pod)
if terminated == nil {
return
Expand All @@ -82,7 +90,6 @@ func (w *completionsWatcher) cleanupSidecars(pod *v1.Pod) {
)

if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
ctx := context.TODO()
job, err := w.k8s.BatchV1().Jobs(pod.Namespace).Get(ctx, pod.Labels["job-name"], metav1.GetOptions{})
if err != nil {
return err
Expand Down
39 changes: 37 additions & 2 deletions internal/controller/scheduler/fail_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"os"

"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/buildkite/agent-stack-k8s/v2/internal/version"

agentcore "github.com/buildkite/agent/v3/core"
Expand All @@ -16,9 +18,42 @@ import (
"k8s.io/client-go/kubernetes"
)

// failJob fails the job in Buildkite. agentToken needs to be the token value.
// acquireAndFailForObject figures out how to fail the BK job corresponding to
// the k8s object (a pod or job) by inspecting the object's labels.
func acquireAndFailForObject(
ctx context.Context,
logger *zap.Logger,
k8sClient kubernetes.Interface,
cfg *config.Config,
obj metav1.Object,
message string,
) error {
agentToken, err := fetchAgentToken(ctx, logger, k8sClient, obj.GetNamespace(), cfg.AgentTokenSecret)
if err != nil {
logger.Error("fetching agent token from secret", zap.Error(err))
return err
}

// Matching tags are required order to connect the temporary agent.
labels := obj.GetLabels()
jobUUID := labels[config.UUIDLabel]
if jobUUID == "" {
logger.Error("object missing UUID label", zap.String("label", config.UUIDLabel))
return errors.New("missing UUID label")
}
tags := agenttags.TagsFromLabels(labels)
opts := cfg.AgentConfig.ControllerOptions()

if err := acquireAndFail(ctx, logger, agentToken, jobUUID, tags, message, opts...); err != nil {
logger.Error("failed to acquire and fail the job on Buildkite", zap.Error(err))
return err
}
return nil
}

// acquireAndFail fails the job in Buildkite. agentToken needs to be the token value.
// Use fetchAgentToken to fetch it from the k8s secret.
func failJob(
func acquireAndFail(
ctx context.Context,
zapLogger *zap.Logger,
agentToken string,
Expand Down
Loading

0 comments on commit c85d911

Please sign in to comment.