Skip to content

Commit

Permalink
Add TraceHistory and beef up TraceContext usages (facebookincubator#8677
Browse files Browse the repository at this point in the history
)

Summary:
Pull Request resolved: facebookincubator#8677

Original commit changeset: 55c9738305a1

Original Phabricator Diff: D53365055

Reviewed By: oerling

Differential Revision: D53438972

fbshipit-source-id: 0cb31a71d458880128bfc5b223a50caaeef88025
  • Loading branch information
Yuhta authored and facebook-github-bot committed Feb 6, 2024
1 parent 18c1701 commit 5f57033
Show file tree
Hide file tree
Showing 16 changed files with 497 additions and 55 deletions.
1 change: 1 addition & 0 deletions velox/common/caching/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ target_link_libraries(
velox_exception
velox_file
velox_memory
velox_process
Folly::folly
fmt::fmt
gflags::gflags
Expand Down
9 changes: 9 additions & 0 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/

#include "velox/common/caching/SsdFile.h"

#include <folly/Executor.h>
#include <folly/portability/SysUio.h>
#include "velox/common/base/AsyncSource.h"
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/common/caching/FileIds.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/process/TraceContext.h"

#include <fcntl.h>
#ifdef linux
Expand Down Expand Up @@ -128,6 +130,7 @@ SsdFile::SsdFile(
shardId_(shardId),
checkpointIntervalBytes_(checkpointIntervalBytes),
executor_(executor) {
process::TraceContext trace("SsdFile::SsdFile");
int32_t oDirect = 0;
#ifdef linux
oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0;
Expand Down Expand Up @@ -266,6 +269,7 @@ CoalesceIoStats SsdFile::load(
void SsdFile::read(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) {
process::TraceContext trace("SsdFile::read");
readFile_->preadv(offset, buffers);
}

Expand Down Expand Up @@ -307,6 +311,7 @@ std::optional<std::pair<uint64_t, int32_t>> SsdFile::getSpace(
}

bool SsdFile::growOrEvictLocked() {
process::TraceContext trace("SsdFile::growOrEvictLocked");
if (numRegions_ < maxRegions_) {
const auto newSize = (numRegions_ + 1) * kRegionSize;
const auto rc = ::ftruncate(fd_, newSize);
Expand Down Expand Up @@ -360,6 +365,7 @@ void SsdFile::clearRegionEntriesLocked(const std::vector<int32_t>& regions) {
}

void SsdFile::write(std::vector<CachePin>& pins) {
process::TraceContext trace("SsdFile::write");
// Sorts the pins by their file/offset. In this way what is adjacent in
// storage is likely adjacent on SSD.
std::sort(pins.begin(), pins.end());
Expand Down Expand Up @@ -444,6 +450,7 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) {
} // namespace

void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) {
process::TraceContext trace("SsdFile::verifyWrite");
auto testData = std::make_unique<char[]>(entry.size());
const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset());
VELOX_CHECK_EQ(rc, entry.size());
Expand Down Expand Up @@ -512,6 +519,7 @@ void SsdFile::clear() {
}

void SsdFile::deleteFile() {
process::TraceContext trace("SsdFile::deleteFile");
if (fd_) {
close(fd_);
fd_ = 0;
Expand Down Expand Up @@ -651,6 +659,7 @@ inline const char* asChar(const T* ptr) {
} // namespace

void SsdFile::checkpoint(bool force) {
process::TraceContext trace("SsdFile::checkpoint");
std::lock_guard<std::shared_mutex> l(mutex_);
if (!force && (bytesAfterCheckpoint_ < checkpointIntervalBytes_)) {
return;
Expand Down
2 changes: 1 addition & 1 deletion velox/common/process/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

add_library(velox_process ProcessBase.cpp StackTrace.cpp ThreadDebugInfo.cpp
TraceContext.cpp)
TraceContext.cpp TraceHistory.cpp)

target_link_libraries(
velox_process
Expand Down
82 changes: 53 additions & 29 deletions velox/common/process/TraceContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,34 @@

#include "velox/common/process/TraceContext.h"

#include "velox/common/process/TraceHistory.h"

#include <sstream>

namespace facebook::velox::process {

namespace {
folly::Synchronized<std::unordered_map<std::string, TraceData>>& traceMap() {
static folly::Synchronized<std::unordered_map<std::string, TraceData>>
staticTraceMap;
return staticTraceMap;
}

// We use thread local instead lock here since the critical path is on write
// side.
auto registry = std::make_shared<TraceContext::Registry>();
thread_local auto threadLocalTraceData =
std::make_shared<TraceContext::Registry::Reference>(registry);

} // namespace

TraceContext::TraceContext(std::string label, bool isTemporary)
: label_(std::move(label)),
enterTime_(std::chrono::steady_clock::now()),
isTemporary_(isTemporary) {
traceMap().withWLock([&](auto& counts) {
isTemporary_(isTemporary),
traceData_(threadLocalTraceData) {
TraceHistory::push([&](auto& entry) {
entry.time = enterTime_;
entry.file = __FILE__;
entry.line = __LINE__;
snprintf(entry.label, entry.kLabelCapacity, "%s", label_.c_str());
});
traceData_->withValue([&](auto& counts) {
auto& data = counts[label_];
++data.numThreads;
if (data.numThreads == 1) {
Expand All @@ -43,45 +54,58 @@ TraceContext::TraceContext(std::string label, bool isTemporary)
}

TraceContext::~TraceContext() {
traceMap().withWLock([&](auto& counts) {
auto& data = counts[label_];
--data.numThreads;
traceData_->withValue([&](auto& counts) {
auto it = counts.find(label_);
auto& data = it->second;
if (--data.numThreads == 0 && isTemporary_) {
counts.erase(it);
return;
}
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - enterTime_)
.count();
data.totalMs += ms;
data.maxMs = std::max<uint64_t>(data.maxMs, ms);
if (!data.numThreads && isTemporary_) {
counts.erase(label_);
}
});
}

// static
std::string TraceContext::statusLine() {
std::stringstream out;
auto now = std::chrono::steady_clock::now();
traceMap().withRLock([&](auto& counts) {
for (auto& pair : counts) {
if (pair.second.numThreads) {
auto continued = std::chrono::duration_cast<std::chrono::milliseconds>(
now - pair.second.startTime)
.count();

out << pair.first << "=" << pair.second.numThreads << " entered "
<< pair.second.numEnters << " avg ms "
<< (pair.second.totalMs / pair.second.numEnters) << " max ms "
<< pair.second.maxMs << " continuous for " << continued
<< std::endl;
}
auto counts = status();
for (auto& [label, data] : counts) {
if (data.numThreads > 0) {
auto continued = std::chrono::duration_cast<std::chrono::milliseconds>(
now - data.startTime)
.count();
out << label << ": numThreads=" << data.numThreads
<< " numEnters=" << data.numEnters
<< " avgMs=" << (data.totalMs / data.numEnters)
<< " maxMs=" << data.maxMs << " continued=" << continued << std::endl;
}
});
}
return out.str();
}

// static
std::unordered_map<std::string, TraceData> TraceContext::status() {
return traceMap().withRLock([&](auto& map) { return map; });
folly::F14FastMap<std::string, TraceData> TraceContext::status() {
folly::F14FastMap<std::string, TraceData> total;
registry->forAllValues([&](auto& counts) {
for (auto& [k, v] : counts) {
auto& sofar = total[k];
if (sofar.numEnters == 0) {
sofar.startTime = v.startTime;
} else if (v.numEnters > 0) {
sofar.startTime = std::min(sofar.startTime, v.startTime);
}
sofar.numThreads += v.numThreads;
sofar.numEnters += v.numEnters;
sofar.totalMs += v.totalMs;
sofar.maxMs = std::max(sofar.maxMs, v.maxMs);
}
});
return total;
}

} // namespace facebook::velox::process
17 changes: 15 additions & 2 deletions velox/common/process/TraceContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

#pragma once

#include "velox/common/process/ThreadLocalRegistry.h"

#include <mutex>
#include <string>
#include <unordered_map>

#include <folly/Synchronized.h>
#include <folly/container/F14Map.h>

namespace facebook::velox::process {

Expand All @@ -47,6 +49,8 @@ struct TraceData {
// produces a concise report of what the system is doing at any one
// time. This is good for diagnosing crashes or hangs which are
// difficult to figure out from stacks in a core dump.
//
// NOTE: TraceContext is not sharable between different threads.
class TraceContext {
public:
// Starts a trace context. isTemporary is false if this is a generic
Expand All @@ -56,19 +60,28 @@ class TraceContext {
// which the record should be dropped once the last thread finishes.
explicit TraceContext(std::string label, bool isTemporary = false);

TraceContext(const TraceContext&) = delete;
TraceContext& operator=(const TraceContext&) = delete;

~TraceContext();

// Produces a human readable report of all TraceContexts in existence at the
// time.
static std::string statusLine();

// Returns a copy of the trace status.
static std::unordered_map<std::string, TraceData> status();
static folly::F14FastMap<std::string, TraceData> status();

// Implementation detail type. Made public to be available with
// std::make_shared. Do not use outside this class.
using Registry =
ThreadLocalRegistry<folly::F14FastMap<std::string, TraceData>>;

private:
const std::string label_;
const std::chrono::steady_clock::time_point enterTime_;
const bool isTemporary_;
std::shared_ptr<Registry::Reference> traceData_;
};

} // namespace facebook::velox::process
56 changes: 56 additions & 0 deletions velox/common/process/TraceHistory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/process/TraceHistory.h"

#include <folly/system/ThreadId.h>

#include <algorithm>

namespace facebook::velox::process {

namespace {
auto registry = std::make_shared<ThreadLocalRegistry<TraceHistory>>();
}

namespace detail {
thread_local ThreadLocalRegistry<TraceHistory>::Reference traceHistory(
registry);
}

TraceHistory::TraceHistory()
: threadId_(std::this_thread::get_id()), osTid_(folly::getOSThreadID()) {}

std::vector<TraceHistory::EntriesWithThreadInfo> TraceHistory::listAll() {
std::vector<EntriesWithThreadInfo> results;
registry->forAllValues([&](auto& history) {
EntriesWithThreadInfo result;
result.threadId = history.threadId_;
result.osTid = history.osTid_;
for (int i = 0; i < kCapacity; ++i) {
const int j = (history.index_ + kCapacity - 1 - i) % kCapacity;
if (!populated(history.data_[j])) {
break;
}
result.entries.push_back(history.data_[j]);
}
std::reverse(result.entries.begin(), result.entries.end());
results.push_back(std::move(result));
});
return results;
}

} // namespace facebook::velox::process
Loading

0 comments on commit 5f57033

Please sign in to comment.