Skip to content

Commit

Permalink
Update all connectors and remove file handle changes
Browse files Browse the repository at this point in the history
  • Loading branch information
acvictor committed Apr 5, 2024
1 parent 7a189e6 commit 147b322
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 48 deletions.
18 changes: 6 additions & 12 deletions velox/connectors/hive/FileHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ std::string groupName(const std::string& filename) {
} // namespace

std::shared_ptr<FileHandle> FileHandleGenerator::operator()(
std::tuple<const std::string, const std::string, const std::string>
params) {
const std::string& filename) {
// We have seen cases where drivers are stuck when creating file handles.
// Adding a trace here to spot this more easily in future.
process::TraceContext trace("FileHandleGenerator::operator()");
Expand All @@ -44,16 +43,11 @@ std::shared_ptr<FileHandle> FileHandleGenerator::operator()(
{
MicrosecondTimer timer(&elapsedTimeUs);
fileHandle = std::make_shared<FileHandle>();
filesystems::FileOptions options;
options.values["fileSize"] = std::get<1>(params);
// Add length and modification time here to create a unique handle?
fileHandle->file =
filesystems::getFileSystem(std::get<0>(params), properties_)
->openFileForRead(std::get<0>(params), options);
fileHandle->uuid = StringIdLease(fileIds(), std::get<0>(params));
fileHandle->groupId =
StringIdLease(fileIds(), groupName(std::get<0>(params)));
VLOG(1) << "Generating file handle for: " << std::get<0>(params)
fileHandle->file = filesystems::getFileSystem(filename, properties_)
->openFileForRead(filename);
fileHandle->uuid = StringIdLease(fileIds(), filename);
fileHandle->groupId = StringIdLease(fileIds(), groupName(filename));
VLOG(1) << "Generating file handle for: " << filename
<< " uuid: " << fileHandle->uuid.id();
}
RECORD_HISTOGRAM_METRIC_VALUE(
Expand Down
6 changes: 2 additions & 4 deletions velox/connectors/hive/FileHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ class FileHandleGenerator {
FileHandleGenerator() {}
FileHandleGenerator(std::shared_ptr<const Config> properties)
: properties_(std::move(properties)) {}
std::shared_ptr<FileHandle> operator()(
std::tuple<const std::string, const std::string, const std::string>
params);
std::shared_ptr<FileHandle> operator()(const std::string& filename);

private:
const std::shared_ptr<const Config> properties_;
};

using FileHandleFactory = CachedFactory<
std::tuple<const std::string, const std::string, const std::string>,
std::string,
std::shared_ptr<FileHandle>,
FileHandleGenerator>;

Expand Down
8 changes: 2 additions & 6 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,8 @@ HiveConnector::HiveConnector(
hiveConfig_(std::make_shared<HiveConfig>(config)),
fileHandleFactory_(
hiveConfig_->isFileHandleCacheEnabled()
? std::make_unique<SimpleLRUCache<
std::tuple<
const std::string,
const std::string,
const std::string>,
std::shared_ptr<FileHandle>>>(
? std::make_unique<
SimpleLRUCache<std::string, std::shared_ptr<FileHandle>>>(
hiveConfig_->numCacheFileHandles())
: nullptr,
std::make_unique<FileHandleGenerator>(config)),
Expand Down
17 changes: 1 addition & 16 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,22 +146,7 @@ void SplitReader::prepareSplit(

std::shared_ptr<FileHandle> fileHandle;
try {
auto fileSizeIter = hiveSplit_->infoColumns.find(kFileSize);
auto fileModificationTimeIter =
hiveSplit_->infoColumns.find(kModificationTime);
if (fileSizeIter != hiveSplit_->infoColumns.end() &&
fileModificationTimeIter != hiveSplit_->infoColumns.end()) {
fileHandle = fileHandleFactory_
->generate(std::make_tuple(
hiveSplit_->filePath,
fileSizeIter->second,
fileModificationTimeIter->second))
.second;
} else {
fileHandle = fileHandleFactory_
->generate(std::make_tuple(hiveSplit_->filePath, "", ""))
.second;
}
fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second;
} catch (const VeloxRuntimeError& e) {
if (e.errorCode() == error_code::kFileNotFound &&
hiveConfig_->ignoreMissingFiles(
Expand Down
3 changes: 1 addition & 2 deletions velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
deleteSplit_);

auto deleteFileHandle =
fileHandleFactory_->generate(make_tuple(deleteFile_.filePath, "", ""))
.second;
fileHandleFactory_->generate(deleteFile_.filePath).second;
auto deleteFileInput = createBufferedInput(
*deleteFileHandle,
deleteReaderOpts,
Expand Down
9 changes: 7 additions & 2 deletions velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ class GCSReadFile final : public ReadFile {

// Gets the length of the file.
// Checks if there are any issues reading the file.
void initialize() {
void initialize(const FileOptions& options) {
if (options.values.count("fileSize") > 0) {
length_ = !options.values.at("fileSize").empty()
? std::stoull(options.values.at("fileSize"))
: -1;
}
// Make it a no-op if invoked twice.
if (length_ != -1) {
return;
Expand Down Expand Up @@ -308,7 +313,7 @@ std::unique_ptr<ReadFile> GCSFileSystem::openFileForRead(
const FileOptions& options) {
const auto gcspath = gcsPath(path);
auto gcsfile = std::make_unique<GCSReadFile>(gcspath, impl_->getClient());
gcsfile->initialize();
gcsfile->initialize(options);
return gcsfile;
}

Expand Down
9 changes: 7 additions & 2 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ class S3ReadFile final : public ReadFile {

// Gets the length of the file.
// Checks if there are any issues reading the file.
void initialize() {
void initialize(const FileOptions& options) {
if (options.values.count("fileSize") > 0) {
length_ = !options.values.at("fileSize").empty()
? std::stoull(options.values.at("fileSize"))
: -1;
}
// Make it a no-op if invoked twice.
if (length_ != -1) {
return;
Expand Down Expand Up @@ -643,7 +648,7 @@ std::unique_ptr<ReadFile> S3FileSystem::openFileForRead(
const FileOptions& options) {
const auto file = s3Path(path);
auto s3file = std::make_unique<S3ReadFile>(file, impl_->s3Client());
s3file->initialize();
s3file->initialize(options);
return s3file;
}

Expand Down
7 changes: 3 additions & 4 deletions velox/connectors/hive/tests/FileHandleTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ TEST(FileHandleTest, localFile) {
}

FileHandleFactory factory(
std::make_unique<SimpleLRUCache<
std::tuple<const std::string, const std::string, const std::string>,
std::shared_ptr<FileHandle>>>(1000),
std::make_unique<
SimpleLRUCache<std::string, std::shared_ptr<FileHandle>>>(1000),
std::make_unique<FileHandleGenerator>());
auto fileHandle = factory.generate(std::make_tuple(filename, "", "")).second;
auto fileHandle = factory.generate(filename).second;
ASSERT_EQ(fileHandle->file->size(), 3);
char buffer[3];
ASSERT_EQ(fileHandle->file->pread(0, 3, &buffer), "foo");
Expand Down

0 comments on commit 147b322

Please sign in to comment.