Skip to content

Commit

Permalink
[BACKPORT 2.14][#15909] docdb: Move sys catalog writes out of catalog…
Browse files Browse the repository at this point in the history
… loader.

Summary:
Original commit: 31a9e48 / D23291
This diff moves migration writes (to the sys catalog) out of the catalog loading phase. This should make the master leader startup faster by avoiding network and disk IO, and avoid deadlocks like in #15849. Specifically, this diff moves the following two writes to the catalog manager background thread pool:
 1. The migration that inserts table_id into table_ids for old empty colocated tables.
 2. The migration that deletes an orphaned tablet (part of a deleting table that crashed).

Other things in this diff:
 - Move some tests that were not part of sys catalog out of sys_catalog-test
 - Add a ForceUpsert to sys catalog. This is required because Upsert only does a write if the PB is dirty. Since the in-memory PB update is done separately, the PB is not dirty when the async sys catalog write happens. We want to force a write here to persist the migration changes. (I tried doing this with a default flag on Upsert / Mutate, but variadic arguments + defaults having to be last makes this hard, without modifying everywhere we currently call upsert).
Jira: DB-5315

Test Plan:
`ybd --cxx-test sys_catalog-test --gtest_filter="SysCatalogTest.TestOrphanedTabletsDeleted"`
`ybd --cxx-test sys_catalog-test --gtest_filter="SysCatalogTest.TestTableIdsHasAtLeastOneTable"`

Reviewers: zdrudi, jhe

Reviewed By: jhe

Subscribers: ybase, bogdan

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D29208
  • Loading branch information
SrivastavaAnubhav committed Oct 11, 2023
1 parent 83c8665 commit 2c49d67
Show file tree
Hide file tree
Showing 17 changed files with 704 additions and 573 deletions.
4 changes: 2 additions & 2 deletions ent/src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
virtual ~CatalogManager();
void CompleteShutdown();

Status RunLoaders(int64_t term) override REQUIRES(mutex_);
Status RunLoaders(int64_t term, SysCatalogLoadingState* state) override REQUIRES(mutex_);

// API to start a snapshot creation.
Status CreateSnapshot(const CreateSnapshotRequestPB* req,
Expand Down Expand Up @@ -574,7 +574,7 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon

void Started() override;

void SysCatalogLoaded(int64_t term) override;
void SysCatalogLoaded(int64_t term, const SysCatalogLoadingState& state) override;

Status AddNamespaceEntriesToPB(
const std::vector<TableDescription>& tables,
Expand Down
7 changes: 4 additions & 3 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ void CatalogManager::CompleteShutdown() {
super::CompleteShutdown();
}

Status CatalogManager::RunLoaders(int64_t term) {
RETURN_NOT_OK(super::RunLoaders(term));
Status CatalogManager::RunLoaders(int64_t term, SysCatalogLoadingState* state) {
RETURN_NOT_OK(super::RunLoaders(term, state));

// Clear the snapshots.
non_txn_snapshot_ids_map_.clear();
Expand Down Expand Up @@ -6567,7 +6567,8 @@ bool CatalogManager::IsPitrActive() {
return snapshot_coordinator_.IsPitrActive();
}

void CatalogManager::SysCatalogLoaded(int64_t term) {
void CatalogManager::SysCatalogLoaded(int64_t term, const SysCatalogLoadingState& state) {
super::SysCatalogLoaded(term, state);
return snapshot_coordinator_.SysCatalogLoaded(term);
}

Expand Down
4 changes: 3 additions & 1 deletion src/yb/integration-tests/create-table-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "yb/integration-tests/yb_mini_cluster_test_base.h"

#include "yb/master/catalog_entity_info.h"
#include "yb/master/catalog_loaders.h"
#include "yb/master/catalog_manager_if.h"
#include "yb/master/master-test-util.h"
#include "yb/master/master.h"
Expand Down Expand Up @@ -604,7 +605,8 @@ TEST_F(CreateTableStressTest, TestConcurrentCreateTableAndReloadMetadata) {

thread reload_metadata_thread([&]() {
while (!stop.Load()) {
CHECK_OK(cluster_->mini_master()->catalog_manager().VisitSysCatalog(0));
master::SysCatalogLoadingState state;
CHECK_OK(cluster_->mini_master()->catalog_manager_impl().VisitSysCatalog(0, &state));
// Give table creation a chance to run.
SleepFor(MonoDelta::FromMilliseconds(10 * kTimeMultiplier));
}
Expand Down
1 change: 1 addition & 0 deletions src/yb/master/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ set(YB_TEST_LINK_LIBS master
yb_client
yb_tools_util
${YB_MIN_TEST_LIBS})
ADD_YB_TEST(catalog_entity_info-test)
ADD_YB_TEST(catalog_manager-test)
ADD_YB_TEST(flush_manager-test)
ADD_YB_TEST(master-test)
Expand Down
122 changes: 122 additions & 0 deletions src/yb/master/catalog_entity_info-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) Yugabyte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include "yb/master/catalog_entity_info.h"
#include "yb/util/test_util.h"

namespace yb {
namespace master {

class CatalogEntityInfoTest : public YBTest {};

// Verify that data mutations are not available from metadata() until commit.
TEST_F(CatalogEntityInfoTest, TestNamespaceInfoCommit) {
scoped_refptr<NamespaceInfo> ns(new NamespaceInfo("deadbeafdeadbeafdeadbeafdeadbeaf"));

// Mutate the namespace, under the write lock.
auto writer_lock = ns->LockForWrite();
writer_lock.mutable_data()->pb.set_name("foo");

// Changes should not be visible to a reader.
// The reader can still lock for read, since readers don't block
// writers in the RWC lock.
{
auto reader_lock = ns->LockForRead();
ASSERT_NE("foo", reader_lock->name());
}

// Commit the changes
writer_lock.Commit();

// Verify that the data is visible
{
auto reader_lock = ns->LockForRead();
ASSERT_EQ("foo", reader_lock->name());
}
}

// Verify that data mutations are not available from metadata() until commit.
TEST_F(CatalogEntityInfoTest, TestTableInfoCommit) {
scoped_refptr<TableInfo> table =
make_scoped_refptr<TableInfo>("123" /* table_id */);

// Mutate the table, under the write lock.
auto writer_lock = table->LockForWrite();
writer_lock.mutable_data()->pb.set_name("foo");

// Changes should not be visible to a reader.
// The reader can still lock for read, since readers don't block
// writers in the RWC lock.
{
auto reader_lock = table->LockForRead();
ASSERT_NE("foo", reader_lock->name());
}
writer_lock.mutable_data()->set_state(SysTablesEntryPB::RUNNING, "running");

{
auto reader_lock = table->LockForRead();
ASSERT_NE("foo", reader_lock->pb.name());
ASSERT_NE("running", reader_lock->pb.state_msg());
ASSERT_NE(SysTablesEntryPB::RUNNING, reader_lock->pb.state());
}

// Commit the changes
writer_lock.Commit();

// Verify that the data is visible
{
auto reader_lock = table->LockForRead();
ASSERT_EQ("foo", reader_lock->pb.name());
ASSERT_EQ("running", reader_lock->pb.state_msg());
ASSERT_EQ(SysTablesEntryPB::RUNNING, reader_lock->pb.state());
}
}

// Verify that data mutations are not available from metadata() until commit.
TEST_F(CatalogEntityInfoTest, TestTabletInfoCommit) {
scoped_refptr<TabletInfo> tablet(new TabletInfo(nullptr, "123"));

// Mutate the tablet, the changes should not be visible
auto l = tablet->LockForWrite();
PartitionPB* partition = l.mutable_data()->pb.mutable_partition();
partition->set_partition_key_start("a");
partition->set_partition_key_end("b");
l.mutable_data()->set_state(SysTabletsEntryPB::RUNNING, "running");
{
// Changes shouldn't be visible, and lock should still be
// acquired even though the mutation is under way.
auto read_lock = tablet->LockForRead();
ASSERT_NE("a", read_lock->pb.partition().partition_key_start());
ASSERT_NE("b", read_lock->pb.partition().partition_key_end());
ASSERT_NE("running", read_lock->pb.state_msg());
ASSERT_NE(SysTabletsEntryPB::RUNNING,
read_lock->pb.state());
}

// Commit the changes
l.Commit();

// Verify that the data is visible
{
auto read_lock = tablet->LockForRead();
ASSERT_EQ("a", read_lock->pb.partition().partition_key_start());
ASSERT_EQ("b", read_lock->pb.partition().partition_key_end());
ASSERT_EQ("running", read_lock->pb.state_msg());
ASSERT_EQ(SysTabletsEntryPB::RUNNING,
read_lock->pb.state());
}
}

} // namespace master
} // namespace yb
58 changes: 31 additions & 27 deletions src/yb/master/catalog_loaders.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,11 @@ Status TableLoader::Visit(const TableId& table_id, const SysTablesEntryPB& metad
TransactionMetadata txn = VERIFY_RESULT(TransactionMetadata::FromPB(metadata.transaction()));
std::function<Status(bool)> when_done =
std::bind(&CatalogManager::VerifyTablePgLayer, catalog_manager_, table, _1);
WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc(
std::bind(&YsqlTransactionDdl::VerifyTransaction, catalog_manager_->ysql_transaction_.get(),
txn, when_done)),
"Could not submit VerifyTransaction to thread pool");
state_->AddPostLoadTask(
std::bind(&YsqlTransactionDdl::VerifyTransaction,
catalog_manager_->ysql_transaction_.get(),
txn, when_done),
"VerifyTransaction");
}

LOG(INFO) << "Loaded metadata for table " << table->ToString() << ", state: "
Expand Down Expand Up @@ -161,6 +162,7 @@ Status TabletLoader::Visit(const TabletId& tablet_id, const SysTabletsEntryPB& m
std::map<ColocationId, TableId> tablet_colocation_map;
bool tablet_deleted;
bool listed_as_hidden;
bool needs_async_write_to_sys_catalog = false;
TabletInfoPtr tablet(new TabletInfo(first_table, tablet_id));
{
auto l = tablet->LockForWrite();
Expand All @@ -182,14 +184,12 @@ Status TabletLoader::Visit(const TabletId& tablet_id, const SysTabletsEntryPB& m
// list contains the first table that created the tablet. If the table_ids field
// was empty, we "upgrade" the master to support this new invariant.
if (metadata.table_ids_size() == 0) {
LOG(INFO) << Format("Updating table_ids field in-memory for tablet $0 to include table_id "
"field ($1). Sys catalog will be updated asynchronously.", tablet->id(),
metadata.table_id());
l.mutable_data()->pb.add_table_ids(metadata.table_id());
Status s = catalog_manager_->sys_catalog_->Upsert(
catalog_manager_->leader_ready_term(), tablet);
if (PREDICT_FALSE(!s.ok())) {
return STATUS_FORMAT(
IllegalState, "An error occurred while inserting to sys-tablets: $0", s);
}
table_ids.push_back(metadata.table_id());
needs_async_write_to_sys_catalog = true;
}

tablet_deleted = l.mutable_data()->is_deleted();
Expand Down Expand Up @@ -267,19 +267,23 @@ Status TabletLoader::Visit(const TabletId& tablet_id, const SysTabletsEntryPB& m
}
}


if (should_delete_tablet) {
LOG(WARNING)
<< "Deleting tablet " << tablet->id() << " for table " << first_table->ToString();
LOG(INFO) << Format("Marking tablet $0 for table $1 as DELETED in-memory. Sys catalog will "
"be updated asynchronously.", tablet->id(), first_table->ToString());
string deletion_msg = "Tablet deleted at " + LocalTimeAsString();
l.mutable_data()->set_state(SysTabletsEntryPB::DELETED, deletion_msg);
RETURN_NOT_OK_PREPEND(catalog_manager_->sys_catalog()->Upsert(term_, tablet),
Format("Error deleting tablet $0", tablet->id()));
needs_async_write_to_sys_catalog = true;
}

l.Commit();
}

if (needs_async_write_to_sys_catalog) {
state_->AddPostLoadTask(
std::bind(&CatalogManager::WriteTabletToSysCatalog, catalog_manager_, tablet->tablet_id()),
"WriteTabletToSysCatalog");
}

if (first_table->IsColocationParentTable()) {
SCHECK(tablet_colocation_map.size() == existing_table_ids.size(), IllegalState,
Format("Tablet $0 has $1 tables, but only $2 of them were colocated",
Expand Down Expand Up @@ -380,10 +384,10 @@ Status NamespaceLoader::Visit(const NamespaceId& ns_id, const SysNamespaceEntryP
TransactionMetadata::FromPB(metadata.transaction()));
std::function<Status(bool)> when_done =
std::bind(&CatalogManager::VerifyNamespacePgLayer, catalog_manager_, ns, _1);
WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc(
state_->AddPostLoadTask(
std::bind(&YsqlTransactionDdl::VerifyTransaction,
catalog_manager_->ysql_transaction_.get(), txn, when_done)),
"Could not submit VerifyTransaction to thread pool");
catalog_manager_->ysql_transaction_.get(), txn, when_done),
"VerifyTransaction");
}
break;
case SysNamespaceEntryPB::PREPARING:
Expand All @@ -405,23 +409,23 @@ Status NamespaceLoader::Visit(const NamespaceId& ns_id, const SysNamespaceEntryP
l.Commit();
LOG(INFO) << "Loaded metadata to DELETE namespace " << ns->ToString();
if (ns->database_type() != YQL_DATABASE_PGSQL) {
WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc(
std::bind(&CatalogManager::DeleteYcqlDatabaseAsync, catalog_manager_, ns)),
"Could not submit DeleteYcqlDatabaseAsync to thread pool");
state_->AddPostLoadTask(
std::bind(&CatalogManager::DeleteYcqlDatabaseAsync, catalog_manager_, ns),
"DeleteYcqlDatabaseAsync");
} else {
WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc(
std::bind(&CatalogManager::DeleteYsqlDatabaseAsync, catalog_manager_, ns)),
"Could not submit DeleteYsqlDatabaseAsync to thread pool");
state_->AddPostLoadTask(
std::bind(&CatalogManager::DeleteYsqlDatabaseAsync, catalog_manager_, ns),
"DeleteYsqlDatabaseAsync");
}
break;
case SysNamespaceEntryPB::DELETED:
LOG(INFO) << "Skipping metadata for namespace (state=" << metadata.state()
<< "): " << ns->ToString();
// Garbage collection. Async remove the Namespace from the SysCatalog.
// No in-memory state needed since tablet deletes have already been processed.
WARN_NOT_OK(catalog_manager_->background_tasks_thread_pool_->SubmitFunc(
std::bind(&CatalogManager::DeleteYsqlDatabaseAsync, catalog_manager_, ns)),
"Could not submit DeleteYsqlDatabaseAsync to thread pool");
state_->AddPostLoadTask(
std::bind(&CatalogManager::DeleteYsqlDatabaseAsync, catalog_manager_, ns),
"DeleteYsqlDatabaseAsync");
break;
default:
FATAL_INVALID_ENUM_VALUE(SysNamespaceEntryPB_State, state);
Expand Down
20 changes: 18 additions & 2 deletions src/yb/master/catalog_loaders.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,27 @@
namespace yb {
namespace master {

struct SysCatalogLoadingState {
std::vector<std::pair<std::function<void()>, std::string>> post_load_tasks;

void AddPostLoadTask(std::function<void()>&& func, std::string&& msg) {
post_load_tasks.push_back({std::move(func), std::move(msg)});
}

void Reset() {
post_load_tasks.clear();
}
};

#define DECLARE_LOADER_CLASS(name, key_type, entry_pb_name, mutex) \
class BOOST_PP_CAT(name, Loader) : \
public Visitor<BOOST_PP_CAT(BOOST_PP_CAT(Persistent, name), Info)> { \
public: \
explicit BOOST_PP_CAT(name, Loader)( \
CatalogManager* catalog_manager, int64_t term = OpId::kUnknownTerm) \
: catalog_manager_(catalog_manager), term_(term) {} \
CatalogManager* catalog_manager, \
SysCatalogLoadingState* state, \
int64_t term = OpId::kUnknownTerm) \
: catalog_manager_(catalog_manager), state_(state), term_(term) {} \
\
private: \
Status Visit( \
Expand All @@ -60,6 +74,8 @@ namespace master {
\
CatalogManager *catalog_manager_; \
\
SysCatalogLoadingState* state_; \
\
int64_t term_; \
\
DISALLOW_COPY_AND_ASSIGN(BOOST_PP_CAT(name, Loader)); \
Expand Down
Loading

0 comments on commit 2c49d67

Please sign in to comment.