Skip to content

Commit

Permalink
Merge pull request #716 from FgForrest/2024-11-patch-fix
Browse files Browse the repository at this point in the history
Patch fix
  • Loading branch information
novoj authored Oct 24, 2024
2 parents 70ba229 + 4f1da2a commit 2836e3d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.evitadb.core.Evita;
import io.evitadb.core.async.ObservableThreadExecutor;
import io.evitadb.utils.CollectionUtils;
import io.prometheus.metrics.core.metrics.Counter;
import io.prometheus.metrics.core.metrics.Gauge;
import io.prometheus.metrics.exporter.common.PrometheusScrapeHandler;
import io.prometheus.metrics.model.registry.Collector;
Expand Down Expand Up @@ -68,76 +69,76 @@ public class PrometheusMetricsHttpService implements HttpService {
* Runnable tasks that can be used to update these metrics.
*
* @param metricPrefix The prefix to use for the metric names.
* @param tp The ThreadPoolExecutor to monitor.
* @param tp The ThreadPoolExecutor to monitor.
* @return A stream of Runnables that update the metrics when executed.
*/
@Nonnull
private static Stream<Runnable> monitor(@Nonnull String metricPrefix, @Nonnull ThreadPoolExecutor tp) {
final Gauge completed = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
final Counter completed = (Counter) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_completed",
name -> Gauge.builder()
name -> Counter.builder()
.name(name)
.help("The approximate total number of tasks that have completed execution")
.unit(UNIT_TASKS)
.help("The approximate total number of tasks that have completed execution")
.unit(UNIT_TASKS)
.register()
);

final Gauge active = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_active",
name -> Gauge.builder()
.name(name)
.help("The approximate number of threads that are actively executing tasks")
.unit(UNIT_THREADS)
.help("The approximate number of threads that are actively executing tasks")
.unit(UNIT_THREADS)
.register()
);

final Gauge queued = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_queued",
name -> Gauge.builder()
.name(name)
.help("The approximate number of tasks that are queued for execution")
.unit(UNIT_TASKS)
.help("The approximate number of tasks that are queued for execution")
.unit(UNIT_TASKS)
.register()
);

final Gauge queueRemaining = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_queue_remaining",
name -> Gauge.builder()
.name(name)
.help("The number of additional elements that this queue can ideally accept without blocking")
.unit(UNIT_TASKS)
.help("The number of additional elements that this queue can ideally accept without blocking")
.unit(UNIT_TASKS)
.register()
);

final Gauge poolSize = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_pool_size",
name -> Gauge.builder()
.name(name)
.help("The current number of threads in the pool")
.unit(UNIT_THREADS)
.help("The current number of threads in the pool")
.unit(UNIT_THREADS)
.register()
);

final Gauge poolCore = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_pool_core",
name -> Gauge.builder()
.name(name)
.help("The core number of threads for the pool")
.unit(UNIT_THREADS)
.help("The core number of threads for the pool")
.unit(UNIT_THREADS)
.register()
);

final Gauge poolMax = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_pool_max",
name -> Gauge.builder()
.name(name)
.help("The maximum allowed number of threads in the pool")
.unit(UNIT_THREADS)
.help("The maximum allowed number of threads in the pool")
.unit(UNIT_THREADS)
.register()
);

return Stream.of(
() -> completed.set(tp.getCompletedTaskCount()),
() -> completed.inc(tp.getCompletedTaskCount() - tp.getCompletedTaskCount()),
() -> active.set(tp.getActiveCount()),
() -> queued.set(tp.getQueue().size()),
() -> queueRemaining.set(tp.getQueue().remainingCapacity()),
Expand All @@ -152,55 +153,55 @@ private static Stream<Runnable> monitor(@Nonnull String metricPrefix, @Nonnull T
* Runnable tasks that can be used to update these metrics.
*
* @param metricPrefix The prefix to use for the metric names.
* @param fj The ForkJoinPool to monitor.
* @param fj The ForkJoinPool to monitor.
* @return A stream of Runnables that update the metrics when executed.
*/
@Nonnull
private static Stream<Runnable> monitor(@Nonnull String metricPrefix, @Nonnull ForkJoinPool fj) {
final Gauge steals = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
final Counter steals = (Counter) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_steals",
name -> Gauge.builder()
name -> Counter.builder()
.name(name)
.help(
"Estimate of the total number of tasks stolen from one thread's work queue by another. The reported value " +
"underestimates the actual total number of steals when the pool is not quiescent")
.unit(UNIT_TASKS)
.help(
"Estimate of the total number of tasks stolen from one thread's work queue by another. The reported value " +
"underestimates the actual total number of steals when the pool is not quiescent")
.unit(UNIT_TASKS)
.register()
);
final Gauge queued = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_queued",
name -> Gauge.builder()
.name(name)
.help("An estimate of the total number of tasks currently held in queues by worker threads")
.unit(UNIT_TASKS)
.help("An estimate of the total number of tasks currently held in queues by worker threads")
.unit(UNIT_TASKS)
.register()
);
final Gauge active = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_active",
name -> Gauge.builder()
.name(name)
.help("An estimate of the number of threads that are currently stealing or executing tasks")
.unit(UNIT_THREADS)
.help("An estimate of the number of threads that are currently stealing or executing tasks")
.unit(UNIT_THREADS)
.register()
);
final Gauge running = (Gauge) REGISTERED_THREAD_POOL_METRICS.computeIfAbsent(
metricPrefix + "executor_running",
name -> Gauge.builder()
.name(name)
.help(
"An estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed" +
" synchronization threads"
)
.unit(UNIT_THREADS)
.help(
"An estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed" +
" synchronization threads"
)
.unit(UNIT_THREADS)
.register()
);

return Stream.of(
() -> steals.set(fj.getStealCount()),
() -> queued.set(fj.getQueuedTaskCount()),
() -> active.set(fj.getActiveThreadCount()),
() -> running.set(fj.getRunningThreadCount())
);
return Stream.of(
() -> steals.inc(fj.getStealCount() - steals.get()),
() -> queued.set(fj.getQueuedTaskCount()),
() -> active.set(fj.getActiveThreadCount()),
() -> running.set(fj.getRunningThreadCount())
);
}

public PrometheusMetricsHttpService(@Nonnull Evita evita) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1705,7 +1705,7 @@ public int countDifference(long catalogVersion, byte recordTypeId) {
final int startIndex = index >= 0 ? index : -index - 1;
for (int ix = nv.length - 1; ix >= startIndex && ix >= 0; ix--) {
final NonFlushedValueSet nonFlushedValueSet = nvValues.get(nv[ix]);
diff += nonFlushedValueSet.getCountFor(recordTypeId);
diff += nonFlushedValueSet == null ? 0 : nonFlushedValueSet.getCountFor(recordTypeId);
}
}
}
Expand All @@ -1727,7 +1727,7 @@ public int countDifference(long catalogVersion, byte recordTypeId) {
final int startIndex = index >= 0 ? index : -index - 1;
for (int ix = hv.length - 1; ix > startIndex && ix >= 0; ix--) {
final PastMemory differenceSet = hvValues.get(hv[ix]);
diff -= differenceSet.getCountFor(recordTypeId);
diff -= differenceSet == null ? 0 : differenceSet.getCountFor(recordTypeId);
}
}
}
Expand Down Expand Up @@ -2040,13 +2040,16 @@ public Optional<OffsetDateTime> getOldestRecordKeptTimestamp() {
* @return true if the non-flushed values contain the non-removed specified key, false otherwise
*/
public boolean contains(@Nonnull RecordKey key) {
for (int i = nonFlushedVersions.length - 1; i >= 0; i--) {
long nonFlushedVersion = nonFlushedVersions[i];
final NonFlushedValueSet nfSet = nonFlushedValues.get(nonFlushedVersion);
if (nfSet.removedKeys.contains(key)) {
return false;
} else if (nfSet.addedKeys.contains(key)) {
return true;
final long[] nv = this.nonFlushedVersions;
for (int i = nv.length - 1; i >= 0; i--) {
long nonFlushedVersion = nv[i];
final NonFlushedValueSet nfSet = this.nonFlushedValues.get(nonFlushedVersion);
if (nfSet != null) {
if (nfSet.removedKeys.contains(key)) {
return false;
} else if (nfSet.addedKeys.contains(key)) {
return true;
}
}
}
return false;
Expand Down

0 comments on commit 2836e3d

Please sign in to comment.