Skip to content

Commit

Permalink
purge status events
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Dec 20, 2024
1 parent 832932f commit 6a86298
Show file tree
Hide file tree
Showing 18 changed files with 521 additions and 107 deletions.
3 changes: 3 additions & 0 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"

"github.com/openshift-online/maestro/pkg/logger"
Expand All @@ -18,6 +19,8 @@ func NewControllersServer(eventServer EventServer) *ControllersServer {
),
StatusController: controllers.NewStatusController(
env().Services.StatusEvents(),
dao.NewInstanceDao(&env().Database.SessionFactory),
dao.NewEventInstanceDao(&env().Database.SessionFactory),
),
}

Expand Down
90 changes: 54 additions & 36 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,42 +133,16 @@ func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID strin
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := s.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Source: statusEvent.ResourceSource,
Type: statusEvent.ResourceType,
Payload: statusEvent.Payload,
Status: statusEvent.Status,
}
} else {
resource, sErr = s.resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
s.eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := s.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: s.instanceID,
})

return err
return broadcastStatusEvent(
ctx,
s.statusEventService,
s.resourceService,
s.eventInstanceDao,
s.eventBroadcaster,
s.instanceID,
eventID,
resourceID,
)
}

// handleStatusUpdate processes the resource status update from the agent.
Expand Down Expand Up @@ -263,3 +237,47 @@ func handleStatusUpdate(ctx context.Context, resource *api.Resource, resourceSer

return nil
}

func broadcastStatusEvent(ctx context.Context,
statusEventService services.StatusEventService,
resourceService services.ResourceService,
eventInstanceDao dao.EventInstanceDao,
eventBroadcaster *event.EventBroadcaster,
instanceID, eventID, resourceID string) error {
statusEvent, sErr := statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Source: statusEvent.ResourceSource,
Type: statusEvent.ResourceType,
Payload: statusEvent.Payload,
Status: statusEvent.Status,
}
} else {
resource, sErr = resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: instanceID,
})

return err
}
47 changes: 10 additions & 37 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,44 +429,17 @@ func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
// It does two things:
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
// TODO consider using a same way (MessageQueueEventServer.OnStatusUpdate) to handle this
func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := bkr.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Source: statusEvent.ResourceSource,
Type: statusEvent.ResourceType,
Payload: statusEvent.Payload,
Status: statusEvent.Status,
}
} else {
resource, sErr = bkr.resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
bkr.eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
})

return err
return broadcastStatusEvent(
ctx,
bkr.statusEventService,
bkr.resourceService,
bkr.eventInstanceDao,
bkr.eventBroadcaster,
bkr.instanceID,
eventID,
resourceID,
)
}

// IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec.
Expand Down
1 change: 1 addition & 0 deletions cmd/maestro/server/healthcheck_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (s *HealthCheckServer) pulse(ctx context.Context) {
return
}
klog.Errorf("Unable to get maestro instance: %s", err.Error())
return
}
found.LastHeartbeat = time.Now()
_, err = s.instanceDao.Replace(ctx, found)
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (km *KindControllerManager) processNextEvent() bool {
}

func (km *KindControllerManager) syncEvents() {
logger.Infof("purge all reconciled events")
// delete the reconciled events from the database firstly
if err := km.events.DeleteAllReconciledEvents(context.Background()); err != nil {
// this process is called periodically, so if the error happened, we will wait for the next cycle to handle
Expand All @@ -210,6 +211,7 @@ func (km *KindControllerManager) syncEvents() {
return
}

logger.Infof("sync all unreconciled events")
unreconciledEvents, err := km.events.FindAllUnreconciledEvents(context.Background())
if err != nil {
logger.Error(fmt.Sprintf("Failed to list unreconciled events from db, %v", err))
Expand Down
79 changes: 62 additions & 17 deletions pkg/controllers/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/services"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
Expand All @@ -16,16 +17,22 @@ const StatusEventID ControllerHandlerContextKey = "status_event"
type StatusHandlerFunc func(ctx context.Context, eventID, sourceID string) error

type StatusController struct {
controllers map[api.StatusEventType][]StatusHandlerFunc
statusEvents services.StatusEventService
eventsQueue workqueue.RateLimitingInterface
controllers map[api.StatusEventType][]StatusHandlerFunc
statusEvents services.StatusEventService
instanceDao dao.InstanceDao
eventInstanceDao dao.EventInstanceDao
eventsQueue workqueue.RateLimitingInterface
}

func NewStatusController(statusEvents services.StatusEventService) *StatusController {
func NewStatusController(statusEvents services.StatusEventService,
instanceDao dao.InstanceDao,
eventInstanceDao dao.EventInstanceDao) *StatusController {
return &StatusController{
controllers: map[api.StatusEventType][]StatusHandlerFunc{},
statusEvents: statusEvents,
eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"),
controllers: map[api.StatusEventType][]StatusHandlerFunc{},
statusEvents: statusEvents,
instanceDao: instanceDao,
eventInstanceDao: eventInstanceDao,
eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"),
}
}

Expand All @@ -38,9 +45,8 @@ func (sc *StatusController) Run(stopCh <-chan struct{}) {
logger.Infof("Starting status event controller")
defer sc.eventsQueue.ShutDown()

// TODO: start a goroutine to sync all status events periodically
// use a jitter to avoid multiple instances syncing the events at the same time
// go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh)
go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh)

// start a goroutine to handle the status event from the event queue
// the .Until will re-kick the runWorker one second after the runWorker completes
Expand All @@ -51,35 +57,35 @@ func (sc *StatusController) Run(stopCh <-chan struct{}) {
logger.Infof("Shutting down status event controller")
}

func (sm *StatusController) runWorker() {
func (sc *StatusController) runWorker() {
// hot loop until we're told to stop. processNextEvent will automatically wait until there's work available, so
// we don't worry about secondary waits
for sm.processNextEvent() {
for sc.processNextEvent() {
}
}

// processNextEvent deals with one key off the queue.
func (sm *StatusController) processNextEvent() bool {
func (sc *StatusController) processNextEvent() bool {
// pull the next status event item from queue.
// events queue blocks until it can return an item to be processed
key, quit := sm.eventsQueue.Get()
key, quit := sc.eventsQueue.Get()
if quit {
// the current queue is shutdown and becomes empty, quit this process
return false
}
defer sm.eventsQueue.Done(key)
defer sc.eventsQueue.Done(key)

if err := sm.handleStatusEvent(key.(string)); err != nil {
if err := sc.handleStatusEvent(key.(string)); err != nil {
logger.Error(fmt.Sprintf("Failed to handle the event %v, %v ", key, err))

// we failed to handle the status event, we should requeue the item to work on later
// this method will add a backoff to avoid hotlooping on particular items
sm.eventsQueue.AddRateLimited(key)
sc.eventsQueue.AddRateLimited(key)
return true
}

// we handle the status event successfully, tell the queue to stop tracking history for this status event
sm.eventsQueue.Forget(key)
sc.eventsQueue.Forget(key)
return true
}

Expand Down Expand Up @@ -131,3 +137,42 @@ func (sc *StatusController) add(ev api.StatusEventType, fns []StatusHandlerFunc)

sc.controllers[ev] = append(sc.controllers[ev], fns...)
}

func (sc *StatusController) syncStatusEvents() {
ctx := context.Background()

readyInstanceIDs, err := sc.instanceDao.FindReadyIDs(ctx)
if err != nil {
logger.Error(fmt.Sprintf("Failed to find ready instances from db, %v", err))
return
}
logger.Infof("purge status events on the ready instances: %s", readyInstanceIDs)

// find the status events that already were dispatched to all ready instances
statusEventIDs, err := sc.eventInstanceDao.GetEventsAssociatedWithInstances(ctx, readyInstanceIDs)
if err != nil {
logger.Error(fmt.Sprintf("Failed to find handled status events from db, %v", err))
return
}

// batch delete the handled status events
batches := batchStatusEventIDs(statusEventIDs, 500)
for _, batch := range batches {
if err := sc.statusEvents.DeleteAllEvents(ctx, batch); err != nil {
logger.Error(fmt.Sprintf("Failed to delete handled status events from db, %v", err))
return
}
}
}

func batchStatusEventIDs(statusEventIDs []string, batchSize int) [][]string {
batches := [][]string{}
for i := 0; i < len(statusEventIDs); i += batchSize {
end := i + batchSize
if end > len(statusEventIDs) {
end = len(statusEventIDs)
}
batches = append(batches, statusEventIDs[i:end])
}
return batches
}
64 changes: 64 additions & 0 deletions pkg/controllers/status_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package controllers

import (
"testing"
)

func TestBatchStatusEventIDs(t *testing.T) {
const batchSize = 500

cases := []struct {
name string
statusEventIDs []string
expected [][]string
}{
{
name: "empty input",
statusEventIDs: []string{},
expected: [][]string{},
},
{
name: "single batch less than batch size",
statusEventIDs: make([]string, 499),
expected: [][]string{make([]string, 499)},
},
{
name: "single batch equal to batch size",
statusEventIDs: make([]string, batchSize),
expected: [][]string{make([]string, batchSize)},
},
{
name: "multiple batches full",
statusEventIDs: make([]string, batchSize*2),
expected: [][]string{make([]string, batchSize), make([]string, batchSize)},
},
{
name: "multiple batches partial last",
statusEventIDs: make([]string, batchSize+100),
expected: [][]string{make([]string, batchSize), make([]string, 100)},
},
{
name: "multiple batches full partial last",
statusEventIDs: make([]string, batchSize*2+300),
expected: [][]string{make([]string, batchSize), make([]string, batchSize), make([]string, 300)},
},
}

for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
result := batchStatusEventIDs(tt.statusEventIDs, batchSize)

// Ensure the number of batches is correct
if len(result) != len(tt.expected) {
t.Errorf("number of batches mismatch, got %d, want %d", len(result), len(tt.expected))
}

// Check the length of each batch
for i := range result {
if len(result[i]) != len(tt.expected[i]) {
t.Errorf("length of batch %d mismatch, got %d, want %d", i+1, len(result[i]), len(tt.expected[i]))
}
}
})
}
}
Loading

0 comments on commit 6a86298

Please sign in to comment.