Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add limited support for bidi streams over HTTP/1 #796

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 57 additions & 81 deletions connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,16 @@ func TestServer(t *testing.T) {
assert.Nil(t, stream.Close())
})
}
testCumSum := func(t *testing.T, client pingv1connect.PingServiceClient, expectSuccess bool) { //nolint:thelper
testCumSum := func(t *testing.T, client pingv1connect.PingServiceClient, fullDuplex bool) { //nolint:thelper
t.Run("cumsum", func(t *testing.T) {
if !fullDuplex {
t.Skip("transport doesn't support full-duplex streaming")
}
send := []int64{3, 5, 1}
expect := []int64{3, 8, 9}
var got []int64
stream := client.CumSum(context.Background())
stream.RequestHeader().Set(clientHeader, headerValue)
if !expectSuccess { // server doesn't support HTTP/2
failNoHTTP2(t, stream)
return
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
Expand Down Expand Up @@ -249,11 +248,10 @@ func TestServer(t *testing.T) {
assert.Equal(t, stream.ResponseTrailer().Values(handlerTrailer), []string{trailerValue})
})
t.Run("cumsum_error", func(t *testing.T) {
stream := client.CumSum(context.Background())
if !expectSuccess { // server doesn't support HTTP/2
failNoHTTP2(t, stream)
return
if !fullDuplex {
t.Skip("transport doesn't support full-duplex streaming")
}
stream := client.CumSum(context.Background())
if err := stream.Send(&pingv1.CumSumRequest{Number: 42}); err != nil {
assert.ErrorIs(t, err, io.EOF)
assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown)
Expand All @@ -265,12 +263,11 @@ func TestServer(t *testing.T) {
assert.True(t, connect.IsWireError(err))
})
t.Run("cumsum_empty_stream", func(t *testing.T) {
if !fullDuplex {
t.Skip("transport doesn't support full-duplex streaming")
}
stream := client.CumSum(context.Background())
stream.RequestHeader().Set(clientHeader, headerValue)
if !expectSuccess { // server doesn't support HTTP/2
failNoHTTP2(t, stream)
return
}
// Deliberately closing with calling Send to test the behavior of Receive.
// This test case is based on the grpc interop tests.
assert.Nil(t, stream.CloseRequest())
Expand All @@ -281,14 +278,12 @@ func TestServer(t *testing.T) {
assert.Nil(t, stream.CloseResponse()) // clean-up the stream
})
t.Run("cumsum_cancel_after_first_response", func(t *testing.T) {
if !fullDuplex {
t.Skip("transport doesn't support full-duplex streaming")
}
ctx, cancel := context.WithCancel(context.Background())
stream := client.CumSum(ctx)
stream.RequestHeader().Set(clientHeader, headerValue)
if !expectSuccess { // server doesn't support HTTP/2
failNoHTTP2(t, stream)
cancel()
return
}
var got []int64
expect := []int64{42}
if err := stream.Send(&pingv1.CumSumRequest{Number: 42}); err != nil {
Expand All @@ -306,13 +301,11 @@ func TestServer(t *testing.T) {
assert.Nil(t, stream.CloseResponse())
})
t.Run("cumsum_cancel_before_send", func(t *testing.T) {
if !fullDuplex {
t.Skip("transport doesn't support full-duplex streaming")
}
ctx, cancel := context.WithCancel(context.Background())
stream := client.CumSum(ctx)
if !expectSuccess { // server doesn't support HTTP/2
failNoHTTP2(t, stream)
cancel()
return
}
stream.RequestHeader().Set(clientHeader, headerValue)
assert.Nil(t, stream.Send(&pingv1.CumSumRequest{Number: 8}))
cancel()
Expand All @@ -324,6 +317,29 @@ func TestServer(t *testing.T) {
assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
})
t.Run("cumsum_half_duplex", func(t *testing.T) {
// This test is for HTTP1, which doesn't support full-duplex streaming.
// We expect the stream to error after the first response.
if fullDuplex {
t.Skip("transport supports full-duplex streaming")
}
stream := client.CumSum(context.Background())
stream.RequestHeader().Set(clientHeader, headerValue)
err := stream.Send(&pingv1.CumSumRequest{Number: 1})
assert.Nil(t, err)

response, err := stream.Receive()
assert.Nil(t, err)
assert.NotNil(t, response)
assert.Equal(t, response.GetSum(), 1)

// Stream must now error as HTTP1 doesn't support full-duplex.
err = stream.Send(&pingv1.CumSumRequest{Number: 2})
assert.ErrorIs(t, err, io.EOF)

assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
})
}
testErrors := func(t *testing.T, client pingv1connect.PingServiceClient) { //nolint:thelper
assertIsHTTPMiddlewareError := func(tb testing.TB, err error) {
Expand Down Expand Up @@ -373,14 +389,14 @@ func TestServer(t *testing.T) {
assertIsHTTPMiddlewareError(t, stream.Err())
})
}
testMatrix := func(t *testing.T, client *http.Client, url string, bidi bool) { //nolint:thelper
testMatrix := func(t *testing.T, client *http.Client, url string, fullDuplex bool) { //nolint:thelper
run := func(t *testing.T, opts ...connect.ClientOption) {
t.Helper()
client := pingv1connect.NewPingServiceClient(client, url, opts...)
testPing(t, client)
testSum(t, client)
testCountUp(t, client)
testCumSum(t, client, bidi)
testCumSum(t, client, fullDuplex)
testErrors(t, client)
}
t.Run("connect", func(t *testing.T) {
Expand Down Expand Up @@ -486,13 +502,13 @@ func TestServer(t *testing.T) {
t.Parallel()
server := memhttptest.NewServer(t, mux)
client := &http.Client{Transport: server.TransportHTTP1()}
testMatrix(t, client, server.URL(), false /* bidi */)
testMatrix(t, client, server.URL(), false /* full-duplex */)
})
t.Run("http2", func(t *testing.T) {
t.Parallel()
server := memhttptest.NewServer(t, mux)
client := server.Client()
testMatrix(t, client, server.URL(), true /* bidi */)
testMatrix(t, client, server.URL(), true /* full-duplex */)
})
}

Expand Down Expand Up @@ -946,35 +962,6 @@ func TestUnavailableIfHostInvalid(t *testing.T) {
assert.Equal(t, connect.CodeOf(err), connect.CodeUnavailable)
}

func TestBidiRequiresHTTP2(t *testing.T) {
t.Parallel()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/connect+proto")
_, err := io.WriteString(w, "hello world")
assert.Nil(t, err)
})
server := memhttptest.NewServer(t, handler)
client := pingv1connect.NewPingServiceClient(
&http.Client{Transport: server.TransportHTTP1()},
server.URL(),
)
stream := client.CumSum(context.Background())
// Stream creates an async request, can error on Send or Receive.
if err := stream.Send(&pingv1.CumSumRequest{}); err != nil {
assert.ErrorIs(t, err, io.EOF)
}
assert.Nil(t, stream.CloseRequest())
_, err := stream.Receive()
assert.NotNil(t, err)
var connectErr *connect.Error
assert.True(t, errors.As(err, &connectErr))
assert.Equal(t, connectErr.Code(), connect.CodeUnimplemented)
assert.True(
t,
strings.HasSuffix(connectErr.Message(), ": bidi streams require at least HTTP/2"),
)
}

func TestCompressMinBytesClient(t *testing.T) {
t.Parallel()
assertContentType := func(tb testing.TB, text, expect string) {
Expand Down Expand Up @@ -2180,15 +2167,21 @@ func TestBidiOverHTTP1(t *testing.T) {
&http.Client{Transport: server.TransportHTTP1()},
server.URL(),
)
// Create a BIDI stream, after receiving a response the stream should be
// closed as HTTP1 doesn't support full-duplex.
stream := client.CumSum(context.Background())
// Stream creates an async request, can error on Send or Receive.
if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil {
assert.ErrorIs(t, err, io.EOF)
}
_, err := stream.Receive()
assert.NotNil(t, err)
assert.Equal(t, connect.CodeOf(err), connect.CodeUnknown)
assert.Equal(t, err.Error(), "unknown: HTTP status 505 HTTP Version Not Supported")
err := stream.Send(&pingv1.CumSumRequest{Number: 1})
assert.Nil(t, err)

response, err := stream.Receive()
assert.Nil(t, err)
assert.NotNil(t, response)
assert.Equal(t, response.GetSum(), 1)

// Stream must now error as HTTP1 doesn't support full-duplex.
err = stream.Send(&pingv1.CumSumRequest{Number: 2})
assert.ErrorIs(t, err, io.EOF)

assert.Nil(t, stream.CloseRequest())
assert.Nil(t, stream.CloseResponse())
}
Expand Down Expand Up @@ -2771,23 +2764,6 @@ func (p *pluggablePingServer) CumSum(
return p.cumSum(ctx, stream)
}

func failNoHTTP2(tb testing.TB, stream *connect.BidiStreamForClient[pingv1.CumSumRequest, pingv1.CumSumResponse]) {
tb.Helper()
if err := stream.Send(&pingv1.CumSumRequest{}); err != nil {
assert.ErrorIs(tb, err, io.EOF)
assert.Equal(tb, connect.CodeOf(err), connect.CodeUnknown)
}
assert.Nil(tb, stream.CloseRequest())
_, err := stream.Receive()
assert.NotNil(tb, err) // should be 505
assert.True(
tb,
strings.Contains(err.Error(), "HTTP status 505"),
assert.Sprintf("expected 505, got %v", err),
)
assert.Nil(tb, stream.CloseResponse())
}

func expectClientHeader(check bool, req connect.AnyRequest) error {
if !check {
return nil
Expand Down
13 changes: 3 additions & 10 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,16 +332,9 @@ func (d *duplexHTTPCall) makeRequest() {
_ = d.CloseWrite()
return
}
if (d.streamType&StreamTypeBidi) == StreamTypeBidi && response.ProtoMajor < 2 {
// If we somehow dialed an HTTP/1.x server, fail with an explicit message
// rather than returning a more cryptic error later on.
d.responseErr = errorf(
CodeUnimplemented,
"response from %v is HTTP/%d.%d: bidi streams require at least HTTP/2",
d.request.URL,
response.ProtoMajor,
response.ProtoMinor,
)
if response.ProtoMajor < 2 {
// HTTP/1.x doesn't support bidirectional streaming. We need to close the
// write side of the stream before we can read from the response body.
_ = d.CloseWrite()
}
}
Expand Down
9 changes: 0 additions & 9 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,6 @@ func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Re
// EOF: the stream we construct later on already does that, and we only
// return early when dealing with misbehaving clients. In those cases, it's
// okay if we can't re-use the connection.
isBidi := (h.spec.StreamType & StreamTypeBidi) == StreamTypeBidi
if isBidi && request.ProtoMajor < 2 {
// Clients coded to expect full-duplex connections may hang if they've
// mistakenly negotiated HTTP/1.1. To unblock them, we must close the
// underlying TCP connection.
responseWriter.Header().Set("Connection", "close")
responseWriter.WriteHeader(http.StatusHTTPVersionNotSupported)
return
}

protocolHandlers := h.protocolHandlers[request.Method]
if len(protocolHandlers) == 0 {
Expand Down