Skip to content

Commit

Permalink
sync: implement WaitGroup using a futex
Browse files Browse the repository at this point in the history
This prepares sync.WaitGroup for multithreading.
Code size for the cooperative scheduler is nearly unchanged.
  • Loading branch information
aykevl committed Dec 4, 2024
1 parent 2588bf7 commit 7be59a2
Showing 1 changed file with 59 additions and 28 deletions.
87 changes: 59 additions & 28 deletions src/sync/waitgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,65 @@ package sync
import "internal/task"

type WaitGroup struct {
counter uint
waiters task.Stack
futex task.Futex
}

func (wg *WaitGroup) Add(delta int) {
if delta > 0 {
// Check for overflow.
if uint(delta) > (^uint(0))-wg.counter {
panic("sync: WaitGroup counter overflowed")
// Delta is positive.
for {
// Check for overflow.
counter := wg.futex.Load()
if uint32(delta) > (^uint32(0))-counter {
panic("sync: WaitGroup counter overflowed")
}

// Add to the counter.
if wg.futex.CompareAndSwap(counter, counter+uint32(delta)) {
// Successfully added.
return
}
}

// Add to the counter.
wg.counter += uint(delta)
} else {
// Check for underflow.
if uint(-delta) > wg.counter {
panic("sync: negative WaitGroup counter")
}
// Delta is negative (or zero).
for {
counter := wg.futex.Load()

// Subtract from the counter.
wg.counter -= uint(-delta)
// Check for underflow.
if uint32(-delta) > counter {
panic("sync: negative WaitGroup counter")
}

// Subtract from the counter.
if !wg.futex.CompareAndSwap(counter, counter-uint32(-delta)) {
// Could not swap, trying again.
continue
}

// If the counter is zero, everything is done and the waiters should be resumed.
// This code assumes that the waiters cannot wake up until after this function returns.
// In the current implementation, this is always correct.
if wg.counter == 0 {
for t := wg.waiters.Pop(); t != nil; t = wg.waiters.Pop() {
scheduleTask(t)
// If the counter is zero, everything is done and the waiters should
// be resumed.
// When there are multiple thread, there is a chance for the counter
// to go to zero, WakeAll to be called, and then the counter to be
// incremented again before a waiting goroutine has a chance to
// check the new (zero) value. However the last increment is
// explicitly given in the docs as something that should not be
// done:
//
// > Note that calls with a positive delta that occur when the
// > counter is zero must happen before a Wait.
//
// So we're fine here.
if counter-uint32(-delta) == 0 {
// TODO: this is not the most efficient implementation possible
// because we wake up all waiters unconditionally, even if there
// might be none. Though since the common usage is for this to
// be called with at least one waiter, it's probably fine.
wg.futex.WakeAll()
}

// Successfully swapped (and woken all waiting tasks if needed).
return
}
}
}
Expand All @@ -41,14 +71,15 @@ func (wg *WaitGroup) Done() {
}

func (wg *WaitGroup) Wait() {
if wg.counter == 0 {
// Everything already finished.
return
}

// Push the current goroutine onto the waiter stack.
wg.waiters.Push(task.Current())
for {
counter := wg.futex.Load()
if counter == 0 {
return // everything already finished
}

// Pause until the waiters are awoken by Add/Done.
task.Pause()
if wg.futex.Wait(counter) {
// Successfully woken by WakeAll (in wg.Add).
break
}
}
}

0 comments on commit 7be59a2

Please sign in to comment.