A DESCRIPTION OF THE PROBLEM : When using ByteArrayPublisher for sending out HttpRequest data, asynchronously / from multiple threads, reusing the same instance of HttpRequest built with ByteArrayPublisher, some requests would be broken because publisher is not thread safe and may wrongly reuse same instance of delegate for multiple requests. Root cause is in the following lines: @Override public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { List<ByteBuffer> copy = copy(content, offset, length); >> this.delegate << = new PullPublisher<>(copy); delegate.subscribe(subscriber); } delegate should not have been assigned to a member since a context-switch between delegate assignment and call to subscribe, might cause later delegate to be wrongly reused. Instead it should have simply been a local variable. STEPS TO FOLLOW TO REPRODUCE THE PROBLEM : Run test code, several times. We've observed failure every 4-5 runs. EXPECTED VERSUS ACTUAL BEHAVIOR : EXPECTED - Program completes without an issue ACTUAL - At times, the following is being reported: java.util.concurrent.CompletionException: java.io.IOException: SSLTube(SocketTube(76)) [HttpClient-1-Worker-79] Too few bytes returned by the publisher (0/1) ---------- BEGIN SOURCE ---------- import java.net.URI; import java.net.URISyntaxException; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; public class SimpleRunner { private static final HttpRequest.BodyPublisher BODY_PUBLISHER = HttpRequest.BodyPublishers.ofByteArray("a".getBytes()); public static void main(String[] args) throws Exception { HttpClient client = createClient(); Collection<CompletableFuture> futures = new ArrayList<>(); for (int i=0;i<100;i++) { futures.add(client.sendAsync(createRequest(), HttpResponse.BodyHandlers.discarding()).exceptionally(t -> {t.printStackTrace(); return null;})); } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } private static HttpRequest createRequest() throws URISyntaxException { HttpRequest.Builder builder = HttpRequest.newBuilder(new URI("https://www.oracle.com")) .method("POST", BODY_PUBLISHER) .version(HttpClient.Version.HTTP_1_1); builder.header("content-type", "text/plain"); return builder.build(); } private static HttpClient createClient() { return HttpClient.newBuilder() .version(HttpClient.Version.HTTP_1_1) .build(); } } ---------- END SOURCE ---------- CUSTOMER SUBMITTED WORKAROUND : Use HttpRequest.BodyPublishers.ofByteArrays instead replacing a single line of code: private static final HttpRequest.BodyPublisher BODY_PUBLISHER = HttpRequest.BodyPublishers.ofByteArrays(Collections.singletonList("a".getBytes())); FREQUENCY : occasionally
|