diff --git a/include/reactor-cpp/grouped_scheduling_policy.hh b/include/reactor-cpp/grouped_scheduling_policy.hh index 71b57494..2d982fde 100644 --- a/include/reactor-cpp/grouped_scheduling_policy.hh +++ b/include/reactor-cpp/grouped_scheduling_policy.hh @@ -25,6 +25,11 @@ struct ReactionGroup { std::atomic waiting_for{0}; std::atomic triggered{false}; std::size_t num_predecessors{0}; + + std::shared_ptr super_group{nullptr}; + std::vector sub_groups{}; + std::vector triggered_sub_groups{}; + std::atomic triggered_sub_groups_write_pos{0}; }; class GroupedSchedulingPolicy { @@ -59,8 +64,10 @@ private: GroupQueue group_queue_; void schedule(); - auto finalize_group_and_notify_successors(ReactionGroup* group, std::vector& out_ready_groups) -> bool; + auto finalize_group_and_notify_successors(ReactionGroup* group, std::vector& out_ready_groups) + -> bool; void notify_groups(const std::vector& groups, std::vector& out_ready_groups); + void notify_super_group(ReactionGroup* group, std::vector& out_ready_groups); void terminate_workers(); public: diff --git a/lib/grouped_scheduling_policy.cc b/lib/grouped_scheduling_policy.cc index bb173794..4a533f38 100644 --- a/lib/grouped_scheduling_policy.cc +++ b/lib/grouped_scheduling_policy.cc @@ -11,10 +11,12 @@ #include "reactor-cpp/assert.hh" #include "reactor-cpp/dependency_graph.hh" #include "reactor-cpp/logging.hh" +#include "reactor-cpp/reaction.hh" #include "reactor-cpp/scheduler.hh" #include #include +#include #include #include @@ -52,7 +54,7 @@ GroupedSchedulingPolicy::GroupedSchedulingPolicy(Scheduler> vertex_to_group; + std::vector> all_groups; // create a reaction group for each vertex and keep track of all groups without dependencies in initial_groups_ for (auto* vertex : boost::make_iterator_range(vertices(g))) { auto group = std::make_shared(); vertex_to_group[vertex] = group; + all_groups.emplace_back(group); if (boost::in_degree(vertex, g) == 0) { initial_groups_.push_back(group.get()); @@ -108,12 +112,69 @@ void GroupedSchedulingPolicy::init() { num_groups_ = vertex_to_group.size(); group_queue_.init(std::max(num_groups_, environment_.num_workers() + 1)); - log::Debug() << "Identified reaction groups: "; for (const auto& [_, group] : vertex_to_group) { - log::Debug() << "* Group " << group->id << ':'; - log::Debug() << " + reactions:"; - for (auto [_, reaction] : group->reactions) { - log::Debug() << " - " << reaction->fqn(); + auto& successors = group->successors; + std::set in_super_group; + if (successors.size() > 1) { + for (auto* successor_a : successors) { + if (in_super_group.count(successor_a) == 0 && successor_a->num_predecessors <= 1) { + std::vector same_successors; + std::copy_if(successors.begin(), successors.end(), std::back_insert_iterator(same_successors), + [successor_a](ReactionGroup* successor_b) { + return successor_b->num_predecessors <= 1 && + successor_a->successors == successor_b->successors; + }); + if (same_successors.size() > 1) { + auto super_group = std::make_shared(); + all_groups.emplace_back(super_group); + + super_group->id = id_counter++; + super_group->successors = successor_a->successors; + super_group->num_predecessors = successor_a->num_predecessors; + super_group->waiting_for.store(successor_a->num_predecessors, std::memory_order_release); + super_group->triggered_sub_groups.resize(same_successors.size()); + + for (auto* sub_group : same_successors) { + super_group->sub_groups.emplace_back(sub_group); + sub_group->super_group = super_group; + + // remove sub_group from the successor list of our starting group + auto it = std::find(group->successors.begin(), group->successors.end(), sub_group); + reactor_assert(it != group->successors.end()); + group->successors.erase(it); + } + // add the newly creates super_group as a successor to the starting group + group->successors.emplace_back(super_group.get()); + + log::Debug() << "Super Group for Group " << successor_a->id << ':'; + for (auto* elem : same_successors) { + in_super_group.insert(elem); + log::Debug() << " - Group " << elem->id; + } + } + } + } + } + } + + log::Debug() << "Identified reaction groups: "; + for (const auto& group : all_groups) { + if (group->sub_groups.empty()) { + log::Debug() << "* Group " << group->id << ':'; + log::Debug() << " + reactions:"; + for (auto [_, reaction] : group->reactions) { + log::Debug() << " - " << reaction->fqn(); + } + if (group->super_group != nullptr) { + log::Debug() << " + super group: " << group->super_group->id; + } + } else { + log::Debug() << "* Super Group " << group->id << ':'; + reactor_assert(group->reactions.empty()); + log::Debug() << " + sub groups:"; + for (auto* sub_group : group->sub_groups) { + log::Debug() << " - " << sub_group->id; + } } log::Debug() << " + successors:"; for (const auto* successor : group->successors) { @@ -146,18 +207,58 @@ auto GroupedSchedulingPolicy::finalize_group_and_notify_successors(ReactionGroup return 1 == groups_to_process_.fetch_sub(1, std::memory_order_acq_rel); } +void GroupedSchedulingPolicy::notify_super_group(ReactionGroup* group, std::vector& out_ready_groups) { + // the group is a super group with triggered sub groups + // -> extract all the triggered subgroups + std::atomic_thread_fence(std::memory_order_release); + auto num_triggered = group->triggered_sub_groups_write_pos.load(std::memory_order_relaxed); + for (std::size_t i{0}; i < num_triggered; i++) { + out_ready_groups.emplace_back(group->triggered_sub_groups[i]); + group->triggered_sub_groups[i]->triggered.store(false, std::memory_order_relaxed); + } + group->waiting_for.store(group->num_predecessors, std::memory_order_relaxed); + group->triggered_sub_groups_write_pos.store(0, std::memory_order_relaxed); + log::Debug() << "num_triggered: " << num_triggered; + + // we do not need to process the untriggered subgroups and can directly decrement the counter + const auto num_untriggered = group->sub_groups.size() - num_triggered; + if (num_untriggered > 0) { + groups_to_process_.fetch_sub(num_untriggered, std::memory_order_acq_rel); + } + + // update successor if there is any + if (!group->successors.empty()) { + reactor_assert(group->successors.size() == 1); + reactor_assert(group->successors[0]->num_predecessors == 1); + reactor_assert(group->successors[0]->sub_groups.empty()); + + group->successors[0]->waiting_for.fetch_sub(num_untriggered, std::memory_order_acq_rel); + // if none of the groups was triggered, then we can directly notify the successor + if (num_triggered == 0) { + notify_groups(group->successors, out_ready_groups); + } + } +} + void GroupedSchedulingPolicy::notify_groups(const std::vector& groups, std::vector& out_ready_groups) { for (auto* group : groups) { // decrement the waiting for counter auto old = group->waiting_for.fetch_sub(1, std::memory_order_relaxed); + // If the old value was 1 (or 0), then all dependencies are fulfilled and the group is ready for execution if (old <= 1) { // If the group was triggered, then add it to the ready queue. Otherwise, we skip the group and check its // successors. + log::Debug() << "Group " << group->id << " is ready"; if (group->triggered.exchange(false, std::memory_order_relaxed)) { + log::Debug() << "Bar1"; out_ready_groups.emplace_back(group); + } else if (!group->sub_groups.empty()) { + log::Debug() << "Bar2"; + notify_super_group(group, out_ready_groups); } else { + log::Debug() << "Bar3"; finalize_group_and_notify_successors(group, out_ready_groups); } } @@ -241,7 +342,15 @@ void GroupedSchedulingPolicy::trigger_reaction(Reaction* reaction) { log::Debug() << "(GroupedSchedulingPolicy) trigger reaction " << reaction->fqn() << " in Group " << group->id; auto& triggered_reaction_pair = group->reactions[reaction->index()]; triggered_reaction_pair.first = true; - group->triggered.store(true, std::memory_order_release); + std::atomic_thread_fence(std::memory_order_release); + bool old = group->triggered.exchange(true, std::memory_order_relaxed); + // Also notify the super group if there is one and if we did not notify it yet. + if (!old && group->super_group != nullptr) { + log::Debug() << "(GroupedSchedulingPolicy) trigger group " << group->id << " in super group " + << group->super_group->id; + auto pos = group->super_group->triggered_sub_groups_write_pos.fetch_add(1, std::memory_order_relaxed); + group->super_group->triggered_sub_groups[pos] = group.get(); + } } void GroupedSchedulingPolicy::process_group(const Worker& worker, ReactionGroup* group) {