-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Release memory where possible in RestElasticSearchClient #4685
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,8 +17,6 @@ | |
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.collect.ImmutableMap; | ||
import com.google.common.collect.Iterators; | ||
import com.google.common.collect.PeekingIterator; | ||
import org.apache.http.HttpEntity; | ||
import org.apache.http.HttpStatus; | ||
import org.apache.http.entity.ByteArrayEntity; | ||
|
@@ -50,17 +48,16 @@ | |
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.OutputStream; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.ListIterator; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
|
@@ -426,28 +423,41 @@ class RequestBytes { | |
@VisibleForTesting | ||
int getSerializedSize() { | ||
int serializedSize = this.requestBytes.length; | ||
serializedSize+= 1; //For follow-up NEW_LINE_BYTES | ||
serializedSize += 1; //For follow-up NEW_LINE_BYTES | ||
if (this.requestSource != null) { | ||
serializedSize += this.requestSource.length; | ||
serializedSize+= 1; //For follow-up NEW_LINE_BYTES | ||
serializedSize += 1; //For follow-up NEW_LINE_BYTES | ||
} | ||
return serializedSize; | ||
} | ||
|
||
private void writeTo(OutputStream outputStream) throws IOException { | ||
outputStream.write(this.requestBytes); | ||
outputStream.write(NEW_LINE_BYTES); | ||
private int writeTo(byte[] target, int initialOffset) { | ||
int offset = initialOffset; | ||
System.arraycopy(this.requestBytes, 0, target, offset, this.requestBytes.length); | ||
offset += this.requestBytes.length; | ||
System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length); | ||
offset += NEW_LINE_BYTES.length; | ||
if (this.requestSource != null) { | ||
outputStream.write(requestSource); | ||
outputStream.write(NEW_LINE_BYTES); | ||
System.arraycopy(this.requestSource, 0, target, offset, this.requestSource.length); | ||
offset += this.requestSource.length; | ||
System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length); | ||
offset += NEW_LINE_BYTES.length; | ||
} | ||
return offset; | ||
} | ||
} | ||
|
||
private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException { | ||
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); | ||
private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) { | ||
int totalBytes = requests.stream().mapToInt(RequestBytes::getSerializedSize).sum(); | ||
//Copying into a single pre-sized array avoids any dynamically expanded growth of the array that may overshoot | ||
//how much memory we actually need; it also avoids the final copy at the end normally done by | ||
//ByteArrayOutputStream's toByteArray() | ||
byte[] bytes = new byte[totalBytes]; | ||
int offset = 0; | ||
for (final RequestBytes request : requests) { | ||
request.writeTo(outputStream); | ||
//We can't remove the element from the collection like we do elsewhere, because we need to retain the | ||
//serialized form in case of an error so the error can be paired to the originating request based on index | ||
offset = request.writeTo(bytes, offset); | ||
Comment on lines
+434
to
+460
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressing point 3 in #4684 |
||
} | ||
|
||
final StringBuilder bulkRequestQueryParameters = new StringBuilder(); | ||
|
@@ -458,7 +468,7 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, | |
APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh); | ||
} | ||
final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters; | ||
return Pair.with(bulkRequestPath, outputStream.toByteArray()); | ||
return Pair.with(bulkRequestPath, bytes); | ||
} | ||
|
||
private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation( | ||
|
@@ -490,14 +500,20 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> { | |
//There is no "correct" number of actions to perform in a single bulk request. Experiment with different | ||
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum | ||
// size of a HTTP request to 100mb by default | ||
private final PeekingIterator<RequestBytes> requestIterator; | ||
private final ListIterator<RequestBytes> requestIterator; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Had to expose being able to set the element to null, which isn't exposed in PeekingIterator. |
||
private final int[] exceptionallyLargeRequests; | ||
private RequestBytes peeked; | ||
|
||
@VisibleForTesting | ||
BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException { | ||
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size()); | ||
List<Integer> requestSizesThatWereTooLarge = new ArrayList<>(); | ||
for (ElasticSearchMutation request : requests) { | ||
ListIterator<ElasticSearchMutation> requestsIter = requests.listIterator(); | ||
while (requestsIter.hasNext()) { | ||
ElasticSearchMutation request = requestsIter.next(); | ||
//Remove the element from the collection so the collection's reference to it doesn't hold it from being | ||
//GC'ed after it has been converted to its serialized form | ||
requestsIter.set(null); | ||
Comment on lines
+511
to
+516
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Addressing point 1 in #4684 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is mutating the passed-in List. Call Hierarchy analysis by IntelliJ finds: in each case the passed-in List is created at that callsite and isn't mutated, or even read, after being passed to here, at least at the time of this writing. |
||
RequestBytes requestBytes = new RequestBytes(request); | ||
int requestSerializedSize = requestBytes.getSerializedSize(); | ||
if (requestSerializedSize <= bulkChunkSerializedLimitBytes) { | ||
|
@@ -507,7 +523,7 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> { | |
requestSizesThatWereTooLarge.add(requestSerializedSize); | ||
} | ||
} | ||
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator()); | ||
this.requestIterator = serializedRequests.listIterator(); | ||
//Condense request sizes that are too large into an int array to remove Boxed & List memory overhead | ||
this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.isEmpty() ? null : | ||
requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray(); | ||
|
@@ -517,20 +533,31 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> { | |
public boolean hasNext() { | ||
//Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted | ||
//This allows next() to throw after all well sized requests have been chunked for submission | ||
return requestIterator.hasNext() || exceptionallyLargeRequests != null; | ||
return peeked != null || requestIterator.hasNext() || exceptionallyLargeRequests != null; | ||
} | ||
|
||
@Override | ||
public List<RequestBytes> next() { | ||
List<RequestBytes> serializedRequests = new ArrayList<>(); | ||
int chunkSerializedTotal = 0; | ||
while (requestIterator.hasNext()) { | ||
RequestBytes peeked = requestIterator.peek(); | ||
//If we peeked at something but stopped on it, then add it to this list | ||
if (peeked != null) { | ||
chunkSerializedTotal += peeked.getSerializedSize(); | ||
serializedRequests.add(peeked); | ||
peeked = null; | ||
} | ||
|
||
while (requestIterator.hasNext()) { | ||
RequestBytes next = requestIterator.next(); | ||
//Remove the element from the collection, so the iterator doesn't prevent it from being GC'ed | ||
//due to the reference to it in the collection | ||
requestIterator.set(null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressing point 2 in #4684 |
||
chunkSerializedTotal += next.getSerializedSize(); | ||
if (chunkSerializedTotal <= bulkChunkSerializedLimitBytes) { | ||
serializedRequests.add(requestIterator.next()); | ||
serializedRequests.add(next); | ||
} else { | ||
//Adding this element would exceed the limit, so return the chunk | ||
this.peeked = next; | ||
return serializedRequests; | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed some formatting that I missed previously