From 439d6dc6b0325811793533414dee89196cd72186 Mon Sep 17 00:00:00 2001 From: yadunund Date: Mon, 7 Oct 2024 12:42:49 -0700 Subject: [PATCH] Thread-safe access to SubscriptionData (#288) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Make SubscriptionData thread safe Signed-off-by: Yadunund * Set subscription handle to SubScriptionData raw ptr due to rmw_wait limitation Signed-off-by: Yadunund * Apply suggestions from code review Signed-off-by: Yadunund * Safely shutdown SubscriptionData even if make() fails and cleanup todo Signed-off-by: Yadunund * Undeclare z_sub only if z_check passes Signed-off-by: Yadunund * Apply suggestions from code review Co-authored-by: Alejandro Hernández Cordero Signed-off-by: yadunund --------- Signed-off-by: Yadunund Signed-off-by: yadunund Co-authored-by: Alejandro Hernández Cordero --- rmw_zenoh_cpp/CMakeLists.txt | 1 + .../src/detail/attachment_helpers.cpp | 2 + .../src/detail/attachment_helpers.hpp | 2 + rmw_zenoh_cpp/src/detail/graph_cache.cpp | 41 +- rmw_zenoh_cpp/src/detail/graph_cache.hpp | 20 +- rmw_zenoh_cpp/src/detail/liveliness_utils.cpp | 13 + rmw_zenoh_cpp/src/detail/liveliness_utils.hpp | 6 +- rmw_zenoh_cpp/src/detail/rmw_data_types.cpp | 202 +----- rmw_zenoh_cpp/src/detail/rmw_data_types.hpp | 71 -- rmw_zenoh_cpp/src/detail/rmw_node_data.cpp | 82 ++- rmw_zenoh_cpp/src/detail/rmw_node_data.hpp | 20 + .../src/detail/rmw_publisher_data.cpp | 1 - .../src/detail/rmw_publisher_data.hpp | 3 +- .../src/detail/rmw_subscription_data.cpp | 634 ++++++++++++++++++ .../src/detail/rmw_subscription_data.hpp | 166 +++++ rmw_zenoh_cpp/src/rmw_event.cpp | 24 +- rmw_zenoh_cpp/src/rmw_zenoh.cpp | 504 +++----------- 17 files changed, 1064 insertions(+), 728 deletions(-) create mode 100644 rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp create mode 100644 rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp diff --git a/rmw_zenoh_cpp/CMakeLists.txt b/rmw_zenoh_cpp/CMakeLists.txt index bd274a70..508d5415 100644 --- a/rmw_zenoh_cpp/CMakeLists.txt +++ b/rmw_zenoh_cpp/CMakeLists.txt @@ -42,6 +42,7 @@ add_library(rmw_zenoh_cpp SHARED src/detail/rmw_data_types.cpp src/detail/rmw_publisher_data.cpp src/detail/rmw_node_data.cpp + src/detail/rmw_subscription_data.cpp src/detail/service_type_support.cpp src/detail/type_support.cpp src/detail/type_support_common.cpp diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp index 46191ecc..7733c918 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.cpp @@ -24,6 +24,7 @@ namespace rmw_zenoh_cpp { +//============================================================================== bool get_gid_from_attachment( const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]) { @@ -45,6 +46,7 @@ bool get_gid_from_attachment( return true; } +//============================================================================== int64_t get_int64_from_attachment( const z_attachment_t * const attachment, const std::string & name) { diff --git a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp index e27dc886..034b6a49 100644 --- a/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp +++ b/rmw_zenoh_cpp/src/detail/attachment_helpers.hpp @@ -23,9 +23,11 @@ namespace rmw_zenoh_cpp { +//============================================================================== bool get_gid_from_attachment( const z_attachment_t * const attachment, uint8_t gid[RMW_GID_STORAGE_SIZE]); +//============================================================================== int64_t get_int64_from_attachment( const z_attachment_t * const attachment, const std::string & name); } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.cpp b/rmw_zenoh_cpp/src/detail/graph_cache.cpp index bad12959..f21de3d1 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.cpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.cpp @@ -852,24 +852,23 @@ rmw_ret_t GraphCache::get_topic_names_and_types( ///============================================================================= rmw_ret_t GraphCache::publisher_count_matched_subscriptions( - PublisherDataConstPtr pub_data, + const liveliness::TopicInfo & pub_topic_info, size_t * subscription_count) { // TODO(Yadunund): Replace this logic by returning a number that is tracked once // we support matched qos events. *subscription_count = 0; - auto topic_info = pub_data->topic_info(); - GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_); + GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(pub_topic_info.name_); if (topic_it != graph_topics_.end()) { GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find( - topic_info.type_); + pub_topic_info.type_); if (topic_data_it != topic_it->second.end()) { for (const auto & [_, topic_data] : topic_data_it->second) { // If a subscription exists with compatible QoS, update the subscription count. if (!topic_data->subs_.empty()) { rmw_qos_compatibility_type_t is_compatible; rmw_ret_t ret = rmw_qos_profile_check_compatible( - pub_data->adapted_qos_profile(), + pub_topic_info.qos_, topic_data->info_.qos_, &is_compatible, nullptr, @@ -887,25 +886,23 @@ rmw_ret_t GraphCache::publisher_count_matched_subscriptions( ///============================================================================= rmw_ret_t GraphCache::subscription_count_matched_publishers( - const rmw_subscription_t * subscription, + const liveliness::TopicInfo & sub_topic_info, size_t * publisher_count) { // TODO(Yadunund): Replace this logic by returning a number that is tracked once // we support matched qos events. *publisher_count = 0; - GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(subscription->topic_name); + GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(sub_topic_info.name_); if (topic_it != graph_topics_.end()) { - rmw_subscription_data_t * sub_data = - static_cast(subscription->data); GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find( - sub_data->type_support->get_name()); + sub_topic_info.type_); if (topic_data_it != topic_it->second.end()) { for (const auto & [_, topic_data] : topic_data_it->second) { // If a subscription exists with compatible QoS, update the subscription count. if (!topic_data->pubs_.empty()) { rmw_qos_compatibility_type_t is_compatible; rmw_ret_t ret = rmw_qos_profile_check_compatible( - sub_data->adapted_qos_profile, + sub_topic_info.qos_, topic_data->info_.qos_, &is_compatible, nullptr, @@ -1344,29 +1341,29 @@ std::unique_ptr GraphCache::take_event_status( ///============================================================================= void GraphCache::set_querying_subscriber_callback( - const rmw_subscription_data_t * sub_data, + const std::string & sub_keyexpr, + const std::size_t sub_guid, QueryingSubscriberCallback cb) { - const std::string keyexpr = sub_data->entity->topic_info().value().topic_keyexpr_; - auto cb_it = querying_subs_cbs_.find(keyexpr); + auto cb_it = querying_subs_cbs_.find(sub_keyexpr); if (cb_it == querying_subs_cbs_.end()) { - querying_subs_cbs_[keyexpr] = std::move( - std::unordered_map{}); - cb_it = querying_subs_cbs_.find(keyexpr); + querying_subs_cbs_[sub_keyexpr] = std::move( + std::unordered_map{}); + cb_it = querying_subs_cbs_.find(sub_keyexpr); } - cb_it->second.insert(std::make_pair(sub_data, std::move(cb))); + cb_it->second.insert(std::make_pair(sub_guid, std::move(cb))); } ///============================================================================= void GraphCache::remove_querying_subscriber_callback( - const rmw_subscription_data_t * sub_data) + const std::string & sub_keyexpr, + const std::size_t sub_guid) { - auto cb_map_it = querying_subs_cbs_.find(sub_data->entity->topic_info()->topic_keyexpr_); + auto cb_map_it = querying_subs_cbs_.find(sub_keyexpr); if (cb_map_it == querying_subs_cbs_.end()) { return; } - cb_map_it->second.erase(sub_data); + cb_map_it->second.erase(sub_guid); } } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/graph_cache.hpp b/rmw_zenoh_cpp/src/detail/graph_cache.hpp index 90ff2900..4864d678 100644 --- a/rmw_zenoh_cpp/src/detail/graph_cache.hpp +++ b/rmw_zenoh_cpp/src/detail/graph_cache.hpp @@ -28,7 +28,6 @@ #include "event.hpp" #include "liveliness_utils.hpp" #include "ordered_map.hpp" -#include "rmw_publisher_data.hpp" #include "rcutils/allocator.h" #include "rcutils/types.h" @@ -40,11 +39,6 @@ namespace rmw_zenoh_cpp { -// Forward declare to prevent circular dependency. -// TODO(Yadunund): Remove this once we move rmw_subscription_data_t out of -// rmw_data_types.hpp. -class rmw_subscription_data_t; - ///============================================================================= // TODO(Yadunund): Consider changing this to an array of unordered_set where the index of the // array corresponds to the EntityType enum. This way we don't need to mix @@ -135,11 +129,11 @@ class GraphCache final rmw_names_and_types_t * topic_names_and_types) const; rmw_ret_t publisher_count_matched_subscriptions( - PublisherDataConstPtr pub_data, + const liveliness::TopicInfo & pub_topic_info, size_t * subscription_count); rmw_ret_t subscription_count_matched_publishers( - const rmw_subscription_t * subscription, + const liveliness::TopicInfo & sub_topic_info, size_t * publisher_count); rmw_ret_t get_service_names_and_types( @@ -196,11 +190,13 @@ class GraphCache final static bool is_entity_pub(const liveliness::Entity & entity); void set_querying_subscriber_callback( - const rmw_subscription_data_t * sub_data, + const std::string & sub_keyexpr, + const std::size_t sub_guid, QueryingSubscriberCallback cb); void remove_querying_subscriber_callback( - const rmw_subscription_data_t * sub_data); + const std::string & sub_keyexpr, + const std::size_t sub_guid); private: // Helper function to convert an Entity into a GraphNode. @@ -300,8 +296,8 @@ class GraphCache final using GraphEventCallbackMap = std::unordered_map; // EventCallbackMap for each type of event we support in rmw_zenoh_cpp. GraphEventCallbackMap event_callbacks_; - // Map keyexpressions to QueryingSubscriberCallback. - std::unordered_map> querying_subs_cbs_; // Counters to track changes to event statues for each topic. std::unordered_map(gid[i]); + } + return std::hash{}(hash_str.str()); +} } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp index e3440766..994de31f 100644 --- a/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp +++ b/rmw_zenoh_cpp/src/detail/liveliness_utils.hpp @@ -237,8 +237,12 @@ std::string zid_to_str(const z_id_t & id); } // namespace liveliness ///============================================================================= -// Helper function to generate a randon GID. +/// Helper function to generate a random GID. void generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]); + +///============================================================================= +/// Generate a hash for a given GID. +size_t hash_gid(const uint8_t gid[RMW_GID_STORAGE_SIZE]); } // namespace rmw_zenoh_cpp ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp index 4b56a5e0..0489c261 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.cpp @@ -21,6 +21,7 @@ #include #include +#include "liveliness_utils.hpp" #include "logging_macros.hpp" #include "rcpputils/scope_exit.hpp" @@ -32,149 +33,8 @@ #include "rmw_data_types.hpp" ///============================================================================= -namespace -{ -size_t hash_gid(const uint8_t * gid) -{ - std::stringstream hash_str; - hash_str << std::hex; - size_t i = 0; - for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) { - hash_str << static_cast(gid[i]); - } - return std::hash{}(hash_str.str()); -} - -///============================================================================= -size_t hash_gid(const rmw_request_id_t & request_id) -{ - return hash_gid(request_id.writer_guid); -} -} // namespace - namespace rmw_zenoh_cpp { -///============================================================================= -saved_msg_data::saved_msg_data( - zc_owned_payload_t p, - uint64_t recv_ts, - const uint8_t pub_gid[RMW_GID_STORAGE_SIZE], - int64_t seqnum, - int64_t source_ts) -: payload(p), recv_timestamp(recv_ts), sequence_number(seqnum), source_timestamp(source_ts) -{ - memcpy(publisher_gid, pub_gid, RMW_GID_STORAGE_SIZE); -} - -///============================================================================= -saved_msg_data::~saved_msg_data() -{ - z_drop(z_move(payload)); -} - -///============================================================================= -bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not( - rmw_wait_set_data_t * wait_set_data) -{ - std::lock_guard lock(condition_mutex_); - if (!message_queue_.empty()) { - return true; - } - - wait_set_data_ = wait_set_data; - - return false; -} - -///============================================================================= -void rmw_subscription_data_t::notify() -{ - std::lock_guard lock(condition_mutex_); - if (wait_set_data_ != nullptr) { - std::lock_guard wait_set_lock(wait_set_data_->condition_mutex); - wait_set_data_->triggered = true; - wait_set_data_->condition_variable.notify_one(); - } -} - -///============================================================================= -bool rmw_subscription_data_t::detach_condition_and_queue_is_empty() -{ - std::lock_guard lock(condition_mutex_); - wait_set_data_ = nullptr; - - return message_queue_.empty(); -} - -///============================================================================= -std::unique_ptr rmw_subscription_data_t::pop_next_message() -{ - std::lock_guard lock(message_queue_mutex_); - - if (message_queue_.empty()) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. - return nullptr; - } - - std::unique_ptr msg_data = std::move(message_queue_.front()); - message_queue_.pop_front(); - - return msg_data; -} - -///============================================================================= -void rmw_subscription_data_t::add_new_message( - std::unique_ptr msg, const std::string & topic_name) -{ - std::lock_guard lock(message_queue_mutex_); - - if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && - message_queue_.size() >= adapted_qos_profile.depth) - { - // Log warning if message is discarded due to hitting the queue depth - RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "Message queue depth of %ld reached, discarding oldest message " - "for subscription for %s", - adapted_qos_profile.depth, - topic_name.c_str()); - - // If the adapted_qos_profile.depth is 0, the std::move command below will result - // in UB and the z_drop will segfault. We explicitly set the depth to a minimum of 1 - // in rmw_create_subscription() but to be safe, we only attempt to discard from the - // queue if it is non-empty. - if (!message_queue_.empty()) { - std::unique_ptr old = std::move(message_queue_.front()); - message_queue_.pop_front(); - } - } - - // Check for messages lost if the new sequence number is not monotonically increasing. - const size_t gid_hash = hash_gid(msg->publisher_gid); - auto last_known_pub_it = last_known_published_msg_.find(gid_hash); - if (last_known_pub_it != last_known_published_msg_.end()) { - const int64_t seq_increment = std::abs(msg->sequence_number - last_known_pub_it->second); - if (seq_increment > 1) { - const size_t num_msg_lost = seq_increment - 1; - total_messages_lost_ += num_msg_lost; - auto event_status = std::make_unique(); - event_status->total_count_change = num_msg_lost; - event_status->total_count = total_messages_lost_; - events_mgr.add_new_event( - ZENOH_EVENT_MESSAGE_LOST, - std::move(event_status)); - } - } - // Always update the last known sequence number for the publisher - last_known_published_msg_[gid_hash] = msg->sequence_number; - - message_queue_.emplace_back(std::move(msg)); - - // Since we added new data, trigger user callback and guard condition if they are available - data_callback_mgr.trigger_callback(); - notify(); -} - ///============================================================================= bool rmw_service_data_t::queue_has_data_and_attach_condition_if_not( rmw_wait_set_data_t * wait_set_data) @@ -251,7 +111,7 @@ void rmw_service_data_t::add_new_query(std::unique_ptr query) bool rmw_service_data_t::add_to_query_map( const rmw_request_id_t & request_id, std::unique_ptr query) { - size_t hash = hash_gid(request_id); + const size_t hash = rmw_zenoh_cpp::hash_gid(request_id.writer_guid); std::lock_guard lock(sequence_to_query_map_mutex_); @@ -280,7 +140,7 @@ bool rmw_service_data_t::add_to_query_map( std::unique_ptr rmw_service_data_t::take_from_query_map( const rmw_request_id_t & request_id) { - size_t hash = hash_gid(request_id); + const size_t hash = rmw_zenoh_cpp::hash_gid(request_id.writer_guid); std::lock_guard lock(sequence_to_query_map_mutex_); @@ -415,62 +275,6 @@ bool rmw_client_data_t::is_shutdown() const return is_shutdown_; } -//============================================================================== -void sub_data_handler( - const z_sample_t * sample, - void * data) -{ - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - auto drop_keystr = rcpputils::make_scope_exit( - [&keystr]() { - z_drop(z_move(keystr)); - }); - - auto sub_data = static_cast(data); - if (sub_data == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain rmw_subscription_data_t from data for " - "subscription for %s", - z_loan(keystr) - ); - return; - } - - uint8_t pub_gid[RMW_GID_STORAGE_SIZE]; - if (!get_gid_from_attachment(&sample->attachment, pub_gid)) { - // We failed to get the GID from the attachment. While this isn't fatal, - // it is unusual and so we should report it. - memset(pub_gid, 0, RMW_GID_STORAGE_SIZE); - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to obtain publisher GID from the attachment."); - } - - int64_t sequence_number = get_int64_from_attachment(&sample->attachment, "sequence_number"); - if (sequence_number < 0) { - // We failed to get the sequence number from the attachment. While this - // isn't fatal, it is unusual and so we should report it. - sequence_number = 0; - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment."); - } - - int64_t source_timestamp = get_int64_from_attachment(&sample->attachment, "source_timestamp"); - if (source_timestamp < 0) { - // We failed to get the source timestamp from the attachment. While this - // isn't fatal, it is unusual and so we should report it. - source_timestamp = 0; - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment."); - } - - sub_data->add_new_message( - std::make_unique( - zc_sample_payload_rcinc(sample), - sample->timestamp.time, pub_gid, sequence_number, source_timestamp), z_loan(keystr)); -} - ///============================================================================= ZenohQuery::ZenohQuery(const z_query_t * query) { diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 90c4279c..c5bfbe4f 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include "rcutils/allocator.h" @@ -44,76 +43,6 @@ namespace rmw_zenoh_cpp { -///============================================================================= -// z_owned_closure_sample_t -void sub_data_handler(const z_sample_t * sample, void * sub_data); - -struct saved_msg_data -{ - explicit saved_msg_data( - zc_owned_payload_t p, - uint64_t recv_ts, - const uint8_t pub_gid[RMW_GID_STORAGE_SIZE], - int64_t seqnum, - int64_t source_ts); - - ~saved_msg_data(); - - zc_owned_payload_t payload; - uint64_t recv_timestamp; - uint8_t publisher_gid[RMW_GID_STORAGE_SIZE]; - int64_t sequence_number; - int64_t source_timestamp; -}; - -///============================================================================= -class rmw_subscription_data_t final -{ -public: - // The Entity generated for the subscription. - std::shared_ptr entity; - - // An owned subscriber or querying_subscriber depending on the QoS settings. - std::variant sub; - - // Store the actual QoS profile used to configure this subscription. - rmw_qos_profile_t adapted_qos_profile; - - // Liveliness token for the subscription. - zc_owned_liveliness_token_t token; - - const void * type_support_impl; - const char * typesupport_identifier; - const rosidl_type_hash_t * type_hash; - MessageTypeSupport * type_support; - rmw_context_t * context; - - bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); - - bool detach_condition_and_queue_is_empty(); - - std::unique_ptr pop_next_message(); - - void add_new_message(std::unique_ptr msg, const std::string & topic_name); - - DataCallbackManager data_callback_mgr; - EventsManager events_mgr; - -private: - std::deque> message_queue_; - mutable std::mutex message_queue_mutex_; - - // Map GID of a publisher to the sequence number of the message it published. - std::unordered_map last_known_published_msg_; - size_t total_messages_lost_{0}; - - void notify(); - - rmw_wait_set_data_t * wait_set_data_{nullptr}; - std::mutex condition_mutex_; -}; - - ///============================================================================= void service_data_handler(const z_query_t * query, void * service_data); diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 5de83a4f..88cb0d3e 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -185,6 +185,74 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher) pubs_.erase(publisher); } +///============================================================================= +bool NodeData::create_sub_data( + const rmw_subscription_t * const subscription, + z_session_t session, + std::shared_ptr graph_cache, + std::size_t id, + const std::string & topic_name, + const rosidl_message_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile) +{ + std::lock_guard lock_guard(mutex_); + if (is_shutdown_) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create SubscriptionData as the NodeData has been shutdown."); + return false; + } + + if (subs_.count(subscription) > 0) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "SubscriptionData already exists."); + return false; + } + + auto sub_data = SubscriptionData::make( + std::move(session), + std::move(graph_cache), + node_, + entity_->node_info(), + id_, + std::move(id), + std::move(topic_name), + type_support, + qos_profile); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to make SubscriptionData."); + return false; + } + + auto insertion = subs_.insert(std::make_pair(subscription, std::move(sub_data))); + if (!insertion.second) { + return false; + } + return true; +} + +///============================================================================= +SubscriptionDataPtr NodeData::get_sub_data(const rmw_subscription_t * const subscription) +{ + std::lock_guard lock_guard(mutex_); + auto it = subs_.find(subscription); + if (it == subs_.end()) { + return nullptr; + } + + return it->second; +} + +///============================================================================= +void NodeData::delete_sub_data(const rmw_subscription_t * const subscription) +{ + std::lock_guard lock_guard(mutex_); + subs_.erase(subscription); +} + ///============================================================================= rmw_ret_t NodeData::shutdown() { @@ -207,12 +275,24 @@ rmw_ret_t NodeData::shutdown() ); } } + for (auto sub_it = subs_.begin(); sub_it != subs_.end(); ++sub_it) { + ret = sub_it->second->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to shutdown subscription %s within id %zu. rmw_ret_t code: %zu.", + sub_it->second->topic_info().name_.c_str(), + id_, + ret + ); + } + } // Unregister this node from the ROS graph. zc_liveliness_undeclare_token(z_move(token_)); is_shutdown_ = true; - return RMW_RET_OK; + return ret; } ///============================================================================= diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index 26c5eca9..cabde5e5 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -23,8 +23,10 @@ #include #include +#include "graph_cache.hpp" #include "liveliness_utils.hpp" #include "rmw_publisher_data.hpp" +#include "rmw_subscription_data.hpp" #include "rmw/rmw.h" @@ -63,6 +65,22 @@ class NodeData final // Delete the PublisherData for a given rmw_publisher_t if present. void delete_pub_data(const rmw_publisher_t * const publisher); + // Create a new SubscriptionData for a publisher in this node. + bool create_sub_data( + const rmw_subscription_t * const publisher, + z_session_t session, + std::shared_ptr graph_cache, + std::size_t id, + const std::string & topic_name, + const rosidl_message_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + /// Retrieve the SubscriptionData for a given rmw_subscription_t if present. + SubscriptionDataPtr get_sub_data(const rmw_subscription_t * const publisher); + + // Delete the SubscriptionData for a given rmw_subscription_t if present. + void delete_sub_data(const rmw_subscription_t * const publisher); + // Shutdown this NodeData. rmw_ret_t shutdown(); @@ -94,6 +112,8 @@ class NodeData final bool is_shutdown_; // Map of publishers. std::unordered_map pubs_; + // Map of subscriptions. + std::unordered_map subs_; }; } // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp index 393fb818..91aa9127 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp @@ -477,7 +477,6 @@ rmw_ret_t PublisherData::shutdown() } ///============================================================================= -// Check if the Node is shutdown. bool PublisherData::is_shutdown() const { std::lock_guard lock(mutex_); diff --git a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp index 7737deeb..2ca66e51 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp @@ -101,10 +101,11 @@ class PublisherData final // Internal mutex. mutable std::mutex mutex_; - // The parent node + // The parent node. const rmw_node_t * rmw_node_; // The Entity generated for the publisher. std::shared_ptr entity_; + // The GID for this publisher. uint8_t gid_[RMW_GID_STORAGE_SIZE]; // An owned publisher. z_owned_publisher_t pub_; diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp new file mode 100644 index 00000000..7fc4154f --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -0,0 +1,634 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_subscription_data.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "attachment_helpers.hpp" +#include "cdr.hpp" +#include "identifier.hpp" +#include "rmw_context_impl_s.hpp" +#include "message_type_support.hpp" +#include "logging_macros.hpp" +#include "qos.hpp" + +#include "rcpputils/scope_exit.hpp" + +#include "rmw/error_handling.h" +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/impl/cpp/macros.hpp" + +namespace rmw_zenoh_cpp +{ +namespace +{ +//============================================================================== +// TODO(Yadunund): Make this a class member and lambda capture weak_from_this() +// instead of passing a rawptr to SubscriptionData when we switch to zenoh-cpp. +void sub_data_handler(const z_sample_t * sample, void * data) +{ + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + auto drop_keystr = rcpputils::make_scope_exit( + [&keystr]() { + z_drop(z_move(keystr)); + }); + + auto sub_data = static_cast(data); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain SubscriptionData from data for %s.", + z_loan(keystr) + ); + return; + } + + uint8_t pub_gid[RMW_GID_STORAGE_SIZE]; + if (!get_gid_from_attachment(&sample->attachment, pub_gid)) { + // We failed to get the GID from the attachment. While this isn't fatal, + // it is unusual and so we should report it. + memset(pub_gid, 0, RMW_GID_STORAGE_SIZE); + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to obtain publisher GID from the attachment."); + } + + int64_t sequence_number = get_int64_from_attachment(&sample->attachment, "sequence_number"); + if (sequence_number < 0) { + // We failed to get the sequence number from the attachment. While this + // isn't fatal, it is unusual and so we should report it. + sequence_number = 0; + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment."); + } + + int64_t source_timestamp = get_int64_from_attachment(&sample->attachment, "source_timestamp"); + if (source_timestamp < 0) { + // We failed to get the source timestamp from the attachment. While this + // isn't fatal, it is unusual and so we should report it. + source_timestamp = 0; + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", "Unable to obtain sequence number from the attachment."); + } + + sub_data->add_new_message( + std::make_unique( + zc_sample_payload_rcinc(sample), + sample->timestamp.time, pub_gid, sequence_number, source_timestamp), z_loan(keystr)); +} +} // namespace + +///============================================================================= +SubscriptionData::Message::Message( + zc_owned_payload_t p, + uint64_t recv_ts, + const uint8_t pub_gid[RMW_GID_STORAGE_SIZE], + int64_t seqnum, + int64_t source_ts) +: payload(p), recv_timestamp(recv_ts), sequence_number(seqnum), source_timestamp(source_ts) +{ + memcpy(publisher_gid, pub_gid, RMW_GID_STORAGE_SIZE); +} + +///============================================================================= +SubscriptionData::Message::~Message() +{ + z_drop(z_move(payload)); +} + +///============================================================================= +std::shared_ptr SubscriptionData::make( + z_session_t session, + std::shared_ptr graph_cache, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t subscription_id, + const std::string & topic_name, + const rosidl_message_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile) +{ + rmw_qos_profile_t adapted_qos_profile = *qos_profile; + rmw_ret_t ret = QoS::get().best_available_qos( + node, topic_name.c_str(), &adapted_qos_profile, rmw_get_publishers_info_by_topic); + if (RMW_RET_OK != ret) { + return nullptr; + } + + rcutils_allocator_t * allocator = &node->context->options.allocator; + + const rosidl_type_hash_t * type_hash = type_support->get_type_hash_func(type_support); + auto callbacks = static_cast(type_support->data); + auto message_type_support = std::make_unique(callbacks); + + // Convert the type hash to a string so that it can be included in + // the keyexpr. + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( + type_hash, + *allocator, + &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); + return nullptr; + } + auto free_type_hash_c_str = rcpputils::make_scope_exit( + [&allocator, &type_hash_c_str]() { + allocator->deallocate(type_hash_c_str, allocator->state); + }); + + // Everything above succeeded and is setup properly. Now declare a subscriber + // with Zenoh; after this, callbacks may come in at any time. + std::size_t domain_id = node_info.domain_id_; + auto entity = liveliness::Entity::make( + z_info_zid(session), + std::to_string(node_id), + std::to_string(subscription_id), + liveliness::EntityType::Subscription, + std::move(node_info), + liveliness::TopicInfo{ + std::move(domain_id), + topic_name, + message_type_support->get_name(), + type_hash_c_str, + adapted_qos_profile} + ); + if (entity == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to generate keyexpr for liveliness token for the subscription %s.", + topic_name.c_str()); + return nullptr; + } + + auto sub_data = std::shared_ptr( + new SubscriptionData{ + node, + graph_cache, + std::move(entity), + type_support->data, + std::move(message_type_support) + }); + + // TODO(Yadunund): Instead of passing a rawptr, rely on capturing weak_ptr + // in the closure callback once we switch to zenoh-cpp. + z_owned_closure_sample_t callback = z_closure(sub_data_handler, nullptr, sub_data.get()); + z_owned_keyexpr_t keyexpr = + z_keyexpr_new(sub_data->entity_->topic_info()->topic_keyexpr_.c_str()); + auto always_free_ros_keyexpr = rcpputils::make_scope_exit( + [&keyexpr]() { + z_keyexpr_drop(z_move(keyexpr)); + }); + if (!z_keyexpr_check(&keyexpr)) { + RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); + return nullptr; + } + // Instantiate the subscription with suitable options depending on the + // adapted_qos_profile. + // TODO(Yadunund): Rely on a separate function to return the sub + // as we start supporting more qos settings. + z_owned_str_t owned_key_str = z_keyexpr_to_string(z_loan(keyexpr)); + auto always_drop_keystr = rcpputils::make_scope_exit( + [&owned_key_str]() { + z_drop(z_move(owned_key_str)); + }); + + if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); + // Make the initial query to hit all the PublicationCaches, using a query_selector with + // '*' in place of the queryable_prefix of each PublicationCache + const std::string selector = "*/" + + sub_data->entity_->topic_info()->topic_keyexpr_; + sub_options.query_selector = z_keyexpr(selector.c_str()); + // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. + // By default a query accepts only replies that matches its query selector. + // This allows us to selectively query certain PublicationCaches when defining the + // set_querying_subscriber_callback below. + sub_options.query_accept_replies = ZCU_REPLY_KEYEXPR_ANY; + // As this initial query is now using a "*", the query target is not COMPLETE. + sub_options.query_target = Z_QUERY_TARGET_ALL; + // We set consolidation to none as we need to receive transient local messages + // from a number of publishers. Eg: To receive TF data published over /tf_static + // by various publishers. + sub_options.query_consolidation = z_query_consolidation_none(); + if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + sub_options.reliability = Z_RELIABILITY_RELIABLE; + } + sub_data->sub_ = ze_declare_querying_subscriber( + session, + z_loan(keyexpr), + z_move(callback), + &sub_options + ); + if (!z_check(std::get(sub_data->sub_))) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return nullptr; + } + // Register the querying subscriber with the graph cache to get latest + // messages from publishers that were discovered after their first publication. + std::weak_ptr data_wp = sub_data; + graph_cache->set_querying_subscriber_callback( + sub_data->entity_->topic_info().value().topic_keyexpr_, + sub_data->entity_->guid(), + [data_wp](const std::string & queryable_prefix) -> void + { + auto sub_data = data_wp.lock(); + if (sub_data == nullptr) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to lock weak_ptr within querying subscription callback." + ); + return; + } + std::lock_guard lock(sub_data->mutex_); + + const std::string selector = queryable_prefix + + "/" + + sub_data->entity_->topic_info().value().topic_keyexpr_; + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "QueryingSubscriberCallback triggered over %s.", + selector.c_str() + ); + z_get_options_t opts = z_get_options_default(); + opts.timeout_ms = std::numeric_limits::max(); + opts.consolidation = z_query_consolidation_latest(); + opts.accept_replies = ZCU_REPLY_KEYEXPR_ANY; + ze_querying_subscriber_get( + z_loan(std::get(sub_data->sub_)), + z_keyexpr(selector.c_str()), + &opts + ); + } + ); + } else { + // Create a regular subscriber for all other durability settings. + z_subscriber_options_t sub_options = z_subscriber_options_default(); + if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + sub_options.reliability = Z_RELIABILITY_RELIABLE; + } + sub_data->sub_ = z_declare_subscriber( + session, + z_loan(keyexpr), + z_move(callback), + &sub_options + ); + if (!z_check(std::get(sub_data->sub_))) { + RMW_SET_ERROR_MSG("unable to create zenoh subscription"); + return nullptr; + } + } + + auto undeclare_z_sub = rcpputils::make_scope_exit( + [sub_data]() { + z_owned_subscriber_t * sub = std::get_if(&sub_data->sub_); + if (sub == nullptr || z_undeclare_subscriber(sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + } else { + ze_owned_querying_subscriber_t * querying_sub = + std::get_if(&sub_data->sub_); + if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub"); + } + } + }); + + // Publish to the graph that a new subscription is in town. + sub_data->token_ = zc_liveliness_declare_token( + session, + z_keyexpr(sub_data->entity_->liveliness_keyexpr().c_str()), + NULL + ); + auto free_token = rcpputils::make_scope_exit( + [data = sub_data]() { + if (data != nullptr) { + z_drop(z_move(data->token_)); + } + }); + if (!z_check(sub_data->token_)) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Unable to create liveliness token for the subscription."); + return nullptr; + } + + undeclare_z_sub.cancel(); + free_token.cancel(); + + return sub_data; +} + +///============================================================================= +SubscriptionData::SubscriptionData( + const rmw_node_t * rmw_node, + std::shared_ptr graph_cache, + std::shared_ptr entity, + const void * type_support_impl, + std::unique_ptr type_support) +: rmw_node_(rmw_node), + graph_cache_(std::move(graph_cache)), + entity_(std::move(entity)), + type_support_impl_(type_support_impl), + type_support_(std::move(type_support)), + last_known_published_msg_({}), + total_messages_lost_(0), + wait_set_data_(nullptr), + is_shutdown_(false) +{ + events_mgr_ = std::make_shared(); +} + +///============================================================================= +std::size_t SubscriptionData::guid() const +{ + std::lock_guard lock(mutex_); + return entity_->guid(); +} + +///============================================================================= +liveliness::TopicInfo SubscriptionData::topic_info() const +{ + std::lock_guard lock(mutex_); + return entity_->topic_info().value(); +} + +///============================================================================= +bool SubscriptionData::liveliness_is_valid() const +{ + std::lock_guard lock(mutex_); + return zc_liveliness_token_check(&token_); +} + +///============================================================================= +std::shared_ptr SubscriptionData::events_mgr() const +{ + std::lock_guard lock(mutex_); + return events_mgr_; +} + +///============================================================================= +SubscriptionData::~SubscriptionData() +{ + const rmw_ret_t ret = this->shutdown(); + if (ret != RMW_RET_OK) { + RMW_ZENOH_LOG_ERROR_NAMED( + "rmw_zenoh_cpp", + "Error destructing publisher /%s.", + entity_->topic_info().value().name_.c_str() + ); + } +} + +///============================================================================= +rmw_ret_t SubscriptionData::shutdown() +{ + rmw_ret_t ret = RMW_RET_OK; + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return ret; + } + + // Unregister this node from the ROS graph. + if (zc_liveliness_token_check(&token_)) { + zc_liveliness_undeclare_token(z_move(token_)); + } + + z_owned_subscriber_t * sub = std::get_if(&sub_); + if (sub != nullptr && z_subscriber_check(sub)) { + if (z_undeclare_subscriber(sub)) { + RMW_SET_ERROR_MSG("failed to undeclare sub."); + ret = RMW_RET_ERROR; + } + } else { + ze_owned_querying_subscriber_t * querying_sub = + std::get_if(&sub_); + if (querying_sub != nullptr && ze_querying_subscriber_check(querying_sub)) { + if (ze_undeclare_querying_subscriber(querying_sub)) { + RMW_SET_ERROR_MSG("failed to undeclare querying sub."); + ret = RMW_RET_ERROR; + } + } + } + + is_shutdown_ = true; + return RMW_RET_OK; +} + +///============================================================================= +bool SubscriptionData::is_shutdown() const +{ + std::lock_guard lock(mutex_); + return is_shutdown_; +} + +///============================================================================= +bool SubscriptionData::queue_has_data_and_attach_condition_if_not( + rmw_wait_set_data_t * wait_set_data) +{ + std::lock_guard lock(mutex_); + if (!message_queue_.empty()) { + return true; + } + + wait_set_data_ = wait_set_data; + + return false; +} + +///============================================================================= +bool SubscriptionData::detach_condition_and_queue_is_empty() +{ + std::lock_guard lock(mutex_); + wait_set_data_ = nullptr; + + return message_queue_.empty(); +} + +///============================================================================= +rmw_ret_t SubscriptionData::take_one_message( + void * ros_message, + rmw_message_info_t * message_info, + bool * taken) +{ + *taken = false; + + std::lock_guard lock(mutex_); + if (is_shutdown_ || message_queue_.empty()) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; + } + std::unique_ptr msg_data = std::move(message_queue_.front()); + message_queue_.pop_front(); + + // Object that manages the raw buffer + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(const_cast(msg_data->payload.payload.start)), + msg_data->payload.payload.len); + + // Object that serializes the data + rmw_zenoh_cpp::Cdr deser(fastbuffer); + if (!type_support_->deserialize_ros_message( + deser.get_cdr(), + ros_message, + type_support_impl_)) + { + RMW_SET_ERROR_MSG("could not deserialize ROS message"); + return RMW_RET_ERROR; + } + + if (message_info != nullptr) { + message_info->source_timestamp = msg_data->source_timestamp; + message_info->received_timestamp = msg_data->recv_timestamp; + message_info->publication_sequence_number = msg_data->sequence_number; + // TODO(clalancette): fill in reception_sequence_number + message_info->reception_sequence_number = 0; + message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); + message_info->from_intra_process = false; + } + + *taken = true; + + return RMW_RET_OK; +} + +///============================================================================= +rmw_ret_t SubscriptionData::take_serialized_message( + rmw_serialized_message_t * serialized_message, + bool * taken, + rmw_message_info_t * message_info) +{ + *taken = false; + + std::lock_guard lock(mutex_); + if (is_shutdown_ || message_queue_.empty()) { + // This tells rcl that the check for a new message was done, but no messages have come in yet. + return RMW_RET_OK; + } + std::unique_ptr msg_data = std::move(message_queue_.front()); + message_queue_.pop_front(); + + if (serialized_message->buffer_capacity < msg_data->payload.payload.len) { + rmw_ret_t ret = + rmw_serialized_message_resize(serialized_message, msg_data->payload.payload.len); + if (ret != RMW_RET_OK) { + return ret; // Error message already set + } + } + serialized_message->buffer_length = msg_data->payload.payload.len; + memcpy( + serialized_message->buffer, msg_data->payload.payload.start, + msg_data->payload.payload.len); + + *taken = true; + + if (message_info != nullptr) { + message_info->source_timestamp = msg_data->source_timestamp; + message_info->received_timestamp = msg_data->recv_timestamp; + message_info->publication_sequence_number = msg_data->sequence_number; + // TODO(clalancette): fill in reception_sequence_number + message_info->reception_sequence_number = 0; + message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; + memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); + message_info->from_intra_process = false; + } + + return RMW_RET_OK; +} + +///============================================================================= +void SubscriptionData::add_new_message( + std::unique_ptr msg, const std::string & topic_name) +{ + std::lock_guard lock(mutex_); + if (is_shutdown_) { + return; + } + const rmw_qos_profile_t adapted_qos_profile = entity_->topic_info().value().qos_; + if (adapted_qos_profile.history != RMW_QOS_POLICY_HISTORY_KEEP_ALL && + message_queue_.size() >= adapted_qos_profile.depth) + { + // Log warning if message is discarded due to hitting the queue depth + RMW_ZENOH_LOG_DEBUG_NAMED( + "rmw_zenoh_cpp", + "Message queue depth of %ld reached, discarding oldest message " + "for subscription for %s", + adapted_qos_profile.depth, + topic_name.c_str()); + + // If the adapted_qos_profile.depth is 0, the std::move command below will result + // in UB and the z_drop will segfault. We explicitly set the depth to a minimum of 1 + // in rmw_create_subscription() but to be safe, we only attempt to discard from the + // queue if it is non-empty. + if (!message_queue_.empty()) { + std::unique_ptr old = std::move(message_queue_.front()); + message_queue_.pop_front(); + } + } + + // Check for messages lost if the new sequence number is not monotonically increasing. + const size_t gid_hash = hash_gid(msg->publisher_gid); + auto last_known_pub_it = last_known_published_msg_.find(gid_hash); + if (last_known_pub_it != last_known_published_msg_.end()) { + const int64_t seq_increment = std::abs(msg->sequence_number - last_known_pub_it->second); + if (seq_increment > 1) { + const size_t num_msg_lost = seq_increment - 1; + total_messages_lost_ += num_msg_lost; + auto event_status = std::make_unique(); + event_status->total_count_change = num_msg_lost; + event_status->total_count = total_messages_lost_; + events_mgr_->add_new_event( + ZENOH_EVENT_MESSAGE_LOST, + std::move(event_status)); + } + } + // Always update the last known sequence number for the publisher. + last_known_published_msg_[gid_hash] = msg->sequence_number; + + message_queue_.emplace_back(std::move(msg)); + + // Since we added new data, trigger user callback and guard condition if they are available + data_callback_mgr_.trigger_callback(); + if (wait_set_data_ != nullptr) { + wait_set_data_->triggered = true; + wait_set_data_->condition_variable.notify_one(); + } +} + +//============================================================================== +void SubscriptionData::set_on_new_message_callback( + rmw_event_callback_t callback, + const void * user_data) +{ + std::lock_guard lock(mutex_); + data_callback_mgr_.set_callback(user_data, callback); +} + +//============================================================================== +std::shared_ptr SubscriptionData::graph_cache() const +{ + std::lock_guard lock(mutex_); + return graph_cache_; +} + +} // namespace rmw_zenoh_cpp diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp new file mode 100644 index 00000000..0d461e49 --- /dev/null +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -0,0 +1,166 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DETAIL__RMW_SUBSCRIPTION_DATA_HPP_ +#define DETAIL__RMW_SUBSCRIPTION_DATA_HPP_ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "event.hpp" +#include "graph_cache.hpp" +#include "liveliness_utils.hpp" +#include "message_type_support.hpp" +#include "type_support_common.hpp" + +#include "rcutils/allocator.h" + +#include "rmw/rmw.h" +#include "rmw/ret_types.h" + +namespace rmw_zenoh_cpp +{ +///============================================================================= +class SubscriptionData final +{ +public: + struct Message + { + explicit Message( + zc_owned_payload_t p, + uint64_t recv_ts, + const uint8_t pub_gid[RMW_GID_STORAGE_SIZE], + int64_t seqnum, + int64_t source_ts); + + ~Message(); + + zc_owned_payload_t payload; + uint64_t recv_timestamp; + uint8_t publisher_gid[RMW_GID_STORAGE_SIZE]; + int64_t sequence_number; + int64_t source_timestamp; + }; + + // Make a shared_ptr of SubscriptionData. + static std::shared_ptr make( + z_session_t session, + std::shared_ptr graph_cache, + const rmw_node_t * const node, + liveliness::NodeInfo node_info, + std::size_t node_id, + std::size_t Subscription_id, + const std::string & topic_name, + const rosidl_message_type_support_t * type_support, + const rmw_qos_profile_t * qos_profile); + + // Publish a ROS message. + rmw_ret_t publish( + const void * ros_message, + std::optional & shm_manager); + + // Get a copy of the GUID of this SubscriptionData's liveliness::Entity. + std::size_t guid() const; + + // Get a copy of the TopicInfo of this SubscriptionData. + liveliness::TopicInfo topic_info() const; + + // Returns true if liveliness token is still valid. + bool liveliness_is_valid() const; + + // Get the events manager of this SubscriptionData. + std::shared_ptr events_mgr() const; + + // Shutdown this SubscriptionData. + rmw_ret_t shutdown(); + + // Check if this SubscriptionData is shutdown. + bool is_shutdown() const; + + // Add a new message to the queue. + void add_new_message(std::unique_ptr msg, const std::string & topic_name); + + bool queue_has_data_and_attach_condition_if_not(rmw_wait_set_data_t * wait_set_data); + + bool detach_condition_and_queue_is_empty(); + + rmw_ret_t take_one_message( + void * ros_message, + rmw_message_info_t * message_info, + bool * taken); + + rmw_ret_t take_serialized_message( + rmw_serialized_message_t * serialized_message, + bool * taken, + rmw_message_info_t * message_info); + + void set_on_new_message_callback( + rmw_event_callback_t callback, + const void * user_data); + + std::shared_ptr graph_cache() const; + + // Destructor. + ~SubscriptionData(); + +private: + SubscriptionData( + const rmw_node_t * rmw_node, + std::shared_ptr graph_cache, + std::shared_ptr entity, + const void * type_support_impl, + std::unique_ptr type_support); + + // Internal mutex. + mutable std::mutex mutex_; + // The parent node. + const rmw_node_t * rmw_node_; + // The graph cache. + std::shared_ptr graph_cache_; + // The Entity generated for the subscription. + std::shared_ptr entity_; + // An owned subscriber or querying_subscriber depending on the QoS settings. + std::variant sub_; + // Liveliness token for the subscription. + zc_owned_liveliness_token_t token_; + // Type support fields + const void * type_support_impl_; + std::unique_ptr type_support_; + std::deque> message_queue_; + // Map GID of a subscription to the sequence number of the message it published. + std::unordered_map last_known_published_msg_; + size_t total_messages_lost_; + // Wait set data. + rmw_wait_set_data_t * wait_set_data_; + // std::mutex condition_mutex_; + DataCallbackManager data_callback_mgr_; + std::shared_ptr events_mgr_; + // Shutdown flag. + bool is_shutdown_; +}; +using SubscriptionDataPtr = std::shared_ptr; +using SubscriptionDataConstPtr = std::shared_ptr; +} // namespace rmw_zenoh_cpp + +#endif // DETAIL__RMW_SUBSCRIPTION_DATA_HPP_ diff --git a/rmw_zenoh_cpp/src/rmw_event.cpp b/rmw_zenoh_cpp/src/rmw_event.cpp index e8846d47..3a209ce8 100644 --- a/rmw_zenoh_cpp/src/rmw_event.cpp +++ b/rmw_zenoh_cpp/src/rmw_event.cpp @@ -23,7 +23,7 @@ #include "detail/rmw_context_impl_s.hpp" #include "detail/rmw_data_types.hpp" #include "detail/rmw_publisher_data.hpp" - +#include "detail/rmw_subscription_data.hpp" extern "C" { @@ -69,12 +69,14 @@ rmw_publisher_event_init( rmw_event->event_type = event_type; // Register the event with graph cache. + std::weak_ptr data_wp = pub_data; context_impl->graph_cache()->set_qos_event_callback( pub_data->guid(), zenoh_event_type, - [pub_data, + [data_wp, zenoh_event_type](std::unique_ptr zenoh_event) { + auto pub_data = data_wp.lock(); if (pub_data == nullptr) { return; } @@ -99,14 +101,9 @@ rmw_subscription_event_init( RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription->implementation_identifier, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscription->data, RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_subscription_data_t * sub_data = - static_cast(subscription->data); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->context, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); - RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->entity, RMW_RET_INVALID_ARGUMENT); - if (subscription->implementation_identifier != rmw_zenoh_cpp::rmw_zenoh_identifier) { RMW_SET_ERROR_MSG( "Subscription implementation identifier not from this implementation"); @@ -122,7 +119,7 @@ rmw_subscription_event_init( } rmw_event->implementation_identifier = subscription->implementation_identifier; - rmw_event->data = &sub_data->events_mgr; + rmw_event->data = sub_data->events_mgr().get(); rmw_event->event_type = event_type; // Register the event with graph cache if the event is not ZENOH_EVENT_MESSAGE_LOST @@ -131,8 +128,9 @@ rmw_subscription_event_init( return RMW_RET_OK; } - context_impl->graph_cache()->set_qos_event_callback( - sub_data->entity->guid(), + // std::weak_ptr data_wp = sub_data; + sub_data->graph_cache()->set_qos_event_callback( + sub_data->guid(), zenoh_event_type, [sub_data, zenoh_event_type](std::unique_ptr zenoh_event) @@ -140,7 +138,7 @@ rmw_subscription_event_init( if (sub_data == nullptr) { return; } - sub_data->events_mgr.add_new_event( + sub_data->events_mgr()->add_new_event( zenoh_event_type, std::move(zenoh_event)); } diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index f3c5433d..a616ff00 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -405,7 +405,7 @@ rmw_create_publisher( return nullptr; } - // Create the publisher. + // Create the rmw_publisher. auto rmw_publisher = static_cast(allocator->zero_allocate( 1, sizeof(rmw_publisher_t), allocator->state)); @@ -650,7 +650,7 @@ rmw_publisher_count_matched_subscriptions( RMW_CHECK_ARGUMENT_FOR_NULL(pub_data, RMW_RET_INVALID_ARGUMENT); return context_impl->graph_cache()->publisher_count_matched_subscriptions( - pub_data, subscription_count); + pub_data->topic_info(), subscription_count); } //============================================================================== @@ -871,6 +871,7 @@ rmw_create_subscription( const rmw_subscription_options_t * subscription_options) { RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr); + RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( node, node->implementation_identifier, @@ -883,7 +884,26 @@ rmw_create_subscription( return nullptr; } RMW_CHECK_ARGUMENT_FOR_NULL(qos_profile, nullptr); + if (!qos_profile->avoid_ros_namespace_conventions) { + int validation_result = RMW_TOPIC_VALID; + rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); + if (RMW_RET_OK != ret) { + return nullptr; + } + if (RMW_TOPIC_VALID != validation_result) { + const char * reason = rmw_full_topic_name_validation_result_string(validation_result); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("invalid topic name: %s", reason); + return nullptr; + } + } RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr); + if (subscription_options->require_unique_network_flow_endpoints == + RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED) + { + RMW_SET_ERROR_MSG( + "Strict requirement on unique network flow endpoints for subscriptions not supported"); + return nullptr; + } const rosidl_message_type_support_t * type_support = find_message_type_support(type_supports); if (type_support == nullptr) { @@ -891,7 +911,6 @@ rmw_create_subscription( return nullptr; } - RMW_CHECK_ARGUMENT_FOR_NULL(node->data, nullptr); // TODO(yadunund): Check if a duplicate entry for the same topic name + topic type // is present in node_data->subscriptions and if so return error; RMW_CHECK_FOR_NULL_WITH_MSG( @@ -914,6 +933,10 @@ rmw_create_subscription( } rcutils_allocator_t * allocator = &node->context->options.allocator; + if (!rcutils_allocator_is_valid(allocator)) { + RMW_SET_ERROR_MSG("allocator is invalid."); + return nullptr; + } // Create the rmw_subscription. rmw_subscription_t * rmw_subscription = @@ -928,65 +951,33 @@ rmw_create_subscription( allocator->deallocate(rmw_subscription, allocator->state); }); - auto sub_data = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::rmw_subscription_data_t), allocator->state)); + auto node_data = context_impl->get_node_data(node); RMW_CHECK_FOR_NULL_WITH_MSG( - sub_data, - "failed to allocate memory for subscription data", + node_data, + "NodeData not found.", return nullptr); - auto free_sub_data = rcpputils::make_scope_exit( - [sub_data, allocator]() { - allocator->deallocate(sub_data, allocator->state); - }); - RMW_TRY_PLACEMENT_NEW(sub_data, sub_data, return nullptr, rmw_zenoh_cpp::rmw_subscription_data_t); - auto destruct_sub_data = rcpputils::make_scope_exit( - [sub_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - sub_data->~rmw_subscription_data_t(), - rmw_zenoh_cpp::rmw_subscription_data_t); - }); - - // Adapt any 'best available' QoS options - sub_data->adapted_qos_profile = *qos_profile; - rmw_ret_t ret = rmw_zenoh_cpp::QoS::get().best_available_qos( - node, topic_name, &sub_data->adapted_qos_profile, rmw_get_publishers_info_by_topic); - if (RMW_RET_OK != ret) { - RMW_SET_ERROR_MSG("Failed to obtain adapted_qos_profile."); + if (!node_data->create_sub_data( + rmw_subscription, + context_impl->session(), + context_impl->graph_cache(), + context_impl->get_next_entity_id(), + topic_name, + type_support, + qos_profile)) + { + // Error already handled. return nullptr; } - sub_data->typesupport_identifier = type_support->typesupport_identifier; - sub_data->type_hash = type_support->get_type_hash_func(type_support); - sub_data->type_support_impl = type_support->data; - auto callbacks = static_cast(type_support->data); - sub_data->type_support = static_cast( - allocator->allocate(sizeof(rmw_zenoh_cpp::MessageTypeSupport), allocator->state)); - RMW_CHECK_FOR_NULL_WITH_MSG( - sub_data->type_support, - "Failed to allocate MessageTypeSupport", - return nullptr); - auto free_type_support = rcpputils::make_scope_exit( - [sub_data, allocator]() { - allocator->deallocate(sub_data->type_support, allocator->state); - }); - - RMW_TRY_PLACEMENT_NEW( - sub_data->type_support, - sub_data->type_support, - return nullptr, - rmw_zenoh_cpp::MessageTypeSupport, callbacks); - auto destruct_msg_type_support = rcpputils::make_scope_exit( - [sub_data]() { - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE( - sub_data->type_support->~MessageTypeSupport(), - rmw_zenoh_cpp::MessageTypeSupport); - }); - - sub_data->context = node->context; - + // TODO(Yadunund): We cannot store the rmw_node_t * here since this type erased + // subscription handle will be returned in the rmw_subscriptions_t in rmw_wait + // from which we cannot obtain SubscriptionData. + rmw_subscription->data = static_cast(node_data->get_sub_data(rmw_subscription).get()); rmw_subscription->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - + rmw_subscription->options = *subscription_options; + rmw_subscription->can_loan_messages = false; + rmw_subscription->is_cft_enabled = false; rmw_subscription->topic_name = rcutils_strdup(topic_name, *allocator); RMW_CHECK_FOR_NULL_WITH_MSG( rmw_subscription->topic_name, @@ -997,195 +988,7 @@ rmw_create_subscription( allocator->deallocate(const_cast(rmw_subscription->topic_name), allocator->state); }); - rmw_subscription->options = *subscription_options; - rmw_subscription->can_loan_messages = false; - rmw_subscription->is_cft_enabled = false; - - // Convert the type hash to a string so that it can be included in - // the keyexpr. - char * type_hash_c_str = nullptr; - rcutils_ret_t stringify_ret = rosidl_stringify_type_hash( - sub_data->type_hash, - *allocator, - &type_hash_c_str); - if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { - RMW_SET_ERROR_MSG("Failed to allocate type_hash_c_str."); - return nullptr; - } - auto free_type_hash_c_str = rcpputils::make_scope_exit( - [&allocator, &type_hash_c_str]() { - allocator->deallocate(type_hash_c_str, allocator->state); - }); - - z_session_t session = context_impl->session(); - auto node_data = context_impl->get_node_data(node); - RMW_CHECK_FOR_NULL_WITH_MSG( - node_data, - "NodeData not found.", - return nullptr); - // Everything above succeeded and is setup properly. Now declare a subscriber - // with Zenoh; after this, callbacks may come in at any time. - sub_data->entity = rmw_zenoh_cpp::liveliness::Entity::make( - z_info_zid(session), - std::to_string(node_data->id()), - std::to_string( - context_impl->get_next_entity_id()), - rmw_zenoh_cpp::liveliness::EntityType::Subscription, - rmw_zenoh_cpp::liveliness::NodeInfo{ - node->context->actual_domain_id, node->namespace_, node->name, context_impl->enclave()}, - rmw_zenoh_cpp::liveliness::TopicInfo{ - node->context->actual_domain_id, - rmw_subscription->topic_name, - sub_data->type_support->get_name(), - type_hash_c_str, - sub_data->adapted_qos_profile} - ); - if (sub_data->entity == nullptr) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to generate keyexpr for liveliness token for the subscription %s.", - rmw_subscription->topic_name); - return nullptr; - } - z_owned_closure_sample_t callback = z_closure(rmw_zenoh_cpp::sub_data_handler, nullptr, sub_data); - z_owned_keyexpr_t keyexpr = z_keyexpr_new(sub_data->entity->topic_info()->topic_keyexpr_.c_str()); - auto always_free_ros_keyexpr = rcpputils::make_scope_exit( - [&keyexpr]() { - z_keyexpr_drop(z_move(keyexpr)); - }); - if (!z_keyexpr_check(&keyexpr)) { - RMW_SET_ERROR_MSG("unable to create zenoh keyexpr."); - return nullptr; - } - // Instantiate the subscription with suitable options depending on the - // adapted_qos_profile. - // TODO(Yadunund): Rely on a separate function to return the sub - // as we start supporting more qos settings. - z_owned_str_t owned_key_str = z_keyexpr_to_string(z_loan(keyexpr)); - auto always_drop_keystr = rcpputils::make_scope_exit( - [&owned_key_str]() { - z_drop(z_move(owned_key_str)); - }); - - if (sub_data->adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - ze_querying_subscriber_options_t sub_options = ze_querying_subscriber_options_default(); - // Make the initial query to hit all the PublicationCaches, using a query_selector with - // '*' in place of the queryable_prefix of each PublicationCache - const std::string selector = "*/" + - sub_data->entity->topic_info()->topic_keyexpr_; - sub_options.query_selector = z_keyexpr(selector.c_str()); - // Tell the PublicationCache's Queryable that the query accepts any key expression as a reply. - // By default a query accepts only replies that matches its query selector. - // This allows us to selectively query certain PublicationCaches when defining the - // set_querying_subscriber_callback below. - sub_options.query_accept_replies = ZCU_REPLY_KEYEXPR_ANY; - // As this initial query is now using a "*", the query target is not COMPLETE. - sub_options.query_target = Z_QUERY_TARGET_ALL; - // We set consolidation to none as we need to receive transient local messages - // from a number of publishers. Eg: To receive TF data published over /tf_static - // by various publishers. - sub_options.query_consolidation = z_query_consolidation_none(); - if (sub_data->adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - sub_options.reliability = Z_RELIABILITY_RELIABLE; - } - sub_data->sub = ze_declare_querying_subscriber( - session, - z_loan(keyexpr), - z_move(callback), - &sub_options - ); - if (!z_check(std::get(sub_data->sub))) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return nullptr; - } - // Register the querying subscriber with the graph cache to get latest - // messages from publishers that were discovered after their first publication. - context_impl->graph_cache()->set_querying_subscriber_callback( - sub_data, - [sub_data](const std::string & queryable_prefix) -> void - { - if (sub_data == nullptr) { - return; - } - const std::string selector = queryable_prefix + - "/" + - sub_data->entity->topic_info()->topic_keyexpr_; - RMW_ZENOH_LOG_DEBUG_NAMED( - "rmw_zenoh_cpp", - "QueryingSubscriberCallback triggered over %s.", - selector.c_str() - ); - z_get_options_t opts = z_get_options_default(); - opts.timeout_ms = std::numeric_limits::max(); - opts.consolidation = z_query_consolidation_latest(); - opts.accept_replies = ZCU_REPLY_KEYEXPR_ANY; - ze_querying_subscriber_get( - z_loan(std::get(sub_data->sub)), - z_keyexpr(selector.c_str()), - &opts - ); - } - ); - } else { - // Create a regular subscriber for all other durability settings. - z_subscriber_options_t sub_options = z_subscriber_options_default(); - if (qos_profile->reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - sub_options.reliability = Z_RELIABILITY_RELIABLE; - } - sub_data->sub = z_declare_subscriber( - session, - z_loan(keyexpr), - z_move(callback), - &sub_options - ); - if (!z_check(std::get(sub_data->sub))) { - RMW_SET_ERROR_MSG("unable to create zenoh subscription"); - return nullptr; - } - } - - auto undeclare_z_sub = rcpputils::make_scope_exit( - [sub_data]() { - z_owned_subscriber_t * sub = std::get_if(&sub_data->sub); - if (sub == nullptr || z_undeclare_subscriber(sub)) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - } else { - ze_owned_querying_subscriber_t * querying_sub = - std::get_if(&sub_data->sub); - if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - } - } - }); - - // Publish to the graph that a new subscription is in town. - sub_data->token = zc_liveliness_declare_token( - session, - z_keyexpr(sub_data->entity->liveliness_keyexpr().c_str()), - NULL - ); - auto free_token = rcpputils::make_scope_exit( - [sub_data]() { - if (sub_data != nullptr) { - z_drop(z_move(sub_data->token)); - } - }); - if (!z_check(sub_data->token)) { - RMW_ZENOH_LOG_ERROR_NAMED( - "rmw_zenoh_cpp", - "Unable to create liveliness token for the subscription."); - return nullptr; - } - - rmw_subscription->data = sub_data; - - free_token.cancel(); - undeclare_z_sub.cancel(); free_topic_name.cancel(); - destruct_msg_type_support.cancel(); - free_type_support.cancel(); - destruct_sub_data.cancel(); - free_sub_data.cancel(); free_rmw_subscription.cancel(); return rmw_subscription; @@ -1212,42 +1015,29 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - + auto node_data = context_impl->get_node_data(node); + if (node_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; + } + auto sub_data = node_data->get_sub_data(subscription); + if (sub_data == nullptr) { + return RMW_RET_INVALID_ARGUMENT; + } rmw_ret_t ret = RMW_RET_OK; rcutils_allocator_t * allocator = &node->context->options.allocator; - auto sub_data = static_cast(subscription->data); - if (sub_data != nullptr) { - // Publish to the graph that a subscription has ridden off into the sunset - zc_liveliness_undeclare_token(z_move(sub_data->token)); - - RMW_TRY_DESTRUCTOR(sub_data->type_support->~MessageTypeSupport(), MessageTypeSupport, ); - allocator->deallocate(sub_data->type_support, allocator->state); - - z_owned_subscriber_t * sub = std::get_if(&sub_data->sub); - if (sub != nullptr) { - if (z_undeclare_subscriber(sub)) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - ret = RMW_RET_ERROR; - } - } else { - ze_owned_querying_subscriber_t * querying_sub = - std::get_if(&sub_data->sub); - if (querying_sub == nullptr || ze_undeclare_querying_subscriber(querying_sub)) { - RMW_SET_ERROR_MSG("failed to undeclare sub"); - ret = RMW_RET_ERROR; - } - // Also remove the registered callback from the GraphCache. - context_impl->graph_cache()->remove_querying_subscriber_callback(sub_data); - } - - // Remove any event callbacks registered to this subscription. - context_impl->graph_cache()->remove_qos_event_callbacks(sub_data->entity->guid()); + // Remove the registered callback from the GraphCache if any. + const std::size_t guid = sub_data->guid(); + context_impl->graph_cache()->remove_querying_subscriber_callback( + sub_data->topic_info().topic_keyexpr_, + guid + ); + // Remove any event callbacks registered to this subscription. + context_impl->graph_cache()->remove_qos_event_callbacks(guid); + // Finally remove the SubscriptionData from NodeData. + node_data->delete_sub_data(subscription); - RMW_TRY_DESTRUCTOR(sub_data->~rmw_subscription_data_t(), rmw_subscription_data_t, ); - allocator->deallocate(sub_data, allocator->state); - } allocator->deallocate(const_cast(subscription->topic_name), allocator->state); allocator->deallocate(subscription, allocator->state); @@ -1269,16 +1059,12 @@ rmw_subscription_count_matched_publishers( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(publisher_count, RMW_RET_INVALID_ARGUMENT); - - rmw_zenoh_cpp::rmw_subscription_data_t * sub_data = - static_cast(subscription->data); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_ARGUMENT_FOR_NULL(sub_data->context, RMW_RET_INVALID_ARGUMENT); - rmw_context_impl_t * context_impl = static_cast(sub_data->context->impl); - RMW_CHECK_ARGUMENT_FOR_NULL(context_impl, RMW_RET_INVALID_ARGUMENT); - return context_impl->graph_cache()->subscription_count_matched_publishers( - subscription, publisher_count); + return sub_data->graph_cache()->subscription_count_matched_publishers( + sub_data->topic_info(), publisher_count); } //============================================================================== @@ -1295,11 +1081,11 @@ rmw_subscription_get_actual_qos( rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); - - auto sub_data = static_cast(subscription->data); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - *qos = sub_data->adapted_qos_profile; + *qos = sub_data->topic_info().qos_; return RMW_RET_OK; } @@ -1333,56 +1119,6 @@ rmw_subscription_get_content_filter( return RMW_RET_UNSUPPORTED; } -namespace -{ -rmw_ret_t -__rmw_take_one( - rmw_zenoh_cpp::rmw_subscription_data_t * sub_data, - void * ros_message, - rmw_message_info_t * message_info, - bool * taken) -{ - *taken = false; - - std::unique_ptr msg_data = sub_data->pop_next_message(); - if (msg_data == nullptr) { - // There are no more messages to take. - return RMW_RET_OK; - } - - // Object that manages the raw buffer - eprosima::fastcdr::FastBuffer fastbuffer( - reinterpret_cast(const_cast(msg_data->payload.payload.start)), - msg_data->payload.payload.len); - - // Object that serializes the data - rmw_zenoh_cpp::Cdr deser(fastbuffer); - if (!sub_data->type_support->deserialize_ros_message( - deser.get_cdr(), - ros_message, - sub_data->type_support_impl)) - { - RMW_SET_ERROR_MSG("could not deserialize ROS message"); - return RMW_RET_ERROR; - } - - if (message_info != nullptr) { - message_info->source_timestamp = msg_data->source_timestamp; - message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->sequence_number; - // TODO(clalancette): fill in reception_sequence_number - message_info->reception_sequence_number = 0; - message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); - message_info->from_intra_process = false; - } - - *taken = true; - - return RMW_RET_OK; -} -} // namespace - //============================================================================== /// Take an incoming ROS message. rmw_ret_t @@ -1403,13 +1139,11 @@ rmw_take( subscription handle, subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - *taken = false; - - auto sub_data = static_cast(subscription->data); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - return __rmw_take_one(sub_data, ros_message, nullptr, taken); + return sub_data->take_one_message(ros_message, nullptr, taken); } //============================================================================== @@ -1434,13 +1168,11 @@ rmw_take_with_info( subscription handle, subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - *taken = false; - - auto sub_data = static_cast(subscription->data); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - return __rmw_take_one(sub_data, ros_message, message_info, taken); + return sub_data->take_one_message(ros_message, message_info, taken); } //============================================================================== @@ -1466,6 +1198,9 @@ rmw_take_sequence( subscription handle, subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); + RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); if (0u == count) { RMW_SET_ERROR_MSG("count cannot be 0"); @@ -1491,20 +1226,11 @@ rmw_take_sequence( *taken = 0; - auto sub_data = static_cast(subscription->data); - RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - - if (sub_data->context->impl->is_shutdown()) { - return RMW_RET_OK; - } - rmw_ret_t ret; - while (*taken < count) { bool one_taken = false; - - ret = __rmw_take_one( - sub_data, message_sequence->data[*taken], + ret = sub_data->take_one_message( + message_sequence->data[*taken], &message_info_sequence->data[*taken], &one_taken); if (ret != RMW_RET_OK) { // If we are taking a sequence and the 2nd take in the sequence failed, we'll report @@ -1548,50 +1274,15 @@ __rmw_take_serialized( subscription handle, subscription->implementation_identifier, rmw_zenoh_cpp::rmw_zenoh_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - - *taken = false; - - auto sub_data = static_cast(subscription->data); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - if (sub_data->context->impl->is_shutdown()) { - return RMW_RET_OK; - } - - // RETRIEVE SERIALIZED MESSAGE =============================================== - - std::unique_ptr msg_data = sub_data->pop_next_message(); - if (msg_data == nullptr) { - // This tells rcl that the check for a new message was done, but no messages have come in yet. - return RMW_RET_OK; - } - - if (serialized_message->buffer_capacity < msg_data->payload.payload.len) { - rmw_ret_t ret = - rmw_serialized_message_resize(serialized_message, msg_data->payload.payload.len); - if (ret != RMW_RET_OK) { - return ret; // Error message already set - } - } - serialized_message->buffer_length = msg_data->payload.payload.len; - memcpy( - serialized_message->buffer, msg_data->payload.payload.start, - msg_data->payload.payload.len); - - *taken = true; - - if (message_info != nullptr) { - message_info->source_timestamp = msg_data->source_timestamp; - message_info->received_timestamp = msg_data->recv_timestamp; - message_info->publication_sequence_number = msg_data->sequence_number; - // TODO(clalancette): fill in reception_sequence_number - message_info->reception_sequence_number = 0; - message_info->publisher_gid.implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier; - memcpy(message_info->publisher_gid.data, msg_data->publisher_gid, RMW_GID_STORAGE_SIZE); - message_info->from_intra_process = false; - } - - return RMW_RET_OK; + return sub_data->take_serialized_message( + serialized_message, + taken, + message_info + ); } } // namespace @@ -3063,8 +2754,8 @@ check_and_attach_condition( if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { - auto sub_data = - static_cast(subscriptions->subscribers[i]); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscriptions->subscribers[i]); if (sub_data == nullptr) { continue; } @@ -3217,12 +2908,11 @@ rmw_wait( if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { - auto sub_data = - static_cast(subscriptions->subscribers[i]); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscriptions->subscribers[i]); if (sub_data == nullptr) { continue; } - if (sub_data->detach_condition_and_queue_is_empty()) { // Setting to nullptr lets rcl know that this subscription is not ready subscriptions->subscribers[i] = nullptr; @@ -3575,11 +3265,11 @@ rmw_subscription_set_on_new_message_callback( const void * user_data) { RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); - rmw_zenoh_cpp::rmw_subscription_data_t * sub_data = - static_cast(subscription->data); + rmw_zenoh_cpp::SubscriptionData * sub_data = + static_cast(subscription->data); RMW_CHECK_ARGUMENT_FOR_NULL(sub_data, RMW_RET_INVALID_ARGUMENT); - sub_data->data_callback_mgr.set_callback( - user_data, callback); + + sub_data->set_on_new_message_callback(callback, user_data); return RMW_RET_OK; }