Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/acvictor/velox into acvicto…
Browse files Browse the repository at this point in the history
…r/unixSeconds
  • Loading branch information
acvictor committed May 1, 2024
2 parents 2ce5c05 + 0902f1d commit 669e70a
Show file tree
Hide file tree
Showing 64 changed files with 4,025 additions and 361 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linux-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ jobs:
- name: Make Release Build
env:
MAKEFLAGS: 'NUM_THREADS=8 MAX_HIGH_MEM_JOBS=4 MAX_LINK_JOBS=4'
CUDA_ARCHITECTURES: 60
CUDA_ARCHITECTURES: 70
CUDA_COMPILER: /usr/local/cuda-${CUDA_VERSION}/bin/nvcc
# Without that, nvcc picks /usr/bin/c++ which is GCC 8
CUDA_FLAGS: "-ccbin /opt/rh/gcc-toolset-9/root/usr/bin"
Expand Down
29 changes: 19 additions & 10 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ SsdFile::SsdFile(
folly::Executor* executor)
: fileName_(filename),
maxRegions_(maxRegions),
disableFileCow_(disableFileCow),
shardId_(shardId),
checkpointIntervalBytes_(checkpointIntervalBytes),
executor_(executor) {
Expand All @@ -155,7 +156,7 @@ SsdFile::SsdFile(
filename,
folly::errnoStr(errno));

if (disableFileCow) {
if (disableFileCow_) {
disableCow(fd_);
}

Expand Down Expand Up @@ -711,15 +712,6 @@ void SsdFile::checkpoint(bool force) {
checkpointDeleted_ = false;
bytesAfterCheckpoint_ = 0;
try {
// We schedule the potentially long fsync of the cache file on another
// thread of the cache write executor, if available. If there is none, we do
// the sync on this thread at the end.
auto fileSync = std::make_shared<AsyncSource<int>>(
[fd = fd_]() { return std::make_unique<int>(::fsync(fd)); });
if (executor_ != nullptr) {
executor_->add([fileSync]() { fileSync->prepare(); });
}

const auto checkRc = [&](int32_t rc, const std::string& errMsg) {
if (rc < 0) {
VELOX_FAIL("{} with rc {} :{}", errMsg, rc, folly::errnoStr(errno));
Expand Down Expand Up @@ -769,6 +761,15 @@ void SsdFile::checkpoint(bool force) {
state.write(asChar(&offsetAndSize), sizeof(offsetAndSize));
}

// We schedule the potentially long fsync of the cache file on another
// thread of the cache write executor, if available. If there is none, we do
// the sync on this thread at the end.
auto fileSync = std::make_shared<AsyncSource<int>>(
[fd = fd_]() { return std::make_unique<int>(::fsync(fd)); });
if (executor_ != nullptr) {
executor_->add([fileSync]() { fileSync->prepare(); });
}

// NOTE: we need to ensure cache file data sync update completes before
// updating checkpoint file.
const auto fileSyncRc = fileSync->move();
Expand All @@ -790,6 +791,11 @@ void SsdFile::checkpoint(bool force) {
const auto checkpointFd = checkRc(
::open(checkpointPath.c_str(), O_WRONLY),
"Open of checkpoint file for sync");
// TODO: add this as file open option after we migrate to use velox
// filesystem for ssd file access.
if (disableFileCow_) {
disableCow(checkpointFd);
}
VELOX_CHECK_GE(checkpointFd, 0);
checkRc(::fsync(checkpointFd), "Sync of checkpoint file");
::close(checkpointFd);
Expand Down Expand Up @@ -822,6 +828,9 @@ void SsdFile::initializeCheckpoint() {
}
const auto logPath = fileName_ + kLogExtension;
evictLogFd_ = ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
if (disableFileCow_) {
disableCow(evictLogFd_);
}
if (evictLogFd_ < 0) {
++stats_.openLogErrors;
// Failure to open the log at startup is a process terminating error.
Expand Down
3 changes: 3 additions & 0 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ class SsdFile {
// Maximum size of the backing file in kRegionSize units.
const int32_t maxRegions_;

// True if copy on write should be disabled.
const bool disableFileCow_;

// Serializes access to all private data members.
mutable std::shared_mutex mutex_;

Expand Down
12 changes: 8 additions & 4 deletions velox/common/memory/MemoryAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,18 @@ std::string Stats::toString() const {
std::stringstream out;
int64_t totalClocks = 0;
int64_t totalBytes = 0;
int64_t totalAllocations = 0;
for (auto i = 0; i < sizes.size(); ++i) {
totalClocks += sizes[i].clocks();
totalBytes += sizes[i].totalBytes;
totalAllocations += sizes[i].numAllocations;
}
out << fmt::format(
"Alloc: {}MB {} Gigaclocks, {}MB advised\n",
"Alloc: {}MB {} Gigaclocks {} Allocations, {}MB advised\n",
totalBytes >> 20,
totalClocks >> 30,
numAdvise >> 8);
numAdvise >> 8,
totalAllocations);

// Sort the size classes by decreasing clocks.
std::vector<int32_t> indices(sizes.size());
Expand All @@ -386,10 +389,11 @@ std::string Stats::toString() const {
break;
}
out << fmt::format(
"Size {}K: {}MB {} Megaclocks\n",
"Size {}K: {}MB {} Megaclocks {} Allocations\n",
sizes[i].size * 4,
sizes[i].totalBytes >> 20,
sizes[i].clocks() >> 20);
sizes[i].clocks() >> 20,
sizes[i].numAllocations);
}
return out.str();
}
Expand Down
32 changes: 32 additions & 0 deletions velox/common/memory/tests/MemoryAllocatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,10 +632,42 @@ TEST_P(MemoryAllocatorTest, allocationClass2) {
allocation->clear();
}

// Checks the per-size-class statistics reported by the allocator.
//
// Phase 1 runs before FLAGS_velox_time_allocations is touched (presumably it
// defaults to false -- confirm against the flag definition) and expects that
// an allocate/free round trip leaves the size class' clocks, byte total and
// allocation count at zero.
//
// Phase 2 sets the flag to true and expects every size class to report
// non-zero clocks, at least 'size' pages worth of bytes and at least one
// allocation after the same round trip.
TEST_P(MemoryAllocatorTest, stats) {
  const std::vector<MachinePageCount>& sizes = instance_->sizeClasses();

  // Phase 1: nothing is expected to be recorded yet.
  for (size_t i = 0; i < sizes.size(); ++i) {
    std::unique_ptr<Allocation> allocation = std::make_unique<Allocation>();
    const auto size = sizes[i];
    ASSERT_TRUE(allocate(size, *allocation));
    ASSERT_GT(instance_->numAllocated(), 0);
    instance_->freeNonContiguous(*allocation);
    const auto stats = instance_->stats();
    ASSERT_EQ(0, stats.sizes[i].clocks());
    ASSERT_EQ(stats.sizes[i].totalBytes, 0);
    ASSERT_EQ(stats.sizes[i].numAllocations, 0);
  }

  // Phase 2: the saver restores the flag's previous value when the test scope
  // ends, so enabling it here does not leak into other tests.
  gflags::FlagSaver flagSaver;
  FLAGS_velox_time_allocations = true;
  for (size_t i = 0; i < sizes.size(); ++i) {
    std::unique_ptr<Allocation> allocation = std::make_unique<Allocation>();
    const auto size = sizes[i];
    ASSERT_TRUE(allocate(size, *allocation));
    ASSERT_GT(instance_->numAllocated(), 0);
    instance_->freeNonContiguous(*allocation);
    const auto stats = instance_->stats();
    ASSERT_LT(0, stats.sizes[i].clocks());
    // Lower bounds only: the asserts allow more than one allocation or more
    // bytes than requested to be attributed to this size class.
    ASSERT_GE(stats.sizes[i].totalBytes, size * AllocationTraits::kPageSize);
    ASSERT_GE(stats.sizes[i].numAllocations, 1);
  }
}

TEST_P(MemoryAllocatorTest, singleAllocation) {
if (!useMmap_ && enableReservation_) {
return;
}
gflags::FlagSaver flagSaver;
FLAGS_velox_time_allocations = true;
const std::vector<MachinePageCount>& sizes = instance_->sizeClasses();
MachinePageCount capacity = kCapacityPages;
for (auto i = 0; i < sizes.size(); ++i) {
Expand Down
19 changes: 17 additions & 2 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,16 @@ std::unique_ptr<dwio::common::SerDeOptions> parseSerdeParameters(
auto mapKeyIt =
serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim);

auto escapeCharIt =
serdeParameters.find(dwio::common::SerDeOptions::kEscapeChar);

auto nullStringIt = tableParameters.find(
dwio::common::TableParameter::kSerializationNullFormat);

if (fieldIt == serdeParameters.end() &&
collectionIt == serdeParameters.end() &&
mapKeyIt == serdeParameters.end() &&
escapeCharIt == serdeParameters.end() &&
nullStringIt == tableParameters.end()) {
return nullptr;
}
Expand All @@ -458,8 +462,19 @@ std::unique_ptr<dwio::common::SerDeOptions> parseSerdeParameters(
if (mapKeyIt != serdeParameters.end()) {
mapKeyDelim = parseDelimiter(mapKeyIt->second);
}
auto serDeOptions = std::make_unique<dwio::common::SerDeOptions>(
fieldDelim, collectionDelim, mapKeyDelim);

uint8_t escapeChar;
bool hasEscapeChar = false;
if (escapeCharIt != serdeParameters.end() && !escapeCharIt->second.empty()) {
hasEscapeChar = true;
escapeChar = escapeCharIt->second[0];
}

auto serDeOptions = hasEscapeChar
? std::make_unique<dwio::common::SerDeOptions>(
fieldDelim, collectionDelim, mapKeyDelim, escapeChar, true)
: std::make_unique<dwio::common::SerDeOptions>(
fieldDelim, collectionDelim, mapKeyDelim);
if (nullStringIt != tableParameters.end()) {
serDeOptions->nullString = nullStringIt->second;
}
Expand Down
Loading

0 comments on commit 669e70a

Please sign in to comment.