Release memory where possible in RestElasticSearchClient #4685

Closed
wants to merge 1 commit into from

@criminosis criminosis commented Sep 26, 2024

Closes #4684


Thank you for contributing to JanusGraph!

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there an issue associated with this PR? Is it referenced in the commit message?
  • Does your PR body contain #xyz where xyz is the issue number you are trying to resolve?
  • Has your PR been rebased against the latest commit within the target branch (typically master)?
  • Is your initial contribution a single, squashed commit?

For code changes:

  • Have you written and/or updated unit tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE.txt file, including the main LICENSE.txt file in the root of this repository?
  • If applicable, have you updated the NOTICE.txt file, including the main NOTICE.txt file found in the root of this repository?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Comment on lines +426 to +429
  serializedSize += 1; //For follow-up NEW_LINE_BYTES
  if (this.requestSource != null) {
      serializedSize += this.requestSource.length;
-     serializedSize+= 1; //For follow-up NEW_LINE_BYTES
+     serializedSize += 1; //For follow-up NEW_LINE_BYTES

Fixed some formatting that I missed previously

Comment on lines +511 to +516
ListIterator<ElasticSearchMutation> requestsIter = requests.listIterator();
while (requestsIter.hasNext()) {
    ElasticSearchMutation request = requestsIter.next();
    //Remove the element from the collection so the collection's reference to it doesn't hold it from being
    //GC'ed after it has been converted to its serialized form
    requestsIter.set(null);

Addressing point 1 in #4684

@criminosis criminosis Sep 26, 2024


This mutates the List that is passed in, and the caller will see that mutation, since we can't take ownership of the List in Java in this context. From reviewing the call sites, each one creates the list within its own function scope and does nothing with it after passing it in here, so this should be "safe". Still, mutating a passed-in list is sometimes frowned upon in Java, so I wanted to call that out.

Call Hierarchy analysis by IntelliJ finds:

In each case the passed in List is created in that callsite and isn't mutated, or even read, after being passed to here, at least at the time of this writing.
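
To illustrate the pattern discussed above, here is a minimal sketch of releasing list slots while iterating. It is not the PR's code: the class name `ReleaseWhileIterating`, the method `serializeAndRelease`, and the use of `String` inputs are assumptions made for the example. It shows that `ListIterator.set(null)` clears the slot last returned by `next()` without changing the list's size, and that the caller's list is visibly mutated.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Hypothetical illustration; names are not taken from JanusGraph.
final class ReleaseWhileIterating {

    // Converts each input to bytes and nulls out the slot it came from, so the
    // input list no longer keeps the original object reachable.
    static List<byte[]> serializeAndRelease(List<String> requests) {
        List<byte[]> serialized = new ArrayList<>(requests.size());
        ListIterator<String> iter = requests.listIterator();
        while (iter.hasNext()) {
            String request = iter.next();
            // set(null) replaces the element last returned by next();
            // the list keeps its size and indexes stay stable.
            iter.set(null);
            serialized.add(request.getBytes(StandardCharsets.UTF_8));
        }
        return serialized;
    }

    public static void main(String[] args) {
        // The list must be mutable: the caller sees every slot set to null afterwards.
        List<String> requests = new ArrayList<>(List.of("a", "bb"));
        List<byte[]> out = serializeAndRelease(requests);
        System.out.println(out.size() + " serialized, inputs now: " + requests);
    }
}
```

Because the slots are nulled rather than removed, any index-based bookkeeping on the list still lines up, which is the trade-off against simply calling `remove()`.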

RequestBytes next = requestIterator.next();
//Remove the element from the collection, so the iterator doesn't prevent it from being GC'ed
//due to the reference to it in the collection
requestIterator.set(null);

@criminosis criminosis Sep 26, 2024


Addressing point 2 in #4684

@@ -490,14 +500,20 @@ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//There is no "correct" number of actions to perform in a single bulk request. Experiment with different
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
- private final PeekingIterator<RequestBytes> requestIterator;
+ private final ListIterator<RequestBytes> requestIterator;

I needed to be able to set the element to null, which PeekingIterator doesn't expose, so I switched to ListIterator and took on the peek() behavior here.
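
One way to emulate `peek()` on a plain `ListIterator` is to step forward and then back again. The sketch below is an assumption about how this could be done, not a copy of the PR's implementation; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Hypothetical helper; not taken from the PR.
final class ListIteratorPeek {

    // Returns the next element without consuming it, or null when exhausted.
    static <T> T peek(ListIterator<T> iter) {
        if (!iter.hasNext()) {
            return null;
        }
        T next = iter.next();
        // Step back so the following next() returns the same element again.
        iter.previous();
        return next;
    }

    public static void main(String[] args) {
        ListIterator<Integer> iter = new ArrayList<>(List.of(1, 2)).listIterator();
        System.out.println(peek(iter) + " then " + iter.next());
    }
}
```

A null return is ambiguous if the list itself may contain nulls, so a caller that nulls out slots as it goes should only peek at positions it has not yet consumed.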

Comment on lines +434 to +460
+     private int writeTo(byte[] target, int initialOffset) {
+         int offset = initialOffset;
+         System.arraycopy(this.requestBytes, 0, target, offset, this.requestBytes.length);
+         offset += this.requestBytes.length;
+         System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length);
+         offset += NEW_LINE_BYTES.length;
          if (this.requestSource != null) {
-             outputStream.write(requestSource);
-             outputStream.write(NEW_LINE_BYTES);
+             System.arraycopy(this.requestSource, 0, target, offset, this.requestSource.length);
+             offset += this.requestSource.length;
+             System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length);
+             offset += NEW_LINE_BYTES.length;
          }
+         return offset;
      }
  }

- private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
-     final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) {
+     int totalBytes = requests.stream().mapToInt(RequestBytes::getSerializedSize).sum();
+     //By making a singular array we copy into avoids any dynamically expanded growth of the array that may overshoot
+     //how much memory we actually need, additionally it also avoids a final copy at the end normally done by
+     //ByteArrayOutputStream's toByteArray()
+     byte[] bytes = new byte[totalBytes];
+     int offset = 0;
      for (final RequestBytes request : requests) {
-         request.writeTo(outputStream);
+         //We can't remove the element from the collection like we do elsewhere, because we need to retain the
+         //serialized form in case of an error so the error can be paired to the originating request based on index
+         offset = request.writeTo(bytes, offset);

Addressing point 3 in #4684
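
As a standalone sketch of the idea in this hunk, the example below sizes one array up front and copies into it with `System.arraycopy`, instead of letting a `ByteArrayOutputStream` grow and then copying again in `toByteArray()`. The class `PreSizedConcat` and method `joinWithNewlines` are hypothetical names for illustration, and plain `byte[]` parts stand in for the PR's `RequestBytes`.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical illustration of pre-sizing the target array; not the PR's code.
final class PreSizedConcat {

    private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8);

    // Concatenates the parts, each followed by a newline, into an exactly sized array.
    static byte[] joinWithNewlines(List<byte[]> parts) {
        // First pass: compute the exact total so the array never has to grow.
        int total = 0;
        for (byte[] part : parts) {
            total += part.length + NEW_LINE.length;
        }
        byte[] target = new byte[total];
        // Second pass: copy each part and its trailing newline at the running offset.
        int offset = 0;
        for (byte[] part : parts) {
            System.arraycopy(part, 0, target, offset, part.length);
            offset += part.length;
            System.arraycopy(NEW_LINE, 0, target, offset, NEW_LINE.length);
            offset += NEW_LINE.length;
        }
        return target;
    }

    public static void main(String[] args) {
        byte[] joined = joinWithNewlines(List.of(
                "ab".getBytes(StandardCharsets.UTF_8),
                "c".getBytes(StandardCharsets.UTF_8)));
        System.out.println(joined.length + " bytes");
    }
}
```

The cost is a second pass over the inputs to compute the size; the benefit is a single allocation of exactly the needed length.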

@criminosis

IIRC @porunov & @li-boxuan you two reviewed this code last time, so this may interest you as an enhancement on that submission. As always, I'm happy to make changes, and I'm hoping this may also help others who bump into OOMs in this area 😅.

porunov commented Sep 27, 2024

Hi @criminosis !
Thank you so much for this optimization. However, I want to note that the ElasticSearch tests currently don't pass for this PR.
The failures are logged below:

Error:    RestClientBulkRequestsTest.testThrowingIfSingleBulkItemIsLargerThanLimit:154 Should have thrown due to bulk request item being too large ==> Unexpected exception type thrown, expected: <java.lang.IllegalArgumentException> but was: <java.lang.UnsupportedOperationException>
Error:    RestClientRetryTest.testNonRetryErrorCodeException:202 expected: <responseException> but was: <java.lang.UnsupportedOperationException>
Error:    RestClientRetryTest.testRetriesExhaustedReturnsLastRetryException:181 expected: <responseException> but was: <java.lang.UnsupportedOperationException>
Error:    RestClientRetryTest.testRetryOnConfiguredErrorStatus:151 expected: <java.io.IOException: Expected> but was: <java.lang.UnsupportedOperationException>

@criminosis

Thanks @porunov. Unfortunately I missed those amid a larger failure in the index provider tests that I fixed before turning in last night. Sorry about that 🤦‍♂️.

Those tests were passing in an immutable list, which blows up at runtime when the iterator calls .set(null) on an element. I changed them to pass in a mutable list and validated that they pass locally, since my laptop can easily handle these unit tests 😅.
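
The failure mode described here can be reproduced in isolation. The sketch below assumes an immutable list created with `List.of`; which immutable list type the tests actually used is not stated in this thread, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Hypothetical reproduction; not taken from the JanusGraph tests.
final class ImmutableListSet {

    // Tries to null out the first slot through a ListIterator.
    // Returns false for an empty list, since there is no slot to set.
    static boolean firstSlotAcceptsNull(List<String> list) {
        ListIterator<String> iter = list.listIterator();
        if (!iter.hasNext()) {
            return false;
        }
        iter.next();
        try {
            iter.set(null);
            return true;
        } catch (UnsupportedOperationException e) {
            // Immutable lists reject set() at runtime, not at compile time.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("List.of: " + firstSlotAcceptsNull(List.of("a")));
        // Copying into an ArrayList gives a mutable list that accepts set(null).
        System.out.println("ArrayList copy: " + firstSlotAcceptsNull(new ArrayList<>(List.of("a"))));
    }
}
```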

@criminosis

I'm going to pull this back to draft status. I think I have discovered a behavior asymmetry between the Java reference driver and the (unofficial) Rust driver. It seems that sessions aren't getting closed in JanusGraph's internal TinkerPop server, and the out-of-memory errors that prompted this PR are just the tip of the iceberg.

@criminosis criminosis marked this pull request as draft October 2, 2024 23:16

criminosis commented Oct 8, 2024

So the apparent asymmetry ultimately led to discovering what I believe is a bug in the TinkerPop server. I opened an issue over there: https://issues.apache.org/jira/browse/TINKERPOP-3113

In the meantime, @porunov, I'm going to go ahead and close this. If there's a desire for it I'm happy to reopen, but the original premise that prompted this PR is no longer valid. Thank you for your attention nonetheless.

@criminosis criminosis closed this Oct 8, 2024
Successfully merging this pull request may close these issues.

High Memory Pressure In RestElasticSearchClient
2 participants