Skip to content

Commit

Permalink
add unassigned and actions to slots to unassigned qi
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 committed Dec 20, 2024
1 parent 4dfdf19 commit 2d079ed
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
4 changes: 4 additions & 0 deletions pkg/scheduling/v2/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ type PostScheduleInput struct {
Workers map[string]*WorkerCp

Slots []*SlotCp

Unassigned []*dbsqlc.QueueItem

ActionsToSlots map[string][]*SlotCp
}

type WorkerCp struct {
Expand Down
31 changes: 26 additions & 5 deletions pkg/scheduling/v2/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,9 @@ func (s *Scheduler) tryAssign(
wg := sync.WaitGroup{}
startTotal := time.Now()

extensionResultsCh := make(chan *assignResults, len(actionIdToQueueItems))
defer close(extensionResultsCh)

// process each action id in parallel
for actionId, qis := range actionIdToQueueItems {
wg.Add(1)
Expand Down Expand Up @@ -735,12 +738,16 @@ func (s *Scheduler) tryAssign(
s.l.Warn().Dur("duration", sinceStart).Msgf("processing batch of %d queue items took longer than 100ms", len(batchQis))
}

resultsCh <- &assignResults{
r := &assignResults{
assigned: batchAssigned,
rateLimited: batchRateLimited,
unassigned: batchUnassigned,
}

extensionResultsCh <- r

resultsCh <- r

return nil
})

Expand All @@ -754,7 +761,7 @@ func (s *Scheduler) tryAssign(
span.End()
close(resultsCh)

extInput := s.getExtensionInput()
extInput := s.getExtensionInput(extensionResultsCh)

s.exts.PostSchedule(sqlchelpers.UUIDToStr(s.tenantId), extInput)

Expand All @@ -766,12 +773,19 @@ func (s *Scheduler) tryAssign(
return resultsCh
}

func (s *Scheduler) getExtensionInput() *PostScheduleInput {
func (s *Scheduler) getExtensionInput(ch <-chan *assignResults) *PostScheduleInput {
unassigned := make([]*dbsqlc.QueueItem, 0)

for res := range ch {
unassigned = append(unassigned, res.unassigned...)
}

workers := s.getWorkers()

res := &PostScheduleInput{
Workers: make(map[string]*WorkerCp),
Slots: make([]*SlotCp, 0),
Workers: make(map[string]*WorkerCp),
Slots: make([]*SlotCp, 0),
Unassigned: unassigned,
}

for workerId, worker := range workers {
Expand All @@ -786,8 +800,11 @@ func (s *Scheduler) getExtensionInput() *PostScheduleInput {
defer s.actionsMu.RUnlock()

uniqueSlots := make(map[*slot]*SlotCp)
actionsToSlots := make(map[string][]*SlotCp)

for _, action := range s.actions {
actionsToSlots[action.actionId] = make([]*SlotCp, 0, len(action.slots))

for _, slot := range action.slots {
if _, ok := uniqueSlots[slot]; ok {
continue
Expand All @@ -797,13 +814,17 @@ func (s *Scheduler) getExtensionInput() *PostScheduleInput {
WorkerId: slot.getWorkerId(),
Used: slot.used,
}

actionsToSlots[action.actionId] = append(actionsToSlots[action.actionId], uniqueSlots[slot])
}
}

for _, slot := range uniqueSlots {
res.Slots = append(res.Slots, slot)
}

res.ActionsToSlots = actionsToSlots

return res
}

Expand Down

0 comments on commit 2d079ed

Please sign in to comment.