Skip to content

Commit

Permalink
metrics: Use separate type for shared metadata
Browse files Browse the repository at this point in the history
Use a separate metadata structure for the metadata shared in
`get_value`.

This allows some memory savings as not all metadata is needed at scrape
time.

Also allows us to make the structure non-copyable and hence prevents
unsafe use of the internalized labels.

(cherry picked from commit 308d2df)
  • Loading branch information
StephanDollberg committed Dec 19, 2024
1 parent 37f7015 commit 7b10c61
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 41 deletions.
53 changes: 52 additions & 1 deletion include/seastar/core/metrics_api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,50 @@ public:

using value_map = std::map<sstring, metric_family>;

using metric_metadata_fifo = std::deque<metric_info>;
/*!
* \brief Subset of the per series metadata that is shared via get_values to other shards.
*
* Allows omitting metadata that is already stored elsewhere or not needed by
* the metrics scrap handlers.
*
* Not copyable to allow for safely sharing internalized data.
*/
class metric_series_metadata {
// prom backend only needs the label from here but scollectd needs group and
// metric name separately. metric_family_info only stores the merged and
// filtered name so we have to duplicate it here.
metric_id _id;
skip_when_empty _should_skip_when_empty;
public:
metric_series_metadata() = default;
metric_series_metadata(metric_id id, skip_when_empty should_skip_when_empty)
: _id(std::move(id)), _should_skip_when_empty(should_skip_when_empty) {
}

metric_series_metadata(const metric_series_metadata&) = delete;
metric_series_metadata& operator=(const metric_series_metadata&) = delete;

metric_series_metadata(metric_series_metadata&&) noexcept = default;
metric_series_metadata& operator=(metric_series_metadata&&) noexcept = default;

const labels_type& labels() const {
return _id.labels();
}

skip_when_empty should_skip_when_empty() const {
return _should_skip_when_empty;
}

group_name_type group_name() const {
return _id.group_name();
}

group_name_type name() const {
return _id.name();
}
};

using metric_metadata_fifo = std::deque<metric_series_metadata>;

/*!
* \brief holds a metric family metadata
Expand All @@ -392,8 +435,16 @@ using metric_metadata_fifo = std::deque<metric_info>;
struct metric_family_metadata {
metric_family_info mf;
metric_metadata_fifo metrics;

metric_family_metadata() = default;
metric_family_metadata(metric_family_metadata &&) = default;
metric_family_metadata &operator=(metric_family_metadata &&) = default;
metric_family_metadata(metric_family_info mf, metric_metadata_fifo metrics)
: mf(std::move(mf)), metrics(std::move(metrics)) {}
};

static_assert(std::is_nothrow_move_assignable_v<metric_family_metadata>);

using value_vector = std::deque<metric_value>;
using metric_metadata = std::vector<metric_family_metadata>;
using metric_values = std::deque<value_vector>;
Expand Down
4 changes: 2 additions & 2 deletions src/core/metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -561,14 +561,14 @@ void impl::update_metrics_if_needed() {
_current_metrics[i].clear();
for (auto&& m : mf.second) {
if (m.second && m.second->is_enabled()) {
metrics.emplace_back(m.second->info());
metrics.emplace_back(m.second->info().id, m.second->info().should_skip_when_empty);
_current_metrics[i].emplace_back(m.second->get_function());
}
}
if (!metrics.empty()) {
// If nothing was added, no need to add the metric_family
// and no need to progress
mt.emplace_back(metric_family_metadata{mf.second.info(), std::move(metrics)});
mt.emplace_back(mf.second.info(), std::move(metrics));
i++;
}
}
Expand Down
26 changes: 13 additions & 13 deletions src/core/prometheus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ class metric_family {
return *_family_info;
}

void foreach_metric(std::function<void(const mi::metric_value&, const mi::metric_info&)>&& f);
void foreach_metric(std::function<void(const mi::metric_value&, const mi::metric_series_metadata&)>&& f);

bool end() const {
return !_name || !_family_info;
Expand Down Expand Up @@ -558,7 +558,7 @@ class metric_family_iterator {
return _positions.empty() || _info.end();
}

void foreach_metric(std::function<void(const mi::metric_value&, const mi::metric_info&)>&& f) {
void foreach_metric(std::function<void(const mi::metric_value&, const mi::metric_series_metadata&)>&& f) {
// iterating over the shard vector and the position vector
for (auto&& i : boost::combine(_positions, _families)) {
auto& pos_in_metric_per_shard = boost::get<0>(i);
Expand All @@ -584,7 +584,7 @@ class metric_family_iterator {

};

void metric_family::foreach_metric(std::function<void(const mi::metric_value&, const mi::metric_info&)>&& f) {
void metric_family::foreach_metric(std::function<void(const mi::metric_value&, const mi::metric_series_metadata&)>&& f) {
_iterator_state.foreach_metric(std::move(f));
}

Expand Down Expand Up @@ -767,10 +767,10 @@ future<> write_text_representation(output_stream<char>& out, const config& ctx,
found = false;
metric_aggregate_by_labels aggregated_values(metric_family.metadata().aggregate_labels);
bool should_aggregate = !metric_family.metadata().aggregate_labels.empty();
metric_family.foreach_metric([&s, &out, &ctx, &found, &name, &metric_family, &aggregated_values, should_aggregate, show_help, &filter](const mi::metric_value& value, const mi::metric_info& value_info) mutable {
metric_family.foreach_metric([&s, &out, &ctx, &found, &name, &metric_family, &aggregated_values, should_aggregate, show_help, &filter](const mi::metric_value& value, const mi::metric_series_metadata& value_info) mutable {
s.clear();
s.str("");
if ((value_info.should_skip_when_empty && value.is_empty()) || !filter(value_info.id.labels())) {
if ((value_info.should_skip_when_empty() && value.is_empty()) || !filter(value_info.labels())) {
return;
}
if (!found) {
Expand All @@ -781,13 +781,13 @@ future<> write_text_representation(output_stream<char>& out, const config& ctx,
found = true;
}
if (should_aggregate) {
aggregated_values.add(value, value_info.id.labels());
aggregated_values.add(value, value_info.labels());
} else if (value.type() == mi::data_type::SUMMARY) {
write_summary(s, ctx, name, value.get_histogram(), value_info.id.labels());
write_summary(s, ctx, name, value.get_histogram(), value_info.labels());
} else if (value.type() == mi::data_type::HISTOGRAM) {
write_histogram(s, ctx, name, value.get_histogram(), value_info.id.labels());
write_histogram(s, ctx, name, value.get_histogram(), value_info.labels());
} else {
add_name(s, name, value_info.id.labels(), ctx);
add_name(s, name, value_info.labels(), ctx);
write_value_as_string(s, value);
s << '\n';
}
Expand Down Expand Up @@ -824,14 +824,14 @@ future<> write_protobuf_representation(output_stream<char>& out, const config& c
bool empty_metric = true;
mtf.set_name(fmt::format("{}_{}", ctx.prefix, name));
mtf.mutable_metric()->Reserve(metric_family.size());
metric_family.foreach_metric([&mtf, &ctx, &filter, &aggregated_values, &empty_metric, should_aggregate](auto value, auto value_info) {
if ((value_info.should_skip_when_empty && value.is_empty()) || !filter(value_info.id.labels())) {
metric_family.foreach_metric([&mtf, &ctx, &filter, &aggregated_values, &empty_metric, should_aggregate](const auto& value, const auto& value_info) {
if ((value_info.should_skip_when_empty() && value.is_empty()) || !filter(value_info.labels())) {
return;
}
if (should_aggregate) {
aggregated_values.add(value, value_info.id.labels());
aggregated_values.add(value, value_info.labels());
} else {
fill_metric(mtf, value, value_info.id.labels(), ctx);
fill_metric(mtf, value, value_info.labels(), ctx);
empty_metric = false;
}
});
Expand Down
36 changes: 21 additions & 15 deletions src/core/scollectd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ struct cpwriter {
return *this;
}

sstring get_type_instance(const seastar::metrics::impl::metric_id & id) {
if (id.labels().empty()) {
return id.name();
sstring get_type_instance(const metrics::metric_name_type& name, const metrics::impl::labels_type& labels) {
if (labels.empty()) {
return name;
}
sstring res = id.name();
for (auto i : id.labels()) {
sstring res = name;
for (auto i : labels) {
if (i.first != seastar::metrics::shard_label.name()) {
res += "-" + i.second;
}
Expand Down Expand Up @@ -289,7 +289,8 @@ struct cpwriter {
}
return *this;
}
cpwriter & put(const sstring & host, const seastar::metrics::impl::metric_id & id, const type_id& type) {
cpwriter & put(const sstring & host, const metrics::group_name_type& group_name, const metrics::metric_name_type& name,
const metrics::impl::labels_type& labels, const type_id& type) {
const auto ts = std::chrono::system_clock::now().time_since_epoch();
const auto lrts =
std::chrono::duration_cast<std::chrono::seconds>(ts).count();
Expand All @@ -299,22 +300,24 @@ struct cpwriter {
// Seems hi-res timestamp does not work very well with
// at the very least my default collectd in fedora (or I did it wrong?)
// Use lo-res ts for now, it is probably quite sufficient.
put_cached(part_type::Plugin, id.group_name());
put_cached(part_type::Plugin, group_name);
// Optional
auto instance_id = labels.at(metrics::shard_label.name());
put_cached(part_type::PluginInst,
id.instance_id() == per_cpu_plugin_instance ?
to_sstring(this_shard_id()) : id.instance_id());
instance_id == per_cpu_plugin_instance ?
to_sstring(this_shard_id()) : instance_id);
put_cached(part_type::Type, type);
// Optional
put_cached(part_type::TypeInst, get_type_instance(id));
put_cached(part_type::TypeInst, get_type_instance(name, labels));
return *this;
}
cpwriter & put(const sstring & host,
const duration & period,
const type_instance_id & id, const value_list & v) {
const auto ps = std::chrono::duration_cast<collectd_hres_duration>(
period).count();
put(host, to_metrics_id(id), id.type());
auto mid = to_metrics_id(id);
put(host, mid.group_name(), mid.name(), mid.labels(), id.type());
put(part_type::Values, v);
if (ps != 0) {
put(part_type::IntervalHr, ps);
Expand All @@ -325,10 +328,11 @@ struct cpwriter {
cpwriter & put(const sstring & host,
const duration & period,
const type_id& type,
const seastar::metrics::impl::metric_id & id, const seastar::metrics::impl::metric_value & v) {
const metrics::group_name_type& group_name, const metrics::metric_name_type& name,
const metrics::impl::labels_type& labels, const seastar::metrics::impl::metric_value & v) {
const auto ps = std::chrono::duration_cast<collectd_hres_duration>(
period).count();
put(host, id, type);
put(host, group_name, name, labels, type);
put(part_type::Values, v);
if (ps != 0) {
put(part_type::IntervalHr, ps);
Expand Down Expand Up @@ -362,7 +366,8 @@ future<> impl::send_metric(const type_instance_id & id,
future<> impl::send_notification(const type_instance_id & id,
const sstring & msg) {
cpwriter out;
out.put(_host, to_metrics_id(id), id.type());
auto mid = to_metrics_id(id);
out.put(_host, mid.group_name(), mid.name(), mid.labels(), id.type());
out.put(part_type::Message, msg);
return _chan.send(_addr, net::packet(out.data(), out.size()));
}
Expand Down Expand Up @@ -457,7 +462,8 @@ void impl::run() {
continue;
}
auto m = out.mark();
out.put(_host, _period, std::get<type_id>(*ctxt), md_iterator->id, *i);
out.put(_host, _period, std::get<type_id>(*ctxt),
md_iterator->group_name(), md_iterator->name(), md_iterator->labels(), *i);
if (!out) {
out.reset(m);
out_of_space = true;
Expand Down
20 changes: 10 additions & 10 deletions tests/unit/metrics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ static std::set<seastar::sstring> get_label_values(seastar::sstring metric_name,
BOOST_REQUIRE(qp_group != cend(all_metadata));
std::set<seastar::sstring> labels;
for (const auto& metric : qp_group->metrics) {
const auto found = metric.id.labels().find(label_name);
BOOST_REQUIRE(found != metric.id.labels().cend());
const auto found = metric.labels().find(label_name);
BOOST_REQUIRE(found != metric.labels().cend());
labels.insert(found->second);
}
return labels;
Expand Down Expand Up @@ -178,15 +178,15 @@ int count_by_label(const std::string& label) {
int count = 0;
for (auto&& md : (*values->metadata)) {
for (auto&& mi : md.metrics) {
if (label == "" || mi.id.labels().find(label) != mi.id.labels().end()) {
if (label == "" || mi.labels().find(label) != mi.labels().end()) {
count++;
}
}
}
return count;
}

int count_by_fun(std::function<bool(const seastar::metrics::impl::metric_info&)> f) {
int count_by_fun(std::function<bool(const seastar::metrics::impl::metric_series_metadata&)> f) {
seastar::foreign_ptr<seastar::metrics::impl::values_reference> values = seastar::metrics::impl::get_values();
int count = 0;
for (auto&& md : (*values->metadata)) {
Expand Down Expand Up @@ -337,8 +337,8 @@ SEASTAR_THREAD_TEST_CASE(test_relabel_enable_disable_skip_when_empty) {
sm::metric_relabeling_result success = sm::set_relabel_configs(rl).get();
BOOST_CHECK_EQUAL(success.metrics_relabeled_due_to_collision, 0);
BOOST_CHECK_EQUAL(count_by_label(""), 3);
BOOST_CHECK_EQUAL(count_by_fun([](const seastar::metrics::impl::metric_info& mi) {
return mi.should_skip_when_empty == sm::skip_when_empty::yes;
BOOST_CHECK_EQUAL(count_by_fun([](const seastar::metrics::impl::metric_series_metadata& mi) {
return mi.should_skip_when_empty() == sm::skip_when_empty::yes;
}), 0);

std::vector<sm::relabel_config> rl2(3);
Expand All @@ -356,8 +356,8 @@ SEASTAR_THREAD_TEST_CASE(test_relabel_enable_disable_skip_when_empty) {
success = sm::set_relabel_configs(rl2).get();
BOOST_CHECK_EQUAL(success.metrics_relabeled_due_to_collision, 0);
BOOST_CHECK_EQUAL(count_by_label(""), 3);
BOOST_CHECK_EQUAL(count_by_fun([](const seastar::metrics::impl::metric_info& mi) {
return mi.should_skip_when_empty == sm::skip_when_empty::yes;
BOOST_CHECK_EQUAL(count_by_fun([](const seastar::metrics::impl::metric_series_metadata& mi) {
return mi.should_skip_when_empty() == sm::skip_when_empty::yes;
}), 3);
// clear the configuration
success = sm::set_relabel_configs({}).get();
Expand All @@ -378,8 +378,8 @@ SEASTAR_THREAD_TEST_CASE(test_relabel_enable_disable_skip_when_empty) {

success = sm::set_relabel_configs(rl3).get();
BOOST_CHECK_EQUAL(success.metrics_relabeled_due_to_collision, 0);
BOOST_CHECK_EQUAL(count_by_fun([](const seastar::metrics::impl::metric_info& mi) {
return mi.should_skip_when_empty == sm::skip_when_empty::yes;
BOOST_CHECK_EQUAL(count_by_fun([](const seastar::metrics::impl::metric_series_metadata& mi) {
return mi.should_skip_when_empty() == sm::skip_when_empty::yes;
}), 0);
sm::set_relabel_configs({}).get();
}
Expand Down

0 comments on commit 7b10c61

Please sign in to comment.