Retry bulk request if all errors are configured retry error codes

Only retry bulk request items that failed

Implemented bulk retry test & splitting up a bulk request when it is large enough

Added exception if a bulk request item is too large to send, even as a single item

Use HttpStatus.SC_NOT_FOUND instead of 404 literal

Added pseudo test to facilitate manual observation of chunking of bulk writes

Added test asserting silent failure of write to ES due to chunk size limit causing the vertex to be omitted from the ES mixed index

Signed-off-by: Allan Clements <[email protected]>
criminosis authored and porunov committed Jun 24, 2024
1 parent 55880ea commit d255396
Showing 8 changed files with 427 additions and 43 deletions.
1 change: 1 addition & 0 deletions docs/configs/janusgraph-cfg.md
@@ -150,6 +150,7 @@ Elasticsearch index configuration

| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
| index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
| index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
| index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
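The new option can be set like any other per-index Elasticsearch setting. A minimal sketch of lowering the limit at graph construction time, assuming an index named "search", an inmemory storage backend, and an illustrative 10 MB cap (these values are examples, not part of this commit):

import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class BulkChunkLimitExample {
    public static void main(String[] args) {
        // Cap serialized bulk request chunks at 10 MB, e.g. for a cluster
        // whose http.max_content_length is lower than the 100 MB default.
        JanusGraph graph = JanusGraphFactory.build()
                .set("storage.backend", "inmemory")
                .set("index.search.backend", "elasticsearch")
                .set("index.search.hostname", "127.0.0.1")
                .set("index.search.elasticsearch.bulk-chunk-size-limit-bytes", 10_000_000)
                .open();
        graph.close();
    }
}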
@@ -324,6 +324,11 @@ public class ElasticSearchIndex implements IndexProvider {
"Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " +
"E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]);

public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
"The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
"chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);

public static final int HOST_PORT_DEFAULT = 9200;

/**
@@ -81,8 +81,9 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
.mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
int bulkChunkLimitBytes = config.getOrDefault(ElasticSearchIndex.BULK_CHUNK_SIZE_LIMIT_BYTES);
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
- retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs, bulkChunkLimitBytes);
if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
}
@@ -115,9 +116,9 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {

protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
- long retryMaxWaitMs) {
long retryMaxWaitMs, int bulkChunkSerializedLimit) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
- retryInitialWaitMs, retryMaxWaitMs);
retryInitialWaitMs, retryMaxWaitMs, bulkChunkSerializedLimit);
}

/**
@@ -16,11 +16,15 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.tinkerpop.shaded.jackson.annotation.JsonIgnoreProperties;
import org.apache.tinkerpop.shaded.jackson.core.JsonParseException;
import org.apache.tinkerpop.shaded.jackson.core.JsonProcessingException;
import org.apache.tinkerpop.shaded.jackson.core.type.TypeReference;
import org.apache.tinkerpop.shaded.jackson.databind.JsonMappingException;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
@@ -39,16 +43,21 @@
import org.janusgraph.diskstorage.es.mapping.IndexMapping;
import org.janusgraph.diskstorage.es.mapping.TypedIndexMappings;
import org.janusgraph.diskstorage.es.mapping.TypelessIndexMappings;
import org.janusgraph.diskstorage.es.rest.RestBulkResponse.RestBulkItemResponse;
import org.janusgraph.diskstorage.es.script.ESScriptResponse;
import org.javatuples.Pair;
import org.javatuples.Triplet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -121,9 +130,11 @@ public class RestElasticSearchClient implements ElasticSearchClient {

private final long retryMaxWaitMs;

private final int bulkChunkSerializedLimitBytes;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
- long retryMaxWaitMs) {
long retryMaxWaitMs, int bulkChunkSerializedLimitBytes) {
this.delegate = delegate;
majorVersion = getMajorVersion();
this.scrollKeepAlive = scrollKeepAlive+"s";
@@ -134,6 +145,7 @@ public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean
this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
this.retryInitialWaitMs = retryInitialWaitMs;
this.retryMaxWaitMs = retryMaxWaitMs;
this.bulkChunkSerializedLimitBytes = bulkChunkSerializedLimitBytes;
}

@Override
@@ -241,7 +253,7 @@ public ESScriptResponse getStoredScript(String scriptName) throws IOException {

final Response response = e.getResponse();

- if(e.getResponse().getStatusLine().getStatusCode() == 404){
if(e.getResponse().getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND){
ESScriptResponse esScriptResponse = new ESScriptResponse();
esScriptResponse.setFound(false);
return esScriptResponse;
@@ -380,10 +392,11 @@ public void clearStore(String indexName, String storeName) throws IOException {
}
}

- @Override
- public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- for (final ElasticSearchMutation request : requests) {
private class RequestBytes {
final byte [] requestBytes;
final byte [] requestSource;

private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
Map<String, Object> requestData = new HashMap<>();
if (useMappingTypes) {
requestData.put("_index", request.getIndex());
@@ -398,15 +411,39 @@ public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipel
requestData.put(retryOnConflictKey, retryOnConflict);
}

- outputStream.write(mapWriter.writeValueAsBytes(
- ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData))
- );
- outputStream.write(NEW_LINE_BYTES);
this.requestBytes = mapWriter.writeValueAsBytes(ImmutableMap.of(request.getRequestType().name().toLowerCase(), requestData));
if (request.getSource() != null) {
- outputStream.write(mapWriter.writeValueAsBytes(request.getSource()));
this.requestSource = mapWriter.writeValueAsBytes(request.getSource());
} else {
this.requestSource = null;
}
}

private int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize += 1; //For follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
serializedSize += this.requestSource.length;
serializedSize += 1; //For follow-up NEW_LINE_BYTES
}
return serializedSize;
}

private void writeTo(OutputStream outputStream) throws IOException {
outputStream.write(this.requestBytes);
outputStream.write(NEW_LINE_BYTES);
if (this.requestSource != null) {
outputStream.write(requestSource);
outputStream.write(NEW_LINE_BYTES);
}
}
}
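For context, each RequestBytes instance holds the NDJSON lines the Elasticsearch bulk API expects per action: a metadata line plus, when present, a source line, each followed by one newline byte, which is exactly what getSerializedSize counts. A self-contained sketch of that wire format using plain (unshaded) Jackson, with invented index and document values:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class NdjsonBulkLineExample {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Action metadata line, e.g. {"index":{"_index":"janusgraph_mixed","_id":"v1"}}
        byte[] action = mapper.writeValueAsBytes(
                Map.of("index", Map.of("_index", "janusgraph_mixed", "_id", "v1")));
        // Document source line, e.g. {"name":"hercules"}
        byte[] source = mapper.writeValueAsBytes(Map.of("name", "hercules"));
        // Mirrors RequestBytes#getSerializedSize: one '\n' byte per line.
        int serializedSize = action.length + 1 + source.length + 1;
        System.out.print(new String(action, StandardCharsets.UTF_8) + "\n"
                + new String(source, StandardCharsets.UTF_8) + "\n");
        System.out.println("serialized size: " + serializedSize + " bytes");
    }
}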

private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests, String ingestPipeline) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (final RequestBytes request : requests) {
request.writeTo(outputStream);
}

final StringBuilder builder = new StringBuilder();
if (ingestPipeline != null) {
@@ -416,17 +453,107 @@ public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipel
APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
}
builder.insert(0, REQUEST_SEPARATOR + "_bulk");
return Pair.with(builder.toString(), outputStream.toByteArray());
}

//Bulk API is documented to return bulk item responses in the same order of submission
//https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body
//As such we only need to retry elements that failed
private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
final List<Map<String, RestBulkResponse.RestBulkItemResponse>> bulkResponseItems,
final List<RequestBytes> submittedBulkRequestItems) {
final List<Triplet<Object, Integer, RequestBytes>> errors = new ArrayList<>(bulkResponseItems.size());
for (int itemIndex = 0; itemIndex < bulkResponseItems.size(); itemIndex++) {
Collection<RestBulkResponse.RestBulkItemResponse> bulkResponseItem = bulkResponseItems.get(itemIndex).values();
if (bulkResponseItem.size() > 1) {
throw new IllegalStateException("There should only be a single item per bulk response item entry");
}
RestBulkResponse.RestBulkItemResponse item = bulkResponseItem.iterator().next();
if (item.getError() != null && item.getStatus() != HttpStatus.SC_NOT_FOUND) {
errors.add(Triplet.with(item.getError(), item.getStatus(), submittedBulkRequestItems.get(itemIndex)));
}
}
return errors;
}
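Because the bulk API preserves submission order in its response, the pairing above is purely positional: response item i corresponds to submitted request i. A toy sketch of the same idea with invented statuses:

import java.util.ArrayList;
import java.util.List;

public class PositionalPairingExample {
    public static void main(String[] args) {
        List<String> submitted = List.of("mutation-0", "mutation-1", "mutation-2");
        List<Integer> statuses = List.of(200, 429, 201); // invented per-item results
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < statuses.size(); i++) {
            if (statuses.get(i) >= 400) {
                failed.add(submitted.get(i)); // same index as the response item
            }
        }
        System.out.println(failed); // [mutation-1]
    }
}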

- final Response response = performRequest(REQUEST_TYPE_POST, builder.toString(), outputStream.toByteArray());
- try (final InputStream inputStream = response.getEntity().getContent()) {
- final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
- final List<Object> errors = bulkResponse.getItems().stream()
- .flatMap(item -> item.values().stream())
- .filter(item -> item.getError() != null && item.getStatus() != 404)
- .map(RestBulkItemResponse::getError).collect(Collectors.toList());
- if (!errors.isEmpty()) {
- errors.forEach(error -> log.error("Failed to execute ES query: {}", error));
- throw new IOException("Failure(s) in Elasticsearch bulk request: " + errors);
private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
//the specified limit

//https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-desc
//There is no "correct" number of actions to perform in a single bulk request. Experiment with different
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
private final PeekingIterator<RequestBytes> requestIterator;

private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
for (ElasticSearchMutation request : requests) {
serializedRequests.add(new RequestBytes(request));
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
}

@Override
public boolean hasNext() {
return requestIterator.hasNext();
}

@Override
public List<RequestBytes> next() {
List<RequestBytes> serializedRequests = new ArrayList<>();
int chunkSerializedTotal = 0;
while (requestIterator.hasNext()) {
RequestBytes peeked = requestIterator.peek();
int requestSerializedSize = peeked.getSerializedSize();
if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
chunkSerializedTotal += requestSerializedSize;
serializedRequests.add(requestIterator.next());
} else if (requestSerializedSize > bulkChunkSerializedLimitBytes) {
//we've encountered an element we cannot send to Elasticsearch given the configured limit
throw new IllegalArgumentException(String.format(
"Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s",
bulkChunkSerializedLimitBytes, requestSerializedSize));
} else {
//Adding this element would exceed the limit, so return the chunk
return serializedRequests;
}
}
//All remaining requests fit in this chunk
return serializedRequests;
}
}
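The chunker is a greedy, order-preserving packer: it emits requests into the current chunk until the next one would overflow the limit, and it fails fast on any single request that could never fit even alone. A self-contained sketch of the same algorithm over plain byte counts (the sizes and the 10-byte limit are illustrative):

import java.util.ArrayList;
import java.util.List;

public class GreedyChunkingExample {
    static List<List<Integer>> chunk(List<Integer> sizes, int limitBytes) {
        List<List<Integer>> chunks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int total = 0;
        for (int size : sizes) {
            if (size > limitBytes) {
                // Mirrors the IllegalArgumentException for items too large to
                // send even as a single-item bulk request.
                throw new IllegalArgumentException("Item of " + size
                        + " bytes exceeds chunk limit of " + limitBytes);
            }
            if (total + size > limitBytes) {
                chunks.add(current); // close the chunk before it overflows
                current = new ArrayList<>();
                total = 0;
            }
            current.add(size);
            total += size;
        }
        if (!current.isEmpty()) {
            chunks.add(current); // remaining requests fit in the last chunk
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Prints [[4, 5], [6, 3], [8]]
        System.out.println(chunk(List.of(4, 5, 6, 3, 8), 10));
    }
}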

@Override
public void bulkRequest(final List<ElasticSearchMutation> requests, String ingestPipeline) throws IOException {
BulkRequestChunker bulkRequestChunker = new BulkRequestChunker(requests);
while (bulkRequestChunker.hasNext()) {
List<RequestBytes> bulkRequestChunk = bulkRequestChunker.next();
int retryCount = 0;
while (true) {
final Pair<String, byte[]> bulkRequestInput = buildBulkRequestInput(bulkRequestChunk, ingestPipeline);
final Response response = performRequest(REQUEST_TYPE_POST, bulkRequestInput.getValue0(), bulkRequestInput.getValue1());
try (final InputStream inputStream = response.getEntity().getContent()) {
final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
List<Triplet<Object, Integer, RequestBytes>> bulkItemsThatFailed = pairErrorsWithSubmittedMutation(bulkResponse.getItems(), bulkRequestChunk);
if (!bulkItemsThatFailed.isEmpty()) {
//Only retry the bulk request if *all* the bulk response item error codes are retry error codes
final Set<Integer> errorCodes = bulkItemsThatFailed.stream().map(Triplet::getValue1).collect(Collectors.toSet());
if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) {
//Build up the next request batch, of only the failed mutations
bulkRequestChunk = bulkItemsThatFailed.stream().map(Triplet::getValue2).collect(Collectors.toList());
performRetryWait(retryCount);
retryCount++;
} else {
final List<Object> errorItems = bulkItemsThatFailed.stream().map(Triplet::getValue0).collect(Collectors.toList());
errorItems.forEach(error -> log.error("Failed to execute ES query: {}", error));
throw new IOException("Failure(s) in Elasticsearch bulk request: " + errorItems);
}
} else {
//The entire bulk request was successful, leave the loop
break;
}
}
}
}
}
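The retry gate in the loop above is set containment: a failed chunk is resubmitted, shrunk to just its failed items, only while attempts remain and every failed item's status is among the configured retry codes. A small sketch of that decision, using the "408,429" style of retry configuration mentioned earlier:

import java.util.Set;

public class RetryDecisionExample {
    static boolean shouldRetry(Set<Integer> failedStatuses, Set<Integer> retryOnErrorCodes,
                               int retryCount, int retryAttemptLimit) {
        // Retry only if *all* failures carry a retryable status code.
        return retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(failedStatuses);
    }

    public static void main(String[] args) {
        Set<Integer> retryable = Set.of(408, 429);
        System.out.println(shouldRetry(Set.of(429), retryable, 0, 5));      // true
        System.out.println(shouldRetry(Set.of(429, 400), retryable, 0, 5)); // false: 400 is fatal
    }
}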
@@ -571,19 +698,22 @@ private Response performRequestWithRetry(Request request) throws IOException {
if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) {
throw e;
}
- //Wait before trying again
- long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
- log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
- try {
- Thread.sleep(waitDurationMs);
- } catch (InterruptedException interruptedException) {
- throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
- }
performRetryWait(retryCount);
}
retryCount++;
}
}

private void performRetryWait(int retryCount) {
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
}
}
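The wait grows by a factor of ten per attempt from the initial wait and is clamped at the maximum, so with an assumed initial wait of 100 ms and cap of 30 000 ms the sequence is 100, 1 000, 10 000, 30 000, 30 000 ms. A sketch of that formula with those assumed values:

public class RetryWaitExample {
    public static void main(String[] args) {
        long retryInitialWaitMs = 100;  // assumed configuration value
        long retryMaxWaitMs = 30_000;   // assumed configuration value
        for (int retryCount = 0; retryCount < 5; retryCount++) {
            // Same clamp as performRetryWait: initial * 10^retryCount, capped.
            long waitDurationMs = Math.min(
                    (long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
            System.out.println("attempt " + retryCount + ": wait " + waitDurationMs + " ms");
        }
    }
}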

private Response performRequest(Request request, byte[] requestData) throws IOException {

final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null;
