From 3e1134c0580b78cbfd902d28380f29bbbd155ec7 Mon Sep 17 00:00:00 2001 From: rbotarleanu Date: Fri, 9 Dec 2022 11:11:56 +0000 Subject: [PATCH] Make unschedulable pod timeout duration configurable (#221) * Make unschedulable pod timeout duration configurable * Autorelease 0.41.0-rc1 [skip ci] * Log timeout duration * Autorelease 0.41.0-rc2 [skip ci] * Additional log for debugging * Better logging to see if configuration works properly * Autorelease 0.41.0-rc3 [skip ci] * Fix config * Remove hardcoded value * Improve log * Check for negative values as well * Autorelease 0.41.0-rc4 [skip ci] * Fix time until timeout log Co-authored-by: Robert-Mihai Botarleanu --- cmd/server.go | 1 + config/config.go | 21 ++++++++++--------- docker/var/conf/install.yml | 1 + hack/dev/generate-certs.sh | 2 +- .../extendertest/extender_test_utils.go | 3 ++- internal/extender/unschedulablepods.go | 20 ++++++++++++------ 6 files changed, 30 insertions(+), 18 deletions(-) diff --git a/cmd/server.go b/cmd/server.go index 0aa25d708..69567af9c 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -232,6 +232,7 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) { kubeClient.CoreV1(), overheadComputer, binpacker, + install.UnschedulablePodTimeoutDuration, ) resourceReservationCache.Run(ctx) diff --git a/config/config.go b/config/config.go index e1c7d433e..a2bdcb3cb 100644 --- a/config/config.go +++ b/config/config.go @@ -22,16 +22,17 @@ import ( // Install contains the install time configuration of the server and kubernetes dependency type Install struct { - config.Install `yaml:",inline"` - config.Runtime `yaml:",inline"` - Kubeconfig string `yaml:"kube-config,omitempty"` - FIFO bool `yaml:"fifo,omitempty"` - FifoConfig FifoConfig `yaml:"fifo-config,omitempty"` - QPS float32 `yaml:"qps,omitempty"` - Burst int `yaml:"burst,omitempty"` - BinpackAlgo string `yaml:"binpack,omitempty"` - InstanceGroupLabel string `yaml:"instance-group-label,omitempty"` - AsyncClientConfig AsyncClientConfig `yaml:"async-client-config,omitempty"` + config.Install `yaml:",inline"` + config.Runtime `yaml:",inline"` + Kubeconfig string `yaml:"kube-config,omitempty"` + FIFO bool `yaml:"fifo,omitempty"` + FifoConfig FifoConfig `yaml:"fifo-config,omitempty"` + QPS float32 `yaml:"qps,omitempty"` + Burst int `yaml:"burst,omitempty"` + BinpackAlgo string `yaml:"binpack,omitempty"` + InstanceGroupLabel string `yaml:"instance-group-label,omitempty"` + AsyncClientConfig AsyncClientConfig `yaml:"async-client-config,omitempty"` + UnschedulablePodTimeoutDuration time.Duration `yaml:"unschedulable-pod-timeout-duration,omitempty"` // Deprecated: assumed true, value not used UseExperimentalHostPriorities bool `yaml:"use-experimental-host-priorities,omitempty"` diff --git a/docker/var/conf/install.yml b/docker/var/conf/install.yml index 51ee93595..81a464e07 100644 --- a/docker/var/conf/install.yml +++ b/docker/var/conf/install.yml @@ -3,6 +3,7 @@ server: management-port: 8484 context-path: /spark-scheduler fifo: true +unschedulable-pod-timeout-duration: 10m logging: level: info output: STDOUT diff --git a/hack/dev/generate-certs.sh b/hack/dev/generate-certs.sh index 839702953..9d6d6a880 100755 --- a/hack/dev/generate-certs.sh +++ b/hack/dev/generate-certs.sh @@ -10,7 +10,7 @@ mkdir -p $TMP_DIR openssl genrsa -out "${TMP_DIR}/rootCA.key" 2048 # Generate the root ca cert from the key -openssl req -batch -x509 -sha256 -new -nodes -key ${TMP_DIR}/rootCA.key -days 3650 -out ${TMP_DIR}/rootCA.crt +openssl req -batch -x509 -sha256 -new -nodes -key ${TMP_DIR}/rootCA.key -days 3650 -out ${TMP_DIR}/rootCA.crt -config ${SCRIPT_DIR}/cert.conf # Generate the signing request for the witchcraft ssl cert as well as the private key openssl req -new -nodes -newkey rsa:2048 -keyout ${TMP_DIR}/spark-scheduler.key -out ${TMP_DIR}/spark-scheduler.csr -config ${SCRIPT_DIR}/cert.conf -extensions 'v3_req' diff --git a/internal/extender/extendertest/extender_test_utils.go b/internal/extender/extendertest/extender_test_utils.go index 36236643e..dc92895b5 100644 --- a/internal/extender/extendertest/extender_test_utils.go +++ b/internal/extender/extendertest/extender_test_utils.go @@ -156,7 +156,8 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) { podLister, fakeKubeClient.CoreV1(), overheadComputer, - binpacker) + binpacker, + installConfig.UnschedulablePodTimeoutDuration) return &Harness{ Extender: sparkSchedulerExtender, diff --git a/internal/extender/unschedulablepods.go b/internal/extender/unschedulablepods.go index 0fa886f5c..3b3d28f8a 100644 --- a/internal/extender/unschedulablepods.go +++ b/internal/extender/unschedulablepods.go @@ -33,9 +33,8 @@ import ( ) const ( - podExceedsClusterCapacity v1.PodConditionType = "PodExceedsClusterCapacity" - unschedulablePollingInterval time.Duration = time.Minute - unschedulableInClusterThreshold time.Duration = 10 * time.Minute + podExceedsClusterCapacity v1.PodConditionType = "PodExceedsClusterCapacity" + unschedulablePollingInterval time.Duration = time.Minute ) // UnschedulablePodMarker checks for spark scheduler managed pending driver pods @@ -47,6 +46,7 @@ type UnschedulablePodMarker struct { coreClient corev1.CoreV1Interface overheadComputer *OverheadComputer binpacker *Binpacker + timeoutDuration time.Duration } // NewUnschedulablePodMarker creates a new UnschedulablePodMarker @@ -55,13 +55,20 @@ func NewUnschedulablePodMarker( podLister corelisters.PodLister, coreClient corev1.CoreV1Interface, overheadComputer *OverheadComputer, - binpacker *Binpacker) *UnschedulablePodMarker { + binpacker *Binpacker, + timeoutDuration time.Duration) *UnschedulablePodMarker { + + if timeoutDuration <= 0 { + timeoutDuration = 10 * time.Minute + } + return &UnschedulablePodMarker{ nodeLister: nodeLister, podLister: podLister, coreClient: coreClient, overheadComputer: overheadComputer, binpacker: binpacker, + timeoutDuration: timeoutDuration, } } @@ -94,12 +101,13 @@ func (u *UnschedulablePodMarker) scanForUnschedulablePods(ctx context.Context) { len(pod.Spec.NodeName) == 0 && pod.DeletionTimestamp == nil && pod.Labels[common.SparkRoleLabel] == common.Driver && - pod.CreationTimestamp.Time.Add(unschedulableInClusterThreshold).Before(now) { + pod.CreationTimestamp.Time.Add(u.timeoutDuration).Before(now) { ctx = svc1log.WithLoggerParams( ctx, svc1log.SafeParam("podName", pod.Name), - svc1log.SafeParam("podNamespace", pod.Namespace)) + svc1log.SafeParam("podNamespace", pod.Namespace), + svc1log.SafeParam("timeUntilTimeout", pod.CreationTimestamp.Time.Add(u.timeoutDuration).Sub(now))) exceedsCapacity, err := u.DoesPodExceedClusterCapacity(ctx, pod) if err != nil {