Skip to content

Commit

Permalink
Update file handle
Browse files Browse the repository at this point in the history
  • Loading branch information
acvictor committed Apr 12, 2024
1 parent 2b52132 commit d5ebf03
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 16 deletions.
16 changes: 9 additions & 7 deletions velox/connectors/hive/FileHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ std::string groupName(const std::string& filename) {
} // namespace

std::shared_ptr<FileHandle> FileHandleGenerator::operator()(
const std::string& filename) {
const std::tuple<std::string, int64_t, int64_t> fileProperties) {
// We have seen cases where drivers are stuck when creating file handles.
// Adding a trace here to spot this more easily in future.
process::TraceContext trace("FileHandleGenerator::operator()");
Expand All @@ -43,12 +43,14 @@ std::shared_ptr<FileHandle> FileHandleGenerator::operator()(
{
MicrosecondTimer timer(&elapsedTimeUs);
fileHandle = std::make_shared<FileHandle>();
fileHandle->file = filesystems::getFileSystem(filename, properties_)
->openFileForRead(filename);
fileHandle->uuid = StringIdLease(fileIds(), filename);
fileHandle->groupId = StringIdLease(fileIds(), groupName(filename));
VLOG(1) << "Generating file handle for: " << filename
<< " uuid: " << fileHandle->uuid.id();
filesystems::FileOptions options;
options.fileSize = std::get<1>(fileProperties) == -1 ? std::nullopt
: std::optional<int64_t>{std::get<1>(fileProperties)};
fileHandle->file = filesystems::getFileSystem(std::get<0>(fileProperties), properties_)
->openFileForRead(std::get<0>(fileProperties), options);
fileHandle->uuid = StringIdLease(fileIds(), std::get<0>(fileProperties));
fileHandle->groupId = StringIdLease(fileIds(), groupName(std::get<0>(fileProperties)));
VLOG(1) << "Generating file handle for: " << std::get<0>(fileProperties);
}
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricHiveFileHandleGenerateLatencyMs, elapsedTimeUs / 1000);
Expand Down
7 changes: 4 additions & 3 deletions velox/connectors/hive/FileHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,23 @@ struct FileHandle {
// first diff we'll not include the map.
};

// Cache of FileHandle objects built on SimpleLRUCache.
// NOTE(review): this is a rendered diff without +/- markers. The first alias
// (keyed by std::string) appears to be the removed line and the second (keyed
// by a std::tuple<std::string, int64_t, int64_t>) its replacement; only one of
// them can exist in the real header -- confirm against the repository.
// At the call sites visible in this change the tuple is built as
// (file path, file size, modification time).
// NOTE(review): the cache constructed in HiveConnector.cpp stores
// std::shared_ptr<FileHandle> values, whereas this alias stores FileHandle by
// value -- verify whether this alias is still used anywhere.
using FileHandleCache = SimpleLRUCache<std::string, FileHandle>;
using FileHandleCache = SimpleLRUCache<std::tuple<std::string, int64_t, int64_t>, FileHandle>;

// Creates FileHandles via the Generator interface the CachedFactory requires.
// An instance is handed to FileHandleFactory, which calls operator() with a
// key to obtain the std::shared_ptr<FileHandle> for that key.
class FileHandleGenerator {
public:
// Default constructor: properties_ is left as a null shared_ptr.
FileHandleGenerator() {}
// Stores `properties`; the definition of operator() forwards it to
// filesystems::getFileSystem when opening the file.
FileHandleGenerator(std::shared_ptr<const Config> properties)
: properties_(std::move(properties)) {}
// NOTE(review): this is a rendered diff without +/- markers. The
// string-keyed declaration directly below appears to be the removed line and
// the tuple-keyed declaration after it the replacement -- confirm against
// the repository.
std::shared_ptr<FileHandle> operator()(const std::string& filename);
// Builds a FileHandle for the file described by `params`.
// As used in the definition shown in FileHandle.cpp:
//   std::get<0> - file name/path passed to getFileSystem/openFileForRead and
//                 used for the uuid and group id leases.
//   std::get<1> - file size; -1 leaves FileOptions::fileSize as std::nullopt.
//   std::get<2> - not read by the visible definition; call sites pass the
//                 split's modificationTime (or -1) in this slot.
// NOTE(review): the definition spells the parameter
// `const std::tuple<...> fileProperties`; the tuple (including its
// std::string) is taken by value -- consider whether a const reference is
// intended.
std::shared_ptr<FileHandle> operator()(
std::tuple<std::string, int64_t, int64_t> params);

private:
// Configuration forwarded to filesystems::getFileSystem; null when the
// default constructor was used.
const std::shared_ptr<const Config> properties_;
};

// CachedFactory instantiation that yields std::shared_ptr<FileHandle> values
// produced by FileHandleGenerator. Call sites in this change invoke
// generate(key) and take `.second` of the result as the handle.
// NOTE(review): this is a rendered diff without +/- markers. `std::string,`
// appears to be the removed key type and the std::tuple line its replacement,
// so the real alias presumably has three template arguments, not four --
// confirm against the repository.
using FileHandleFactory = CachedFactory<
std::string,
std::tuple<std::string, int64_t, int64_t>,
std::shared_ptr<FileHandle>,
FileHandleGenerator>;

Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ HiveConnector::HiveConnector(
fileHandleFactory_(
hiveConfig_->isFileHandleCacheEnabled()
? std::make_unique<
SimpleLRUCache<std::string, std::shared_ptr<FileHandle>>>(
SimpleLRUCache<std::tuple<std::string, int64_t, int64_t>, std::shared_ptr<FileHandle>>>(
hiveConfig_->numCacheFileHandles())
: nullptr,
std::make_unique<FileHandleGenerator>(config)),
Expand Down
6 changes: 6 additions & 0 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
dwio::common::FileFormat fileFormat;
const uint64_t start;
const uint64_t length;
const int64_t fileSize;
const int64_t modificationTime;

/// Mapping from partition keys to values. Values are specified as strings
/// formatted the same way as CAST(x as VARCHAR). Null values are specified as
Expand All @@ -49,6 +51,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
dwio::common::FileFormat _fileFormat,
uint64_t _start = 0,
uint64_t _length = std::numeric_limits<uint64_t>::max(),
int64_t _fileSize = -1,
int64_t _modificationTime = -1,
const std::unordered_map<std::string, std::optional<std::string>>&
_partitionKeys = {},
std::optional<int32_t> _tableBucketNumber = std::nullopt,
Expand All @@ -62,6 +66,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
fileFormat(_fileFormat),
start(_start),
length(_length),
fileSize(_fileSize),
modificationTime(_modificationTime),
partitionKeys(_partitionKeys),
tableBucketNumber(_tableBucketNumber),
customSplitInfo(_customSplitInfo),
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ void SplitReader::createReader() {

std::shared_ptr<FileHandle> fileHandle;
try {
fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second;
fileHandle = fileHandleFactory_->generate(std::make_tuple(
hiveSplit_->filePath, hiveSplit_->fileSize, hiveSplit_->modificationTime)).second;
} catch (const VeloxRuntimeError& e) {
if (e.errorCode() == error_code::kFileNotFound &&
hiveConfig_->ignoreMissingFiles(
Expand Down
8 changes: 8 additions & 0 deletions velox/connectors/hive/iceberg/IcebergSplit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ HiveIcebergSplit::HiveIcebergSplit(
dwio::common::FileFormat _fileFormat,
uint64_t _start,
uint64_t _length,
int64_t _fileSize,
int64_t _modificationTime,
const std::unordered_map<std::string, std::optional<std::string>>&
_partitionKeys,
std::optional<int32_t> _tableBucketNumber,
Expand All @@ -38,6 +40,8 @@ HiveIcebergSplit::HiveIcebergSplit(
_fileFormat,
_start,
_length,
_fileSize,
_modificationTime,
_partitionKeys,
_tableBucketNumber,
_customSplitInfo,
Expand All @@ -55,6 +59,8 @@ HiveIcebergSplit::HiveIcebergSplit(
dwio::common::FileFormat _fileFormat,
uint64_t _start,
uint64_t _length,
int64_t _fileSize,
int64_t _modificationTime,
const std::unordered_map<std::string, std::optional<std::string>>&
_partitionKeys,
std::optional<int32_t> _tableBucketNumber,
Expand All @@ -68,6 +74,8 @@ HiveIcebergSplit::HiveIcebergSplit(
_fileFormat,
_start,
_length,
_fileSize,
_modificationTime,
_partitionKeys,
_tableBucketNumber,
_customSplitInfo,
Expand Down
4 changes: 4 additions & 0 deletions velox/connectors/hive/iceberg/IcebergSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
dwio::common::FileFormat _fileFormat,
uint64_t _start = 0,
uint64_t _length = std::numeric_limits<uint64_t>::max(),
int64_t _fileSize = -1,
int64_t _modificationTime = -1,
const std::unordered_map<std::string, std::optional<std::string>>&
_partitionKeys = {},
std::optional<int32_t> _tableBucketNumber = std::nullopt,
Expand All @@ -46,6 +48,8 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
dwio::common::FileFormat _fileFormat,
uint64_t _start = 0,
uint64_t _length = std::numeric_limits<uint64_t>::max(),
int64_t _fileSize = -1,
int64_t _modificationTime = -1,
const std::unordered_map<std::string, std::optional<std::string>>&
_partitionKeys = {},
std::optional<int32_t> _tableBucketNumber = std::nullopt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
deleteSplit_);

auto deleteFileHandle =
fileHandleFactory_->generate(deleteFile_.filePath).second;
fileHandleFactory_->generate(make_tuple(deleteFile_.filePath, -1, -1)).second;
auto deleteFileInput = createBufferedInput(
*deleteFileHandle,
deleteReaderOpts,
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class HiveIcebergTest : public HiveConnectorTestBase {
fileFomat_,
0,
fileSize,
-1,
-1,
partitionKeys,
std::nullopt,
customSplitInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ TEST_F(S3FileSystemRegistrationTest, fileHandle) {
auto hiveConfig = minioServer_->hiveConfig();
FileHandleFactory factory(
std::make_unique<
SimpleLRUCache<std::string, std::shared_ptr<FileHandle>>>(1000),
SimpleLRUCache<std::tuple<std::string, int64_t, int64_t>, std::shared_ptr<FileHandle>>>(1000),
std::make_unique<FileHandleGenerator>(hiveConfig));
auto fileHandle = factory.generate(s3File).second;
readData(fileHandle->file.get());
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/tests/FileHandleTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ TEST(FileHandleTest, localFile) {

FileHandleFactory factory(
std::make_unique<
SimpleLRUCache<std::string, std::shared_ptr<FileHandle>>>(1000),
SimpleLRUCache<std::tuple<std::string, int64_t, int64_t>, std::shared_ptr<FileHandle>>>(1000),
std::make_unique<FileHandleGenerator>());
auto fileHandle = factory.generate(filename).second;
auto fileHandle = factory.generate(std::make_tuple(filename, -1, -1)).second;
ASSERT_EQ(fileHandle->file->size(), 3);
char buffer[3];
ASSERT_EQ(fileHandle->file->pread(0, 3, &buffer), "foo");
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/tests/HiveConnectorUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ TEST_F(HiveConnectorUtilTest, configureReaderOptions) {
fileFormat,
0UL,
std::numeric_limits<uint64_t>::max(),
-1,
-1,
partitionKeys,
std::nullopt,
customSplitInfo,
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/tests/utils/HiveConnectorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ class HiveConnectorSplitBuilder {
fileFormat_,
start_,
length_,
fileSize_,
modificationTime_,
partitionKeys_,
tableBucketNumber_,
customSplitInfo,
Expand All @@ -281,6 +283,8 @@ class HiveConnectorSplitBuilder {
dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF};
uint64_t start_{0};
uint64_t length_{std::numeric_limits<uint64_t>::max()};
int64_t fileSize_{-1};
int64_t modificationTime_{-1};
std::unordered_map<std::string, std::optional<std::string>> partitionKeys_;
std::optional<int32_t> tableBucketNumber_;
std::unordered_map<std::string, std::string> customSplitInfo_ = {};
Expand Down

0 comments on commit d5ebf03

Please sign in to comment.