Skip to content

Commit

Permalink
merge with master
Browse files Browse the repository at this point in the history
  • Loading branch information
tanneberger committed Dec 4, 2023
2 parents e592f27 + 3b25ffe commit d3ba914
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 19 deletions.
10 changes: 8 additions & 2 deletions include/reactor-cpp/environment.hh
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,18 @@ private:

Scheduler scheduler_;
Phase phase_{Phase::Construction};
Tag start_tag_{};

/// Timeout as given in the constructor
const Duration timeout_{};

Graph<BasePort> graph_{};
Graph<BasePort> optimized_graph_{};

/// The start tag as determined during startup()
Tag start_tag_{};
/// The timeout tag as determined during startup()
Tag timeout_tag_{};

void build_dependency_graph(Reactor* reactor);
void calculate_indexes();

Expand All @@ -83,7 +88,7 @@ public:

template <class T> void draw_connection(Port<T>* source, Port<T>* sink, ConnectionProperties properties) {
if (top_environment_ == nullptr || top_environment_ == this) {
log::Debug() << "drawing connection: " << source << " --> " << sink;
log::Debug() << "drawing connection: " << source->fqn() << " --> " << sink->fqn();
graph_.add_edge(source, sink, properties);
} else {
top_environment_->draw_connection(source, sink, properties);
Expand Down Expand Up @@ -117,6 +122,7 @@ public:
[[nodiscard]] auto logical_time() const noexcept -> const LogicalTime& { return scheduler_.logical_time(); }
[[nodiscard]] auto start_tag() const noexcept -> const Tag& { return start_tag_; }
[[nodiscard]] auto timeout() const noexcept -> const Duration& { return timeout_; }
[[nodiscard]] auto timeout_tag() const noexcept -> const Tag& { return timeout_tag_; }

static auto physical_time() noexcept -> TimePoint { return get_physical_time(); }

Expand Down
2 changes: 2 additions & 0 deletions include/reactor-cpp/logical_time.hh
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public:
[[nodiscard]] auto delay(Duration offset = Duration::zero()) const noexcept -> Tag;
[[nodiscard]] auto subtract(Duration offset = Duration::zero()) const noexcept -> Tag;
[[nodiscard]] auto decrement() const noexcept -> Tag;

[[nodiscard]] static auto max() noexcept -> Tag { return {TimePoint::max(), std::numeric_limits<mstep_t>::max()}; }
};

// define all the comparison operators
Expand Down
2 changes: 2 additions & 0 deletions include/reactor-cpp/scheduler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ private:
std::vector<ReleaseTagCallback> release_tag_callbacks_{};
void release_current_tag();

void cleanup_after_tag();

void schedule() noexcept;
auto schedule_ready_reactions() -> bool;
void next();
Expand Down
11 changes: 3 additions & 8 deletions lib/action.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,11 @@ void Timer::cleanup() noexcept {
ShutdownTrigger::ShutdownTrigger(const std::string& name, Reactor* container)
: Timer(name, container, Duration::zero(), container->environment()->timeout()) {}

void ShutdownTrigger::setup() noexcept {
BaseAction::setup();
environment()->sync_shutdown();
}
void ShutdownTrigger::setup() noexcept { BaseAction::setup(); }

void ShutdownTrigger::shutdown() {
if (!is_present()) {
Tag tag = Tag::from_logical_time(environment()->logical_time()).delay();
environment()->scheduler()->schedule_sync(this, tag);
}
Tag tag = Tag::from_logical_time(environment()->logical_time()).delay();
environment()->scheduler()->schedule_sync(this, tag);
}

auto Action<void>::schedule_at(const Tag& tag) -> bool {
Expand Down
16 changes: 16 additions & 0 deletions lib/environment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ void Environment::assemble() { // NOLINT
env->assemble();
}

// If this is the top level environment, then instantiate all connections.
if (top_environment_ == nullptr || top_environment_ == this) {
log::Debug() << "start optimization on port graph";
this->optimize();
Expand Down Expand Up @@ -339,10 +340,25 @@ auto Environment::startup() -> std::thread {
auto Environment::startup(const TimePoint& start_time) -> std::thread {
validate(this->phase() == Phase::Assembly, "startup() may only be called during assembly phase!");

log::Debug() << "Building the Dependency-Graph";
for (auto* reactor : top_level_reactors_) {
build_dependency_graph(reactor);
}

calculate_indexes();

log_.debug() << "Starting the execution";
phase_ = Phase::Startup;

this->start_tag_ = Tag::from_physical_time(start_time);
if (this->timeout_ == Duration::max()) {
this->timeout_tag_ = Tag::max();
} else if (this->timeout_ == Duration::zero()) {
this->timeout_tag_ = this->start_tag_;
} else {
this->timeout_tag_ = this->start_tag_.delay(this->timeout_);
}

// start up initialize all reactors
for (auto* reactor : top_level_reactors_) {
reactor->startup();
Expand Down
35 changes: 26 additions & 9 deletions lib/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,18 @@ void Scheduler::schedule() noexcept {
bool found_ready_reactions = schedule_ready_reactions();

while (!found_ready_reactions) {
if (!continue_execution_ && !found_ready_reactions) {
// Cleanup and let all workers know that they should terminate.
cleanup_after_tag();
terminate_all_workers();
break;
}

log_.debug() << "call next()";
next();
reaction_queue_pos_ = 0;

found_ready_reactions = schedule_ready_reactions();

if (!continue_execution_ && !found_ready_reactions) {
// let all workers know that they should terminate
terminate_all_workers();
break;
}
}
}

Expand Down Expand Up @@ -306,7 +307,7 @@ void Scheduler::advance_logical_time_to(const Tag& tag) {
Statistics::increment_processed_events();
}

void Scheduler::next() { // NOLINT
void Scheduler::cleanup_after_tag() {
// Notify other environments and let them know that we finished processing the
// current tag
release_current_tag();
Expand All @@ -328,6 +329,11 @@ void Scheduler::next() { // NOLINT
}
vec_ports.clear();
}
}

void Scheduler::next() { // NOLINT
// First, clean up after the last tag.
cleanup_after_tag();

{
std::unique_lock<std::mutex> lock{scheduling_mutex_};
Expand All @@ -354,8 +360,8 @@ void Scheduler::next() { // NOLINT
log_.debug() << "Shutting down the scheduler";
Tag t_next = Tag::from_logical_time(logical_time_).delay();
if (!event_queue_.empty() && t_next == event_queue_.next_tag()) {
log_.debug() << "Schedule the last round of reactions including all "
"termination reactions";
log_.debug() << "Trigger the last round of reactions including all "
"shutdown reactions";
triggered_actions_ = event_queue_.extract_next_event();
advance_logical_time_to(t_next);
} else {
Expand Down Expand Up @@ -395,6 +401,17 @@ void Scheduler::next() { // NOLINT
continue;
}

// Stop execution in case we reach the timeout tag. This checks needs to
// be done here, after acquiring the check, as only then we are fully
// commited to executing the tag t_next. Otherwise, we could still get
// earlier events (e.g., from a physical action).
if (t_next == environment_->timeout_tag()) {
continue_execution_ = false;
log_.debug() << "Shutting down the scheduler due to timeout";
log_.debug() << "Trigger the last round of reactions including all "
"shutdwon reactions";
}

// Retrieve all triggered actions at the next tag.
// We do not need to lock mutex_event_queue_ here, as the lock on
// scheduling_mutex_ already ensures that no one can write to the event
Expand Down

0 comments on commit d3ba914

Please sign in to comment.