Skip to content

Commit

Permalink
Introduce executors new spin_for method, replace spin_until_future_co…
Browse files Browse the repository at this point in the history
…mplete

with spin_until_complete. (ros2#1821)

* Introduce spin_for method.
* Introduce spin_until_complete.
* Deprecate spin_until_future_complete.
* Replace usage of deprecated method.
* Update unit-tests.

Signed-off-by: Hubert Liberacki <[email protected]>
  • Loading branch information
hliberacki committed Mar 28, 2022
1 parent eac0063 commit 2f1b739
Show file tree
Hide file tree
Showing 25 changed files with 407 additions and 247 deletions.
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,11 @@ class Client : public ClientBase
/// Send a request to the service server.
/**
* This method returns a `FutureAndRequestId` instance
* that can be passed to Executor::spin_until_future_complete() to
* that can be passed to Executor::spin_until_complete() to
* wait until it has been completed.
*
* If the future never completes,
* e.g. the call to Executor::spin_until_future_complete() times out,
* e.g. the call to Executor::spin_until_complete() times out,
* Client::remove_pending_request() must be called to clean the client internal state.
* Not doing so will make the `Client` instance to use more memory each time a response is not
* received from the service server.
Expand All @@ -592,7 +592,7 @@ class Client : public ClientBase
* auto future = client->async_send_request(my_request);
* if (
* rclcpp::FutureReturnCode::TIMEOUT ==
* executor->spin_until_future_complete(future, timeout))
* executor->spin_until_complete(future, timeout))
* {
* client->remove_pending_request(future);
* // handle timeout
Expand Down
132 changes: 86 additions & 46 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <mutex>
#include <string>
#include <vector>
#include <type_traits>

#include "rcl/guard_condition.h"
#include "rcl/wait.h"
Expand Down Expand Up @@ -319,6 +320,47 @@ class Executor
virtual void
spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Spin (blocking) until the condition is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] future The condition which can be callable or future type to wait on. If this function returns SUCCESS, the future can be
* accessed without blocking (though it may still throw an exception).
* \param[in] timeout Optional timeout parameter, which gets passed to Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return
* code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename Condition, typename DurationT = std::chrono::milliseconds>
FutureReturnCode spin_until_complete(
const Condition & condition,
DurationT timeout = DurationT(-1))
{
if constexpr (std::is_invocable_v<Condition>) {
using RetT = std::invoke_result_t<Condition>;
static_assert(
std::is_same_v<bool, RetT>,
"Conditional callable has to return boolean type");
return spin_until_complete_impl(condition, timeout);
} else {
auto checkFuture = [&condition]() {
return condition.wait_for(std::chrono::seconds(0)) ==
std::future_status::ready;
};
return spin_until_complete_impl(checkFuture, timeout);
}
}

/// spin (blocking) for given amount of duration.
/**
* \param[in] duration gets passed to Executor::spin_node_once, and spins the executor for given duration.
*/
template<typename DurationT>
void
spin_for(DurationT duration)
{
(void)spin_until_complete([]() {return false;}, duration);
}

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] future The future to wait on. If this function returns SUCCESS, the future can be
Expand All @@ -330,57 +372,13 @@ class Executor
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
[[deprecated("use spin_until_complete(const Condition & condition, DurationT timeout) instead")]]
FutureReturnCode
spin_until_future_complete(
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
return spin_until_complete(future, timeout);
}

/// Cancel any running spin* function, causing it to return.
Expand Down Expand Up @@ -560,6 +558,48 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

protected:
// Implementation details, used by spin_until_complete and spin_for.
// Previouse implementation of spin_until_future_complete.
template<typename Condition, typename DurationT>
FutureReturnCode spin_until_complete_impl(Condition condition, DurationT timeout)
{
auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;
if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

if (condition()) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The condition did not pass before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
}

public:
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
Expand Down
83 changes: 74 additions & 9 deletions rclcpp/include/rclcpp/executors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,50 @@ namespace executors
using rclcpp::executors::MultiThreadedExecutor;
using rclcpp::executors::SingleThreadedExecutor;

/// Spin (blocking) until the conditon is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] executor The executor which will spin the node.
* \param[in] node_ptr The node to spin.
* \param[in] condition The callable or future to wait on. If `SUCCESS`, the condition is safe to
* access after this function
* \param[in] timeout Optional timeout parameter, which gets passed to
* Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a `TIMEOUT` return code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename Condition, typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_node_until_complete(
rclcpp::Executor & executor,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const Condition & condition,
DurationT timeout = DurationT(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_complete
// inside a callback executed by an executor.
executor.add_node(node_ptr);
auto retcode = executor.spin_until_complete(condition, timeout);
executor.remove_node(node_ptr);
return retcode;
}

template<typename NodeT = rclcpp::Node, typename ConditionT,
typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_node_until_complete(
rclcpp::Executor & executor,
std::shared_ptr<NodeT> node_ptr,
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
return rclcpp::executors::spin_node_until_complete(
executor,
node_ptr->get_node_base_interface(),
condition,
timeout);
}

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] executor The executor which will spin the node.
Expand All @@ -67,31 +111,28 @@ using rclcpp::executors::SingleThreadedExecutor;
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
[[deprecated("use spin_node_until_complete(Executor &, node_interfaces::NodeBaseInterface::SharedPtr, const Condition &, DurationT) instead")]]
rclcpp::FutureReturnCode
spin_node_until_future_complete(
rclcpp::Executor & executor,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.
executor.add_node(node_ptr);
auto retcode = executor.spin_until_future_complete(future, timeout);
executor.remove_node(node_ptr);
return retcode;
return spin_until_complete(executor, node_ptr, future, timeout);
}

template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
[[deprecated("use spin_node_until_complete(Executor &, std::shared_ptr<NodeT>, const Condition &, DurationT) instead")]]
rclcpp::FutureReturnCode
spin_node_until_future_complete(
rclcpp::Executor & executor,
std::shared_ptr<NodeT> node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
return rclcpp::executors::spin_node_until_future_complete(
return rclcpp::executors::spin_node_until_complete(
executor,
node_ptr->get_node_base_interface(),
future,
Expand All @@ -100,26 +141,50 @@ spin_node_until_future_complete(

} // namespace executors

template<typename Condition, typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_until_complete(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const Condition & condition,
DurationT timeout = DurationT(-1))
{
rclcpp::executors::SingleThreadedExecutor executor;
return executors::spin_node_until_complete<Condition>(executor, node_ptr, condition, timeout);
}

template<typename NodeT = rclcpp::Node, typename Condition,
typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_until_complete(
std::shared_ptr<NodeT> node_ptr,
const Condition & condition,
DurationT timeout = DurationT(-1))
{
return rclcpp::spin_until_complete(node_ptr->get_node_base_interface(), condition, timeout);
}

template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
[[deprecated("use spin_until_complete(node_interfaces::NodeBaseInterface::SharedPtr, const Condition &, DurationT) instead")]]
rclcpp::FutureReturnCode
spin_until_future_complete(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
rclcpp::executors::SingleThreadedExecutor executor;
return executors::spin_node_until_future_complete<FutureT>(executor, node_ptr, future, timeout);
return executors::spin_node_until_complete<FutureT>(executor, node_ptr, future, timeout);
}

template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
[[deprecated("use spin_until_complete(std::shared_ptr<NodeT>, const Condition &, DurationT) instead")]]
rclcpp::FutureReturnCode
spin_until_future_complete(
std::shared_ptr<NodeT> node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
return rclcpp::spin_until_future_complete(node_ptr->get_node_base_interface(), future, timeout);
return rclcpp::spin_until_complete(node_ptr->get_node_base_interface(), future, timeout);
}

} // namespace rclcpp
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/future_return_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
namespace rclcpp
{

/// Return codes to be used with spin_until_future_complete.
/// Return codes to be used with spin_until_complete.
/**
* SUCCESS: The future is complete and can be accessed with "get" without blocking.
* This does not indicate that the operation succeeded; "get" may still throw an exception.
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/rclcpp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
* - Executors (responsible for execution of callbacks through a blocking spin):
* - rclcpp::spin()
* - rclcpp::spin_some()
* - rclcpp::spin_until_future_complete()
* - rclcpp::spin_until_complete()
* - rclcpp::executors::SingleThreadedExecutor
* - rclcpp::executors::SingleThreadedExecutor::add_node()
* - rclcpp::executors::SingleThreadedExecutor::spin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
std::shared_ptr<rclcpp::Waitable> && waitable,
std::shared_ptr<void> && associated_entity,
std::function<
void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void> &&)
void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void>&&)
> add_waitable_function)
{
// Explicitly no thread synchronization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class ThreadSafeSynchronization : public detail::SynchronizationPolicyCommon
std::shared_ptr<rclcpp::Waitable> && waitable,
std::shared_ptr<void> && associated_entity,
std::function<
void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void> &&)
void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void>&&)
> add_waitable_function)
{
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;
Expand Down
Loading

0 comments on commit 2f1b739

Please sign in to comment.