Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mutations for scaling banks and multiports and adding connections #75

Merged
merged 8 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/clang-format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ jobs:
- uses: actions/checkout@v4
- name: Analyze
run: |
clang-format --version
clang-format --dry-run --Werror -style=file $(find ./ -name '*.cc' -print)
clang-format --dry-run --Werror -style=file $(find ./ -name '*.hh' -print)
52 changes: 52 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
.PHONY: clean test coverage asan format format-check ci lf-test lib proto

test: unit-test lf-test

# Generate protobuf code
proto:
python3 external/nanopb/generator/nanopb_generator.py -Iexternal/nanopb/generator/proto/ -Iexternal/proto -L'#include "nanopb/%s"' -Dexternal/proto message.proto

# Build reactor-uc as a static library
lib:
cmake -Bbuild
cmake --build build
make -C build

# Build and run the unit tests
unit-test:
cmake -Bbuild -DBUILD_TESTS=ON
cmake --build build
make test -C build

# Build and run lf tests
lf-test:
make -C test/lf

# Get coverage data on unit tests
coverage:
cmake -Bbuild -DBUILD_TESTS=ON -DTEST_COVERAGE=ON
cmake --build build
make coverage -C build

# Compile tests with AddressSanitizer and run them
asan:
cmake -Bbuild -DASAN=ON -DBUILD_TESTS=ON
cmake --build build
make test -C build

# Format the code base
SRC_FILES := $(shell find ./lib -name '*.cc' -print)
HDR_FILES := $(shell find ./include -name '*.hh' -print)

format:
clang-format -i -style=file $(SRC_FILES) $(HDR_FILES)

# Check that the code base is formatted
format-check:
clang-format --dry-run --Werror -style=file $(SRC_FILES) $(HDR_FILES)

# Run the entire CI flow
ci: clean test coverage format-check

clean:
rm -rf build
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ add_subdirectory(count)
add_subdirectory(ports)
add_subdirectory(hello)
add_subdirectory(power_train)
add_subdirectory(multiport_mutation)
6 changes: 3 additions & 3 deletions examples/count/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
using namespace reactor;
using namespace std::chrono_literals;

class Count : public Reactor {
class Count final : public Reactor {
private:
// actions
Timer timer{"timer", this};
LogicalAction<int> counter{"counter", this};

// reactions_
Reaction r_init{"r_init", 1, this, [this]() { init(); }};
Reaction r_counter{"r_counter", 2, this, [this]() { print_count(); }};
Reaction r_init{"r_init", 1, this, [this]() { this->init(); }};
Reaction r_counter{"r_counter", 2, this, [this]() { this->print_count(); }};

public:
explicit Count(Environment* env)
Expand Down
2 changes: 1 addition & 1 deletion examples/hello/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using namespace reactor;
using namespace std::chrono_literals;

class Hello : public Reactor {
class Hello final : public Reactor {
private:
// actions
Timer timer{"timer", this, 1s, 2s};
Expand Down
3 changes: 3 additions & 0 deletions examples/multiport_mutation/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
add_executable(mutation_multiports main.cc)
target_link_libraries(mutation_multiports reactor-cpp)
add_dependencies(examples mutation_multiports)
39 changes: 39 additions & 0 deletions examples/multiport_mutation/consumer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef CONSUMER_HH // NOLINT
#define CONSUMER_HH // NOLINT

#include <reactor-cpp/reactor-cpp.hh>

using namespace reactor;
using namespace std::chrono_literals;

class Consumer final : public Reactor { // NOLINT
class Inner : public Scope {
Inner(Reactor* reactor, std::size_t index)
: Scope(reactor)
, index_(index) {}
std::size_t index_ = 0;

void reaction_1(const Input<unsigned>& in) const {
std::cout << "consumer: " << index_ << " received value:" << *in.get() << '\n';
}

friend Consumer;
};

Inner _lf_inner;
Reaction handle{"handle", 1, this, [this]() { _lf_inner.reaction_1(this->in); }};

public:
Consumer(const std::string& name, Environment* env, std::size_t index)
: Reactor(name, env)
, _lf_inner(this, index) {
std::cout << "creating instance of consumer" << '\n';
}
~Consumer() override { std::cout << "Consumer Object is deleted" << '\n'; };

Input<unsigned> in{"in", this}; // NOLINT

void assemble() override { handle.declare_trigger(&in); }
};

#endif // CONSUMER_HH
81 changes: 81 additions & 0 deletions examples/multiport_mutation/load_balancer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#ifndef LOAD_BALANCER_HH // NOLINT
#define LOAD_BALANCER_HH // NOLINT

#include <reactor-cpp/reactor-cpp.hh>

#include "../../lib/mutation/multiport.cc"
#include "reactor-cpp/mutations/multiport.hh"

using namespace reactor;
using namespace std::chrono_literals;

class LoadBalancer final : public Reactor { // NOLINT
class Inner : public MutableScope {
explicit Inner(Reactor* reactor)
: MutableScope(reactor) {}

// reaction bodies
static void reaction_1(const Input<unsigned>& inbound, LogicalAction<unsigned>& scale_action,
Multiport<Output<unsigned>>& outbound) {
if (std::rand() % 30 == 0) { // NOLINT
scale_action.schedule(std::rand() % 20 + 1); // NOLINT
}
const unsigned sel = std::rand() % outbound.size(); // NOLINT
std::cout << "Sending out to:" << sel << '\n';
outbound[sel].set(inbound.get());
}

void reaction_2(ModifableMultiport<Output<unsigned>>& outbound,
[[maybe_unused]] const LogicalAction<unsigned>& scale, Output<unsigned>& scale_bank) {
ModifableMultiport<Output<unsigned>>* temp = &outbound;
std::size_t new_size = *scale.get();

auto antideps = (outbound[0]).anti_dependencies();

const auto change_size =
std::make_shared<MutationChangeOutputMultiportSize<unsigned>>(temp, this->reactor_, antideps, new_size);

add_to_transaction(change_size);

commit_transaction();

scale_bank.set(new_size);
}

friend LoadBalancer;
};

Inner _lf_inner;
Reaction process{"process", 2, this, [this]() { Inner::reaction_1(this->inbound, this->scale_action, this->out); }};
Reaction scale{"scale", 1, this, [this]() { _lf_inner.reaction_2(this->out, this->scale_action, this->scale_bank); }};

public:
LoadBalancer(const std::string& name, Environment* env)
: Reactor(name, env)
, _lf_inner(this) {
std::cout << "creating instance of load balancer" << '\n';
out.reserve(4);
for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) {
std::string _lf_port_name = out.name() + "_" + std::to_string(_lf_idx);
out.emplace_back(_lf_port_name, this);
}
}
~LoadBalancer() override = default;

LogicalAction<unsigned> scale_action{"scale", this, 1us}; // NOLINT
ModifableMultiport<Output<unsigned>> out{"out"}; // NOLINT
Input<unsigned> inbound{"inbound", this}; // NOLINT
Output<unsigned> scale_bank{"scale_bank", this}; // NOLINT

void assemble() override {
std::cout << "assemble LoadBalancer\n";
for (auto& _lf_port : out) {
process.declare_antidependency(&_lf_port);
}
process.declare_trigger(&inbound);
scale.declare_trigger(&scale_action);
scale.declare_antidependency(&scale_bank);
}
};

#endif // LOAD_BALANCER_HH
95 changes: 95 additions & 0 deletions examples/multiport_mutation/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#include <iostream>

#include <reactor-cpp/mutations/bank.hh>
#include <reactor-cpp/mutations/connection.hh>

#include "../../lib/mutation/bank.cc"
#include "../../lib/mutation/connection.cc"
#include "./consumer.hh"
#include "./load_balancer.hh"
#include "./producer.hh"
#include <reactor-cpp/reactor-cpp.hh>

class Deployment final : public Reactor { // NOLINT
class Inner : public MutableScope {
int state = 0;

public:
Inner(Reactor* reactor)
: MutableScope(reactor) {}
void reaction_1(const Input<unsigned>& scale, std::vector<std::unique_ptr<Consumer>>& reactor_bank,
ModifableMultiport<Output<unsigned>>& load_balancer) {
std::size_t new_size = *scale.get();
std::size_t old_size = reactor_bank.size();

std::function lambda = [](Reactor* reactor, std::size_t index) {
std::string _lf_inst_name = "consumer_" + std::to_string(index);
return std::make_unique<Consumer>(_lf_inst_name, reactor->environment(), index);
};

auto change_size = std::make_shared<MutationChangeBankSize<std::unique_ptr<Consumer>>>(
&reactor_bank, this->reactor_, new_size, lambda);

add_to_transaction(change_size);

commit_transaction();

if (old_size < new_size) {
for (auto i = 0; i < new_size; i++) {
auto add_conn = std::make_shared<MutationAddConnection<Output<unsigned>, Input<unsigned>>>(
&load_balancer[i], &reactor_bank[i].get()->in, reactor_);
add_to_transaction(add_conn);
}
}

commit_transaction(true);
}

friend LoadBalancer;
};

std::unique_ptr<Producer> producer_;
std::unique_ptr<LoadBalancer> load_balancer_;
std::vector<std::unique_ptr<Consumer>> consumers_;

Reaction scale_bank{"scale_bank", 1, this,
[this]() { this->_inner.reaction_1(this->scale, this->consumers_, load_balancer_->out); }};

Inner _inner;

public:
Deployment(const std::string& name, Environment* env)
: Reactor(name, env)
, _inner(this)
, producer_(std::make_unique<Producer>("producer", environment()))
, load_balancer_(std::make_unique<LoadBalancer>("load_balancer", environment())) {
std::cout << "creating instance of deployment" << '\n';
consumers_.reserve(4);
for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) {
std::string _lf_inst_name = "consumer_" + std::to_string(_lf_idx);
consumers_.push_back(std::make_unique<Consumer>(_lf_inst_name, environment(), _lf_idx));
}
}
~Deployment() override = default;

Input<unsigned> scale{"scale", this}; // NOLINT

void assemble() override {
for (size_t _lf_idx = 0; _lf_idx < 4; _lf_idx++) {
environment()->draw_connection(load_balancer_->out[_lf_idx], consumers_[_lf_idx]->in, ConnectionProperties{});
environment()->draw_connection(producer_->value, load_balancer_->inbound, ConnectionProperties{});
}
environment()->draw_connection(load_balancer_->scale_bank, scale, ConnectionProperties{});
scale_bank.declare_trigger(&this->scale);
}
};

auto main() -> int {
Environment env{4, true};
auto deployment = std::make_unique<Deployment>("c1", &env);
env.optimize();
env.assemble();
auto thread = env.startup();
thread.join();
return 0;
}
47 changes: 47 additions & 0 deletions examples/multiport_mutation/producer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef PRODUCER_HH // NOLINT
#define PRODUCER_HH // NOLINT

#include <reactor-cpp/reactor-cpp.hh>

using namespace reactor;
using namespace std::chrono_literals;

class Producer final : public Reactor { // NOLINT
private:
Timer timer{"timer", this, 1s, 1s};
Reaction r_timer{"r_timer", 1, this, [this]() { _lf_inner.reaction_1(this->value); }};

class Inner : public Scope {
unsigned int counter_ = 0;

void reaction_1([[maybe_unused]] Output<unsigned>& out) {
std::cout << "producing value:" << counter_ << "\n";
out.set(counter_++);
}

explicit Inner(Reactor* reactor)
: Scope(reactor) {}

friend Producer;
};

Inner _lf_inner;

public:
Producer(const std::string& name, Environment* env)
: Reactor(name, env)
, _lf_inner(this) {
std::cout << "creating instance of producer\n";
}
Producer() = delete;
~Producer() override = default;

Output<unsigned> value{"value", this}; // NOLINT

void assemble() override {
r_timer.declare_trigger(&timer);
r_timer.declare_antidependency(&value);
}
};

#endif // PRODUCER_HH
Loading
Loading