Context cancellation closes the connection #37
Ah, yeah. Since gRPC internally maintains a connection pool and does rpc multiplexing, I imagine it can do something other than closing the connection on a context cancel. DRPC will close the connection if the stream is not in a finished state when the context is canceled. Though, thinking out loud, it seems like it could wait to close the connection until the next rpc is started if the remote side has not acknowledged that the stream has been canceled. That grace period could help with connection reuse.
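One hedged workaround sketch for callers, not an official drpc recommendation: run the RPC on a context detached from the caller's cancellable one, so an outer cancel can never catch the stream in an unfinished state. It assumes Go 1.21+ for context.WithoutCancel, and pb.NewDRPCFooClient, WriteStuff and pb.Blob stand in for a generated client.

// Hedged workaround sketch, not an official drpc recommendation.
// Imports: context, time, storj.io/drpc (plus your generated pb package).
func writeDetached(outer context.Context, conn drpc.Conn, blobs [][]byte) error {
    // Keep the outer context's values but drop its cancellation, and bound
    // the RPC with an explicit timeout instead, so an external cancel can
    // never catch the stream in an unfinished state.
    ctx, cancel := context.WithTimeout(context.WithoutCancel(outer), 30*time.Second)
    defer cancel() // runs after CloseAndRecv below, once the stream is done

    stream, err := pb.NewDRPCFooClient(conn).WriteStuff(ctx)
    if err != nil {
        return err
    }
    for _, b := range blobs {
        if err := stream.Send(&pb.Blob{Data: b}); err != nil {
            return err
        }
    }
    _, err = stream.CloseAndRecv()
    return err
}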
Thanks for your reply! I'm trying to figure out how to deal with this in the best way in my code base. My code looks something like ...
So the first observation is that the idiomatic ...
Hmm. If the rpc is finished, cancel should not cause the connection to close. There should be a bunch of test cases around that behavior. Am I misunderstanding what you mean by your first observation? Every conn has a Closed method that the pool can inspect to see whether the connection is closed at that moment.
OK! I might have been mistaken; it's possible that the rpc call wasn't finished in the case I'm troubleshooting. I can't reproduce it in a minimal example. Sorry for the confusion.
OK! My pool is checking Closed ...
Cool. The rpcpool checks Closed on the way out because that's the latest possible time, so it has the smallest chance of being stale. I think this is something many people have to deal with, so the library might want to provide a basic "good enough" answer. And with generics now, it may be easier to handle more use cases (the key type can be generic, for example). Would you mind listing some of your requirements for a pool? There are so many possible features here, and a narrow but still useful set of requirements might help in coming up with an answer for everyone.
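To make the "inspect Closed" idea concrete, here is a minimal sketch of such a "good enough" keyed pool with a generic key type. It is an illustration only: the Conn interface below, including its channel-returning Closed signature, is an assumption for the sketch and not drpc's actual API.

// Minimal generic keyed pool sketch. Imports needed: context, sync.
type Conn interface {
    Close() error
    Closed() <-chan struct{} // assumed signature; adapt to whatever your conn exposes
}

// isClosed reports whether the connection has already been closed.
func isClosed(c Conn) bool {
    select {
    case <-c.Closed():
        return true
    default:
        return false
    }
}

// Pool keeps idle connections per key and skips ones that have gone away.
type Pool[K comparable, C Conn] struct {
    mu   sync.Mutex
    idle map[K][]C
    dial func(ctx context.Context, key K) (C, error)
}

func NewPool[K comparable, C Conn](dial func(ctx context.Context, key K) (C, error)) *Pool[K, C] {
    return &Pool[K, C]{idle: make(map[K][]C), dial: dial}
}

// Get pops idle connections for key until it finds one that is still open,
// dialing a fresh one if none are usable.
func (p *Pool[K, C]) Get(ctx context.Context, key K) (C, error) {
    p.mu.Lock()
    for conns := p.idle[key]; len(conns) > 0; conns = p.idle[key] {
        c := conns[len(conns)-1]
        p.idle[key] = conns[:len(conns)-1]
        if !isClosed(c) {
            p.mu.Unlock()
            return c, nil
        }
        _ = c.Close() // drop dead connections
    }
    p.mu.Unlock()
    return p.dial(ctx, key)
}

// Put returns a connection to the idle set, dropping it if it is already closed.
// A capacity limit is left out for brevity.
func (p *Pool[K, C]) Put(key K, c C) {
    if isClosed(c) {
        _ = c.Close()
        return
    }
    p.mu.Lock()
    p.idle[key] = append(p.idle[key], c)
    p.mu.Unlock()
}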
Initial tests with ... I think just having a pool example in a prominent place would go a long way. As for requirements, what I mostly care about is high throughput. My system is sending lots of blobs of data (a few MB at most) over a 10Gbps network. Right now my main concern is bad connection reuse because of context cancellation (I'm seeing a lot of cancellations in this system). Possibly it's fine in environments where TLS isn't a requirement, but with TLS I imagine I need some mechanism that ensures a minimum number of idle connections is available in the pool.
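On the minimum-idle point, one possible shape for such a mechanism, as a hedged sketch that reuses the Conn interface from the sketch above (none of this is drpc API): a buffered channel holds pre-dialed connections and a background goroutine keeps it topped up, so TLS handshakes happen off the request path.

// Hedged "keep N connections warm" sketch. Imports needed: context.
type warmPool struct {
    idle chan Conn // Conn here only needs Close() error
    dial func(ctx context.Context) (Conn, error)
}

func newWarmPool(ctx context.Context, minIdle int, dial func(ctx context.Context) (Conn, error)) *warmPool {
    p := &warmPool{idle: make(chan Conn, minIdle), dial: dial}
    go func() {
        for ctx.Err() == nil {
            c, err := dial(ctx) // TLS handshake happens here, off the hot path
            if err != nil {
                continue // real code would back off and log
            }
            // Note: when the channel is full this holds one extra dialed
            // connection until a slot frees up.
            select {
            case p.idle <- c: // park it until someone needs it
            case <-ctx.Done():
                _ = c.Close()
                return
            }
        }
    }()
    return p
}

// Get hands out a warm connection if one is parked, otherwise dials inline.
func (p *warmPool) Get(ctx context.Context) (Conn, error) {
    select {
    case c := <-p.idle:
        return c, nil
    default:
        return p.dial(ctx)
    }
}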
Haha, yeah, the API of rpcpool being weird is why it's not linked. The API is tailored to some pretty specific and weird requirements that Storj has. Thanks for the info!
I pushed up a change to our gerrit code review instance that includes a connection pool based on the design of the storj rpcpool: https://review.dev.storj.io/c/storj/drpc/+/8599

I think it's easier to understand because it hides the storj weirdness: it uses generics for the cache key and doesn't require any internal stuff, so you could copy/paste the files into your own repo if you want to try it out. I'm also going to work on figuring out how to delay closing connections in the presence of cancellations unless it's necessary. The following is a mostly unstructured brain dump of ideas:

...

Ok, that's all.
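For reference, a hedged sketch of how the pool from that change might be wired into client code over TLS, mirroring the pre-generics Get and Options usage that appears in the test further down; the exact drpcpool API may differ, and dialNode and the TLS config are placeholders.

// Hedged sketch only; mirrors the drpcpool usage shown in the test below.
// Imports needed: context, crypto/tls, storj.io/drpc/drpcconn, storj.io/drpc/drpcpool.
func dialNode(ctx context.Context, addr string, tlsConfig *tls.Config) (drpcpool.Conn, error) {
    // The TLS handshake cost is exactly why reusing these connections matters.
    rawconn, err := (&tls.Dialer{Config: tlsConfig}).DialContext(ctx, "tcp", addr)
    if err != nil {
        return nil, err
    }
    // Assumes the drpcconn connection satisfies drpcpool.Conn, as in the test below.
    return drpcconn.New(rawconn), nil
}

func doStuff(ctx context.Context, p *drpcpool.Pool, addr string, tlsConfig *tls.Config) error {
    conn := p.Get(ctx, addr, func(ctx context.Context, key interface{}) (drpcpool.Conn, error) {
        return dialNode(ctx, key.(string), tlsConfig)
    })
    defer func() { _ = conn.Close() }()
    // ... make RPCs with a generated client, e.g. NewDRPCServiceClient(conn) ...
    return nil
}

The pool itself would be constructed with drpcpool.New(drpcpool.Options{Capacity: ...}), as in that test.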
A little update here. I gave ...

func foo(ctx context.Context) error {
    conn := pool.Get(ctx, ...)
    defer conn.Close()

    stream, err := pb.NewFoo(conn).WriteStuff(ctx)
    if err != nil {
        return err
    }
    // a bunch of stream.Send()
    _, err = stream.CloseAndRecv()
    return err
}

The other side is reading from the stream and calling SendAndClose. After ... I get the impression that the stream is closed in an asynchronous manner, and if the stream isn't closed already by the time ... Does my stream usage above seem correct? I suppose I can re-run the body of foo ...
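As a hedged sketch of that re-run idea (the error check below is a placeholder for however a dead pooled connection actually surfaces):

// Hedged sketch of the "re-run the body" idea; a real version would only
// retry on errors that clearly indicate the pooled connection was already dead.
func retryOnce(ctx context.Context, body func(ctx context.Context) error) error {
    err := body(ctx)
    if err != nil && ctx.Err() == nil {
        // The outer context is still alive, so the failure is plausibly a
        // stale pooled connection; run the body once more on a fresh one.
        err = body(ctx)
    }
    return err
}

With the signature above, retryOnce(ctx, foo) would be enough to wrap the whole body.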
That usage does seem correct. I'm writing some tests to see if I can reproduce.
So here's a test I've been running and have been unable to reproduce the error with:

func TestCancelRepeatedPooled(t *testing.T) {
    tctx := drpctest.NewTracker(t)
    defer tctx.Close()

    server := impl{
        Method2Fn: func(stream DRPCService_Method2Stream) error {
            var total int64
            for {
                in, err := stream.Recv()
                if err == io.EOF {
                    break
                } else if err != nil {
                    return err
                }
                total += in.In
            }
            return stream.SendAndClose(out(total))
        },
    }

    foo := func(ctx context.Context, p *drpcpool.Pool) {
        conn := p.Get(ctx, "foo", func(ctx context.Context, key interface{}) (drpcpool.Conn, error) {
            return createRawConnection(t, server, tctx), nil
        })
        defer func() { _ = conn.Close() }()

        stream, err := NewDRPCServiceClient(conn).Method2(ctx)
        assert.NoError(t, err)
        assert.NoError(t, stream.Send(in(1)))
        assert.NoError(t, stream.Send(in(2)))
        assert.NoError(t, stream.Send(in(3)))

        out, err := stream.CloseAndRecv()
        assert.NoError(t, err)
        assert.Equal(t, out.Out, int64(6))
    }

    p := drpcpool.New(drpcpool.Options{
        Capacity: 1,
    })
    for i := 0; i < 10000; i++ {
        ctx, cancel := context.WithCancel(tctx)
        foo(ctx, p)
        cancel()
    }
}

Does that look like it matches your usage?
Thank you for looking into this again! The unit test appears to do what I described, but I realize now I omitted one detail that might matter. I see the context problem in the ...

Probably I can get the context error here too, but when I'm able to reproduce this there happen to be a lot more writes. One key difference between these two functions is that the context is frequently cancelled during reading, that is, before ...

Is that unit test committed somewhere? I could try to play around with it.
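For illustration, a hedged sketch of that situation, under the assumption that cancellation "during reading" means the client is still blocked waiting for the server's reply when the cancel lands; the pb names are placeholders for a generated client.

// Hedged illustration only; placeholder pb types. With many Send calls,
// CloseAndRecv is reached much later, so an external cancel is far more
// likely to land while the client is still reading the reply, leaving the
// stream unfinished and (per the behavior discussed above) closing the connection.
func writeManyAndRecv(ctx context.Context, conn drpc.Conn, blobs [][]byte) error {
    stream, err := pb.NewDRPCFooClient(conn).WriteStuff(ctx)
    if err != nil {
        return err
    }
    for _, b := range blobs {
        if err := stream.Send(&pb.Blob{Data: b}); err != nil {
            return err
        }
    }
    _, err = stream.CloseAndRecv() // a cancel during this wait hits an unfinished stream
    return err
}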
The test isn't committed, but you can put it in the internal/integration package.
The test code above doesn't seem complete. With the latest ... I get:

$ go test
# storj.io/drpc/internal/integration [storj.io/drpc/internal/integration.test]
./common_test.go:245:31: undefined: out
./common_test.go:250:82: undefined: drpcpool.Conn
./common_test.go:251:11: undefined: createRawConnection
FAIL storj.io/drpc/internal/integration [build failed]
Whoops! I forgot that I had included other changes to some supporting files. I have pushed a commit with the test (TestCancelRepeatedPooled) up. Also, it now sometimes fails for me locally. I don't have any time to figure this out right now, but I'll look again first thing tomorrow morning.
Hi,
I'm looking into replacing grpc with drpc in one of my projects and initial tests show some nice performance improvements!
Switching to drpc was fairly painless. The only snag was that context cancellation seemingly closes the connection. This behavior is different from grpc; see my tweaked drpc/grpc examples in tomyl@acb08bd.
Is this behavior intentional? I can't see that the documentation mentions it.
Thanks,
Tommy