Skip to content

Commit

Permalink
Put advertisement info and content in same dds message
Browse files Browse the repository at this point in the history
  • Loading branch information
kbrowne15 committed Nov 30, 2023
1 parent 3897858 commit 5b6f8c0
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@
<!-- ASTROBEE Extension: Generic Comms ============================ -->
<qos_profile name="AstrobeeGenericCommsAdvertisementInfoProfile" base_name="RapidConfigQos" />
<qos_profile name="AstrobeeGenericCommsContentProfile" base_name="RapidStateQos" />
<qos_profile name="AstrobeeGenericCommsResetProfile" base_name="RapidConfigQos" />
<qos_profile name="AstrobeeGenericCommsDataProfile" base_name="RapidStateQos" />

<!-- ASTROBEE Extension: GNC Fam Cmd ============================== -->
<qos_profile name="AstrobeeGncFamCmdStateProfile" base_name="RapidStateQos" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "dds_msgs/GenericCommsAdvertisementInfoSupport.h"
#include "dds_msgs/GenericCommsContentSupport.h"
#include "dds_msgs/GenericCommsDataSupport.h"

// default time to delay between advertisement and publishing on that topic [sec]
#define DEFAULT_ADVERTISE_TO_PUB_DELAY 3.0
Expand All @@ -40,6 +41,7 @@ class GenericRapidMsgRosPub : public BridgePublisher {

void ConvertData(rapid::ext::astrobee::GenericCommsAdvertisementInfo const* data);
void ConvertData(rapid::ext::astrobee::GenericCommsContent const* data);
void ConvertData(rapid::ext::astrobee::GenericCommsData const* data);
};

} // end namespace ff
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#include "dds_msgs/GenericCommsAdvertisementInfoSupport.h"
#include "dds_msgs/GenericCommsContentSupport.h"
#include "dds_msgs/GenericCommsDataSupport.h"

namespace ff {

Expand Down Expand Up @@ -102,7 +103,8 @@ typedef std::shared_ptr<GenericRapidSub<rapid::ext::astrobee::GenericCommsAdvert
AdvertisementInfoRapidSubPtr;
typedef std::shared_ptr<GenericRapidSub<rapid::ext::astrobee::GenericCommsContent>>
ContentRapidSubPtr;

typedef std::shared_ptr<GenericRapidSub<rapid::ext::astrobee::GenericCommsData>>
DataRapidSubPtr;
} // end namespace ff

#endif // COMMS_BRIDGE_GENERIC_RAPID_SUB_H_
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <ff_msgs/GenericCommsContent.h>
#include <ff_msgs/GenericCommsReset.h>

#include <map>
#include <string>

#include "knDds/DdsTypedSupplier.h"
Expand All @@ -38,6 +39,7 @@
#include "dds_msgs/AstrobeeConstantsSupport.h"
#include "dds_msgs/GenericCommsAdvertisementInfoSupport.h"
#include "dds_msgs/GenericCommsContentSupport.h"
#include "dds_msgs/GenericCommsDataSupport.h"

namespace ff {

Expand Down Expand Up @@ -78,6 +80,14 @@ class GenericROSSubRapidPub : public BridgeSubscriber {

ContentSupplierPtr content_supplier_;

using DataSupplier =
kn::DdsTypedSupplier<rapid::ext::astrobee::GenericCommsData>;
using DataSupplierPtr = std::unique_ptr<DataSupplier>;

DataSupplierPtr data_supplier_;

std::map<std::string, AdvertisementInfo> advertisement_info_;

unsigned int advertisement_info_seq_;
};

Expand Down
12 changes: 11 additions & 1 deletion communications/comms_bridge/src/comms_bridge_nodelet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
std::string connection;
ff::AdvertisementInfoRapidSubPtr advertisement_info_sub;
ff::ContentRapidSubPtr content_sub;
ff::DataRapidSubPtr data_sub;
ros_sub_.InitializeDDS();
for (size_t i = 0; i < rapid_connections_.size(); i++) {
// Lower case the external agent name to use it like a namespace
Expand All @@ -211,14 +212,22 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
connection,
ros_pub_.get());
content_rapid_subs_.push_back(content_sub);

data_sub = std::make_shared<ff::GenericRapidSub<rapid::ext::astrobee::GenericCommsData>>(
"AstrobeeGenericCommsDataProfile",
rapid::ext::astrobee::GENERIC_COMMS_DATA_TOPIC,
connection,
ros_pub_.get());
data_rapid_subs_.push_back(data_sub);
}

std::string ns = std::string("/") + agent_name_ + "/";
ns[1] = std::tolower(ns[1]); // namespaces don't start with upper case

for (size_t i = 0; i < topics_sub_.size(); i++) {
ROS_INFO_STREAM("Initialize DDS topic sub: " << topics_sub_[i]);
ros_sub_.addTopic(topics_sub_[i], (ns + topics_sub_[i]));
// ros_sub_.addTopic(topics_sub_[i], (ns + topics_sub_[i]));
ros_sub_.addTopic(topics_sub_[i], topics_sub_[i]);
}
}

Expand Down Expand Up @@ -319,6 +328,7 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
ff::GenericROSSubRapidPub ros_sub_;
std::vector<ff::ContentRapidSubPtr> content_rapid_subs_;
std::vector<ff::AdvertisementInfoRapidSubPtr> advertisement_info_rapid_subs_;
std::vector<ff::DataRapidSubPtr> data_rapid_subs_;
std::shared_ptr<kn::DdsEntitiesFactorySvc> dds_entities_factory_;
std::shared_ptr<ff::GenericRapidMsgRosPub> ros_pub_;
std::string agent_name_, participant_name_;
Expand Down
47 changes: 47 additions & 0 deletions communications/comms_bridge/src/generic_rapid_msg_ros_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,51 @@ void GenericRapidMsgRosPub::ConvertData(
}
}

// Handle data message
void GenericRapidMsgRosPub::ConvertData(
rapid::ext::astrobee::GenericCommsData const* data) {
const std::lock_guard<std::mutex> lock(m_mutex_);

const std::string output_topic = data->outputTopic;

ROS_ERROR("Comms Bridge Nodelet: Received data message for topic %s\n",
output_topic.c_str());

std::map<std::string, RelayTopicInfo>::iterator iter = m_relay_topics_.find(output_topic);
if (iter == m_relay_topics_.end()) {
ROS_ERROR("Comms Bridge Nodelet: Adding advertisement info for topic %s.\n",
output_topic.c_str());

AdvertisementInfo ad_info;
ad_info.latching = data->latching;
ad_info.data_type = data->dataType;
ad_info.md5_sum = data->md5Sum;
ad_info.definition = data->msgDefinition;
ROS_INFO_STREAM("ad_info latching: " << data->latching);
ROS_INFO_STREAM("ad_info dataType: " << data->dataType);
ROS_INFO_STREAM("ad_info md5Sum: " << data->md5Sum);
ROS_INFO_STREAM("ad_info msgDefinition: " << data->msgDefinition);

if (!advertiseTopic(output_topic, ad_info)) {
ROS_ERROR("Comms Bridge Nodelet: Error advertising topic: %s\n",
output_topic.c_str());
}
}

RelayTopicInfo &topic_info = iter->second;

ContentInfo content_info;
content_info.type_md5_sum = data->md5Sum;

unsigned char* buf = data->data.get_contiguous_buffer();
for (size_t i = 0; i < data->data.length(); i++) {
content_info.data.push_back(buf[i]);
}

if (!relayMessage(topic_info, content_info)) {
ROS_ERROR("Comms Bridge Nodelet: Error relaying message for topic %s\n",
output_topic.c_str());
}
}

} // end namespace ff
72 changes: 70 additions & 2 deletions communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ void GenericROSSubRapidPub::InitializeDDS() {
rapid::ext::astrobee::GENERIC_COMMS_CONTENT_TOPIC,
"", "AstrobeeGenericCommsContentProfile", ""));

data_supplier_.reset(new GenericROSSubRapidPub::DataSupplier(
rapid::ext::astrobee::GENERIC_COMMS_DATA_TOPIC,
"", "AstrobeeGenericCommsDataProfile", ""));

rapid::RapidHelper::initHeader(advertisement_info_supplier_->event().hdr);
rapid::RapidHelper::initHeader(content_supplier_->event().hdr);
rapid::RapidHelper::initHeader(data_supplier_->event().hdr);

dds_initialized_ = true;
}
Expand All @@ -51,6 +56,14 @@ void GenericROSSubRapidPub::subscribeTopic(std::string const& in_topic, const Re

// Called with the mutex held
void GenericROSSubRapidPub::advertiseTopic(const RelayTopicInfo& relay_info) {
/* std::string out_topic = relay_info.out_topic;
if (advertisement_info_.count(out_topic) == 0) {
advertisement_info_.emplace(out_topic, relay_info.ad_info);
} else {
ROS_ERROR("Comms Bridge: Already added this advertisement info to the map");
}
ROS_ERROR("Comms Bridge: Saved advertisement info for topic %s\n");
const AdvertisementInfo &info = relay_info.ad_info;
rapid::ext::astrobee::GenericCommsAdvertisementInfo &msg =
advertisement_info_supplier_->event();
Expand Down Expand Up @@ -86,11 +99,12 @@ void GenericROSSubRapidPub::advertiseTopic(const RelayTopicInfo& relay_info) {
msg.msgDefinition[size] = '\0';
// Send message
advertisement_info_supplier_->sendEvent();
advertisement_info_supplier_->sendEvent();*/
}

// Called with the mutex held
void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info, ContentInfo const& content_info) {
// Old relay function
/*void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info, ContentInfo const& content_info) {
rapid::ext::astrobee::GenericCommsContent &msg = content_supplier_->event();
unsigned int size;
Expand Down Expand Up @@ -125,8 +139,62 @@ void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info, Conte
// Send message
content_supplier_->sendEvent();
}*/

// Called with the mutex held
void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info,
ContentInfo const& content_info) {
rapid::ext::astrobee::GenericCommsData &msg = data_supplier_->event();
AdvertisementInfo info = topic_info.ad_info;
std::string out_topic = topic_info.out_topic;
unsigned int size;

ROS_ERROR("Comms bridge: Sending content message for topic %s\n", out_topic.c_str());

msg.hdr.timeStamp = comms_util::RosTime2RapidTime(ros::Time::now());
msg.hdr.serial = topic_info.relay_seqnum;

// Currently the output topic can only be 128 characters long
SizeCheck(size, out_topic.size(), 128, "Out topic", out_topic);
std::strncpy(msg.outputTopic, out_topic.data(), size);
msg.outputTopic[size] = '\0';

msg.latching = info.latching;

// Currently the data type can only be 128 characters long
SizeCheck(size, info.data_type.size(), 128, "Data type", out_topic);
std::strncpy(msg.dataType, info.data_type.data(), size);
msg.dataType[size] = '\0';

// TODO(Katie): Check md5 sum matches co
// Currently the md5 sum can only be 64 characters long
SizeCheck(size, info.md5_sum.size(), 64, "MD5 sum", out_topic);
std::strncpy(msg.md5Sum, info.md5_sum.data(), size);
msg.md5Sum[size] = '\0';

// Current the ROS message definition can only be 16384 characters long
SizeCheck(size, info.definition.size(), 16384, "Msg definition", out_topic);
std::strncpy(msg.msgDefinition, info.definition.data(), size);
msg.msgDefinition[size] = '\0';

// Currently the content can only be 128K bytes long
SizeCheck(size, content_info.data_size, 131072, "Data", out_topic);
msg.data.ensure_length(size, (size + 1));
unsigned char *buf = msg.data.get_contiguous_buffer();
if (buf == NULL) {
ROS_ERROR("DDS: RTI didn't give a contiguous buffer for the content data!");
return;
}

std::memset(buf, 0, (size + 1));
std::memmove(buf, content_info.data, size);


// Send message
data_supplier_->sendEvent();
}


void GenericROSSubRapidPub::SizeCheck(unsigned int &size,
const int size_in,
const int max_size,
Expand Down
1 change: 1 addition & 0 deletions communications/dds_msgs/idl/AstrobeeConstants.idl
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ module rapid {
const String64 FAULT_STATE_TOPIC = "astrobee_fault_state";
const String64 GENERIC_COMMS_ADVERTISEMENT_INFO_TOPIC = "astrobee_generic_comms_advertisement_info";
const String64 GENERIC_COMMS_CONTENT_TOPIC = "astrobee_generic_comms_content";
const String64 GENERIC_COMMS_DATA_TOPIC = "astrobee_generic_comms_data";
const String64 GNC_CONTROL_STATE_TOPIC = "astrobee_gnc_control_state";
const String64 GNC_FAM_CMD_STATE_TOPIC = "astrobee_gnc_fam_cmd_state";
const String64 GUEST_SCIENCE_CONFIG_TOPIC = "astrobee_guest_science_config";
Expand Down
4 changes: 2 additions & 2 deletions communications/dds_msgs/idl/GenericCommsAdvertisementInfo.idl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
module rapid {
module ext {
module astrobee {
typedef string<16384> String16K;
typedef string<32768> String32K;

//@copy-c-declaration class GenericCommsAdvertisementInfoTypeSupport;
//@copy-c-declaration class GenericCommsAdvertisementInfoDataWriter;
Expand Down Expand Up @@ -39,7 +39,7 @@ module rapid {
public String64 md5Sum;

//@copy-declaration /** ROS message definition */
public String16K msgDefinition;
public String32K msgDefinition;
};
};
};
Expand Down
49 changes: 49 additions & 0 deletions communications/dds_msgs/idl/GenericCommsData.idl
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 (c) 2023 Intelligent Robotics Group, NASA ARC
*/

#include "Message.idl"

module rapid {
module ext {
module astrobee {
typedef string<16384> String16K;

//@copy-c-declaration class GenericCommsDataTypeSupport;
//@copy-c-declaration class GenericCommsDataDataWriter;
//@copy-c-declaration class GenericCommsDataDataReader;
//@copy-c-declaration struct GenericCommsDataSeq;

//@copy-declaration /**
//@copy-declaration * Data of a generic comms ROS message
//@copy-declaration */
valuetype GenericCommsData : Message {
//@copy-c-declaration #if RTI_DDS_VERSION_MAJOR < 4 || (RTI_DDS_VERSION_MAJOR == 4 && RTI_DDS_VERSION_MINOR < 5) || (RTI_DDS_VERSION_MAJOR == 4 && RTI_DDS_VERSION_MINOR == 5 && RTI_DDS_VERSION_RELEASE != 'f' )
//@copy-c-declaration typedef GenericCommsDataTypeSupport TypeSupport;
//@copy-c-declaration typedef GenericCommsDataDataWriter DataWriter;
//@copy-c-declaration typedef GenericCommsDataDataReader DataReader;
//@copy-c-declaration typedef GenericCommsDataSeq Seq;
//@copy-c-declaration #endif
//@copy-c-declaration typedef GenericCommsData Type;

//@copy-declaration /** Topic on which to republish */
public String128 outputTopic;

//@copy-declaration /** Whether republisher should advertise topic as latching */
public boolean latching;

//@copy-declaration /** ROS message data type name */
public String128 dataType;

//@copy-declaration /** ROS message md5sum of type */
public String64 md5Sum;

//@copy-declaration /** ROS message definition */
public String16K msgDefinition;

//@copy-declaration /** Serialized content of the message */
public OctetSequence128K data;
};
};
};
};

0 comments on commit 5b6f8c0

Please sign in to comment.