From 2e4c112e85df6d5cd32d0a9ee5c2b80280d25e70 Mon Sep 17 00:00:00 2001 From: tanneberger Date: Mon, 9 Dec 2024 05:07:17 +0100 Subject: [PATCH] init mutations --- examples/CMakeLists.txt | 2 + examples/count/main.cc | 4 +- examples/multiport_mutation/CMakeLists.txt | 3 + examples/multiport_mutation/consumer.hh | 46 +++++++++ examples/multiport_mutation/load_balancer.hh | 82 +++++++++++++++ examples/multiport_mutation/main.cc | 100 +++++++++++++++++++ examples/multiport_mutation/producer.hh | 45 +++++++++ examples/ports/main.cc | 6 +- examples/unit_tests_mutations/CMakeLists.txt | 3 + examples/unit_tests_mutations/main.cc | 82 +++++++++++++++ include/reactor-cpp/environment.hh | 39 +++++++- include/reactor-cpp/graph.hh | 14 +++ include/reactor-cpp/impl/port_impl.hh | 5 +- include/reactor-cpp/logging.hh | 3 - include/reactor-cpp/multiport.hh | 22 +++- include/reactor-cpp/mutations.hh | 22 ++++ include/reactor-cpp/mutations/bank.hh | 39 ++++++++ include/reactor-cpp/mutations/connection.hh | 33 ++++++ include/reactor-cpp/mutations/multiport.hh | 43 ++++++++ include/reactor-cpp/port.hh | 31 +++++- include/reactor-cpp/reaction.hh | 6 +- include/reactor-cpp/reactor-cpp.hh | 2 +- include/reactor-cpp/reactor.hh | 9 +- include/reactor-cpp/reactor_element.hh | 6 +- include/reactor-cpp/{scops.hh => scopes.hh} | 16 +-- include/reactor-cpp/transaction.hh | 35 ++----- lib/CMakeLists.txt | 9 +- lib/environment.cc | 21 ++-- lib/logical_time.cc | 3 +- lib/mutation/bank.cc | 49 +++++++++ lib/mutation/connection.cc | 35 +++++++ lib/mutation/multiport.cc | 54 ++++++++++ lib/port.cc | 4 +- lib/reaction.cc | 8 +- lib/reactor.cc | 50 ++++++++-- lib/reactor_element.cc | 4 +- lib/scopes.cc | 11 ++ lib/scops.cc | 13 --- lib/transaction.cc | 33 +++--- 39 files changed, 862 insertions(+), 130 deletions(-) create mode 100644 examples/multiport_mutation/CMakeLists.txt create mode 100644 examples/multiport_mutation/consumer.hh create mode 100644 examples/multiport_mutation/load_balancer.hh create mode 100644 examples/multiport_mutation/main.cc create mode 100644 examples/multiport_mutation/producer.hh create mode 100644 examples/unit_tests_mutations/CMakeLists.txt create mode 100644 examples/unit_tests_mutations/main.cc create mode 100644 include/reactor-cpp/mutations.hh create mode 100644 include/reactor-cpp/mutations/bank.hh create mode 100644 include/reactor-cpp/mutations/connection.hh create mode 100644 include/reactor-cpp/mutations/multiport.hh rename include/reactor-cpp/{scops.hh => scopes.hh} (81%) create mode 100644 lib/mutation/bank.cc create mode 100644 lib/mutation/connection.cc create mode 100644 lib/mutation/multiport.cc create mode 100644 lib/scopes.cc delete mode 100644 lib/scops.cc diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5f05c960..3d40852e 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -7,3 +7,5 @@ add_subdirectory(count) add_subdirectory(ports) add_subdirectory(hello) add_subdirectory(power_train) +add_subdirectory(multiport_mutation) +add_subdirectory(unit_tests_mutations) diff --git a/examples/count/main.cc b/examples/count/main.cc index e0c6e92b..ca20df5d 100644 --- a/examples/count/main.cc +++ b/examples/count/main.cc @@ -12,8 +12,8 @@ class Count : public Reactor { LogicalAction counter{"counter", this}; // reactions_ - Reaction r_init{"r_init", 1, this, [this]() { init(); }}; - Reaction r_counter{"r_counter", 2, this, [this]() { print_count(); }}; + Reaction r_init{"r_init", 1, false, this, [this]() { this->init(); }}; + Reaction r_counter{"r_counter", 2, false, this, [this]() { this->print_count(); }}; public: explicit Count(Environment* env) diff --git a/examples/multiport_mutation/CMakeLists.txt b/examples/multiport_mutation/CMakeLists.txt new file mode 100644 index 00000000..b19dd98a --- /dev/null +++ b/examples/multiport_mutation/CMakeLists.txt @@ -0,0 +1,3 @@ +add_executable(mutation_multiports main.cc) +target_link_libraries(mutation_multiports reactor-cpp) +add_dependencies(examples mutation_multiports) diff --git a/examples/multiport_mutation/consumer.hh b/examples/multiport_mutation/consumer.hh new file mode 100644 index 00000000..2265b62b --- /dev/null +++ b/examples/multiport_mutation/consumer.hh @@ -0,0 +1,46 @@ +// +// Created by tanneberger on 11/17/24. +// + +#ifndef CONSUMER_HH +#define CONSUMER_HH + +#include + +using namespace reactor; +using namespace std::chrono_literals; + +class Consumer : public Reactor { +private: + class Inner: public Scope { + Inner(Reactor* reactor, std::size_t index) : Scope(reactor), index_(index) {} + std::size_t index_; + + [[maybe_unused]] const Inner& __lf_inner = *this; + + void reaction_1([[maybe_unused]] const Input& scale) { + std::cout << "consumer: " << index_ << " received value!" << std::endl; + } + + friend Consumer; + }; + + Inner __lf_inner; + Reaction handle{"handle", 1, this, [this]() { __lf_inner.reaction_1(this->in); }}; +public: + Consumer(const std::string& name, Environment* env, std::size_t index) : Reactor(name, env), __lf_inner(this, index) { + std::cout << "creating instance of consumer" << std::endl; + } + ~Consumer() override { + std::cout << "Consumer Object is deleted" << std::endl; + }; + + Input in{"in", this}; + + void assemble() override { + handle.declare_trigger(&in); + } +}; + + +#endif //CONSUMER_HH diff --git a/examples/multiport_mutation/load_balancer.hh b/examples/multiport_mutation/load_balancer.hh new file mode 100644 index 00000000..d701e33e --- /dev/null +++ b/examples/multiport_mutation/load_balancer.hh @@ -0,0 +1,82 @@ +// +// Created by tanneberger on 11/17/24. +// + +#ifndef LOAD_BALANCER_HH +#define LOAD_BALANCER_HH + +#include + +#include "../../lib/mutation/multiport.cc" +#include "reactor-cpp/mutations/multiport.hh" + +using namespace reactor; +using namespace std::chrono_literals; + +class LoadBalancer : public Reactor { +private: + class Inner: public MutableScope { + Inner(Reactor* reactor) : MutableScope(reactor) {} + [[maybe_unused]] const Inner& __lf_inner = *this; + + // reaction bodies + void reaction_1(const Input& inbound, LogicalAction& scale_action, Multiport>& outbound) { + if (rand() % 30 == 0) { + scale_action.schedule(rand() % 20 + 1); + } + + std::cout << "multiport size: " << outbound.size() << std::endl; + outbound[rand() % outbound.size()].set(inbound.get()); + } + + void reaction_2(ModifableMultiport>&outbound, [[maybe_unused]] const LogicalAction& scale, Output& scale_bank) { + ModifableMultiport>* temp = &outbound; + std::size_t new_size = *scale.get(); + + auto antideps = (outbound[0]).anti_dependencies(); + + MutationChangeOutputMultiportSize change_size{temp, this->reactor_, antideps, new_size}; + + add_to_transaction(&change_size); + + commit_transaction(); + + scale_bank.set(new_size); + } + + friend LoadBalancer; + }; + + Inner __lf_inner; + Reaction process{"process", 2, this, [this]() { __lf_inner.reaction_1(this->inbound, this->scale_action, this->out); }}; + Reaction scale{"scale", 1, this, [this]() { __lf_inner.reaction_2(this->out, this->scale_action, this->scale_bank); }}; + +public: + LoadBalancer(const std::string& name, Environment* env) + : Reactor(name, env), __lf_inner(this) { + std::cout << "creating instance of load balancer" << std::endl; + out.reserve(4); + for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) { + std::string _lf_port_name = out.name() + "_" + std::to_string(_lf_idx); + out.emplace_back(_lf_port_name, this); + } + } + + LogicalAction scale_action{"scale", this, 1us}; + ModifableMultiport> out{"out"}; + Input inbound{"inbound", this}; // NOLINT + Output scale_bank{"scale_bank", this}; + + void assemble() override { + std::cout << "assemble LoadBalancer\n"; + for (auto& __lf_port : out) { + process.declare_antidependency(&__lf_port); + } + process.declare_trigger(&inbound); + scale.declare_trigger(&scale_action); + } +}; + + + +#endif //LOAD_BALANCER_HH diff --git a/examples/multiport_mutation/main.cc b/examples/multiport_mutation/main.cc new file mode 100644 index 00000000..1f38f736 --- /dev/null +++ b/examples/multiport_mutation/main.cc @@ -0,0 +1,100 @@ +#include + +#include +#include + +#include "./consumer.hh" +#include "./load_balancer.hh" +#include "./producer.hh" +#include "../../lib/mutation/bank.cc" +#include "../../lib/mutation/connection.cc" +#include + +class Deployment : public Reactor { + std::unique_ptr producer_; + std::unique_ptr load_balancer_; + std::vector> consumers_; + + Reaction scale_bank{"scale_bank", 1, this, [this](){this->__inner.reaction_1(this->scale, this->consumers_, load_balancer_->out);}}; + +public: + +class Inner: public MutableScope { + int state = 0; + [[maybe_unused]] const Inner& __lf_inner = *this; +public: + + Inner(Reactor* reactor) : MutableScope(reactor) {} + void reaction_1(const Input& scale, std::vector>& reactor_bank, ModifableMultiport>& load_balancer) { + std::size_t new_size = *scale.get(); + std::size_t old_size = reactor_bank.size(); + + std::function(Reactor*, std::size_t)> lambda = [](Reactor* reactor, std::size_t index) { + std::string __lf_inst_name = "consumer_" + std::to_string(index); + return std::make_unique(__lf_inst_name, reactor->environment(), index); + }; + MutationChangeBankSize change_size{&reactor_bank, this->reactor_, new_size, lambda}; + + add_to_transaction(&change_size); + + // old topology + commit_transaction(); + // new topology + + if (old_size > new_size) { + + for (auto i = 0; i < old_size - new_size; i++) { + } + } else { + std::cout << "load_balancer size:" << load_balancer.size() << " bank size: " << reactor_bank.size() << std::endl; + for (auto i = 0; i < new_size - old_size; i++) { + std::cout << "add connection: " << i + old_size << std::endl; + MutationAddConnection, Input> add_conn{&load_balancer[i + old_size], &reactor_bank[i + old_size].get()->in, reactor_}; + add_to_transaction(&add_conn); + commit_transaction(); + } + } + + std::cout << "new bank size:" << reactor_bank.size() << std::endl; + } + + friend LoadBalancer; + }; + + Inner __inner; + + Deployment(const std::string& name, Environment* env) : Reactor(name, env), __inner(this), + producer_(std::make_unique("producer", environment())), + load_balancer_(std::make_unique("load_balancer", environment())) { + std::cout << "creating instance of deployment" << std::endl; + consumers_.reserve(4); + for (size_t __lf_idx = 0; __lf_idx < 4; __lf_idx++) { + std::string __lf_inst_name = "consumer_" + std::to_string(__lf_idx); + consumers_.push_back(std::make_unique(__lf_inst_name, environment(), __lf_idx)); + } + } + ~Deployment() override = default; + + Input scale{"scale", this}; + + void assemble() override { + for (size_t __lf_idx = 0; __lf_idx < 4; __lf_idx++) { + environment()->draw_connection(load_balancer_->out[__lf_idx], consumers_[__lf_idx]->in, ConnectionProperties{}); + environment()->draw_connection(producer_->value, load_balancer_->inbound, ConnectionProperties{}); + } + environment()->draw_connection(load_balancer_->scale_bank, scale, ConnectionProperties{}); + scale_bank.declare_trigger(&this->scale); + } +}; + + +auto main() -> int { + //srand(time(nullptr)); + Environment env{4}; + auto deployment = std::make_unique("c1", &env); + env.optimize(); + env.assemble(); + auto thread = env.startup(); + thread.join(); + return 0; +} diff --git a/examples/multiport_mutation/producer.hh b/examples/multiport_mutation/producer.hh new file mode 100644 index 00000000..9894412d --- /dev/null +++ b/examples/multiport_mutation/producer.hh @@ -0,0 +1,45 @@ +// +// Created by tanneberger on 11/17/24. +// + +#ifndef PRODUCER_HH +#define PRODUCER_HH + +#include + +using namespace reactor; +using namespace std::chrono_literals; + +class Producer : public Reactor { +private: + Timer timer{"timer", this, 1s, 1s}; + Reaction r_timer{"r_timer", 1, this, [this]() { __lf_inner.reaction_1(this->value);}}; + + class Inner: public Scope { + [[maybe_unused]] const Inner& __lf_inner = *this; + void reaction_1([[maybe_unused]] Output& out) { + std::cout << "producing value" << std::endl; + out.set(42); + } + explicit Inner(Reactor* reactor) : Scope(reactor) {} + + friend Producer; + }; + + Inner __lf_inner; +public: + Producer(const std::string& name, Environment* env) : Reactor(name, env), __lf_inner(this) { + std::cout << "creating instance of producer" << std::endl; + } + Producer() = delete; + ~Producer() override = default; + + Output value{"value", this}; + + void assemble() override { + r_timer.declare_trigger(&timer); + r_timer.declare_antidependency(&value); + } +}; + +#endif //PRODUCER_HH diff --git a/examples/ports/main.cc b/examples/ports/main.cc index b0a0eb4c..d1dda5d1 100644 --- a/examples/ports/main.cc +++ b/examples/ports/main.cc @@ -51,7 +51,7 @@ class Counter : public Reactor { class Printer : public Reactor { private: - Reaction r_value{"r_value", 1, this, [this]() { on_value(); }}; + Reaction r_value{"r_value", 1, this, [this]() { this->on_value(); }}; public: Input value{"value", this}; // NOLINT @@ -69,7 +69,7 @@ class Printer : public Reactor { class Adder : public Reactor { private: - Reaction r_add{"r_add", 1, this, [this]() { add(); }}; + Reaction r_add{"r_add", 1, this, [this]() { this->add(); }}; public: Input i1{"i1", this}; // NOLINT @@ -85,7 +85,7 @@ class Adder : public Reactor { r_add.declare_antidependency(&sum); } - void add(Adder* self) { + void add() { if (i1.is_present() && i2.is_present()) { sum.set(*i1.get() + *i2.get()); std::cout << "setting sum\n"; diff --git a/examples/unit_tests_mutations/CMakeLists.txt b/examples/unit_tests_mutations/CMakeLists.txt new file mode 100644 index 00000000..0843149f --- /dev/null +++ b/examples/unit_tests_mutations/CMakeLists.txt @@ -0,0 +1,3 @@ +add_executable(unit_tests_mutations main.cc) +target_link_libraries(unit_tests_mutations reactor-cpp) +add_dependencies(examples unit_tests_mutations) diff --git a/examples/unit_tests_mutations/main.cc b/examples/unit_tests_mutations/main.cc new file mode 100644 index 00000000..0066d705 --- /dev/null +++ b/examples/unit_tests_mutations/main.cc @@ -0,0 +1,82 @@ +#include + +#include "../../lib/mutation/multiport.cc" +#include +#include + +using namespace std::chrono_literals; + +class TestMultiport: public reactor::Reactor { + reactor::ModifableMultiport> test_multiport_{"modifable_multiports_"}; + reactor::Timer timer_{"timer", this, 1s}; + reactor::LogicalAction scale{"scale", this}; + reactor::Reaction trigger_reaction_{"trigger_reaction", 1, this, [this](){this->__inner.reaction_1(scale);}}; + reactor::Reaction test_reaction_{"test_reaction", 2, this, [this](){this->__inner.reaction_2(test_multiport_, scale);}}; + reactor::Reaction validate_reaction_{"validate_reaction", 3, this, [this](){this->__inner.reaction_3(test_multiport_);}}; +public: + +class Inner: public reactor::MutableScope { + int state = 0; + std::vector sizes = {4, 5, 6, 5, 4, 3}; + [[maybe_unused]] const Inner& __lf_inner = *this; +public: + + Inner(Reactor* reactor) : MutableScope(reactor) {} + + void reaction_1(reactor::LogicalAction& scale) { + int size = sizes[state]; + state = (state + 1) % sizes.size(); + std::cout << "set: " << size << std::endl; + scale.schedule(size); + } + + void reaction_2(reactor::ModifableMultiport>& test_multiport, reactor::LogicalAction& scale) { + reactor::ModifableMultiport>* temp = &test_multiport; + std::size_t new_size = *scale.get(); + + auto anti_dep = test_multiport[0].anti_dependencies(); + reactor::MutationChangeOutputMultiportSize change_size{temp, this->reactor_, anti_dep, new_size}; + add_to_transaction(&change_size); + + commit_transaction(); + } + + void reaction_3(reactor::ModifableMultiport>& test_multiport) { + for (auto i = 0; i < test_multiport.size(); i++) { + std::cout << test_multiport[i].fqn() << "/" << std::endl; + } + } + }; + + Inner __inner; + + TestMultiport(const std::string& name, reactor::Environment* env) : Reactor(name, env), __inner(this) { + std::cout << "creating instance of deployment" << std::endl; + + test_multiport_.reserve(4); + for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) { + std::string _lf_port_name = test_multiport_.name() + "_" + std::to_string(_lf_idx); + test_multiport_.emplace_back(_lf_port_name, this); + } + } + ~TestMultiport() override = default; + + void assemble() override { + trigger_reaction_.declare_trigger(&timer_); + trigger_reaction_.declare_schedulable_action(&scale); + test_reaction_.declare_trigger(&scale); + + } +}; + + +auto main() -> int { + // srand(time(nullptr)); + reactor::Environment env{4}; + auto test_multiport = std::make_unique("test_multiport", &env); + env.optimize(); + env.assemble(); + auto thread = env.startup(); + thread.join(); + return 0; +} diff --git a/include/reactor-cpp/environment.hh b/include/reactor-cpp/environment.hh index 825b3bba..82e28559 100644 --- a/include/reactor-cpp/environment.hh +++ b/include/reactor-cpp/environment.hh @@ -32,8 +32,9 @@ enum class Phase : std::uint8_t { Assembly = 1, Startup = 2, Execution = 3, - Shutdown = 4, - Deconstruction = 5 + Mutation = 4, + Shutdown = 5, + Deconstruction = 6, }; class Environment { @@ -74,25 +75,53 @@ private: Graph graph_{}; Graph optimized_graph_{}; - void build_dependency_graph(Reactor* reactor); - void calculate_indexes(); std::mutex shutdown_mutex_{}; auto startup(const TimePoint& start_time) -> std::thread; public: + //TODO: fix visebility + void calculate_indexes(); + void build_dependency_graph(Reactor* reactor); + + explicit Environment(unsigned int num_workers, bool fast_fwd_execution = default_fast_fwd_execution, const Duration& timeout = Duration::max()); explicit Environment(const std::string& name, Environment* containing_environment); auto name() -> const std::string& { return name_; } + void start_mutation() { + phase_ = Phase::Mutation; + } + + void stop_mutation() { + phase_ = Phase::Execution; + } + // this method draw a connection between two graph elements with some properties template void draw_connection(Port& source, Port& sink, ConnectionProperties properties) { this->draw_connection(&source, &sink, properties); } + template void remove_connection(Port* source, Port* sink) { + if (top_environment_ == nullptr || top_environment_ == this) { + log::Debug() << "remove connection: " << source->fqn() << " -/-> " << sink->fqn(); + auto properties = graph_.remove_edge(source, sink); + } else { + return top_environment_->remove_connection(source, sink); + + } + } + + void remove_top_level_reactor(Reactor* reactor) { + auto elements_erased = top_level_reactors_.erase(reactor); + if (elements_erased == 0) { + std::cout << "no elements erased" << std::endl; + } + } + template void draw_connection(Port* source, Port* sink, ConnectionProperties properties) { if (top_environment_ == nullptr || top_environment_ == this) { log::Debug() << "drawing connection: " << source->fqn() << " --> " << sink->fqn(); @@ -114,7 +143,7 @@ public: void export_dependency_graph(const std::string& path); - [[nodiscard]] auto top_level_reactors() const noexcept -> const auto& { return top_level_reactors_; } + [[nodiscard]] auto top_level_reactors() noexcept -> auto& { return top_level_reactors_; } [[nodiscard]] auto phase() const noexcept -> Phase { return phase_; } [[nodiscard]] auto scheduler() const noexcept -> const Scheduler* { return &scheduler_; } diff --git a/include/reactor-cpp/graph.hh b/include/reactor-cpp/graph.hh index 4009e169..8544306f 100644 --- a/include/reactor-cpp/graph.hh +++ b/include/reactor-cpp/graph.hh @@ -58,6 +58,20 @@ public: } } + auto remove_edge(E source, E destinations) noexcept -> std::optional

{ + if (graph_.find(source) == std::end(graph_)) { + return std::nullopt; + } else { + auto conns = std::find_if(std::begin(graph_[source]), std::end(graph_[source]), [destinations](auto val) { + return val.second == destinations; + }); + + if (conns != std::end(graph_[source])) { + graph_[source].erase(conns); + } + } + } + // this groups connections by same source and properties [[nodiscard]] auto get_edges() const noexcept -> std::map, map_key_compare> { std::map, map_key_compare> all_edges{}; diff --git a/include/reactor-cpp/impl/port_impl.hh b/include/reactor-cpp/impl/port_impl.hh index db596dbb..836384d4 100644 --- a/include/reactor-cpp/impl/port_impl.hh +++ b/include/reactor-cpp/impl/port_impl.hh @@ -9,10 +9,11 @@ #ifndef REACTOR_CPP_IMPL_PORT_IMPL_HH #define REACTOR_CPP_IMPL_PORT_IMPL_HH -#include "reactor-cpp/assert.hh" -#include "reactor-cpp/environment.hh" #include "reactor-cpp/port.hh" #include "reactor-cpp/reactor.hh" +#include "reactor-cpp/assert.hh" +#include "reactor-cpp/environment.hh" + namespace reactor { diff --git a/include/reactor-cpp/logging.hh b/include/reactor-cpp/logging.hh index 4f457309..43da8212 100644 --- a/include/reactor-cpp/logging.hh +++ b/include/reactor-cpp/logging.hh @@ -10,13 +10,10 @@ #define REACTOR_CPP_LOGGING_HH #include "reactor-cpp/config.hh" -#include "reactor-cpp/time.hh" -#include #include #include #include #include -#include namespace reactor::log { diff --git a/include/reactor-cpp/multiport.hh b/include/reactor-cpp/multiport.hh index bbf4a48c..a0d2e81e 100644 --- a/include/reactor-cpp/multiport.hh +++ b/include/reactor-cpp/multiport.hh @@ -25,6 +25,8 @@ class BaseMultiport { // NOLINT cppcoreguidelines-special-member-functions,-warn private: std::atomic present_ports_size_{0}; std::vector present_ports_{}; + std::string multiport_name_; + std::string fqn_; // record that the port with the given index has been set void set_present(std::size_t index); @@ -42,12 +44,17 @@ protected: [[nodiscard]] auto present_ports_size() const -> auto { return present_ports_size_.load(); } void present_ports_reserve(size_t n) { present_ports_.reserve(n); } + void present_ports_pop_back() { present_ports_.pop_back(); } void register_port(BasePort& port, size_t idx); + public: - BaseMultiport() = default; + explicit BaseMultiport(const std::string& name) : multiport_name_(name) {}; ~BaseMultiport() = default; + auto name() const -> std::string { + return multiport_name_; + } }; template @@ -66,7 +73,7 @@ public: using iterator = typename std::vector::iterator; using const_iterator = typename std::vector::const_iterator; - Multiport() noexcept = default; + explicit Multiport(const std::string& name) noexcept : BaseMultiport(name) {}; ~Multiport() noexcept = default; auto operator==(const Multiport& other) const noexcept -> bool { @@ -99,16 +106,23 @@ public: template > class ModifableMultiport : public Multiport { public: + ModifableMultiport(const std::string& name) : Multiport(name) {} + void reserve(std::size_t size) noexcept { this->ports_.reserve(size); this->present_ports_reserve(size); } - void push_back(const T& elem) noexcept { - this->ports_.push_back(elem); + void push_back(const T&& elem) noexcept { + this->ports_.push_back(std::move(elem)); this->register_port(this->ports_.back(), this->ports_.size() - 1); } + void pop_back() { + this->ports_.pop_back(); + this->present_ports_pop_back(); + } + template void emplace_back(Args&&... args) noexcept { this->ports_.emplace_back(std::forward(args)...); this->register_port(this->ports_.back(), this->ports_.size() - 1); diff --git a/include/reactor-cpp/mutations.hh b/include/reactor-cpp/mutations.hh new file mode 100644 index 00000000..d507f89f --- /dev/null +++ b/include/reactor-cpp/mutations.hh @@ -0,0 +1,22 @@ +#ifndef MUTATIONS_HH +#define MUTATIONS_HH + +namespace reactor { +class Reactor; +class Environment; + +enum MutationResult { + Success = 0, + NotMatchingBankSize = 1, +}; + +class Mutation { +public: + virtual ~Mutation() = default; + virtual auto run() -> MutationResult = 0 ; + virtual auto rollback() -> MutationResult = 0; +}; + +} + +#endif //MUTATIONS_HH diff --git a/include/reactor-cpp/mutations/bank.hh b/include/reactor-cpp/mutations/bank.hh new file mode 100644 index 00000000..8d52490f --- /dev/null +++ b/include/reactor-cpp/mutations/bank.hh @@ -0,0 +1,39 @@ +// +// Created by tanneberger on 11/18/24. +// + +#ifndef MUTATION_BANK_HH +#define MUTATION_BANK_HH + +#include + +#include "../reactor.hh" +#include "../mutations.hh" + +namespace reactor { +class Reactor; +class Environment; + +template +class MutationChangeBankSize : public reactor::Mutation { +private: + std::vector* bank_ = nullptr; + std::size_t desired_size_ = 0; + std::size_t size_before_application_ = 0; + Reactor* reactor_ = nullptr; + std::function create_lambda_; + + void change_size(std::size_t); + +public: + MutationChangeBankSize(std::vector* bank, Reactor* reactor, std::size_t size, std::function); + MutationChangeBankSize() = default; + explicit MutationChangeBankSize(const std::vector& other) : bank_(other.bank_), desired_size_(other.desired_size_), size_before_application_(other.size_before_application_) {} + ~MutationChangeBankSize() override = default; + + auto run() -> reactor::MutationResult override; + auto rollback() -> reactor::MutationResult override; +}; +} + +#endif //MUTATION_BANK_HH diff --git a/include/reactor-cpp/mutations/connection.hh b/include/reactor-cpp/mutations/connection.hh new file mode 100644 index 00000000..2e2b02ef --- /dev/null +++ b/include/reactor-cpp/mutations/connection.hh @@ -0,0 +1,33 @@ +// +// Created by tanneberger on 11/18/24. +// + +#ifndef MUTATION_CONNECTION_HH +#define MUTATION_CONNECTION_HH + +#include "../mutations.hh" + +namespace reactor { +class Reactor; +class Environment; + +template +class MutationAddConnection : public Mutation { +private: + A* source_; + B* sink_; + bool connection_ = false; + Reactor* reactor_{}; + +public: + MutationAddConnection(A* source, B* sink, Reactor* reactor); + MutationAddConnection(const MutationAddConnection& other) : source_(other.source_), sink_(other.sink_), connection_(other.connection_), reactor_(other.reactor_) {} + MutationAddConnection() = default; + ~MutationAddConnection() override = default; + + auto run() -> MutationResult override; + auto rollback() -> MutationResult override; +}; +} + +#endif //MUTATION_CONNECTION_HH diff --git a/include/reactor-cpp/mutations/multiport.hh b/include/reactor-cpp/mutations/multiport.hh new file mode 100644 index 00000000..e7a86aaa --- /dev/null +++ b/include/reactor-cpp/mutations/multiport.hh @@ -0,0 +1,43 @@ +// +// Created by tanneberger on 11/11/24. +// + +#ifndef MUTATION_MULTIPORT_HH +#define MUTATION_MULTIPORT_HH + +#include + +#include "../multiport.hh" +#include "../mutations.hh" +#include "../port.hh" + + +namespace reactor { +class Reactor; +class Environment; + +template +class MutationChangeOutputMultiportSize : public Mutation { +private: + ModifableMultiport>* multiport_ = nullptr; + std::set anti_dependencies_{}; + std::size_t desired_size_ = 0; + std::size_t size_before_application_ = 0; + Reactor* reactor_{}; + + void change_size(std::size_t); + +public: + MutationChangeOutputMultiportSize(ModifableMultiport>* multiport, Reactor* reactor, std::set& anti_dependencies, std::size_t size); + MutationChangeOutputMultiportSize() = default; + MutationChangeOutputMultiportSize(const MutationChangeOutputMultiportSize& other) : multiport_(other.multiport_), desired_size_(other.desired_size_), size_before_application_(other.size_before_application_) {} + ~MutationChangeOutputMultiportSize() override = default; + + + auto run() -> MutationResult override; + auto rollback() -> MutationResult override; +}; +} + + +#endif //MUTATION_MULTIPORT_HH diff --git a/include/reactor-cpp/port.hh b/include/reactor-cpp/port.hh index fd3048da..a8e74e07 100644 --- a/include/reactor-cpp/port.hh +++ b/include/reactor-cpp/port.hh @@ -22,6 +22,9 @@ namespace reactor { +template +class MutationChangeMultiportSize; + enum class PortType : std::uint8_t { Input, Output, Delay }; class BasePort : public ReactorElement { @@ -44,8 +47,6 @@ protected: : ReactorElement(name, match_port_enum(type), container) , type_(type) {} - void register_dependency(Reaction* reaction, bool is_trigger) noexcept; - void register_antidependency(Reaction* reaction) noexcept; virtual void cleanup() = 0; static auto match_port_enum(PortType type) noexcept -> ReactorElement::Type { @@ -72,7 +73,15 @@ protected: } public: - void set_inward_binding(BasePort* port) noexcept { inward_binding_ = port; } + void register_dependency(Reaction* reaction, bool is_trigger) noexcept; + void register_antidependency(Reaction* reaction) noexcept; + void set_inward_binding(BasePort* port) noexcept { + if (port != nullptr) { + std::cout << port->fqn() << "(" << port << ")" << " --> " << this->fqn() << "(" << this << ")" << std::endl; + } + + inward_binding_ = port; + } void add_outward_binding(BasePort* port) noexcept { outward_bindings_.insert(port); } virtual void instantiate_connection_to(const ConnectionProperties& properties, @@ -107,6 +116,12 @@ public: friend class Scheduler; }; + +inline auto operator==(const BasePort& a, const BasePort& b) -> bool { + bool equal = ((const ReactorElement&)a) == ((const ReactorElement&)b); + return equal; +} + template class Port : public BasePort { private: ImmutableValuePtr value_ptr_{nullptr}; @@ -173,7 +188,11 @@ public: Input(const std::string& name, Reactor* container) : Port(name, PortType::Input, container) {} - Input(Input&&) noexcept = default; + Input(Input&&) = default; + + ~Input() { + std::cout << "Input port gets deallocated:" << this->fqn() << std::endl; + } }; template class Output : public Port { // NOLINT(cppcoreguidelines-special-member-functions) @@ -182,6 +201,10 @@ public: : Port(name, PortType::Output, container) {} Output(Output&&) noexcept = default; + + ~Output() { + std::cout << "Output port gets deallocated: " << this->fqn() << std::endl; + } }; } // namespace reactor diff --git a/include/reactor-cpp/reaction.hh b/include/reactor-cpp/reaction.hh index c90bd017..3e34c6cb 100644 --- a/include/reactor-cpp/reaction.hh +++ b/include/reactor-cpp/reaction.hh @@ -26,9 +26,9 @@ private: std::set antidependencies_; std::set dependencies_; - const int priority_; + const int priority_ = -1; const bool mutation_; - unsigned int index_{}; + unsigned int index_ = -1; std::function body_{nullptr}; @@ -38,7 +38,7 @@ private: void set_deadline_impl(Duration deadline, const std::function& handler); public: - Reaction(const std::string& name, int priority, bool mutation, Reactor* container, std::function body); + Reaction(const std::string& name, int priority, Reactor* container, std::function body); ~Reaction() override = default; diff --git a/include/reactor-cpp/reactor-cpp.hh b/include/reactor-cpp/reactor-cpp.hh index e4fc89db..a4367c80 100644 --- a/include/reactor-cpp/reactor-cpp.hh +++ b/include/reactor-cpp/reactor-cpp.hh @@ -21,6 +21,6 @@ #include "reaction.hh" #include "reactor.hh" #include "time.hh" -#include "scops.hh" +#include "scopes.hh" #endif // REACTOR_CPP_REACTOR_CPP_HH diff --git a/include/reactor-cpp/reactor.hh b/include/reactor-cpp/reactor.hh index 02b698dd..11e861ac 100644 --- a/include/reactor-cpp/reactor.hh +++ b/include/reactor-cpp/reactor.hh @@ -14,7 +14,6 @@ #include #include "action.hh" -#include "environment.hh" #include "logical_time.hh" #include "reactor_element.hh" @@ -23,10 +22,10 @@ class Reactor : public ReactorElement { // NOLINT(cppcoreguidelines-special-memb private: std::set actions_{}; - std::set inputs_{}; + std::vector inputs_{}; std::set outputs_{}; std::set reactions_{}; - std::set reactors_{}; + std::vector reactors_{}; std::set> connections_{}; void register_action(BaseAction* action); @@ -38,6 +37,7 @@ private: public: Reactor(const std::string& name, Reactor* container); Reactor(const std::string& name, Environment* environment); + Reactor() = delete; ~Reactor() override = default; void register_connection(std::unique_ptr&& connection); @@ -60,6 +60,9 @@ public: [[nodiscard]] auto get_elapsed_logical_time() const noexcept -> Duration; [[nodiscard]] auto get_elapsed_physical_time() const noexcept -> Duration; + void remove_inputs(BasePort* base_port); + void remove_child_reactor(const Reactor* base_reactor); + friend ReactorElement; }; diff --git a/include/reactor-cpp/reactor_element.hh b/include/reactor-cpp/reactor_element.hh index d78bfeff..70965959 100644 --- a/include/reactor-cpp/reactor_element.hh +++ b/include/reactor-cpp/reactor_element.hh @@ -12,7 +12,6 @@ #include #include -#include #include #include @@ -52,6 +51,11 @@ public: virtual void startup() = 0; virtual void shutdown() = 0; + + auto operator==(const ReactorElement& other) const -> bool { + //std::cout << other.name() << "==" << name_ << std::endl; + return name_ == other.name(); // && container_ == other.container() && environment_ == other.environment(); // && fqn_ == other.fqn() + } }; } // namespace reactor diff --git a/include/reactor-cpp/scops.hh b/include/reactor-cpp/scopes.hh similarity index 81% rename from include/reactor-cpp/scops.hh rename to include/reactor-cpp/scopes.hh index 430f8d8a..9f4da694 100644 --- a/include/reactor-cpp/scops.hh +++ b/include/reactor-cpp/scopes.hh @@ -10,17 +10,18 @@ #define REACTOR_CPP_SCOPS_HH #include "transaction.hh" -#include "time.hh" +#include "logical_time.hh" +#include "reactor.hh" +#include "environment.hh" namespace reactor { -class Reactor; class Scope { private: - reactor::Reactor* reactor; + Reactor* reactor; public: - Scope(reactor::Reactor* reactor) + Scope(Reactor* reactor) : reactor(reactor) {} auto get_physical_time() const -> reactor::TimePoint { return reactor->get_physical_time(); } @@ -33,14 +34,13 @@ public: void request_stop() const { return environment()->sync_shutdown(); } }; -template class MutableScope : public Scope { public: - HostReactor* self_ = nullptr; - Environment* env_ = nullptr; Transaction transaction_; + Reactor* reactor_; + Environment* env_ = nullptr; - explicit MutableScope(reactor::Reactor* reactor) : Scope(reactor), self_(reactor), env_(reactor->environment()) {} + explicit MutableScope(Reactor* reactor) : Scope(reactor), transaction_(reactor), reactor_(reactor), env_(reactor->environment()) {} ~MutableScope() = default; void commit_transaction(); diff --git a/include/reactor-cpp/transaction.hh b/include/reactor-cpp/transaction.hh index 9da4e6a3..8316bd8c 100644 --- a/include/reactor-cpp/transaction.hh +++ b/include/reactor-cpp/transaction.hh @@ -11,45 +11,24 @@ #include -#include "multiport.hh" +#include "mutations.hh" +//#include "reactor.hh" namespace reactor { class Reactor; class Environment; -enum MutationResult { - Success = 0, - NotMatchingBankSize = 1, -}; - -class Mutation { -public: - virtual ~Mutation() = default; - virtual auto run() -> MutationResult = 0 ; - virtual auto rollback() -> MutationResult = 0; -}; - -template -class MutationChangeMultiportSize : public Mutation { -private: - Multiport* multiport_ = nullptr; - std::size_t desired_size_ = 0; - std::size_t size_before_application_ = 0; -public: - MutationChangeMultiportSize(Multiport* multiport, std::size_t size); - ~MutationChangeMultiportSize() override = default; - - auto run() -> MutationResult override; - auto rollback() -> MutationResult override; -}; class Transaction { private: - Reactor* parent = nullptr; - Environment* environment = nullptr; + Reactor* parent_ = nullptr; + Environment* environment_ = nullptr; std::vector mutations_{}; public: + explicit Transaction(Reactor* parent); + ~Transaction() = default; + void push_back(Mutation* mutation); auto execute() -> MutationResult; }; diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 75ecf535..2ade6076 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -7,15 +7,18 @@ set(SOURCE_FILES reaction.cc reactor.cc scheduler.cc - scops.cc + scopes.cc time.cc transaction.cc multiport.cc reactor_element.cc + mutation/multiport.cc + mutation/bank.cc + mutation/connection.cc ) if(REACTOR_CPP_TRACE) - set(SOURCE_FILES ${SOURCE_FILES} trace.cc ) + set(SOURCE_FILES ${SOURCE_FILES} trace.cc) endif() if (DEFINED LF_REACTOR_CPP_SUFFIX) @@ -24,7 +27,7 @@ else() set(REACTOR_CPP_INCLUDE "include") endif() -add_library(${LIB_TARGET} SHARED ${SOURCE_FILES}) +add_library(${LIB_TARGET} STATIC ${SOURCE_FILES}) target_include_directories(${LIB_TARGET} PUBLIC "$" "$" diff --git a/lib/environment.cc b/lib/environment.cc index 52c0b6bf..b517146b 100644 --- a/lib/environment.cc +++ b/lib/environment.cc @@ -47,7 +47,7 @@ Environment::Environment(const std::string& name, Environment* containing_enviro void Environment::register_reactor(Reactor* reactor) { reactor_assert(reactor != nullptr); - validate(this->phase() == Phase::Construction, "Reactors may only be registered during construction phase!"); + validate(this->phase() == Phase::Construction || this->phase() == Phase::Mutation, "Reactors may only be registered during construction phase!"); validate(reactor->is_top_level(), "The environment may only contain top level reactors!"); [[maybe_unused]] bool result = top_level_reactors_.insert(reactor).second; reactor_assert(result); @@ -157,14 +157,23 @@ void Environment::build_dependency_graph(Reactor* reactor) { validate(result.second, "priorities must be unique for all reactions_ of the same reactor"); } + dependencies_.clear(); //TODO: fix + // connect all reactions_ this reaction depends on for (auto* reaction : reactor->reactions()) { - for (auto* dependency : reaction->dependencies()) { - auto* source = dependency; - while (source->has_inward_binding()) { - source = source->inward_binding(); + std::set dependencies = reaction->dependencies(); + for (auto* dependency : dependencies) { + if (dependency <= (BasePort*)0x100) { + std::cout << "FUCK" << std::endl; + } + while (dependency->has_inward_binding()) { + if (dependency <= (BasePort*)0x100) { + std::cout << "FUCK" << std::endl; + } + dependency = dependency->inward_binding(); } - for (auto* antidependency : source->anti_dependencies()) { + std::cout << "anti deps of: " << dependency->fqn() << "(" << dependency << ")" << std::endl; + for (auto* antidependency : dependency->anti_dependencies()) { dependencies_.emplace_back(reaction, antidependency); } } diff --git a/lib/logical_time.cc b/lib/logical_time.cc index c17ca2f4..56bd723e 100644 --- a/lib/logical_time.cc +++ b/lib/logical_time.cc @@ -45,7 +45,8 @@ auto Tag::subtract(Duration offset) const noexcept -> Tag { auto Tag::decrement() const noexcept -> Tag { if (micro_step_ == 0) { - return {time_point_ - Duration{1}, std::numeric_limits::max()}; + //FIXME: return {time_point_ - Duration{1}, std::numeric_limits::max()}; + return {time_point_ - Duration{1}, 0}; } return {time_point_, micro_step_ - 1}; } diff --git a/lib/mutation/bank.cc b/lib/mutation/bank.cc new file mode 100644 index 00000000..098c0de5 --- /dev/null +++ b/lib/mutation/bank.cc @@ -0,0 +1,49 @@ +// +// Created by tanneberger on 11/11/24. +// + +#include "reactor-cpp/mutations/bank.hh" +#include "reactor-cpp/action.hh" + +template +reactor::MutationChangeBankSize::MutationChangeBankSize(std::vector* bank, Reactor* reactor, std::size_t size, std::function create_lambda) + : bank_(bank), reactor_(reactor), desired_size_(size), create_lambda_(create_lambda){ +} + +template +void reactor::MutationChangeBankSize::change_size(std::size_t new_size) { + bank_->reserve(new_size); + auto current_size = bank_->size(); + std::cout << "scaling from: " << current_size << " to " << new_size << std::endl; + + if (current_size >= new_size) { + // downscale + + for (auto i = 0; i < current_size - new_size; i++) { + //TODO: consider saving the ports here here + Reactor* last = (*bank_->end()).get(); + reactor_->environment()->remove_top_level_reactor(last); + bank_->pop_back(); + } + } else { + // upscale + + for (auto i = 0; i < new_size - current_size; i++) { + bank_->push_back(create_lambda_(reactor_, current_size + i)); + (*bank_)[bank_->size() - 1]->assemble(); + } + std::cout << "created new reactors" << std::endl; + } +} +template +auto reactor::MutationChangeBankSize::run() -> MutationResult { + size_before_application_ = bank_->size(); + change_size(desired_size_); + return Success; +} + +template +auto reactor::MutationChangeBankSize::rollback() -> MutationResult { + change_size(size_before_application_); + return Success; +} diff --git a/lib/mutation/connection.cc b/lib/mutation/connection.cc new file mode 100644 index 00000000..38ceeb27 --- /dev/null +++ b/lib/mutation/connection.cc @@ -0,0 +1,35 @@ +// +// Created by tanneberger on 11/20/24. +// + +#include "reactor-cpp/mutations/connection.hh" +#include "reactor-cpp/reactor.hh" + +template +reactor::MutationAddConnection::MutationAddConnection(A* source, B* sink, Reactor* reactor) : source_(source), sink_(sink), reactor_(reactor) { + +} + + +template auto reactor::MutationAddConnection::run() -> MutationResult { + reactor_->environment()->draw_connection(source_, sink_, ConnectionProperties{}); + sink_->set_inward_binding(source_); + source_->add_outward_binding(sink_); + std::cout << "from: " << source_->fqn() << "(" << source_ << ")" + << " --> to: " << sink_->fqn() << "(" << sink_ << ")" << std::endl; + + for (const auto* reactor : reactor_->environment()->top_level_reactors()) { + reactor_->environment()->build_dependency_graph((Reactor*)reactor); + } + + reactor_->environment()->calculate_indexes(); + return Success; +} + +template auto reactor::MutationAddConnection::rollback() -> MutationResult { + reactor_->environment()->remove_connection(source_, sink_); + + return Success; +} + + diff --git a/lib/mutation/multiport.cc b/lib/mutation/multiport.cc new file mode 100644 index 00000000..ed950299 --- /dev/null +++ b/lib/mutation/multiport.cc @@ -0,0 +1,54 @@ +// +// Created by tanneberger on 11/11/24. +// + +#include "reactor-cpp/mutations/multiport.hh" +#include "reactor-cpp/reaction.hh" + +template +reactor::MutationChangeOutputMultiportSize::MutationChangeOutputMultiportSize(ModifableMultiport>* multiport, Reactor* reactor, std::set& anti_dependencies, std::size_t size) + : multiport_(multiport), reactor_(reactor), desired_size_(size), anti_dependencies_(anti_dependencies) { } + +template +void reactor::MutationChangeOutputMultiportSize::change_size(std::size_t new_size) { + multiport_->reserve(new_size); + auto current_size = multiport_->size(); + std::cout << "scaling from: " << current_size << " to " << new_size << std::endl; + + if (current_size >= new_size) { + // downscale + + for (auto i = 0; i < current_size - new_size; i++) { + //TODO: consider saving the ports here here + + std::string port_name_ = multiport_->name() + "_" + std::to_string(current_size + i - 1); + multiport_->pop_back(); + auto base_port = Output{port_name_, reactor_}; + reactor_->remove_inputs(&base_port); + } + } else { + // upscale + + for (auto i = 0; i < new_size - current_size; i++) { + std::string port_name_ = multiport_->name() + "_" + std::to_string(current_size + i); + multiport_->emplace_back(port_name_, reactor_); + BasePort* port = &(multiport_->operator[](current_size + i)); + for (auto* anti_dep : anti_dependencies_) { + anti_dep->declare_antidependency(port); + } + } + + } +} +template +auto reactor::MutationChangeOutputMultiportSize::run() -> MutationResult { + size_before_application_ = multiport_->size(); + change_size(desired_size_); + return Success; +} + +template +auto reactor::MutationChangeOutputMultiportSize::rollback() -> MutationResult { + change_size(size_before_application_); + return Success; +} diff --git a/lib/port.cc b/lib/port.cc index 7260c661..412e4cab 100644 --- a/lib/port.cc +++ b/lib/port.cc @@ -20,7 +20,7 @@ void BasePort::register_dependency(Reaction* reaction, bool is_trigger) noexcept reactor_assert(reaction != nullptr); reactor_assert(this->environment() == reaction->environment()); validate(!this->has_outward_bindings(), "Dependencies may no be declared on ports with an outward binding!"); - assert_phase(this, Phase::Assembly); + //assert_phase(this, Phase::Assembly); if (this->is_input()) { validate(this->container() == reaction->container(), "Dependent input ports must belong to the same reactor as the " @@ -42,7 +42,7 @@ void BasePort::register_antidependency(Reaction* reaction) noexcept { reactor_assert(reaction != nullptr); reactor_assert(this->environment() == reaction->environment()); validate(!this->has_inward_binding(), "Antidependencies may no be declared on ports with an inward binding!"); - assert_phase(this, Phase::Assembly); + //TODO: assert_phase(this, Phase::Assembly); if (this->is_output()) { validate(this->container() == reaction->container(), diff --git a/lib/reaction.cc b/lib/reaction.cc index 4e895da5..22ce0f21 100644 --- a/lib/reaction.cc +++ b/lib/reaction.cc @@ -17,10 +17,10 @@ namespace reactor { -Reaction::Reaction(const std::string& name, int priority, bool mutation, Reactor* container, std::function body) +Reaction::Reaction(const std::string& name, int priority, Reactor* container, std::function body) : ReactorElement(name, ReactorElement::Type::Reaction, container) , priority_(priority) - , mutation_(mutation) + , mutation_(false) , body_(std::move(std::move(body))) { reactor_assert(priority != 0); @@ -53,7 +53,7 @@ void Reaction::declare_schedulable_action(BaseAction* action) { void Reaction::declare_trigger(BasePort* port) { reactor_assert(port != nullptr); reactor_assert(this->environment() == port->environment()); - assert_phase(this, Phase::Assembly); + //assert_phase(this, Phase::Assembly); if (port->is_input()) { validate(this->container() == port->container(), @@ -128,7 +128,7 @@ void Reaction::set_deadline_impl(Duration deadline, const std::functionenvironment()->phase() == Phase::Assembly, "Reaction indexes may only be set during assembly phase!"); + validate(this->environment()->phase() == Phase::Assembly || this->environment()->phase() == Phase::Mutation, "Reaction indexes may only be set during assembly phase!"); this->index_ = index; } diff --git a/lib/reactor.cc b/lib/reactor.cc index f42488b0..eb9e5d26 100644 --- a/lib/reactor.cc +++ b/lib/reactor.cc @@ -18,7 +18,9 @@ namespace reactor { Reactor::Reactor(const std::string& name, Reactor* container) - : ReactorElement(name, ReactorElement::Type::Reactor, container) {} + : ReactorElement(name, ReactorElement::Type::Reactor, container) { + container->register_reactor(this); +} Reactor::Reactor(const std::string& name, Environment* environment) : ReactorElement(name, ReactorElement::Type::Reactor, environment) { environment->register_reactor(this); @@ -36,26 +38,28 @@ void Reactor::register_action([[maybe_unused]] BaseAction* action) { void Reactor::register_input(BasePort* port) { reactor_assert(port != nullptr); - reactor::validate(this->environment()->phase() == Phase::Construction, + reactor::validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, "Ports can only be registered during construction phase!"); - [[maybe_unused]] bool result = inputs_.insert(port).second; - reactor_assert(result); + //[[maybe_unused]] bool result = inputs_.insert(port).second; + inputs_.push_back(port); + //reactor_assert(result); Statistics::increment_ports(); } void Reactor::register_output(BasePort* port) { reactor_assert(port != nullptr); - reactor::validate(this->environment()->phase() == Phase::Construction, + reactor::validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, "Ports can only be registered during construction phase!"); - [[maybe_unused]] bool result = inputs_.insert(port).second; - reactor_assert(result); + //[[maybe_unused]] bool result = inputs_.insert(port).second; + //std::cout << "reactor port count:" << inputs_.size() << std::endl; + //TODO: reactor_assert(result); Statistics::increment_ports(); } void Reactor::register_reaction([[maybe_unused]] Reaction* reaction) { reactor_assert(reaction != nullptr); - validate(this->environment()->phase() == Phase::Construction, + validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, "Reactions can only be registered during construction phase!"); [[maybe_unused]] bool result = reactions_.insert(reaction).second; reactor_assert(result); @@ -64,10 +68,16 @@ void Reactor::register_reaction([[maybe_unused]] Reaction* reaction) { void Reactor::register_reactor([[maybe_unused]] Reactor* reactor) { reactor_assert(reactor != nullptr); - validate(this->environment()->phase() == Phase::Construction, + validate(this->environment()->phase() == Phase::Construction || this->environment()->phase() == Phase::Mutation, "Reactions can only be registered during construction phase!"); - [[maybe_unused]] bool result = reactors_.insert(reactor).second; - reactor_assert(result); + if (std::find(std::begin(reactors_), std::end(reactors_), reactor) == std::end(reactors_)) { + reactors_.push_back(reactor); + } else { + std::cout << "duplicate insertion!" << std::endl; + } + + //[[maybe_unused]] bool result = reactors_.insert(reactor).second; + //reactor_assert(result); Statistics::increment_reactor_instances(); } @@ -141,4 +151,22 @@ auto Reactor::get_elapsed_physical_time() const noexcept -> Duration { return get_physical_time() - environment()->start_tag().time_point(); } +void Reactor::remove_inputs(BasePort* base_port) { + auto index = std::find_if(std::begin(inputs_), std::end(inputs_), [base_port](const BasePort* other) { return *other == *base_port;}); + + if (index != std::end(inputs_)) { + inputs_.erase(index); + } + }; + +void Reactor::remove_child_reactor(const Reactor* base_reactor) { + auto index = std::find_if(std::begin(reactors_), std::end(reactors_), [base_reactor](const Reactor* other) { + return base_reactor == other; + }); + + if (index != std::end(reactors_)) { + reactors_.erase(index); + } +} + } // namespace reactor diff --git a/lib/reactor_element.cc b/lib/reactor_element.cc index 32db319f..51eae68d 100644 --- a/lib/reactor_element.cc +++ b/lib/reactor_element.cc @@ -24,7 +24,7 @@ ReactorElement::ReactorElement(const std::string& name, ReactorElement::Type typ reactor_assert(container != nullptr); this->environment_ = container->environment(); reactor_assert(this->environment_ != nullptr); - validate(this->environment_->phase() == Phase::Construction || + validate(this->environment_->phase() == Phase::Construction ||this->environment_->phase() == Phase::Mutation || (type == Type::Action && this->environment_->phase() == Phase::Assembly), "Reactor elements can only be created during construction phase!"); // We need a reinterpret_cast here as the derived class is not yet created @@ -71,7 +71,7 @@ ReactorElement::ReactorElement(const std::string& name, ReactorElement::Type typ reactor_assert(environment != nullptr); validate(type == Type::Reactor || type == Type::Action, "Only reactors and actions can be owned by the environment!"); - validate(this->environment_->phase() == Phase::Construction || + validate(this->environment_->phase() == Phase::Construction || this->environment_->phase() == Phase::Mutation || (type == Type::Action && this->environment_->phase() == Phase::Assembly), "Reactor elements can only be created during construction phase!"); diff --git a/lib/scopes.cc b/lib/scopes.cc new file mode 100644 index 00000000..de65e5dd --- /dev/null +++ b/lib/scopes.cc @@ -0,0 +1,11 @@ + + +#include "reactor-cpp/scopes.hh" + +void reactor::MutableScope::add_to_transaction(Mutation* mutation) { + transaction_.push_back(mutation); +} + +void reactor::MutableScope::commit_transaction() { + transaction_.execute(); +} \ No newline at end of file diff --git a/lib/scops.cc b/lib/scops.cc deleted file mode 100644 index d52f3ce0..00000000 --- a/lib/scops.cc +++ /dev/null @@ -1,13 +0,0 @@ - - -#include "reactor-cpp/scops.hh" - -template -void reactor::MutableScope::begin_transaction() { - transaction_.reset(); -} - -template -void reactor::MutableScope::end_transaction() { - transaction_.execute(); -} \ No newline at end of file diff --git a/lib/transaction.cc b/lib/transaction.cc index 17aba182..2576c222 100644 --- a/lib/transaction.cc +++ b/lib/transaction.cc @@ -1,33 +1,24 @@ #include "reactor-cpp/transaction.hh" +#include "reactor-cpp/environment.hh" +#include "reactor-cpp/reactor.hh" -template -reactor::MutationChangeMultiportSize::MutationChangeMultiportSize(Multiport* multiport, std::size_t size) - : multiport_(multiport), desired_size_(size){ -} - -template -auto reactor::MutationChangeMultiportSize::run() -> MutationResult { - multiport_->ports_.resize(desired_size_); - - return Success; -} - -template -auto reactor::MutationChangeMultiportSize::rollback() -> MutationResult { - return Success; -} + reactor::Transaction::Transaction(Reactor* parent) : parent_(parent), environment_(parent->environment()) { } auto reactor::Transaction::execute() -> MutationResult { - for (auto *mutation : mutations_) { - mutation->run(); - } - return Success; + this->environment_->start_mutation(); + for (auto *mutation : mutations_) { + mutation->run(); + } + + this->environment_->stop_mutation(); + mutations_.clear(); + return Success; } void reactor::Transaction::push_back(reactor::Mutation* mutation) { - mutations_.push_back(mutation); + mutations_.push_back(mutation); } \ No newline at end of file