Skip to content

Commit

Permalink
Make streaming client closers non-blocking
Browse files Browse the repository at this point in the history
This updates the behavior of the client streaming methods
`BidiStreamForClient.CloseResponse` and `ServerStreamForClient.Close` to
be non-blocking, aligning it with the standard behavior of `net/http`'s
`Request.Body` closure.

Previously, the implementation used a graceful, blocking closure that
fully read from the stream before closing. This allows for reuse of the
underlying TCP connection. However, this behavior could lead to
unexpected client hangs, as users may not anticipate blocking on close.

To address this, the closers no long drain the stream. Documentation has
been updated to clarify the behavior and provide users a workaround to
keep the optimization by Receiving messages until the stream is drain.
This avoids unexpected blocking behavior in client applications.

Signed-off-by: Edward McFarlane <[email protected]>
  • Loading branch information
emcfarlane committed Oct 28, 2024
1 parent 145b279 commit 9c11e8b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
9 changes: 9 additions & 0 deletions client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (s *ServerStreamForClient[Res]) ResponseTrailer() http.Header {
}

// Close the receive side of the stream.
//
// Close is non-blocking. To gracefully close the stream and allow for
// connection resuse ensure all messages have been received before calling
// Close. All messages are received when Receive returns false.
func (s *ServerStreamForClient[Res]) Close() error {
if s.constructErr != nil {
return s.constructErr
Expand Down Expand Up @@ -251,6 +255,11 @@ func (b *BidiStreamForClient[Req, Res]) Receive() (*Res, error) {
}

// CloseResponse closes the receive side of the stream.
//
// CloseResponse is non-blocking. To gracefully close the stream and allow for
// connection resuse ensure all messages have been received before calling
// CloseResponse. All messages are received when Receive returns an error
// wrapping [io.EOF].
func (b *BidiStreamForClient[Req, Res]) CloseResponse() error {
if b.err != nil {
return b.err
Expand Down
8 changes: 1 addition & 7 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,7 @@ func (d *duplexHTTPCall) CloseRead() error {
if d.response == nil {
return nil
}
_, err := discard(d.response.Body)
closeErr := d.response.Body.Close()
if err == nil ||
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) {
err = closeErr
}
err := d.response.Body.Close()
err = wrapIfContextDone(d.ctx, err)
return wrapIfRSTError(err)
}
Expand Down

0 comments on commit 9c11e8b

Please sign in to comment.