diff --git a/velox/docs/develop/testing/join-fuzzer.rst b/velox/docs/develop/testing/join-fuzzer.rst index be7d61a467bf..1bbfbfc7df41 100644 --- a/velox/docs/develop/testing/join-fuzzer.rst +++ b/velox/docs/develop/testing/join-fuzzer.rst @@ -42,7 +42,7 @@ Use velox_join_fuzzer_test binary to run join fuzzer: velox/exec/tests/velox_join_fuzzer_test -By default, the fuzzer will go through 10 interations. Use --steps +By default, the fuzzer will go through 10 iterations. Use --steps or --duration-sec flag to run fuzzer for longer. Use --seed to reproduce fuzzer failures. diff --git a/velox/docs/develop/testing/row-number-fuzzer.rst b/velox/docs/develop/testing/row-number-fuzzer.rst new file mode 100644 index 000000000000..6f304a50f72b --- /dev/null +++ b/velox/docs/develop/testing/row-number-fuzzer.rst @@ -0,0 +1,55 @@ +================ +RowNumber Fuzzer +================ + +The RowNumberFuzzer is a testing tool that automatically generate equivalent query plans and then executes these plans +to validate the consistency of the results. It works as follows: + +1. Data Generation: It starts by generating a random set of input data, also known as a vector. This data can + have a variety of encodings and data layouts to ensure thorough testing. +2. Plan Generation: Generate two equivalent query plans, one is row-number over ValuesNode as the base plan. + and the other is over TableScanNode as the alter plan. +3. Query Execution: Executes those equivalent query plans using the generated data and asserts that the results are + consistent across different plans. + i. Execute the base plan, compare the result with the reference (DuckDB or Presto) and use it as the expected result. + #. Execute the alter plan multiple times with and without spill, and compare each result with the + expected result. +4. Iteration: This process is repeated multiple times to ensure reliability and robustness. + +How to run +---------- + +Use velox_row_number_fuzzer_test binary to run rowNumber fuzzer: + +:: + + velox/exec/tests/velox_row_number_fuzzer_test --seed 123 --duration_sec 60 + +By default, the fuzzer will go through 10 iterations. Use --steps +or --duration-sec flag to run fuzzer for longer. Use --seed to +reproduce fuzzer failures. + +Here is a full list of supported command line arguments. + +* ``–-steps``: How many iterations to run. Each iteration generates and + evaluates one expression or aggregation. Default is 10. + +* ``–-duration_sec``: For how long to run in seconds. If both ``-–steps`` + and ``-–duration_sec`` are specified, –duration_sec takes precedence. + +* ``–-seed``: The seed to generate random expressions and input vectors with. + +* ``–-v=1``: Verbose logging (from `Google Logging Library `_). + +* ``–-batch_size``: The size of input vectors to generate. Default is 100. + +* ``--num_batches``: The number of input vectors of size `--batch_size` to + generate. Default is 5. + +* ``--enable_spill``: Whether to test with spilling or not. Default is true. + +* ``--presto_url`` The PrestoQueryRunner url along with its port number. + +* ``--req_timeout_ms`` Timeout in milliseconds of an HTTP request to the PrestoQueryRunner. + +If running from CLion IDE, add ``--logtostderr=1`` to see the full output. diff --git a/velox/exec/fuzzer/CMakeLists.txt b/velox/exec/fuzzer/CMakeLists.txt index 96ca34cb9cc4..a169100a6bdf 100644 --- a/velox/exec/fuzzer/CMakeLists.txt +++ b/velox/exec/fuzzer/CMakeLists.txt @@ -57,3 +57,9 @@ target_link_libraries( velox_expression_test_utility velox_aggregation_fuzzer_base velox_temp_path) + +add_library(velox_row_number_fuzzer RowNumberFuzzer.cpp) + +target_link_libraries( + velox_row_number_fuzzer velox_fuzzer_util velox_type velox_vector_fuzzer + velox_exec_test_lib velox_expression_test_utility) diff --git a/velox/exec/fuzzer/DuckQueryRunner.cpp b/velox/exec/fuzzer/DuckQueryRunner.cpp index d926addfd921..e19b1d33a7b0 100644 --- a/velox/exec/fuzzer/DuckQueryRunner.cpp +++ b/velox/exec/fuzzer/DuckQueryRunner.cpp @@ -133,21 +133,26 @@ std::optional DuckQueryRunner::toSql( } } - if (auto projectNode = + if (const auto projectNode = std::dynamic_pointer_cast(plan)) { return toSql(projectNode); } - if (auto windowNode = + if (const auto windowNode = std::dynamic_pointer_cast(plan)) { return toSql(windowNode); } - if (auto aggregationNode = + if (const auto aggregationNode = std::dynamic_pointer_cast(plan)) { return toSql(aggregationNode); } + if (const auto rowNumberNode = + std::dynamic_pointer_cast(plan)) { + return toSql(rowNumberNode); + } + VELOX_NYI(); } @@ -297,4 +302,31 @@ std::optional DuckQueryRunner::toSql( return sql.str(); } + +std::optional DuckQueryRunner::toSql( + const std::shared_ptr& rowNumberNode) { + std::stringstream sql; + sql << "SELECT "; + + const auto& inputType = rowNumberNode->sources()[0]->outputType(); + for (auto i = 0; i < inputType->size(); ++i) { + appendComma(i, sql); + sql << inputType->nameOf(i); + } + + sql << ", row_number() OVER ("; + + const auto& partitionKeys = rowNumberNode->partitionKeys(); + if (!partitionKeys.empty()) { + sql << "partition by "; + for (auto i = 0; i < partitionKeys.size(); ++i) { + appendComma(i, sql); + sql << partitionKeys[i]->name(); + } + } + + sql << ") as row_number FROM tmp"; + + return sql.str(); +} } // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/DuckQueryRunner.h b/velox/exec/fuzzer/DuckQueryRunner.h index a683652946a4..a5dc3f785716 100644 --- a/velox/exec/fuzzer/DuckQueryRunner.h +++ b/velox/exec/fuzzer/DuckQueryRunner.h @@ -49,6 +49,9 @@ class DuckQueryRunner : public ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& projectNode); + std::optional toSql( + const std::shared_ptr& rowNumberNode); + std::unordered_set aggregateFunctionNames_; }; diff --git a/velox/exec/fuzzer/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp index bc81a452ae5e..7831bf350a7a 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -159,21 +159,26 @@ PrestoQueryRunner::PrestoQueryRunner( std::optional PrestoQueryRunner::toSql( const core::PlanNodePtr& plan) { - if (auto projectNode = + if (const auto projectNode = std::dynamic_pointer_cast(plan)) { return toSql(projectNode); } - if (auto windowNode = + if (const auto windowNode = std::dynamic_pointer_cast(plan)) { return toSql(windowNode); } - if (auto aggregationNode = + if (const auto aggregationNode = std::dynamic_pointer_cast(plan)) { return toSql(aggregationNode); } + if (const auto rowNumberNode = + std::dynamic_pointer_cast(plan)) { + return toSql(rowNumberNode); + } + VELOX_NYI(); } @@ -500,6 +505,37 @@ std::optional PrestoQueryRunner::toSql( return sql.str(); } +std::optional PrestoQueryRunner::toSql( + const std::shared_ptr& rowNumberNode) { + if (!isSupportedDwrfType(rowNumberNode->sources()[0]->outputType())) { + return std::nullopt; + } + + std::stringstream sql; + sql << "SELECT "; + + const auto& inputType = rowNumberNode->sources()[0]->outputType(); + for (auto i = 0; i < inputType->size(); ++i) { + appendComma(i, sql); + sql << inputType->nameOf(i); + } + + sql << ", row_number() OVER ("; + + const auto& partitionKeys = rowNumberNode->partitionKeys(); + if (!partitionKeys.empty()) { + sql << "partition by "; + for (auto i = 0; i < partitionKeys.size(); ++i) { + appendComma(i, sql); + sql << partitionKeys[i]->name(); + } + } + + sql << ") as row_number FROM tmp"; + + return sql.str(); +} + std::multiset> PrestoQueryRunner::execute( const std::string& sql, const std::vector& input, diff --git a/velox/exec/fuzzer/PrestoQueryRunner.h b/velox/exec/fuzzer/PrestoQueryRunner.h index dfa8fabea93f..7490e91a03c2 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.h +++ b/velox/exec/fuzzer/PrestoQueryRunner.h @@ -86,6 +86,9 @@ class PrestoQueryRunner : public velox::exec::test::ReferenceQueryRunner { std::optional toSql( const std::shared_ptr& projectNode); + std::optional toSql( + const std::shared_ptr& rowNumberNode); + std::string startQuery(const std::string& sql); std::string fetchNext(const std::string& nextUri); diff --git a/velox/exec/fuzzer/RowNumberFuzzer.cpp b/velox/exec/fuzzer/RowNumberFuzzer.cpp new file mode 100644 index 000000000000..c7a482859c33 --- /dev/null +++ b/velox/exec/fuzzer/RowNumberFuzzer.cpp @@ -0,0 +1,550 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/fuzzer/RowNumberFuzzer.h" +#include +#include +#include "velox/common/file/FileSystems.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/dwio/dwrf/reader/DwrfReader.h" +#include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +DEFINE_int32(steps, 10, "Number of plans to generate and test."); + +DEFINE_int32( + duration_sec, + 0, + "For how long it should run (in seconds). If zero, " + "it executes exactly --steps iterations and exits."); + +DEFINE_int32( + batch_size, + 100, + "The number of elements on each generated vector."); + +DEFINE_int32(num_batches, 10, "The number of generated vectors."); + +DEFINE_double( + null_ratio, + 0.1, + "Chance of adding a null value in a vector " + "(expressed as double from 0 to 1)."); + +DEFINE_bool(enable_spill, true, "Whether to test plans with spilling enabled."); + +DEFINE_bool( + enable_oom_injection, + false, + "When enabled OOMs will randomly be triggered while executing query " + "plans. The goal of this mode is to ensure unexpected exceptions " + "aren't thrown and the process isn't killed in the process of cleaning " + "up after failures. Therefore, results are not compared when this is " + "enabled. Note that this option only works in debug builds."); + +namespace facebook::velox::exec::test { +namespace { + +class RowNumberFuzzer { + public: + explicit RowNumberFuzzer( + size_t initialSeed, + std::unique_ptr); + + void go(); + + struct PlanWithSplits { + core::PlanNodePtr plan; + std::vector> splits; + + explicit PlanWithSplits( + core::PlanNodePtr _plan, + const std::vector>& _splits = + {}) + : plan(std::move(_plan)), splits(_splits) {} + }; + + private: + static VectorFuzzer::Options getFuzzerOptions() { + VectorFuzzer::Options opts; + opts.vectorSize = FLAGS_batch_size; + opts.stringVariableLength = true; + opts.stringLength = 100; + opts.nullRatio = FLAGS_null_ratio; + return opts; + } + + static inline const std::string kHiveConnectorId = "test-hive"; + + // Makes a connector split from a file path on storage. + static std::shared_ptr makeSplit( + const std::string& filePath); + + void seed(size_t seed) { + currentSeed_ = seed; + vectorFuzzer_.reSeed(seed); + rng_.seed(currentSeed_); + } + + void reSeed() { + seed(rng_()); + } + + // Runs one test iteration from query plans generations, executions and result + // verifications. + void verify(); + + int32_t randInt(int32_t min, int32_t max) { + return boost::random::uniform_int_distribution(min, max)(rng_); + } + + std::pair, std::vector> + generatePartitionKeys(); + + std::vector generateInput( + const std::vector& keyNames, + const std::vector& keyTypes); + + std::optional computeReferenceResults( + core::PlanNodePtr& plan, + const std::vector& input); + + RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); + + void addPlansWithTableScan( + const std::string& tableDir, + const std::vector& partitionKeys, + const std::vector& input, + std::vector& altPlans); + + // Makes the query plan with default settings in RowNumberFuzzer and value + // inputs for both probe and build sides. + // + // NOTE: 'input' could either input rows with lazy + // vectors or flatten ones. + static PlanWithSplits makeDefaultPlan( + const std::vector& partitionKeys, + const std::vector& input); + + static PlanWithSplits makePlanWithTableScan( + const RowTypePtr& type, + const std::vector& partitionKeys, + const std::vector>& splits); + + FuzzerGenerator rng_; + size_t currentSeed_{0}; + + std::shared_ptr rootPool_{ + memory::memoryManager()->addRootPool( + "rowNumberFuzzer", + memory::kMaxMemory, + memory::MemoryReclaimer::create())}; + std::shared_ptr pool_{rootPool_->addLeafChild( + "rowNumberFuzzerLeaf", + true, + exec::MemoryReclaimer::create())}; + std::shared_ptr writerPool_{rootPool_->addAggregateChild( + "rowNumberFuzzerWriter", + exec::MemoryReclaimer::create())}; + VectorFuzzer vectorFuzzer_; + std::unique_ptr referenceQueryRunner_; +}; + +RowNumberFuzzer::RowNumberFuzzer( + size_t initialSeed, + std::unique_ptr referenceQueryRunner) + : vectorFuzzer_{getFuzzerOptions(), pool_.get()}, + referenceQueryRunner_{std::move(referenceQueryRunner)} { + filesystems::registerLocalFileSystem(); + + // Make sure not to run out of open file descriptors. + const std::unordered_map hiveConfig = { + {connector::hive::HiveConfig::kNumCacheFileHandles, "1000"}}; + auto hiveConnector = + connector::getConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName) + ->newConnector( + kHiveConnectorId, std::make_shared(hiveConfig)); + connector::registerConnector(hiveConnector); + + seed(initialSeed); +} + +void writeToFile( + const std::string& path, + const VectorPtr& vector, + memory::MemoryPool* pool) { + dwrf::WriterOptions options; + options.schema = vector->type(); + options.memoryPool = pool; + auto writeFile = std::make_unique(path, true, false); + auto sink = + std::make_unique(std::move(writeFile), path); + dwrf::Writer writer(std::move(sink), options); + writer.write(vector); + writer.close(); +} + +// static +std::shared_ptr RowNumberFuzzer::makeSplit( + const std::string& filePath) { + return std::make_shared( + kHiveConnectorId, filePath, dwio::common::FileFormat::DWRF); +} + +template +bool isDone(size_t i, T startTime) { + if (FLAGS_duration_sec > 0) { + std::chrono::duration elapsed = + std::chrono::system_clock::now() - startTime; + return elapsed.count() >= FLAGS_duration_sec; + } + return i >= FLAGS_steps; +} + +std::vector flatten(const std::vector& vectors) { + std::vector flatVectors; + for (const auto& vector : vectors) { + auto flat = BaseVector::create( + vector->type(), vector->size(), vector->pool()); + flat->copy(vector.get(), 0, 0, vector->size()); + flatVectors.push_back(flat); + } + + return flatVectors; +} + +std::pair, std::vector> +RowNumberFuzzer::generatePartitionKeys() { + const auto numKeys = randInt(1, 3); + std::vector names; + std::vector types; + for (auto i = 0; i < numKeys; ++i) { + names.push_back(fmt::format("c{}", i)); + types.push_back(vectorFuzzer_.randType(/*maxDepth=*/1)); + } + return std::make_pair(names, types); +} + +std::vector RowNumberFuzzer::generateInput( + const std::vector& keyNames, + const std::vector& keyTypes) { + std::vector names = keyNames; + std::vector types = keyTypes; + // Add up to 3 payload columns. + const auto numPayload = randInt(0, 3); + for (auto i = 0; i < numPayload; ++i) { + names.push_back(fmt::format("c{}", i + keyNames.size())); + types.push_back(vectorFuzzer_.randType(/*maxDepth=*/2)); + } + + const auto inputType = ROW(std::move(names), std::move(types)); + std::vector input; + input.reserve(FLAGS_num_batches); + for (auto i = 0; i < FLAGS_num_batches; ++i) { + input.push_back(vectorFuzzer_.fuzzInputRow(inputType)); + } + + return input; +} + +RowNumberFuzzer::PlanWithSplits RowNumberFuzzer::makeDefaultPlan( + const std::vector& partitionKeys, + const std::vector& input) { + auto planNodeIdGenerator = std::make_shared(); + std::vector projectFields = partitionKeys; + projectFields.emplace_back("row_number"); + auto plan = PlanBuilder() + .values(input) + .rowNumber(partitionKeys) + .project(projectFields) + .planNode(); + return PlanWithSplits{std::move(plan)}; +} + +bool containsType(const TypePtr& type, const TypePtr& search) { + if (type->equivalent(*search)) { + return true; + } + + for (auto i = 0; i < type->size(); ++i) { + if (containsType(type->childAt(i), search)) { + return true; + } + } + return false; +} + +bool containsTypeKind(const TypePtr& type, const TypeKind& search) { + if (type->kind() == search) { + return true; + } + + for (auto i = 0; i < type->size(); ++i) { + if (containsTypeKind(type->childAt(i), search)) { + return true; + } + } + + return false; +} + +bool containsUnsupportedTypes(const TypePtr& type) { + // Skip queries that use Timestamp, Varbinary, and IntervalDayTime types. + // DuckDB doesn't support nanosecond precision for timestamps or casting from + // Bigint to Interval. + // TODO Investigate mismatches reported when comparing Varbinary. + return containsTypeKind(type, TypeKind::TIMESTAMP) || + containsTypeKind(type, TypeKind::VARBINARY) || + containsType(type, INTERVAL_DAY_TIME()); +} + +std::optional RowNumberFuzzer::computeReferenceResults( + core::PlanNodePtr& plan, + const std::vector& input) { + if (containsUnsupportedTypes(input[0]->type())) { + return std::nullopt; + } + + if (auto sql = referenceQueryRunner_->toSql(plan)) { + return referenceQueryRunner_->execute( + sql.value(), input, plan->outputType()); + } + + LOG(INFO) << "Query not supported by the reference DB"; + return std::nullopt; +} + +RowVectorPtr RowNumberFuzzer::execute( + const PlanWithSplits& plan, + bool injectSpill) { + LOG(INFO) << "Executing query plan: " << plan.plan->toString(true, true); + + AssertQueryBuilder builder(plan.plan); + if (!plan.splits.empty()) { + builder.splits(plan.splits); + } + + std::shared_ptr spillDirectory; + int32_t spillPct{0}; + if (injectSpill) { + spillDirectory = exec::test::TempDirectoryPath::create(); + builder.config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kRowNumberSpillEnabled, true) + .spillDirectory(spillDirectory->getPath()); + spillPct = 10; + } + + ScopedOOMInjector oomInjector( + []() -> bool { return folly::Random::oneIn(10); }, + 10); // Check the condition every 10 ms. + if (FLAGS_enable_oom_injection) { + oomInjector.enable(); + } + + // Wait for the task to be destroyed before start next query execution to + // avoid the potential interference of the background activities across query + // executions. + auto stopGuard = folly::makeGuard([&]() { waitForAllTasksToBeDeleted(); }); + + TestScopedSpillInjection scopedSpillInjection(spillPct); + RowVectorPtr result; + try { + result = builder.copyResults(pool_.get()); + } catch (VeloxRuntimeError& e) { + if (FLAGS_enable_oom_injection && + e.errorCode() == facebook::velox::error_code::kMemCapExceeded && + e.message() == ScopedOOMInjector::kErrorMessage) { + // If we enabled OOM injection we expect the exception thrown by the + // ScopedOOMInjector. + return nullptr; + } + + throw e; + } + + if (VLOG_IS_ON(1)) { + VLOG(1) << std::endl << result->toString(0, result->size()); + } + + return result; +} + +RowNumberFuzzer::PlanWithSplits RowNumberFuzzer::makePlanWithTableScan( + const RowTypePtr& type, + const std::vector& partitionKeys, + const std::vector>& splits) { + std::vector projectFields = partitionKeys; + projectFields.emplace_back("row_number"); + + auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId scanId; + auto plan = PlanBuilder(planNodeIdGenerator) + .tableScan(type) + .rowNumber(partitionKeys) + .project(projectFields) + .planNode(); + return PlanWithSplits{plan, splits}; +} + +bool isTableScanSupported(const TypePtr& type) { + if (type->kind() == TypeKind::ROW && type->size() == 0) { + return false; + } + if (type->kind() == TypeKind::UNKNOWN) { + return false; + } + if (type->kind() == TypeKind::HUGEINT) { + return false; + } + // Disable testing with TableScan when input contains TIMESTAMP type, due to + // the issue #8127. + if (type->kind() == TypeKind::TIMESTAMP) { + return false; + } + + for (auto i = 0; i < type->size(); ++i) { + if (!isTableScanSupported(type->childAt(i))) { + return false; + } + } + + return true; +} + +void RowNumberFuzzer::addPlansWithTableScan( + const std::string& tableDir, + const std::vector& partitionKeys, + const std::vector& input, + std::vector& altPlans) { + VELOX_CHECK(!tableDir.empty()); + + if (!isTableScanSupported(input[0]->type())) { + return; + } + + std::vector> inputSplits; + for (auto i = 0; i < input.size(); ++i) { + const std::string filePath = fmt::format("{}/row_number/{}", tableDir, i); + writeToFile(filePath, input[i], writerPool_.get()); + inputSplits.push_back(makeSplit(filePath)); + } + + altPlans.push_back(makePlanWithTableScan( + asRowType(input[0]->type()), partitionKeys, inputSplits)); +} + +void RowNumberFuzzer::verify() { + const auto [keyNames, keyTypes] = generatePartitionKeys(); + const auto input = generateInput(keyNames, keyTypes); + // Flatten inputs. + const auto flatInput = flatten(input); + + if (VLOG_IS_ON(1)) { + VLOG(1) << "Input: " << input[0]->toString(); + for (const auto& v : flatInput) { + VLOG(1) << std::endl << v->toString(0, v->size()); + } + } + + auto defaultPlan = makeDefaultPlan(keyNames, input); + const auto expected = execute(defaultPlan, /*injectSpill=*/false); + + if (expected != nullptr) { + if (const auto referenceResult = + computeReferenceResults(defaultPlan.plan, input)) { + VELOX_CHECK( + assertEqualResults( + referenceResult.value(), + defaultPlan.plan->outputType(), + {expected}), + "Velox and Reference results don't match"); + } + } + + std::vector altPlans; + altPlans.push_back(std::move(defaultPlan)); + + const auto tableScanDir = exec::test::TempDirectoryPath::create(); + addPlansWithTableScan(tableScanDir->getPath(), keyNames, input, altPlans); + + for (auto i = 0; i < altPlans.size(); ++i) { + LOG(INFO) << "Testing plan #" << i; + auto actual = execute(altPlans[i], /*injectSpill=*/false); + if (actual != nullptr && expected != nullptr) { + VELOX_CHECK( + assertEqualResults({expected}, {actual}), + "Logically equivalent plans produced different results"); + } else { + VELOX_CHECK( + FLAGS_enable_oom_injection, "Got unexpected nullptr for results"); + } + + if (FLAGS_enable_spill) { + LOG(INFO) << "Testing plan #" << i << " with spilling"; + actual = execute(altPlans[i], /*=injectSpill=*/true); + if (actual != nullptr && expected != nullptr) { + try { + VELOX_CHECK( + assertEqualResults({expected}, {actual}), + "Logically equivalent plans produced different results"); + } catch (const VeloxException& e) { + LOG(ERROR) << "Expected\n" + << expected->toString(0, expected->size()) << "\nActual\n" + << actual->toString(0, actual->size()); + throw; + } + } else { + VELOX_CHECK( + FLAGS_enable_oom_injection, "Got unexpected nullptr for results"); + } + } + } +} + +void RowNumberFuzzer::go() { + VELOX_USER_CHECK( + FLAGS_steps > 0 || FLAGS_duration_sec > 0, + "Either --steps or --duration_sec needs to be greater than zero.") + VELOX_USER_CHECK_GE(FLAGS_batch_size, 10, "Batch size must be at least 10."); + + const auto startTime = std::chrono::system_clock::now(); + size_t iteration = 0; + + while (!isDone(iteration, startTime)) { + LOG(INFO) << "==============================> Started iteration " + << iteration << " (seed: " << currentSeed_ << ")"; + verify(); + LOG(INFO) << "==============================> Done with iteration " + << iteration; + + reSeed(); + ++iteration; + } +} +} // namespace + +void rowNumberFuzzer( + size_t seed, + std::unique_ptr referenceQueryRunner) { + RowNumberFuzzer(seed, std::move(referenceQueryRunner)).go(); +} +} // namespace facebook::velox::exec::test diff --git a/velox/exec/fuzzer/RowNumberFuzzer.h b/velox/exec/fuzzer/RowNumberFuzzer.h new file mode 100644 index 000000000000..30cd960e327f --- /dev/null +++ b/velox/exec/fuzzer/RowNumberFuzzer.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" + +namespace facebook::velox::exec::test { +void rowNumberFuzzer( + size_t seed, + std::unique_ptr referenceQueryRunner); +} diff --git a/velox/exec/fuzzer/RowNumberFuzzerRunner.h b/velox/exec/fuzzer/RowNumberFuzzerRunner.h new file mode 100644 index 000000000000..2d018f81d306 --- /dev/null +++ b/velox/exec/fuzzer/RowNumberFuzzerRunner.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "velox/common/file/FileSystems.h" + +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" +#include "velox/exec/fuzzer/RowNumberFuzzer.h" +#include "velox/serializers/PrestoSerializer.h" + +/// RowNumber FuzzerRunner leverages RowNumberFuzzer and VectorFuzzer to +/// automatically generate and execute tests. It works as follows: +/// +/// 1. Plan Generation: Generate two equivalent query plans, one is row-number +/// over ValuesNode and the other is over TableScanNode. +/// 2. Executes a variety of logically equivalent query plans and checks the +/// results are the same. +/// 3. Rinse and repeat. +/// +/// It is used as follows: +/// +/// $ ./velox_row_number_fuzzer_test --duration_sec 600 +/// +/// The flags that configure RowNumberFuzzer's behavior are: +/// +/// --steps: how many iterations to run. +/// --duration_sec: alternatively, for how many seconds it should run (takes +/// precedence over --steps). +/// --seed: pass a deterministic seed to reproduce the behavior (each iteration +/// will print a seed as part of the logs). +/// --v=1: verbose logging; print a lot more details about the execution. +/// --batch_size: size of input vector batches generated. +/// --num_batches: number if input vector batches to generate. +/// --enable_spill: test plans with spilling enabled. +/// --enable_oom_injection: randomly trigger OOM while executing query plans. +/// e.g: +/// +/// $ ./velox_row_number_fuzzer_test \ +/// --seed 123 \ +/// --duration_sec 600 \ +/// --v=1 + +namespace facebook::velox::exec::test { + +class RowNumberFuzzerRunner { + public: + static int run( + size_t seed, + std::unique_ptr referenceQueryRunner) { + serializer::presto::PrestoVectorSerde::registerVectorSerde(); + filesystems::registerLocalFileSystem(); + rowNumberFuzzer(seed, std::move(referenceQueryRunner)); + return RUN_ALL_TESTS(); + } +}; + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index ddfb25743d23..6b1f0f86b7da 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -207,6 +207,12 @@ add_library(velox_join_fuzzer JoinFuzzer.cpp) target_link_libraries(velox_join_fuzzer velox_type velox_vector_fuzzer velox_exec_test_lib velox_expression_test_utility) +# RowNumber Fuzzer. +add_executable(velox_row_number_fuzzer_test RowNumberFuzzerTest.cpp) + +target_link_libraries(velox_row_number_fuzzer_test velox_row_number_fuzzer + gtest gtest_main) + add_executable(velox_join_fuzzer_test JoinFuzzerTest.cpp) target_link_libraries(velox_join_fuzzer_test velox_join_fuzzer gtest gtest_main) diff --git a/velox/exec/tests/RowNumberFuzzerTest.cpp b/velox/exec/tests/RowNumberFuzzerTest.cpp new file mode 100644 index 000000000000..3abdc9fd3e76 --- /dev/null +++ b/velox/exec/tests/RowNumberFuzzerTest.cpp @@ -0,0 +1,96 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include "velox/common/memory/SharedArbitrator.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/exec/MemoryReclaimer.h" +#include "velox/exec/fuzzer/DuckQueryRunner.h" +#include "velox/exec/fuzzer/PrestoQueryRunner.h" +#include "velox/exec/fuzzer/ReferenceQueryRunner.h" +#include "velox/exec/fuzzer/RowNumberFuzzerRunner.h" + +DEFINE_int64( + seed, + 0, + "Initial seed for random number generator used to reproduce previous " + "results (0 means start with random seed)."); + +DEFINE_string( + presto_url, + "", + "Presto coordinator URI along with port. If set, we use Presto " + "source of truth. Otherwise, use DuckDB. Example: " + "--presto_url=http://127.0.0.1:8080"); + +DEFINE_uint32( + req_timeout_ms, + 1000, + "Timeout in milliseconds for HTTP requests made to reference DB, " + "such as Presto. Example: --req_timeout_ms=2000"); + +using namespace facebook::velox::exec; + +namespace { +std::unique_ptr setupReferenceQueryRunner( + const std::string& prestoUrl, + const std::string& runnerName, + const uint32_t& reqTimeoutMs) { + if (prestoUrl.empty()) { + auto duckQueryRunner = std::make_unique(); + LOG(INFO) << "Using DuckDB as the reference DB."; + return duckQueryRunner; + } + + LOG(INFO) << "Using Presto as the reference DB."; + return std::make_unique( + prestoUrl, + runnerName, + static_cast(reqTimeoutMs)); +} + +// Invoked to set up memory system with arbitration. +void setupMemory() { + FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; + FLAGS_velox_memory_leak_check_enabled = true; + facebook::velox::memory::SharedArbitrator::registerFactory(); + facebook::velox::memory::MemoryManagerOptions options; + options.allocatorCapacity = 8L << 30; + options.arbitratorCapacity = 6L << 30; + options.arbitratorKind = "SHARED"; + options.checkUsageLeak = true; + options.arbitrationStateCheckCb = memoryArbitrationStateCheck; + facebook::velox::memory::MemoryManager::initialize(options); +} +} // namespace + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + + // Calls common init functions in the necessary order, initializing + // singletons, installing proper signal handlers for better debugging + // experience, and initialize glog and gflags. + folly::Init init(&argc, &argv); + setupMemory(); + auto referenceQueryRunner = setupReferenceQueryRunner( + FLAGS_presto_url, "row_number_fuzzer", FLAGS_req_timeout_ms); + const size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed; + return test::RowNumberFuzzerRunner::run( + initialSeed, std::move(referenceQueryRunner)); +}