From 5b6f8c09a4de88f352dd0347e7b5b090552cf030 Mon Sep 17 00:00:00 2001 From: Katie Hamilton Date: Thu, 30 Nov 2023 12:47:25 -0800 Subject: [PATCH] Put advertisement info and content in same dds message --- .../dds_generic_comms/RAPID_QOS_PROFILES.xml | 2 +- .../comms_bridge/generic_rapid_msg_ros_pub.h | 2 + .../include/comms_bridge/generic_rapid_sub.h | 4 +- .../comms_bridge/generic_ros_sub_rapid_pub.h | 10 +++ .../comms_bridge/src/comms_bridge_nodelet.cpp | 12 +++- .../src/generic_rapid_msg_ros_pub.cpp | 47 ++++++++++++ .../src/generic_ros_sub_rapid_pub.cpp | 72 ++++++++++++++++++- .../dds_msgs/idl/AstrobeeConstants.idl | 1 + .../idl/GenericCommsAdvertisementInfo.idl | 4 +- .../dds_msgs/idl/GenericCommsData.idl | 49 +++++++++++++ 10 files changed, 196 insertions(+), 7 deletions(-) create mode 100644 communications/dds_msgs/idl/GenericCommsData.idl diff --git a/astrobee/config/communications/dds_generic_comms/RAPID_QOS_PROFILES.xml b/astrobee/config/communications/dds_generic_comms/RAPID_QOS_PROFILES.xml index 78ddb509a0..bc99e1e458 100644 --- a/astrobee/config/communications/dds_generic_comms/RAPID_QOS_PROFILES.xml +++ b/astrobee/config/communications/dds_generic_comms/RAPID_QOS_PROFILES.xml @@ -660,7 +660,7 @@ - + diff --git a/communications/comms_bridge/include/comms_bridge/generic_rapid_msg_ros_pub.h b/communications/comms_bridge/include/comms_bridge/generic_rapid_msg_ros_pub.h index 16e46f9c05..2c45bad3c6 100644 --- a/communications/comms_bridge/include/comms_bridge/generic_rapid_msg_ros_pub.h +++ b/communications/comms_bridge/include/comms_bridge/generic_rapid_msg_ros_pub.h @@ -27,6 +27,7 @@ #include "dds_msgs/GenericCommsAdvertisementInfoSupport.h" #include "dds_msgs/GenericCommsContentSupport.h" +#include "dds_msgs/GenericCommsDataSupport.h" // default time to delay between advertisement and publishing on that topic [sec] #define DEFAULT_ADVERTISE_TO_PUB_DELAY 3.0 @@ -40,6 +41,7 @@ class GenericRapidMsgRosPub : public BridgePublisher { void ConvertData(rapid::ext::astrobee::GenericCommsAdvertisementInfo const* data); void ConvertData(rapid::ext::astrobee::GenericCommsContent const* data); + void ConvertData(rapid::ext::astrobee::GenericCommsData const* data); }; } // end namespace ff diff --git a/communications/comms_bridge/include/comms_bridge/generic_rapid_sub.h b/communications/comms_bridge/include/comms_bridge/generic_rapid_sub.h index 1cfd956015..9be35f44dc 100644 --- a/communications/comms_bridge/include/comms_bridge/generic_rapid_sub.h +++ b/communications/comms_bridge/include/comms_bridge/generic_rapid_sub.h @@ -34,6 +34,7 @@ #include "dds_msgs/GenericCommsAdvertisementInfoSupport.h" #include "dds_msgs/GenericCommsContentSupport.h" +#include "dds_msgs/GenericCommsDataSupport.h" namespace ff { @@ -102,7 +103,8 @@ typedef std::shared_ptr> ContentRapidSubPtr; - +typedef std::shared_ptr> + DataRapidSubPtr; } // end namespace ff #endif // COMMS_BRIDGE_GENERIC_RAPID_SUB_H_ diff --git a/communications/comms_bridge/include/comms_bridge/generic_ros_sub_rapid_pub.h b/communications/comms_bridge/include/comms_bridge/generic_ros_sub_rapid_pub.h index 42d8ee0205..eafb1a5a4d 100644 --- a/communications/comms_bridge/include/comms_bridge/generic_ros_sub_rapid_pub.h +++ b/communications/comms_bridge/include/comms_bridge/generic_ros_sub_rapid_pub.h @@ -29,6 +29,7 @@ #include #include +#include #include #include "knDds/DdsTypedSupplier.h" @@ -38,6 +39,7 @@ #include "dds_msgs/AstrobeeConstantsSupport.h" #include "dds_msgs/GenericCommsAdvertisementInfoSupport.h" #include "dds_msgs/GenericCommsContentSupport.h" +#include "dds_msgs/GenericCommsDataSupport.h" namespace ff { @@ -78,6 +80,14 @@ class GenericROSSubRapidPub : public BridgeSubscriber { ContentSupplierPtr content_supplier_; + using DataSupplier = + kn::DdsTypedSupplier; + using DataSupplierPtr = std::unique_ptr; + + DataSupplierPtr data_supplier_; + + std::map advertisement_info_; + unsigned int advertisement_info_seq_; }; diff --git a/communications/comms_bridge/src/comms_bridge_nodelet.cpp b/communications/comms_bridge/src/comms_bridge_nodelet.cpp index f548648434..9853a8bd72 100644 --- a/communications/comms_bridge/src/comms_bridge_nodelet.cpp +++ b/communications/comms_bridge/src/comms_bridge_nodelet.cpp @@ -195,6 +195,7 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet { std::string connection; ff::AdvertisementInfoRapidSubPtr advertisement_info_sub; ff::ContentRapidSubPtr content_sub; + ff::DataRapidSubPtr data_sub; ros_sub_.InitializeDDS(); for (size_t i = 0; i < rapid_connections_.size(); i++) { // Lower case the external agent name to use it like a namespace @@ -211,6 +212,13 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet { connection, ros_pub_.get()); content_rapid_subs_.push_back(content_sub); + + data_sub = std::make_shared>( + "AstrobeeGenericCommsDataProfile", + rapid::ext::astrobee::GENERIC_COMMS_DATA_TOPIC, + connection, + ros_pub_.get()); + data_rapid_subs_.push_back(data_sub); } std::string ns = std::string("/") + agent_name_ + "/"; @@ -218,7 +226,8 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet { for (size_t i = 0; i < topics_sub_.size(); i++) { ROS_INFO_STREAM("Initialize DDS topic sub: " << topics_sub_[i]); - ros_sub_.addTopic(topics_sub_[i], (ns + topics_sub_[i])); + // ros_sub_.addTopic(topics_sub_[i], (ns + topics_sub_[i])); + ros_sub_.addTopic(topics_sub_[i], topics_sub_[i]); } } @@ -319,6 +328,7 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet { ff::GenericROSSubRapidPub ros_sub_; std::vector content_rapid_subs_; std::vector advertisement_info_rapid_subs_; + std::vector data_rapid_subs_; std::shared_ptr dds_entities_factory_; std::shared_ptr ros_pub_; std::string agent_name_, participant_name_; diff --git a/communications/comms_bridge/src/generic_rapid_msg_ros_pub.cpp b/communications/comms_bridge/src/generic_rapid_msg_ros_pub.cpp index 3fbcab04f3..8400fdbb93 100644 --- a/communications/comms_bridge/src/generic_rapid_msg_ros_pub.cpp +++ b/communications/comms_bridge/src/generic_rapid_msg_ros_pub.cpp @@ -87,4 +87,51 @@ void GenericRapidMsgRosPub::ConvertData( } } +// Handle data message +void GenericRapidMsgRosPub::ConvertData( + rapid::ext::astrobee::GenericCommsData const* data) { + const std::lock_guard lock(m_mutex_); + + const std::string output_topic = data->outputTopic; + + ROS_ERROR("Comms Bridge Nodelet: Received data message for topic %s\n", + output_topic.c_str()); + + std::map::iterator iter = m_relay_topics_.find(output_topic); + if (iter == m_relay_topics_.end()) { + ROS_ERROR("Comms Bridge Nodelet: Adding advertisement info for topic %s.\n", + output_topic.c_str()); + + AdvertisementInfo ad_info; + ad_info.latching = data->latching; + ad_info.data_type = data->dataType; + ad_info.md5_sum = data->md5Sum; + ad_info.definition = data->msgDefinition; + ROS_INFO_STREAM("ad_info latching: " << data->latching); + ROS_INFO_STREAM("ad_info dataType: " << data->dataType); + ROS_INFO_STREAM("ad_info md5Sum: " << data->md5Sum); + ROS_INFO_STREAM("ad_info msgDefinition: " << data->msgDefinition); + + if (!advertiseTopic(output_topic, ad_info)) { + ROS_ERROR("Comms Bridge Nodelet: Error advertising topic: %s\n", + output_topic.c_str()); + } + } + + RelayTopicInfo &topic_info = iter->second; + + ContentInfo content_info; + content_info.type_md5_sum = data->md5Sum; + + unsigned char* buf = data->data.get_contiguous_buffer(); + for (size_t i = 0; i < data->data.length(); i++) { + content_info.data.push_back(buf[i]); + } + + if (!relayMessage(topic_info, content_info)) { + ROS_ERROR("Comms Bridge Nodelet: Error relaying message for topic %s\n", + output_topic.c_str()); + } +} + } // end namespace ff diff --git a/communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp b/communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp index 42549fc640..480dfc6f46 100644 --- a/communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp +++ b/communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp @@ -37,8 +37,13 @@ void GenericROSSubRapidPub::InitializeDDS() { rapid::ext::astrobee::GENERIC_COMMS_CONTENT_TOPIC, "", "AstrobeeGenericCommsContentProfile", "")); + data_supplier_.reset(new GenericROSSubRapidPub::DataSupplier( + rapid::ext::astrobee::GENERIC_COMMS_DATA_TOPIC, + "", "AstrobeeGenericCommsDataProfile", "")); + rapid::RapidHelper::initHeader(advertisement_info_supplier_->event().hdr); rapid::RapidHelper::initHeader(content_supplier_->event().hdr); + rapid::RapidHelper::initHeader(data_supplier_->event().hdr); dds_initialized_ = true; } @@ -51,6 +56,14 @@ void GenericROSSubRapidPub::subscribeTopic(std::string const& in_topic, const Re // Called with the mutex held void GenericROSSubRapidPub::advertiseTopic(const RelayTopicInfo& relay_info) { +/* std::string out_topic = relay_info.out_topic; + if (advertisement_info_.count(out_topic) == 0) { + advertisement_info_.emplace(out_topic, relay_info.ad_info); + } else { + ROS_ERROR("Comms Bridge: Already added this advertisement info to the map"); + } + + ROS_ERROR("Comms Bridge: Saved advertisement info for topic %s\n"); const AdvertisementInfo &info = relay_info.ad_info; rapid::ext::astrobee::GenericCommsAdvertisementInfo &msg = advertisement_info_supplier_->event(); @@ -86,11 +99,12 @@ void GenericROSSubRapidPub::advertiseTopic(const RelayTopicInfo& relay_info) { msg.msgDefinition[size] = '\0'; // Send message - advertisement_info_supplier_->sendEvent(); + advertisement_info_supplier_->sendEvent();*/ } // Called with the mutex held -void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info, ContentInfo const& content_info) { +// Old relay function +/*void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info, ContentInfo const& content_info) { rapid::ext::astrobee::GenericCommsContent &msg = content_supplier_->event(); unsigned int size; @@ -125,8 +139,62 @@ void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info, Conte // Send message content_supplier_->sendEvent(); +}*/ + +// Called with the mutex held +void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info, + ContentInfo const& content_info) { + rapid::ext::astrobee::GenericCommsData &msg = data_supplier_->event(); + AdvertisementInfo info = topic_info.ad_info; + std::string out_topic = topic_info.out_topic; + unsigned int size; + + ROS_ERROR("Comms bridge: Sending content message for topic %s\n", out_topic.c_str()); + + msg.hdr.timeStamp = comms_util::RosTime2RapidTime(ros::Time::now()); + msg.hdr.serial = topic_info.relay_seqnum; + + // Currently the output topic can only be 128 characters long + SizeCheck(size, out_topic.size(), 128, "Out topic", out_topic); + std::strncpy(msg.outputTopic, out_topic.data(), size); + msg.outputTopic[size] = '\0'; + + msg.latching = info.latching; + + // Currently the data type can only be 128 characters long + SizeCheck(size, info.data_type.size(), 128, "Data type", out_topic); + std::strncpy(msg.dataType, info.data_type.data(), size); + msg.dataType[size] = '\0'; + + // TODO(Katie): Check md5 sum matches co + // Currently the md5 sum can only be 64 characters long + SizeCheck(size, info.md5_sum.size(), 64, "MD5 sum", out_topic); + std::strncpy(msg.md5Sum, info.md5_sum.data(), size); + msg.md5Sum[size] = '\0'; + + // Current the ROS message definition can only be 16384 characters long + SizeCheck(size, info.definition.size(), 16384, "Msg definition", out_topic); + std::strncpy(msg.msgDefinition, info.definition.data(), size); + msg.msgDefinition[size] = '\0'; + + // Currently the content can only be 128K bytes long + SizeCheck(size, content_info.data_size, 131072, "Data", out_topic); + msg.data.ensure_length(size, (size + 1)); + unsigned char *buf = msg.data.get_contiguous_buffer(); + if (buf == NULL) { + ROS_ERROR("DDS: RTI didn't give a contiguous buffer for the content data!"); + return; + } + + std::memset(buf, 0, (size + 1)); + std::memmove(buf, content_info.data, size); + + + // Send message + data_supplier_->sendEvent(); } + void GenericROSSubRapidPub::SizeCheck(unsigned int &size, const int size_in, const int max_size, diff --git a/communications/dds_msgs/idl/AstrobeeConstants.idl b/communications/dds_msgs/idl/AstrobeeConstants.idl index 9d022e663e..fcf475281c 100644 --- a/communications/dds_msgs/idl/AstrobeeConstants.idl +++ b/communications/dds_msgs/idl/AstrobeeConstants.idl @@ -36,6 +36,7 @@ module rapid { const String64 FAULT_STATE_TOPIC = "astrobee_fault_state"; const String64 GENERIC_COMMS_ADVERTISEMENT_INFO_TOPIC = "astrobee_generic_comms_advertisement_info"; const String64 GENERIC_COMMS_CONTENT_TOPIC = "astrobee_generic_comms_content"; + const String64 GENERIC_COMMS_DATA_TOPIC = "astrobee_generic_comms_data"; const String64 GNC_CONTROL_STATE_TOPIC = "astrobee_gnc_control_state"; const String64 GNC_FAM_CMD_STATE_TOPIC = "astrobee_gnc_fam_cmd_state"; const String64 GUEST_SCIENCE_CONFIG_TOPIC = "astrobee_guest_science_config"; diff --git a/communications/dds_msgs/idl/GenericCommsAdvertisementInfo.idl b/communications/dds_msgs/idl/GenericCommsAdvertisementInfo.idl index 2950f0a493..f9114cf142 100644 --- a/communications/dds_msgs/idl/GenericCommsAdvertisementInfo.idl +++ b/communications/dds_msgs/idl/GenericCommsAdvertisementInfo.idl @@ -7,7 +7,7 @@ module rapid { module ext { module astrobee { - typedef string<16384> String16K; + typedef string<32768> String32K; //@copy-c-declaration class GenericCommsAdvertisementInfoTypeSupport; //@copy-c-declaration class GenericCommsAdvertisementInfoDataWriter; @@ -39,7 +39,7 @@ module rapid { public String64 md5Sum; //@copy-declaration /** ROS message definition */ - public String16K msgDefinition; + public String32K msgDefinition; }; }; }; diff --git a/communications/dds_msgs/idl/GenericCommsData.idl b/communications/dds_msgs/idl/GenericCommsData.idl new file mode 100644 index 0000000000..ee39f99f40 --- /dev/null +++ b/communications/dds_msgs/idl/GenericCommsData.idl @@ -0,0 +1,49 @@ +/* + * Copyright 2023 (c) 2023 Intelligent Robotics Group, NASA ARC + */ + +#include "Message.idl" + +module rapid { + module ext { + module astrobee { + typedef string<16384> String16K; + + //@copy-c-declaration class GenericCommsDataTypeSupport; + //@copy-c-declaration class GenericCommsDataDataWriter; + //@copy-c-declaration class GenericCommsDataDataReader; + //@copy-c-declaration struct GenericCommsDataSeq; + + //@copy-declaration /** + //@copy-declaration * Data of a generic comms ROS message + //@copy-declaration */ + valuetype GenericCommsData : Message { + //@copy-c-declaration #if RTI_DDS_VERSION_MAJOR < 4 || (RTI_DDS_VERSION_MAJOR == 4 && RTI_DDS_VERSION_MINOR < 5) || (RTI_DDS_VERSION_MAJOR == 4 && RTI_DDS_VERSION_MINOR == 5 && RTI_DDS_VERSION_RELEASE != 'f' ) + //@copy-c-declaration typedef GenericCommsDataTypeSupport TypeSupport; + //@copy-c-declaration typedef GenericCommsDataDataWriter DataWriter; + //@copy-c-declaration typedef GenericCommsDataDataReader DataReader; + //@copy-c-declaration typedef GenericCommsDataSeq Seq; + //@copy-c-declaration #endif + //@copy-c-declaration typedef GenericCommsData Type; + + //@copy-declaration /** Topic on which to republish */ + public String128 outputTopic; + + //@copy-declaration /** Whether republisher should advertise topic as latching */ + public boolean latching; + + //@copy-declaration /** ROS message data type name */ + public String128 dataType; + + //@copy-declaration /** ROS message md5sum of type */ + public String64 md5Sum; + + //@copy-declaration /** ROS message definition */ + public String16K msgDefinition; + + //@copy-declaration /** Serialized content of the message */ + public OctetSequence128K data; + }; + }; + }; +};