Skip to content

Commit

Permalink
Merge pull request #9002 from kargakis/refactor-dc-controller-to-use-…
Browse files Browse the repository at this point in the history
…caches

Merged by openshift-bot
  • Loading branch information
OpenShift Bot authored Jun 22, 2016
2 parents eee22ff + cdbabcb commit e5ff67e
Show file tree
Hide file tree
Showing 11 changed files with 563 additions and 244 deletions.
106 changes: 106 additions & 0 deletions pkg/client/cache/deploymentconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package cache

import (
"fmt"

"github.com/golang/glog"

kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"

deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)

// StoreToDeploymentConfigLister gives a store List and Exists methods. The store must contain only deploymentconfigs.
type StoreToDeploymentConfigLister struct {
cache.Indexer
}

// Exists checks if the given deploymentconfig exists in the store.
func (s *StoreToDeploymentConfigLister) Exists(dc *deployapi.DeploymentConfig) (bool, error) {
_, exists, err := s.Indexer.Get(dc)
return exists, err
}

// List all deploymentconfigs in the store.
func (s *StoreToDeploymentConfigLister) List() ([]*deployapi.DeploymentConfig, error) {
configs := []*deployapi.DeploymentConfig{}
for _, c := range s.Indexer.List() {
configs = append(configs, c.(*deployapi.DeploymentConfig))
}
return configs, nil
}

// GetConfigForController returns the managing deployment config for the provided replication controller.
func (s *StoreToDeploymentConfigLister) GetConfigForController(rc *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) {
dcName := deployutil.DeploymentConfigNameFor(rc)
obj, exists, err := s.Indexer.GetByKey(rc.Namespace + "/" + dcName)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("deployment config %q not found", dcName)
}
return obj.(*deployapi.DeploymentConfig), nil
}

func (s *StoreToDeploymentConfigLister) DeploymentConfigs(namespace string) storeDeploymentConfigsNamespacer {
return storeDeploymentConfigsNamespacer{s.Indexer, namespace}
}

type storeDeploymentConfigsNamespacer struct {
indexer cache.Indexer
namespace string
}

// Get the deployment config matching the name from the cache.
func (s storeDeploymentConfigsNamespacer) Get(name string) (*deployapi.DeploymentConfig, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("deployment config %q not found", name)
}
return obj.(*deployapi.DeploymentConfig), nil
}

// List all the deploymentconfigs that match the provided selector using a namespace index.
// If the indexed list fails then we will fallback to listing from all namespaces and filter
// by the namespace we want.
func (s storeDeploymentConfigsNamespacer) List(selector labels.Selector) ([]*deployapi.DeploymentConfig, error) {
configs := []*deployapi.DeploymentConfig{}

if s.namespace == kapi.NamespaceAll {
for _, obj := range s.indexer.List() {
dc := obj.(*deployapi.DeploymentConfig)
if selector.Matches(labels.Set(dc.Labels)) {
configs = append(configs, dc)
}
}
return configs, nil
}

key := &deployapi.DeploymentConfig{ObjectMeta: kapi.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(cache.NamespaceIndex, key)
if err != nil {
// Ignore error; do slow search without index.
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, obj := range s.indexer.List() {
dc := obj.(*deployapi.DeploymentConfig)
if s.namespace == dc.Namespace && selector.Matches(labels.Set(dc.Labels)) {
configs = append(configs, dc)
}
}
return configs, nil
}
for _, obj := range items {
dc := obj.(*deployapi.DeploymentConfig)
if selector.Matches(labels.Set(dc.Labels)) {
configs = append(configs, dc)
}
}
return configs, nil
}
6 changes: 3 additions & 3 deletions pkg/cmd/server/origin/master_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,9 +576,9 @@ func (c *MasterConfig) DeploymentControllerClients() (*osclient.Client, *kclient
return osClient, kClient
}

// DeployerPodControllerClients returns the deployer pod controller client objects
func (c *MasterConfig) DeployerPodControllerClients() (*osclient.Client, *kclient.Client) {
return c.PrivilegedLoopbackOpenShiftClient, c.PrivilegedLoopbackKubernetesClient
// DeployerPodControllerClients returns the deployer pod controller client object
func (c *MasterConfig) DeployerPodControllerClient() *kclient.Client {
return c.PrivilegedLoopbackKubernetesClient
}

// DeploymentConfigClients returns deploymentConfig and deployment client objects
Expand Down
18 changes: 9 additions & 9 deletions pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,8 @@ func (c *MasterConfig) RunDeploymentController() {

// RunDeployerPodController starts the deployer pod controller process.
func (c *MasterConfig) RunDeployerPodController() {
osclient, kclient := c.DeployerPodControllerClients()
kclient := c.DeployerPodControllerClient()
factory := deployerpodcontroller.DeployerPodControllerFactory{
Client: osclient,
KubeClient: kclient,
Codec: c.EtcdHelper.Codec(),
}
Expand All @@ -343,14 +342,15 @@ func (c *MasterConfig) RunDeployerPodController() {

// RunDeploymentConfigController starts the deployment config controller process.
func (c *MasterConfig) RunDeploymentConfigController() {
dcInfomer := c.Informers.DeploymentConfigs().Informer()
rcInformer := c.Informers.ReplicationControllers().Informer()
osclient, kclient := c.DeploymentConfigControllerClients()
factory := deployconfigcontroller.DeploymentConfigControllerFactory{
Client: osclient,
KubeClient: kclient,
Codec: c.EtcdHelper.Codec(),
}
controller := factory.Create()
controller.Run()

controller := deployconfigcontroller.NewDeploymentConfigController(dcInfomer, rcInformer, osclient, kclient, c.EtcdHelper.Codec())
// TODO: Make the stop channel actually work.
stopCh := make(chan struct{})
// TODO: Make the number of workers configurable.
go controller.Run(5, stopCh)
}

// RunDeploymentTriggerController starts the deployment trigger controller process.
Expand Down
110 changes: 110 additions & 0 deletions pkg/controller/shared_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ import (
"k8s.io/kubernetes/pkg/watch"

oclient "github.com/openshift/origin/pkg/client"
oscache "github.com/openshift/origin/pkg/client/cache"
deployapi "github.com/openshift/origin/pkg/deploy/api"
)

type InformerFactory interface {
Start(stopCh <-chan struct{})

Pods() PodInformer
ReplicationControllers() ReplicationControllerInformer
DeploymentConfigs() DeploymentConfigInformer
}

type PodInformer interface {
Expand All @@ -27,6 +31,18 @@ type PodInformer interface {
Lister() *cache.StoreToPodLister
}

type ReplicationControllerInformer interface {
Informer() framework.SharedIndexInformer
Indexer() cache.Indexer
Lister() *cache.StoreToReplicationControllerLister
}

type DeploymentConfigInformer interface {
Informer() framework.SharedIndexInformer
Indexer() cache.Indexer
Lister() *oscache.StoreToDeploymentConfigLister
}

func NewInformerFactory(kubeClient kclient.Interface, originClient oclient.Interface, defaultResync time.Duration) InformerFactory {
return &sharedInformerFactory{
kubeClient: kubeClient,
Expand Down Expand Up @@ -57,6 +73,14 @@ func (f *sharedInformerFactory) Pods() PodInformer {
return &podInformer{sharedInformerFactory: f}
}

func (f *sharedInformerFactory) ReplicationControllers() ReplicationControllerInformer {
return &replicationControllerInformer{sharedInformerFactory: f}
}

func (f *sharedInformerFactory) DeploymentConfigs() DeploymentConfigInformer {
return &deploymentConfigInformer{sharedInformerFactory: f}
}

type podInformer struct {
*sharedInformerFactory
}
Expand Down Expand Up @@ -99,3 +123,89 @@ func (f *podInformer) Lister() *cache.StoreToPodLister {
informer := f.Informer()
return &cache.StoreToPodLister{Indexer: informer.GetIndexer()}
}

type replicationControllerInformer struct {
*sharedInformerFactory
}

func (f *replicationControllerInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informerObj := &kapi.ReplicationController{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}

informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
return f.kubeClient.ReplicationControllers(kapi.NamespaceAll).List(options)
},
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
return f.kubeClient.ReplicationControllers(kapi.NamespaceAll).Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer

return informer
}

func (f *replicationControllerInformer) Indexer() cache.Indexer {
informer := f.Informer()
return informer.GetIndexer()
}

func (f *replicationControllerInformer) Lister() *cache.StoreToReplicationControllerLister {
informer := f.Informer()
return &cache.StoreToReplicationControllerLister{Indexer: informer.GetIndexer()}
}

type deploymentConfigInformer struct {
*sharedInformerFactory
}

func (f *deploymentConfigInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informerObj := &deployapi.DeploymentConfig{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}

informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
return f.originClient.DeploymentConfigs(kapi.NamespaceAll).List(options)
},
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
return f.originClient.DeploymentConfigs(kapi.NamespaceAll).Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer

return informer
}

func (f *deploymentConfigInformer) Indexer() cache.Indexer {
informer := f.Informer()
return informer.GetIndexer()
}

func (f *deploymentConfigInformer) Lister() *oscache.StoreToDeploymentConfigLister {
informer := f.Informer()
return &oscache.StoreToDeploymentConfigLister{Indexer: informer.GetIndexer()}
}
24 changes: 0 additions & 24 deletions pkg/deploy/controller/deployerpod/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package deployerpod

import (
"fmt"
"strconv"

"github.com/golang/glog"

Expand All @@ -12,7 +11,6 @@ import (
"k8s.io/kubernetes/pkg/client/record"
kclient "k8s.io/kubernetes/pkg/client/unversioned"

osclient "github.com/openshift/origin/pkg/client"
deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)
Expand All @@ -23,7 +21,6 @@ import (
// Use the DeployerPodControllerFactory to create this controller.
type DeployerPodController struct {
store cache.Store
client osclient.Interface
kClient kclient.Interface

// decodeConfig knows how to decode the deploymentConfig from a deployment's annotations.
Expand Down Expand Up @@ -145,27 +142,6 @@ func (c *DeployerPodController) Handle(pod *kapi.Pod) error {
return fmt.Errorf("couldn't update Deployment %s to status %s: %v", deployutil.LabelForDeployment(deployment), nextStatus, err)
}
glog.V(4).Infof("Updated deployment %s status from %s to %s (scale: %d)", deployutil.LabelForDeployment(deployment), currentStatus, nextStatus, deployment.Spec.Replicas)

// If the deployment was canceled, trigger a reconciliation of its deployment config
// so that the latest complete deployment can immediately rollback in place of the
// canceled deployment.
if nextStatus == deployapi.DeploymentStatusFailed && deployutil.IsDeploymentCancelled(deployment) {
// If we are unable to get the deployment config, then the deploymentconfig controller will
// perform its duties once the resync interval forces the deploymentconfig to be reconciled.
name := deployutil.DeploymentConfigNameFor(deployment)
kclient.RetryOnConflict(kclient.DefaultRetry, func() error {
config, err := c.client.DeploymentConfigs(deployment.Namespace).Get(name)
if err != nil {
return err
}
if config.Annotations == nil {
config.Annotations = make(map[string]string)
}
config.Annotations[deployapi.DeploymentCancelledAnnotation] = strconv.FormatInt(config.Status.LatestVersion, 10)
_, err = c.client.DeploymentConfigs(config.Namespace).Update(config)
return err
})
}
}

return nil
Expand Down
Loading

0 comments on commit e5ff67e

Please sign in to comment.