-
Notifications
You must be signed in to change notification settings - Fork 431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implemented a timer manager that can handle any timer type #2573
base: rolling
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,356 @@ | ||
#pragma once | ||
#include <chrono> | ||
#include <functional> | ||
#include <memory> | ||
#include <vector> | ||
#include <map> | ||
|
||
#include <rcl/timer.h> | ||
#include <rclcpp/timer.hpp> | ||
|
||
#include <inttypes.h> | ||
|
||
namespace rclcpp::executors | ||
{ | ||
|
||
/** | ||
* @brief A class for managing a queue of timers | ||
* | ||
* This class holds a queue of timers of one type (RCL_ROS_TIME, RCL_SYSTEM_TIME or RCL_STEADY_TIME). | ||
* The queue itself manages an internal map of the timers, orders by the next time a timer will be | ||
* ready. Each time a timer is ready, a callback will be called from the internal thread. | ||
*/ | ||
class TimerQueue | ||
{ | ||
struct TimerData | ||
{ | ||
std::shared_ptr<const rcl_timer_t> rcl_ref; | ||
std::function<void()> timer_ready_callback; | ||
}; | ||
|
||
public: | ||
TimerQueue(rcl_clock_type_t timer_type) | ||
: timer_type(timer_type), | ||
used_clock_for_timers(timer_type), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This name is not easy to understand. |
||
trigger_thread([this]() { | ||
timer_thread(); | ||
}) | ||
{ | ||
}; | ||
|
||
~TimerQueue() | ||
{ | ||
stop(); | ||
|
||
trigger_thread.join(); | ||
} | ||
|
||
void stop() | ||
{ | ||
running = false; | ||
used_clock_for_timers.cancel_sleep_or_wait(); | ||
} | ||
|
||
/** | ||
* @brief Removes a new timer from the queue. | ||
* This function is thread safe. | ||
* | ||
* Removes a timer, if it was added to this queue. | ||
* Ignores timers that are not part of this queue | ||
* | ||
* @param timer the timer to remove. | ||
*/ | ||
|
||
void remove_timer(const rclcpp::TimerBase::SharedPtr & timer) | ||
{ | ||
rcl_clock_t * clock_type_of_timer; | ||
|
||
std::shared_ptr<const rcl_timer_t> handle = timer->get_timer_handle(); | ||
|
||
if (rcl_timer_clock( | ||
const_cast<rcl_timer_t *>(handle.get()), | ||
&clock_type_of_timer) != RCL_RET_OK) | ||
{ | ||
assert(false); | ||
} | ||
|
||
if (clock_type_of_timer->type != timer_type) { | ||
// timer is handled by another queue | ||
return; | ||
} | ||
|
||
timer->clear_on_reset_callback(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should first check that the timer is managed by this queue (as the timer could be from another executor) before clearing the callback |
||
|
||
std::scoped_lock l(mutex); | ||
|
||
auto it = std::find_if( | ||
all_timers.begin(), all_timers.end(), | ||
[rcl_ref = timer->get_timer_handle()](const std::unique_ptr<TimerData> & d) | ||
{ | ||
return d->rcl_ref == rcl_ref; | ||
}); | ||
|
||
if (it != all_timers.end()) { | ||
const TimerData * data_ptr = it->get(); | ||
|
||
|
||
auto it2 = std::find_if( | ||
running_timers.begin(), running_timers.end(), [data_ptr](const auto & e) { | ||
return e.second == data_ptr; | ||
}); | ||
|
||
running_timers.erase(it2); | ||
all_timers.erase(it); | ||
} | ||
|
||
used_clock_for_timers.cancel_sleep_or_wait(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we don't need to do this under the mutex lock (i.e. we should release it once we are done working on the timers) |
||
} | ||
|
||
/** | ||
* @brief Adds a new timer to the queue. | ||
* This function is thread safe. | ||
* | ||
* This function will ignore any timer, that has not a matching type | ||
* | ||
* @param timer the timer to add. | ||
* @param timer_ready_callback callback that should be called when the timer is ready. | ||
*/ | ||
void add_timer( | ||
const rclcpp::TimerBase::SharedPtr & timer, | ||
const std::function<void()> & timer_ready_callback) | ||
{ | ||
rcl_clock_t * clock_type_of_timer; | ||
|
||
std::shared_ptr<const rcl_timer_t> handle = timer->get_timer_handle(); | ||
|
||
if (rcl_timer_clock( | ||
const_cast<rcl_timer_t *>(handle.get()), | ||
&clock_type_of_timer) != RCL_RET_OK) | ||
{ | ||
assert(false); | ||
} | ||
|
||
if (clock_type_of_timer->type != timer_type) { | ||
// timer is handled by another queue | ||
return; | ||
} | ||
|
||
std::unique_ptr<TimerData> data = std::make_unique<TimerData>(); | ||
data->timer_ready_callback = std::move(timer_ready_callback); | ||
data->rcl_ref = std::move(handle); | ||
|
||
timer->set_on_reset_callback( | ||
[data_ptr = data.get(), this](size_t) { | ||
std::scoped_lock l(mutex); | ||
if (!remove_if_dropped(data_ptr)) | ||
{ | ||
add_timer_to_running_map(data_ptr); | ||
} | ||
}); | ||
|
||
{ | ||
std::scoped_lock l(mutex); | ||
add_timer_to_running_map(data.get()); | ||
|
||
all_timers.emplace_back(std::move(data) ); | ||
} | ||
|
||
//wake up thread as new timer was added | ||
used_clock_for_timers.cancel_sleep_or_wait(); | ||
} | ||
|
||
private: | ||
/** | ||
* Checks if the timer is still referenced if not deletes it from the queue | ||
* | ||
* @param timer_data The timer to check | ||
* @return true if removed / invalid | ||
*/ | ||
bool remove_if_dropped(const TimerData * timer_data) | ||
{ | ||
if (timer_data->rcl_ref.unique()) { | ||
// timer was deleted | ||
auto it = std::find_if( | ||
all_timers.begin(), all_timers.end(), [timer_data](const std::unique_ptr<TimerData> & e) { | ||
return timer_data == e.get(); | ||
} | ||
); | ||
|
||
if (it != all_timers.end()) { | ||
all_timers.erase(it); | ||
} | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
/** | ||
* @brief adds the given timer_data to the map of running timers, if valid. | ||
* | ||
* Advances the rcl timer. | ||
* Computes the next call time of the timer. | ||
* readds the timer to the map of running timers | ||
*/ | ||
void add_timer_to_running_map(const TimerData * timer_data) | ||
{ | ||
rcl_ret_t ret = rcl_timer_call(const_cast<rcl_timer_t *>(timer_data->rcl_ref.get())); | ||
if (ret == RCL_RET_TIMER_CANCELED) { | ||
return; | ||
} | ||
|
||
int64_t next_call_time; | ||
|
||
ret = rcl_timer_get_next_call_time(timer_data->rcl_ref.get(), &next_call_time); | ||
|
||
if (ret == RCL_RET_OK) { | ||
running_timers.emplace(next_call_time, timer_data); | ||
} | ||
|
||
// wake up the timer thread so that it can pick up the timer | ||
used_clock_for_timers.cancel_sleep_or_wait(); | ||
} | ||
|
||
/** | ||
* Returns the time when the next timer becomes ready | ||
*/ | ||
std::chrono::nanoseconds get_next_timer_ready_time() const | ||
{ | ||
if (running_timers.empty()) { | ||
// can't use std::chrono::nanoseconds::max, as wait_for | ||
// internally computes end time by using ::now() + timeout | ||
// as a workaround, we use some absurd high timeout | ||
return std::chrono::nanoseconds(used_clock_for_timers.now().nanoseconds()) + std::chrono::hours(10000); | ||
} | ||
return running_timers.begin()->first; | ||
} | ||
|
||
void call_ready_timer_callbacks() | ||
{ | ||
auto readd_timer_to_running_map = [this](TimerMap::node_type && e) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this may not be static, as this would only be copied the first time the lambda is initialized. |
||
{ | ||
const auto & timer_data = e.mapped(); | ||
if(remove_if_dropped(timer_data)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need this check here? It's already done at the beginning of each |
||
{ | ||
return; | ||
} | ||
|
||
int64_t next_call_time; | ||
|
||
auto ret = rcl_timer_get_next_call_time(timer_data->rcl_ref.get(), &next_call_time); | ||
|
||
if (ret == RCL_RET_OK) { | ||
e.key() = std::chrono::nanoseconds(next_call_time); | ||
running_timers.insert(std::move(e)); | ||
} | ||
}; | ||
|
||
while (!running_timers.empty()) { | ||
|
||
if(remove_if_dropped(running_timers.begin()->second)) | ||
{ | ||
continue; | ||
} | ||
|
||
int64_t time_until_call; | ||
|
||
const rcl_timer_t * rcl_timer_ref = running_timers.begin()->second->rcl_ref.get(); | ||
auto ret = rcl_timer_get_time_until_next_call(rcl_timer_ref, &time_until_call); | ||
if (ret == RCL_RET_TIMER_CANCELED) { | ||
running_timers.erase(running_timers.begin()); | ||
continue; | ||
} | ||
|
||
if (time_until_call <= 0) { | ||
// advance next call time; | ||
rcl_ret_t ret = rcl_timer_call(const_cast<rcl_timer_t *>(rcl_timer_ref)); | ||
if (ret == RCL_RET_TIMER_CANCELED) { | ||
running_timers.erase(running_timers.begin()); | ||
continue; | ||
} | ||
|
||
// timer is ready, execute callback | ||
running_timers.begin()->second->timer_ready_callback(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a couple of comments on this:
Maybe, to support the second mode, we should just set a |
||
readd_timer_to_running_map(running_timers.extract(running_timers.begin())); | ||
continue; | ||
} | ||
break; | ||
} | ||
} | ||
|
||
void timer_thread() | ||
{ | ||
while (running && rclcpp::ok()) { | ||
std::chrono::nanoseconds next_wakeup_time; | ||
{ | ||
std::scoped_lock l(mutex); | ||
call_ready_timer_callbacks(); | ||
|
||
next_wakeup_time = get_next_timer_ready_time(); | ||
} | ||
try { | ||
// RCUTILS_LOG_ERROR_NAMED("rclcpp", "TimerQueue::timer_thread before sleep, next wakeup time %+" PRId64 , next_wakeup_time.count()); | ||
used_clock_for_timers.sleep_until(rclcpp::Time(next_wakeup_time.count(), timer_type)); | ||
} catch (const std::runtime_error &) { | ||
//there is a race on shutdown, were the context may become invalid, while we call sleep_until | ||
running = false; | ||
} | ||
} | ||
thread_terminated = true; | ||
} | ||
|
||
rcl_clock_type_t timer_type; | ||
|
||
Context::SharedPtr clock_sleep_context; | ||
|
||
rclcpp::Clock used_clock_for_timers; | ||
|
||
std::mutex mutex; | ||
|
||
std::atomic_bool running = true; | ||
std::atomic_bool thread_terminated = false; | ||
|
||
std::vector<std::unique_ptr<TimerData>> all_timers; | ||
|
||
using TimerMap = std::multimap<std::chrono::nanoseconds, const TimerData *>; | ||
TimerMap running_timers; | ||
|
||
std::thread trigger_thread; | ||
|
||
std::condition_variable thread_conditional; | ||
}; | ||
|
||
class TimerManager | ||
{ | ||
std::array<TimerQueue, 3> timer_queues; | ||
|
||
public: | ||
TimerManager() | ||
: timer_queues{RCL_ROS_TIME, RCL_SYSTEM_TIME, RCL_STEADY_TIME} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. queues should be created when someone tries to add the first timer of that type, and (maybe?) destroyed when empty. if an application only uses timers of a single type, it shouldn't create 2 extra threads per executor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, I don't see any harm in having two idle threads in this case. They would block on the conditional and consume no CPU whatsoever. Starting threads on runtime, and additional checks inside the executor code I see more as a problem though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even if there will be no CPU consumption, threads also have a memory impact. I agree that stopping and restarting them is probably not great, but at least we should wait to create a thread only if we need it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still don't see the issue, memory impact is at most 100 KB per thread. So event for a really big system like ours (106 executors), we are talking 30 MB of memory vs 10MB. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In embedded applications, knowing that there's 2-3 MB wasted on threads that are never used is a problem, especially if it's something that can be easily optimized. I agree that starting and stopping the threads is likely not the correct solution (at least it shouldn't be the default behavior), but I don't see any downside in delaying the creation of the timer queues until they are needed. The |
||
{ | ||
|
||
} | ||
|
||
void remove_timer(const rclcpp::TimerBase::SharedPtr & timer) | ||
{ | ||
for (TimerQueue & q : timer_queues) { | ||
q.remove_timer(timer); | ||
} | ||
} | ||
|
||
void add_timer( | ||
const rclcpp::TimerBase::SharedPtr & timer, | ||
const std::function<void()> & timer_ready_callback) | ||
{ | ||
for (TimerQueue & q : timer_queues) { | ||
q.add_timer(timer, timer_ready_callback); | ||
} | ||
} | ||
|
||
void stop() | ||
{ | ||
for (TimerQueue & q : timer_queues) { | ||
q.stop(); | ||
} | ||
} | ||
}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rather than storing this as an additional member, we could just use the
get_clock_type()
method of the clock