Skip to content

Commit

Permalink
Add some batch-update interfaces.
Browse files Browse the repository at this point in the history
  • Loading branch information
graydon committed Jul 25, 2019
1 parent f0565cf commit 3e451cb
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 37 deletions.
47 changes: 32 additions & 15 deletions src/medida/histogram.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <atomic>
#include <cmath>
#include <mutex>
#include <algorithm>

#include "medida/stats/exp_decay_sample.h"
#include "medida/stats/uniform_sample.h"
Expand All @@ -26,6 +27,7 @@ class Histogram::Impl {
double mean() const;
double std_dev() const;
void Update(std::int64_t value);
void Update(std::vector<std::int64_t> const& value);
std::uint64_t count() const;
double variance() const;
void Process(MetricProcessor& processor);
Expand Down Expand Up @@ -97,6 +99,10 @@ void Histogram::Update(std::int64_t value) {
impl_->Update(value);
}

void Histogram::Update(std::vector<std::int64_t> const& values) {
impl_->Update(values);
}

stats::Snapshot Histogram::GetSnapshot() const {
return impl_->GetSnapshot();
}
Expand Down Expand Up @@ -194,32 +200,43 @@ stats::Snapshot Histogram::Impl::GetSnapshot() const {


void Histogram::Impl::Update(std::int64_t value) {
sample_->Update(value);
std::vector<std::int64_t> tmp{value};
Update(tmp);
}


void Histogram::Impl::Update(std::vector<std::int64_t> const& values) {
sample_->Update(values);
auto pair = std::minmax_element(begin(values), end(values));
auto minval = pair.first == values.end() ? 0 : *pair.first;
auto maxval = pair.second == values.end() ? 0 : *pair.second;
if (count_ > 0) {
auto cur_max = max_.load();
auto cur_min = min_.load();
while (cur_max < value && !max_.compare_exchange_weak(cur_max, value)) {
while (cur_max < maxval && !max_.compare_exchange_weak(cur_max, maxval)) {
// Spin until max is updated
}
while(cur_min > value && !min_.compare_exchange_weak(cur_min, value)) {
while(cur_min > minval && !min_.compare_exchange_weak(cur_min, minval)) {
// Spin until min is updated
}
} else {
max_ = value;
min_ = value;
max_ = maxval;
min_ = minval;
}
sum_ += value;
auto new_count = ++count_;
std::lock_guard<std::mutex> lock {variance_mutex_};
auto old_vm = variance_m_;
auto old_vs = variance_s_;
if (new_count > 1) {
variance_m_ = old_vm + (value - old_vm) / new_count;
variance_s_ = old_vs + (value - old_vm) * (value - variance_m_);
} else {
variance_m_ = value;
for (auto value : values)
{
sum_ += value;
auto new_count = ++count_;
auto old_vm = variance_m_;
auto old_vs = variance_s_;
if (new_count > 1) {
variance_m_ = old_vm + (value - old_vm) / new_count;
variance_s_ = old_vs + (value - old_vm) * (value - variance_m_);
} else {
variance_m_ = value;
}
}
}


} // namespace medida
1 change: 1 addition & 0 deletions src/medida/histogram.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class Histogram : public MetricInterface, SamplingInterface, SummarizableInterfa
virtual double mean() const;
virtual double std_dev() const;
void Update(std::int64_t value);
void Update(std::vector<std::int64_t> const& values);
std::uint64_t count() const;
double variance() const;
void Process(MetricProcessor& processor);
Expand Down
51 changes: 39 additions & 12 deletions src/medida/stats/exp_decay_sample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class ExpDecaySample::Impl {
void Clear();
std::uint64_t size() const;
void Update(std::int64_t value);
void Update(std::vector<std::int64_t> const& values);
void Update(std::int64_t value, Clock::time_point timestamp);
void Update(std::vector<std::int64_t> const& values, Clock::time_point timestamp);
Snapshot MakeSnapshot() const;
private:
const double alpha_;
Expand Down Expand Up @@ -68,11 +70,21 @@ void ExpDecaySample::Update(std::int64_t value) {
}


void ExpDecaySample::Update(std::vector<std::int64_t> const& values) {
impl_->Update(values);
}


void ExpDecaySample::Update(std::int64_t value, Clock::time_point timestamp) {
impl_->Update(value, timestamp);
}


void ExpDecaySample::Update(std::vector<std::int64_t> const& values, Clock::time_point timestamp) {
impl_->Update(values, timestamp);
}


Snapshot ExpDecaySample::MakeSnapshot() const {
return impl_->MakeSnapshot();
}
Expand Down Expand Up @@ -110,27 +122,42 @@ std::uint64_t ExpDecaySample::Impl::size() const {


void ExpDecaySample::Impl::Update(std::int64_t value) {
Update(value, Clock::now());
std::vector<std::int64_t> tmp{value};
Update(tmp);
}


void ExpDecaySample::Impl::Update(std::vector<std::int64_t> const& values) {
Update(values, Clock::now());
}


void ExpDecaySample::Impl::Update(std::int64_t value, Clock::time_point timestamp) {
std::vector<std::int64_t> tmp{value};
Update(tmp, timestamp);
}


void ExpDecaySample::Impl::Update(std::vector<std::int64_t> const& values, Clock::time_point timestamp) {
{
if (timestamp >= nextScaleTime_) {
Rescale(timestamp);
}
std::lock_guard<std::mutex> lock {mutex_};
auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(timestamp - startTime_);
auto priority = std::exp(alpha_ * dur.count()) / dist_(rng_);
auto count = ++count_;

if (count <= reservoirSize_) {
values_[priority] = value;
} else {
auto first = std::begin(values_)->first;
if (first < priority && values_.insert({priority, value}).second) {
while (values_.erase(first) == 0) {
first = std::begin(values_)->first;
for (auto value : values)
{
auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(timestamp - startTime_);
auto priority = std::exp(alpha_ * dur.count()) / dist_(rng_);
auto count = ++count_;

if (count <= reservoirSize_) {
values_[priority] = value;
} else {
auto first = std::begin(values_)->first;
if (first < priority && values_.insert({priority, value}).second) {
while (values_.erase(first) == 0) {
first = std::begin(values_)->first;
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/medida/stats/exp_decay_sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class ExpDecaySample : public Sample {
virtual void Clear();
virtual std::uint64_t size() const;
virtual void Update(std::int64_t value);
virtual void Update(std::vector<std::int64_t> const& values);
virtual void Update(std::int64_t value, Clock::time_point timestamp);
virtual void Update(std::vector<std::int64_t> const& values, Clock::time_point timestamp);
virtual Snapshot MakeSnapshot() const;
private:
class Impl;
Expand Down
1 change: 1 addition & 0 deletions src/medida/stats/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Sample {
virtual void Clear() = 0;
virtual std::uint64_t size() const = 0;
virtual void Update(std::int64_t value) = 0;
virtual void Update(std::vector<std::int64_t> const& values) = 0;
virtual Snapshot MakeSnapshot() const = 0;
};

Expand Down
35 changes: 25 additions & 10 deletions src/medida/stats/uniform_sample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class UniformSample::Impl {
void Clear();
std::uint64_t size() const;
void Update(std::int64_t value);
void Update(std::vector<std::int64_t> const& value);
Snapshot MakeSnapshot() const;
private:
std::atomic<std::uint64_t> count_;
Expand Down Expand Up @@ -53,6 +54,11 @@ void UniformSample::Update(std::int64_t value) {
}


void UniformSample::Update(std::vector<std::int64_t> const& values) {
impl_->Update(values);
}


Snapshot UniformSample::MakeSnapshot() const {
return impl_->MakeSnapshot();
}
Expand Down Expand Up @@ -91,17 +97,26 @@ std::uint64_t UniformSample::Impl::size() const {


void UniformSample::Impl::Update(std::int64_t value) {
auto count = ++count_;
std::vector<std::int64_t> tmp{value};
Update(tmp);
}


void UniformSample::Impl::Update(std::vector<std::int64_t> const& values) {
std::lock_guard<std::mutex> lock {mutex_};
auto size = values_.size();
if (count < size) {
values_[count - 1] = value;
} else {
std::uniform_int_distribution<uint64_t> uniform(0, count - 1);
auto rand = uniform(rng_);
if (rand < size) {
values_[rand] = value;
}
for (auto value : values)
{
auto count = ++count_;
auto size = values_.size();
if (count < size) {
values_[count - 1] = value;
} else {
std::uniform_int_distribution<uint64_t> uniform(0, count - 1);
auto rand = uniform(rng_);
if (rand < size) {
values_[rand] = value;
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/medida/stats/uniform_sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class UniformSample : public Sample {
virtual void Clear();
virtual std::uint64_t size() const;
virtual void Update(std::int64_t value);
virtual void Update(std::vector<std::int64_t> const& values);
virtual Snapshot MakeSnapshot() const;
private:
class Impl;
Expand Down
19 changes: 19 additions & 0 deletions src/medida/timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Timer::Impl {
std::chrono::nanoseconds duration_unit() const;
void Clear();
void Update(std::chrono::nanoseconds duration);
void Update(std::vector<std::chrono::nanoseconds> durations);
TimerContext TimeScope();
void Time(std::function<void()>);
private:
Expand Down Expand Up @@ -134,6 +135,9 @@ void Timer::Update(std::chrono::nanoseconds duration) {
impl_->Update(duration);
}

void Timer::Update(std::vector<std::chrono::nanoseconds> durations) {
impl_->Update(durations);
}

stats::Snapshot Timer::GetSnapshot() const {
return impl_->GetSnapshot();
Expand Down Expand Up @@ -251,6 +255,21 @@ void Timer::Impl::Update(std::chrono::nanoseconds duration) {
}


void Timer::Impl::Update(std::vector<std::chrono::nanoseconds> durations) {
std::vector<std::int64_t> counts(durations.size());
for (auto const& ns : durations)
{
auto count = ns.count();
if (count != 0)
{
counts.emplace_back(count);
}
};
histogram_.Update(counts);
meter_.Mark(counts.size());
}


stats::Snapshot Timer::Impl::GetSnapshot() const {
auto values = histogram_.GetSnapshot().getValues();
std::vector<double> converted;
Expand Down
1 change: 1 addition & 0 deletions src/medida/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Timer : public MetricInterface, MeteredInterface, SamplingInterface, Summa
std::chrono::nanoseconds duration_unit() const;
void Clear();
void Update(std::chrono::nanoseconds duration);
void Update(std::vector<std::chrono::nanoseconds> durations);
TimerContext TimeScope();
void Time(std::function<void()>);
private:
Expand Down

0 comments on commit 3e451cb

Please sign in to comment.