Skip to content

Commit

Permalink
summarize similar groups in "super groups" for less overhead
Browse files Browse the repository at this point in the history
  • Loading branch information
cmnrd committed Jul 31, 2022
1 parent f85a3b8 commit e0c7a60
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 8 deletions.
9 changes: 8 additions & 1 deletion include/reactor-cpp/grouped_scheduling_policy.hh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ struct ReactionGroup {
std::atomic<std::size_t> waiting_for{0};
std::atomic<bool> triggered{false};
std::size_t num_predecessors{0};

std::shared_ptr<ReactionGroup> super_group{nullptr};
std::vector<ReactionGroup*> sub_groups{};
std::vector<ReactionGroup*> triggered_sub_groups{};
std::atomic<std::size_t> triggered_sub_groups_write_pos{0};
};

class GroupedSchedulingPolicy {
Expand Down Expand Up @@ -59,8 +64,10 @@ private:
GroupQueue group_queue_;

void schedule();
auto finalize_group_and_notify_successors(ReactionGroup* group, std::vector<ReactionGroup*>& out_ready_groups) -> bool;
auto finalize_group_and_notify_successors(ReactionGroup* group, std::vector<ReactionGroup*>& out_ready_groups)
-> bool;
void notify_groups(const std::vector<ReactionGroup*>& groups, std::vector<ReactionGroup*>& out_ready_groups);
void notify_super_group(ReactionGroup* group, std::vector<ReactionGroup*>& out_ready_groups);
void terminate_workers();

public:
Expand Down
123 changes: 116 additions & 7 deletions lib/grouped_scheduling_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
#include "reactor-cpp/assert.hh"
#include "reactor-cpp/dependency_graph.hh"
#include "reactor-cpp/logging.hh"
#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/scheduler.hh"

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <iterator>
#include <vector>

Expand Down Expand Up @@ -52,7 +54,7 @@ GroupedSchedulingPolicy::GroupedSchedulingPolicy(Scheduler<GroupedSchedulingPoli
: scheduler_(scheduler)
, environment_(env) {}

void GroupedSchedulingPolicy::init() {
void GroupedSchedulingPolicy::init() { // NOLINT
ReactionDependencyGraph graph{environment_.top_level_reactors()};
ReactionDependencyGraph reduced_graph = graph.transitive_reduction();
GroupedDependencyGraph grouped_graph{reduced_graph};
Expand All @@ -64,11 +66,13 @@ void GroupedSchedulingPolicy::init() {
auto g = reduced_grouped_graph.get_graph();

std::map<GroupedDependencyGraph::GroupGraph::vertex_descriptor, std::shared_ptr<ReactionGroup>> vertex_to_group;
std::vector<std::shared_ptr<ReactionGroup>> all_groups;

// create a reaction group for each vertex and keep track of all groups without dependencies in initial_groups_
for (auto* vertex : boost::make_iterator_range(vertices(g))) {
auto group = std::make_shared<ReactionGroup>();
vertex_to_group[vertex] = group;
all_groups.emplace_back(group);

if (boost::in_degree(vertex, g) == 0) {
initial_groups_.push_back(group.get());
Expand Down Expand Up @@ -108,12 +112,69 @@ void GroupedSchedulingPolicy::init() {
num_groups_ = vertex_to_group.size();
group_queue_.init(std::max(num_groups_, environment_.num_workers() + 1));

log::Debug() << "Identified reaction groups: ";
for (const auto& [_, group] : vertex_to_group) {
log::Debug() << "* Group " << group->id << ':';
log::Debug() << " + reactions:";
for (auto [_, reaction] : group->reactions) {
log::Debug() << " - " << reaction->fqn();
auto& successors = group->successors;
std::set<ReactionGroup*> in_super_group;
if (successors.size() > 1) {
for (auto* successor_a : successors) {
if (in_super_group.count(successor_a) == 0 && successor_a->num_predecessors <= 1) {
std::vector<ReactionGroup*> same_successors;
std::copy_if(successors.begin(), successors.end(), std::back_insert_iterator(same_successors),
[successor_a](ReactionGroup* successor_b) {
return successor_b->num_predecessors <= 1 &&
successor_a->successors == successor_b->successors;
});
if (same_successors.size() > 1) {
auto super_group = std::make_shared<ReactionGroup>();
all_groups.emplace_back(super_group);

super_group->id = id_counter++;
super_group->successors = successor_a->successors;
super_group->num_predecessors = successor_a->num_predecessors;
super_group->waiting_for.store(successor_a->num_predecessors, std::memory_order_release);
super_group->triggered_sub_groups.resize(same_successors.size());

for (auto* sub_group : same_successors) {
super_group->sub_groups.emplace_back(sub_group);
sub_group->super_group = super_group;

// remove sub_group from the successor list of our starting group
auto it = std::find(group->successors.begin(), group->successors.end(), sub_group);
reactor_assert(it != group->successors.end());
group->successors.erase(it);
}
// add the newly creates super_group as a successor to the starting group
group->successors.emplace_back(super_group.get());

log::Debug() << "Super Group for Group " << successor_a->id << ':';
for (auto* elem : same_successors) {
in_super_group.insert(elem);
log::Debug() << " - Group " << elem->id;
}
}
}
}
}
}

log::Debug() << "Identified reaction groups: ";
for (const auto& group : all_groups) {
if (group->sub_groups.empty()) {
log::Debug() << "* Group " << group->id << ':';
log::Debug() << " + reactions:";
for (auto [_, reaction] : group->reactions) {
log::Debug() << " - " << reaction->fqn();
}
if (group->super_group != nullptr) {
log::Debug() << " + super group: " << group->super_group->id;
}
} else {
log::Debug() << "* Super Group " << group->id << ':';
reactor_assert(group->reactions.empty());
log::Debug() << " + sub groups:";
for (auto* sub_group : group->sub_groups) {
log::Debug() << " - " << sub_group->id;
}
}
log::Debug() << " + successors:";
for (const auto* successor : group->successors) {
Expand Down Expand Up @@ -146,18 +207,58 @@ auto GroupedSchedulingPolicy::finalize_group_and_notify_successors(ReactionGroup
return 1 == groups_to_process_.fetch_sub(1, std::memory_order_acq_rel);
}

void GroupedSchedulingPolicy::notify_super_group(ReactionGroup* group, std::vector<ReactionGroup*>& out_ready_groups) {
// the group is a super group with triggered sub groups
// -> extract all the triggered subgroups
std::atomic_thread_fence(std::memory_order_release);
auto num_triggered = group->triggered_sub_groups_write_pos.load(std::memory_order_relaxed);
for (std::size_t i{0}; i < num_triggered; i++) {
out_ready_groups.emplace_back(group->triggered_sub_groups[i]);
group->triggered_sub_groups[i]->triggered.store(false, std::memory_order_relaxed);
}
group->waiting_for.store(group->num_predecessors, std::memory_order_relaxed);
group->triggered_sub_groups_write_pos.store(0, std::memory_order_relaxed);
log::Debug() << "num_triggered: " << num_triggered;

// we do not need to process the untriggered subgroups and can directly decrement the counter
const auto num_untriggered = group->sub_groups.size() - num_triggered;
if (num_untriggered > 0) {
groups_to_process_.fetch_sub(num_untriggered, std::memory_order_acq_rel);
}

// update successor if there is any
if (!group->successors.empty()) {
reactor_assert(group->successors.size() == 1);
reactor_assert(group->successors[0]->num_predecessors == 1);
reactor_assert(group->successors[0]->sub_groups.empty());

group->successors[0]->waiting_for.fetch_sub(num_untriggered, std::memory_order_acq_rel);
// if none of the groups was triggered, then we can directly notify the successor
if (num_triggered == 0) {
notify_groups(group->successors, out_ready_groups);
}
}
}

void GroupedSchedulingPolicy::notify_groups(const std::vector<ReactionGroup*>& groups,
std::vector<ReactionGroup*>& out_ready_groups) {
for (auto* group : groups) {
// decrement the waiting for counter
auto old = group->waiting_for.fetch_sub(1, std::memory_order_relaxed);

// If the old value was 1 (or 0), then all dependencies are fulfilled and the group is ready for execution
if (old <= 1) {
// If the group was triggered, then add it to the ready queue. Otherwise, we skip the group and check its
// successors.
log::Debug() << "Group " << group->id << " is ready";
if (group->triggered.exchange(false, std::memory_order_relaxed)) {
log::Debug() << "Bar1";
out_ready_groups.emplace_back(group);
} else if (!group->sub_groups.empty()) {
log::Debug() << "Bar2";
notify_super_group(group, out_ready_groups);
} else {
log::Debug() << "Bar3";
finalize_group_and_notify_successors(group, out_ready_groups);
}
}
Expand Down Expand Up @@ -241,7 +342,15 @@ void GroupedSchedulingPolicy::trigger_reaction(Reaction* reaction) {
log::Debug() << "(GroupedSchedulingPolicy) trigger reaction " << reaction->fqn() << " in Group " << group->id;
auto& triggered_reaction_pair = group->reactions[reaction->index()];
triggered_reaction_pair.first = true;
group->triggered.store(true, std::memory_order_release);
std::atomic_thread_fence(std::memory_order_release);
bool old = group->triggered.exchange(true, std::memory_order_relaxed);
// Also notify the super group if there is one and if we did not notify it yet.
if (!old && group->super_group != nullptr) {
log::Debug() << "(GroupedSchedulingPolicy) trigger group " << group->id << " in super group "
<< group->super_group->id;
auto pos = group->super_group->triggered_sub_groups_write_pos.fetch_add(1, std::memory_order_relaxed);
group->super_group->triggered_sub_groups[pos] = group.get();
}
}

void GroupedSchedulingPolicy::process_group(const Worker<GroupedSchedulingPolicy>& worker, ReactionGroup* group) {
Expand Down

0 comments on commit e0c7a60

Please sign in to comment.