Skip to content

Commit

Permalink
Merge pull request #312 from eProsima/develop
Browse files Browse the repository at this point in the history
v2.2.0
  • Loading branch information
pablogs9 authored Jun 28, 2022
2 parents 22587ee + 154fe8d commit 851bbbf
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 52 deletions.
6 changes: 3 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ option(UAGENT_SECURITY_PROFILE "Build security profile." OFF)
option(UAGENT_BUILD_EXECUTABLE "Build Micro XRCE-DDS Agent provided executable." ON)
option(UAGENT_BUILD_USAGE_EXAMPLES "Build Micro XRCE-DDS Agent built-in usage examples" OFF)

set(UAGENT_P2P_CLIENT_VERSION 2.1.0 CACHE STRING "Sets Micro XRCE-DDS client version for P2P")
set(UAGENT_P2P_CLIENT_TAG v2.1.0 CACHE STRING "Sets Micro XRCE-DDS client tag for P2P")
set(UAGENT_P2P_CLIENT_VERSION 2.2.0 CACHE STRING "Sets Micro XRCE-DDS client version for P2P")
set(UAGENT_P2P_CLIENT_TAG v2.2.0 CACHE STRING "Sets Micro XRCE-DDS client tag for P2P")

option(UAGENT_BUILD_CI_TESTS "Build CI test cases.")
if(UAGENT_BUILD_CI_TESTS)
Expand Down Expand Up @@ -117,7 +117,7 @@ endif()
###############################################################################
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules)
if(NOT UAGENT_SUPERBUILD)
project(microxrcedds_agent VERSION "2.1.1" LANGUAGES C CXX)
project(microxrcedds_agent VERSION "2.2.0" LANGUAGES C CXX)
else()
project(uagent_superbuild NONE)
include(${PROJECT_SOURCE_DIR}/cmake/SuperBuild.cmake)
Expand Down
11 changes: 9 additions & 2 deletions include/uxr/agent/client/ProxyClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class ProxyClient : public std::enable_shared_from_this<ProxyClient>
enum class State : uint8_t
{
alive,
dead
dead,
to_remove
};

explicit ProxyClient(
Expand Down Expand Up @@ -71,10 +72,13 @@ class ProxyClient : public std::enable_shared_from_this<ProxyClient>

State get_state();

void update_state();
void update_state(const ProxyClient::State state = State::alive);

Middleware& get_middleware() { return *middleware_ ; };

bool has_hard_liveliness_check() const { return hard_liveliness_check_; }

uint8_t & get_hard_liveliness_check_tries() { return hard_liveliness_check_tries_; }
private:
bool create_object(
const dds::xrce::ObjectId& object_id,
Expand Down Expand Up @@ -134,6 +138,9 @@ class ProxyClient : public std::enable_shared_from_this<ProxyClient>
State state_;
std::chrono::time_point<std::chrono::steady_clock> timestamp_;
std::unordered_map<std::string, std::string> properties_;
std::chrono::milliseconds client_dead_time_;
bool hard_liveliness_check_;
uint8_t hard_liveliness_check_tries_;
};

} // namespace uxr
Expand Down
4 changes: 4 additions & 0 deletions include/uxr/agent/processor/Processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ class Processor
ProxyClient& client,
InputPacket<EndPoint>& input_packet);

bool process_get_info_submessage(
ProxyClient& client,
InputPacket<EndPoint>& input_packet);

bool read_data_callback(
const WriteFnArgs& write_args,
const std::vector<uint8_t>& buffer,
Expand Down
22 changes: 22 additions & 0 deletions include/uxr/agent/transport/SessionManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class SessionManager
void destroy_session(
const EndPoint& endpoint);

void destroy_session(
const uint32_t& client_key);

bool get_client_key(
const EndPoint& endpoint,
uint32_t& client_key);
Expand Down Expand Up @@ -117,6 +120,25 @@ void SessionManager<EndPoint>::destroy_session(
}
}

template<typename EndPoint>
void SessionManager<EndPoint>::destroy_session(
const uint32_t& client_key)
{
std::lock_guard<std::mutex> lock(mtx_);

auto it = client_to_endpoint_map_.find(client_key);
if (it != client_to_endpoint_map_.end())
{
UXR_AGENT_LOG_INFO(
UXR_DECORATE_GREEN("session closed"),
"client_key: 0x{:08X}, address: {}",
client_key,
it->second);
endpoint_to_client_map_.erase(it->second);
client_to_endpoint_map_.erase(it->first);
}
}

template<typename EndPoint>
bool SessionManager<EndPoint>::get_client_key(
const EndPoint& endpoint,
Expand Down
22 changes: 18 additions & 4 deletions src/cpp/client/ProxyClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ ProxyClient::ProxyClient(
, state_{State::alive}
, timestamp_{std::chrono::steady_clock::now()}
, properties_(std::move(properties))
, client_dead_time_(CLIENT_DEAD_TIME)
, hard_liveliness_check_(false)
{
switch (middleware_kind)
{
Expand Down Expand Up @@ -77,6 +79,15 @@ ProxyClient::ProxyClient(
}
#endif
}
hard_liveliness_check_ = properties_.find("uxr_hl") != properties_.end();
if (hard_liveliness_check_) {
client_dead_time_ = std::chrono::milliseconds(std::stoi(properties_["uxr_hl"]));
UXR_AGENT_LOG_INFO(
UXR_DECORATE_GREEN("session hard timeout enabled"),
"client_key: 0x{:08X}, timeout: {} ms",
conversion::clientkey_to_raw(representation.client_key()),
std::stoi(properties_["uxr_hl"]));
}
}

dds::xrce::ResultStatus ProxyClient::create_object(
Expand Down Expand Up @@ -744,18 +755,21 @@ ProxyClient::State ProxyClient::get_state()
if (State::alive == state_)
{
using namespace std::chrono;
state_ = (duration_cast<milliseconds>(steady_clock::now() - timestamp_) < CLIENT_DEAD_TIME)
state_ = (duration_cast<milliseconds>(steady_clock::now() - timestamp_) < client_dead_time_)
? State::alive
: State::dead;
}
return state_;
}

void ProxyClient::update_state()
void ProxyClient::update_state(const ProxyClient::State state)
{
std::lock_guard<std::mutex> lock(state_mtx_);
state_ = State::alive;
timestamp_ = std::chrono::steady_clock::now();
state_ = state;
if (State::alive == state_) {
timestamp_ = std::chrono::steady_clock::now();
hard_liveliness_check_tries_ = 0;
}
}

} // namespace uxr
Expand Down
10 changes: 8 additions & 2 deletions src/cpp/middleware/fastdds/FastDDSMiddleware.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,9 @@ bool FastDDSMiddleware::create_requester_by_bin(
attrs.subscriber.topic.topicName = requester_xrce.reply_topic_name();
attrs.subscriber.topic.topicDataType = requester_xrce.reply_type();

attrs.publisher.historyMemoryPolicy = fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
attrs.subscriber.historyMemoryPolicy = fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

std::shared_ptr<FastDDSRequester> requester = create_requester(participant, attrs);
if (nullptr == requester)
{
Expand Down Expand Up @@ -692,7 +695,10 @@ bool FastDDSMiddleware::create_replier_by_bin(
attrs.subscriber.topic.topicDataType = replier_xrce.request_type();

attrs.publisher.topic.topicName = replier_xrce.reply_topic_name();
attrs.publisher.topic.topicDataType = replier_xrce.reply_type();
attrs.publisher.topic.topicDataType = replier_xrce.reply_type();

attrs.publisher.historyMemoryPolicy = fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
attrs.subscriber.historyMemoryPolicy = fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

std::shared_ptr<FastDDSReplier> replier = create_replier(participant, attrs);
if (nullptr == replier)
Expand Down Expand Up @@ -894,7 +900,7 @@ bool FastDDSMiddleware::read_data(
{
fastdds::dds::SampleInfo sample_info;
rv = it->second->read(data, timeout, sample_info);

if (intraprocess_enabled_)
{
for (auto dw = datawriters_.begin(); dw != datawriters_.end(); dw++)
Expand Down
Loading

0 comments on commit 851bbbf

Please sign in to comment.