Skip to content

Commit

Permalink
feat(binding): create or update work parallel
Browse files Browse the repository at this point in the history
Signed-off-by: chang.qiangqiang <[email protected]>
  • Loading branch information
CharlesQQ committed Nov 13, 2024
1 parent 2f80476 commit d88ce8e
Showing 1 changed file with 38 additions and 10 deletions.
48 changes: 38 additions & 10 deletions pkg/controllers/binding/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func ensureWork(
}
}

var createOrUpdateWorkArgs []*CreateOrUpdateWorkArg
for i := range targetClusters {
targetCluster := targetClusters[i]
clonedWorkload := workload.DeepCopy()
Expand Down Expand Up @@ -135,16 +136,24 @@ func ensureWork(
Labels: workLabel,
Annotations: annotations,
}

if err = helper.CreateOrUpdateWork(
ctx,
c,
workMeta,
clonedWorkload,
helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)),
helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false)),
); err != nil {
return err
createOrUpdateWorkArgs = append(createOrUpdateWorkArgs, &CreateOrUpdateWorkArg{
WorkMeta: workMeta,
ClonedWorkload: clonedWorkload,
Options: []helper.WorkOption{helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)), helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false))},
})
}
resChan := make(chan error)
for _, item := range createOrUpdateWorkArgs {
go CreateOrUpdateWorkParallel(ctx, resChan, c, item)
}
for i := 0; i < len(targetClusters); i++ {
select {
case res := <-resChan:
if res != nil {
return err
}
case <-ctx.Done():
return nil
}
}
return nil
Expand Down Expand Up @@ -293,3 +302,22 @@ func shouldSuspendDispatching(suspension *policyv1alpha1.Suspension, targetClust
}
return suspendDispatching
}

// CreateOrUpdateWorkArg create or update work args struct.
type CreateOrUpdateWorkArg struct {
WorkMeta metav1.ObjectMeta
ClonedWorkload *unstructured.Unstructured
Options []helper.WorkOption
}

// CreateOrUpdateWorkParallel creates or update work object parallel.
func CreateOrUpdateWorkParallel(ctx context.Context, resChan chan error, c client.Client, createOrUpdateWorkArg *CreateOrUpdateWorkArg) {
err := helper.CreateOrUpdateWork(
ctx,
c,
createOrUpdateWorkArg.WorkMeta,
createOrUpdateWorkArg.ClonedWorkload,
createOrUpdateWorkArg.Options...,
)
resChan <- err
}

0 comments on commit d88ce8e

Please sign in to comment.