Skip to content

Commit

Permalink
Retry bulk request if all errors are configured retry error codes
Browse files Browse the repository at this point in the history
Signed-off-by: Allan Clements <[email protected]>
  • Loading branch information
criminosis committed May 30, 2024
1 parent 487e10c commit 6721d34
Showing 1 changed file with 35 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import org.janusgraph.diskstorage.es.mapping.IndexMapping;
import org.janusgraph.diskstorage.es.mapping.TypedIndexMappings;
import org.janusgraph.diskstorage.es.mapping.TypelessIndexMappings;
import org.janusgraph.diskstorage.es.rest.RestBulkResponse.RestBulkItemResponse;
import org.janusgraph.diskstorage.es.script.ESScriptResponse;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -417,16 +417,29 @@ public void bulkRequest(List<ElasticSearchMutation> requests, String ingestPipel
}
builder.insert(0, REQUEST_SEPARATOR + "_bulk");

final Response response = performRequest(REQUEST_TYPE_POST, builder.toString(), outputStream.toByteArray());
try (final InputStream inputStream = response.getEntity().getContent()) {
final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
final List<Object> errors = bulkResponse.getItems().stream()
.flatMap(item -> item.values().stream())
.filter(item -> item.getError() != null && item.getStatus() != 404)
.map(RestBulkItemResponse::getError).collect(Collectors.toList());
if (!errors.isEmpty()) {
errors.forEach(error -> log.error("Failed to execute ES query: {}", error));
throw new IOException("Failure(s) in Elasticsearch bulk request: " + errors);
int retryCount = 0;
while (true) {
final Response response = performRequest(REQUEST_TYPE_POST, builder.toString(), outputStream.toByteArray());
try (final InputStream inputStream = response.getEntity().getContent()) {
final RestBulkResponse bulkResponse = mapper.readValue(inputStream, RestBulkResponse.class);
final List<Pair<Object, Integer>> errors = bulkResponse.getItems().stream()
.flatMap(item -> item.values().stream())
.filter(item -> item.getError() != null && item.getStatus() != 404)
.map(item -> Pair.with(item.getError(), item.getStatus())).collect(Collectors.toList());
if (!errors.isEmpty()) {
//Only retry the bulk request if *all* the bulk response item error codes are retry error codes
final Set<Integer> errorCodes = errors.stream().map(Pair::getValue1).collect(Collectors.toSet());
if (retryCount < retryAttemptLimit && retryOnErrorCodes.containsAll(errorCodes)) {
performRetryWait(retryCount);
retryCount++;
} else {
errors.forEach(error -> log.error("Failed to execute ES query: {}", error.getValue0()));
throw new IOException("Failure(s) in Elasticsearch bulk request: " + errors);
}
} else {
//The entire bulk request was successful, leave the loop
break;
}
}
}
}
Expand Down Expand Up @@ -571,19 +584,22 @@ private Response performRequestWithRetry(Request request) throws IOException {
if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) {
throw e;
}
//Wait before trying again
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
}
performRetryWait(retryCount);
}
retryCount++;
}
}

private void performRetryWait(int retryCount) {
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);
}
}

private Response performRequest(Request request, byte[] requestData) throws IOException {

final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null;
Expand Down

1 comment on commit 6721d34

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 6721d34 Previous: 487e10c Ratio
org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete 12848.567318423158 ms/op 12689.439979944173 ms/op 1.01
org.janusgraph.GraphCentricQueryBenchmark.getVertices 899.7955620690157 ms/op 925.013828296019 ms/op 0.97
org.janusgraph.MgmtOlapJobBenchmark.runClearIndex 215.68559415434783 ms/op 216.46225680797102 ms/op 1.00
org.janusgraph.MgmtOlapJobBenchmark.runReindex 338.52984895488095 ms/op 340.16287398523804 ms/op 1.00
org.janusgraph.JanusGraphSpeedBenchmark.basicCount 213.98074853100556 ms/op 217.50069746084958 ms/op 0.98
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 4838.562574460619 ms/op 5053.275978996042 ms/op 0.96
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingEmitRepeatSteps 16661.57656015059 ms/op 16275.762675504468 ms/op 1.02
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithSmallBatch 19890.44945777515 ms/op 19161.81803811333 ms/op 1.04
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.vertexCentricPropertiesFetching 54931.90656433334 ms/op 54807.3874225 ms/op 1.00
org.janusgraph.CQLMultiQueryDropBenchmark.dropVertices 1530.8468595182978 ms/op 1525.6592376795456 ms/op 1.00
org.janusgraph.CQLMultiQueryBenchmark.getAllElementsTraversedFromOuterVertex 8076.358909087307 ms/op 8025.759033073809 ms/op 1.01
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithDoubleUnion 380.6296355590945 ms/op 376.13373544806296 ms/op 1.01
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesAllPropertiesWithUnlimitedBatch 3997.614179711392 ms/op 4086.6631890165872 ms/op 0.98
org.janusgraph.CQLMultiQueryBenchmark.getNames 8258.066978721741 ms/op 8416.939881828739 ms/op 0.98
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesThreePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 5865.4912391240305 ms/op 5611.851519805606 ms/op 1.05
org.janusgraph.CQLMultiQueryBenchmark.getLabels 6758.883546882298 ms/op 6828.816333819391 ms/op 0.99
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFilteredByAndStep 429.6512296740028 ms/op 434.35062048578123 ms/op 0.99
org.janusgraph.CQLMultiQueryBenchmark.getVerticesFromMultiNestedRepeatStepStartingFromSingleVertex 11750.807033333316 ms/op 11980.571977528334 ms/op 0.98
org.janusgraph.CQLMultiQueryBenchmark.getVerticesWithCoalesceUsage 363.8937285660601 ms/op 355.802058751578 ms/op 1.02
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithAllMultiQuerySlicesUnderMaxRequestsPerConnection 14146.71597441619 ms/op 14729.77276273078 ms/op 0.96
org.janusgraph.CQLMultiQueryBenchmark.getIdToOutVerticesProjection 248.73769492063587 ms/op 243.61035626593375 ms/op 1.02
org.janusgraph.CQLMultiQueryMultiSlicesBenchmark.getValuesMultiplePropertiesWithUnlimitedBatch 13886.18432533389 ms/op 13656.572724210222 ms/op 1.02
org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames 8026.373273786823 ms/op 8290.3478513332 ms/op 0.97
org.janusgraph.CQLMultiQueryBenchmark.getElementsWithUsingRepeatUntilSteps 8830.257441821106 ms/op 9070.803732347935 ms/op 0.97
org.janusgraph.CQLMultiQueryBenchmark.getAdjacentVerticesLocalCounts 8522.64897371431 ms/op 8637.306367440338 ms/op 0.99

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.