diff --git a/src/yb/docdb/usearch_vector_index-test.cc b/src/yb/docdb/usearch_vector_index-test.cc index 1a22001f58d..827be632abd 100644 --- a/src/yb/docdb/usearch_vector_index-test.cc +++ b/src/yb/docdb/usearch_vector_index-test.cc @@ -67,7 +67,6 @@ TEST_F(UsearchVectorIndexTest, CreateAndQuery) { for (size_t thread_index = 0; thread_index < kNumIndexingThreads; ++thread_index) { indexing_thread_holder.AddThreadFunctor( [&num_vectors_inserted, &index, &latch, &uniform_distrib]() { - std::random_device rd; size_t vector_id; while ((vector_id = num_vectors_inserted.fetch_add(1)) < kNumVectors) { auto vec = RandomFloatVector(kDimensions, uniform_distrib); diff --git a/src/yb/docdb/vector_index.cc b/src/yb/docdb/vector_index.cc index c917bfdd5a7..4a371d709d3 100644 --- a/src/yb/docdb/vector_index.cc +++ b/src/yb/docdb/vector_index.cc @@ -33,9 +33,21 @@ #include "yb/vector_index/usearch_wrapper.h" #include "yb/vector_index/vector_lsm.h" -DEFINE_RUNTIME_uint64(vector_index_initial_chunk_size, 1024, +DEFINE_RUNTIME_uint64(vector_index_initial_chunk_size, 100000, "Number of vector in initial vector index chunk"); +DEFINE_RUNTIME_PREVIEW_uint32(vector_index_ef, 128, + "The \"expansion\" parameter for search"); + +DEFINE_RUNTIME_PREVIEW_uint32(vector_index_ef_construction, 256, + "The \"expansion\" parameter during graph construction"); + +DEFINE_RUNTIME_PREVIEW_uint32(vector_index_num_neighbors_per_vertex, 32, + "Number of neighbors per graph node"); + +DEFINE_RUNTIME_PREVIEW_uint32(vector_index_num_neighbors_per_vertex_base, 128, + "Number of neighbors per graph node in base level graph"); + namespace yb::docdb { const std::string kVectorIndexDirPrefix = "vi-"; @@ -48,6 +60,10 @@ auto VectorLSMFactory(size_t dimensions) { return [dimensions] { vector_index::HNSWOptions hnsw_options = { .dimensions = dimensions, + .num_neighbors_per_vertex = FLAGS_vector_index_num_neighbors_per_vertex, + .num_neighbors_per_vertex_base = FLAGS_vector_index_num_neighbors_per_vertex_base, + .ef_construction = FLAGS_vector_index_ef_construction, + .ef = FLAGS_vector_index_ef, }; return FactoryImpl::Create(hnsw_options); }; diff --git a/src/yb/util/random_util.h b/src/yb/util/random_util.h index 0a156148372..d043dc83276 100644 --- a/src/yb/util/random_util.h +++ b/src/yb/util/random_util.h @@ -149,10 +149,14 @@ typename Collection::const_reference RandomElement(const Collection& collection, std::string RandomHumanReadableString(size_t len, std::mt19937_64* rng = nullptr); template -std::vector RandomFloatVector(size_t dimensions, Distribution& dis) { +std::vector RandomFloatVector( + size_t dimensions, Distribution& dis, std::mt19937_64* rng = nullptr) { + if (!rng) { + rng = &ThreadLocalRandom(); + } std::vector vec(dimensions); for (auto& v : vec) { - v = dis(ThreadLocalRandom()); + v = dis(*rng); } return vec; } diff --git a/src/yb/yql/pgwrapper/pg_vector_index-test.cc b/src/yb/yql/pgwrapper/pg_vector_index-test.cc index 8a0deacc4e4..2b8d5bac356 100644 --- a/src/yb/yql/pgwrapper/pg_vector_index-test.cc +++ b/src/yb/yql/pgwrapper/pg_vector_index-test.cc @@ -11,6 +11,8 @@ // under the License. // +#include + #include "yb/client/snapshot_test_util.h" #include "yb/consensus/consensus.h" @@ -28,6 +30,8 @@ #include "yb/util/backoff_waiter.h" #include "yb/util/test_thread_holder.h" +#include "yb/vector_index/usearch_include_wrapper_internal.h" + #include "yb/yql/pgwrapper/pg_mini_test_base.h" DECLARE_bool(TEST_skip_process_apply); @@ -38,6 +42,12 @@ DECLARE_uint32(vector_index_concurrent_writes); namespace yb::pgwrapper { +using FloatVector = std::vector; + +const unum::usearch::byte_t* VectorToBytePtr(const FloatVector& vector) { + return pointer_cast(vector.data()); +} + class PgVectorIndexTest : public PgMiniTestBase, public testing::WithParamInterface { protected: void SetUp() override { @@ -57,7 +67,7 @@ class PgVectorIndexTest : public PgMiniTestBase, public testing::WithParamInterf return IsColocated() ? ConnectToDB("colocated_db") : PgMiniTestBase::Connect(); } - Result MakeIndex(int num_tablets = 0) { + Result MakeIndex(size_t dimensions = 3) { auto colocated = IsColocated(); auto conn = VERIFY_RESULT(PgMiniTestBase::Connect()); std::string create_suffix; @@ -65,35 +75,37 @@ class PgVectorIndexTest : public PgMiniTestBase, public testing::WithParamInterf create_suffix = " WITH (COLOCATED = 1)"; RETURN_NOT_OK(conn.ExecuteFormat("CREATE DATABASE colocated_db COLOCATION = true")); conn = VERIFY_RESULT(Connect()); - } else if (num_tablets) { - create_suffix = Format(" SPLIT INTO $0 TABLETS", num_tablets); } RETURN_NOT_OK(conn.Execute("CREATE EXTENSION vector")); - RETURN_NOT_OK(conn.Execute( - "CREATE TABLE test (id bigserial PRIMARY KEY, embedding vector(3))" + create_suffix)); + RETURN_NOT_OK(conn.ExecuteFormat( + "CREATE TABLE test (id bigserial PRIMARY KEY, embedding vector($0))$1", + dimensions, create_suffix)); RETURN_NOT_OK(conn.Execute("CREATE INDEX ON test USING ybhnsw (embedding vector_l2_ops)")); return conn; } - Status WaitForLoadBalance(int num_tablet_servers) { - return WaitFor( - [&]() -> Result { return client_->IsLoadBalanced(num_tablet_servers); }, - 60s * kTimeMultiplier, - Format("Wait for load balancer to balance to $0 tservers.", num_tablet_servers)); - } - - Result MakeIndexAndFill(int num_rows, int num_tablets = 0); - Status InsertRows(PGConn& conn, int start_row, int end_row); + Result MakeIndexAndFill(size_t num_rows); + Result MakeIndexAndFillRandom(size_t num_rows, size_t dimensions); + Status InsertRows(PGConn& conn, size_t start_row, size_t end_row); + Status InsertRandomRows(PGConn& conn, size_t num_rows, size_t dimensions); - void VerifyRead(PGConn& conn, int limit, bool add_filter); + void VerifyRead(PGConn& conn, size_t limit, bool add_filter); void VerifyRows( - PGConn& conn, bool add_filter, const std::vector& expected, int limit = -1); + PGConn& conn, bool add_filter, const std::vector& expected, int64_t limit = -1); void TestSimple(); void TestManyRows(bool add_filter); void TestRestart(tablet::FlushFlags flush_flags); + + FloatVector RandomVector(size_t dimensions) { + return RandomFloatVector(dimensions, distribution_, &rng_); + } + + std::vector vectors_; + std::uniform_real_distribution<> distribution_; + std::mt19937_64 rng_{42}; }; void PgVectorIndexTest::TestSimple() { @@ -167,27 +179,44 @@ std::string ExpectedRow(int64_t id) { return BuildRow(id, VectorAsString(id)); } -Status PgVectorIndexTest::InsertRows(PGConn& conn, int start_row, int end_row) { +Status PgVectorIndexTest::InsertRows(PGConn& conn, size_t start_row, size_t end_row) { RETURN_NOT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); - for (int i = start_row; i <= end_row; ++i) { + for (auto i = start_row; i <= end_row; ++i) { RETURN_NOT_OK(conn.ExecuteFormat( "INSERT INTO test VALUES ($0, '$1')", i, VectorAsString(i))); } return conn.CommitTransaction(); } -Result PgVectorIndexTest::MakeIndexAndFill(int num_rows, int num_tablets) { - auto conn = VERIFY_RESULT(MakeIndex(num_tablets)); +Status PgVectorIndexTest::InsertRandomRows(PGConn& conn, size_t num_rows, size_t dimensions) { + RETURN_NOT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); + for (size_t i = 0; i != num_rows; ++i) { + auto vector = RandomVector(dimensions); + RETURN_NOT_OK(conn.ExecuteFormat( + "INSERT INTO test VALUES ($0, '$1')", vectors_.size(), AsString(vector))); + vectors_.push_back(std::move(vector)); + } + return conn.CommitTransaction(); +} + +Result PgVectorIndexTest::MakeIndexAndFill(size_t num_rows) { + auto conn = VERIFY_RESULT(MakeIndex()); RETURN_NOT_OK(InsertRows(conn, 1, num_rows)); return conn; } +Result PgVectorIndexTest::MakeIndexAndFillRandom(size_t num_rows, size_t dimensions) { + auto conn = VERIFY_RESULT(MakeIndex(dimensions)); + RETURN_NOT_OK(InsertRandomRows(conn, num_rows, dimensions)); + return conn; +} + void PgVectorIndexTest::VerifyRows( - PGConn& conn, bool add_filter, const std::vector& expected, int limit) { + PGConn& conn, bool add_filter, const std::vector& expected, int64_t limit) { auto result = ASSERT_RESULT((conn.FetchRows(Format( "SELECT * FROM test $0 ORDER BY embedding <-> '[0.0, 0.0, 0.0]' LIMIT $1", add_filter ? "WHERE id + 3 <= 5" : "", - limit == -1 ? expected.size() : make_unsigned(limit))))); + limit < 0 ? expected.size() : make_unsigned(limit))))); EXPECT_EQ(result.size(), expected.size()); for (size_t i = 0; i != std::min(result.size(), expected.size()); ++i) { SCOPED_TRACE(Format("Row $0", i)); @@ -195,25 +224,25 @@ void PgVectorIndexTest::VerifyRows( } } -void PgVectorIndexTest::VerifyRead(PGConn& conn, int limit, bool add_filter) { +void PgVectorIndexTest::VerifyRead(PGConn& conn, size_t limit, bool add_filter) { std::vector expected; - for (int i = 1; i <= limit; ++i) { + for (size_t i = 1; i <= limit; ++i) { expected.push_back(ExpectedRow(i)); } VerifyRows(conn, add_filter, expected); } void PgVectorIndexTest::TestManyRows(bool add_filter) { - constexpr int kNumRows = RegularBuildVsSanitizers(2000, 64); - const int query_limit = add_filter ? 1 : 5; + constexpr size_t kNumRows = RegularBuildVsSanitizers(2000, 64); + const size_t query_limit = add_filter ? 1 : 5; auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows)); ASSERT_NO_FATALS(VerifyRead(conn, query_limit, add_filter)); } TEST_P(PgVectorIndexTest, Split) { - constexpr int kNumRows = RegularBuildVsSanitizers(500, 64); - constexpr int kQueryLimit = 5; + constexpr size_t kNumRows = RegularBuildVsSanitizers(500, 64); + constexpr size_t kQueryLimit = 5; auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows)); ASSERT_OK(cluster_->FlushTablets()); @@ -236,17 +265,17 @@ TEST_P(PgVectorIndexTest, ManyReads) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_vector_index_concurrent_reads) = 1; ANNOTATE_UNPROTECTED_WRITE(FLAGS_vector_index_concurrent_writes) = 1; - constexpr int kNumRows = 64; - constexpr int kNumReads = 16; + constexpr size_t kNumRows = 64; + constexpr size_t kNumReads = 16; auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows)); TestThreadHolder threads; - for (int i = 1; i <= kNumReads; ++i) { + for (size_t i = 1; i <= kNumReads; ++i) { threads.AddThreadFunctor([this, &stop_flag = threads.stop_flag()] { auto conn = ASSERT_RESULT(Connect()); while (!stop_flag.load()) { - auto id = RandomUniformInt(1, kNumRows); + auto id = RandomUniformInt(1, kNumRows); auto vector = VectorAsString(id); auto rows = ASSERT_RESULT(conn.FetchAllAsString(Format( "SELECT * FROM test ORDER BY embedding <-> '$0' LIMIT 1", vector))); @@ -259,8 +288,8 @@ TEST_P(PgVectorIndexTest, ManyReads) { } void PgVectorIndexTest::TestRestart(tablet::FlushFlags flush_flags) { - constexpr int kNumRows = 64; - constexpr int kQueryLimit = 5; + constexpr size_t kNumRows = 64; + constexpr size_t kQueryLimit = 5; auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows)); ASSERT_NO_FATALS(VerifyRead(conn, kQueryLimit, false)); @@ -284,7 +313,7 @@ TEST_P(PgVectorIndexTest, BootstrapFlushedIntentsDB) { } TEST_P(PgVectorIndexTest, DeleteAndUpdate) { - constexpr int kNumRows = 64; + constexpr size_t kNumRows = 64; const std::string kDistantVector = "[100, 500, 9000]"; const std::string kCloseVector = "[0.125, 0.25, 0.375]"; @@ -309,12 +338,12 @@ TEST_P(PgVectorIndexTest, DeleteAndUpdate) { } TEST_P(PgVectorIndexTest, RemoteBootstrap) { - constexpr int kNumRows = 64; - constexpr int kQueryLimit = 5; + constexpr size_t kNumRows = 64; + constexpr size_t kQueryLimit = 5; auto* mts = cluster_->mini_tablet_server(2); mts->Shutdown(); - auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows, 3)); + auto conn = ASSERT_RESULT(MakeIndexAndFill(kNumRows)); const auto table_id = ASSERT_RESULT(GetTableIDFromTableName("test")); ASSERT_OK(cluster_->FlushTablets()); for (const auto& peer : ListTableActiveTabletPeers(cluster_.get(), table_id)) { @@ -355,8 +384,8 @@ TEST_P(PgVectorIndexTest, RemoteBootstrap) { } TEST_P(PgVectorIndexTest, SnapshotSchedule) { - constexpr int kNumRows = 128; - constexpr int kQueryLimit = 5; + constexpr size_t kNumRows = 128; + constexpr size_t kQueryLimit = 5; client::SnapshotTestUtil snapshot_util; snapshot_util.SetProxy(&client_->proxy_cache()); @@ -383,6 +412,58 @@ TEST_P(PgVectorIndexTest, SnapshotSchedule) { ASSERT_NO_FATALS(VerifyRead(conn, kQueryLimit, false)); } +TEST_P(PgVectorIndexTest, Random) { + constexpr size_t kLimit = 10; + constexpr size_t kDimensions = 64; + constexpr size_t kNumRows = RegularBuildVsDebugVsSanitizers(10000, 1000, 100); + constexpr int kNumIterations = RegularBuildVsDebugVsSanitizers(100, 20, 10); + + unum::usearch::metric_punned_t metric( + kDimensions, unum::usearch::metric_kind_t::l2sq_k, unum::usearch::scalar_kind_t::f32_k); + + auto conn = ASSERT_RESULT(MakeIndexAndFillRandom(kNumRows, kDimensions)); + size_t sum_missing = 0; + std::vector counts; + for (int i = 0; i != kNumIterations; ++i) { + auto query_vector = RandomVector(kDimensions); + auto rows = ASSERT_RESULT(conn.FetchRows(Format( + "SELECT id FROM test ORDER BY embedding <-> '$0' LIMIT $1", query_vector, kLimit))); + std::vector expected(vectors_.size()); + std::generate(expected.begin(), expected.end(), [n{0LL}]() mutable { return n++; }); + std::sort( + expected.begin(), expected.end(), + [&metric, &query_vector, &vectors = vectors_](size_t li, size_t ri) { + const auto& lhs = vectors[li]; + const auto& rhs = vectors[ri]; + return metric(VectorToBytePtr(query_vector), VectorToBytePtr(lhs)) < + metric(VectorToBytePtr(query_vector), VectorToBytePtr(rhs)); + }); + size_t ep = 0; + for (int64_t id : rows) { + while (ep < expected.size() && id != expected[ep]) { + ++ep; + } + ASSERT_LT(ep, expected.size()); + ASSERT_EQ(id, expected[ep]); + ++ep; + } + size_t missing = ep - kLimit; + if (missing > counts.size()) { + LOG(INFO) + << "New max: " << missing << ", fetched: " << AsString(rows) << ", expected: " + << AsString(boost::make_iterator_range( + expected.begin(), expected.begin() + kLimit + missing)); + } + counts.resize(std::max(counts.size(), missing + 1)); + ++counts[missing]; + sum_missing += missing; + } + LOG(INFO) + << "Counts: " << AsString(counts) + << ", recall: " << 1.0 - sum_missing * 1.0 / (kLimit * kNumIterations); + ASSERT_LE(sum_missing * 50, kLimit * kNumIterations); +} + std::string ColocatedToString(const testing::TestParamInfo& param_info) { return param_info.param ? "Colocated" : "Distributed"; }