Skip to content

Commit

Permalink
Add LifecycleSubscription
Browse files Browse the repository at this point in the history
Signed-off-by: Aarav Gupta <[email protected]>
  • Loading branch information
Amronos committed Dec 24, 2024
1 parent a0a2a06 commit 113024c
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 16 deletions.
13 changes: 7 additions & 6 deletions rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,13 @@ create_subscription(
subscription_topic_stats->set_publisher_timer(timer);
}

auto factory = rclcpp::create_subscription_factory<MessageT>(
std::forward<CallbackT>(callback),
options,
msg_mem_strat,
subscription_topic_stats
);
auto factory =
rclcpp::create_subscription_factory<MessageT, CallbackT, AllocatorT, SubscriptionT>(
std::forward<CallbackT>(callback),
options,
msg_mem_strat,
subscription_topic_stats
);

const rclcpp::QoS & actual_qos = options.qos_overriding_options.get_policy_kinds().size() ?
rclcpp::detail::declare_qos_parameters(
Expand Down
10 changes: 3 additions & 7 deletions rclcpp/include/rclcpp/subscription_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,9 @@ create_subscription_factory(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rclcpp::QoS & qos
) -> rclcpp::SubscriptionBase::SharedPtr
) -> std::shared_ptr<SubscriptionT>
{
using rclcpp::Subscription;
using rclcpp::SubscriptionBase;

auto sub = Subscription<MessageT, AllocatorT>::make_shared(
auto sub = std::make_shared<SubscriptionT>(
node_base,
rclcpp::get_message_type_support_handle<MessageT>(),
topic_name,
Expand All @@ -116,8 +113,7 @@ create_subscription_factory(
// require this->shared_from_this() which cannot be called from
// the constructor.
sub->post_init_setup(node_base, qos, options);
auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
return sub_base_ptr;
return sub;
}
};

Expand Down
4 changes: 4 additions & 0 deletions rclcpp_lifecycle/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ if(BUILD_TESTING)
if(TARGET test_lifecycle_publisher)
target_link_libraries(test_lifecycle_publisher ${PROJECT_NAME} rcl_lifecycle::rcl_lifecycle rclcpp::rclcpp ${test_msgs_TARGETS})
endif()
ament_add_gtest(test_lifecycle_subscription test/test_lifecycle_subscription.cpp)
if(TARGET test_lifecycle_subscription)
target_link_libraries(test_lifecycle_subscription ${PROJECT_NAME} rcl_lifecycle::rcl_lifecycle rclcpp::rclcpp ${test_msgs_TARGETS})
endif()
ament_add_gtest(test_lifecycle_service_client test/test_lifecycle_service_client.cpp TIMEOUT 120)
ament_add_test_label(test_lifecycle_service_client mimick)
if(TARGET test_lifecycle_service_client)
Expand Down
3 changes: 2 additions & 1 deletion rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@

#include "rclcpp_lifecycle/node_interfaces/lifecycle_node_interface.hpp"
#include "rclcpp_lifecycle/lifecycle_publisher.hpp"
#include "rclcpp_lifecycle/lifecycle_subscription.hpp"
#include "rclcpp_lifecycle/state.hpp"
#include "rclcpp_lifecycle/transition.hpp"
#include "rclcpp_lifecycle/visibility_control.h"
Expand Down Expand Up @@ -240,7 +241,7 @@ class LifecycleNode : public node_interfaces::LifecycleNodeInterface,
typename MessageT,
typename CallbackT,
typename AllocatorT = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<MessageT, AllocatorT>,
typename SubscriptionT = rclcpp_lifecycle::LifecycleSubscription<MessageT, AllocatorT>,
typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType>
std::shared_ptr<SubscriptionT>
create_subscription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/type_support_decl.hpp"

#include "rclcpp_lifecycle/lifecycle_subscription.hpp"
#include "rmw/types.h"

#include "rclcpp_lifecycle/lifecycle_node.hpp"
Expand All @@ -65,7 +66,6 @@ LifecycleNode::create_publisher(
return pub;
}

// TODO(karsten1987): Create LifecycleSubscriber
template<
typename MessageT,
typename CallbackT,
Expand All @@ -80,13 +80,15 @@ LifecycleNode::create_subscription(
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
{
return rclcpp::create_subscription<MessageT>(
auto sub = rclcpp::create_subscription<MessageT, CallbackT, AllocatorT, SubscriptionT>(
*this,
topic_name,
qos,
std::forward<CallbackT>(callback),
options,
msg_mem_strat);
this->add_managed_entity(sub);
return sub;
}

template<typename DurationRepT, typename DurationT, typename CallbackT>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP_LIFECYCLE__LIFECYCLE_SUBSCRIPTION_HPP_
#define RCLCPP_LIFECYCLE__LIFECYCLE_SUBSCRIPTION_HPP_

#include <memory>
#include <string>

#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/qos.hpp"

#include "rclcpp_lifecycle/managed_entity.hpp"


namespace rclcpp_lifecycle
{

/// @brief Child class of rclcpp::Subscription that adds lifecycle functionality.
/**
* This class is a child class of rclcpp::Subscription that adds a lifecycle
* check to the callback function. If the node is in an inactive state, the
* callback will not be called.
*/
template<
typename MessageT,
typename AllocatorT = std::allocator<void>,
typename SubscribedT = typename rclcpp::TypeAdapter<MessageT>::custom_type,
typename ROSMessageT = typename rclcpp::TypeAdapter<MessageT>::ros_message_type,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
ROSMessageT,
AllocatorT
>>
class LifecycleSubscription : public SimpleManagedEntity,
public rclcpp::Subscription<MessageT, AllocatorT>
{
private:
using SubscriptionTopicStatisticsSharedPtr =
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics>;

public:
LifecycleSubscription(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rosidl_message_type_support_t & type_support_handle,
const std::string & topic_name,
const rclcpp::QoS & qos,
rclcpp::AnySubscriptionCallback<MessageT, AllocatorT> callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)
: rclcpp::Subscription<MessageT>(
node_base,
type_support_handle,
topic_name,
qos,
callback,
options,
message_memory_strategy,
subscription_topic_statistics)
{
}

/// TODO: Hold onto the data that arrives before activation, and deliver that on activation.
/// Check if we need to handle the message, and execute the callback if we do.
void handle_message(
std::shared_ptr<void> & message, const rclcpp::MessageInfo & message_info) override
{
if (!this->is_activated()) {
return;
}
rclcpp::Subscription<MessageT, AllocatorT>::handle_message(message, message_info);
}
};

} // namespace rclcpp_lifecycle

#endif // RCLCPP_LIFECYCLE__LIFECYCLE_SUBSCRIPTION_HPP_
165 changes: 165 additions & 0 deletions rclcpp_lifecycle/test/test_lifecycle_subscription.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>
#include <memory>
#include <string>

#include "lifecycle_msgs/msg/state.hpp"
#include "lifecycle_msgs/msg/transition.hpp"

#include "test_msgs/msg/empty.hpp"

#include "rclcpp_lifecycle/lifecycle_node.hpp"
#include "rclcpp_lifecycle/lifecycle_subscription.hpp"

using lifecycle_msgs::msg::State;
using lifecycle_msgs::msg::Transition;

class TestDefaultStateMachine : public ::testing::Test
{
protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
static void TearDownTestCase()
{
rclcpp::shutdown();
}
};

/// We want to test everything for both the wall and generic timer.
enum class TimerType
{
WALL_TIMER,
GENERIC_TIMER,
};

class EmptyLifecycleNode : public rclcpp_lifecycle::LifecycleNode
{
public:
explicit EmptyLifecycleNode(const std::string & node_name, const TimerType & timer_type)
: rclcpp_lifecycle::LifecycleNode(node_name)
{
// For coverage this is being added here
switch (timer_type) {
case TimerType::WALL_TIMER:
{
auto timer = create_wall_timer(std::chrono::seconds(1), []() {});
add_timer_handle(timer);
break;
}
case TimerType::GENERIC_TIMER:
{
auto timer = create_timer(std::chrono::seconds(1), []() {});
add_timer_handle(timer);
break;
}
}
}

void empty_callback(const test_msgs::msg::Empty msg) {}
};

class TestLifecycleSubscription : public ::testing::TestWithParam<TimerType>
{
public:
void SetUp()
{
rclcpp::init(0, nullptr);
}

void TearDown()
{
rclcpp::shutdown();
}
};

TEST_P(TestLifecycleSubscription, subscribe_managed_by_node) {
auto node = std::make_shared<EmptyLifecycleNode>("node", GetParam());

std::shared_ptr<rclcpp_lifecycle::LifecycleSubscription<test_msgs::msg::Empty>> subscription =
node->create_subscription<test_msgs::msg::Empty>(std::string("topic"), rclcpp::QoS(10),
bind(&EmptyLifecycleNode::empty_callback, node, std::placeholders::_1));

// transition via LifecycleNode
auto success = rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::SUCCESS;
auto reset_key = rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::ERROR;
auto ret = reset_key;

EXPECT_EQ(State::PRIMARY_STATE_UNCONFIGURED, node->get_current_state().id());
node->trigger_transition(
rclcpp_lifecycle::Transition(Transition::TRANSITION_CONFIGURE), ret);
ASSERT_EQ(success, ret);
ret = reset_key;
node->trigger_transition(
rclcpp_lifecycle::Transition(Transition::TRANSITION_ACTIVATE), ret);
ASSERT_EQ(success, ret);
ret = reset_key;
EXPECT_TRUE(subscription->is_activated());
{
std::shared_ptr<void> msg_ptr(new test_msgs::msg::Empty());
EXPECT_NO_THROW(subscription->handle_message(msg_ptr, rclcpp::MessageInfo()));
}
node->trigger_transition(
rclcpp_lifecycle::Transition(Transition::TRANSITION_DEACTIVATE), ret);
ASSERT_EQ(success, ret);
ret = reset_key;
(void)ret; // Just to make clang happy
EXPECT_FALSE(subscription->is_activated());
{
std::shared_ptr<void> msg_ptr(new test_msgs::msg::Empty());
EXPECT_NO_THROW(subscription->handle_message(msg_ptr, rclcpp::MessageInfo()));
}
}

TEST_P(TestLifecycleSubscription, subscribe) {
auto node = std::make_shared<EmptyLifecycleNode>("node", GetParam());

std::shared_ptr<rclcpp_lifecycle::LifecycleSubscription<test_msgs::msg::Empty>> subscription =
node->create_subscription<test_msgs::msg::Empty>(std::string("topic"), rclcpp::QoS(10),
bind(&EmptyLifecycleNode::empty_callback, node, std::placeholders::_1));

// transition via LifecycleSubscription
subscription->on_deactivate();
EXPECT_FALSE(subscription->is_activated());
{
std::shared_ptr<void> msg_ptr(new test_msgs::msg::Empty());
EXPECT_NO_THROW(subscription->handle_message(msg_ptr, rclcpp::MessageInfo()));
}

subscription->on_activate();
EXPECT_TRUE(subscription->is_activated());
{
std::shared_ptr<void> msg_ptr(new test_msgs::msg::Empty());
EXPECT_NO_THROW(subscription->handle_message(msg_ptr, rclcpp::MessageInfo()));
}
}

INSTANTIATE_TEST_SUITE_P(
PerTimerType, TestLifecycleSubscription,
::testing::Values(TimerType::WALL_TIMER, TimerType::GENERIC_TIMER),
[](const ::testing::TestParamInfo<TimerType> & info) -> std::string {
switch (info.param) {
case TimerType::WALL_TIMER:
return std::string("wall_timer");
case TimerType::GENERIC_TIMER:
return std::string("generic_timer");
default:
break;
}
return std::string("unknown");
}
);

0 comments on commit 113024c

Please sign in to comment.