Skip to content

Commit

Permalink
Release memory where possible in RestElasticSearchClient
Browse files Browse the repository at this point in the history
Closes #4684

Signed-off-by: Allan Clements <[email protected]>
  • Loading branch information
criminosis committed Sep 27, 2024
1 parent 4a576f6 commit 66ccdc8
Showing 1 changed file with 49 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ByteArrayEntity;
Expand Down Expand Up @@ -50,17 +48,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -426,28 +423,41 @@ class RequestBytes {
@VisibleForTesting
int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
serializedSize += 1; //For follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
serializedSize += this.requestSource.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
serializedSize += 1; //For follow-up NEW_LINE_BYTES
}
return serializedSize;
}

private void writeTo(OutputStream outputStream) throws IOException {
outputStream.write(this.requestBytes);
outputStream.write(NEW_LINE_BYTES);
private int writeTo(byte[] target, int initialOffset) {
int offset = initialOffset;
System.arraycopy(this.requestBytes, 0, target, offset, this.requestBytes.length);
offset += this.requestBytes.length;
System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length);
offset += NEW_LINE_BYTES.length;
if (this.requestSource != null) {
outputStream.write(requestSource);
outputStream.write(NEW_LINE_BYTES);
System.arraycopy(this.requestSource, 0, target, offset, this.requestSource.length);
offset += this.requestSource.length;
System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length);
offset += NEW_LINE_BYTES.length;
}
return offset;
}
}

private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) {
int totalBytes = requests.stream().mapToInt(RequestBytes::getSerializedSize).sum();
//By making a singular array we copy into avoids any dynamically expanded growth of the array that may overshoot
//how much memory we actually need, additionally it also avoids a final copy at the end normally done by
//ByteArrayOutputStream's toByteArray()
byte[] bytes = new byte[totalBytes];
int offset = 0;
for (final RequestBytes request : requests) {
request.writeTo(outputStream);
//We can't remove the element from the collection like we do elsewhere, because we need to retain the
//serialized form in case of an error so the error can be paired to the originating request based on index
offset = request.writeTo(bytes, offset);
}

final StringBuilder bulkRequestQueryParameters = new StringBuilder();
Expand All @@ -458,7 +468,7 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests,
APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
}
final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
return Pair.with(bulkRequestPath, outputStream.toByteArray());
return Pair.with(bulkRequestPath, bytes);
}

private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
Expand Down Expand Up @@ -490,14 +500,20 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//There is no "correct" number of actions to perform in a single bulk request. Experiment with different
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
private final PeekingIterator<RequestBytes> requestIterator;
private final ListIterator<RequestBytes> requestIterator;
private final int[] exceptionallyLargeRequests;
private RequestBytes peeked;

@VisibleForTesting
BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
List<Integer> requestSizesThatWereTooLarge = new ArrayList<>();
for (ElasticSearchMutation request : requests) {
ListIterator<ElasticSearchMutation> requestsIter = requests.listIterator();
while (requestsIter.hasNext()) {
ElasticSearchMutation request = requestsIter.next();
//Remove the element from the collection so the collection's reference to it doesn't hold it from being
//GC'ed after it has been converted to its serialized form
requestsIter.set(null);
RequestBytes requestBytes = new RequestBytes(request);
int requestSerializedSize = requestBytes.getSerializedSize();
if (requestSerializedSize <= bulkChunkSerializedLimitBytes) {
Expand All @@ -507,7 +523,7 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
requestSizesThatWereTooLarge.add(requestSerializedSize);
}
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
this.requestIterator = serializedRequests.listIterator();
//Condense request sizes that are too large into an int array to remove Boxed & List memory overhead
this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.isEmpty() ? null :
requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray();
Expand All @@ -517,20 +533,31 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
public boolean hasNext() {
//Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted
//This allows next() to throw after all well sized requests have been chunked for submission
return requestIterator.hasNext() || exceptionallyLargeRequests != null;
return peeked != null || requestIterator.hasNext() || exceptionallyLargeRequests != null;
}

@Override
public List<RequestBytes> next() {
List<RequestBytes> serializedRequests = new ArrayList<>();
int chunkSerializedTotal = 0;
while (requestIterator.hasNext()) {
RequestBytes peeked = requestIterator.peek();
//If we peeked at something but stopped on it, then add it to this list
if (peeked != null) {
chunkSerializedTotal += peeked.getSerializedSize();
serializedRequests.add(peeked);
peeked = null;
}

while (requestIterator.hasNext()) {
RequestBytes next = requestIterator.next();
//Remove the element from the collection, so the iterator doesn't prevent it from being GC'ed
//due to the reference to it in the collection
requestIterator.set(null);
chunkSerializedTotal += next.getSerializedSize();
if (chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
serializedRequests.add(requestIterator.next());
serializedRequests.add(next);
} else {
//Adding this element would exceed the limit, so return the chunk
this.peeked = next;
return serializedRequests;
}
}
Expand Down

0 comments on commit 66ccdc8

Please sign in to comment.