Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release memory where possible in RestElasticSearchClient #4685

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ByteArrayEntity;
Expand Down Expand Up @@ -50,17 +48,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -426,28 +423,41 @@ class RequestBytes {
@VisibleForTesting
int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
serializedSize += 1; //For follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
serializedSize += this.requestSource.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
serializedSize += 1; //For follow-up NEW_LINE_BYTES
Comment on lines +426 to +429
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed some formatting that I missed previously

}
return serializedSize;
}

private void writeTo(OutputStream outputStream) throws IOException {
outputStream.write(this.requestBytes);
outputStream.write(NEW_LINE_BYTES);
private int writeTo(byte[] target, int initialOffset) {
int offset = initialOffset;
System.arraycopy(this.requestBytes, 0, target, offset, this.requestBytes.length);
offset += this.requestBytes.length;
System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length);
offset += NEW_LINE_BYTES.length;
if (this.requestSource != null) {
outputStream.write(requestSource);
outputStream.write(NEW_LINE_BYTES);
System.arraycopy(this.requestSource, 0, target, offset, this.requestSource.length);
offset += this.requestSource.length;
System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length);
offset += NEW_LINE_BYTES.length;
}
return offset;
}
}

private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) {
int totalBytes = requests.stream().mapToInt(RequestBytes::getSerializedSize).sum();
//By writing into a single pre-sized array we avoid any dynamically expanded growth of the array that may
//overshoot how much memory we actually need; additionally it avoids the final copy normally done by
//ByteArrayOutputStream's toByteArray()
byte[] bytes = new byte[totalBytes];
int offset = 0;
for (final RequestBytes request : requests) {
request.writeTo(outputStream);
//We can't remove the element from the collection like we do elsewhere, because we need to retain the
//serialized form in case of an error so the error can be paired to the originating request based on index
offset = request.writeTo(bytes, offset);
Comment on lines +434 to +460
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressing point 3 in #4684

}

final StringBuilder bulkRequestQueryParameters = new StringBuilder();
Expand All @@ -458,7 +468,7 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests,
APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
}
final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
return Pair.with(bulkRequestPath, outputStream.toByteArray());
return Pair.with(bulkRequestPath, bytes);
}

private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
Expand Down Expand Up @@ -490,14 +500,20 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//There is no "correct" number of actions to perform in a single bulk request. Experiment with different
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
private final PeekingIterator<RequestBytes> requestIterator;
private final ListIterator<RequestBytes> requestIterator;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed the ability to set the current element to null, which PeekingIterator doesn't expose, so the peek() behavior is reimplemented here and the iterator was switched to a ListIterator

private final int[] exceptionallyLargeRequests;
private RequestBytes peeked;

@VisibleForTesting
BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
List<Integer> requestSizesThatWereTooLarge = new ArrayList<>();
for (ElasticSearchMutation request : requests) {
ListIterator<ElasticSearchMutation> requestsIter = requests.listIterator();
while (requestsIter.hasNext()) {
ElasticSearchMutation request = requestsIter.next();
//Remove the element from the collection so the collection's reference to it doesn't hold it from being
//GC'ed after it has been converted to its serialized form
requestsIter.set(null);
Comment on lines +511 to +516
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressing point 1 in #4684

Copy link
Contributor Author

@criminosis criminosis Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mutates the List that is passed in, and that mutation will be visible to the caller, since we can't take ownership of the List in Java in this context. However, from reviewing the call sites, they create the list within their function scopes and don't do anything with it after passing it in, so this should be "safe" — but mutating a passed-in list is sometimes frowned upon in Java, so I wanted to call that out.

Call Hierarchy analysis by IntelliJ finds:

In each case the passed in List is created in that callsite and isn't mutated, or even read, after being passed to here, at least at the time of this writing.

RequestBytes requestBytes = new RequestBytes(request);
int requestSerializedSize = requestBytes.getSerializedSize();
if (requestSerializedSize <= bulkChunkSerializedLimitBytes) {
Expand All @@ -507,7 +523,7 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
requestSizesThatWereTooLarge.add(requestSerializedSize);
}
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
this.requestIterator = serializedRequests.listIterator();
//Condense request sizes that are too large into an int array to remove Boxed & List memory overhead
this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.isEmpty() ? null :
requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray();
Expand All @@ -517,20 +533,31 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
public boolean hasNext() {
//Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted
//This allows next() to throw after all well sized requests have been chunked for submission
return requestIterator.hasNext() || exceptionallyLargeRequests != null;
return peeked != null || requestIterator.hasNext() || exceptionallyLargeRequests != null;
}

@Override
public List<RequestBytes> next() {
List<RequestBytes> serializedRequests = new ArrayList<>();
int chunkSerializedTotal = 0;
while (requestIterator.hasNext()) {
RequestBytes peeked = requestIterator.peek();
//If we peeked at something but stopped on it, then add it to this list
if (peeked != null) {
chunkSerializedTotal += peeked.getSerializedSize();
serializedRequests.add(peeked);
peeked = null;
}

while (requestIterator.hasNext()) {
RequestBytes next = requestIterator.next();
//Remove the element from the collection, so the iterator doesn't prevent it from being GC'ed
//due to the reference to it in the collection
requestIterator.set(null);
Copy link
Contributor Author

@criminosis criminosis Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressing point 2 in #4684

chunkSerializedTotal += next.getSerializedSize();
if (chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
serializedRequests.add(requestIterator.next());
serializedRequests.add(next);
} else {
//Adding this element would exceed the limit, so return the chunk
this.peeked = next;
return serializedRequests;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -152,10 +154,11 @@ public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException {
//This payload is too large to send given the set limit, since it is a single item we can't split it
IntStream.range(0, bulkLimit * 10).forEach(value -> payloadBuilder.append("a"));
Assertions.assertThrows(IllegalArgumentException.class, () -> restClientUnderTest.bulkRequest(
Collections.singletonList(
ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id",
Collections.singletonMap("someKey", payloadBuilder.toString()))
), null), "Should have thrown due to bulk request item being too large");
Stream.of(
ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id",
Collections.singletonMap("someKey", payloadBuilder.toString())))
.collect(Collectors.toList()),
null), "Should have thrown due to bulk request item being too large");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ public void testRetryOnConfiguredErrorStatus() throws IOException {
when(restClientMock.performRequest(any()))
.thenThrow(responseException)
.thenThrow(expectedFinalException);
restClientUnderTest.bulkRequest(Collections.singletonList(
ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
restClientUnderTest.bulkRequest(
Stream.of(
ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id"))
.collect(Collectors.toList()),
null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception actualException) {
Expand Down Expand Up @@ -173,8 +175,10 @@ public void testRetriesExhaustedReturnsLastRetryException() throws IOException {
.thenThrow(responseException);


restClientUnderTest.bulkRequest(Collections.singletonList(
ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
restClientUnderTest.bulkRequest(
Stream.of(
ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id"))
.collect(Collectors.toList()),
null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception e) {
Expand All @@ -194,8 +198,9 @@ public void testNonRetryErrorCodeException() throws IOException {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
.thenThrow(responseException);
restClientUnderTest.bulkRequest(Collections.singletonList(
ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")),
restClientUnderTest.bulkRequest(
Stream.of(ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")).
collect(Collectors.toList()),
null);
Assertions.fail("Should have thrown the expected exception");
} catch (Exception e) {
Expand Down
Loading