diff --git a/velox/common/caching/CMakeLists.txt b/velox/common/caching/CMakeLists.txt index 1fa409b965e9..6aa3ee11f735 100644 --- a/velox/common/caching/CMakeLists.txt +++ b/velox/common/caching/CMakeLists.txt @@ -28,6 +28,7 @@ target_link_libraries( velox_exception velox_file velox_memory + velox_process Folly::folly fmt::fmt gflags::gflags diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index b246e198bbe4..bcb0c6b01223 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -15,12 +15,14 @@ */ #include "velox/common/caching/SsdFile.h" + #include #include #include "velox/common/base/AsyncSource.h" #include "velox/common/base/SuccinctPrinter.h" #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" +#include "velox/common/process/TraceContext.h" #include #ifdef linux @@ -128,6 +130,7 @@ SsdFile::SsdFile( shardId_(shardId), checkpointIntervalBytes_(checkpointIntervalBytes), executor_(executor) { + process::TraceContext trace("SsdFile::SsdFile"); int32_t oDirect = 0; #ifdef linux oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0; @@ -266,6 +269,7 @@ CoalesceIoStats SsdFile::load( void SsdFile::read( uint64_t offset, const std::vector>& buffers) { + process::TraceContext trace("SsdFile::read"); readFile_->preadv(offset, buffers); } @@ -307,6 +311,7 @@ std::optional> SsdFile::getSpace( } bool SsdFile::growOrEvictLocked() { + process::TraceContext trace("SsdFile::growOrEvictLocked"); if (numRegions_ < maxRegions_) { const auto newSize = (numRegions_ + 1) * kRegionSize; const auto rc = ::ftruncate(fd_, newSize); @@ -360,6 +365,7 @@ void SsdFile::clearRegionEntriesLocked(const std::vector& regions) { } void SsdFile::write(std::vector& pins) { + process::TraceContext trace("SsdFile::write"); // Sorts the pins by their file/offset. In this way what is adjacent in // storage is likely adjacent on SSD. std::sort(pins.begin(), pins.end()); @@ -444,6 +450,7 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) { } // namespace void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) { + process::TraceContext trace("SsdFile::verifyWrite"); auto testData = std::make_unique(entry.size()); const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset()); VELOX_CHECK_EQ(rc, entry.size()); @@ -512,6 +519,7 @@ void SsdFile::clear() { } void SsdFile::deleteFile() { + process::TraceContext trace("SsdFile::deleteFile"); if (fd_) { close(fd_); fd_ = 0; @@ -651,6 +659,7 @@ inline const char* asChar(const T* ptr) { } // namespace void SsdFile::checkpoint(bool force) { + process::TraceContext trace("SsdFile::checkpoint"); std::lock_guard l(mutex_); if (!force && (bytesAfterCheckpoint_ < checkpointIntervalBytes_)) { return; diff --git a/velox/common/process/CMakeLists.txt b/velox/common/process/CMakeLists.txt index 22182ed58f12..af0bedd5ce4f 100644 --- a/velox/common/process/CMakeLists.txt +++ b/velox/common/process/CMakeLists.txt @@ -13,7 +13,7 @@ # limitations under the License. add_library(velox_process ProcessBase.cpp StackTrace.cpp ThreadDebugInfo.cpp - TraceContext.cpp) + TraceContext.cpp TraceHistory.cpp) target_link_libraries( velox_process diff --git a/velox/common/process/TraceContext.cpp b/velox/common/process/TraceContext.cpp index cad158f48ee7..b0ee5b724097 100644 --- a/velox/common/process/TraceContext.cpp +++ b/velox/common/process/TraceContext.cpp @@ -16,23 +16,34 @@ #include "velox/common/process/TraceContext.h" +#include "velox/common/process/TraceHistory.h" + #include namespace facebook::velox::process { namespace { -folly::Synchronized>& traceMap() { - static folly::Synchronized> - staticTraceMap; - return staticTraceMap; -} + +// We use thread local instead lock here since the critical path is on write +// side. +auto registry = std::make_shared(); +thread_local auto threadLocalTraceData = + std::make_shared(registry); + } // namespace TraceContext::TraceContext(std::string label, bool isTemporary) : label_(std::move(label)), enterTime_(std::chrono::steady_clock::now()), - isTemporary_(isTemporary) { - traceMap().withWLock([&](auto& counts) { + isTemporary_(isTemporary), + traceData_(threadLocalTraceData) { + TraceHistory::push([&](auto& entry) { + entry.time = enterTime_; + entry.file = __FILE__; + entry.line = __LINE__; + snprintf(entry.label, entry.kLabelCapacity, "%s", label_.c_str()); + }); + traceData_->withValue([&](auto& counts) { auto& data = counts[label_]; ++data.numThreads; if (data.numThreads == 1) { @@ -43,17 +54,18 @@ TraceContext::TraceContext(std::string label, bool isTemporary) } TraceContext::~TraceContext() { - traceMap().withWLock([&](auto& counts) { - auto& data = counts[label_]; - --data.numThreads; + traceData_->withValue([&](auto& counts) { + auto it = counts.find(label_); + auto& data = it->second; + if (--data.numThreads == 0 && isTemporary_) { + counts.erase(it); + return; + } auto ms = std::chrono::duration_cast( std::chrono::steady_clock::now() - enterTime_) .count(); data.totalMs += ms; data.maxMs = std::max(data.maxMs, ms); - if (!data.numThreads && isTemporary_) { - counts.erase(label_); - } }); } @@ -61,27 +73,39 @@ TraceContext::~TraceContext() { std::string TraceContext::statusLine() { std::stringstream out; auto now = std::chrono::steady_clock::now(); - traceMap().withRLock([&](auto& counts) { - for (auto& pair : counts) { - if (pair.second.numThreads) { - auto continued = std::chrono::duration_cast( - now - pair.second.startTime) - .count(); - - out << pair.first << "=" << pair.second.numThreads << " entered " - << pair.second.numEnters << " avg ms " - << (pair.second.totalMs / pair.second.numEnters) << " max ms " - << pair.second.maxMs << " continuous for " << continued - << std::endl; - } + auto counts = status(); + for (auto& [label, data] : counts) { + if (data.numThreads > 0) { + auto continued = std::chrono::duration_cast( + now - data.startTime) + .count(); + out << label << ": numThreads=" << data.numThreads + << " numEnters=" << data.numEnters + << " avgMs=" << (data.totalMs / data.numEnters) + << " maxMs=" << data.maxMs << " continued=" << continued << std::endl; } - }); + } return out.str(); } // static -std::unordered_map TraceContext::status() { - return traceMap().withRLock([&](auto& map) { return map; }); +folly::F14FastMap TraceContext::status() { + folly::F14FastMap total; + registry->forAllValues([&](auto& counts) { + for (auto& [k, v] : counts) { + auto& sofar = total[k]; + if (sofar.numEnters == 0) { + sofar.startTime = v.startTime; + } else if (v.numEnters > 0) { + sofar.startTime = std::min(sofar.startTime, v.startTime); + } + sofar.numThreads += v.numThreads; + sofar.numEnters += v.numEnters; + sofar.totalMs += v.totalMs; + sofar.maxMs = std::max(sofar.maxMs, v.maxMs); + } + }); + return total; } } // namespace facebook::velox::process diff --git a/velox/common/process/TraceContext.h b/velox/common/process/TraceContext.h index c3d3a18be142..6e718515b58d 100644 --- a/velox/common/process/TraceContext.h +++ b/velox/common/process/TraceContext.h @@ -16,11 +16,13 @@ #pragma once +#include "velox/common/process/ThreadLocalRegistry.h" + #include #include #include -#include +#include namespace facebook::velox::process { @@ -47,6 +49,8 @@ struct TraceData { // produces a concise report of what the system is doing at any one // time. This is good for diagnosing crashes or hangs which are // difficult to figure out from stacks in a core dump. +// +// NOTE: TraceContext is not sharable between different threads. class TraceContext { public: // Starts a trace context. isTemporary is false if this is a generic @@ -56,6 +60,9 @@ class TraceContext { // which the record should be dropped once the last thread finishes. explicit TraceContext(std::string label, bool isTemporary = false); + TraceContext(const TraceContext&) = delete; + TraceContext& operator=(const TraceContext&) = delete; + ~TraceContext(); // Produces a human readable report of all TraceContexts in existence at the @@ -63,12 +70,18 @@ class TraceContext { static std::string statusLine(); // Returns a copy of the trace status. - static std::unordered_map status(); + static folly::F14FastMap status(); + + // Implementation detail type. Made public to be available with + // std::make_shared. Do not use outside this class. + using Registry = + ThreadLocalRegistry>; private: const std::string label_; const std::chrono::steady_clock::time_point enterTime_; const bool isTemporary_; + std::shared_ptr traceData_; }; } // namespace facebook::velox::process diff --git a/velox/common/process/TraceHistory.cpp b/velox/common/process/TraceHistory.cpp new file mode 100644 index 000000000000..bf7524590802 --- /dev/null +++ b/velox/common/process/TraceHistory.cpp @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/process/TraceHistory.h" + +#include + +#include + +namespace facebook::velox::process { + +namespace { +auto registry = std::make_shared>(); +} + +namespace detail { +thread_local ThreadLocalRegistry::Reference traceHistory( + registry); +} + +TraceHistory::TraceHistory() + : threadId_(std::this_thread::get_id()), osTid_(folly::getOSThreadID()) {} + +std::vector TraceHistory::listAll() { + std::vector results; + registry->forAllValues([&](auto& history) { + EntriesWithThreadInfo result; + result.threadId = history.threadId_; + result.osTid = history.osTid_; + for (int i = 0; i < kCapacity; ++i) { + const int j = (history.index_ + kCapacity - 1 - i) % kCapacity; + if (!populated(history.data_[j])) { + break; + } + result.entries.push_back(history.data_[j]); + } + std::reverse(result.entries.begin(), result.entries.end()); + results.push_back(std::move(result)); + }); + return results; +} + +} // namespace facebook::velox::process diff --git a/velox/common/process/TraceHistory.h b/velox/common/process/TraceHistory.h new file mode 100644 index 000000000000..bcee2cec69d7 --- /dev/null +++ b/velox/common/process/TraceHistory.h @@ -0,0 +1,105 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/process/ThreadLocalRegistry.h" + +#include +#include +#include +#include +#include + +/// Push an entry to the history ring buffer with a label from format string +/// (same as printf) and optional arguments. +#define VELOX_TRACE_HISTORY_PUSH(_format, ...) \ + ::facebook::velox::process::TraceHistory::push([&](auto& entry) { \ + entry.time = ::std::chrono::steady_clock::now(); \ + entry.file = __FILE__; \ + entry.line = __LINE__; \ + ::snprintf(entry.label, entry.kLabelCapacity, _format, ##__VA_ARGS__); \ + }) + +namespace facebook::velox::process { + +class TraceHistory; + +namespace detail { +extern thread_local ThreadLocalRegistry::Reference traceHistory; +} + +/// Keep list of labels in a ring buffer that is fixed sized and thread local. +class TraceHistory { + public: + TraceHistory(); + + /// An entry with tracing information and custom label. + struct Entry { + std::chrono::steady_clock::time_point time; + const char* file; + int32_t line; + + static constexpr int kLabelCapacity = + 64 - sizeof(time) - sizeof(file) - sizeof(line); + char label[kLabelCapacity]; + }; + + /// NOTE: usually VELOX_TRACE_HISTORY_PUSH should be used instead of calling + /// this function directly. + /// + /// Add a new entry to the thread local instance. If there are more than + /// `kCapacity' entries, overwrite the oldest ones. All the mutation on the + /// new entry should be done in the functor `init'. + template + static void push(F&& init) { + detail::traceHistory.withValue( + [init = std::forward(init)](auto& history) { + auto& entry = history.data_[history.index_]; + init(entry); + assert(populated(entry)); + history.index_ = (history.index_ + 1) % kCapacity; + }); + } + + /// All entries in a specific thread. + struct EntriesWithThreadInfo { + std::thread::id threadId; + uint64_t osTid; + std::vector entries; + }; + + /// List all entries from all threads. + static std::vector listAll(); + + /// Keep the last `kCapacity' entries per thread. Must be a power of 2. + static constexpr int kCapacity = 16; + + private: + static_assert((kCapacity & (kCapacity - 1)) == 0); + static_assert(sizeof(Entry) == 64); + + static bool populated(const Entry& entry) { + return entry.file != nullptr; + } + + alignas(64) Entry data_[kCapacity]{}; + const std::thread::id threadId_; + const uint64_t osTid_; + int index_ = 0; +}; + +} // namespace facebook::velox::process diff --git a/velox/common/process/tests/CMakeLists.txt b/velox/common/process/tests/CMakeLists.txt index 836e397466a2..2fce354e31ec 100644 --- a/velox/common/process/tests/CMakeLists.txt +++ b/velox/common/process/tests/CMakeLists.txt @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_process_test TraceContextTest.cpp - ThreadLocalRegistryTest.cpp) +add_executable( + velox_process_test TraceContextTest.cpp ThreadLocalRegistryTest.cpp + TraceHistoryTest.cpp) add_test(velox_process_test velox_process_test) diff --git a/velox/common/process/tests/TraceContextTest.cpp b/velox/common/process/tests/TraceContextTest.cpp index cfa021432a8a..130055e568fa 100644 --- a/velox/common/process/tests/TraceContextTest.cpp +++ b/velox/common/process/tests/TraceContextTest.cpp @@ -15,33 +15,125 @@ */ #include "velox/common/process/TraceContext.h" +#include "velox/common/process/TraceHistory.h" + #include +#include +#include +#include #include + #include -using namespace facebook::velox::process; +namespace facebook::velox::process { +namespace { + +class TraceContextTest : public testing::Test { + public: + void SetUp() override { + ASSERT_TRUE(TraceContext::status().empty()); + } -TEST(TraceContextTest, basic) { - constexpr int32_t kNumThreads = 10; + void TearDown() override { + ASSERT_TRUE(TraceContext::status().empty()); + } +}; + +TEST_F(TraceContextTest, basic) { + constexpr int kNumThreads = 3; std::vector threads; + folly::Baton<> batons[2][kNumThreads]; + folly::Latch latches[2] = { + folly::Latch(kNumThreads), + folly::Latch(kNumThreads), + }; threads.reserve(kNumThreads); - for (int32_t i = 0; i < kNumThreads; ++i) { - threads.push_back(std::thread([i]() { - TraceContext trace1("process data"); - TraceContext trace2(fmt::format("Process chunk {}", i), true); - std::this_thread::sleep_for(std::chrono::milliseconds(3)); - })); + for (int i = 0; i < kNumThreads; ++i) { + threads.emplace_back([&, i]() { + { + TraceContext trace1("process data"); + TraceContext trace2(fmt::format("Process chunk {}", i), true); + latches[0].count_down(); + batons[0][i].wait(); + } + latches[1].count_down(); + batons[1][i].wait(); + }); + } + latches[0].wait(); + auto status = TraceContext::status(); + ASSERT_EQ(1 + kNumThreads, status.size()); + ASSERT_EQ(kNumThreads, status.at("process data").numThreads); + for (int i = 0; i < kNumThreads; ++i) { + ASSERT_EQ(1, status.at(fmt::format("Process chunk {}", i)).numThreads); } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - LOG(INFO) << TraceContext::statusLine(); - for (auto& thread : threads) { - thread.join(); + for (int i = 0; i < kNumThreads; ++i) { + batons[0][i].post(); } - LOG(INFO) << TraceContext::statusLine(); - // We expect one entry for "process data". The temporary entries - // are deleted after the treads complete. - auto after = TraceContext::status(); - EXPECT_EQ(1, after.size()); - EXPECT_EQ(kNumThreads, after["process data"].numEnters); - EXPECT_EQ(0, after["process data"].numThreads); + latches[1].wait(); + status = TraceContext::status(); + ASSERT_EQ(1, status.size()); + ASSERT_EQ(0, status.at("process data").numThreads); + ASSERT_EQ(kNumThreads, status.at("process data").numEnters); + for (int i = 0; i < kNumThreads; ++i) { + batons[1][i].post(); + threads[i].join(); + } +} + +TEST_F(TraceContextTest, traceHistory) { + std::thread([] { + TraceContext trace("test"); + TraceContext trace2( + std::string(TraceHistory::Entry::kLabelCapacity + 10, 'x')); + auto results = TraceHistory::listAll(); + ASSERT_EQ(results.size(), 1); + ASSERT_EQ(results[0].entries.size(), 2); + ASSERT_STREQ(results[0].entries[0].label, "test"); + ASSERT_EQ( + results[0].entries[1].label, + std::string(TraceHistory::Entry::kLabelCapacity - 1, 'x')); + }).join(); } + +TEST_F(TraceContextTest, transferBetweenThreads) { + auto [promise, future] = + folly::makePromiseContract>(); + folly::Baton<> batons[2]; + std::chrono::steady_clock::time_point timeLow, timeHigh; + std::thread receiver([&, future = std::move(future)]() mutable { + auto trace = std::move(future).get(std::chrono::seconds(1)); + { + SCOPE_EXIT { + batons[0].post(); + }; + auto status = TraceContext::status(); + ASSERT_EQ(1, status.size()); + auto& data = status.at("test"); + ASSERT_EQ(data.numThreads, 1); + ASSERT_EQ(data.numEnters, 1); + ASSERT_LE(timeLow, data.startTime); + ASSERT_LE(data.startTime, timeHigh); + } + batons[1].wait(); + auto status = TraceContext::status(); + ASSERT_EQ(1, status.size()); + auto& data = status.at("test"); + ASSERT_EQ(data.numThreads, 1); + ASSERT_EQ(data.numEnters, 1); + ASSERT_LE(timeLow, data.startTime); + ASSERT_LE(data.startTime, timeHigh); + }); + timeLow = std::chrono::steady_clock::now(); + std::thread([&, promise = std::move(promise)]() mutable { + auto trace = std::make_unique("test"); + timeHigh = std::chrono::steady_clock::now(); + promise.setValue(std::move(trace)); + batons[0].wait(); + }).join(); + batons[1].post(); + receiver.join(); +} + +} // namespace +} // namespace facebook::velox::process diff --git a/velox/common/process/tests/TraceHistoryTest.cpp b/velox/common/process/tests/TraceHistoryTest.cpp new file mode 100644 index 000000000000..754fe6f389c3 --- /dev/null +++ b/velox/common/process/tests/TraceHistoryTest.cpp @@ -0,0 +1,127 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/process/TraceHistory.h" + +#include +#include +#include +#include + +namespace facebook::velox::process { +namespace { + +class TraceHistoryTest : public testing::Test { + public: + void SetUp() override { + ASSERT_TRUE(TraceHistory::listAll().empty()); + } + + void TearDown() override { + ASSERT_TRUE(TraceHistory::listAll().empty()); + } +}; + +TEST_F(TraceHistoryTest, basic) { + std::thread([] { + auto timeLow = std::chrono::steady_clock::now(); + constexpr int kStartLine = __LINE__; + for (int i = 0; i < TraceHistory::kCapacity + 10; ++i) { + VELOX_TRACE_HISTORY_PUSH("Test %d", i); + } + auto timeHigh = std::chrono::steady_clock::now(); + auto results = TraceHistory::listAll(); + ASSERT_EQ(results.size(), 1); + ASSERT_EQ(results[0].threadId, std::this_thread::get_id()); + ASSERT_EQ(results[0].osTid, folly::getOSThreadID()); + ASSERT_EQ(results[0].entries.size(), TraceHistory::kCapacity); + auto lastTime = timeLow; + for (int i = 0; i < TraceHistory::kCapacity; ++i) { + auto& entry = results[0].entries[i]; + ASSERT_EQ(entry.line, kStartLine + 2); + ASSERT_STREQ( + entry.file + strlen(entry.file) - 20, "TraceHistoryTest.cpp"); + ASSERT_LE(lastTime, entry.time); + lastTime = entry.time; + ASSERT_EQ(strncmp(entry.label, "Test ", 5), 0); + ASSERT_EQ(atoi(entry.label + 5), i + 10); + } + ASSERT_LE(lastTime, timeHigh); + }).join(); +} + +TEST_F(TraceHistoryTest, multiThread) { + constexpr int kNumThreads = 3; + folly::Latch latch(kNumThreads); + folly::Baton<> batons[kNumThreads]; + std::vector threads; + auto timeLow = std::chrono::steady_clock::now(); + constexpr int kStartLine = __LINE__; + for (int i = 0; i < kNumThreads; ++i) { + threads.emplace_back([&, i] { + VELOX_TRACE_HISTORY_PUSH("Test"); + VELOX_TRACE_HISTORY_PUSH("Test %d", i); + latch.count_down(); + batons[i].wait(); + }); + } + latch.wait(); + auto timeHigh = std::chrono::steady_clock::now(); + auto results = TraceHistory::listAll(); + ASSERT_EQ(results.size(), kNumThreads); + for (auto& result : results) { + auto threadIndex = + std::find_if( + threads.begin(), + threads.end(), + [&](auto& t) { return t.get_id() == result.threadId; }) - + threads.begin(); + ASSERT_EQ(result.entries.size(), 2); + ASSERT_EQ(result.entries[0].line, kStartLine + 3); + ASSERT_EQ(result.entries[1].line, kStartLine + 4); + ASSERT_STREQ(result.entries[0].label, "Test"); + ASSERT_EQ(result.entries[1].label, fmt::format("Test {}", threadIndex)); + for (auto& entry : result.entries) { + ASSERT_LE(timeLow, entry.time); + ASSERT_LE(entry.time, timeHigh); + ASSERT_TRUE(entry.file); + ASSERT_STREQ( + entry.file + strlen(entry.file) - 20, "TraceHistoryTest.cpp"); + } + } + for (int i = 0; i < kNumThreads; ++i) { + ASSERT_EQ(TraceHistory::listAll().size(), kNumThreads - i); + batons[i].post(); + threads[i].join(); + } +} + +TEST_F(TraceHistoryTest, largeLabel) { + std::thread([] { + VELOX_TRACE_HISTORY_PUSH( + "%s", + std::string(TraceHistory::Entry::kLabelCapacity + 10, 'x').c_str()); + auto results = TraceHistory::listAll(); + ASSERT_EQ(results.size(), 1); + ASSERT_EQ(results[0].entries.size(), 1); + ASSERT_EQ( + results[0].entries[0].label, + std::string(TraceHistory::Entry::kLabelCapacity - 1, 'x')); + }).join(); +} + +} // namespace +} // namespace facebook::velox::process diff --git a/velox/dwio/common/ColumnLoader.cpp b/velox/dwio/common/ColumnLoader.cpp index c04cb74db271..4db934c35bdd 100644 --- a/velox/dwio/common/ColumnLoader.cpp +++ b/velox/dwio/common/ColumnLoader.cpp @@ -16,6 +16,8 @@ #include "velox/dwio/common/ColumnLoader.h" +#include "velox/common/process/TraceContext.h" + namespace facebook::velox::dwio::common { // Wraps '*result' in a dictionary to make the contiguous values @@ -45,6 +47,7 @@ void ColumnLoader::loadInternal( ValueHook* hook, vector_size_t resultSize, VectorPtr* result) { + process::TraceContext trace("ColumnLoader::loadInternal"); VELOX_CHECK_EQ( version_, structReader_->numReads(), diff --git a/velox/dwio/common/SelectiveColumnReader.cpp b/velox/dwio/common/SelectiveColumnReader.cpp index f2c157ff9c7e..25aff8eb42c3 100644 --- a/velox/dwio/common/SelectiveColumnReader.cpp +++ b/velox/dwio/common/SelectiveColumnReader.cpp @@ -66,6 +66,7 @@ const std::vector& SelectiveColumnReader::children() } void SelectiveColumnReader::seekTo(vector_size_t offset, bool readsNullsOnly) { + VELOX_TRACE_HISTORY_PUSH("seekTo %d %d", offset, readsNullsOnly); if (offset == readOffset_) { return; } diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h index 08740a139914..4b26b0d4d60c 100644 --- a/velox/dwio/common/SelectiveColumnReader.h +++ b/velox/dwio/common/SelectiveColumnReader.h @@ -18,6 +18,7 @@ #include "velox/common/base/RawVector.h" #include "velox/common/memory/Memory.h" #include "velox/common/process/ProcessBase.h" +#include "velox/common/process/TraceHistory.h" #include "velox/dwio/common/ColumnSelector.h" #include "velox/dwio/common/FormatData.h" #include "velox/dwio/common/IntDecoder.h" @@ -189,7 +190,8 @@ class SelectiveColumnReader { // group. Interpretation of 'index' depends on format. Clears counts // of skipped enclosing struct nulls for formats where nulls are // recorded at each nesting level, i.e. not rep-def. - virtual void seekToRowGroup(uint32_t /*index*/) { + virtual void seekToRowGroup(uint32_t index) { + VELOX_TRACE_HISTORY_PUSH("seekToRowGroup %u", index); numParentNulls_ = 0; parentNullsRecordedTo_ = 0; } diff --git a/velox/dwio/common/SelectiveStructColumnReader.cpp b/velox/dwio/common/SelectiveStructColumnReader.cpp index 30e6e748fc1b..1f07f73351e3 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.cpp +++ b/velox/dwio/common/SelectiveStructColumnReader.cpp @@ -16,6 +16,7 @@ #include "velox/dwio/common/SelectiveStructColumnReader.h" +#include "velox/common/process/TraceContext.h" #include "velox/dwio/common/ColumnLoader.h" namespace facebook::velox::dwio::common { @@ -56,6 +57,7 @@ void SelectiveStructColumnReaderBase::next( uint64_t numValues, VectorPtr& result, const Mutation* mutation) { + process::TraceContext trace("SelectiveStructColumnReaderBase::next"); if (children_.empty()) { if (mutation && mutation->deletedRows) { numValues -= bits::countBits(mutation->deletedRows, 0, numValues); @@ -136,6 +138,7 @@ void SelectiveStructColumnReaderBase::read( VELOX_CHECK(!childSpecs.empty()); for (size_t i = 0; i < childSpecs.size(); ++i) { auto& childSpec = childSpecs[i]; + VELOX_TRACE_HISTORY_PUSH("read %s", childSpec->fieldName().c_str()); if (isChildConstant(*childSpec)) { continue; } @@ -339,6 +342,7 @@ void SelectiveStructColumnReaderBase::getValues( } bool lazyPrepared = false; for (auto& childSpec : scanSpec_->children()) { + VELOX_TRACE_HISTORY_PUSH("getValues %s", childSpec->fieldName().c_str()); if (!childSpec->projectOut()) { continue; } diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index 8cca5531e56d..4c98bcd3441d 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -18,6 +18,7 @@ #include +#include "velox/common/process/TraceContext.h" #include "velox/dwio/common/exception/Exception.h" namespace facebook::velox::dwrf { @@ -100,6 +101,7 @@ ReaderBase::ReaderBase( footerEstimatedSize_(footerEstimatedSize), filePreloadThreshold_(filePreloadThreshold), input_(std::move(input)) { + process::TraceContext trace("ReaderBase::ReaderBase"); // read last bytes into buffer to get PostScript // If file is small, load the entire file. // TODO: make a config diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index 1e0bbd6ffb3d..ce89fe1d60dd 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -20,6 +20,7 @@ #include "velox/common/base/Portability.h" #include "velox/common/base/SimdUtil.h" #include "velox/common/process/ProcessBase.h" +#include "velox/common/process/TraceContext.h" #include "velox/common/testutil/TestValue.h" #include "velox/exec/OperatorUtils.h" #include "velox/vector/VectorTypeUtils.h" @@ -857,6 +858,7 @@ bool HashTable::canApplyParallelJoinBuild() const { template void HashTable::parallelJoinBuild() { + process::TraceContext trace("HashTable::parallelJoinBuild"); TestValue::adjust( "facebook::velox::exec::HashTable::parallelJoinBuild", rows_->pool()); VELOX_CHECK_LE(1 + otherTables_.size(), std::numeric_limits::max());