Commit c3e646e

- Reference the Elasticsearch documentation in the bulk chunker size limit config option
- Clean up the logic that populates the ES bulk request path
- Submit bulk request items that are below the configured limit, then throw for overly large items

Signed-off-by: Allan Clements <[email protected]>
criminosis committed Jun 25, 2024
1 parent d255396 commit c3e646e
Showing 4 changed files with 84 additions and 19 deletions.
docs/configs/janusgraph-cfg.md (2 changes: 1 addition & 1 deletion)
@@ -150,7 +150,7 @@ Elasticsearch index configuration

| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
-| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
+| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. If a single bulk item exceeds this limit an exception will be thrown after the smaller bulk items are submitted. Ensure that this limit is always less than or equal to the configured limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html). | Integer | 100000000 | LOCAL |
| index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
| index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
| index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
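For example, a deployment whose Elasticsearch nodes cap `http.max_content_length` below the default could lower the chunk limit to match in its JanusGraph properties file. A minimal sketch with illustrative values; `search` stands in for the configured index name:

```properties
# Illustrative: keep this at or below every Elasticsearch node's http.max_content_length.
# Elasticsearch's default http.max_content_length is 100mb (104857600 bytes), which is
# why the JanusGraph default of 100000000 bytes fits under it.
index.search.elasticsearch.bulk-chunk-size-limit-bytes=10000000
```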
@@ -327,7 +327,11 @@ public class ElasticSearchIndex implements IndexProvider {
public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
"The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
"chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
"chunked to this size. If a single bulk item exceeds this limit an exception will be thrown after the " +
"smaller bulk items are submitted. Ensure that this limit is always less than or equal to the configured " +
"limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the " +
"[Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html).",
ConfigOption.Type.LOCAL, Integer.class, 100_000_000);

public static final int HOST_PORT_DEFAULT = 9200;

@@ -14,6 +14,7 @@

package org.janusgraph.diskstorage.es.rest;

+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
@@ -54,6 +55,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -392,11 +394,13 @@ public void clearStore(String indexName, String storeName) throws IOException {
}
}

-private class RequestBytes {
+@VisibleForTesting
+class RequestBytes {
final byte [] requestBytes;
final byte [] requestSource;

-private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
+@VisibleForTesting
+RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
Map<String, Object> requestData = new HashMap<>();
if (useMappingTypes) {
requestData.put("_index", request.getIndex());
@@ -419,7 +423,8 @@ private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
}
}

-private int getSerializedSize() {
+@VisibleForTesting
+int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
@@ -445,15 +450,15 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests,
request.writeTo(outputStream);
}

-final StringBuilder builder = new StringBuilder();
+final StringBuilder bulkRequestQueryParameters = new StringBuilder();
if (ingestPipeline != null) {
APPEND_OP.apply(builder).append("pipeline=").append(ingestPipeline);
APPEND_OP.apply(bulkRequestQueryParameters).append("pipeline=").append(ingestPipeline);
}
if (bulkRefreshEnabled) {
APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
}
-builder.insert(0, REQUEST_SEPARATOR + "_bulk");
-return Pair.with(builder.toString(), outputStream.toByteArray());
+final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
+return Pair.with(bulkRequestPath, outputStream.toByteArray());
}
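Assuming `REQUEST_SEPARATOR` is the `/` path separator and `APPEND_OP` emits `?` before the first query parameter and `&` before subsequent ones (inferred from the surrounding code rather than shown in this diff), the extracted `bulkRequestPath` would look like the following, where `my-pipeline` and `wait_for` are illustrative values:

```
/_bulk
/_bulk?pipeline=my-pipeline
/_bulk?pipeline=my-pipeline&refresh=wait_for
```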

private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
@@ -476,7 +481,8 @@ private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
return errors;
}

-private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
+@VisibleForTesting
+class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
//the specified limit

@@ -485,18 +491,32 @@ private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
private final PeekingIterator<RequestBytes> requestIterator;
+private final int[] exceptionallyLargeRequests;

-private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
+@VisibleForTesting
+BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
+List<Integer> requestSizesThatWereTooLarge = new ArrayList<>();
for (ElasticSearchMutation request : requests) {
-serializedRequests.add(new RequestBytes(request));
+RequestBytes requestBytes = new RequestBytes(request);
+int requestSerializedSize = requestBytes.getSerializedSize();
+if (requestSerializedSize <= bulkChunkSerializedLimitBytes) {
+    //Only keep items that we can actually send in memory
+    serializedRequests.add(requestBytes);
+} else {
+    requestSizesThatWereTooLarge.add(requestSerializedSize);
+}
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
+//Condense request sizes that are too large into an int array to remove Boxed & List memory overhead
+this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray();
}

@Override
public boolean hasNext() {
-return requestIterator.hasNext();
+//Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted
+//This allows next() to throw after all well sized requests have been chunked for submission
+return requestIterator.hasNext() || exceptionallyLargeRequests.length > 0;
}

@Override
@@ -509,16 +529,18 @@ public List<RequestBytes> next() {
if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
chunkSerializedTotal += requestSerializedSize;
serializedRequests.add(requestIterator.next());
-} else if (requestSerializedSize > bulkChunkSerializedLimitBytes) {
-    //we've encountered an element we cannot send to Elasticsearch given the configured limit
-    throw new IllegalArgumentException(String.format(
-        "Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s",
-        bulkChunkSerializedLimitBytes, requestSerializedSize));
} else {
//Adding this element would exceed the limit, so return the chunk
return serializedRequests;
}
}
+//Check if we should throw an exception for items that were exceptionally large and therefore undeliverable.
+//This is only done after all items that could be sent have been sent
+if (serializedRequests.isEmpty() && this.exceptionallyLargeRequests.length > 0) {
+    throw new IllegalArgumentException(String.format(
+        "Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) %s",
+        bulkChunkSerializedLimitBytes, Arrays.toString(this.exceptionallyLargeRequests)));
+}
//All remaining requests fit in this chunk
return serializedRequests;
}
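A minimal sketch of the resulting caller contract (hypothetical usage; `submitChunk` and `mutations` are illustrative names, not part of RestElasticSearchClient):

```java
// Hypothetical usage sketch of BulkRequestChunker's throw-last behavior.
BulkRequestChunker chunker = new BulkRequestChunker(mutations);
while (chunker.hasNext()) {
    // Every well-sized chunk is returned before oversized items are reported,
    // so a throw here can only happen after all deliverable chunks have been
    // handed out for submission.
    List<RequestBytes> chunk = chunker.next();
    submitChunk(chunk); // illustrative sink for the bulk submission
}
```

The new test below, testThrowingForOverlyLargeBulkItemOnlyAfterSmallerItemsAreChunked, asserts exactly this ordering.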
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.stream.IntStream;

import static org.mockito.ArgumentMatchers.any;
@@ -105,6 +106,44 @@ public void testSplittingOfLargeBulkItems() throws IOException {
}
}

+@Test
+public void testThrowingForOverlyLargeBulkItemOnlyAfterSmallerItemsAreChunked() throws IOException {
+    int bulkLimit = 1_000_000;
+    StringBuilder overlyLargePayloadBuilder = new StringBuilder();
+    IntStream.range(0, bulkLimit * 10).forEach(value -> overlyLargePayloadBuilder.append("a"));
+    String overlyLargePayload = overlyLargePayloadBuilder.toString();
+    ElasticSearchMutation overlyLargeMutation = ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id2",
+        Collections.singletonMap("someKey", overlyLargePayload));
+    List<ElasticSearchMutation> bulkItems = Arrays.asList(
+        ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id1",
+            Collections.singletonMap("someKey", "small_payload1")),
+        overlyLargeMutation,
+        ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id3",
+            Collections.singletonMap("someKey", "small_payload2"))
+    );
+
+    try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) {
+        RestElasticSearchClient.BulkRequestChunker chunkerUnderTest = restClientUnderTest.new BulkRequestChunker(bulkItems);
+        int overlyLargeRequestExpectedSize = restClientUnderTest.new RequestBytes(overlyLargeMutation).getSerializedSize();
+
+        //The chunker should chunk this request first as a list of the 2 smaller items
+        List<RestElasticSearchClient.RequestBytes> smallItemsChunk = chunkerUnderTest.next();
+        Assertions.assertEquals(2, smallItemsChunk.size());
+
+        //Then the chunker should still return true for hasNext()
+        Assertions.assertTrue(chunkerUnderTest.hasNext());
+
+        //Then the next call for next() should throw to report the exceptionally large item
+        IllegalArgumentException thrownException = Assertions.assertThrows(IllegalArgumentException.class, chunkerUnderTest::next,
+            "Should have thrown due to bulk request item being too large");
+
+        String expectedExceptionMessage = String.format("Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) [%s]",
+            bulkLimit, overlyLargeRequestExpectedSize);
+
+        Assertions.assertEquals(expectedExceptionMessage, thrownException.getMessage());
+    }
+}

@Test
public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException {
int bulkLimit = 800;

1 comment on commit c3e646e

@github-actions

Benchmark

| Benchmark suite | Current: c3e646e | Previous: d255396 | Ratio |
| ---- | ---- | ---- | ---- |
| org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete | 12862.219559583655 ms/op | 15017.170187305353 ms/op | 0.86 |
| org.janusgraph.GraphCentricQueryBenchmark.getVertices | 898.6512015951268 ms/op | 961.779849406255 ms/op | 0.93 |
| org.janusgraph.MgmtOlapJobBenchmark.runClearIndex | 215.5834557217391 ms/op | 217.56629310797098 ms/op | 0.99 |
| org.janusgraph.MgmtOlapJobBenchmark.runReindex | 335.1204059441667 ms/op | 361.57722250333336 ms/op | 0.93 |
| org.janusgraph.JanusGraphSpeedBenchmark.basicCount | 206.55426227041346 ms/op | 299.2003695664086 ms/op | 0.69 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 4722.406769342562 ms/op | 6071.426485185737 ms/op | 0.78 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps | 16046.515924841911 ms/op | 19975.681407350366 ms/op | 0.80 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch | 20278.357182006665 ms/op | 23952.72953915508 ms/op | 0.85 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching | 57230.2521287 ms/op | 68171.69029556667 ms/op | 0.84 |
| org.janusgraph.CQLMultiQueryDropBenchmark.dropVertices | 1515.2769933759903 ms/op | 1816.9581586647128 ms/op | 0.83 |
| org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex | 7955.192165018823 ms/op | 9443.527656875849 ms/op | 0.84 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion | 389.96725622295355 ms/op | 428.072660988753 ms/op | 0.91 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch | 4123.0877695500785 ms/op | 5586.392145922853 ms/op | 0.74 |
| org.janusgraph.CQLMultiQueryBenchmark.getNames | 8103.410474475017 ms/op | 11370.815512474728 ms/op | 0.71 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 5803.617361817818 ms/op | 7631.645434302394 ms/op | 0.76 |
| org.janusgraph.CQLMultiQueryBenchmark.getLabels | 6850.789608782451 ms/op | 8902.651176663585 ms/op | 0.77 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep | 427.7170621210132 ms/op | 501.7901263066493 ms/op | 0.85 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex | 11626.882707181077 ms/op | 15968.194510484615 ms/op | 0.73 |
| org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage | 372.9511760675745 ms/op | 423.02341747485923 ms/op | 0.88 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection | 14490.027382270237 ms/op | 17368.035613674252 ms/op | 0.83 |
| org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection | 249.31190426756135 ms/op | 282.14623086325463 ms/op | 0.88 |
| org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch | 14878.465976054094 ms/op | 20111.529798068364 ms/op | 0.74 |
| org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames | 7992.263157908502 ms/op | 10801.221835707169 ms/op | 0.74 |
| org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps | 8728.627565772 ms/op | 10771.838655087315 ms/op | 0.81 |
| org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts | 8334.496251152172 ms/op | 10574.69229554027 ms/op | 0.79 |

This comment was automatically generated by workflow using github-action-benchmark.
