This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

improvement: Move the demand processing behind a single interface that can be shared (#255)

k-simons authored Apr 12, 2023
1 parent 4c4b95b commit 7340b7e
Showing 16 changed files with 280 additions and 167 deletions.
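
The commit's point is that demand creation and deletion now sit behind a single demands.Manager interface rather than methods on *SparkSchedulerExtender, so the scheduler extender and the background DemandGC can share one implementation. As a rough, hypothetical sketch of the resulting consumption pattern (only the Manager methods come from this commit; the surrounding package and function names are illustrative):

// Hypothetical sketch, not part of the diff: two independent callers share one demands.Manager.
package example

import (
	"context"

	"github.com/palantir/k8s-spark-scheduler-lib/pkg/resources"
	"github.com/palantir/k8s-spark-scheduler/internal/demands"
	v1 "k8s.io/api/core/v1"
)

// requestExecutorCapacity stands in for the extender path: when an executor pod cannot be
// placed, it asks the scaler for capacity through the shared manager.
func requestExecutorCapacity(ctx context.Context, m demands.Manager, executor *v1.Pod, r *resources.Resources) {
	m.CreateDemandForExecutorInAnyZone(ctx, executor, r)
}

// cleanUpAfterScheduling stands in for the DemandGC path: once the pod is scheduled,
// any outstanding demand is deleted and the deletion event is tagged with its source.
func cleanUpAfterScheduling(ctx context.Context, m demands.Manager, pod *v1.Pod) {
	m.DeleteDemandIfExists(ctx, pod, "DemandGC")
}
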
16 changes: 10 additions & 6 deletions cmd/server.go
@@ -22,9 +22,11 @@ import (
"github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/sparkscheduler/v1beta2"
ssinformers "github.com/palantir/k8s-spark-scheduler-lib/pkg/client/informers/externalversions"
"github.com/palantir/k8s-spark-scheduler/config"
"github.com/palantir/k8s-spark-scheduler/internal/binpacker"
"github.com/palantir/k8s-spark-scheduler/internal/cache"
"github.com/palantir/k8s-spark-scheduler/internal/conversionwebhook"
"github.com/palantir/k8s-spark-scheduler/internal/crd"
"github.com/palantir/k8s-spark-scheduler/internal/demands"
"github.com/palantir/k8s-spark-scheduler/internal/extender"
"github.com/palantir/k8s-spark-scheduler/internal/metrics"
"github.com/palantir/k8s-spark-scheduler/internal/sort"
@@ -140,14 +142,18 @@ func InitServerWithClients(ctx context.Context, info witchcraft.InitInfo, allCli
sparkSchedulerInformerFactory,
apiExtensionsClient,
)

binpacker := binpacker.SelectBinpacker(install.BinpackAlgo)
demandCache := cache.NewSafeDemandCache(
lazyDemandInformer,
sparkSchedulerClient.ScalerV1alpha2(),
install.AsyncClientConfig,
)

extender.StartDemandGC(ctx, podInformerInterface, demandCache)
demandManager := demands.NewDefaultManager(
kubeClient.CoreV1(),
demandCache,
binpacker,
instanceGroupLabel)
extender.StartDemandGC(ctx, podInformerInterface, demandManager)

softReservationStore := cache.NewSoftReservationStore(ctx, podInformerInterface)

@@ -161,8 +167,6 @@ func InitServerWithClients(ctx context.Context, info witchcraft.InitInfo, allCli
nodeLister,
)

binpacker := extender.SelectBinpacker(install.BinpackAlgo)

wasteMetricsReporter := metrics.NewWasteMetricsReporter(ctx, instanceGroupLabel)

sparkSchedulerExtender := extender.NewExtender(
@@ -172,7 +176,7 @@ func InitServerWithClients(ctx context.Context, info witchcraft.InitInfo, allCli
softReservationStore,
resourceReservationManager,
kubeClient.CoreV1(),
demandCache,
demandManager,
apiExtensionsClient,
install.FIFO,
install.FifoConfig,
(file header not captured)
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package extender
package binpacker

import (
"github.com/palantir/k8s-spark-scheduler-lib/pkg/binpack"
108 changes: 70 additions & 38 deletions internal/extender/demand.go → internal/demands/demand.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package extender
package demands

import (
"context"
@@ -21,14 +21,17 @@ import (
demandapi "github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/scaler/v1alpha2"
"github.com/palantir/k8s-spark-scheduler-lib/pkg/resources"
"github.com/palantir/k8s-spark-scheduler/internal"
"github.com/palantir/k8s-spark-scheduler/internal/binpacker"
"github.com/palantir/k8s-spark-scheduler/internal/cache"
"github.com/palantir/k8s-spark-scheduler/internal/common"
"github.com/palantir/k8s-spark-scheduler/internal/common/utils"
"github.com/palantir/k8s-spark-scheduler/internal/events"
"github.com/palantir/k8s-spark-scheduler/internal/types"
werror "github.com/palantir/witchcraft-go-error"
"github.com/palantir/witchcraft-go-logging/wlog/svclog/svc1log"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

@@ -43,24 +46,53 @@ var (
}
)

// Manager holds the types of demand operations that are available
type Manager interface {
DeleteDemandIfExists(ctx context.Context, pod *v1.Pod, source string)
CreateDemandForApplicationInAnyZone(ctx context.Context, driverPod *v1.Pod, applicationResources *types.SparkApplicationResources)
CreateDemandForExecutorInAnyZone(ctx context.Context, executorPod *v1.Pod, executorResources *resources.Resources)
CreateDemandForExecutorInSpecificZone(ctx context.Context, executorPod *v1.Pod, executorResources *resources.Resources, zone *demandapi.Zone)
}

type defaultManager struct {
coreClient corev1.CoreV1Interface
demands *cache.SafeDemandCache
binpacker *binpacker.Binpacker
instanceGroupLabel string
}

// NewDefaultManager creates the default implementation of the Manager
func NewDefaultManager(
coreClient corev1.CoreV1Interface,
demands *cache.SafeDemandCache,
binpacker *binpacker.Binpacker,
instanceGroupLabel string) Manager {
return &defaultManager{
coreClient: coreClient,
demands: demands,
binpacker: binpacker,
instanceGroupLabel: instanceGroupLabel,
}
}

// TODO: should patch instead of put to avoid conflicts
func (s *SparkSchedulerExtender) updatePodStatus(ctx context.Context, pod *v1.Pod, _ *v1.PodCondition) {
func (d *defaultManager) updatePodStatus(ctx context.Context, pod *v1.Pod, _ *v1.PodCondition) {
if !podutil.UpdatePodCondition(&pod.Status, demandCreatedCondition) {
svc1log.FromContext(ctx).Info("pod condition for demand creation already exist")
return
}
_, err := s.coreClient.Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
_, err := d.coreClient.Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
if err != nil {
svc1log.FromContext(ctx).Warn("pod condition update failed", svc1log.SafeParam("reason", err.Error()))
}
}

func (s *SparkSchedulerExtender) createDemandForExecutorInAnyZone(ctx context.Context, executorPod *v1.Pod, executorResources *resources.Resources) {
s.createDemandForExecutorInSpecificZone(ctx, executorPod, executorResources, nil)
func (d *defaultManager) CreateDemandForExecutorInAnyZone(ctx context.Context, executorPod *v1.Pod, executorResources *resources.Resources) {
d.CreateDemandForExecutorInSpecificZone(ctx, executorPod, executorResources, nil)
}

func (s *SparkSchedulerExtender) createDemandForExecutorInSpecificZone(ctx context.Context, executorPod *v1.Pod, executorResources *resources.Resources, zone *demandapi.Zone) {
if !s.demands.CRDExists() {
func (d *defaultManager) CreateDemandForExecutorInSpecificZone(ctx context.Context, executorPod *v1.Pod, executorResources *resources.Resources, zone *demandapi.Zone) {
if !d.demands.CRDExists() {
return
}
units := []demandapi.DemandUnit{
@@ -76,46 +108,46 @@ func (s *SparkSchedulerExtender) createDemandForExecutorInSpecificZone(ctx conte
},
},
}
s.createDemand(ctx, executorPod, units, zone)
d.createDemand(ctx, executorPod, units, zone)
}

func (s *SparkSchedulerExtender) createDemandForApplicationInAnyZone(ctx context.Context, driverPod *v1.Pod, applicationResources *sparkApplicationResources) {
if !s.demands.CRDExists() {
func (d *defaultManager) CreateDemandForApplicationInAnyZone(ctx context.Context, driverPod *v1.Pod, applicationResources *types.SparkApplicationResources) {
if !d.demands.CRDExists() {
return
}
s.createDemand(ctx, driverPod, demandResourcesForApplication(driverPod, applicationResources), nil)
d.createDemand(ctx, driverPod, demandResourcesForApplication(driverPod, applicationResources), nil)
}

func (s *SparkSchedulerExtender) createDemand(ctx context.Context, pod *v1.Pod, demandUnits []demandapi.DemandUnit, zone *demandapi.Zone) {
instanceGroup, ok := internal.FindInstanceGroupFromPodSpec(pod.Spec, s.instanceGroupLabel)
func (d *defaultManager) createDemand(ctx context.Context, pod *v1.Pod, demandUnits []demandapi.DemandUnit, zone *demandapi.Zone) {
instanceGroup, ok := internal.FindInstanceGroupFromPodSpec(pod.Spec, d.instanceGroupLabel)
if !ok {
svc1log.FromContext(ctx).Error("No instanceGroup label exists. Cannot map to InstanceGroup. Skipping demand object",
svc1log.SafeParam("expectedLabel", s.instanceGroupLabel))
svc1log.SafeParam("expectedLabel", d.instanceGroupLabel))
return
}

newDemand, err := s.newDemand(pod, instanceGroup, demandUnits, zone)
newDemand, err := d.newDemand(pod, instanceGroup, demandUnits, zone)
if err != nil {
svc1log.FromContext(ctx).Error("failed to construct demand object", svc1log.Stacktrace(err))
return
}
err = s.doCreateDemand(ctx, newDemand)
err = d.doCreateDemand(ctx, newDemand)
if err != nil {
svc1log.FromContext(ctx).Error("failed to create demand", svc1log.Stacktrace(err))
return
}
go s.updatePodStatus(ctx, pod, demandCreatedCondition)
go d.updatePodStatus(ctx, pod, demandCreatedCondition)
}

func (s *SparkSchedulerExtender) doCreateDemand(ctx context.Context, newDemand *demandapi.Demand) error {
func (d *defaultManager) doCreateDemand(ctx context.Context, newDemand *demandapi.Demand) error {
demandObjectBytes, err := json.Marshal(newDemand)
if err != nil {
return werror.Wrap(err, "failed to marshal demand object")
}
svc1log.FromContext(ctx).Info("Creating demand object", svc1log.SafeParams(internal.DemandSafeParamsFromObj(newDemand)), svc1log.SafeParam("demandObjectBytes", string(demandObjectBytes)))
err = s.demands.Create(newDemand)
err = d.demands.Create(newDemand)
if err != nil {
_, ok := s.demands.Get(newDemand.Namespace, newDemand.Name)
_, ok := d.demands.Get(newDemand.Namespace, newDemand.Name)
if ok {
svc1log.FromContext(ctx).Info("demand object already exists for pod so no action will be taken")
return nil
@@ -125,25 +157,25 @@ func (s *SparkSchedulerExtender) doCreateDemand(ctx context.Context, newDemand *
return err
}

func (s *SparkSchedulerExtender) removeDemandIfExists(ctx context.Context, pod *v1.Pod) {
DeleteDemandIfExists(ctx, s.demands, pod, "SparkSchedulerExtender")
func (d *defaultManager) removeDemandIfExists(ctx context.Context, pod *v1.Pod) {
d.DeleteDemandIfExists(ctx, pod, "SparkSchedulerExtender")
}

// DeleteDemandIfExists removes a demand object if it exists, and emits an event tagged by the source of the deletion
func DeleteDemandIfExists(ctx context.Context, cache *cache.SafeDemandCache, pod *v1.Pod, source string) {
if !cache.CRDExists() {
func (d *defaultManager) DeleteDemandIfExists(ctx context.Context, pod *v1.Pod, source string) {
if !d.demands.CRDExists() {
return
}
demandName := utils.DemandName(pod)
if demand, ok := cache.Get(pod.Namespace, demandName); ok {
if demand, ok := d.demands.Get(pod.Namespace, demandName); ok {
// there is no harm in the demand being deleted elsewhere in between the two calls.
cache.Delete(pod.Namespace, demandName)
d.demands.Delete(pod.Namespace, demandName)
svc1log.FromContext(ctx).Info("Removed demand object for pod", svc1log.SafeParams(internal.DemandSafeParams(demandName, pod.Namespace)))
events.EmitDemandDeleted(ctx, demand, source)
}
}

func (s *SparkSchedulerExtender) newDemand(pod *v1.Pod, instanceGroup string, units []demandapi.DemandUnit, zone *demandapi.Zone) (*demandapi.Demand, error) {
func (d *defaultManager) newDemand(pod *v1.Pod, instanceGroup string, units []demandapi.DemandUnit, zone *demandapi.Zone) (*demandapi.Demand, error) {
appID, ok := pod.Labels[common.SparkAppIDLabel]
if !ok {
return nil, werror.Error("pod did not contain expected label for AppID", werror.SafeParam("expectedLabel", common.SparkAppIDLabel))
@@ -157,40 +189,40 @@ func (s *SparkSchedulerExtender) newDemand(pod *v1.Pod, instanceGroup string, un
common.SparkAppIDLabel: appID,
},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(pod, podGroupVersionKind),
*metav1.NewControllerRef(pod, types.PodGroupVersionKind),
},
},
Spec: demandapi.DemandSpec{
InstanceGroup: instanceGroup,
Units: units,
EnforceSingleZoneScheduling: s.binpacker.IsSingleAz,
EnforceSingleZoneScheduling: d.binpacker.IsSingleAz,
Zone: zone,
},
}, nil
}

func demandResourcesForApplication(driverPod *v1.Pod, applicationResources *sparkApplicationResources) []demandapi.DemandUnit {
func demandResourcesForApplication(driverPod *v1.Pod, applicationResources *types.SparkApplicationResources) []demandapi.DemandUnit {
demandUnits := []demandapi.DemandUnit{
{
Count: 1,
Resources: demandapi.ResourceList{
demandapi.ResourceCPU: applicationResources.driverResources.CPU,
demandapi.ResourceMemory: applicationResources.driverResources.Memory,
demandapi.ResourceNvidiaGPU: applicationResources.driverResources.NvidiaGPU,
demandapi.ResourceCPU: applicationResources.DriverResources.CPU,
demandapi.ResourceMemory: applicationResources.DriverResources.Memory,
demandapi.ResourceNvidiaGPU: applicationResources.DriverResources.NvidiaGPU,
},
// By specifying the driver pod here, we don't duplicate the resources of the pod with the created demand
PodNamesByNamespace: map[string][]string{
driverPod.Namespace: {driverPod.Name},
},
},
}
if applicationResources.minExecutorCount > 0 {
if applicationResources.MinExecutorCount > 0 {
demandUnits = append(demandUnits, demandapi.DemandUnit{
Count: applicationResources.minExecutorCount,
Count: applicationResources.MinExecutorCount,
Resources: demandapi.ResourceList{
demandapi.ResourceCPU: applicationResources.executorResources.CPU,
demandapi.ResourceMemory: applicationResources.executorResources.Memory,
demandapi.ResourceNvidiaGPU: applicationResources.executorResources.NvidiaGPU,
demandapi.ResourceCPU: applicationResources.ExecutorResources.CPU,
demandapi.ResourceMemory: applicationResources.ExecutorResources.Memory,
demandapi.ResourceNvidiaGPU: applicationResources.ExecutorResources.NvidiaGPU,
},
})
}
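
The TODO carried over above notes that updatePodStatus still uses UpdateStatus, a full PUT that can fail on resourceVersion conflicts. A hedged sketch of the patch-based alternative the TODO suggests, assuming client-go's strategic-merge patch against the pod status subresource (the patchPodCondition helper is hypothetical and not part of this commit):

// Hypothetical sketch of the TODO above; not part of this commit.
package demands

import (
	"context"
	"encoding/json"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8stypes "k8s.io/apimachinery/pkg/types"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// patchPodCondition adds or updates a single pod condition via a strategic merge patch.
// Conditions merge by their "type" key, so only the demand condition is touched instead of
// replacing the whole status the way UpdateStatus (a PUT) does.
func patchPodCondition(ctx context.Context, client corev1.CoreV1Interface, pod *v1.Pod, cond v1.PodCondition) error {
	patch, err := json.Marshal(map[string]interface{}{
		"status": map[string]interface{}{
			"conditions": []v1.PodCondition{cond},
		},
	})
	if err != nil {
		return err
	}
	_, err = client.Pods(pod.Namespace).Patch(ctx, pod.Name, k8stypes.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status")
	return err
}
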
(file header not captured)
@@ -12,24 +12,31 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package extender
package demands

import (
"reflect"
"testing"

demandapi "github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/scaler/v1alpha2"
"github.com/palantir/k8s-spark-scheduler-lib/pkg/resources"
"github.com/palantir/k8s-spark-scheduler/internal/types"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var testResource = createResources(1, 2432*1024*1024, 1)
var testResource = &resources.Resources{
CPU: *resource.NewQuantity(1, resource.DecimalSI),
Memory: *resource.NewQuantity(2432*1024*1024, resource.BinarySI),
NvidiaGPU: *resource.NewQuantity(1, resource.DecimalSI),
}

var testResources = &sparkApplicationResources{
driverResources: testResource,
executorResources: testResource,
minExecutorCount: 0,
maxExecutorCount: 0,
var testResources = &types.SparkApplicationResources{
DriverResources: testResource,
ExecutorResources: testResource,
MinExecutorCount: 0,
MaxExecutorCount: 0,
}
var testPod = &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -41,7 +48,7 @@ var testPod = &v1.Pod{
func Test_demandResourcesForApplication(t *testing.T) {
type args struct {
driverPod *v1.Pod
applicationResources *sparkApplicationResources
applicationResources *types.SparkApplicationResources
}
var tests = []struct {
name string
@@ -57,9 +64,9 @@ func Test_demandResourcesForApplication(t *testing.T) {
want: []demandapi.DemandUnit{
{
Resources: demandapi.ResourceList{
demandapi.ResourceCPU: testResources.driverResources.CPU,
demandapi.ResourceMemory: testResources.driverResources.Memory,
demandapi.ResourceNvidiaGPU: testResources.driverResources.NvidiaGPU,
demandapi.ResourceCPU: testResources.DriverResources.CPU,
demandapi.ResourceMemory: testResources.DriverResources.Memory,
demandapi.ResourceNvidiaGPU: testResources.DriverResources.NvidiaGPU,
},
Count: 1,
PodNamesByNamespace: map[string][]string{"test-namespace": {"test-name"}},
18 changes: 11 additions & 7 deletions internal/extender/demand_gc.go
@@ -17,8 +17,8 @@ package extender
import (
"context"

"github.com/palantir/k8s-spark-scheduler/internal/cache"
"github.com/palantir/k8s-spark-scheduler/internal/common/utils"
"github.com/palantir/k8s-spark-scheduler/internal/demands"
v1 "k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
clientcache "k8s.io/client-go/tools/cache"
@@ -27,23 +27,27 @@ import (
// DemandGC is a background pod event handler which deletes any demand we have previously created for a pod when a pod gets scheduled.
// We also delete demands elsewhere in the extender when we schedule the pod, but those can miss some demands due to race conditions.
type DemandGC struct {
demandCache *cache.SafeDemandCache
ctx context.Context
manager demands.Manager
ctx context.Context
}

// StartDemandGC initializes the DemandGC which handles events in the background
func StartDemandGC(ctx context.Context, podInformer coreinformers.PodInformer, demandCache *cache.SafeDemandCache) {
func StartDemandGC(
ctx context.Context,
podInformer coreinformers.PodInformer,
manager demands.Manager,
) {
dgc := &DemandGC{
demandCache: demandCache,
ctx: ctx,
manager: manager,
ctx: ctx,
}

podInformer.Informer().AddEventHandler(
clientcache.FilteringResourceEventHandler{
FilterFunc: utils.IsSparkSchedulerPod,
Handler: clientcache.ResourceEventHandlerFuncs{
UpdateFunc: utils.OnPodScheduled(ctx, func(pod *v1.Pod) {
DeleteDemandIfExists(dgc.ctx, dgc.demandCache, pod, "DemandGC")
manager.DeleteDemandIfExists(dgc.ctx, pod, "DemandGC")
}),
},
},
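
Because DemandGC (and the extender) now depend only on the demands.Manager interface, unit tests of that code can swap in a fake instead of wiring up a real SafeDemandCache. A minimal, hypothetical sketch of such a fake (not part of this commit):

// Hypothetical in-memory fake for demands.Manager; records deletions so a test can assert on them.
package demands_test

import (
	"context"

	demandapi "github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/scaler/v1alpha2"
	"github.com/palantir/k8s-spark-scheduler-lib/pkg/resources"
	"github.com/palantir/k8s-spark-scheduler/internal/types"
	v1 "k8s.io/api/core/v1"
)

type fakeManager struct {
	deleted []string
}

// DeleteDemandIfExists records which pods had their demands deleted.
func (f *fakeManager) DeleteDemandIfExists(_ context.Context, pod *v1.Pod, _ string) {
	f.deleted = append(f.deleted, pod.Namespace+"/"+pod.Name)
}

// The create methods are no-ops; a richer fake could record their arguments as well.
func (f *fakeManager) CreateDemandForApplicationInAnyZone(_ context.Context, _ *v1.Pod, _ *types.SparkApplicationResources) {
}

func (f *fakeManager) CreateDemandForExecutorInAnyZone(_ context.Context, _ *v1.Pod, _ *resources.Resources) {
}

func (f *fakeManager) CreateDemandForExecutorInSpecificZone(_ context.Context, _ *v1.Pod, _ *resources.Resources, _ *demandapi.Zone) {
}
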
(remaining changed files not captured in this view)
