Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
Make unschedulable pod timeout duration configurable (#221)
Browse files Browse the repository at this point in the history
* Make unschedulable pod timeout duration configurable

* Autorelease 0.41.0-rc1

[skip ci]

* Log timeout duration

* Autorelease 0.41.0-rc2

[skip ci]

* Additional log for debugging

* Better logging to see if configuration works properly

* Autorelease 0.41.0-rc3

[skip ci]

* Fix config

* Remove hardcoded value

* Improve log

* Check for negative values as well

* Autorelease 0.41.0-rc4

[skip ci]

* Fix time until timeout log

Co-authored-by: Robert-Mihai Botarleanu <[email protected]>
  • Loading branch information
rbotarleanu and Robert-Mihai Botarleanu authored Dec 9, 2022
1 parent d6bdc6a commit 3e1134c
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 18 deletions.
1 change: 1 addition & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
kubeClient.CoreV1(),
overheadComputer,
binpacker,
install.UnschedulablePodTimeoutDuration,
)

resourceReservationCache.Run(ctx)
Expand Down
21 changes: 11 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (

// Install contains the install-time configuration of the server and its Kubernetes dependency
type Install struct {
config.Install `yaml:",inline"`
config.Runtime `yaml:",inline"`
Kubeconfig string `yaml:"kube-config,omitempty"`
FIFO bool `yaml:"fifo,omitempty"`
FifoConfig FifoConfig `yaml:"fifo-config,omitempty"`
QPS float32 `yaml:"qps,omitempty"`
Burst int `yaml:"burst,omitempty"`
BinpackAlgo string `yaml:"binpack,omitempty"`
InstanceGroupLabel string `yaml:"instance-group-label,omitempty"`
AsyncClientConfig AsyncClientConfig `yaml:"async-client-config,omitempty"`
config.Install `yaml:",inline"`
config.Runtime `yaml:",inline"`
Kubeconfig string `yaml:"kube-config,omitempty"`
FIFO bool `yaml:"fifo,omitempty"`
FifoConfig FifoConfig `yaml:"fifo-config,omitempty"`
QPS float32 `yaml:"qps,omitempty"`
Burst int `yaml:"burst,omitempty"`
BinpackAlgo string `yaml:"binpack,omitempty"`
InstanceGroupLabel string `yaml:"instance-group-label,omitempty"`
AsyncClientConfig AsyncClientConfig `yaml:"async-client-config,omitempty"`
UnschedulablePodTimeoutDuration time.Duration `yaml:"unschedulable-pod-timeout-duration,omitempty"`

// Deprecated: assumed true, value not used
UseExperimentalHostPriorities bool `yaml:"use-experimental-host-priorities,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions docker/var/conf/install.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ server:
management-port: 8484
context-path: /spark-scheduler
fifo: true
unschedulable-pod-timeout-duration: 10m
logging:
level: info
output: STDOUT
Expand Down
2 changes: 1 addition & 1 deletion hack/dev/generate-certs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mkdir -p $TMP_DIR
openssl genrsa -out "${TMP_DIR}/rootCA.key" 2048

# Generate the root CA certificate from the key
openssl req -batch -x509 -sha256 -new -nodes -key ${TMP_DIR}/rootCA.key -days 3650 -out ${TMP_DIR}/rootCA.crt
openssl req -batch -x509 -sha256 -new -nodes -key ${TMP_DIR}/rootCA.key -days 3650 -out ${TMP_DIR}/rootCA.crt -config ${SCRIPT_DIR}/cert.conf

# Generate the signing request for the witchcraft ssl cert as well as the private key
openssl req -new -nodes -newkey rsa:2048 -keyout ${TMP_DIR}/spark-scheduler.key -out ${TMP_DIR}/spark-scheduler.csr -config ${SCRIPT_DIR}/cert.conf -extensions 'v3_req'
Expand Down
3 changes: 2 additions & 1 deletion internal/extender/extendertest/extender_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func NewTestExtender(objects ...runtime.Object) (*Harness, error) {
podLister,
fakeKubeClient.CoreV1(),
overheadComputer,
binpacker)
binpacker,
installConfig.UnschedulablePodTimeoutDuration)

return &Harness{
Extender: sparkSchedulerExtender,
Expand Down
20 changes: 14 additions & 6 deletions internal/extender/unschedulablepods.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ import (
)

const (
podExceedsClusterCapacity v1.PodConditionType = "PodExceedsClusterCapacity"
unschedulablePollingInterval time.Duration = time.Minute
unschedulableInClusterThreshold time.Duration = 10 * time.Minute
podExceedsClusterCapacity v1.PodConditionType = "PodExceedsClusterCapacity"
unschedulablePollingInterval time.Duration = time.Minute
)

// UnschedulablePodMarker checks for spark scheduler managed pending driver pods
Expand All @@ -47,6 +46,7 @@ type UnschedulablePodMarker struct {
coreClient corev1.CoreV1Interface
overheadComputer *OverheadComputer
binpacker *Binpacker
timeoutDuration time.Duration
}

// NewUnschedulablePodMarker creates a new UnschedulablePodMarker
Expand All @@ -55,13 +55,20 @@ func NewUnschedulablePodMarker(
podLister corelisters.PodLister,
coreClient corev1.CoreV1Interface,
overheadComputer *OverheadComputer,
binpacker *Binpacker) *UnschedulablePodMarker {
binpacker *Binpacker,
timeoutDuration time.Duration) *UnschedulablePodMarker {

if timeoutDuration <= 0 {
timeoutDuration = 10 * time.Minute
}

return &UnschedulablePodMarker{
nodeLister: nodeLister,
podLister: podLister,
coreClient: coreClient,
overheadComputer: overheadComputer,
binpacker: binpacker,
timeoutDuration: timeoutDuration,
}
}

Expand Down Expand Up @@ -94,12 +101,13 @@ func (u *UnschedulablePodMarker) scanForUnschedulablePods(ctx context.Context) {
len(pod.Spec.NodeName) == 0 &&
pod.DeletionTimestamp == nil &&
pod.Labels[common.SparkRoleLabel] == common.Driver &&
pod.CreationTimestamp.Time.Add(unschedulableInClusterThreshold).Before(now) {
pod.CreationTimestamp.Time.Add(u.timeoutDuration).Before(now) {

ctx = svc1log.WithLoggerParams(
ctx,
svc1log.SafeParam("podName", pod.Name),
svc1log.SafeParam("podNamespace", pod.Namespace))
svc1log.SafeParam("podNamespace", pod.Namespace),
svc1log.SafeParam("timeUntilTimeout", pod.CreationTimestamp.Time.Add(u.timeoutDuration).Sub(now)))

exceedsCapacity, err := u.DoesPodExceedClusterCapacity(ctx, pod)
if err != nil {
Expand Down

0 comments on commit 3e1134c

Please sign in to comment.