Skip to content

Commit

Permalink
x/ref/runtime/internal/flow/conn: fragment release message (#289)
Browse files Browse the repository at this point in the history
PR #287 deleted the toRelease/borrowed entries for a flow when it is closed. This greatly reduced the size of the Release messages for the common case. However, those counters are required when borrowed is true past the flow being closed so that the dialer can recover the borrowed tokens to its shared pool of tokens. In cases where a great number of short lived connections are created the resulting Release message can be larger than the default buffer size allowed and hence will require fragmentation. When routed through a proxy these large messages can lead to deadlock since the proxy's encapsulate flow will not forward a message that has to be fragmented if it does not have enough flow control counters for the entire message. This PR explicitly fragments the Release message to get around this limitation.

A better fix is to collect all of the borrowed tokens into a single 'special flow' in the Release message and for the dialer to add these tokens to the shared pool directly rather than iterating over all of the per-flow borrowed counters.
  • Loading branch information
cosnicolaou authored Aug 26, 2022
1 parent f2f0f5e commit 5485106
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 213 deletions.
148 changes: 98 additions & 50 deletions x/ref/runtime/internal/flow/conn/close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package conn

import (
"bytes"
"crypto/rand"
"fmt"
"io"
"runtime"
Expand Down Expand Up @@ -256,55 +257,85 @@ func TestFlowCancelOnRead(t *testing.T) {
<-af.Closed()
}

func testCounters(t *testing.T, ctx *context.T, count int, dialClose, acceptClose bool) (
func acceptor(errCh chan error, acceptCh chan flow.Flow, size int, close bool) {
for flw := range acceptCh {
buf := make([]byte, size)
// It's essential to use ReadFull rather than ReadMsg since WriteMsg
// will fragment a message larger than a default size into multiple
// messages.
n, err := io.ReadFull(flw, buf)
if err != nil {
errCh <- err
return
}
if n != size {
errCh <- fmt.Errorf("short read: %v != %v", n, size)
}
if got, want := n, size; got != want {
errCh <- fmt.Errorf("got %v, want %v", got, want)
}
if _, err := flw.WriteMsg(buf); err != nil {
errCh <- err
return
}

if close {
if err := flw.Close(); err != nil {
errCh <- err
return
}
}
}
errCh <- nil
}

func testCounters(t *testing.T, ctx *context.T, count int, dialClose, acceptClose bool, size int) (
dialRelease, dialBorrowed, acceptRelease, acceptBorrowed int) {
accept := make(chan flow.Flow, 1)
dc, ac, derr, aerr := setupConns(t, "local", "", ctx, ctx, nil, accept, nil, nil)
acceptCh := make(chan flow.Flow, 1)
dc, ac, derr, aerr := setupConns(t, "local", "", ctx, ctx, nil, acceptCh, nil, nil)
if derr != nil || aerr != nil {
t.Fatalf("setup: dial err: %v, accept err: %v", derr, aerr)
}

errCh := make(chan error, 1)
go func() {
for flw := range accept {
m, err := flw.ReadMsg()
if err != nil {
errCh <- err
return
}
if _, err := flw.WriteMsg(m); err != nil {
errCh <- err
return
}
if acceptClose {
if err := flw.Close(); err != nil {
errCh <- err
return
}
}
}
errCh <- nil
}()
go acceptor(errCh, acceptCh, size, acceptClose)

writeBuf := make([]byte, size)
if n, err := io.ReadFull(rand.Reader, writeBuf); n != size || err != nil {
t.Fatalf("failed to write random bytes: %v %v", n, err)
}

for i := 0; i < count; i++ {
df, err := dc.Dial(ctx, dc.LocalBlessings(), nil, naming.Endpoint{}, 0, false)
if err != nil {
t.Fatal(err)
}
if _, err := df.WriteMsg([]byte("hello")); err != nil {
// WriteMsg wil fragment messages larger than its default buffer size.
if _, err := df.WriteMsg(writeBuf); err != nil {
t.Fatalf("could not write flow: %v", err)
}
if _, err := df.ReadMsg(); err != nil {
readBuf := make([]byte, size)
// It's essential to use ReadFull rather than ReadMsg since WriteMsg
// will fragment a message larger than a default size into multiple
// messages.
n, err := io.ReadFull(df, readBuf)
if err != nil {
t.Fatalf("unexpected error reading from flow: %v", err)
}
if got, want := n, size; got != want {
t.Fatalf("got %v, want %v", got, want)
}
if !bytes.Equal(writeBuf, readBuf) {
t.Fatalf("data corruption: %v %v", writeBuf[:10], readBuf[:10])
}
if dialClose {
if err := df.Close(); err != nil {
t.Fatalf("unexpected error closing flow: %v", err)
}
}
}

close(accept)
close(acceptCh)
if err := <-errCh; err != nil {
t.Fatal(err)
}
Expand All @@ -326,30 +357,47 @@ func TestCounters(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()

check := func(got, want int) {
if got > want {
_, _, line, _ := runtime.Caller(1)
t.Errorf("line: %v, got %v, want %v", line, got, want)
var dialRelease, dialBorrowed, acceptRelease, acceptBorrowed int

assert := func(dialApprox, acceptApprox int) {
compare := func(got, want int) {
if got > want {
_, _, l1, _ := runtime.Caller(3)
_, _, l2, _ := runtime.Caller(2)
t.Errorf("line: %v:%v, got %v, want %v", l1, l2, got, want)
}
}
compare(dialRelease, dialApprox)
compare(dialBorrowed, dialApprox)
compare(acceptRelease, acceptApprox)
compare(acceptBorrowed, acceptApprox)
}

runAndTest := func(count, size, dialApprox, acceptApprox int) {
dialRelease, dialBorrowed, acceptRelease, acceptBorrowed = testCounters(t, ctx, count, true, false, size)
assert(dialApprox, acceptApprox)
dialRelease, dialBorrowed, acceptRelease, acceptBorrowed = testCounters(t, ctx, count, false, true, size)
assert(dialApprox, acceptApprox)
dialRelease, dialBorrowed, acceptRelease, acceptBorrowed = testCounters(t, ctx, count, true, true, size)
assert(dialApprox, acceptApprox)
}
count := 1000
// The actual values should be 1 for the dial side and 2 for the accept side, but
// we allow a few more than that to avoid racing for the network comms to complete after
// the flows are closed.
approx := 3
dialRelease, dialBorrowed, acceptRelease, acceptBorrowed := testCounters(t, ctx, count, true, true)
check(dialRelease, approx)
check(dialBorrowed, approx)
check(acceptRelease, approx)
check(acceptBorrowed, approx)
dialRelease, dialBorrowed, acceptRelease, acceptBorrowed = testCounters(t, ctx, count, true, false)
check(dialRelease, approx)
check(dialBorrowed, approx)
check(acceptRelease, approx)
check(acceptBorrowed, approx)
dialRelease, dialBorrowed, acceptRelease, acceptBorrowed = testCounters(t, ctx, count, false, true)
check(dialRelease, approx)
check(dialBorrowed, approx)
check(acceptRelease, approx)
check(acceptBorrowed, approx)

// The actual values should be 1 for the dial side but we allow a few more
// than that to avoid racing for the network comms to complete after
// the flows are closed. On the accept side, the number of currently in
// use toRelease entries depends on the size of the data buffers used.
// The number is determined by the number of flows that have outstanding
// Release messages to send to the dialer once count iterations are complete.
// Increasing size decreases the number of flows with outstanding Release
// messages since the shared counter is burned through faster when using
// a larger size.

// For small packets, all connections end up being 'borrowed' and hence
// their counters are kept around.
runAndTest(5000, 10, 3, 5005)
// 60K connection setups/teardowns will ensure that the release message
// is fragmented.
runAndTest(60000, 10, 3, 10000)
runAndTest(5000, 1024*16, 3, 20)
runAndTest(1000, 1024*100, 3, 5)
}
41 changes: 38 additions & 3 deletions x/ref/runtime/internal/flow/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,43 @@ func (c *Conn) internalCloseLocked(ctx *context.T, closedRemotely, closedWhileAc
}(c)
}

func (c *Conn) fragmentReleaseMessage(ctx *context.T, toRelease map[uint64]uint64, limit int) error {
if len(toRelease) < limit {
return c.sendMessageLocked(ctx, false, expressPriority, &message.Release{
Counters: toRelease,
})
}
for {
var send, remaining map[uint64]uint64
rem := len(toRelease) - limit
if rem <= 0 {
send = toRelease
} else {
send = make(map[uint64]uint64, limit)
remaining = make(map[uint64]uint64, rem)
i := 0
for k, v := range toRelease {
if i < limit {
send[k] = v
} else {
remaining[k] = v
}
i++
}
}
if err := c.sendMessageLocked(ctx, false, expressPriority, &message.Release{
Counters: send,
}); err != nil {
return err
}
if remaining == nil {
break
}
toRelease = remaining
}
return nil
}

func (c *Conn) release(ctx *context.T, fid, count uint64) {
var toRelease map[uint64]uint64
var release bool
Expand All @@ -820,9 +857,7 @@ func (c *Conn) release(ctx *context.T, fid, count uint64) {
var err error
if toRelease != nil {
delete(toRelease, invalidFlowID)
err = c.sendMessageLocked(ctx, false, expressPriority, &message.Release{
Counters: toRelease,
})
err = c.fragmentReleaseMessage(ctx, toRelease, 8000)
}
c.mu.Unlock()
if err != nil {
Expand Down
14 changes: 12 additions & 2 deletions x/ref/runtime/internal/flow/conn/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,16 @@ func (c *Conn) newFlowCountersLocked(id uint64) {
}

func (c *Conn) clearFlowCountersLocked(id uint64) {
delete(c.toRelease, id)
delete(c.borrowing, id)
if !c.borrowing[id] {
delete(c.toRelease, id)
delete(c.borrowing, id)
}
// Need to keep borrowed counters around so that they can be sent
// to the dialer to allow for the shared counter to be incremented
// for all the past flows that borrowed counters (ie. pretty much
// any/all short lived connections). A much better approach would be
// to use a 'special' flow ID (e.g use the invalidFlowID) to use
// for referring to all borrowed tokens for closed flows.
}

// Implement the writer interface.
Expand Down Expand Up @@ -160,6 +168,7 @@ func (f *flw) Write(p []byte) (n int, err error) {
// the number of shared counters for the conn if we are sending on a just
// dialed flow.
func (f *flw) tokensLocked() (int, func(int)) {

max := f.conn.mtu
// When our flow is proxied (i.e. encapsulated), the proxy has added overhead
// when forwarding the message. This means we must reduce our mtu to ensure
Expand Down Expand Up @@ -315,6 +324,7 @@ func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (sent int, err error) {
if f.noEncrypt {
d.Flags |= message.DisableEncryptionFlag
}

if opened {
err = f.conn.mp.writeMsg(ctx, d)
} else {
Expand Down
Loading

0 comments on commit 5485106

Please sign in to comment.