diff --git a/src/main/java/software/amazon/awssdk/crt/http/Http1StreamManager.java b/src/main/java/software/amazon/awssdk/crt/http/Http1StreamManager.java new file mode 100644 index 000000000..e2d371ba4 --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/Http1StreamManager.java @@ -0,0 +1,165 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.amazon.awssdk.crt.http; + +import software.amazon.awssdk.crt.CrtRuntimeException; + +import java.util.concurrent.CompletableFuture; + +/** + * Manages a Pool of HTTP/1.1 Streams. Creates and manages HTTP/1.1 connections + * under the hood. Will grab a connection from HttpClientConnectionManager to + * make request on it, and will return it back until the request finishes. + */ +public class Http1StreamManager implements AutoCloseable { + + private HttpClientConnectionManager connectionManager = null; + + /** + * Factory function for Http1StreamManager instances + * + * @param options the connection manager options configure to connection manager under the hood + * @return a new instance of an Http1StreamManager + */ + public static Http1StreamManager create(HttpClientConnectionManagerOptions options) { + return new Http1StreamManager(options); + } + + private Http1StreamManager(HttpClientConnectionManagerOptions options) { + this.connectionManager = HttpClientConnectionManager.create(options); + } + + public CompletableFuture getShutdownCompleteFuture() { + return this.connectionManager.getShutdownCompleteFuture(); + } + + /** + * Request an HTTP/1.1 HttpStream from StreamManager. + * + * @param request HttpRequest. The Request to make to the Server. + * @param streamHandler HttpStreamResponseHandler. The Stream Handler to be called from the Native EventLoop + * @return A future for a HttpStream that will be completed when the stream is + * acquired. + * @throws CrtRuntimeException Exception happens from acquiring stream. + */ + public CompletableFuture acquireStream(HttpRequest request, + HttpStreamResponseHandler streamHandler) { + CompletableFuture completionFuture = new CompletableFuture<>(); + HttpClientConnectionManager connManager = this.connectionManager; + this.connectionManager.acquireConnection().whenComplete((conn, throwable) -> { + if (throwable != null) { + completionFuture.completeExceptionally(throwable); + } else { + try { + HttpStream stream = conn.makeRequest(request, new HttpStreamResponseHandler() { + @Override + public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + streamHandler.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders); + } + + @Override + public void onResponseHeadersDone(HttpStream stream, int blockType) { + streamHandler.onResponseHeadersDone(stream, blockType); + } + + @Override + public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { + return streamHandler.onResponseBody(stream, bodyBytesIn); + } + + @Override + public void onResponseComplete(HttpStream stream, int errorCode) { + streamHandler.onResponseComplete(stream, errorCode); + /* Release the connection back */ + connManager.releaseConnection(conn); + } + }); + completionFuture.complete(stream); + /* Active the stream for user */ + try { + stream.activate(); + } catch (CrtRuntimeException e) { + /* If activate failed, complete callback will not be invoked */ + streamHandler.onResponseComplete(stream, e.errorCode); + /* Release the connection back */ + connManager.releaseConnection(conn); + } + } catch (Exception ex) { + connManager.releaseConnection(conn); + completionFuture.completeExceptionally(ex); + } + } + }); + return completionFuture; + } + + + /** + * Request an HTTP/1.1 HttpStream from StreamManager. + * + * @param request HttpRequestBase. The Request to make to the Server. + * @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop + * @return A future for a HttpStreamBase that will be completed when the stream is + * acquired. + * @throws CrtRuntimeException Exception happens from acquiring stream. + */ + public CompletableFuture acquireStream(HttpRequestBase request, + HttpStreamBaseResponseHandler streamHandler) { + CompletableFuture completionFuture = new CompletableFuture<>(); + HttpClientConnectionManager connManager = this.connectionManager; + this.connectionManager.acquireConnection().whenComplete((conn, throwable) -> { + if (throwable != null) { + completionFuture.completeExceptionally(throwable); + } else { + try { + HttpStreamBase stream = conn.makeRequest(request, new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + streamHandler.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders); + } + + @Override + public void onResponseHeadersDone(HttpStreamBase stream, int blockType) { + streamHandler.onResponseHeadersDone(stream, blockType); + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { + return streamHandler.onResponseBody(stream, bodyBytesIn); + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + streamHandler.onResponseComplete(stream, errorCode); + /* Release the connection back */ + connManager.releaseConnection(conn); + } + }); + completionFuture.complete(stream); + /* Active the stream for user */ + try { + stream.activate(); + } catch (CrtRuntimeException e) { + /* If activate failed, complete callback will not be invoked */ + streamHandler.onResponseComplete(stream, e.errorCode); + /* Release the connection back */ + connManager.releaseConnection(conn); + } + } catch (Exception ex) { + connManager.releaseConnection(conn); + completionFuture.completeExceptionally(ex); + } + } + }); + return completionFuture; + } + + @Override + public void close() { + this.connectionManager.close(); + } +} diff --git a/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java b/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java index ae7f3f268..da0e30bc8 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java +++ b/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java @@ -155,9 +155,8 @@ public CompletableFuture acquireStream(HttpRequest request, return this.acquireStream((HttpRequestBase) request, streamHandler); } - private CompletableFuture acquireStream(HttpRequestBase request, - HttpStreamBaseResponseHandler streamHandler) { - + public CompletableFuture acquireStream(HttpRequestBase request, + HttpStreamBaseResponseHandler streamHandler) { CompletableFuture completionFuture = new CompletableFuture<>(); AsyncCallback acquireStreamCompleted = AsyncCallback.wrapFuture(completionFuture, null); if (isNull()) { diff --git a/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManagerOptions.java b/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManagerOptions.java index 3b34906a9..d65b0c982 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManagerOptions.java +++ b/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManagerOptions.java @@ -2,6 +2,7 @@ import java.util.List; import java.util.ArrayList; +import java.net.URI; /** * Contains all the configuration options for a Http2StreamManager @@ -12,6 +13,7 @@ public class Http2StreamManagerOptions { public static final int DEFAULT_MAX = Integer.MAX_VALUE; public static final int DEFAULT_MAX_CONNECTIONS = 2; public static final int DEFAULT_CONNECTION_PING_TIMEOUT_MS = 3000; + private static final String HTTPS = "https"; private HttpClientConnectionManagerOptions connectionManagerOptions; @@ -249,8 +251,9 @@ public void validateOptions() { throw new IllegalArgumentException("Connection manager options are required."); } connectionManagerOptions.validateOptions(); - if ((connectionManagerOptions.getTlsConnectionOptions() != null - || connectionManagerOptions.getTlsContext() != null) && priorKnowledge) { + URI uri = connectionManagerOptions.getUri(); + boolean useTls = HTTPS.equals(uri.getScheme()); + if (useTls && priorKnowledge) { throw new IllegalArgumentException("HTTP/2 prior knowledge cannot be set when TLS is used."); } if ((connectionManagerOptions.getTlsConnectionOptions() == null diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java new file mode 100644 index 000000000..2b6a7d3d2 --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java @@ -0,0 +1,144 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.amazon.awssdk.crt.http; + +import software.amazon.awssdk.crt.CrtRuntimeException; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Manages a pool for either HTTP/1.1 or HTTP/2 connection. + * + * Contains two stream manager for two protocols under the hood. + */ +public class HttpStreamManager implements AutoCloseable { + + private Http1StreamManager h1StreamManager = null; + private Http2StreamManager h2StreamManager = null; + private CompletableFuture shutdownComplete = null; + private AtomicLong shutdownNum = new AtomicLong(0); + private Throwable shutdownCompleteException = null; + + /** + * Factory function for HttpStreamManager instances + * + * @param options configuration options + * @return a new instance of an HttpStreamManager + */ + public static HttpStreamManager create(HttpStreamManagerOptions options) { + return new HttpStreamManager(options); + } + + private HttpStreamManager(HttpStreamManagerOptions options) { + this.shutdownComplete = new CompletableFuture(); + if (options.getExpectedProtocol() == HttpVersion.UNKNOWN) { + this.h1StreamManager = Http1StreamManager.create(options.getHTTP1ConnectionManagerOptions()); + this.h2StreamManager = Http2StreamManager.create(options.getHTTP2StreamManagerOptions()); + } else { + if (options.getExpectedProtocol() == HttpVersion.HTTP_2) { + this.h2StreamManager = Http2StreamManager.create(options.getHTTP2StreamManagerOptions()); + } else { + this.h1StreamManager = Http1StreamManager.create(options.getHTTP1ConnectionManagerOptions()); + } + /* Only one manager created. */ + this.shutdownNum.addAndGet(1); + } + if (this.h1StreamManager != null) { + this.h1StreamManager.getShutdownCompleteFuture().whenComplete((v, throwable) -> { + if (throwable != null) { + this.shutdownCompleteException = throwable; + } + long shutdownNum = this.shutdownNum.addAndGet(1); + if (shutdownNum == 2) { + /* both connectionManager and the h2StreamManager has been shutdown. */ + if (this.shutdownCompleteException != null) { + this.shutdownComplete.completeExceptionally(this.shutdownCompleteException); + } else { + this.shutdownComplete.complete(null); + } + } + }); + } + if (this.h2StreamManager != null) { + this.h2StreamManager.getShutdownCompleteFuture().whenComplete((v, throwable) -> { + if (throwable != null) { + this.shutdownCompleteException = throwable; + } + long shutdownNum = this.shutdownNum.addAndGet(1); + if (shutdownNum == 2) { + /* both connectionManager and the h2StreamManager has been shutdown. */ + if (this.shutdownCompleteException != null) { + this.shutdownComplete.completeExceptionally(this.shutdownCompleteException); + } else { + this.shutdownComplete.complete(null); + } + } + }); + } + } + + private void h1AcquireStream(HttpRequestBase request, + HttpStreamBaseResponseHandler streamHandler, CompletableFuture completionFuture) { + + this.h1StreamManager.acquireStream(request, streamHandler).whenComplete((stream, throwable) -> { + if (throwable != null) { + completionFuture.completeExceptionally(throwable); + } else { + completionFuture.complete(stream); + } + }); + } + + /** + * Request a HttpStream from StreamManager. If the streamManager is made with + * HTTP/2 connection under the hood, it will be Http2Stream. + * + * @param request HttpRequestBase. The Request to make to the Server. + * @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop + * @return A future for a Http2Stream that will be completed when the stream is + * acquired. + */ + public CompletableFuture acquireStream(HttpRequestBase request, + HttpStreamBaseResponseHandler streamHandler) { + CompletableFuture completionFuture = new CompletableFuture<>(); + if (this.h2StreamManager != null) { + this.h2StreamManager.acquireStream(request, streamHandler).whenComplete((stream, throwable) -> { + if (throwable != null) { + if (throwable instanceof CrtRuntimeException) { + CrtRuntimeException exception = (CrtRuntimeException) throwable; + if (exception.errorName.equals("AWS_ERROR_HTTP_STREAM_MANAGER_UNEXPECTED_HTTP_VERSION") && this.h1StreamManager != null) { + this.h1AcquireStream(request, streamHandler, completionFuture); + } else { + completionFuture.completeExceptionally(throwable); + } + } else { + completionFuture.completeExceptionally(throwable); + } + } else { + completionFuture.complete((Http2Stream) stream); + } + }); + return completionFuture; + } + this.h1AcquireStream(request, streamHandler, completionFuture); + return completionFuture; + } + + public CompletableFuture getShutdownCompleteFuture() { + return shutdownComplete; + } + + @Override + public void close() { + if (this.h1StreamManager != null) { + this.h1StreamManager.close(); + } + if (this.h2StreamManager != null) { + this.h2StreamManager.close(); + } + } +} diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManagerOptions.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManagerOptions.java new file mode 100644 index 000000000..8d8d7300b --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManagerOptions.java @@ -0,0 +1,89 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.amazon.awssdk.crt.http; + + +/** + * Contains all the configuration options for a Http2StreamManager + * instance + */ +public class HttpStreamManagerOptions { + + private HttpClientConnectionManagerOptions h1ConnectionManagerOptions; + private Http2StreamManagerOptions h2StreamManagerOptions; + /** + * The expected protocol for the stream manager. + * + * - UNKNOWN: Default to use HTTP/2, but if server returns an HTTP/1.1 connection back, fallback to the HTTP/1.1 pool + * - HTTP2: ONLY HTTP/2 + * - HTTP_1_1/HTTP_1_0: ONLY HTTP/1 and HTTP/1.1 + */ + private HttpVersion expectedProtocol; + + /** + * Default constructor + */ + public HttpStreamManagerOptions() { + this.expectedProtocol = HttpVersion.UNKNOWN; + } + + /** + * The connection manager options for the HTTP/1.1 stream manager. Controls the behavior for HTTP/1 connections. + * + * @param connectionManagerOptions The connection manager options for the underlying HTTP/1.1 stream manager + * @return this + */ + public HttpStreamManagerOptions withHTTP1ConnectionManagerOptions(HttpClientConnectionManagerOptions connectionManagerOptions) { + this.h1ConnectionManagerOptions = connectionManagerOptions; + return this; + } + + /** + * @return The connection manager options for the HTTP/1.1 stream manager. + */ + public HttpClientConnectionManagerOptions getHTTP1ConnectionManagerOptions() { + return h1ConnectionManagerOptions; + } + + /** + * The stream manager options for the HTTP/2 stream manager. Controls the behavior for HTTP/2 connections. + * + * @param streamManagerOptions The stream manager options for the underlying HTTP/2 stream manager + * @return this + */ + public HttpStreamManagerOptions withHTTP2StreamManagerOptions(Http2StreamManagerOptions streamManagerOptions) { + this.h2StreamManagerOptions = streamManagerOptions; + return this; + } + + /** + * @return The stream manager options for the HTTP/2 stream manager. + */ + public Http2StreamManagerOptions getHTTP2StreamManagerOptions() { + return h2StreamManagerOptions; + } + + /** + * The expected protocol for whole stream manager. Default to UNKNOWN. + * + * - UNKNOWN: Default to use HTTP/2, but if server returns an HTTP/1.1 connection back, fallback to the HTTP/1.1 pool + * - HTTP2: ONLY HTTP/2 + * - HTTP_1_1/HTTP_1_0: ONLY HTTP/1 and HTTP/1.1 + * + * @param expectedProtocol The stream manager options for the underlying HTTP/2 stream manager + * @return this + */ + public HttpStreamManagerOptions withExpectedProtocol(HttpVersion expectedProtocol) { + this.expectedProtocol = expectedProtocol; + return this; + } + + /** + * @return The expected protocol for whole stream manager + */ + public HttpVersion getExpectedProtocol() { + return expectedProtocol; + } +} diff --git a/src/test/java/software/amazon/awssdk/crt/test/Http2ClientLocalHostTest.java b/src/test/java/software/amazon/awssdk/crt/test/Http2ClientLocalHostTest.java index 3f7e08709..d14a9a980 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Http2ClientLocalHostTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Http2ClientLocalHostTest.java @@ -180,15 +180,6 @@ public void testParallelRequestsStressWithBody() throws Exception { URI uri = new URI("https://localhost:8443/uploadTest"); try (Http2StreamManager streamManager = createStreamManager(uri, 100)) { int numberToAcquire = 500 * 100; - if (CRT.getOSIdentifier() == "linux") { - /* - * Using Python hyper h2 server frame work, met a weird upload performance issue - * on Linux. Our client against nginx platform has not met the same issue. - * We assume it's because the server framework implementation. - * Use lower number of linux - */ - numberToAcquire = 500; - } int bodyLength = 2000; List> requestCompleteFutures = new ArrayList<>(); @@ -252,15 +243,6 @@ public void testRequestsUploadStress() throws Exception { URI uri = new URI("https://localhost:8443/uploadTest"); try (Http2StreamManager streamManager = createStreamManager(uri, 100)) { long bodyLength = 2500000000L; - if (CRT.getOSIdentifier() == "linux") { - /* - * Using Python hyper h2 server frame work, met a weird upload performance issue - * on Linux. Our client against nginx platform has not met the same issue. - * We assume it's because the server framework implementation. - * Use lower number of linux - */ - bodyLength = 250000000L; - } Http2Request request = createHttp2Request("PUT", uri, bodyLength); diff --git a/src/test/java/software/amazon/awssdk/crt/test/Http2RequestResponseTest.java b/src/test/java/software/amazon/awssdk/crt/test/Http2RequestResponseTest.java index 5e3f81a8f..c729e3c4a 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Http2RequestResponseTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Http2RequestResponseTest.java @@ -183,6 +183,11 @@ public void testHttp2ResetStream() throws Exception { @Override public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) { + } + + @Override + public void onResponseHeadersDone(HttpStreamBase stream, int blockType) { + /* Only invoke once */ Http2Stream h2Stream = (Http2Stream) stream; h2Stream.resetStream(Http2ErrorCode.INTERNAL_ERROR); } diff --git a/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseFixture.java b/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseFixture.java index 91c492d5f..7de144960 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseFixture.java +++ b/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseFixture.java @@ -43,9 +43,13 @@ protected class TestHttpResponse { int statusCode = -1; int blockType = -1; List headers = new ArrayList<>(); - ByteBuffer bodyBuffer = ByteBuffer.wrap(new byte[16 * 1024 * 1024]); // Allow up to 16 MB Responses + ByteBuffer bodyBuffer; int onCompleteErrorCode = -1; + public TestHttpResponse(){ + bodyBuffer = ByteBuffer.allocate(16 * 1024 * 1024); // Allow up to 16 MB Responses + } + public String getBody() { bodyBuffer.flip(); return UTF8.decode(bodyBuffer).toString(); diff --git a/src/test/java/software/amazon/awssdk/crt/test/HttpStreamManagerTest.java b/src/test/java/software/amazon/awssdk/crt/test/HttpStreamManagerTest.java new file mode 100644 index 000000000..13b141b13 --- /dev/null +++ b/src/test/java/software/amazon/awssdk/crt/test/HttpStreamManagerTest.java @@ -0,0 +1,301 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.amazon.awssdk.crt.test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; +import java.util.Arrays; + +import javax.management.RuntimeErrorException; + +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.CrtRuntimeException; +import software.amazon.awssdk.crt.http.HttpStreamManager; +import software.amazon.awssdk.crt.http.HttpStreamManagerOptions; +import software.amazon.awssdk.crt.http.Http2StreamManager; +import software.amazon.awssdk.crt.http.Http2Request; +import software.amazon.awssdk.crt.http.Http2Stream; +import software.amazon.awssdk.crt.http.Http2StreamManagerOptions; +import software.amazon.awssdk.crt.http.HttpClientConnection; +import software.amazon.awssdk.crt.http.HttpClientConnectionManager; +import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions; +import software.amazon.awssdk.crt.http.HttpHeader; +import software.amazon.awssdk.crt.http.HttpProxyOptions; +import software.amazon.awssdk.crt.http.HttpRequest; +import software.amazon.awssdk.crt.http.HttpRequestBase; +import software.amazon.awssdk.crt.http.HttpRequestBodyStream; +import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; +import software.amazon.awssdk.crt.http.HttpVersion; +import software.amazon.awssdk.crt.http.HttpStream; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; +import software.amazon.awssdk.crt.io.ClientBootstrap; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; +import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.io.TlsContext; +import software.amazon.awssdk.crt.io.TlsContextOptions; +import software.amazon.awssdk.crt.utils.ByteBufferUtils; +import software.amazon.awssdk.crt.Log; +import software.amazon.awssdk.crt.Log.LogLevel; + +public class HttpStreamManagerTest extends HttpRequestResponseFixture { + private final static String endpoint = "https://httpbin.org"; + private final static String path = "/anything"; + private final String EMPTY_BODY = ""; + private final static int NUM_CONNECTIONS = 20; + private final static Charset UTF8 = StandardCharsets.UTF_8; + private final static int EXPECTED_HTTP_STATUS = 200; + + private HttpStreamManager createStreamManager(URI uri, int numConnections, HttpVersion expectedVersion) { + + try (EventLoopGroup eventLoopGroup = new EventLoopGroup(1); + HostResolver resolver = new HostResolver(eventLoopGroup); + ClientBootstrap bootstrap = new ClientBootstrap(eventLoopGroup, resolver); + SocketOptions sockOpts = new SocketOptions(); + TlsContextOptions tlsOpts = expectedVersion == HttpVersion.HTTP_2 + ? TlsContextOptions.createDefaultClient().withAlpnList("h2") + : TlsContextOptions.createDefaultClient().withAlpnList("http/1.1"); + TlsContext tlsContext = createHttpClientTlsContext(tlsOpts)) { + Http2StreamManagerOptions h2Options = new Http2StreamManagerOptions(); + HttpClientConnectionManagerOptions h1Options = new HttpClientConnectionManagerOptions(); + h1Options.withClientBootstrap(bootstrap) + .withSocketOptions(sockOpts) + .withTlsContext(tlsContext) + .withUri(uri) + .withMaxConnections(numConnections); + h2Options.withConnectionManagerOptions(h1Options); + HttpStreamManagerOptions options = new HttpStreamManagerOptions(); + options.withHTTP1ConnectionManagerOptions(h1Options) + .withHTTP2StreamManagerOptions(h2Options) + .withExpectedProtocol(expectedVersion); + + return HttpStreamManager.create(options); + } + } + + private Http2Request createHttp2Request(String method, String endpoint, String path, String requestBody) + throws Exception { + URI uri = new URI(endpoint); + HttpHeader[] requestHeaders = new HttpHeader[] { + new HttpHeader(":method", method), + new HttpHeader(":path", path), + new HttpHeader(":scheme", uri.getScheme()), + new HttpHeader(":authority", uri.getHost()), + new HttpHeader("content-length", Integer.toString(requestBody.getBytes(UTF8).length)) + }; + final ByteBuffer payload = ByteBuffer.wrap(requestBody.getBytes()); + HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() { + @Override + public boolean sendRequestBody(ByteBuffer outBuffer) { + ByteBufferUtils.transferData(payload, outBuffer); + return payload.remaining() == 0; + } + + @Override + public boolean resetPosition() { + return true; + } + + @Override + public long getLength() { + return payload.capacity(); + } + }; + Http2Request request = new Http2Request(requestHeaders, payloadStream); + + return request; + } + + private HttpRequest createHttp1Request(String method, String endpoint, String path, String requestBody) + throws Exception { + URI uri = new URI(endpoint); + HttpHeader[] requestHeaders = new HttpHeader[] { + new HttpHeader("host", uri.getHost()), + new HttpHeader("content-length", Integer.toString(requestBody.getBytes(UTF8).length)) + }; + final ByteBuffer payload = ByteBuffer.wrap(requestBody.getBytes()); + HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() { + @Override + public boolean sendRequestBody(ByteBuffer outBuffer) { + ByteBufferUtils.transferData(payload, outBuffer); + return payload.remaining() == 0; + } + + @Override + public boolean resetPosition() { + return true; + } + + @Override + public long getLength() { + return payload.capacity(); + } + }; + return new HttpRequest(method, path, requestHeaders, payloadStream); + } + + private TestHttpResponse getResponseFromManager(HttpStreamManager streamManager, HttpRequestBase request) + throws Exception { + + final CompletableFuture reqCompleted = new CompletableFuture<>(); + + final TestHttpResponse response = new TestHttpResponse(); + + try { + HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + response.statusCode = responseStatusCode; + Assert.assertEquals(responseStatusCode, stream.getResponseStatusCode()); + response.headers.addAll(Arrays.asList(nextHeaders)); + } + + @Override + public void onResponseHeadersDone(HttpStreamBase stream, int blockType) { + response.blockType = blockType; + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { + int amountRead = bodyBytesIn.length; + + // Slide the window open by the number of bytes just read + return amountRead; + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + response.onCompleteErrorCode = errorCode; + reqCompleted.complete(null); + stream.close(); + } + }; + streamManager.acquireStream(request, streamHandler).get(60, TimeUnit.SECONDS); + // Give the request up to 60 seconds to complete, otherwise throw a + // TimeoutException + reqCompleted.get(60, TimeUnit.SECONDS); + } catch (Exception e) { + throw e; + } + + return response; + + } + + @Test + public void testSanitizerHTTP1() throws Exception { + URI uri = new URI(endpoint); + CompletableFuture shutdownComplete = null; + try (HttpStreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS, HttpVersion.HTTP_1_1)) { + shutdownComplete = streamManager.getShutdownCompleteFuture(); + } + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + public void testSanitizerHTTP2() throws Exception { + URI uri = new URI(endpoint); + CompletableFuture shutdownComplete = null; + try (HttpStreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS, HttpVersion.HTTP_2)) { + shutdownComplete = streamManager.getShutdownCompleteFuture(); + } + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + public void testSingleHTTP2Requests() throws Exception { + URI uri = new URI(endpoint); + CompletableFuture shutdownComplete = null; + try (HttpStreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS, HttpVersion.HTTP_2)) { + shutdownComplete = streamManager.getShutdownCompleteFuture(); + Http2Request request = createHttp2Request("GET", endpoint, path, EMPTY_BODY); + TestHttpResponse response = this.getResponseFromManager(streamManager, request); + Assert.assertEquals(response.statusCode, EXPECTED_HTTP_STATUS); + } + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + public void testSingleHTTP1Request() throws Throwable { + URI uri = new URI(endpoint); + CompletableFuture shutdownComplete = null; + try (HttpStreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS, HttpVersion.HTTP_1_1)) { + shutdownComplete = streamManager.getShutdownCompleteFuture(); + HttpRequest request = createHttp1Request("GET", endpoint, path, EMPTY_BODY); + TestHttpResponse response = this.getResponseFromManager(streamManager, request); + Assert.assertEquals(response.statusCode, EXPECTED_HTTP_STATUS); + } + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + /* + * Create HTTP/1.1 stream manager, with HTTP/2 request, which should fail with + * invalid header name. Make sure the exception pops out and everything clean up + * correctly. + */ + @Test + public void testSingleHTTP1RequestsFailure() throws Throwable { + URI uri = new URI(endpoint); + CompletableFuture shutdownComplete = null; + try (HttpStreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS, HttpVersion.HTTP_1_1)) { + /* + * http2 request which will have :method headers that is not allowed for + * HTTP/1.1 + */ + Http2Request request = createHttp2Request("GET", endpoint, path, EMPTY_BODY); + shutdownComplete = streamManager.getShutdownCompleteFuture(); + TestHttpResponse response = this.getResponseFromManager(streamManager, request); + Assert.assertEquals(response.statusCode, EXPECTED_HTTP_STATUS); + } catch (ExecutionException e) { + try { + throw e.getCause(); + } catch (CrtRuntimeException causeException) { + /** + * Assert the exceptions are set correctly. + */ + Assert.assertTrue(causeException.errorName.equals("AWS_ERROR_HTTP_INVALID_HEADER_NAME")); + } + } + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + /** + * TODO: test unknown protocol version behave right. + */ +}