diff --git a/CMakeLists.txt b/CMakeLists.txt index 1c7dc7d568d3..82fed089f27c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -62,6 +62,11 @@ option( VELOX_BUILD_MINIMAL "Build a minimal set of components only. This will override other build options." OFF) +option( + VELOX_BUILD_MINIMAL_WITH_DWIO + "Build a minimal set of components, including DWIO (file format readers/writers). + This will override other build options." + OFF) # option() always creates a BOOL variable so we have to use a normal cache # variable with STRING type for this option. @@ -96,8 +101,10 @@ option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF) option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF) option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF) option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON) +option(VELOX_ENABLE_CODEGEN_SUPPORT "Enable experimental codegen support." OFF) option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF) +option(VELOX_BUILD_VECTOR_TEST_UTILS "Builds Velox vector test utilities" OFF) option(VELOX_BUILD_PYTHON_PACKAGE "Builds Velox Python bindings" OFF) option( VELOX_ENABLE_INT64_BUILD_PARTITION_BOUND @@ -125,7 +132,7 @@ if(${VELOX_BUILD_MINIMAL}) set(VELOX_ENABLE_GCS OFF) set(VELOX_ENABLE_ABFS OFF) set(VELOX_ENABLE_SUBSTRAIT OFF) - set(VELOX_CODEGEN_SUPPORT OFF) + set(VELOX_ENABLE_CODEGEN_SUPPORT OFF) endif() if(${VELOX_BUILD_TESTING}) @@ -175,7 +182,7 @@ if(${VELOX_BUILD_PYTHON_PACKAGE}) set(VELOX_ENABLE_GCS OFF) set(VELOX_ENABLE_ABFS OFF) set(VELOX_ENABLE_SUBSTRAIT OFF) - set(VELOX_CODEGEN_SUPPORT OFF) + set(VELOX_ENABLE_CODEGEN_SUPPORT OFF) set(VELOX_ENABLE_BENCHMARKS_BASIC OFF) set(VELOX_ENABLE_BENCHMARKS OFF) endif() @@ -257,7 +264,7 @@ if(VELOX_ENABLE_PARQUET) endif() # define processor variable for conditional compilation -if(${VELOX_CODEGEN_SUPPORT}) +if(${VELOX_ENABLE_CODEGEN_SUPPORT}) add_compile_definitions(CODEGEN_ENABLED=1) endif() @@ -420,7 +427,10 @@ endif() set_source(fmt) resolve_dependency(fmt 9.0.0) -if(NOT ${VELOX_BUILD_MINIMAL}) +if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR}) + # DWIO needs all sorts of stream compression libraries. + # + # TODO: make these optional and pluggable. find_package(ZLIB REQUIRED) find_package(lz4 REQUIRED) find_package(lzo2 REQUIRED) @@ -467,7 +477,11 @@ else() set(FOLLY_BENCHMARK Folly::follybenchmark) endif() -if(NOT ${VELOX_BUILD_MINIMAL}) +# DWIO (ORC/DWRF), Substrait and experimental/codegen depend on protobuf. +if(${VELOX_BUILD_MINIMAL_WITH_DWIO} + OR ${VELOX_ENABLE_HIVE_CONNECTOR} + OR ${VELOX_ENABLE_SUBSTRAIT} + OR ${VELOX_ENABLE_CODEGEN_SUPPORT}) # Locate or build protobuf. set_source(Protobuf) resolve_dependency(Protobuf 3.21 EXACT) diff --git a/Makefile b/Makefile index 794d876b41c9..82de59432436 100644 --- a/Makefile +++ b/Makefile @@ -98,10 +98,26 @@ release: #: Build the release version $(MAKE) cmake BUILD_DIR=release BUILD_TYPE=Release && \ $(MAKE) build BUILD_DIR=release -min_debug: #: Minimal build with debugging symbols +minimal_debug: #: Minimal build with debugging symbols $(MAKE) cmake BUILD_DIR=debug BUILD_TYPE=debug EXTRA_CMAKE_FLAGS="${EXTRA_CMAKE_FLAGS} -DVELOX_BUILD_MINIMAL=ON" $(MAKE) build BUILD_DIR=debug +min_debug: minimal_debug + +minimal: #: Minimal build + $(MAKE) cmake BUILD_DIR=release BUILD_TYPE=release EXTRA_CMAKE_FLAGS="${EXTRA_CMAKE_FLAGS} -DVELOX_BUILD_MINIMAL=ON" + $(MAKE) build BUILD_DIR=release + +dwio: #: Minimal build with dwio enabled. 
+ $(MAKE) cmake BUILD_DIR=release BUILD_TYPE=release EXTRA_CMAKE_FLAGS="${EXTRA_CMAKE_FLAGS} \ + -DVELOX_BUILD_MINIMAL_WITH_DWIO=ON" + $(MAKE) build BUILD_DIR=release + +dwio_debug: #: Minimal build with dwio debugging symbols. + $(MAKE) cmake BUILD_DIR=debug BUILD_TYPE=debug EXTRA_CMAKE_FLAGS="${EXTRA_CMAKE_FLAGS} \ + -DVELOX_BUILD_MINIMAL_WITH_DWIO=ON" + $(MAKE) build BUILD_DIR=debug + benchmarks-basic-build: $(MAKE) release EXTRA_CMAKE_FLAGS=" ${EXTRA_CMAKE_FLAGS} \ -DVELOX_BUILD_TESTING=OFF \ diff --git a/scripts/setup-centos8.sh b/scripts/setup-centos8.sh index 60a20d49c7e9..2df599e0d0d8 100755 --- a/scripts/setup-centos8.sh +++ b/scripts/setup-centos8.sh @@ -30,19 +30,16 @@ function dnf_install { dnf install -y -q --setopt=install_weak_deps=False "$@" } +dnf update -y dnf_install epel-release dnf-plugins-core # For ccache, ninja dnf config-manager --set-enabled powertools +dnf update -y dnf_install ninja-build cmake curl ccache gcc-toolset-9 git wget which libevent-devel \ openssl-devel re2-devel libzstd-devel lz4-devel double-conversion-devel \ libdwarf-devel curl-devel libicu-devel -dnf remove -y gflags - -# Required for Thrift dnf_install autoconf automake libtool bison flex python3 libsodium-devel -dnf_install conda - # install sphinx for doc gen pip3 install sphinx sphinx-tabs breathe sphinx_rtd_theme @@ -50,83 +47,156 @@ pip3 install sphinx sphinx-tabs breathe sphinx_rtd_theme source /opt/rh/gcc-toolset-9/enable || exit 1 set -u -function cmake_install { - cmake -B "$1-build" -GNinja -DCMAKE_CXX_STANDARD=17 \ - -DCMAKE_CXX_FLAGS="${CFLAGS}" -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" -Wno-dev "$@" - ninja -C "$1-build" install +function install_conda { + dnf_install conda } -# Fetch sources. -wget_and_untar https://github.com/gflags/gflags/archive/v2.2.2.tar.gz gflags -wget_and_untar https://github.com/google/glog/archive/v0.6.0.tar.gz glog -wget_and_untar http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz lzo -wget_and_untar https://boostorg.jfrog.io/artifactory/main/release/1.72.0/source/boost_1_72_0.tar.gz boost -wget_and_untar https://github.com/google/snappy/archive/1.1.8.tar.gz snappy -wget_and_untar https://github.com/fmtlib/fmt/archive/10.1.1.tar.gz fmt +function install_gflags { + # Remove an older version if present. 
+ dnf remove -y gflags + wget_and_untar https://github.com/gflags/gflags/archive/v2.2.2.tar.gz gflags + ( + cd gflags + cmake_install -DBUILD_SHARED_LIBS=ON -DBUILD_STATIC_LIBS=ON -DBUILD_gflags_LIB=ON -DLIB_SUFFIX=64 + ) +} -wget_and_untar https://github.com/protocolbuffers/protobuf/releases/download/v21.4/protobuf-all-21.4.tar.gz protobuf +function install_glog { + wget_and_untar https://github.com/google/glog/archive/v0.6.0.tar.gz glog + ( + cd glog + cmake_install -DBUILD_SHARED_LIBS=ON + ) +} + +function install_lzo { + wget_and_untar http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz lzo + ( + cd lzo + ./configure --prefix=/usr --enable-shared --disable-static --docdir=/usr/share/doc/lzo-2.10 + make "-j$(nproc)" + make install + ) +} + +function install_boost { + wget_and_untar https://boostorg.jfrog.io/artifactory/main/release/1.72.0/source/boost_1_72_0.tar.gz boost + ( + cd boost + ./bootstrap.sh --prefix=/usr/local + ./b2 "-j$(nproc)" -d0 install threading=multi + ) +} + +function install_snappy { + wget_and_untar https://github.com/google/snappy/archive/1.1.8.tar.gz snappy + ( + cd snappy + cmake_install -DSNAPPY_BUILD_TESTS=OFF + ) +} + +function install_fmt { + wget_and_untar https://github.com/fmtlib/fmt/archive/10.1.1.tar.gz fmt + ( + cd fmt + cmake_install -DFMT_TEST=OFF + ) +} + +function install_protobuf { + wget_and_untar https://github.com/protocolbuffers/protobuf/releases/download/v21.4/protobuf-all-21.4.tar.gz protobuf + ( + cd protobuf + ./configure --prefix=/usr + make "-j${NPROC}" + make install + ldconfig + ) +} FB_OS_VERSION="v2023.12.04.00" -wget_and_untar https://github.com/facebookincubator/fizz/archive/refs/tags/${FB_OS_VERSION}.tar.gz fizz -wget_and_untar https://github.com/facebook/folly/archive/refs/tags/${FB_OS_VERSION}.tar.gz folly -wget_and_untar https://github.com/facebook/wangle/archive/refs/tags/${FB_OS_VERSION}.tar.gz wangle -wget_and_untar https://github.com/facebook/fbthrift/archive/refs/tags/${FB_OS_VERSION}.tar.gz fbthrift -wget_and_untar https://github.com/facebook/mvfst/archive/refs/tags/${FB_OS_VERSION}.tar.gz mvfst +function install_fizz { + wget_and_untar https://github.com/facebookincubator/fizz/archive/refs/tags/${FB_OS_VERSION}.tar.gz fizz + ( + cd fizz/fizz + cmake_install -DBUILD_TESTS=OFF + ) +} -wait # For cmake and source downloads to complete. +function install_folly { + wget_and_untar https://github.com/facebook/folly/archive/refs/tags/${FB_OS_VERSION}.tar.gz folly + ( + cd folly + cmake_install -DFOLLY_HAVE_INT128_T=ON + ) +} -# Build & install. 
-( - cd lzo - ./configure --prefix=/usr --enable-shared --disable-static --docdir=/usr/share/doc/lzo-2.10 - make "-j$(nproc)" - make install -) +function install_wangle { + wget_and_untar https://github.com/facebook/wangle/archive/refs/tags/${FB_OS_VERSION}.tar.gz wangle + ( + cd wangle/wangle + cmake_install -DBUILD_TESTS=OFF + ) +} -( - cd boost - ./bootstrap.sh --prefix=/usr/local - ./b2 "-j$(nproc)" -d0 install threading=multi -) +function install_fbthrift { + wget_and_untar https://github.com/facebook/fbthrift/archive/refs/tags/${FB_OS_VERSION}.tar.gz fbthrift + ( + cd fbthrift + cmake_install -Denable_tests=OFF + ) +} + +function install_mvfst { + wget_and_untar https://github.com/facebook/mvfst/archive/refs/tags/${FB_OS_VERSION}.tar.gz mvfst + ( + cd mvfst + cmake_install -DBUILD_TESTS=OFF + ) +} + +function install_duckdb { + if $BUILD_DUCKDB ; then + echo 'Building DuckDB' + wget_and_untar https://github.com/duckdb/duckdb/archive/refs/tags/v0.8.1.tar.gz duckdb + ( + cd duckdb + cmake_install -DBUILD_UNITTESTS=OFF -DENABLE_SANITIZER=OFF -DENABLE_UBSAN=OFF -DBUILD_SHELL=OFF -DEXPORT_DLL_SYMBOLS=OFF -DCMAKE_BUILD_TYPE=Release + ) + fi +} + +function install_velox_deps { + run_and_time install_conda + run_and_time install_gflags + run_and_time install_glog + run_and_time install_lzo + run_and_time install_snappy + run_and_time install_boost + run_and_time install_protobuf + run_and_time install_fmt + run_and_time install_folly + run_and_time install_fizz + run_and_time install_wangle + run_and_time install_mvfst + run_and_time install_fbthrift + run_and_time install_duckdb +} + +(return 2> /dev/null) && return # If script was sourced, don't run commands. ( - cd protobuf - ./configure --prefix=/usr - make "-j${NPROC}" - make install - ldconfig + if [[ $# -ne 0 ]]; then + for cmd in "$@"; do + run_and_time "${cmd}" + done + else + install_velox_deps + fi ) -cmake_install gflags -DBUILD_SHARED_LIBS=ON -DBUILD_STATIC_LIBS=ON -DBUILD_gflags_LIB=ON -DLIB_SUFFIX=64 -DCMAKE_INSTALL_PREFIX:PATH=/usr -cmake_install glog -DBUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX:PATH=/usr -cmake_install snappy -DSNAPPY_BUILD_TESTS=OFF -cmake_install fmt -DFMT_TEST=OFF -cmake_install folly -DFOLLY_HAVE_INT128_T=ON - -cmake_install fizz/fizz -DBUILD_TESTS=OFF -cmake_install wangle/wangle -DBUILD_TESTS=OFF -cmake_install mvfst -DBUILD_TESTS=OFF -cmake_install fbthrift -Denable_tests=OFF - -if $BUILD_DUCKDB ; then - echo 'Building DuckDB' - mkdir ~/duckdb-install && cd ~/duckdb-install - wget https://github.com/duckdb/duckdb/archive/refs/tags/v0.8.1.tar.gz - tar -xf v0.8.1.tar.gz - cd duckdb-0.8.1 - mkdir build && cd build - CMAKE_FLAGS=( - "-DBUILD_UNITTESTS=OFF" - "-DENABLE_SANITIZER=OFF" - "-DENABLE_UBSAN=OFF" - "-DBUILD_SHELL=OFF" - "-DEXPORT_DLL_SYMBOLS=OFF" - "-DCMAKE_BUILD_TYPE=Release" - ) - cmake ${CMAKE_FLAGS[*]} .. - make install -j 16 - rm -rf ~/duckdb-install -fi +echo "All dependencies for Velox installed!" dnf clean all diff --git a/scripts/setup-helper-functions.sh b/scripts/setup-helper-functions.sh index 9495d42bfc36..4f0a11e152fd 100644 --- a/scripts/setup-helper-functions.sh +++ b/scripts/setup-helper-functions.sh @@ -15,6 +15,27 @@ # github_checkout $REPO $VERSION $GIT_CLONE_PARAMS clones or re-uses an existing clone of the # specified repo, checking out the requested version. + +function run_and_time { + time "$@" || (echo "Failed to run $* ." 
; exit 1 ) + { echo "+ Finished running $*"; } 2> /dev/null +} + +function prompt { + ( + while true; do + local input="${PROMPT_ALWAYS_RESPOND:-}" + echo -n "$(tput bold)$* [Y, n]$(tput sgr0) " + [[ -z "${input}" ]] && read input + if [[ "${input}" == "Y" || "${input}" == "y" || "${input}" == "" ]]; then + return 0 + elif [[ "${input}" == "N" || "${input}" == "n" ]]; then + return 1 + fi + done + ) 2> /dev/null +} + function github_checkout { local REPO=$1 shift @@ -36,7 +57,6 @@ function github_checkout { cd "${DIRNAME}" } - # get_cxx_flags [$CPU_ARCH] # Sets and exports the variable VELOX_CXX_FLAGS with appropriate compiler flags. # If $CPU_ARCH is set then we use that else we determine best possible set of flags diff --git a/scripts/setup-macos.sh b/scripts/setup-macos.sh index 197ea54e8394..7872195b9e77 100755 --- a/scripts/setup-macos.sh +++ b/scripts/setup-macos.sh @@ -38,26 +38,6 @@ MACOS_DEPS="ninja flex bison cmake ccache protobuf@21 icu4c boost gflags glog li FB_OS_VERSION="v2023.12.04.00" -function run_and_time { - time "$@" || (echo "Failed to run $* ." ; exit 1 ) - { echo "+ Finished running $*"; } 2> /dev/null -} - -function prompt { - ( - while true; do - local input="${PROMPT_ALWAYS_RESPOND:-}" - echo -n "$(tput bold)$* [Y, n]$(tput sgr0) " - [[ -z "${input}" ]] && read input - if [[ "${input}" == "Y" || "${input}" == "y" || "${input}" == "" ]]; then - return 0 - elif [[ "${input}" == "N" || "${input}" == "n" ]]; then - return 1 - fi - done - ) 2> /dev/null -} - function update_brew { DEFAULT_BREW_PATH=/usr/local/bin/brew if [ `arch` == "arm64" ] ; diff --git a/scripts/setup-ubuntu.sh b/scripts/setup-ubuntu.sh index 69760cf85ec0..d44b06df22da 100755 --- a/scripts/setup-ubuntu.sh +++ b/scripts/setup-ubuntu.sh @@ -67,26 +67,6 @@ sudo --preserve-env apt update && sudo --preserve-env apt install -y libunwind-d tzdata \ wget -function run_and_time { - time "$@" - { echo "+ Finished running $*"; } 2> /dev/null -} - -function prompt { - ( - while true; do - local input="${PROMPT_ALWAYS_RESPOND:-}" - echo -n "$(tput bold)$* [Y, n]$(tput sgr0) " - [[ -z "${input}" ]] && read input - if [[ "${input}" == "Y" || "${input}" == "y" || "${input}" == "" ]]; then - return 0 - elif [[ "${input}" == "N" || "${input}" == "n" ]]; then - return 1 - fi - done - ) 2> /dev/null -} - function install_fmt { github_checkout fmtlib/fmt "${FMT_VERSION}" cmake_install -DFMT_TEST=OFF @@ -119,11 +99,20 @@ function install_fbthrift { function install_conda { mkdir -p conda && cd conda - wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh + ARCH=$(uname -m) + + if [ "$ARCH" != "x86_64" ] && [ "$ARCH" != "aarch64" ]; then + echo "Unsupported architecture: $ARCH" + exit 1 + fi + + wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-$ARCH.sh + MINICONDA_PATH=/opt/miniconda-for-velox - bash Miniconda3-latest-Linux-x86_64.sh -b -p $MINICONDA_PATH + bash Miniconda3-latest-Linux-$ARCH.sh -b -p $MINICONDA_PATH } + function install_velox_deps { run_and_time install_fmt run_and_time install_folly @@ -146,4 +135,4 @@ function install_velox_deps { fi ) -echo "All deps for Velox installed! Now try \"make\"" +echo "All dependencies for Velox installed!" 
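A quick usage sketch for the refactored setup scripts, based only on the dispatch logic visible in this diff (the install_* function names and script paths are the ones introduced above, not verified beyond the patch): with the per-dependency functions and the trailing argument loop, the CentOS 8 script can either install everything in one shot or re-run a single failed step.

    ./scripts/setup-centos8.sh                 # no arguments: runs install_velox_deps
    ./scripts/setup-centos8.sh install_boost   # runs a single install_* function, timed by run_and_time
    source scripts/setup-centos8.sh            # sourcing returns before the dispatch, so install_* functions can be called by hand

scripts/setup-ubuntu.sh appears to keep the same install_velox_deps / argument-dispatch pattern, so the same invocation style should apply there.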
diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index cd18d344e809..be53ad8beeb9 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -44,7 +44,7 @@ if(${VELOX_ENABLE_PARSE}) endif() # hive connector depends on dwio -if(${VELOX_ENABLE_HIVE_CONNECTOR}) +if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR}) add_subdirectory(dwio) endif() @@ -65,7 +65,7 @@ if(${VELOX_ENABLE_DUCKDB}) add_subdirectory(duckdb) endif() -if(${VELOX_CODEGEN_SUPPORT}) +if(${VELOX_ENABLE_CODEGEN_SUPPORT}) add_subdirectory(experimental/codegen) endif() diff --git a/velox/codegen/CMakeLists.txt b/velox/codegen/CMakeLists.txt index e54a0d133c07..37e7d2a34389 100644 --- a/velox/codegen/CMakeLists.txt +++ b/velox/codegen/CMakeLists.txt @@ -13,7 +13,7 @@ # limitations under the License. add_library(velox_codegen Codegen.cpp) -if(${VELOX_CODEGEN_SUPPORT}) +if(${VELOX_ENABLE_CODEGEN_SUPPORT}) target_link_libraries(velox_codegen velox_experimental_codegen) else() target_link_libraries(velox_codegen velox_core velox_exec velox_expression diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt index 265a35b54f80..f8f60c41c4e2 100644 --- a/velox/connectors/hive/CMakeLists.txt +++ b/velox/connectors/hive/CMakeLists.txt @@ -13,9 +13,10 @@ # limitations under the License. add_library(velox_hive_config OBJECT HiveConfig.cpp) - target_link_libraries(velox_hive_config velox_exception) +add_subdirectory(iceberg) + add_library( velox_hive_connector OBJECT FileHandle.cpp @@ -31,19 +32,20 @@ add_library( target_link_libraries( velox_hive_connector - velox_common_io - velox_connector - velox_dwio_catalog_fbhive - velox_dwio_dwrf_reader - velox_dwio_dwrf_writer - velox_dwio_parquet_reader - velox_dwio_parquet_writer - velox_file - velox_hive_partition_function - velox_s3fs - velox_hdfs - velox_gcs - velox_abfs) + PUBLIC velox_hive_iceberg_splitreader + PRIVATE velox_common_io + velox_connector + velox_dwio_catalog_fbhive + velox_dwio_dwrf_reader + velox_dwio_dwrf_writer + velox_dwio_parquet_reader + velox_dwio_parquet_writer + velox_file + velox_hive_partition_function + velox_s3fs + velox_hdfs + velox_gcs + velox_abfs) add_library(velox_hive_partition_function HivePartitionFunction.cpp) diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h index 15edd9d2ac2f..6fb6853d7544 100644 --- a/velox/connectors/hive/FileHandle.h +++ b/velox/connectors/hive/FileHandle.h @@ -25,14 +25,9 @@ #pragma once -#include -#include -#include - #include "velox/common/caching/CachedFactory.h" #include "velox/common/caching/FileIds.h" #include "velox/common/file/File.h" -#include "velox/dwio/common/InputStream.h" namespace facebook::velox { diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index a0bc7fbd046e..95b47de314a0 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -430,13 +430,29 @@ void configureReaderOptions( const Config* sessionProperties, const std::shared_ptr& hiveTableHandle, const std::shared_ptr& hiveSplit) { + configureReaderOptions( + readerOptions, + hiveConfig, + sessionProperties, + hiveTableHandle->dataColumns(), + hiveSplit, + hiveTableHandle->tableParameters()); +} + +void configureReaderOptions( + dwio::common::ReaderOptions& readerOptions, + const std::shared_ptr& hiveConfig, + const Config* sessionProperties, + const RowTypePtr& fileSchema, + const std::shared_ptr& hiveSplit, + const std::unordered_map& tableParameters) { 
readerOptions.setMaxCoalesceBytes(hiveConfig->maxCoalescedBytes()); readerOptions.setMaxCoalesceDistance(hiveConfig->maxCoalescedDistanceBytes()); readerOptions.setFileColumnNamesReadAsLowerCase( hiveConfig->isFileColumnNamesReadAsLowerCase(sessionProperties)); readerOptions.setUseColumnNamesForColumnMapping( hiveConfig->isOrcUseColumnNames(sessionProperties)); - readerOptions.setFileSchema(hiveTableHandle->dataColumns()); + readerOptions.setFileSchema(fileSchema); readerOptions.setFooterEstimatedSize(hiveConfig->footerEstimatedSize()); readerOptions.setFilePreloadThreshold(hiveConfig->filePreloadThreshold()); @@ -447,8 +463,8 @@ void configureReaderOptions( dwio::common::toString(readerOptions.getFileFormat()), dwio::common::toString(hiveSplit->fileFormat)); } else { - auto serDeOptions = parseSerdeParameters( - hiveSplit->serdeParameters, hiveTableHandle->tableParameters()); + auto serDeOptions = + parseSerdeParameters(hiveSplit->serdeParameters, tableParameters); if (serDeOptions) { readerOptions.setSerDeOptions(*serDeOptions); } diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 67426bef78ca..329295b133d4 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -61,6 +61,14 @@ void configureReaderOptions( const std::shared_ptr& hiveTableHandle, const std::shared_ptr& hiveSplit); +void configureReaderOptions( + dwio::common::ReaderOptions& readerOptions, + const std::shared_ptr& hiveConfig, + const Config* sessionProperties, + const RowTypePtr& fileSchema, + const std::shared_ptr& hiveSplit, + const std::unordered_map& tableParameters = {}); + void configureRowReaderOptions( dwio::common::RowReaderOptions& rowReaderOptions, const std::unordered_map& tableParameters, diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 92376e566d38..6395d5d5f8bb 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -21,6 +21,8 @@ #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/HiveConnectorUtil.h" #include "velox/connectors/hive/TableHandle.h" +#include "velox/connectors/hive/iceberg/IcebergSplitReader.h" +#include "velox/dwio/common/CachedBufferedInput.h" #include "velox/dwio/common/ReaderFactory.h" namespace facebook::velox::connector::hive { @@ -38,17 +40,33 @@ std::unique_ptr SplitReader::create( const ConnectorQueryCtx* connectorQueryCtx, const std::shared_ptr& hiveConfig, const std::shared_ptr& ioStats) { - return std::make_unique( - hiveSplit, - hiveTableHandle, - scanSpec, - readerOutputType, - partitionKeys, - fileHandleFactory, - executor, - connectorQueryCtx, - hiveConfig, - ioStats); + // Create the SplitReader based on hiveSplit->customSplitInfo["table_format"] + if (hiveSplit->customSplitInfo.count("table_format") > 0 && + hiveSplit->customSplitInfo["table_format"] == "hive-iceberg") { + return std::make_unique( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + executor, + connectorQueryCtx, + hiveConfig, + ioStats); + } else { + return std::make_unique( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + executor, + connectorQueryCtx, + hiveConfig, + ioStats); + } } SplitReader::SplitReader( diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt new file mode 100644 index 000000000000..726ca63e31f3 --- /dev/null 
+++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -0,0 +1,28 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_library( + velox_hive_iceberg_splitreader IcebergSplitReader.cpp IcebergSplit.cpp + PositionalDeleteFileReader.cpp) + +target_link_libraries( + velox_hive_iceberg_splitreader + Folly::folly + gflags::gflags + glog::glog + gtest + gtest_main + xsimd) + +add_subdirectory(tests) diff --git a/velox/connectors/hive/iceberg/IcebergDeleteFile.h b/velox/connectors/hive/iceberg/IcebergDeleteFile.h new file mode 100644 index 000000000000..2f9206dfc264 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergDeleteFile.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include "velox/dwio/common/Options.h" + +namespace facebook::velox::connector::hive::iceberg { + +enum class FileContent { + kData, + kPositionalDeletes, + kEqualityDeletes, +}; + +struct IcebergDeleteFile { + FileContent content; + const std::string filePath; + dwio::common::FileFormat fileFormat; + uint64_t recordCount; + uint64_t fileSizeInBytes; + // The field ids for the delete columns for equality delete files + std::vector equalityFieldIds; + // The lower bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. The deleted rows for a column with field id + // 1 is in range [10, 50], where 10 and 50 are the deleted row positions in + // the data file, then lowerBounds would contain entry <1, "10"> + std::unordered_map lowerBounds; + // The upper bounds of the in-file positions for the deleted rows, identified + // by each column's field id. E.g. 
The deleted rows for a column with field id + // 1 is in range [10, 50], then upperBounds will contain entry <1, "50"> + std::unordered_map upperBounds; + + IcebergDeleteFile( + FileContent _content, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _recordCount, + uint64_t _fileSizeInBytes, + std::vector _equalityFieldIds = {}, + std::unordered_map _lowerBounds = {}, + std::unordered_map _upperBounds = {}) + : content(_content), + filePath(_filePath), + fileFormat(_fileFormat), + recordCount(_recordCount), + fileSizeInBytes(_fileSizeInBytes), + equalityFieldIds(_equalityFieldIds), + lowerBounds(_lowerBounds), + upperBounds(_upperBounds) {} +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h new file mode 100644 index 000000000000..4cbf2a7862b3 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/type/Type.h" + +namespace facebook::velox::connector::hive::iceberg { + +struct IcebergMetadataColumn { + int id; + std::string name; + std::shared_ptr type; + std::string doc; + + IcebergMetadataColumn( + int _id, + const std::string& _name, + std::shared_ptr _type, + const std::string& _doc) + : id(_id), name(_name), type(_type), doc(_doc) {} + + static std::shared_ptr icebergDeleteFilePathColumn() { + return std::make_shared( + 2147483546, + "file_path", + VARCHAR(), + "Path of a file in which a deleted row is stored"); + } + + static std::shared_ptr icebergDeletePosColumn() { + return std::make_shared( + 2147483545, + "pos", + BIGINT(), + "Ordinal position of a deleted row in the data file"); + } +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.cpp b/velox/connectors/hive/iceberg/IcebergSplit.cpp new file mode 100644 index 000000000000..7fa9a52f2c69 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.cpp @@ -0,0 +1,69 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/connectors/hive/iceberg/IcebergSplit.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +namespace facebook::velox::connector::hive::iceberg { + +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber) { + // TODO: Deserialize _extraFileInfo to get deleteFiles; +} + +// For tests only +HiveIcebergSplit::HiveIcebergSplit( + const std::string& _connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start, + uint64_t _length, + const std::unordered_map>& + _partitionKeys, + std::optional _tableBucketNumber, + const std::unordered_map& _customSplitInfo, + const std::shared_ptr& _extraFileInfo, + std::vector _deletes) + : HiveConnectorSplit( + _connectorId, + _filePath, + _fileFormat, + _start, + _length, + _partitionKeys, + _tableBucketNumber, + _customSplitInfo, + _extraFileInfo), + deleteFiles(_deletes) {} +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplit.h b/velox/connectors/hive/iceberg/IcebergSplit.h new file mode 100644 index 000000000000..37b8c3c3eb36 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplit.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include + +#include "velox/connectors/hive/HiveConnectorSplit.h" + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit { + std::vector deleteFiles; + + HiveIcebergSplit( + const std::string& connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start = 0, + uint64_t _length = std::numeric_limits::max(), + const std::unordered_map>& + _partitionKeys = {}, + std::optional _tableBucketNumber = std::nullopt, + const std::unordered_map& _customSplitInfo = {}, + const std::shared_ptr& _extraFileInfo = {}); + + // For tests only + HiveIcebergSplit( + const std::string& connectorId, + const std::string& _filePath, + dwio::common::FileFormat _fileFormat, + uint64_t _start = 0, + uint64_t _length = std::numeric_limits::max(), + const std::unordered_map>& + _partitionKeys = {}, + std::optional _tableBucketNumber = std::nullopt, + const std::unordered_map& _customSplitInfo = {}, + const std::shared_ptr& _extraFileInfo = {}, + std::vector deletes = {}); +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp new file mode 100644 index 000000000000..fa65c41043e4 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -0,0 +1,113 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergSplitReader.h" + +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/dwio/common/BufferUtil.h" +#include "velox/dwio/common/Mutation.h" +#include "velox/dwio/common/Reader.h" + +using namespace facebook::velox::dwio::common; + +namespace facebook::velox::connector::hive::iceberg { + +IcebergSplitReader::IcebergSplitReader( + std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, + const RowTypePtr readerOutputType, + std::unordered_map>* + partitionKeys, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats) + : SplitReader( + hiveSplit, + hiveTableHandle, + scanSpec, + readerOutputType, + partitionKeys, + fileHandleFactory, + executor, + connectorQueryCtx, + hiveConfig, + ioStats) {} + +void IcebergSplitReader::prepareSplit( + std::shared_ptr metadataFilter, + dwio::common::RuntimeStatistics& runtimeStats) { + SplitReader::prepareSplit(metadataFilter, runtimeStats); + baseReadOffset_ = 0; + positionalDeleteFileReaders_.clear(); + splitOffset_ = baseRowReader_->nextRowNumber(); + + // TODO: Deserialize the std::vector deleteFiles. For now + // we assume it's already deserialized. 
+ std::shared_ptr icebergSplit = + std::dynamic_pointer_cast(hiveSplit_); + + const auto& deleteFiles = icebergSplit->deleteFiles; + for (const auto& deleteFile : deleteFiles) { + positionalDeleteFileReaders_.push_back( + std::make_unique( + deleteFile, + hiveSplit_->filePath, + fileHandleFactory_, + connectorQueryCtx_, + executor_, + hiveConfig_, + ioStats_, + runtimeStats, + splitOffset_, + hiveSplit_->connectorId)); + } +} + +uint64_t IcebergSplitReader::next(int64_t size, VectorPtr& output) { + Mutation mutation; + mutation.deletedRows = nullptr; + + if (!positionalDeleteFileReaders_.empty()) { + auto numBytes = bits::nbytes(size); + dwio::common::ensureCapacity( + deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool()); + std::memset((void*)deleteBitmap_->as(), 0L, numBytes); + + for (auto iter = positionalDeleteFileReaders_.begin(); + iter != positionalDeleteFileReaders_.end(); + iter++) { + (*iter)->readDeletePositions( + baseReadOffset_, size, deleteBitmap_->asMutable()); + if ((*iter)->endOfFile()) { + iter = positionalDeleteFileReaders_.erase(iter); + } + } + + deleteBitmap_->setSize(numBytes); + mutation.deletedRows = deleteBitmap_->as(); + } + + auto rowsScanned = baseRowReader_->next(size, output, &mutation); + baseReadOffset_ += rowsScanned; + + return rowsScanned; +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h new file mode 100644 index 000000000000..5c5552369735 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "velox/connectors/Connector.h" +#include "velox/connectors/hive/SplitReader.h" +#include "velox/connectors/hive/iceberg/PositionalDeleteFileReader.h" + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; + +class IcebergSplitReader : public SplitReader { + public: + IcebergSplitReader( + std::shared_ptr hiveSplit, + std::shared_ptr hiveTableHandle, + std::shared_ptr scanSpec, + const RowTypePtr readerOutputType, + std::unordered_map>* + partitionKeys, + FileHandleFactory* fileHandleFactory, + folly::Executor* executor, + const ConnectorQueryCtx* connectorQueryCtx, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats); + + ~IcebergSplitReader() override = default; + + void prepareSplit( + std::shared_ptr metadataFilter, + dwio::common::RuntimeStatistics& runtimeStats) override; + + uint64_t next(int64_t size, VectorPtr& output) override; + + private: + // The read offset to the beginning of the split in number of rows for the + // current batch for the base data file + uint64_t baseReadOffset_; + + // The file position for the first row in the split + uint64_t splitOffset_; + + std::list> + positionalDeleteFileReaders_; + BufferPtr deleteBitmap_; +}; +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp new file mode 100644 index 000000000000..b87007fb9804 --- /dev/null +++ b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp @@ -0,0 +1,243 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/connectors/hive/iceberg/PositionalDeleteFileReader.h" + +#include "velox/connectors/hive/HiveConnectorUtil.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" +#include "velox/dwio/common/ReaderFactory.h" + +namespace facebook::velox::connector::hive::iceberg { + +PositionalDeleteFileReader::PositionalDeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + const ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + uint64_t splitOffset, + const std::string& connectorId) + : deleteFile_(deleteFile), + baseFilePath_(baseFilePath), + fileHandleFactory_(fileHandleFactory), + executor_(executor), + connectorQueryCtx_(connectorQueryCtx), + hiveConfig_(hiveConfig), + ioStats_(ioStats), + pool_(connectorQueryCtx->memoryPool()), + filePathColumn_(IcebergMetadataColumn::icebergDeleteFilePathColumn()), + posColumn_(IcebergMetadataColumn::icebergDeletePosColumn()), + splitOffset_(splitOffset), + deleteSplit_(nullptr), + deleteRowReader_(nullptr), + deletePositionsOutput_(nullptr), + deletePositionsOffset_(0), + endOfFile_(false) { + VELOX_CHECK(deleteFile_.content == FileContent::kPositionalDeletes); + + if (deleteFile_.recordCount == 0) { + return; + } + + // TODO: check if the lowerbounds and upperbounds in deleteFile overlap with + // this batch. If not, no need to proceed. + + // Create the ScanSpec for this delete file + auto scanSpec = std::make_shared(""); + scanSpec->addField(posColumn_->name, 0); + auto* pathSpec = + scanSpec->getOrCreateChild(common::Subfield(filePathColumn_->name)); + pathSpec->setFilter(std::make_unique( + std::vector({baseFilePath_}), false)); + + // Create the file schema (in RowType) and split that will be used by readers + std::vector deleteColumnNames( + {filePathColumn_->name, posColumn_->name}); + std::vector> deleteColumnTypes( + {filePathColumn_->type, posColumn_->type}); + RowTypePtr deleteFileSchema = + ROW(std::move(deleteColumnNames), std::move(deleteColumnTypes)); + + deleteSplit_ = std::make_shared( + connectorId, + deleteFile_.filePath, + deleteFile_.fileFormat, + 0, + deleteFile_.fileSizeInBytes); + + // Create the Reader and RowReader + + dwio::common::ReaderOptions deleteReaderOpts(pool_); + configureReaderOptions( + deleteReaderOpts, + hiveConfig_, + connectorQueryCtx_->sessionProperties(), + deleteFileSchema, + deleteSplit_); + + auto deleteFileHandle = + fileHandleFactory_->generate(deleteFile_.filePath).second; + auto deleteFileInput = createBufferedInput( + *deleteFileHandle, + deleteReaderOpts, + connectorQueryCtx_, + ioStats_, + executor_); + + auto deleteReader = + dwio::common::getReaderFactory(deleteReaderOpts.getFileFormat()) + ->createReader(std::move(deleteFileInput), deleteReaderOpts); + + // Check if the whole delete file split can be skipped. This could happen when + // 1) the delete file doesn't contain the base file that is being read; 2) The + // delete file does not contain the positions in the current batch for the + // base file. 
+ if (!testFilters( + scanSpec.get(), + deleteReader.get(), + deleteSplit_->filePath, + deleteSplit_->partitionKeys, + {})) { + ++runtimeStats.skippedSplits; + runtimeStats.skippedSplitBytes += deleteSplit_->length; + deleteSplit_.reset(); + return; + } + + dwio::common::RowReaderOptions deleteRowReaderOpts; + configureRowReaderOptions( + deleteRowReaderOpts, + {}, + scanSpec, + nullptr, + deleteFileSchema, + deleteSplit_); + + deleteRowReader_.reset(); + deleteRowReader_ = deleteReader->createRowReader(deleteRowReaderOpts); +} + +void PositionalDeleteFileReader::readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + int8_t* deleteBitmap) { + // We are going to read to the row number up to the end of the batch. For the + // same base file, the deleted rows are in ascending order in the same delete + // file + int64_t rowNumberUpperBound = splitOffset_ + baseReadOffset + size; + + // Finish unused delete positions from last batch + if (deletePositionsOutput_ && + deletePositionsOffset_ < deletePositionsOutput_->size()) { + updateDeleteBitmap( + std::dynamic_pointer_cast(deletePositionsOutput_) + ->childAt(0), + baseReadOffset, + rowNumberUpperBound, + deleteBitmap); + + if (readFinishedForBatch(rowNumberUpperBound)) { + return; + } + } + + if (!deleteRowReader_ || !deleteSplit_) { + return; + } + + // Read the new delete positions for this batch into deletePositionsOutput_ + // and update the delete bitmap + + auto outputType = posColumn_->type; + + RowTypePtr outputRowType = ROW({posColumn_->name}, {posColumn_->type}); + if (!deletePositionsOutput_) { + deletePositionsOutput_ = BaseVector::create(outputRowType, 0, pool_); + } + + while (!readFinishedForBatch(rowNumberUpperBound)) { + auto rowsScanned = deleteRowReader_->next(size, deletePositionsOutput_); + if (rowsScanned > 0) { + VELOX_CHECK( + !deletePositionsOutput_->mayHaveNulls(), + "Iceberg delete file pos column cannot have nulls"); + + auto numDeletedRows = deletePositionsOutput_->size(); + if (numDeletedRows > 0) { + deletePositionsOutput_->loadedVector(); + deletePositionsOffset_ = 0; + + updateDeleteBitmap( + std::dynamic_pointer_cast(deletePositionsOutput_) + ->childAt(0), + baseReadOffset, + rowNumberUpperBound, + deleteBitmap); + } + } else { + // Reaching the end of the file + endOfFile_ = true; + deleteSplit_.reset(); + return; + } + } +} + +bool PositionalDeleteFileReader::endOfFile() { + return endOfFile_; +} + +void PositionalDeleteFileReader::updateDeleteBitmap( + VectorPtr deletePositionsVector, + uint64_t baseReadOffset, + int64_t rowNumberUpperBound, + int8_t* deleteBitmap) { + // Convert the positions in file into positions relative to the start of the + // split. 
+ const int64_t* deletePositions = + deletePositionsVector->as>()->rawValues(); + int64_t offset = baseReadOffset + splitOffset_; + while (deletePositionsOffset_ < deletePositionsVector->size() && + deletePositions[deletePositionsOffset_] < rowNumberUpperBound) { + bits::setBit( + deleteBitmap, deletePositions[deletePositionsOffset_] - offset); + deletePositionsOffset_++; + } +} + +bool PositionalDeleteFileReader::readFinishedForBatch( + int64_t rowNumberUpperBound) { + VELOX_CHECK_NOT_NULL(deletePositionsOutput_); + + auto deletePositionsVector = + std::dynamic_pointer_cast(deletePositionsOutput_)->childAt(0); + const int64_t* deletePositions = + deletePositionsVector->as>()->rawValues(); + + if (deletePositionsOutput_->size() != 0 && + deletePositionsOffset_ < deletePositionsVector->size() && + deletePositions[deletePositionsOffset_] >= rowNumberUpperBound) { + return true; + } + return false; +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h new file mode 100644 index 000000000000..f6a1ddebcdb0 --- /dev/null +++ b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include + +#include "velox/connectors/Connector.h" +#include "velox/connectors/hive/FileHandle.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/dwio/common/Reader.h" + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergDeleteFile; +class IcebergMetadataColumn; + +using SubfieldFilters = + std::unordered_map>; + +class PositionalDeleteFileReader { + public: + PositionalDeleteFileReader( + const IcebergDeleteFile& deleteFile, + const std::string& baseFilePath, + FileHandleFactory* fileHandleFactory, + const ConnectorQueryCtx* connectorQueryCtx, + folly::Executor* executor, + const std::shared_ptr hiveConfig, + std::shared_ptr ioStats, + dwio::common::RuntimeStatistics& runtimeStats, + uint64_t splitOffset, + const std::string& connectorId); + + void readDeletePositions( + uint64_t baseReadOffset, + uint64_t size, + int8_t* deleteBitmap); + + bool endOfFile(); + + private: + void updateDeleteBitmap( + VectorPtr deletePositionsVector, + uint64_t baseReadOffset, + int64_t rowNumberUpperBound, + int8_t* deleteBitmap); + + bool readFinishedForBatch(int64_t rowNumberUpperBound); + + const IcebergDeleteFile& deleteFile_; + const std::string& baseFilePath_; + FileHandleFactory* const fileHandleFactory_; + folly::Executor* const executor_; + const ConnectorQueryCtx* const connectorQueryCtx_; + const std::shared_ptr hiveConfig_; + std::shared_ptr ioStats_; + memory::MemoryPool* const pool_; + + std::shared_ptr filePathColumn_; + std::shared_ptr posColumn_; + uint64_t splitOffset_; + + std::shared_ptr deleteSplit_; + std::unique_ptr deleteRowReader_; + VectorPtr deletePositionsOutput_; + uint64_t deletePositionsOffset_; + bool endOfFile_; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt new file mode 100644 index 000000000000..63603c724ec2 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -0,0 +1,34 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +if(NOT VELOX_DISABLE_GOOGLETEST) + + add_executable(velox_hive_iceberg_test IcebergReadTest.cpp) + add_test(velox_hive_iceberg_test velox_hive_iceberg_test) + + target_link_libraries( + velox_hive_iceberg_test + velox_hive_connector + velox_hive_iceberg_splitreader + velox_hive_partition_function + velox_dwio_common_exception + velox_dwio_common_test_utils + velox_dwio_dwrf_proto + velox_vector_test_lib + velox_exec + velox_exec_test_lib + Folly::folly + gtest + gtest_main) + +endif() diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp new file mode 100644 index 000000000000..79443c73b3ce --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -0,0 +1,280 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/exec/PlanNodeStats.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +#include + +using namespace facebook::velox::exec::test; +using namespace facebook::velox::exec; +using namespace facebook::velox::dwio::common; +using namespace facebook::velox::test; + +namespace facebook::velox::connector::hive::iceberg { + +class HiveIcebergTest : public HiveConnectorTestBase { + public: + void assertPositionalDeletes( + const std::vector& deleteRows, + bool multipleBaseFiles = false) { + assertPositionalDeletes( + deleteRows, + "SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) + ")", + multipleBaseFiles); + } + void assertPositionalDeletes( + const std::vector& deleteRows, + std::string duckdbSql, + bool multipleBaseFiles = false) { + std::shared_ptr dataFilePath = writeDataFile(rowCount); + + std::mt19937 gen{0}; + int64_t numDeleteRowsBefore = + multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0; + int64_t numDeleteRowsAfter = + multipleBaseFiles ? 
folly::Random::rand32(0, 1000, gen) : 0; + std::shared_ptr deleteFilePath = writePositionDeleteFile( + dataFilePath->path, + deleteRows, + numDeleteRowsBefore, + numDeleteRowsAfter); + + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath->path, + fileFomat_, + deleteRows.size() + numDeleteRowsBefore + numDeleteRowsAfter, + testing::internal::GetFileSize( + std::fopen(deleteFilePath->path.c_str(), "r"))); + + auto icebergSplit = makeIcebergSplit(dataFilePath->path, {deleteFile}); + + auto plan = tableScanNode(); + auto task = OperatorTestBase::assertQuery(plan, {icebergSplit}, duckdbSql); + + auto planStats = toPlanStats(task->taskStats()); + auto scanNodeId = plan->id(); + auto it = planStats.find(scanNodeId); + ASSERT_TRUE(it != planStats.end()); + ASSERT_TRUE(it->second.peakMemoryBytes > 0); + } + + std::vector makeRandomDeleteRows(int32_t maxRowNumber) { + std::mt19937 gen{0}; + std::vector deleteRows; + for (int i = 0; i < maxRowNumber; i++) { + if (folly::Random::rand32(0, 10, gen) > 8) { + deleteRows.push_back(i); + } + } + return deleteRows; + } + + std::vector makeSequenceRows(int32_t maxRowNumber) { + std::vector deleteRows; + deleteRows.resize(maxRowNumber); + std::iota(deleteRows.begin(), deleteRows.end(), 0); + return deleteRows; + } + + const static int rowCount = 20000; + + private: + std::shared_ptr makeIcebergSplit( + const std::string& dataFilePath, + const std::vector& deleteFiles = {}) { + std::unordered_map> partitionKeys; + std::unordered_map customSplitInfo; + customSplitInfo["table_format"] = "hive-iceberg"; + + auto file = filesystems::getFileSystem(dataFilePath, nullptr) + ->openFileForRead(dataFilePath); + const int64_t fileSize = file->size(); + + return std::make_shared( + kHiveConnectorId, + dataFilePath, + fileFomat_, + 0, + fileSize, + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + deleteFiles); + } + + std::vector makeVectors(int32_t count, int32_t rowsPerVector) { + std::vector vectors; + + for (int i = 0; i < count; i++) { + auto data = makeSequenceRows(rowsPerVector); + VectorPtr c0 = vectorMaker_.flatVector(data); + vectors.push_back(makeRowVector({"c0"}, {c0})); + } + + return vectors; + } + + std::shared_ptr writeDataFile(uint64_t numRows) { + auto dataVectors = makeVectors(1, numRows); + + auto dataFilePath = TempFilePath::create(); + writeToFile(dataFilePath->path, dataVectors); + createDuckDbTable(dataVectors); + return dataFilePath; + } + + std::shared_ptr writePositionDeleteFile( + const std::string& dataFilePath, + const std::vector& deleteRows, + int64_t numRowsBefore = 0, + int64_t numRowsAfter = 0) { + // if containsMultipleDataFiles == true, we will write rows for other base + // files before and after the target base file + uint32_t numDeleteRows = numRowsBefore + deleteRows.size() + numRowsAfter; + + std::string dataFilePathBefore = dataFilePath + "_before"; + std::string dataFilePathAfter = dataFilePath + "_after"; + + auto filePathVector = + vectorMaker_.flatVector(numDeleteRows, [&](auto row) { + if (row < numRowsBefore) { + return StringView(dataFilePathBefore); + } else if ( + row >= numRowsBefore && row < deleteRows.size() + numRowsBefore) { + return StringView(dataFilePath); + } else if ( + row >= deleteRows.size() + numRowsBefore && row < numDeleteRows) { + return StringView(dataFilePathAfter); + } else { + return StringView(); + } + }); + + std::vector deleteRowsVec; + deleteRowsVec.reserve(numDeleteRows); + + if (numRowsBefore > 0) { + auto rowsBefore = 
makeSequenceRows(numRowsBefore); + deleteRowsVec.insert( + deleteRowsVec.end(), rowsBefore.begin(), rowsBefore.end()); + } + deleteRowsVec.insert( + deleteRowsVec.end(), deleteRows.begin(), deleteRows.end()); + if (numRowsAfter > 0) { + auto rowsAfter = makeSequenceRows(numRowsAfter); + deleteRowsVec.insert( + deleteRowsVec.end(), rowsAfter.begin(), rowsAfter.end()); + } + + auto deletePositionsVector = + vectorMaker_.flatVector(deleteRowsVec); + RowVectorPtr deleteFileVectors = makeRowVector( + {pathColumn_->name, posColumn_->name}, + {filePathVector, deletePositionsVector}); + + auto deleteFilePath = TempFilePath::create(); + writeToFile(deleteFilePath->path, deleteFileVectors); + + return deleteFilePath; + } + + std::string makeNotInList(const std::vector& deleteRows) { + if (deleteRows.empty()) { + return ""; + } + + return std::accumulate( + deleteRows.begin() + 1, + deleteRows.end(), + std::to_string(deleteRows[0]), + [](const std::string& a, int64_t b) { + return a + ", " + std::to_string(b); + }); + } + + std::shared_ptr assertQuery( + const core::PlanNodePtr& plan, + std::shared_ptr dataFilePath, + const std::vector& deleteFiles, + const std::string& duckDbSql) { + auto icebergSplit = makeIcebergSplit(dataFilePath->path, deleteFiles); + return OperatorTestBase::assertQuery(plan, {icebergSplit}, duckDbSql); + } + + core::PlanNodePtr tableScanNode() { + return PlanBuilder(pool_.get()).tableScan(rowType_).planNode(); + } + + private: + dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF}; + RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})}; + std::shared_ptr pathColumn_ = + IcebergMetadataColumn::icebergDeleteFilePathColumn(); + std::shared_ptr posColumn_ = + IcebergMetadataColumn::icebergDeletePosColumn(); +}; + +TEST_F(HiveIcebergTest, positionalDeletesSingleBaseFile) { + folly::SingletonVault::singleton()->registrationComplete(); + + // Delete row 0, 1, 2, 3 from the first batch out of two. + assertPositionalDeletes({0, 1, 2, 3}); + // Delete the first and last row in each batch (10000 rows per batch) + assertPositionalDeletes({0, 9999, 10000, 19999}); + // Delete several rows in the second batch (10000 rows per batch) + assertPositionalDeletes({10000, 10002, 19999}); + // Delete random rows + assertPositionalDeletes(makeRandomDeleteRows(rowCount)); + // Delete 0 rows + assertPositionalDeletes({}, "SELECT * FROM tmp", false); + // Delete all rows + assertPositionalDeletes( + makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", false); + // Delete rows that don't exist + assertPositionalDeletes({20000, 29999}); +} + +// The positional delete file contains rows from multiple base files +TEST_F(HiveIcebergTest, positionalDeletesMultipleBaseFiles) { + folly::SingletonVault::singleton()->registrationComplete(); + + // Delete row 0, 1, 2, 3 from the first batch out of two. 
+ assertPositionalDeletes({0, 1, 2, 3}, true); + // Delete the first and last row in each batch (10000 rows per batch) + assertPositionalDeletes({0, 9999, 10000, 19999}, true); + // Delete several rows in the second batch (10000 rows per batch) + assertPositionalDeletes({10000, 10002, 19999}, true); + // Delete random rows + assertPositionalDeletes(makeRandomDeleteRows(rowCount), true); + // Delete 0 rows + assertPositionalDeletes({}, "SELECT * FROM tmp", true); + // Delete all rows + assertPositionalDeletes( + makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", true); + // Delete rows that don't exist + assertPositionalDeletes({20000, 29999}, true); +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/docs/functions/spark/datetime.rst b/velox/docs/functions/spark/datetime.rst index 65d7ed1057d8..35f17687ee09 100644 --- a/velox/docs/functions/spark/datetime.rst +++ b/velox/docs/functions/spark/datetime.rst @@ -71,17 +71,12 @@ These functions support TIMESTAMP and DATE input types. SELECT dayofyear('2016-04-09'); -- 100 -.. spark:function:: dayofweek(date/timestamp) -> integer +.. spark:function:: dayofweek(date) -> integer - Returns the day of the week for date/timestamp (1 = Sunday, 2 = Monday, ..., 7 = Saturday). - We can use `dow` as alias for :: + Returns the day of the week for date (1 = Sunday, 2 = Monday, ..., 7 = Saturday). SELECT dayofweek('2009-07-30'); -- 5 - SELECT dayofweek('2023-08-22 11:23:00.100'); -- 3 - -.. spark:function:: dow(x) -> integer - - This is an alias for :spark:func:`dayofweek`. + SELECT dayofweek('2023-08-22'); -- 3 .. spark:function:: from_unixtime(unixTime, format) -> string diff --git a/velox/dwio/common/CMakeLists.txt b/velox/dwio/common/CMakeLists.txt index 8334de75e0f5..25e8bb56a104 100644 --- a/velox/dwio/common/CMakeLists.txt +++ b/velox/dwio/common/CMakeLists.txt @@ -74,7 +74,6 @@ target_link_libraries( velox_exception velox_expression velox_memory - velox_exec Boost::regex Folly::folly glog::glog) diff --git a/velox/dwio/dwrf/reader/FlatMapColumnReader.cpp b/velox/dwio/dwrf/reader/FlatMapColumnReader.cpp index 7ba27932222b..2e52f05081b8 100644 --- a/velox/dwio/dwrf/reader/FlatMapColumnReader.cpp +++ b/velox/dwio/dwrf/reader/FlatMapColumnReader.cpp @@ -686,7 +686,7 @@ void FlatMapStructEncodingColumnReader::next( if (rowVector) { // Track children vectors in a local variable because readNulls may reset // the parent vector. 
- result->resize(numValues, false); + rowVector->unsafeResize(numValues, false); children = rowVector->children(); DWIO_ENSURE_EQ(children.size(), keyNodes_.size()); } diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index d09b3bc041f4..44cd1e82e273 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -580,8 +580,7 @@ void HashTable::arrayGroupProbe(HashLookup& lookup) { if (UNLIKELY(!group)) { group = insertEntry(lookup, index, row); } - groups[row] = group; - lookup.hits[row] = group; // NOLINT + groups[row] = group; // NOLINT } } diff --git a/velox/exec/MergeSource.cpp b/velox/exec/MergeSource.cpp index 36904506f8bd..0ba8b6463d69 100644 --- a/velox/exec/MergeSource.cpp +++ b/velox/exec/MergeSource.cpp @@ -125,7 +125,7 @@ class MergeExchangeSource : public MergeSource { folly::Executor* executor) : mergeExchange_(mergeExchange), client_(std::make_shared( - taskId, + mergeExchange->taskId(), destination, maxQueuedBytes, pool, @@ -167,6 +167,7 @@ class MergeExchangeSource : public MergeSource { auto lockedStats = mergeExchange_->stats().wlock(); lockedStats->addInputVector(data->estimateFlatSize(), data->size()); + lockedStats->rawInputPositions += data->size(); } // Since VectorStreamGroup::read() may cause inputStream to be at end, diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 189f209dd864..030e06fbc764 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -519,6 +519,10 @@ class Operator : public BaseRuntimeStatWriter { return operatorCtx_->operatorType(); } + const std::string& taskId() const { + return operatorCtx_->taskId(); + } + /// Registers 'translator' for mapping user defined PlanNode subclass /// instances to user-defined Operators. static void registerOperator(std::unique_ptr translator); diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index fd5addd7af67..364ba829fb4e 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -2154,14 +2154,13 @@ folly::dynamic Task::toJson() const { obj["plan"] = planFragment_.planNode->toString(true, true); } - folly::dynamic driverObj = folly::dynamic::array; - int index = 0; - for (auto& driver : drivers_) { - if (driver) { - driverObj[index++] = driver->toJson(); + folly::dynamic drivers = folly::dynamic::object; + for (auto i = 0; i < drivers_.size(); ++i) { + if (drivers_[i] != nullptr) { + drivers[i] = drivers_[i]->toJson(); } } - obj["drivers"] = driverObj; + obj["drivers"] = drivers; if (auto buffers = bufferManager_.lock()) { if (auto buffer = buffers->getBufferIfExists(taskId_)) { diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index 52f539138dea..32957ad8154e 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -404,8 +404,10 @@ TEST_F(MultiFragmentTest, mergeExchange) { } auto finalSortTaskId = makeTaskId("orderby", tasks.size()); + core::PlanNodeId mergeExchangeId; auto finalSortPlan = PlanBuilder() .mergeExchange(outputType, {"c0"}) + .capturePlanNodeId(mergeExchangeId) .partitionedOutput({}, 1) .planNode(); @@ -421,6 +423,15 @@ TEST_F(MultiFragmentTest, mergeExchange) { for (auto& task : tasks) { ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); } + + const auto finalSortStats = toPlanStats(task->taskStats()); + const auto& mergeExchangeStats = finalSortStats.at(mergeExchangeId); + + EXPECT_EQ(20'000, mergeExchangeStats.inputRows); + EXPECT_EQ(20'000, mergeExchangeStats.rawInputRows); + + EXPECT_LT(0, mergeExchangeStats.inputBytes); + 
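The Task::toJson change above replaces the drivers array with a folly::dynamic object keyed by driver index, so drivers that have already finished (null entries in drivers_) no longer compact the numbering and an entry can still be mapped back to its slot. A standalone illustration of the resulting shape, not Velox code, with keys stringified here so the JSON serializer accepts them:

#include <folly/dynamic.h>
#include <folly/json.h>

#include <iostream>
#include <string>
#include <vector>

int main() {
  // Pretend driver 1 has already been destroyed.
  std::vector<const char*> drivers = {"driver-0", nullptr, "driver-2"};

  folly::dynamic obj = folly::dynamic::object;
  for (size_t i = 0; i < drivers.size(); ++i) {
    if (drivers[i] != nullptr) {
      obj[std::to_string(i)] = drivers[i]; // Gaps simply have no key.
    }
  }
  // Prints {"0":"driver-0","2":"driver-2"}: index 2 still refers to driver 2.
  std::cout << folly::toJson(obj) << std::endl;
  return 0;
}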
EXPECT_LT(0, mergeExchangeStats.rawInputBytes); } // Test reordering and dropping columns in PartitionedOutput operator. diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index c64252719084..37c9e5618395 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -502,14 +502,7 @@ class TaskTest : public HiveConnectorTestBase { } }; -TEST_F(TaskTest, wrongPlanNodeForSplit) { - auto connectorSplit = std::make_shared( - "test", - "file:/tmp/abc", - facebook::velox::dwio::common::FileFormat::DWRF, - 0, - 100); - +TEST_F(TaskTest, toJson) { auto plan = PlanBuilder() .tableScan(ROW({"a", "b"}, {INTEGER(), DOUBLE()})) .project({"a * a", "b + b"}) @@ -525,11 +518,42 @@ TEST_F(TaskTest, wrongPlanNodeForSplit) { task->toString(), "{Task task-1 (task-1)Plan: -- Project\n\n drivers:\n"); ASSERT_EQ( folly::toPrettyJson(task->toJson()), - "{\n \"concurrentSplitGroups\": 1,\n \"drivers\": [],\n \"exchangeClientByPlanNode\": {},\n \"groupedPartitionedOutput\": false,\n \"id\": \"task-1\",\n \"noMoreOutputBuffers\": false,\n \"numDriversPerSplitGroup\": 0,\n \"numDriversUngrouped\": 0,\n \"numFinishedDrivers\": 0,\n \"numRunningDrivers\": 0,\n \"numRunningSplitGroups\": 0,\n \"numThreads\": 0,\n \"numTotalDrivers\": 0,\n \"onThreadSince\": \"0\",\n \"partitionedOutputConsumed\": false,\n \"pauseRequested\": false,\n \"plan\": \"-- Project[expressions: (p0:INTEGER, multiply(ROW[\\\"a\\\"],ROW[\\\"a\\\"])), (p1:DOUBLE, plus(ROW[\\\"b\\\"],ROW[\\\"b\\\"]))] -> p0:INTEGER, p1:DOUBLE\\n -- TableScan[table: hive_table] -> a:INTEGER, b:DOUBLE\\n\",\n \"shortId\": \"task-1\",\n \"state\": \"Running\",\n \"terminateRequested\": false\n}"); + "{\n \"concurrentSplitGroups\": 1,\n \"drivers\": {},\n \"exchangeClientByPlanNode\": {},\n \"groupedPartitionedOutput\": false,\n \"id\": \"task-1\",\n \"noMoreOutputBuffers\": false,\n \"numDriversPerSplitGroup\": 0,\n \"numDriversUngrouped\": 0,\n \"numFinishedDrivers\": 0,\n \"numRunningDrivers\": 0,\n \"numRunningSplitGroups\": 0,\n \"numThreads\": 0,\n \"numTotalDrivers\": 0,\n \"onThreadSince\": \"0\",\n \"partitionedOutputConsumed\": false,\n \"pauseRequested\": false,\n \"plan\": \"-- Project[expressions: (p0:INTEGER, multiply(ROW[\\\"a\\\"],ROW[\\\"a\\\"])), (p1:DOUBLE, plus(ROW[\\\"b\\\"],ROW[\\\"b\\\"]))] -> p0:INTEGER, p1:DOUBLE\\n -- TableScan[table: hive_table] -> a:INTEGER, b:DOUBLE\\n\",\n \"shortId\": \"task-1\",\n \"state\": \"Running\",\n \"terminateRequested\": false\n}"); ASSERT_EQ( folly::toPrettyJson(task->toShortJson()), "{\n \"id\": \"task-1\",\n \"numFinishedDrivers\": 0,\n \"numRunningDrivers\": 0,\n \"numThreads\": 0,\n \"numTotalDrivers\": 0,\n \"pauseRequested\": false,\n \"shortId\": \"task-1\",\n \"state\": \"Running\",\n \"terminateRequested\": false\n}"); + task->start(2); + + ASSERT_NO_THROW(task->toJson()); + ASSERT_NO_THROW(task->toShortJson()); + + task->noMoreSplits("0"); + waitForTaskCompletion(task.get()); + + ASSERT_NO_THROW(task->toJson()); + ASSERT_NO_THROW(task->toShortJson()); +} + +TEST_F(TaskTest, wrongPlanNodeForSplit) { + auto connectorSplit = std::make_shared( + "test", + "file:/tmp/abc", + facebook::velox::dwio::common::FileFormat::DWRF, + 0, + 100); + + auto plan = PlanBuilder() + .tableScan(ROW({"a", "b"}, {INTEGER(), DOUBLE()})) + .project({"a * a", "b + b"}) + .planFragment(); + + auto task = Task::create( + "task-1", + std::move(plan), + 0, + std::make_shared(driverExecutor_.get())); + // Add split for the source node. 
task->addSplit("0", exec::Split(folly::copy(connectorSplit))); diff --git a/velox/functions/lib/string/StringCore.h b/velox/functions/lib/string/StringCore.h index ba988b06c4af..8fdcc4c61892 100644 --- a/velox/functions/lib/string/StringCore.h +++ b/velox/functions/lib/string/StringCore.h @@ -264,6 +264,33 @@ cappedLengthUnicode(const char* input, size_t size, size_t maxChars) { return numChars; } +/// +/// Return an capped length in bytes(controlled by maxChars) of a unicode +/// string. The returned length may be greater than maxCharacters if there are +/// multi-byte characters present in the input string. +/// +/// This method is used to help with indexing unicode strings by byte position. +/// It is used to find the byte position of the Nth character in a string. +/// +/// @param input input buffer that hold the string +/// @param size size of input buffer +/// @param maxChars stop counting characters if the string is longer +/// than this value +/// @return the number of bytes represented by the input utf8 string up to +/// maxChars +/// +FOLLY_ALWAYS_INLINE int64_t +cappedByteLengthUnicode(const char* input, size_t size, int64_t maxChars) { + size_t utf8Position = 0; + size_t numCharacters = 0; + while (utf8Position < size && numCharacters < maxChars) { + auto charSize = utf8proc_char_length(input + utf8Position); + utf8Position += UNLIKELY(charSize < 0) ? 1 : charSize; + numCharacters++; + } + return utf8Position; +} + /// Returns the start byte index of the Nth instance of subString in /// string. Search starts from startPosition. Positions start with 0. If not /// found, -1 is returned. To facilitate finding overlapping strings, the diff --git a/velox/functions/lib/string/StringImpl.h b/velox/functions/lib/string/StringImpl.h index 871f3bffd194..73b6a4366162 100644 --- a/velox/functions/lib/string/StringImpl.h +++ b/velox/functions/lib/string/StringImpl.h @@ -111,7 +111,7 @@ FOLLY_ALWAYS_INLINE int64_t length(const T& input) { } } -/// Return a capped length(controlled by maxLength) of a string. +/// Return a capped length in characters(controlled by maxLength) of a string. /// The returned length is not greater than maxLength. template FOLLY_ALWAYS_INLINE int64_t cappedLength(const T& input, size_t maxLength) { @@ -122,6 +122,19 @@ FOLLY_ALWAYS_INLINE int64_t cappedLength(const T& input, size_t maxLength) { } } +/// Return a capped length in bytes(controlled by maxCharacters) of a string. +/// The returned length may be greater than maxCharacters if there are +/// multi-byte characters present in the input string. +template +FOLLY_ALWAYS_INLINE int64_t +cappedByteLength(const TString& input, size_t maxCharacters) { + if constexpr (isAscii) { + return input.size() > maxCharacters ? maxCharacters : input.size(); + } else { + return cappedByteLengthUnicode(input.data(), input.size(), maxCharacters); + } +} + /// Write the Unicode codePoint as string to the output string. The function /// behavior is undefined when code point it invalid. Implements the logic of /// presto chr function. diff --git a/velox/functions/lib/string/tests/StringImplTest.cpp b/velox/functions/lib/string/tests/StringImplTest.cpp index 258eb6f37053..883949e33c3a 100644 --- a/velox/functions/lib/string/tests/StringImplTest.cpp +++ b/velox/functions/lib/string/tests/StringImplTest.cpp @@ -196,6 +196,100 @@ TEST_F(StringImplTest, cappedLength) { ASSERT_EQ(cappedLength(input, 7), 5); } +TEST_F(StringImplTest, cappedUnicodeBytes) { + // Test functions use case for indexing + // UTF strings. 
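cappedByteLengthUnicode above walks the input one UTF-8 character at a time, treating an invalid lead byte as a one-byte character, and returns the byte offset just past the capped character count. A self-contained sketch of the same walk, with a simplified width function standing in for utf8proc_char_length (not the real utf8proc API):

#include <cassert>
#include <cstddef>
#include <string_view>

// Simplified stand-in for utf8proc_char_length(): derive the byte width of a
// UTF-8 character from its lead byte, returning -1 for invalid lead bytes.
int charWidth(unsigned char lead) {
  if (lead < 0x80) return 1;
  if ((lead & 0xE0) == 0xC0) return 2;
  if ((lead & 0xF0) == 0xE0) return 3;
  if ((lead & 0xF8) == 0xF0) return 4;
  return -1;
}

// Same walk as cappedByteLengthUnicode() above: advance character by
// character, accumulating bytes, and stop after maxChars characters.
size_t cappedBytes(std::string_view input, size_t maxChars) {
  size_t pos = 0;
  size_t chars = 0;
  while (pos < input.size() && chars < maxChars) {
    int width = charWidth(static_cast<unsigned char>(input[pos]));
    pos += width < 0 ? 1 : width; // Invalid lead bytes advance by one byte.
    ++chars;
  }
  return pos;
}

int main() {
  // "你" and "好" are 3 bytes each, so the first two characters span 6 bytes.
  assert(cappedBytes("你好a世界", 2) == 6);
  // The cap may overshoot the character count in bytes, never in characters.
  assert(cappedBytes("abcd", 6) == 4);
  return 0;
}

The StringImplTest cases that follow exercise exactly this contract.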
+ std::string stringInput = "\xF4\x90\x80\x80Hello"; + ASSERT_EQ('H', stringInput[cappedByteLength(stringInput, 2) - 1]); + ASSERT_EQ('e', stringInput[cappedByteLength(stringInput, 3) - 1]); + ASSERT_EQ('l', stringInput[cappedByteLength(stringInput, 4) - 1]); + ASSERT_EQ('l', stringInput[cappedByteLength(stringInput, 5) - 1]); + ASSERT_EQ('o', stringInput[cappedByteLength(stringInput, 6) - 1]); + ASSERT_EQ('o', stringInput[cappedByteLength(stringInput, 7) - 1]); + + // Multi-byte chars + stringInput = "♫¡Singing is fun!♫"; + auto sPos = cappedByteLength(stringInput, 2); + auto exPos = cappedByteLength(stringInput, 17); + ASSERT_EQ("Singing is fun!♫", stringInput.substr(sPos)); + ASSERT_EQ("♫¡Singing is fun!", stringInput.substr(0, exPos)); + ASSERT_EQ("Singing is fun!", stringInput.substr(sPos, exPos - sPos)); + + stringInput = std::string("abcd"); + auto stringViewInput = std::string_view(stringInput); + ASSERT_EQ(cappedByteLength(stringInput, 1), 1); + ASSERT_EQ(cappedByteLength(stringInput, 2), 2); + ASSERT_EQ(cappedByteLength(stringInput, 3), 3); + ASSERT_EQ(cappedByteLength(stringInput, 4), 4); + ASSERT_EQ(cappedByteLength(stringInput, 5), 4); + ASSERT_EQ(cappedByteLength(stringInput, 6), 4); + + ASSERT_EQ(cappedByteLength(stringViewInput, 1), 1); + ASSERT_EQ(cappedByteLength(stringViewInput, 2), 2); + ASSERT_EQ(cappedByteLength(stringViewInput, 3), 3); + ASSERT_EQ(cappedByteLength(stringViewInput, 4), 4); + ASSERT_EQ(cappedByteLength(stringViewInput, 5), 4); + ASSERT_EQ(cappedByteLength(stringViewInput, 6), 4); + + stringInput = std::string("你好a世界"); + stringViewInput = std::string_view(stringInput); + ASSERT_EQ(cappedByteLength(stringInput, 1), 3); + ASSERT_EQ(cappedByteLength(stringInput, 2), 6); + ASSERT_EQ(cappedByteLength(stringInput, 3), 7); + ASSERT_EQ(cappedByteLength(stringInput, 4), 10); + ASSERT_EQ(cappedByteLength(stringInput, 5), 13); + ASSERT_EQ(cappedByteLength(stringInput, 6), 13); + + ASSERT_EQ(cappedByteLength(stringViewInput, 1), 3); + ASSERT_EQ(cappedByteLength(stringViewInput, 2), 6); + ASSERT_EQ(cappedByteLength(stringViewInput, 3), 7); + ASSERT_EQ(cappedByteLength(stringViewInput, 4), 10); + ASSERT_EQ(cappedByteLength(stringViewInput, 5), 13); + ASSERT_EQ(cappedByteLength(stringViewInput, 6), 13); + + stringInput = std::string("\x80"); + stringViewInput = std::string_view(stringInput); + ASSERT_EQ(cappedByteLength(stringInput, 1), 1); + ASSERT_EQ(cappedByteLength(stringInput, 2), 1); + ASSERT_EQ(cappedByteLength(stringInput, 3), 1); + ASSERT_EQ(cappedByteLength(stringInput, 4), 1); + ASSERT_EQ(cappedByteLength(stringInput, 5), 1); + ASSERT_EQ(cappedByteLength(stringInput, 6), 1); + + ASSERT_EQ(cappedByteLength(stringViewInput, 1), 1); + ASSERT_EQ(cappedByteLength(stringViewInput, 2), 1); + ASSERT_EQ(cappedByteLength(stringViewInput, 3), 1); + ASSERT_EQ(cappedByteLength(stringViewInput, 4), 1); + ASSERT_EQ(cappedByteLength(stringViewInput, 5), 1); + ASSERT_EQ(cappedByteLength(stringViewInput, 6), 1); + + stringInput.resize(2); + // Create corrupt data below. 
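The "corrupt data" expectations below follow directly from that lead-byte rule: truncating u'\u04FF' to a char yields 0xFF, which is not a valid UTF-8 lead byte and is therefore stepped over one byte at a time, while u'\u04F4' truncates to 0xF4, which announces a four-byte sequence and so consumes the whole four-byte buffer as a single character. A tiny check of those truncated values:

#include <cstdio>

int main() {
  // 0xff: invalid lead byte, counted as a 1-byte character.
  // 0xf4: lead byte of a 4-byte sequence, so one character spans 4 bytes.
  std::printf(
      "%02x %02x\n",
      static_cast<unsigned char>(static_cast<char>(u'\u04FF')),
      static_cast<unsigned char>(static_cast<char>(u'\u04F4')));
  return 0;
}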
+ char16_t c = u'\u04FF'; + stringInput[0] = (char)c; + stringInput[1] = (char)c; + + ASSERT_EQ(cappedByteLength(stringInput, 1), 1); + + stringInput.resize(4); + c = u'\u04F4'; + char16_t c2 = u'\u048F'; + char16_t c3 = u'\u04BF'; + stringInput[0] = (char)c; + stringInput[1] = (char)c2; + stringInput[2] = (char)c3; + stringInput[3] = (char)c3; + + stringViewInput = std::string_view(stringInput); + ASSERT_EQ(cappedByteLength(stringInput, 1), 4); + ASSERT_EQ(cappedByteLength(stringInput, 2), 4); + ASSERT_EQ(cappedByteLength(stringInput, 3), 4); + + ASSERT_EQ(cappedByteLength(stringViewInput, 1), 4); + ASSERT_EQ(cappedByteLength(stringViewInput, 2), 4); + ASSERT_EQ(cappedByteLength(stringViewInput, 3), 4); +} + TEST_F(StringImplTest, badUnicodeLength) { ASSERT_EQ(0, length(std::string(""))); ASSERT_EQ(2, length(std::string("ab"))); diff --git a/velox/functions/prestosql/benchmarks/CMakeLists.txt b/velox/functions/prestosql/benchmarks/CMakeLists.txt index f7364c1d2287..e794460ef0d9 100644 --- a/velox/functions/prestosql/benchmarks/CMakeLists.txt +++ b/velox/functions/prestosql/benchmarks/CMakeLists.txt @@ -46,6 +46,11 @@ add_executable(velox_functions_prestosql_benchmarks_array_sum target_link_libraries(velox_functions_prestosql_benchmarks_array_sum ${BENCHMARK_DEPENDENCIES}) +add_executable(velox_functions_prestosql_benchmarks_field_reference + FieldReferenceBenchmark.cpp) +target_link_libraries(velox_functions_prestosql_benchmarks_field_reference + ${BENCHMARK_DEPENDENCIES}) + add_executable(velox_functions_prestosql_benchmarks_width_bucket WidthBucketBenchmark.cpp) target_link_libraries(velox_functions_prestosql_benchmarks_width_bucket diff --git a/velox/functions/prestosql/benchmarks/FieldReferenceBenchmark.cpp b/velox/functions/prestosql/benchmarks/FieldReferenceBenchmark.cpp new file mode 100644 index 000000000000..0abc546258ea --- /dev/null +++ b/velox/functions/prestosql/benchmarks/FieldReferenceBenchmark.cpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include + +#include "velox/benchmarks/ExpressionBenchmarkBuilder.h" +#include "velox/functions/lib/benchmarks/FunctionBenchmarkBase.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::functions; + +std::vector getColumnNames(int children) { + std::vector result; + for (int i = 0; i < children; ++i) { + result.push_back(fmt::format("{}{}", 'c', i)); + } + return result; +} + +RowTypePtr getRowColumnType(FuzzerGenerator& rng, int children, int level) { + VELOX_CHECK_GE(level, 1); + VELOX_CHECK_GE(children, 3); + std::vector result; + result.push_back(ARRAY(INTEGER())); + result.push_back(INTEGER()); + if (level > 1) { + result.push_back(getRowColumnType(rng, children, level - 1)); + } else { + result.push_back(randType(rng, 2)); + } + for (int i = 0; i < children - 3; ++i) { + result.push_back(randType(rng, 2)); + } + return ROW(getColumnNames(children), std::move(result)); +} + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + + ExpressionBenchmarkBuilder benchmarkBuilder; + FuzzerGenerator rng; + + auto createSet = [&](bool withNulls, RowTypePtr& inputType) { + benchmarkBuilder + .addBenchmarkSet( + fmt::format("dereference_{}", withNulls ? "nulls" : "nullfree"), + inputType) + .withFuzzerOptions( + {.vectorSize = 1000, .nullRatio = withNulls ? 0.2 : 0}) + .addExpression("1LevelThenFlat", "(c0).c1") + .addExpression("1LevelThenComplex", "(c0).c0") + .addExpression("2LevelThenFlat", "(c0).c2.c1") + .addExpression("2LevelThenComplex", "(c0).c2.c0") + .addExpression("3LevelThenFlat", "(c0).c2.c2.c1") + .addExpression("3LevelThenComplex", "(c0).c2.c2.c0") + .addExpression("4LevelThenFlat", "(c0).c2.c2.c2.c1") + .addExpression("4LevelThenComplex", "(c0).c2.c2.c2.c0"); + }; + + // Create a nested row column of depth 4. Each level has 50 columns. Each ROW + // at depth n will have the first three columns as ARRAY(INTEGER()), INTEGER() + // and ROW {of depth 4-n} respectively. The third column for the deepest ROW + // however can be anything. 
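To make the shape described in the comment above concrete: for a small case, children = 3 and level = 2, getRowColumnType produces a ROW whose first two children are ARRAY(INTEGER()) and INTEGER() and whose third child is the next level down. A hedged sketch follows; the randType() leaf is pinned to VARCHAR() purely for illustration, whereas the benchmark uses a random type of depth at most 2:

#include "velox/type/Type.h"

using namespace facebook::velox;

int main() {
  // Roughly what getRowColumnType(rng, /*children=*/3, /*level=*/2) yields,
  // with the random leaf replaced by VARCHAR() for illustration.
  auto nested = ROW(
      {"c0", "c1", "c2"},
      {ARRAY(INTEGER()),
       INTEGER(),
       ROW({"c0", "c1", "c2"}, {ARRAY(INTEGER()), INTEGER(), VARCHAR()})});

  // The benchmark wraps the nested ROW in a single top-level column, so an
  // expression such as "(c0).c2.c1" walks one ROW level and lands on a flat
  // INTEGER child, while "(c0).c2.c0" lands on a complex ARRAY child.
  auto inputType = ROW({"c0"}, {nested});
  return inputType->size() == 1 ? 0 : 1;
}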
+ auto inputType = ROW({"c0"}, {getRowColumnType(rng, 50, 4)}); + + createSet(true, inputType); + createSet(false, inputType); + + benchmarkBuilder.registerBenchmarks(); + + folly::runBenchmarks(); + return 0; +} diff --git a/velox/functions/sparksql/DateTimeFunctions.h b/velox/functions/sparksql/DateTimeFunctions.h index 4daba89f2bf6..b951e40d797a 100644 --- a/velox/functions/sparksql/DateTimeFunctions.h +++ b/velox/functions/sparksql/DateTimeFunctions.h @@ -396,7 +396,7 @@ struct DateSubFunction { }; template -struct DayOfWeekFunction : public InitSessionTimezone { +struct DayOfWeekFunction { VELOX_DEFINE_FUNCTION_TYPES(T); // 1 = Sunday, 2 = Monday, ..., 7 = Saturday @@ -404,12 +404,6 @@ struct DayOfWeekFunction : public InitSessionTimezone { return time.tm_wday + 1; } - FOLLY_ALWAYS_INLINE void call( - int32_t& result, - const arg_type& timestamp) { - result = getDayOfWeek(getDateTime(timestamp, this->timeZone_)); - } - FOLLY_ALWAYS_INLINE void call(int32_t& result, const arg_type& date) { result = getDayOfWeek(getDateTime(date)); } diff --git a/velox/functions/sparksql/Register.cpp b/velox/functions/sparksql/Register.cpp index db08f41b6eb2..bd748af7dadf 100644 --- a/velox/functions/sparksql/Register.cpp +++ b/velox/functions/sparksql/Register.cpp @@ -304,10 +304,7 @@ void registerFunctions(const std::string& prefix) { registerFunction( {prefix + "doy", prefix + "dayofyear"}); - registerFunction( - {prefix + "dow", prefix + "dayofweek"}); - registerFunction( - {prefix + "dow", prefix + "dayofweek"}); + registerFunction({prefix + "dayofweek"}); registerFunction({prefix + "weekday"}); diff --git a/velox/functions/sparksql/tests/DateTimeFunctionsTest.cpp b/velox/functions/sparksql/tests/DateTimeFunctionsTest.cpp index 2a95a14ef812..683ba3711f21 100644 --- a/velox/functions/sparksql/tests/DateTimeFunctionsTest.cpp +++ b/velox/functions/sparksql/tests/DateTimeFunctionsTest.cpp @@ -427,73 +427,27 @@ TEST_F(DateTimeFunctionsTest, dayOfMonth) { } TEST_F(DateTimeFunctionsTest, dayOfWeekDate) { - const auto dayOfWeek = [&](std::optional date, - const std::string& func) { - return evaluateOnce( - fmt::format("{}(c0)", func), {date}, {DATE()}); + const auto dayOfWeek = [&](std::optional date) { + return evaluateOnce("dayofweek(c0)", {date}, {DATE()}); }; - for (const auto& func : {"dayofweek", "dow"}) { - EXPECT_EQ(std::nullopt, dayOfWeek(std::nullopt, func)); - EXPECT_EQ(5, dayOfWeek(0, func)); - EXPECT_EQ(4, dayOfWeek(-1, func)); - EXPECT_EQ(7, dayOfWeek(-40, func)); - EXPECT_EQ(5, dayOfWeek(parseDate("2009-07-30"), func)); - EXPECT_EQ(1, dayOfWeek(parseDate("2023-08-20"), func)); - EXPECT_EQ(2, dayOfWeek(parseDate("2023-08-21"), func)); - EXPECT_EQ(3, dayOfWeek(parseDate("2023-08-22"), func)); - EXPECT_EQ(4, dayOfWeek(parseDate("2023-08-23"), func)); - EXPECT_EQ(5, dayOfWeek(parseDate("2023-08-24"), func)); - EXPECT_EQ(6, dayOfWeek(parseDate("2023-08-25"), func)); - EXPECT_EQ(7, dayOfWeek(parseDate("2023-08-26"), func)); - EXPECT_EQ(1, dayOfWeek(parseDate("2023-08-27"), func)); - - // test cases from spark's DateExpressionSuite. 
- EXPECT_EQ(6, dayOfWeek(util::fromDateString("2011-05-06"), func)); - } -} - -TEST_F(DateTimeFunctionsTest, dayofWeekTs) { - const auto dayOfWeek = [&](std::optional date, - const std::string& func) { - return evaluateOnce(fmt::format("{}(c0)", func), date); - }; - - for (const auto& func : {"dayofweek", "dow"}) { - EXPECT_EQ(5, dayOfWeek(Timestamp(0, 0), func)); - EXPECT_EQ(4, dayOfWeek(Timestamp(-1, 0), func)); - EXPECT_EQ( - 1, - dayOfWeek(util::fromTimestampString("2023-08-20 20:23:00.001"), func)); - EXPECT_EQ( - 2, - dayOfWeek(util::fromTimestampString("2023-08-21 21:23:00.030"), func)); - EXPECT_EQ( - 3, - dayOfWeek(util::fromTimestampString("2023-08-22 11:23:00.100"), func)); - EXPECT_EQ( - 4, - dayOfWeek(util::fromTimestampString("2023-08-23 22:23:00.030"), func)); - EXPECT_EQ( - 5, - dayOfWeek(util::fromTimestampString("2023-08-24 15:23:00.000"), func)); - EXPECT_EQ( - 6, - dayOfWeek(util::fromTimestampString("2023-08-25 03:23:04.000"), func)); - EXPECT_EQ( - 7, - dayOfWeek(util::fromTimestampString("2023-08-26 01:03:00.300"), func)); - EXPECT_EQ( - 1, - dayOfWeek(util::fromTimestampString("2023-08-27 01:13:00.000"), func)); - // test cases from spark's DateExpressionSuite. - EXPECT_EQ( - 4, dayOfWeek(util::fromTimestampString("2015-04-08 13:10:15"), func)); - EXPECT_EQ( - 7, dayOfWeek(util::fromTimestampString("2017-05-27 13:10:15"), func)); - EXPECT_EQ( - 6, dayOfWeek(util::fromTimestampString("1582-10-15 13:10:15"), func)); - } + EXPECT_EQ(std::nullopt, dayOfWeek(std::nullopt)); + EXPECT_EQ(5, dayOfWeek(0)); + EXPECT_EQ(4, dayOfWeek(-1)); + EXPECT_EQ(7, dayOfWeek(-40)); + EXPECT_EQ(5, dayOfWeek(parseDate("2009-07-30"))); + EXPECT_EQ(1, dayOfWeek(parseDate("2023-08-20"))); + EXPECT_EQ(2, dayOfWeek(parseDate("2023-08-21"))); + EXPECT_EQ(3, dayOfWeek(parseDate("2023-08-22"))); + EXPECT_EQ(4, dayOfWeek(parseDate("2023-08-23"))); + EXPECT_EQ(5, dayOfWeek(parseDate("2023-08-24"))); + EXPECT_EQ(6, dayOfWeek(parseDate("2023-08-25"))); + EXPECT_EQ(7, dayOfWeek(parseDate("2023-08-26"))); + EXPECT_EQ(1, dayOfWeek(parseDate("2023-08-27"))); + EXPECT_EQ(6, dayOfWeek(util::fromDateString("2011-05-06"))); + EXPECT_EQ(4, dayOfWeek(util::fromDateString("2015-04-08"))); + EXPECT_EQ(7, dayOfWeek(util::fromDateString("2017-05-27"))); + EXPECT_EQ(6, dayOfWeek(util::fromDateString("1582-10-15"))); } TEST_F(DateTimeFunctionsTest, weekdayDate) { diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index 149a70948fc5..bc9843174699 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -3320,7 +3320,6 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { public: PrestoIterativeVectorSerializer( const RowTypePtr& rowType, - std::vector encodings, int32_t numRows, StreamArena* streamArena, bool useLosslessTimestamp, @@ -3332,40 +3331,10 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { streams_.resize(numTypes); for (int i = 0; i < numTypes; ++i) { - std::optional encoding = std::nullopt; - if (i < encodings.size()) { - encoding = encodings[i]; - } streams_[i] = std::make_unique( types[i], - encoding, std::nullopt, - streamArena, - numRows, - useLosslessTimestamp); - } - } - - // Constructor that takes a row vector instead of only the types. This is - // different because then we know exactly how each vector is encoded - // (recursively). 
- PrestoIterativeVectorSerializer( - const RowVectorPtr& rowVector, - StreamArena* streamArena, - bool useLosslessTimestamp, - common::CompressionKind compressionKind) - : streamArena_(streamArena), - codec_(common::compressionKindToCodec(compressionKind)) { - auto numRows = rowVector->size(); - auto rowType = rowVector->type(); - auto numChildren = rowVector->childrenSize(); - streams_.resize(numChildren); - - for (int i = 0; i < numChildren; i++) { - streams_[i] = std::make_unique( - rowType->childAt(i), std::nullopt, - rowVector->childAt(i), streamArena, numRows, useLosslessTimestamp); @@ -3420,16 +3389,6 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { flushStreams(streams_, numRows_, *streamArena_, *codec_, out); } - void flushEncoded(const RowVectorPtr& vector, OutputStream* out) { - VELOX_CHECK_EQ(0, numRows_); - - std::vector ranges{{0, vector->size()}}; - Scratch scratch; - append(vector, folly::Range(ranges.data(), ranges.size()), scratch); - - flushStreams(streams_, vector->size(), *streamArena_, *codec_, out); - } - private: StreamArena* const streamArena_; const std::unique_ptr codec_; @@ -3464,7 +3423,6 @@ PrestoVectorSerde::createIterativeSerializer( const auto prestoOptions = toPrestoOptions(options); return std::make_unique( type, - prestoOptions.encodings, numRows, streamArena, prestoOptions.useLosslessTimestamp, @@ -3479,20 +3437,6 @@ std::unique_ptr PrestoVectorSerde::createBatchSerializer( pool, prestoOptions.useLosslessTimestamp, prestoOptions.compressionKind); } -void PrestoVectorSerde::deprecatedSerializeEncoded( - const RowVectorPtr& vector, - StreamArena* streamArena, - const Options* options, - OutputStream* out) { - auto prestoOptions = toPrestoOptions(options); - auto serializer = std::make_unique( - vector, - streamArena, - prestoOptions.useLosslessTimestamp, - prestoOptions.compressionKind); - serializer->flushEncoded(vector, out); -} - void PrestoVectorSerde::deserialize( ByteInputStream* source, velox::memory::MemoryPool* pool, diff --git a/velox/serializers/PrestoSerializer.h b/velox/serializers/PrestoSerializer.h index a957e27ce76e..287741afe0c8 100644 --- a/velox/serializers/PrestoSerializer.h +++ b/velox/serializers/PrestoSerializer.h @@ -58,9 +58,6 @@ class PrestoVectorSerde : public VectorSerde { bool useLosslessTimestamp{false}; common::CompressionKind compressionKind{ common::CompressionKind::CompressionKind_NONE}; - - /// Specifies the encoding for each of the top-level child vector. - std::vector encodings; }; /// Adds the serialized sizes of the rows of 'vector' in 'ranges[i]' to @@ -90,25 +87,6 @@ class PrestoVectorSerde : public VectorSerde { memory::MemoryPool* pool, const Options* options) override; - /// Serializes a single RowVector with possibly encoded children, preserving - /// their encodings. Encodings are preserved recursively for any RowVector - /// children, but not for children of other nested vectors such as Array, Map, - /// and Dictionary. - /// - /// PrestoPage does not support serialization of Dictionaries with nulls; - /// in case dictionaries contain null they are serialized as flat buffers. - /// - /// In order to override the encodings of top-level columns in the RowVector, - /// you can specifiy the encodings using PrestoOptions.encodings - /// - /// DEPRECATED: Use createBatchSerializer and the BatchVectorSerializer's - /// serialize function instead. 
- void deprecatedSerializeEncoded( - const RowVectorPtr& vector, - StreamArena* streamArena, - const Options* options, - OutputStream* out); - bool supportsAppendInDeserialize() const override { return true; } diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 6c75a67d6c3a..143abc7dc2cb 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -253,23 +253,6 @@ class PrestoSerializerTest return stats; } - void serializeEncoded( - const RowVectorPtr& rowVector, - std::ostream* output, - const serializer::presto::PrestoVectorSerde::PrestoOptions* - serdeOptions) { - facebook::velox::serializer::presto::PrestoOutputStreamListener listener; - OStreamOutputStream out(output, &listener); - StreamArena arena{pool_.get()}; - auto paramOptions = getParamSerdeOptions(serdeOptions); - - for (const auto& child : rowVector->children()) { - paramOptions.encodings.push_back(child->encoding()); - } - - serde_->deprecatedSerializeEncoded(rowVector, &arena, ¶mOptions, &out); - } - void assertEqualEncoding( const RowVectorPtr& expected, const RowVectorPtr& actual) { @@ -316,17 +299,6 @@ class PrestoSerializerTest assertEqualVectors(expected, result); } - void testEncodedRoundTrip( - const RowVectorPtr& data, - const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = - nullptr) { - std::ostringstream out; - serializeEncoded(data, &out, serdeOptions); - const auto serialized = out.str(); - - verifySerializedEncodedData(data, serialized, serdeOptions); - } - void serializeBatch( const RowVectorPtr& rowVector, std::ostream* output, @@ -744,34 +716,18 @@ TEST_P(PrestoSerializerTest, longDecimal) { testRoundTrip(vector); } -// Test that hierarchically encoded columns (rows) have their encodings -// preserved. -TEST_P(PrestoSerializerTest, encodings) { - testEncodedRoundTrip(encodingsTestVector()); -} - // Test that hierarchically encoded columns (rows) have their encodings // preserved by the PrestoBatchVectorSerializer. TEST_P(PrestoSerializerTest, encodingsBatchVectorSerializer) { testBatchVectorSerializerRoundTrip(encodingsTestVector()); } -// Test that array elements have their encodings preserved. -TEST_P(PrestoSerializerTest, encodingsArrayElements) { - testEncodedRoundTrip(encodingsArrayElementsTestVector()); -} - // Test that array elements have their encodings preserved by the // PrestoBatchVectorSerializer. TEST_P(PrestoSerializerTest, encodingsArrayElementsBatchVectorSerializer) { testBatchVectorSerializerRoundTrip(encodingsArrayElementsTestVector()); } -// Test that map values have their encodings preserved. -TEST_P(PrestoSerializerTest, encodingsMapValues) { - testEncodedRoundTrip(encodingsMapValuesTestVector()); -} - // Test that map values have their encodings preserved by the // PrestoBatchVectorSerializer. 
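The removed deprecatedSerializeEncoded path points callers at createBatchSerializer plus the BatchVectorSerializer's serialize function, which preserves child encodings on its own. A rough sketch of that call sequence, assuming the VectorSerde interface on this branch; pool, rowVector, and out are assumed to exist in the caller, and the exact serialize overload may differ:

// Assumed to be in scope: memory::MemoryPool* pool, RowVectorPtr rowVector,
// and an OutputStream 'out' to receive the serialized page.
serializer::presto::PrestoVectorSerde serde;
serializer::presto::PrestoVectorSerde::PrestoOptions options;

auto batchSerializer = serde.createBatchSerializer(pool, &options);
batchSerializer->serialize(rowVector, &out);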
 TEST_P(PrestoSerializerTest, encodingsMapValuesBatchVectorSerializer) {
diff --git a/velox/type/OpaqueCustomTypes.h b/velox/type/OpaqueCustomTypes.h
index fdb42d3d94a0..dacf0cfb87b7 100644
--- a/velox/type/OpaqueCustomTypes.h
+++ b/velox/type/OpaqueCustomTypes.h
@@ -33,8 +33,8 @@ class CastOperator;
 template
 class OpaqueCustomTypeRegister {
  public:
-  static void registerType() {
-    facebook::velox::registerCustomType(
+  static bool registerType() {
+    return facebook::velox::registerCustomType(
         customTypeName, std::make_unique());
   }
diff --git a/velox/vector/CMakeLists.txt b/velox/vector/CMakeLists.txt
index faa906605cd0..1b38cc1d248d 100644
--- a/velox/vector/CMakeLists.txt
+++ b/velox/vector/CMakeLists.txt
@@ -37,7 +37,7 @@ add_subdirectory(fuzzer)
 
 if(${VELOX_BUILD_TESTING})
   add_subdirectory(tests)
-elseif(${VELOX_BUILD_TEST_UTILS})
+elseif(${VELOX_BUILD_TEST_UTILS} OR ${VELOX_BUILD_VECTOR_TEST_UTILS})
   add_subdirectory(tests/utils)
 endif()
 
diff --git a/velox/vector/tests/utils/CMakeLists.txt b/velox/vector/tests/utils/CMakeLists.txt
index 8b7d335db7ab..5b77b40ba99b 100644
--- a/velox/vector/tests/utils/CMakeLists.txt
+++ b/velox/vector/tests/utils/CMakeLists.txt
@@ -13,5 +13,4 @@
 # limitations under the License.
 
 add_library(velox_vector_test_lib VectorMaker.cpp VectorTestBase.cpp)
-target_link_libraries(velox_vector_test_lib velox_exec velox_vector gtest
-                      gtest_main)
+target_link_libraries(velox_vector_test_lib velox_vector gtest gtest_main)
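OpaqueCustomTypeRegister::registerType now forwards the bool from registerCustomType instead of discarding it. One likely motivation, an assumption not stated in the diff, is the register-once-at-load idiom, where the return value gives callers something to initialize:

#include <iostream>

// Stand-in for OpaqueCustomTypeRegister<...>::registerType(); the real call
// now returns whether facebook::velox::registerCustomType actually registered
// a new type rather than finding the name already registered.
bool registerMyOpaqueType() {
  std::cout << "registering MyOpaqueType" << std::endl;
  return true;
}

namespace {
// One-time registration at namespace scope; the bool exists only to have
// something to assign the result to.
const bool kMyOpaqueTypeRegistered = registerMyOpaqueType();
} // namespace

int main() {
  return kMyOpaqueTypeRegistered ? 0 : 1;
}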