Skip to content

Commit

Permalink
wip: update operator status
Browse files Browse the repository at this point in the history
  • Loading branch information
katallaxie authored Dec 6, 2024
1 parent 70727a5 commit b149d67
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 77 deletions.
11 changes: 11 additions & 0 deletions api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ import (
corev1 "k8s.io/api/core/v1"
)

const (
ConditionTypeSynchronizing = "Sychronizing"
ConditionTypeSynchronized = "Synchronized"
ConditionTypeFailed = "Failed"
)

const (
ConditionReasonCreated = "Created"
ConditionReasonSynchronized = "Synchronized"
)

const (
FinalizerName = "natz.zeiss.com/finalizer"
)
Expand Down
23 changes: 18 additions & 5 deletions api/v1alpha1/nats_operator_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Phase is a type that represents the current phase of the operator.
//
// +enum
// +kubebuilder:validation:Enum={None,Pending,Creating,Synchronized,Failed}
type OperatorPhase string

const (
Expand All @@ -20,11 +24,20 @@ type NatsOperatorSpec struct {
}

type NatsOperatorStatus struct {
ControlPaused bool `json:"controlPaused,omitempty"`
JWT string `json:"jwt"`
Phase OperatorPhase `json:"phase"`
PublicKey string `json:"publicKey"`
SecretName string `json:"secretName"`
JWT string `json:"jwt"`
PublicKey string `json:"publicKey"`
SecretName string `json:"secretName"`

// Conditions is an array of conditions that the operator is currently in.
Conditions []metav1.Condition `json:"conditions,omitempty" optional:"true"`
// Phase is the current phase of the operator.
//
// +kubebuilder:validation:Enum={None,Pending,Creating,Synchronized,Failed}
Phase OperatorPhase `json:"phase"`
// ControlPaused is a flag that indicates if the operator is paused.
ControlPaused bool `json:"controlPaused,omitempty" optional:"true"`
// LastUpdate is the timestamp of the last update.
LastUpdate metav1.Time `json:"lastUpdate,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
11 changes: 10 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

157 changes: 92 additions & 65 deletions controllers/natsoperator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/nats-io/jwt/v2"
"github.com/nats-io/nkeys"
natsv1alpha1 "github.com/zeiss/natz-operator/api/v1alpha1"
"github.com/zeiss/pkg/cast"
"github.com/zeiss/natz-operator/pkg/status"
"github.com/zeiss/pkg/conv"
"github.com/zeiss/pkg/k8s/finalizers"
"github.com/zeiss/pkg/slices"
"github.com/zeiss/pkg/utilx"
corev1 "k8s.io/api/core/v1"
)
Expand All @@ -36,6 +36,7 @@ const (
EventReasonOperatorDeleteFailed EventReason = "OperatorDeleteFailed"
EventReasonOperatorSecretCreateSucceeded EventReason = "OperatorSecretCreateSucceeded"
EventReasonOperatorSecretCreateFailed EventReason = "OperatorSecretCreateFailed"
EventReasonOperatorSynchronized EventReason = "OperatorSynchronized"
)

// NatsOperatorReconciler ...
Expand Down Expand Up @@ -64,76 +65,53 @@ func NewNatsOperatorReconciler(mgr ctrl.Manager) *NatsOperatorReconciler {
func (r *NatsOperatorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)

log.Info("reconcile operator", "name", req.Name, "namespace", req.Namespace)

operator := &natsv1alpha1.NatsOperator{}
if err := r.Get(ctx, req.NamespacedName, operator); err != nil {
// Request object not found, could have been deleted after reconcile request.
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if !operator.ObjectMeta.DeletionTimestamp.IsZero() {
log.Info("processing deletion of operator")

if finalizers.HasFinalizer(operator, natsv1alpha1.FinalizerName) {
err := r.reconcileDelete(ctx, operator)
if err != nil {
return ctrl.Result{}, err
}
}

// Delete
return reconcile.Result{}, nil
}

// get latest version of the operator
if err := r.Get(ctx, req.NamespacedName, operator); err != nil {
log.Error(err, "operator not found", "operator", req.NamespacedName)

return reconcile.Result{}, err
}

err := r.reconcileResources(ctx, operator)
if err != nil {
r.Recorder.Event(operator, corev1.EventTypeWarning, cast.String(EventReasonOperatorCreateFailed), "operator resources reconciliation failed")
return reconcile.Result{}, err
return r.reconcileDelete(ctx, operator)
}

res, err := r.reconcileServerConfig(ctx, operator)
if err != nil {
log.Error(err, "failed to reconcile server config", "name", operator.Name, "namespace", operator.Namespace)
return res, err
}

return reconcile.Result{}, nil
return r.reconcileResources(ctx, operator)
}

func (r *NatsOperatorReconciler) reconcileResources(ctx context.Context, operator *natsv1alpha1.NatsOperator) error {
func (r *NatsOperatorReconciler) reconcileResources(ctx context.Context, operator *natsv1alpha1.NatsOperator) (ctrl.Result, error) {
log := log.FromContext(ctx)

err := r.reconcileStatus(ctx, operator)
if err != nil {
log.Error(err, "failed to reconcile status", "name", operator.Name, "namespace", operator.Namespace)
return err
return ctrl.Result{}, err
}

err = r.reconcileOperator(ctx, operator)
if err != nil {
log.Error(err, "failed to reconcile operator", "name", operator.Name, "namespace", operator.Namespace)
return err
if err = r.reconcileOperator(ctx, operator); err != nil {
return ctrl.Result{}, err
}

err = r.reconcileSecret(ctx, operator)
if err != nil {
log.Error(err, "failed to reconcile secret", "name", operator.Name, "namespace", operator.Namespace)
return err
return ctrl.Result{}, err
}

err = r.reconcileSystemAccount(ctx, operator)
if err != nil {
log.Error(err, "failed to reconcile system account", "name", operator.Name, "namespace", operator.Namespace)
return err
return ctrl.Result{}, err
}

return nil
res, err := r.reconcileServerConfig(ctx, operator)
if err != nil {
log.Error(err, "failed to reconcile server config", "name", operator.Name, "namespace", operator.Namespace)
return res, err
}

return r.ManageSuccess(ctx, operator)
}

func (r *NatsOperatorReconciler) reconcileSystemAccount(ctx context.Context, operator *natsv1alpha1.NatsOperator) error {
Expand Down Expand Up @@ -217,19 +195,16 @@ func (r *NatsOperatorReconciler) reconcileSystemAccount(ctx context.Context, ope
return nil
}

func (r *NatsOperatorReconciler) reconcileOperator(ctx context.Context, operator *natsv1alpha1.NatsOperator) error {
log.FromContext(ctx)

if controllerutil.AddFinalizer(operator, natsv1alpha1.FinalizerName) {
if err := r.Update(ctx, operator); err != nil && !errors.IsNotFound(err) {
return err
}
func (r *NatsOperatorReconciler) reconcileOperator(ctx context.Context, obj *natsv1alpha1.NatsOperator) error {
if !controllerutil.ContainsFinalizer(obj, natsv1alpha1.FinalizerName) {
controllerutil.AddFinalizer(obj, natsv1alpha1.FinalizerName)
return r.Update(ctx, obj)
}

return nil
}

func (r *NatsOperatorReconciler) reconcileServerConfig(ctx context.Context, operator *natsv1alpha1.NatsOperator) (reconcile.Result, error) {
func (r *NatsOperatorReconciler) reconcileServerConfig(ctx context.Context, operator *natsv1alpha1.NatsOperator) (ctrl.Result, error) {
log := log.FromContext(ctx)

log.Info("reconcile server config", "name", operator.Name, "namespace", operator.Namespace)
Expand All @@ -242,11 +217,11 @@ func (r *NatsOperatorReconciler) reconcileServerConfig(ctx context.Context, oper

err := r.Get(ctx, serverConfigName, serverConfig)
if err != nil && !errors.IsNotFound(err) {
return reconcile.Result{}, err
return ctrl.Result{}, err
}

if !errors.IsNotFound(err) {
return reconcile.Result{}, nil
return ctrl.Result{}, nil
}

systemAccount := &natsv1alpha1.NatsAccount{}
Expand All @@ -257,11 +232,11 @@ func (r *NatsOperatorReconciler) reconcileServerConfig(ctx context.Context, oper

err = r.Get(ctx, systemAccountName, systemAccount)
if err != nil && !errors.IsNotFound(err) {
return reconcile.Result{}, err
return ctrl.Result{}, err
}

if systemAccount.Status.Phase != natsv1alpha1.AccountPhaseSynchronized {
return reconcile.Result{
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 1,
}, nil
Expand All @@ -280,14 +255,14 @@ func (r *NatsOperatorReconciler) reconcileServerConfig(ctx context.Context, oper
return controllerutil.SetControllerReference(operator, serverConfig, r.Scheme)
})
if err != nil {
return reconcile.Result{}, err
return ctrl.Result{}, err
}

if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated {
log.Info("system account created or updated", "operation", op)
}

return reconcile.Result{}, nil
return ctrl.Result{}, nil
}

func (r *NatsOperatorReconciler) reconcileStatus(ctx context.Context, operator *natsv1alpha1.NatsOperator) error {
Expand All @@ -310,18 +285,19 @@ func (r *NatsOperatorReconciler) reconcileStatus(ctx context.Context, operator *
return nil
}

func (r *NatsOperatorReconciler) reconcileDelete(ctx context.Context, s *natsv1alpha1.NatsOperator) error {
log := log.FromContext(ctx)
func (r *NatsOperatorReconciler) reconcileDelete(ctx context.Context, operator *natsv1alpha1.NatsOperator) (ctrl.Result, error) {
// Remove our finalizer from the list.
controllerutil.RemoveFinalizer(operator, natsv1alpha1.FinalizerName)

log.Info("reconcile delete operator", "name", s.Name, "namespace", s.Namespace)
if !operator.DeletionTimestamp.IsZero() {
// Remove our finalizer from the list.
controllerutil.RemoveFinalizer(operator, natsv1alpha1.FinalizerName)

s.SetFinalizers(finalizers.RemoveFinalizer(s, natsv1alpha1.FinalizerName))
err := r.Update(ctx, s)
if err != nil && !errors.IsNotFound(err) {
return err
// Stop reconciliation as the object is being deleted.
return ctrl.Result{}, r.Update(ctx, operator)
}

return nil
return ctrl.Result{Requeue: true}, nil
}

// nolint:gocyclo
Expand Down Expand Up @@ -404,11 +380,62 @@ func (r *NatsOperatorReconciler) reconcileSecret(ctx context.Context, operator *
return nil
}

// IsCreating ...
func (r *NatsOperatorReconciler) IsCreating(obj *natsv1alpha1.NatsOperator) bool {
return utilx.Or(obj.Status.Conditions == nil, slices.Len(obj.Status.Conditions) == 0)
}

// IsSynchronized ...
func (r *NatsOperatorReconciler) IsSynchronized(obj *natsv1alpha1.NatsOperator) bool {
return obj.Status.Phase == natsv1alpha1.OperatorPhaseSynchronized
}

// ManageSuccess ...
func (r *NatsOperatorReconciler) ManageSuccess(ctx context.Context, obj *natsv1alpha1.NatsOperator) (ctrl.Result, error) {
if r.IsSynchronized(obj) {
return ctrl.Result{}, nil
}

status.SetNatzOperatorCondition(obj, status.NewOperatorSynchronizingCondition(obj))

if err := r.Client.Status().Update(ctx, obj); err != nil {
return ctrl.Result{}, err
}

if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
return ctrl.Result{Requeue: true}, nil
}

if r.IsCreating(obj) {
return ctrl.Result{Requeue: true}, nil
}

r.Recorder.Event(obj, corev1.EventTypeNormal, conv.String(EventReasonOperatorSynchronized), "operator synchronized")

return ctrl.Result{}, nil
}

// func (r *NatsOperatorReconciler) waitForSecretSync(obj *corev1.Secret) wait.ConditionWithContextFunc {
// return func(ctx context.Context) (bool, error) {
// newObj := &corev1.Secret{}
// if err := r.Get(ctx, client.ObjectKeyFromObject(obj), newObj); err != nil {
// if errors.IsNotFound(err) {
// return true, nil
// }

// return false, err
// }

// return equality.Semantic.DeepEqual(obj.Data, newObj.Data), nil
// }
// }

// SetupWithManager sets up the controller with the Manager.
func (r *NatsOperatorReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&natsv1alpha1.NatsOperator{}).
Owns(&natsv1alpha1.NatsAccount{}).
Owns(&corev1.Secret{}).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{})).
Complete(r)
}
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/samber/lo v1.47.0
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/zeiss/pkg v0.1.19
github.com/zeiss/pkg v0.1.20-0.20241206050945-8f53720b709a
golang.org/x/mod v0.22.0
helm.sh/helm v2.17.0+incompatible
k8s.io/api v0.31.3
Expand Down Expand Up @@ -68,14 +68,14 @@ require (
github.com/x448/float16 v0.8.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/crypto v0.30.0 // indirect
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
golang.org/x/sync v0.9.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/term v0.26.0 // indirect
golang.org/x/text v0.20.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0 // indirect
golang.org/x/tools v0.26.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
Expand Down
Loading

0 comments on commit b149d67

Please sign in to comment.