Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use shared caches in the deploymentconfig controller #9002

Merged
merged 3 commits into from
Jun 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions pkg/client/cache/deploymentconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package cache

import (
"fmt"

"github.com/golang/glog"

kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"

deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)

// StoreToDeploymentConfigLister gives a store List and Exists methods. The store must contain only deploymentconfigs.
type StoreToDeploymentConfigLister struct {
cache.Indexer
}

// Exists checks if the given deploymentconfig exists in the store.
func (s *StoreToDeploymentConfigLister) Exists(dc *deployapi.DeploymentConfig) (bool, error) {
_, exists, err := s.Indexer.Get(dc)
return exists, err
}

// List all deploymentconfigs in the store.
func (s *StoreToDeploymentConfigLister) List() ([]*deployapi.DeploymentConfig, error) {
configs := []*deployapi.DeploymentConfig{}
for _, c := range s.Indexer.List() {
configs = append(configs, c.(*deployapi.DeploymentConfig))
}
return configs, nil
}

// GetConfigForController returns the managing deployment config for the provided replication controller.
func (s *StoreToDeploymentConfigLister) GetConfigForController(rc *kapi.ReplicationController) (*deployapi.DeploymentConfig, error) {
dcName := deployutil.DeploymentConfigNameFor(rc)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smarterclayton I am using deployutil here. Should I not pull it here and just replicate what it does?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok to use deployutil from client code I guess - I was initially hesitant. But let's make sure it's just deployutil. And deployutil is a terrible name.

obj, exists, err := s.Indexer.GetByKey(rc.Namespace + "/" + dcName)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("deployment config %q not found", dcName)
}
return obj.(*deployapi.DeploymentConfig), nil
}

func (s *StoreToDeploymentConfigLister) DeploymentConfigs(namespace string) storeDeploymentConfigsNamespacer {
return storeDeploymentConfigsNamespacer{s.Indexer, namespace}
}

type storeDeploymentConfigsNamespacer struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lack of a Get is weird. Also, since this is internal, is there any way I can actually use this?

I'm assuming its not critical for this PR, but I think exposing a basic List/Get from here is pretty reasonable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added Get

indexer cache.Indexer
namespace string
}

// Get the deployment config matching the name from the cache.
func (s storeDeploymentConfigsNamespacer) Get(name string) (*deployapi.DeploymentConfig, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("deployment config %q not found", name)
}
return obj.(*deployapi.DeploymentConfig), nil
}

// List all the deploymentconfigs that match the provided selector using a namespace index.
// If the indexed list fails then we will fallback to listing from all namespaces and filter
// by the namespace we want.
func (s storeDeploymentConfigsNamespacer) List(selector labels.Selector) ([]*deployapi.DeploymentConfig, error) {
configs := []*deployapi.DeploymentConfig{}

if s.namespace == kapi.NamespaceAll {
for _, obj := range s.indexer.List() {
dc := obj.(*deployapi.DeploymentConfig)
if selector.Matches(labels.Set(dc.Labels)) {
configs = append(configs, dc)
}
}
return configs, nil
}

key := &deployapi.DeploymentConfig{ObjectMeta: kapi.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(cache.NamespaceIndex, key)
if err != nil {
// Ignore error; do slow search without index.
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, obj := range s.indexer.List() {
dc := obj.(*deployapi.DeploymentConfig)
if s.namespace == dc.Namespace && selector.Matches(labels.Set(dc.Labels)) {
configs = append(configs, dc)
}
}
return configs, nil
}
for _, obj := range items {
dc := obj.(*deployapi.DeploymentConfig)
if selector.Matches(labels.Set(dc.Labels)) {
configs = append(configs, dc)
}
}
return configs, nil
}
6 changes: 3 additions & 3 deletions pkg/cmd/server/origin/master_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,9 +576,9 @@ func (c *MasterConfig) DeploymentControllerClients() (*osclient.Client, *kclient
return osClient, kClient
}

// DeployerPodControllerClients returns the deployer pod controller client objects
func (c *MasterConfig) DeployerPodControllerClients() (*osclient.Client, *kclient.Client) {
return c.PrivilegedLoopbackOpenShiftClient, c.PrivilegedLoopbackKubernetesClient
// DeployerPodControllerClients returns the deployer pod controller client object
func (c *MasterConfig) DeployerPodControllerClient() *kclient.Client {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're touching it, how about either converting to an SA or opening an issue where you promise to convert it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we add a pod cache in the deployment controller, we can stop using the deployer pod controller. There is #9296

return c.PrivilegedLoopbackKubernetesClient
}

// DeploymentConfigClients returns deploymentConfig and deployment client objects
Expand Down
18 changes: 9 additions & 9 deletions pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,8 @@ func (c *MasterConfig) RunDeploymentController() {

// RunDeployerPodController starts the deployer pod controller process.
func (c *MasterConfig) RunDeployerPodController() {
osclient, kclient := c.DeployerPodControllerClients()
kclient := c.DeployerPodControllerClient()
factory := deployerpodcontroller.DeployerPodControllerFactory{
Client: osclient,
KubeClient: kclient,
Codec: c.EtcdHelper.Codec(),
}
Expand All @@ -343,14 +342,15 @@ func (c *MasterConfig) RunDeployerPodController() {

// RunDeploymentConfigController starts the deployment config controller process.
func (c *MasterConfig) RunDeploymentConfigController() {
dcInfomer := c.Informers.DeploymentConfigs().Informer()
rcInformer := c.Informers.ReplicationControllers().Informer()
osclient, kclient := c.DeploymentConfigControllerClients()
factory := deployconfigcontroller.DeploymentConfigControllerFactory{
Client: osclient,
KubeClient: kclient,
Codec: c.EtcdHelper.Codec(),
}
controller := factory.Create()
controller.Run()

controller := deployconfigcontroller.NewDeploymentConfigController(dcInfomer, rcInformer, osclient, kclient, c.EtcdHelper.Codec())
// TODO: Make the stop channel actually work.
stopCh := make(chan struct{})
// TODO: Make the number of workers configurable.
go controller.Run(5, stopCh)
}

// RunDeploymentTriggerController starts the deployment trigger controller process.
Expand Down
110 changes: 110 additions & 0 deletions pkg/controller/shared_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ import (
"k8s.io/kubernetes/pkg/watch"

oclient "github.com/openshift/origin/pkg/client"
oscache "github.com/openshift/origin/pkg/client/cache"
deployapi "github.com/openshift/origin/pkg/deploy/api"
)

type InformerFactory interface {
Start(stopCh <-chan struct{})

Pods() PodInformer
ReplicationControllers() ReplicationControllerInformer
DeploymentConfigs() DeploymentConfigInformer
}

type PodInformer interface {
Expand All @@ -27,6 +31,18 @@ type PodInformer interface {
Lister() *cache.StoreToPodLister
}

type ReplicationControllerInformer interface {
Informer() framework.SharedIndexInformer
Indexer() cache.Indexer
Lister() *cache.StoreToReplicationControllerLister
}

type DeploymentConfigInformer interface {
Informer() framework.SharedIndexInformer
Indexer() cache.Indexer
Lister() *oscache.StoreToDeploymentConfigLister
}

func NewInformerFactory(kubeClient kclient.Interface, originClient oclient.Interface, defaultResync time.Duration) InformerFactory {
return &sharedInformerFactory{
kubeClient: kubeClient,
Expand Down Expand Up @@ -57,6 +73,14 @@ func (f *sharedInformerFactory) Pods() PodInformer {
return &podInformer{sharedInformerFactory: f}
}

func (f *sharedInformerFactory) ReplicationControllers() ReplicationControllerInformer {
return &replicationControllerInformer{sharedInformerFactory: f}
}

func (f *sharedInformerFactory) DeploymentConfigs() DeploymentConfigInformer {
return &deploymentConfigInformer{sharedInformerFactory: f}
}

type podInformer struct {
*sharedInformerFactory
}
Expand Down Expand Up @@ -99,3 +123,89 @@ func (f *podInformer) Lister() *cache.StoreToPodLister {
informer := f.Informer()
return &cache.StoreToPodLister{Indexer: informer.GetIndexer()}
}

type replicationControllerInformer struct {
*sharedInformerFactory
}

func (f *replicationControllerInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informerObj := &kapi.ReplicationController{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}

informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
return f.kubeClient.ReplicationControllers(kapi.NamespaceAll).List(options)
},
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
return f.kubeClient.ReplicationControllers(kapi.NamespaceAll).Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer

return informer
}

func (f *replicationControllerInformer) Indexer() cache.Indexer {
informer := f.Informer()
return informer.GetIndexer()
}

func (f *replicationControllerInformer) Lister() *cache.StoreToReplicationControllerLister {
informer := f.Informer()
return &cache.StoreToReplicationControllerLister{Indexer: informer.GetIndexer()}
}

type deploymentConfigInformer struct {
*sharedInformerFactory
}

func (f *deploymentConfigInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informerObj := &deployapi.DeploymentConfig{}
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType]
if exists {
return informer
}

informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
return f.originClient.DeploymentConfigs(kapi.NamespaceAll).List(options)
},
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
return f.originClient.DeploymentConfigs(kapi.NamespaceAll).Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer

return informer
}

func (f *deploymentConfigInformer) Indexer() cache.Indexer {
informer := f.Informer()
return informer.GetIndexer()
}

func (f *deploymentConfigInformer) Lister() *oscache.StoreToDeploymentConfigLister {
informer := f.Informer()
return &oscache.StoreToDeploymentConfigLister{Indexer: informer.GetIndexer()}
}
24 changes: 0 additions & 24 deletions pkg/deploy/controller/deployerpod/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package deployerpod

import (
"fmt"
"strconv"

"github.com/golang/glog"

Expand All @@ -12,7 +11,6 @@ import (
"k8s.io/kubernetes/pkg/client/record"
kclient "k8s.io/kubernetes/pkg/client/unversioned"

osclient "github.com/openshift/origin/pkg/client"
deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)
Expand All @@ -23,7 +21,6 @@ import (
// Use the DeployerPodControllerFactory to create this controller.
type DeployerPodController struct {
store cache.Store
client osclient.Interface
kClient kclient.Interface

// decodeConfig knows how to decode the deploymentConfig from a deployment's annotations.
Expand Down Expand Up @@ -145,27 +142,6 @@ func (c *DeployerPodController) Handle(pod *kapi.Pod) error {
return fmt.Errorf("couldn't update Deployment %s to status %s: %v", deployutil.LabelForDeployment(deployment), nextStatus, err)
}
glog.V(4).Infof("Updated deployment %s status from %s to %s (scale: %d)", deployutil.LabelForDeployment(deployment), currentStatus, nextStatus, deployment.Spec.Replicas)

// If the deployment was canceled, trigger a reconciliation of its deployment config
// so that the latest complete deployment can immediately rollback in place of the
// canceled deployment.
if nextStatus == deployapi.DeploymentStatusFailed && deployutil.IsDeploymentCancelled(deployment) {
// If we are unable to get the deployment config, then the deploymentconfig controller will
// perform its duties once the resync interval forces the deploymentconfig to be reconciled.
name := deployutil.DeploymentConfigNameFor(deployment)
kclient.RetryOnConflict(kclient.DefaultRetry, func() error {
config, err := c.client.DeploymentConfigs(deployment.Namespace).Get(name)
if err != nil {
return err
}
if config.Annotations == nil {
config.Annotations = make(map[string]string)
}
config.Annotations[deployapi.DeploymentCancelledAnnotation] = strconv.FormatInt(config.Status.LatestVersion, 10)
_, err = c.client.DeploymentConfigs(config.Namespace).Update(config)
return err
})
}
}

return nil
Expand Down
Loading