Skip to content

Commit

Permalink
refactor: breakdown to methods/functions + integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: KevFan <[email protected]>
  • Loading branch information
KevFan committed Nov 4, 2024
1 parent 9e47e7d commit 5af7788
Show file tree
Hide file tree
Showing 4 changed files with 424 additions and 203 deletions.
12 changes: 11 additions & 1 deletion controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewayapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"

Expand Down Expand Up @@ -62,7 +63,7 @@ func IsPolicyAccepted(ctx context.Context, p machinery.Policy, s *sync.Map) bool
case *kuadrantv1beta3.RateLimitPolicy:
return isRateLimitPolicyAcceptedFunc(s)(p)
case *kuadrantv1alpha1.TLSPolicy:
isValid, _ := IsTLSPolicyValid(ctx, s, p.(*kuadrantv1alpha1.TLSPolicy))
isValid, _ := IsTLSPolicyValid(ctx, s, t)
return isValid
case *kuadrantv1alpha1.DNSPolicy:
isValid, _ := dnsPolicyAcceptedStatusFunc(s)(p)
Expand All @@ -71,3 +72,12 @@ func IsPolicyAccepted(ctx context.Context, p machinery.Policy, s *sync.Map) bool
return false
}
}

func policyGroupKinds() []*schema.GroupKind {
return []*schema.GroupKind{
&kuadrantv1beta3.AuthPolicyGroupKind,
&kuadrantv1beta3.RateLimitPolicyGroupKind,
&kuadrantv1alpha1.TLSPolicyGroupKind,
&kuadrantv1alpha1.DNSPolicyGroupKind,
}
}
179 changes: 80 additions & 99 deletions controllers/gateway_policy_discoverability_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,17 @@ func (r *GatewayPolicyDiscoverabilityReconciler) Subscription() *controller.Subs
}
}

// Reconcile function to manage gateway and listener status based on policy conditions
func (r *GatewayPolicyDiscoverabilityReconciler) reconcile(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, _ error, syncMap *sync.Map) error {
logger := controller.LoggerFromContext(ctx).WithName("GatewayPolicyDiscoverabilityReconciler").WithName("reconcile")

// Extract gateways to process
gateways := lo.FilterMap(topology.Targetables().Items(), func(item machinery.Targetable, index int) (*machinery.Gateway, bool) {
gw, ok := item.(*machinery.Gateway)
return gw, ok
})

// Policy kinds to evaluate
policyGroupKinds := []*schema.GroupKind{
&kuadrantv1beta3.AuthPolicyGroupKind,
&kuadrantv1beta3.RateLimitPolicyGroupKind,
&kuadrantv1alpha1.TLSPolicyGroupKind,
&kuadrantv1alpha1.DNSPolicyGroupKind,
}
gateways := r.extractGateways(topology)
policyKinds := policyGroupKinds()

for _, gw := range gateways {
updatedGwStatus := r.updateGatewayStatus(ctx, syncMap, gw, topology, logger, policyGroupKinds)
if !equality.Semantic.DeepEqual(updatedGwStatus, gw.Status) {
gw.Status = *updatedGwStatus
if err := r.updateGateway(ctx, gw); err != nil {
updatedStatus := r.buildGatewayStatus(ctx, syncMap, gw, topology, logger, policyKinds)
if !equality.Semantic.DeepEqual(updatedStatus, gw.Status) {
gw.Status = *updatedStatus
if err := r.updateGatewayStatus(ctx, gw); err != nil {
logger.Error(err, "failed to update gateway status", "gateway", gw.GetName())
}
}
Expand All @@ -72,123 +60,116 @@ func (r *GatewayPolicyDiscoverabilityReconciler) reconcile(ctx context.Context,
return nil
}

// Updates the gateway status for a given policy group kind
func (r *GatewayPolicyDiscoverabilityReconciler) updateGatewayStatus(ctx context.Context, syncMap *sync.Map, gw *machinery.Gateway, topology *machinery.Topology, logger logr.Logger, policyGroupKinds []*schema.GroupKind) *gatewayapiv1.GatewayStatus {
gwStatus := gw.Status.DeepCopy()
func (r *GatewayPolicyDiscoverabilityReconciler) extractGateways(topology *machinery.Topology) []*machinery.Gateway {
return lo.FilterMap(topology.Targetables().Items(), func(item machinery.Targetable, _ int) (*machinery.Gateway, bool) {
gw, ok := item.(*machinery.Gateway)
return gw, ok
})
}

func (r *GatewayPolicyDiscoverabilityReconciler) buildGatewayStatus(ctx context.Context, syncMap *sync.Map, gw *machinery.Gateway, topology *machinery.Topology, logger logr.Logger, policyKinds []*schema.GroupKind) *gatewayapiv1.GatewayStatus {
status := gw.Status.DeepCopy()

// Update Listener Status for each listener in the gateway
for _, listener := range r.extractListeners(topology, gw) {
listenerStatus, index, updated := r.buildExpectedListenerStatus(ctx, syncMap, gw, listener, logger, policyGroupKinds)
if updated {
gwStatus.Listeners[index] = listenerStatus
}
updatedListenerStatus := r.updateListenerStatus(ctx, syncMap, gw, listener, logger, policyKinds)
status.Listeners = r.updateListenerList(status.Listeners, updatedListenerStatus)
}

// Set conditions for each policy kind
for _, policyKind := range policyGroupKinds {
conditionType := PolicyAffectedConditionType(policyKind.Kind)
policies := r.extractPolicies(ctx, syncMap, policyKind, gw)

if len(policies) == 0 {
r.removeGatewayConditionIfExists(gwStatus, conditionType, logger, gw.GetName())
} else {
r.updateGatewayCondition(gwStatus, PolicyAffectedCondition(policyKind.Kind, policies), gw, logger)
}
for _, policyKind := range policyKinds {
r.updatePolicyConditions(ctx, syncMap, gw, policyKind, status, logger)
}

return gwStatus
return status
}

func (r *GatewayPolicyDiscoverabilityReconciler) buildExpectedListenerStatus(ctx context.Context, syncMap *sync.Map, gw *machinery.Gateway, listener *machinery.Listener, logger logr.Logger, policyGroupKinds []*schema.GroupKind) (gatewayapiv1.ListenerStatus, int, bool) {
listenerStatus, index, found := lo.FindIndexOf(gw.Status.Listeners, func(item gatewayapiv1.ListenerStatus) bool {
return item.Name == listener.Name
func (r *GatewayPolicyDiscoverabilityReconciler) extractListeners(topology *machinery.Topology, gw *machinery.Gateway) []*machinery.Listener {
return lo.Map(topology.Targetables().Children(gw), func(item machinery.Targetable, _ int) *machinery.Listener {
listener, _ := item.(*machinery.Listener)
return listener
})
if !found {
logger.V(1).Info("listener status not found", "listener", listener.GetName())
return gatewayapiv1.ListenerStatus{}, index, false
}

func (r *GatewayPolicyDiscoverabilityReconciler) updateListenerStatus(ctx context.Context, syncMap *sync.Map, gw *machinery.Gateway, listener *machinery.Listener, logger logr.Logger, policyKinds []*schema.GroupKind) gatewayapiv1.ListenerStatus {
status, _, exists := r.findListenerStatus(gw.Status.Listeners, listener.Name)
if !exists {
status = gatewayapiv1.ListenerStatus{Name: listener.Name, Conditions: []metav1.Condition{}}
}

updated := false
for _, groupKind := range policyGroupKinds {
conditionType := PolicyAffectedConditionType(groupKind.Kind)
policies := r.extractPolicies(ctx, syncMap, groupKind, gw, listener)
for _, kind := range policyKinds {
conditionType := PolicyAffectedConditionType(kind.Kind)
policies := r.extractAcceptedPolicies(ctx, syncMap, kind, gw, listener)

if len(policies) == 0 {
updated = r.removeListenerConditionIfExists(&listenerStatus, conditionType, logger, listener.GetName()) || updated
r.removeConditionIfExists(&status.Conditions, conditionType, logger, listener.GetName())
} else {
condition := PolicyAffectedCondition(groupKind.Kind, policies)
updated = r.updateListenerCondition(&listenerStatus, condition, gw, logger) || updated
r.addOrUpdateCondition(&status.Conditions, PolicyAffectedCondition(kind.Kind, policies), gw.GetGeneration(), logger)
}
}

return listenerStatus, index, updated
return status
}

func (r *GatewayPolicyDiscoverabilityReconciler) updateGateway(ctx context.Context, gw *machinery.Gateway) error {
obj, err := controller.Destruct(gw.Gateway)
if err != nil {
return err
}
_, err = r.Client.Resource(controller.GatewaysResource).Namespace(gw.GetNamespace()).UpdateStatus(ctx, obj, metav1.UpdateOptions{})
return err
}
func (r *GatewayPolicyDiscoverabilityReconciler) updatePolicyConditions(ctx context.Context, syncMap *sync.Map, gw *machinery.Gateway, policyKind *schema.GroupKind, status *gatewayapiv1.GatewayStatus, logger logr.Logger) {
conditionType := PolicyAffectedConditionType(policyKind.Kind)
policies := r.extractAcceptedPolicies(ctx, syncMap, policyKind, gw)

// Extract accepted policies of a certain group kind for the specified targets
func (r *GatewayPolicyDiscoverabilityReconciler) extractPolicies(ctx context.Context, syncMap *sync.Map, policyKind *schema.GroupKind, targets ...machinery.Targetable) []machinery.Policy {
return kuadrantv1.PoliciesInPath(targets, func(policy machinery.Policy) bool {
return policy.GroupVersionKind().GroupKind() == *policyKind && IsPolicyAccepted(ctx, policy, syncMap)
})
if len(policies) == 0 {
r.removeConditionIfExists(&status.Conditions, conditionType, logger, gw.GetName())
} else {
r.addOrUpdateCondition(&status.Conditions, PolicyAffectedCondition(policyKind.Kind, policies), gw.GetGeneration(), logger)
}
}

func (r *GatewayPolicyDiscoverabilityReconciler) updateGatewayCondition(status *gatewayapiv1.GatewayStatus, condition metav1.Condition, gw *machinery.Gateway, logger logr.Logger) bool {
existingCondition := meta.FindStatusCondition(status.Conditions, condition.Type)
if existingCondition != nil && equality.Semantic.DeepEqual(existingCondition, condition) {
logger.V(1).Info("condition unchanged", "condition", condition.Type, "gateway", gw.GetName())
return false
func (r *GatewayPolicyDiscoverabilityReconciler) addOrUpdateCondition(conditions *[]metav1.Condition, condition metav1.Condition, generation int64, logger logr.Logger) {
existingCondition := meta.FindStatusCondition(*conditions, condition.Type)
if existingCondition != nil && equality.Semantic.DeepEqual(*existingCondition, condition) {
logger.V(1).Info("condition unchanged", "condition", condition.Type)
return
}

condition.ObservedGeneration = gw.GetGeneration()
meta.SetStatusCondition(&status.Conditions, condition)
logger.V(1).Info("updated condition", "condition", condition.Type, "gateway", gw.GetName())
return true
condition.ObservedGeneration = generation
meta.SetStatusCondition(conditions, condition)
logger.V(1).Info("updated condition", "condition", condition.Type)
}

func (r *GatewayPolicyDiscoverabilityReconciler) removeGatewayConditionIfExists(status *gatewayapiv1.GatewayStatus, conditionType string, logger logr.Logger, name string) bool {
if existingCondition := meta.FindStatusCondition(status.Conditions, conditionType); existingCondition != nil {
meta.RemoveStatusCondition(&status.Conditions, conditionType)
func (r *GatewayPolicyDiscoverabilityReconciler) removeConditionIfExists(conditions *[]metav1.Condition, conditionType string, logger logr.Logger, name string) {
if meta.RemoveStatusCondition(conditions, conditionType) {
logger.V(1).Info("removed condition", "condition", conditionType, "name", name)
return true
} else {
logger.V(1).Info("condition absent, skipping removal", "condition", conditionType, "name", name)
}
logger.V(1).Info("condition absent, skipping removal", "condition", conditionType, "name", name)
return false
}

func (r *GatewayPolicyDiscoverabilityReconciler) updateListenerCondition(status *gatewayapiv1.ListenerStatus, condition metav1.Condition, gw *machinery.Gateway, logger logr.Logger) bool {
existingCondition := meta.FindStatusCondition(status.Conditions, condition.Type)
if existingCondition != nil && equality.Semantic.DeepEqual(existingCondition, condition) {
logger.V(1).Info("condition unchanged", "condition", condition.Type, "gateway", gw.GetName())
return false
func (r *GatewayPolicyDiscoverabilityReconciler) updateListenerList(listeners []gatewayapiv1.ListenerStatus, updatedStatus gatewayapiv1.ListenerStatus) []gatewayapiv1.ListenerStatus {
_, index, exists := r.findListenerStatus(listeners, updatedStatus.Name)
if exists {
listeners[index] = updatedStatus
} else {
listeners = append(listeners, updatedStatus)
}
return listeners
}

condition.ObservedGeneration = gw.GetGeneration()
meta.SetStatusCondition(&status.Conditions, condition)
logger.V(1).Info("updated condition", "condition", condition.Type, "gateway", gw.GetName())
return true
func (r *GatewayPolicyDiscoverabilityReconciler) findListenerStatus(listeners []gatewayapiv1.ListenerStatus, name gatewayapiv1.SectionName) (gatewayapiv1.ListenerStatus, int, bool) {
for i, status := range listeners {
if status.Name == name {
return status, i, true
}
}
return gatewayapiv1.ListenerStatus{}, -1, false
}

func (r *GatewayPolicyDiscoverabilityReconciler) removeListenerConditionIfExists(status *gatewayapiv1.ListenerStatus, conditionType string, logger logr.Logger, name string) bool {
if existingCondition := meta.FindStatusCondition(status.Conditions, conditionType); existingCondition != nil {
meta.RemoveStatusCondition(&status.Conditions, conditionType)
logger.V(1).Info("removed condition", "condition", conditionType, "name", name)
return true
func (r *GatewayPolicyDiscoverabilityReconciler) updateGatewayStatus(ctx context.Context, gw *machinery.Gateway) error {
obj, err := controller.Destruct(gw.Gateway)
if err != nil {
return err
}
logger.V(1).Info("condition absent, skipping removal", "condition", conditionType, "name", name)
return false
_, err = r.Client.Resource(controller.GatewaysResource).Namespace(gw.GetNamespace()).UpdateStatus(ctx, obj, metav1.UpdateOptions{})
return err
}

func (r *GatewayPolicyDiscoverabilityReconciler) extractListeners(topology *machinery.Topology, gw *machinery.Gateway) []*machinery.Listener {
return lo.Map(topology.Targetables().Children(gw), func(item machinery.Targetable, index int) *machinery.Listener {
listener, _ := item.(*machinery.Listener)
return listener
func (r *GatewayPolicyDiscoverabilityReconciler) extractAcceptedPolicies(ctx context.Context, syncMap *sync.Map, policyKind *schema.GroupKind, targets ...machinery.Targetable) []machinery.Policy {
return kuadrantv1.PoliciesInPath(targets, func(policy machinery.Policy) bool {
return policy.GroupVersionKind().GroupKind() == *policyKind && IsPolicyAccepted(ctx, policy, syncMap) // Use enforced policies instead?
})
}
Loading

0 comments on commit 5af7788

Please sign in to comment.