Skip to content

Commit

Permalink
[BACKPORT 2.14][#18668] DocDB: Block stack collection during thread c…
Browse files Browse the repository at this point in the history
…reation

Summary:
Original commit: 3e34805 / D27851
We use `backtrace` function to collect stack traces of long running operations and to show list of running threads in the UI.
But when this function runs in the middle of pthread_create, or during thread start or thread exit it cases a SIGSEGV segmentation faults.
We use SIGUSR2 signal to trigger `HandleStackTraceSignal` which calls `backtrace`.

Fix it to block SIGUSR2 during the `pthread_create` call. Threads inherit the parent threads signal masters so the new thread will also start with with SIGUSR2 blocked. We unblock it in `Thread::SuperviseThread` which is the first function run on the thread and block it again on `Thread::FinishThread` which is the last function that we run.
Signals received when a block is placed are stored and resent as soon as we unblock, so we will not miss any stacks with this change.

Another issue was that we sometime recursively call `backtrace`. In debug mode this most commonly happens when were get a SIGUSR2 signal while we are already collecting backtrace for `Mutex::CheckUnheldAndMark`. Prevent this by blocking the signal during `StackTrace::Collect`

Fixes #18668
Jira: DB-7589

Test Plan: DebugUtilTest.TestGetStackTraceWhileCreatingThreads

Reviewers: slingam, mlillibridge, sergei, timur, bogdan

Reviewed By: slingam

Subscribers: ybase, yql, mbautin

Differential Revision: https://phorge.dev.yugabyte.com/D28513
  • Loading branch information
hari90 committed Sep 18, 2023
1 parent ed06d25 commit adbbb45
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 13 deletions.
64 changes: 64 additions & 0 deletions src/yb/util/debug-util-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "yb/util/scope_exit.h"
#include "yb/util/test_macros.h"
#include "yb/util/test_util.h"
#include "yb/util/test_thread_holder.h"
#include "yb/util/thread.h"
#include "yb/util/tsan_util.h"

Expand All @@ -56,6 +57,8 @@ using std::vector;

using namespace std::literals;

DECLARE_bool(TEST_disable_thread_stack_collection_wait);

namespace yb {

class DebugUtilTest : public YBTest {
Expand Down Expand Up @@ -386,4 +389,65 @@ TEST_F(DebugUtilTest, LongOperationTracker) {
ASSERT_STR_CONTAINS(log_sink.MessageAt(3), "Op4");
}

TEST_F(DebugUtilTest, TestGetStackTraceWhileCreatingThreads) {
// This test makes sure we can collect stack traces while threads are being created.
// We create 10 threads that create threads in a loop. Then we create 100 threads that collect
// stack traces from the other 10 threads.
std::atomic<bool> stop = false;
TestThreadHolder thread_holder;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_disable_thread_stack_collection_wait) = true;

// Run this once so that all first time initialization routines are executed.
DumpThreadStack(Thread::CurrentThreadIdForStack());

std::set<ThreadIdForStack> thread_ids_to_dump;
auto dump_threads_fn = [&thread_ids_to_dump, &stop]() {
int64_t count = 0;
while (!stop.load(std::memory_order_acquire)) {
for (auto& tid : thread_ids_to_dump) {
DumpThreadStack(tid);
count++;
}
}
LOG(INFO) << "Dumped " << count << " threads";
};

auto create_threads_fn = [&stop]() {
int64_t count = 0, failed = 0;
while (!stop.load(std::memory_order_acquire)) {
scoped_refptr<Thread> thread;
auto s = Thread::Create(
"test", "test thread", []() {}, &thread);
if (!s.ok()) {
failed++;
continue;
}
thread->Join();
count++;
}
LOG(INFO) << "Successfully created " << count << " threads, Failed to create " << failed
<< " threads";
};

std::vector<scoped_refptr<Thread>> thread_creator_threads;
for (int i = 0; i < 10; i++) {
scoped_refptr<Thread> t;
ASSERT_OK(Thread::Create("test", "test thread", create_threads_fn, &t));
thread_ids_to_dump.insert(t->tid_for_stack());
thread_creator_threads.push_back(std::move(t));
}

for (int i = 0; i < 100; i++) {
thread_holder.AddThreadFunctor(dump_threads_fn);
}

SleepFor(1min);

stop.store(true, std::memory_order_release);

for (auto& creator_thread : thread_creator_threads) {
creator_thread->Join();
}
thread_holder.Stop();
}
} // namespace yb
13 changes: 13 additions & 0 deletions src/yb/util/debug-util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <execinfo.h>
#include <dirent.h>
#include <sys/syscall.h>
#include "yb/util/scope_exit.h"

#ifdef __linux__
#include <link.h>
Expand Down Expand Up @@ -426,6 +427,18 @@ string GetLogFormatStackTraceHex() {
}

void StackTrace::Collect(int skip_frames) {
static thread_local bool is_collecting_stack = false;

if (is_collecting_stack) {
// It is unsafe to call backtrace recursively. Return an empty stack trace.
// A thread can get here if while it was collecting its own stack, it got interrupted by a
// StackTraceSignal request from another thread.
return;
}

is_collecting_stack = true;
auto se = ScopeExit([]() { is_collecting_stack = false; });

#if THREAD_SANITIZER || ADDRESS_SANITIZER
num_frames_ = google::GetStackTrace(frames_, arraysize(frames_), skip_frames);
#else
Expand Down
11 changes: 8 additions & 3 deletions src/yb/util/debug/long_operation_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,14 @@ class LongOperationTrackerHelper {
queue_.pop();
if (!operation.unique()) {
lock.unlock();
LOG(WARNING) << operation->message << " running for " << MonoDelta(now - operation->start)
<< " in thread " << operation->thread_id << ":\n"
<< DumpThreadStack(operation->thread_id);
auto stack = DumpThreadStack(operation->thread_id);
// Make sure the task did not complete while we were dumping the stack. Else we could get
// some other innocent stack.
if (!operation.unique()) {
LOG(WARNING) << operation->message << " running for " << MonoDelta(now - operation->start)
<< " in thread " << operation->thread_id << ":\n"
<< stack;
}
lock.lock();
}
}
Expand Down
24 changes: 18 additions & 6 deletions src/yb/util/signal_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,36 @@

namespace yb {

Result<sigset_t> ThreadSignalMaskBlock(const std::vector<int>& signals_to_block) {
namespace {

Result<sigset_t> ThreadSignalMask(int how, const std::vector<int>& signals_to_change) {
// Note that in case of an error sigaddset sets errno and returns -1
// while pthread_sigmask returns errno directly.

sigset_t mask;
sigemptyset(&mask);
for (int sig : signals_to_block) {
for (int sig : signals_to_change) {
int err = sigaddset(&mask, sig);
if (err != 0) {
return STATUS(InternalError, "sigaddset failed", Errno(errno));
}
}

sigset_t old_mask;
int err = pthread_sigmask(SIG_BLOCK, &mask, &old_mask);
return (err == 0
? Result<sigset_t>(old_mask)
: STATUS(InternalError, "SIG_BLOCK failed", Errno(err)));
int err = pthread_sigmask(how, &mask, &old_mask);
return (
err == 0 ? Result<sigset_t>(old_mask)
: STATUS(InternalError, "SIG_BLOCK failed", Errno(err)));
}

} // namespace

Result<sigset_t> ThreadSignalMaskBlock(const std::vector<int>& signals_to_block) {
return ThreadSignalMask(SIG_BLOCK, signals_to_block);
}

Result<sigset_t> ThreadSignalMaskUnblock(const std::vector<int>& signals_to_unblock) {
return ThreadSignalMask(SIG_UNBLOCK, signals_to_unblock);
}

Status ThreadSignalMaskRestore(sigset_t old_mask) {
Expand Down
4 changes: 4 additions & 0 deletions src/yb/util/signal_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ namespace yb {
// at the main thread to avoid concurrency issues.
Result<sigset_t> ThreadSignalMaskBlock(const std::vector<int>& signals_to_block);

// On current thread, unblock the given signals and return an old signal mask.
// Signals that were accumulated by OS will be delivered before this function returns.
Result<sigset_t> ThreadSignalMaskUnblock(const std::vector<int>& signals_to_unblock);

// Restore previous signal mask on the current thread.
// Unblocking signals lets the blocked signals be delivered if they had been raised in the meantime.
Status ThreadSignalMaskRestore(sigset_t old_mask);
Expand Down
30 changes: 26 additions & 4 deletions src/yb/util/stack_trace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "yb/gutil/casts.h"
#include "yb/gutil/linux_syscall_support.h"

#include "yb/util/flags.h"
#include "yb/util/flag_tags.h"
#include "yb/util/lockfree.h"
#include "yb/util/monotime.h"
#include "yb/util/result.h"
Expand All @@ -36,6 +38,9 @@ using namespace std::literals;
typedef sig_t sighandler_t;
#endif

DEFINE_test_flag(bool, disable_thread_stack_collection_wait, false,
"When set to true, ThreadStacks() will not wait for threads to respond");

namespace yb {

namespace {
Expand Down Expand Up @@ -142,16 +147,28 @@ struct ThreadStackHelper {
// a few iterations of the loop, so this timeout is very conservative.
//
// The main reason that a thread would not respond is that it has blocked signals. For
// example, glibc's timer_thread doesn't respond to our signal, so we always time out
// on that one.
if (left_to_collect.load(std::memory_order_acquire) > 0) {
// example, we may be creating a new thread, or glibc's timer_thread doesn't respond to our
// signal.
if (left_to_collect.load(std::memory_order_acquire) > 0 &&
!FLAGS_TEST_disable_thread_stack_collection_wait) {
completion_flag.TimedWait(1s);
}

while (auto entry = collected.Pop()) {
auto it = std::lower_bound(tids.begin(), tids.end(), entry->tid);
if (it != tids.end() && *it == entry->tid) {
(*out)[it - tids.begin()] = entry->stack;
auto& entry_out = (*out)[it - tids.begin()];

if (entry->stack) {
entry_out = entry->stack;
} else {
// If the thread is in the middle of collecting stack trace for any other reason then it
// will return an empty output.
static const Status status = STATUS(
TryAgain,
"Thread did not respond: maybe it was in the middle of a stack trace collection");
entry_out = status;
}
}
allocated.Push(entry);
}
Expand Down Expand Up @@ -303,4 +320,9 @@ Status SetStackTraceSignal(int signum) {
return Status::OK();
}

int GetStackTraceSignal() {
// Only tests modify this value when multiple threads run, so this is safe.
return ANNOTATE_UNPROTECTED_READ(g_stack_trace_signum);
}

} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/util/stack_trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ std::vector<Result<StackTrace>> ThreadStacks(const std::vector<ThreadIdForStack>
// Set which POSIX signal number should be used internally for triggering
// stack traces. If the specified signal handler is already in use, this
// returns an error, and stack traces will be disabled.
// It is not safe to call this after threads have been created.
Status SetStackTraceSignal(int signum);
int GetStackTraceSignal();

} // namespace yb

Expand Down
14 changes: 14 additions & 0 deletions src/yb/util/thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include <sys/time.h>
#include <sys/types.h>

#include "yb/util/signal_util.h"

#if defined(__linux__)
#include <sys/prctl.h>
#endif // defined(__linux__)
Expand Down Expand Up @@ -693,7 +695,14 @@ Status Thread::StartThread(const std::string& category, const std::string& name,

{
SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "creating pthread");

// Block stack trace collection while we create a thread. This also prevents stack trace
// collection in the new thread while it is being started since it will inherit our signal
// masks. SuperviseThread function will unblock the signal as soon as thread begins to run.
auto old_signal = VERIFY_RESULT(ThreadSignalMaskBlock({GetStackTraceSignal()}));
int ret = pthread_create(&t->thread_, NULL, &Thread::SuperviseThread, t.get());
RETURN_NOT_OK(ThreadSignalMaskRestore(old_signal));

if (ret) {
return STATUS(RuntimeError, "Could not create thread", Errno(ret));
}
Expand Down Expand Up @@ -733,6 +742,8 @@ Status Thread::StartThread(const std::string& category, const std::string& name,
}

void* Thread::SuperviseThread(void* arg) {
CHECK_OK(ThreadSignalMaskUnblock({GetStackTraceSignal()}));

Thread* t = static_cast<Thread*>(arg);
int64_t system_tid = Thread::CurrentThreadId();
if (system_tid == -1) {
Expand Down Expand Up @@ -801,6 +812,9 @@ void Thread::FinishThread(void* arg) {

VLOG(2) << "Ended thread " << t->tid() << " - "
<< t->category() << ":" << t->name();

// Its no longer safe to collect stack traces in this thread.
CHECK_OK(ThreadSignalMaskBlock({GetStackTraceSignal()}));
}

CDSAttacher::CDSAttacher() {
Expand Down

0 comments on commit adbbb45

Please sign in to comment.