Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler Optimizations #18

Draft
wants to merge 50 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
a7c7c59
add boost as a dependency
cmnrd Jun 30, 2022
77add13
boost graph hello world
cmnrd Jun 30, 2022
3395587
add all reactions to the new dependency graph
cmnrd Jul 1, 2022
8b5923d
sort reactions within reactor by priority
cmnrd Jul 1, 2022
f2306e3
add dependencies within each reactor (priorities)
cmnrd Jul 1, 2022
b16526f
add data dependencies to graph
cmnrd Jul 4, 2022
d8e3a8b
annotate and color the graph
cmnrd Jul 4, 2022
f7a12ef
remove transitive edges
cmnrd Jul 4, 2022
a05e015
restructure the code and introduce an actual class
cmnrd Jul 4, 2022
76f70e7
fix priority sorting
cmnrd Jul 4, 2022
14310f1
first implementation of a grouped dependency graph
cmnrd Jul 4, 2022
8eb1ffa
group reactions within the same reactor
cmnrd Jul 5, 2022
a713d90
simplify contraction algorithm
cmnrd Jul 5, 2022
ca41810
reduce the grouped graph
cmnrd Jul 5, 2022
febe19f
fix segfaults
cmnrd Jul 6, 2022
442fed3
fix a potential bug
cmnrd Jul 6, 2022
2003b58
bugfix in graphviz output
cmnrd Jul 6, 2022
192d5b0
finally group chains
cmnrd Jul 6, 2022
43aec94
check if a contraction would introduce a cycle
cmnrd Jul 8, 2022
97f9eb1
annotate dependency types in basic graph
cmnrd Jul 12, 2022
16b4708
expose the grouped dependency graph from the environment
cmnrd Jul 12, 2022
58e6ffe
copy the original scheduler to grouped_scheduler
cmnrd Jul 12, 2022
88cfadc
first step at introducing a scheduling policy
cmnrd Jul 19, 2022
8788090
fix Worker name
cmnrd Jul 19, 2022
87a2f36
install boost in CI
cmnrd Jul 19, 2022
4220ab9
set minimal boost version to 1.71
cmnrd Jul 19, 2022
24acace
place the ready queue within the default policy
cmnrd Jul 19, 2022
99a8db6
implement the default policy in its own file
cmnrd Jul 19, 2022
2a3bec2
almost complete implementation of the scheduling policy
cmnrd Jul 20, 2022
09acece
factor out a base scheduler interface
cmnrd Jul 22, 2022
b57f6ac
apply clang-format
cmnrd Jul 22, 2022
b8e3976
fix conversion warning
cmnrd Jul 22, 2022
70bfe97
delete the group scheduler again
cmnrd Jul 26, 2022
0c551be
Merge branch 'scheduling-policy' into optimize-graph
cmnrd Jul 26, 2022
81c7354
define grouped scheduling policy
cmnrd Jul 26, 2022
7dc6920
add an dummy implementation
cmnrd Jul 26, 2022
b6d1d71
compute the grouped graph within the policy
cmnrd Jul 27, 2022
430c5a0
create a data structure of managing all reaction groups
cmnrd Jul 28, 2022
1acb6c7
store group pointer in each reaction
cmnrd Jul 28, 2022
cc03940
implement triggering of reactions
cmnrd Jul 28, 2022
b3fb04f
implement a simple single threaded scheduling strategy
cmnrd Jul 28, 2022
19a8834
be consistent with types
cmnrd Jul 29, 2022
77da4a8
implementm multithreaded group based scheduling
cmnrd Jul 29, 2022
27abf26
clean up code and only process triggered groups
cmnrd Jul 30, 2022
f85a3b8
fix bug in termination procedure
cmnrd Jul 30, 2022
ee0a4ef
summarize similar groups in "super groups" for less overhead
cmnrd Jul 31, 2022
a4c886c
also reset group queue before terminating
cmnrd Jul 31, 2022
823645f
schedule in loop also for the initial scheduling
cmnrd Jul 31, 2022
0efd929
support supergroups with multiple successors
cmnrd Jul 31, 2022
2c19152
bugfix and simplification of notify mechanism for supergroups
cmnrd Aug 2, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/cpp-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ jobs:
repository: lf-lang/lingua-franca
submodules: true
ref: ${{ inputs.compiler-ref }}
- name: Install boost on Ubuntu
run: |
sudo apt-get update
sudo apt-get install -y boost boost-dev
if: matrix.platform == 'ubuntu-latest'
- name: Set cpp runtime version
run: |
echo ${{ inputs.runtime-ref }} > org.lflang/src/org/lflang/generator/cpp/cpp-runtime-version.txt
Expand Down
9 changes: 7 additions & 2 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
os: [ubuntu-latest]

steps:
- uses: actions/checkout@v1
Expand All @@ -23,6 +23,11 @@ jobs:
sudo apt-get update
sudo apt-get install -y clang-tidy
if: matrix.os == 'ubuntu-latest'
- name: Install boost on Ubuntu
run: |
sudo apt-get update
sudo apt-get install -y libboost-all-dev
if: matrix.os == 'ubuntu-latest'
- name: configure
run: |
mkdir build
Expand All @@ -35,7 +40,7 @@ jobs:
cmake --build build --target examples

lf-tests-pull-request:
uses: lf-lang/lingua-franca/.github/workflows/cpp-tests.yml@master
uses: lf-lang/lingua-franca/.github/workflows/cpp-tests.yml@cpp-boost
with:
runtime-ref: ${{github.ref}}
compiler-ref: master
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ if (NOT DEFINED LF_REACTOR_CPP_SUFFIX)
endif()

find_package (Threads)
find_package (Boost 1.71 COMPONENTS graph REQUIRED)

set(DEFAULT_BUILD_TYPE "Release")
if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
Expand Down
4 changes: 2 additions & 2 deletions include/reactor-cpp/action.hh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public:
[[nodiscard]] auto inline min_delay() const noexcept -> Duration { return min_delay_; }

friend class Reaction;
friend class Scheduler;
template<class SchedulingPolicy> friend class Scheduler;
};

template <class T> class Action : public BaseAction {
Expand Down Expand Up @@ -154,6 +154,6 @@ public:

} // namespace reactor

#include "impl/action_impl.hh"
#include "reactor-cpp/impl/action_impl.hh"

#endif // REACTOR_CPP_ACTION_HH
1 change: 1 addition & 0 deletions include/reactor-cpp/assert.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ constexpr bool runtime_assertion = true;
#include "environment.hh"

#include <cassert>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>
Expand Down
60 changes: 60 additions & 0 deletions include/reactor-cpp/base_scheduler.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2022 TU Dresden
* All rights reserved.
*
* Authors:
* Christian Menard
*/

#ifndef REACTOR_CPP_BASE_SCHEDULER_HH
#define REACTOR_CPP_BASE_SCHEDULER_HH

#include <atomic>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>

#include "reactor-cpp/environment.hh"
#include "reactor-cpp/fwd.hh"
#include "reactor-cpp/logical_time.hh"

namespace reactor {

using EventMap = std::map<BaseAction*, std::function<void(void)>>;

class BaseScheduler {
protected:
Environment* environment_; // NOLINT

LogicalTime logical_time_{}; // NOLINT
const bool using_workers_; // NOLINT

std::mutex scheduling_mutex_; // NOLINT
std::unique_lock<std::mutex> scheduling_lock_{scheduling_mutex_, std::defer_lock}; // NOLINT
std::condition_variable cv_schedule_; // NOLINT

std::mutex lock_event_queue_; // NOLINT
std::map<Tag, EventMap> event_queue_; // NOLINT

public:
BaseScheduler(Environment* env);
virtual ~BaseScheduler() = default;
BaseScheduler(const BaseScheduler&) = delete;
BaseScheduler(BaseScheduler&&) = delete;
auto operator=(const BaseScheduler&) -> BaseScheduler& = delete;
auto operator=(BaseScheduler&&) -> BaseScheduler& = delete;

[[nodiscard]] inline auto logical_time() const noexcept -> const auto& { return logical_time_; }
void schedule_sync(const Tag& tag, BaseAction* action, std::function<void(void)> pre_handler);
void schedule_async(const Tag& tag, BaseAction* action, std::function<void(void)> pre_handler);

virtual void set_port(BasePort* port) = 0;

void inline lock() noexcept { scheduling_lock_.lock(); }
void inline unlock() noexcept { scheduling_lock_.unlock(); }
};

} // namespace reactor

#endif // REACTOR_CPP_BASE_SCHEDULER_HH
86 changes: 86 additions & 0 deletions include/reactor-cpp/default_scheduling_policy.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (C) 2022 TU Dresden
* All rights reserved.
*
* Authors:
* Christian Menard
*/

#ifndef REACTOR_CPP_DEFAULT_SCHEDULING_POLICY_HH
#define REACTOR_CPP_DEFAULT_SCHEDULING_POLICY_HH

#include <cstddef>
#include <vector>

#include "reactor-cpp/fwd.hh"
#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/semaphore.hh"

namespace reactor {

class DefaultSchedulingPolicy {
Scheduler<DefaultSchedulingPolicy>& scheduler_;
Environment& environment_;
std::size_t identity_counter{0};

class ReadyQueue {
private:
std::vector<Reaction*> queue_{};
std::atomic<std::ptrdiff_t> size_{0};
Semaphore sem_{0};
std::ptrdiff_t waiting_workers_{0};
const unsigned int num_workers_;

public:
explicit ReadyQueue(unsigned num_workers)
: num_workers_(num_workers) {}

/**
* Retrieve a ready reaction from the queue.
*
* This method may be called concurrently. In case the queue is empty, the
* method blocks and waits until a ready reaction becomes available.
*/
auto pop() -> Reaction*;

/**
* Fill the queue up with ready reactions.
*
* This method assumes that the internal queue is empty. It moves all
* reactions from the provided `ready_reactions` vector to the internal
* queue, leaving `ready_reactions` empty.
*
* Note that this method is not thread-safe. The caller needs to ensure that
* no other thread will try to read from the queue during this operation.
*/
void fill_up(std::vector<Reaction*>& ready_reactions);
};

ReadyQueue ready_queue_;

std::vector<std::vector<Reaction*>> reaction_queue_;
unsigned int reaction_queue_pos_{std::numeric_limits<unsigned>::max()};

std::atomic<std::ptrdiff_t> reactions_to_process_{0};
std::vector<std::vector<Reaction*>> triggered_reactions_;

bool continue_execution_{true};

void schedule() noexcept;
void terminate_all_workers();
auto schedule_ready_reactions() -> bool;

public:
DefaultSchedulingPolicy(Scheduler<DefaultSchedulingPolicy>& scheduler, Environment& env);

void init();
auto create_worker() -> Worker<DefaultSchedulingPolicy>;
void worker_function(const Worker<DefaultSchedulingPolicy>& worker);

void trigger_reaction_from_next(Reaction* reaction);
void trigger_reaction_from_set_port(Reaction* reaction);
};

} // namespace reactor

#endif // REACTOR_CPP_DEFAULT_SCHEDULING_POLICY_HH
139 changes: 139 additions & 0 deletions include/reactor-cpp/dependency_graph.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#ifndef REACTOR_CPP_DEPENDENCY_GRAPH_HH
#define REACTOR_CPP_DEPENDENCY_GRAPH_HH

#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/reactor.hh"

#include <boost/graph/breadth_first_search.hpp>
#include <boost/graph/directed_graph.hpp>
#include <boost/graph/graph_selectors.hpp>
#include <boost/graph/properties.hpp>
#include <boost/pending/property.hpp>
#include <vector>

namespace reactor {

class GroupedDependencyGraph;

enum class DependencyType { Undefined, Priority, Trigger, Effect };

class ReactionDependencyGraph {
private:
struct reaction_info_t {
using kind = boost::vertex_property_tag;
};
struct dependency_info_t {
using kind = boost::edge_property_tag;
};
using ReactionProperty = boost::property<reaction_info_t, Reaction*>;
using DependencyProperty = boost::property<dependency_info_t, DependencyType>;
using ReactionGraph = boost::directed_graph<ReactionProperty, DependencyProperty>;
using ReactionToVertexMap = std::map<Reaction*, ReactionGraph::vertex_descriptor>;
using ReactionPropertyMap = boost::property_map<ReactionGraph, reaction_info_t>::type;
using DependencyPropertyMap = boost::property_map<ReactionGraph, dependency_info_t>::type;

ReactionGraph graph{};
ReactionToVertexMap vertex_map{};

// helper functions
void populate_graph_with_reactions(const Reactor* reactor);
void populate_graph_with_priority_edges(const Reactor* reactor);
void populate_graph_with_dependency_edges(const Reactor* reactor);

[[nodiscard]] auto get_reaction_property_map() -> ReactionPropertyMap { return boost::get(reaction_info_t{}, graph); }
[[nodiscard]] auto get_dependency_property_map() -> DependencyPropertyMap {
return boost::get(dependency_info_t{}, graph);
}

ReactionDependencyGraph() = default;

public:
ReactionDependencyGraph(const std::set<Reactor*>& top_level_reactors);

void export_graphviz(const std::string& file_name);

// TODO: This should be const, but I don't know how to get immutable access to the reaction graph properties...
[[nodiscard]] auto transitive_reduction() -> ReactionDependencyGraph;

friend GroupedDependencyGraph;
};

class GroupedDependencyGraph {
public:
struct group_info_t {
using kind = boost::vertex_property_tag;
};
using Group = std::vector<Reaction*>;
using GroupProperty = boost::property<group_info_t, Group>;
using GroupGraph = boost::directed_graph<GroupProperty>;
using ReactionToVertexMap = std::map<Reaction*, GroupGraph::vertex_descriptor>;
using GroupPropertyMap = boost::property_map<GroupGraph, group_info_t>::type;

private:
GroupGraph graph{};
ReactionToVertexMap vertex_map{};

struct ReachabilityVisitor : public boost::default_bfs_visitor {
private:
GroupGraph::vertex_descriptor to;

public:
ReachabilityVisitor(GroupGraph::vertex_descriptor to)
: to{to} {}

// this exception is used to stop the BFS early if a matching vertex is found
struct PathExists : public std::exception {};

// this throws PathExists on success
void discover_vertex(GroupGraph::vertex_descriptor u, [[maybe_unused]] const GroupGraph& g) const {
if (u == to) {
throw PathExists();
}
}
};

class NonEmptyGoupFilter {
private:
GroupPropertyMap property_map;

public:
NonEmptyGoupFilter() = default;
NonEmptyGoupFilter(const GroupPropertyMap& map)
: property_map{map} {}

auto operator()(GroupGraph::vertex_descriptor vertex) const -> bool {
return !boost::get(property_map, vertex).empty();
}
};

void group_reactions_by_container_helper(const Reactor* reactor);

void clear_all_empty_vertices();

public:
// TODO: This should be a const reference, but I don't know how to get immutable access to the reaction graph
// properties...
GroupedDependencyGraph(ReactionDependencyGraph& reactionGraph);
GroupedDependencyGraph() = default;

void export_graphviz(const std::string& file_name);

auto has_path(GroupGraph::vertex_descriptor va, GroupGraph::vertex_descriptor vb) const -> bool;

void try_contract_edge(GroupGraph::vertex_descriptor va, GroupGraph::vertex_descriptor vb);

void group_reactions_by_container(const std::set<Reactor*>& top_level_reactors);

void group_chains();

// TODO: This should be const, but I don't know how to get immutable access to the reaction graph properties...
[[nodiscard]] auto transitive_reduction() -> GroupedDependencyGraph;

[[nodiscard]] auto get_graph() const -> const GroupGraph& { return graph; }

[[nodiscard]] auto get_group_property_map() -> GroupPropertyMap { return boost::get(group_info_t{}, graph); }
};

} // namespace reactor

#endif
Loading