From fb4ab4402ba1463cd080d08b888197b56fba81a6 Mon Sep 17 00:00:00 2001 From: shannon data ai Date: Sat, 16 Nov 2024 20:37:20 +0800 Subject: [PATCH] feat(shannon): refine refetch and minor changes --- storage/rapid_engine/imcs/chunk.cpp | 52 ++++++++++++++++--- storage/rapid_engine/imcs/chunk.h | 51 ++++++++++++++---- storage/rapid_engine/imcs/cu.cpp | 10 +++- storage/rapid_engine/imcs/data_table.cpp | 6 +-- storage/rapid_engine/include/rapid_arch_inf.h | 38 ++++++++++++++ .../rapid_engine/include/rapid_arch_inf.h.in | 38 ++++++++++++++ storage/rapid_engine/include/rapid_const.h | 9 ---- storage/rapid_engine/trx/transaction.cpp | 4 +- storage/rapid_engine/trx/transaction.h | 11 +++- 9 files changed, 187 insertions(+), 32 deletions(-) diff --git a/storage/rapid_engine/imcs/chunk.cpp b/storage/rapid_engine/imcs/chunk.cpp index cbb6a003c..6051fffb9 100644 --- a/storage/rapid_engine/imcs/chunk.cpp +++ b/storage/rapid_engine/imcs/chunk.cpp @@ -210,7 +210,6 @@ uchar *Chunk::read(const Rapid_load_context *context, uchar *data, size_t len) { ut_a((!data && len == UNIV_SQL_NULL) || (data && len != UNIV_SQL_NULL)); check_data_type(len); - std::scoped_lock lk(m_data_mutex); if (unlikely(m_rdata.load() + len > m_end.load())) { m_rdata.store(m_base.load()); return nullptr; @@ -248,7 +247,8 @@ uchar *Chunk::write(const Rapid_load_context *context, uchar *data, size_t len) m_data.store(m_base.load()); return nullptr; } - auto ret = reinterpret_cast(std::memcpy(m_data, data, len)); + std::scoped_lock data_guard(m_data_mutex); + auto ret = reinterpret_cast(std::memcpy(m_data.load(), data, len)); m_data.fetch_add(len); update_meta_info(ShannonBase::OPER_TYPE::OPER_INSERT, data); @@ -264,7 +264,10 @@ uchar *Chunk::update(const Rapid_load_context *context, row_id_t where, uchar *n ut_a(where < m_header->m_prows); + std::scoped_lock data_guard(m_data_mutex); auto where_ptr = m_base + where * m_header->m_normailzed_pack_length; + + build_version(where, context->m_trx->get_id(), where_ptr, len); if (len == UNIV_SQL_NULL) { Utils::Util::bit_array_set(m_header->m_null_mask.get(), where); len = m_header->m_normailzed_pack_length; @@ -290,12 +293,17 @@ uchar *Chunk::del(const Rapid_load_context *context, uchar *data, size_t len) { if (m_data <= m_base) return m_base; std::atomic start_pos{m_base.load()}; size_t row_index{0}; + + std::scoped_lock data_guard(m_data_mutex); while (start_pos < m_data.load()) { if (!memcmp(start_pos, data, len)) { // same Utils::Util::bit_array_set(m_header->m_del_mask.get(), row_index); // to set the mem to blank holder. - update_meta_info(ShannonBase::OPER_TYPE::OPER_DELETE, start_pos); + auto is_null = Utils::Util::bit_array_get(m_header->m_null_mask.get(), row_index); + build_version(row_index, context->m_trx->get_id(), data, is_null ? UNIV_SQL_NULL : len); + memcpy(start_pos, reinterpret_cast((uchar *)SHANNON_BLANK_PLACEHOLDER), len); + update_meta_info(ShannonBase::OPER_TYPE::OPER_DELETE, start_pos); } start_pos += m_header->m_source_fld->pack_length(); @@ -317,17 +325,41 @@ uchar *Chunk::del(const Rapid_load_context *context, row_id_t rowid) { Utils::Util::bit_array_set(m_header->m_del_mask.get(), rowid); + std::scoped_lock data_guard(m_data_mutex); del_from = m_base + rowid * m_header->m_normailzed_pack_length; ut_a(del_from <= m_data); - // TODO: add to smu ptr. but, now we just replace with SHANNON_PLACEHOLDER - if (del_from) update_meta_info(ShannonBase::OPER_TYPE::OPER_DELETE, del_from); + + // get the old data and insert smu ptr link. should check whether data is null. + auto is_null = Utils::Util::bit_array_get(m_header->m_null_mask.get(), rowid); + auto data_len = is_null ? UNIV_SQL_NULL : m_header->m_normailzed_pack_length; + build_version(rowid, context->m_trx->get_id(), del_from, data_len); del_from = (uchar *)memcpy(del_from, SHANNON_BLANK_PLACEHOLDER, m_header->m_normailzed_pack_length); + + if (del_from) update_meta_info(ShannonBase::OPER_TYPE::OPER_DELETE, del_from); return del_from; } +void Chunk::build_version(row_id_t rowid, Transaction::ID id, const uchar *data, size_t len) { + smu_item_t si(m_header->m_normailzed_pack_length); + if (len == UNIV_SQL_NULL) { + si.data = nullptr; + } else + std::memcpy(si.data.get(), data, len); + si.trxid = id; + if (m_header->m_smu->m_version_info.find(rowid) != m_header->m_smu->m_version_info.end()) { + m_header->m_smu->m_version_info[rowid].emplace_back(std::move(si)); + } else { + smu_item_vec iv; + iv.emplace_back(std::move(si)); + m_header->m_smu->m_version_info.emplace(rowid, std::move(iv)); + } + + return; +} + void Chunk::truncate() { - std::scoped_lock lk(m_header_mutex); + std::scoped_lock lk(m_data_mutex); if (m_base) { ut::aligned_free(m_base); m_base = m_data = nullptr; @@ -347,5 +379,13 @@ uchar *Chunk::seek(row_id_t rowid) { return m_base + rowid * m_header->m_normailzed_pack_length; } +row_id_t Chunk::prows() { return m_header->m_prows; } + +row_id_t Chunk::rows(Rapid_load_context *context) { + ut_a(false); + return m_header->m_prows; + ; +} + } // namespace Imcs } // namespace ShannonBase \ No newline at end of file diff --git a/storage/rapid_engine/imcs/chunk.h b/storage/rapid_engine/imcs/chunk.h index f38279c58..bfb6c3e37 100644 --- a/storage/rapid_engine/imcs/chunk.h +++ b/storage/rapid_engine/imcs/chunk.h @@ -29,17 +29,18 @@ #include #include #include -#include +#include #include "field_types.h" //for MYSQL_TYPE_XXX #include "my_inttypes.h" #include "sql/field.h" //Field -#include "trx0types.h" //trx_id_t +//#include "trx0types.h" //trx_id_t #include "sql/sql_class.h" #include "storage/rapid_engine/include/rapid_arch_inf.h" //cache line sz #include "storage/rapid_engine/include/rapid_const.h" #include "storage/rapid_engine/include/rapid_object.h" +#include "storage/rapid_engine/trx/transaction.h" //Transaction namespace ShannonBase { class Rapid_load_context; @@ -62,18 +63,37 @@ struct SHANNON_ALIGNAS chunk_deleter_helper { // SMU. So if a trx can see the latest version data, it should travers the version // link to check whether there's some visible data or not. if yes, return the old ver // data or otherwise, go to check the next item. -struct smu_item_t { +struct SHANNON_ALIGNAS smu_item_t { // trxid of old version value. - trx_id_t trxid; + Transaction::ID trxid; // timestamp of the modification. std::chrono::time_point tm_stamp; - // the old version of data. - uchar *data; + // the old version of data. all var data types were encoded. + std::unique_ptr data; + + smu_item_t(size_t size) : data(new uchar[size]) { tm_stamp = std::chrono::high_resolution_clock::now(); } + smu_item_t() = delete; + // Disable copying + smu_item_t(const smu_item_t &) = delete; + smu_item_t &operator=(const smu_item_t &) = delete; + + // Define a move constructor + smu_item_t(smu_item_t &&other) noexcept : trxid(other.trxid), tm_stamp(other.tm_stamp), data(std::move(other.data)) {} + + // Define a move assignment operator + smu_item_t &operator=(smu_item_t &&other) noexcept { + if (this != &other) { + trxid = other.trxid; + tm_stamp = other.tm_stamp; + data = std::move(other.data); + } + return *this; + } }; -using smu_item = struct smu_item_t; +using smu_item_vec = std::vector; class Chunk : public MemoryObject { public: @@ -82,11 +102,13 @@ class Chunk : public MemoryObject { class Snapshot_meta_unit { public: - // an item of SMU. consist of . - std::vector m_version_info; + // an item of SMU. consist of . pair of row_id_t and sum_item indicates + // that each row data has a version link. If this row data not been modified, it does not + // have any old version. + std::unordered_map m_version_info; }; - using Chunk_header = struct alignas(CACHE_LINE_SIZE) Chunk_header_t { + using Chunk_header = struct SHANNON_ALIGNAS Chunk_header_t { public: // a copy of source field info, only use its meta info. do NOT use it // directly. @@ -216,6 +238,12 @@ class Chunk : public MemoryObject { uchar *seek(row_id_t rowid); + // gets the physical row count. + row_id_t prows(); + + // gets + row_id_t rows(Rapid_load_context *context); + private: std::mutex m_header_mutex; std::unique_ptr m_header{nullptr}; @@ -250,6 +278,9 @@ class Chunk : public MemoryObject { // check the data type is leagal or not. void check_data_type(size_t type_size); + + // build up an old version. + inline void build_version(row_id_t rowid, Transaction::ID id, const uchar *data, size_t len); }; } // namespace Imcs diff --git a/storage/rapid_engine/imcs/cu.cpp b/storage/rapid_engine/imcs/cu.cpp index 76064b072..c23b3b45e 100644 --- a/storage/rapid_engine/imcs/cu.cpp +++ b/storage/rapid_engine/imcs/cu.cpp @@ -107,7 +107,15 @@ row_id_t Cu::prows() { return m_header->m_prows.load(std::memory_order_seq_cst); } -row_id_t Cu::rows(Rapid_load_context *context) { return 0; } +row_id_t Cu::rows(Rapid_load_context *context) { + // now, we return the prows, in future, we will return mvcc-versioned row num. + ut_a(context->m_trx); + size_t rows{0u}; + for (auto idx = 0u; idx < m_chunks.size(); idx++) { + rows += m_chunks[idx]->rows(context); + } + return m_header->m_prows.load(std::memory_order_seq_cst); +} size_t Cu::normalized_pack_length() { ut_a(m_chunks.size()); diff --git a/storage/rapid_engine/imcs/data_table.cpp b/storage/rapid_engine/imcs/data_table.cpp index bee13ad68..a91f1c133 100644 --- a/storage/rapid_engine/imcs/data_table.cpp +++ b/storage/rapid_engine/imcs/data_table.cpp @@ -120,7 +120,8 @@ int DataTable::next(uchar *buf) { auto trx_id_ptr = m_context->m_trx_id_cu->chunk(current_chunk)->seek(offset_in_chunk); // more info for __builtin_prefetch: https://gcc.gnu.org/onlinedocs/gcc/Other-Builtins.html if ((trx_id_ptr + CACHE_LINE_SIZE) < m_context->m_trx_id_cu->chunk(current_chunk)->where()) - __builtin_prefetch(trx_id_ptr + CACHE_LINE_SIZE, SHANNON_PREFETCH_FOR_READ, SHANNON_PREFETCH_L3_LOCALITY); + SHANNON_PREFETCH_R(trx_id_ptr + CACHE_LINE_SIZE); + Transaction::ID trx_id = mach_read_from_6(trx_id_ptr); for (auto idx = 0u; idx < m_field_cus.size(); idx++) { @@ -135,8 +136,7 @@ int DataTable::next(uchar *buf) { // prefetch data for cpu to reduce the data cache miss. if (cu->chunk(current_chunk)->seek(offset_in_chunk) + CACHE_LINE_SIZE < cu->chunk(current_chunk)->where()) - __builtin_prefetch(cu->chunk(current_chunk)->seek(offset_in_chunk) + CACHE_LINE_SIZE, SHANNON_PREFETCH_FOR_READ, - SHANNON_PREFETCH_L3_LOCALITY); + SHANNON_PREFETCH_R(cu->chunk(current_chunk)->seek(offset_in_chunk) + CACHE_LINE_SIZE); // visibility check. if it's not visibile and does not have old version, go to next. if (!m_context->m_trx->is_visible(trx_id, m_context->m_table_name) && diff --git a/storage/rapid_engine/include/rapid_arch_inf.h b/storage/rapid_engine/include/rapid_arch_inf.h index 8fa0c48d1..6f0241ae2 100644 --- a/storage/rapid_engine/include/rapid_arch_inf.h +++ b/storage/rapid_engine/include/rapid_arch_inf.h @@ -33,4 +33,42 @@ #define CACHE_L2_SIZE 524288 #define CACHE_L3_SIZE 8388608 +/* prefetch + * can be disabled, by declaring NO_PREFETCH build macro */ +#if defined(SHANNON_NO_PREFETCH) +# define SHANNON_PREFETCH_L1(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_L2(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_L3(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_R(addr) (void)(addr) /* disabled */ +# define SHANNON__PREFETCH_RW(addr) (void)(addr) /* disabled */ +#else +# if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_I86)) /* _mm_prefetch() is not defined outside of x86/x64 */ +# include /* https://msdn.microsoft.com/fr-fr/library/84szxsww(v=vs.90).aspx */ +# define SHANNON_PREFETCH_L1(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T0) +# define SHANNON_PREFETCH_L2(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T1) +# define SHANNON_PREFETCH_L3(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2) +# define SHANNON_PREFETCH_R(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2) +# define SHANNON__PREFETCH_RW(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2) +# elif defined(__GNUC__) && ( (__GNUC__ >= 4) || ( (__GNUC__ == 3) && (__GNUC_MINOR__ >= 1) ) ) +# define SHANNON_PREFETCH_L1(addr) __builtin_prefetch((addr), 0 /* rw==read */, 3 /* locality */) +# define SHANNON_PREFETCH_L2(addr) __builtin_prefetch((addr), 0 /* rw==read */, 2 /* locality */) +# define SHANNON_PREFETCH_L3(addr) __builtin_prefetch((addr), 0 /* rw==read */, 1 /* locality */) +# define SHANNON_PREFETCH_R(addr) __builtin_prefetch((addr), 0 /* rw==read */, 1 /* locality */) +/* Minimize cache-miss latency by moving data at addr into a cache before it is read or written. */ +# define SHANNON__PREFETCH_RW(addr) __builtin_prefetch((addr), 1 /* rw==write */, 1 /* locality */) +# elif defined(__aarch64__) +# define SHANNON_PREFETCH_L1(addr) __asm__ __volatile__("prfm pldl1keep, %0" ::"Q"(*(addr))) +# define SHANNON_PREFETCH_L2(addr) __asm__ __volatile__("prfm pldl2keep, %0" ::"Q"(*(addr))) +# define SHANNON_PREFETCH_L3(addr) __asm__ __volatile__("prfm pldl3keep, %0" ::"Q"(*(addr))) +# define SHANNON_PREFETCH_R(addr) __asm__ __volatile__("prfm pldl3keep, %0" ::"Q"(*(addr))) +# define SHANNON__PREFETCH_RW(addr) __asm__ __volatile__("prfm pldl3strm, %0" ::"Q"(*(addr))) +# else +# define SHANNON_PREFETCH_L1(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_L2(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_L3(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_R(addr) (void)(addr) /* disabled */ +# define SHANNON__PREFETCH_RW(addr) (void)(addr) /* disabled */ +# endif +#endif /* NO_PREFETCH */ + #endif //__SHANNON_RAPID_ARCH_INFO_H__ diff --git a/storage/rapid_engine/include/rapid_arch_inf.h.in b/storage/rapid_engine/include/rapid_arch_inf.h.in index 178575e48..253ff3760 100644 --- a/storage/rapid_engine/include/rapid_arch_inf.h.in +++ b/storage/rapid_engine/include/rapid_arch_inf.h.in @@ -33,4 +33,42 @@ #define CACHE_L2_SIZE @CACHE_L2_SIZE@ #define CACHE_L3_SIZE @CACHE_L3_SIZE@ +/* prefetch + * can be disabled, by declaring NO_PREFETCH build macro */ +#if defined(SHANNON_NO_PREFETCH) +# define SHANNON_PREFETCH_L1(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_L2(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_L3(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_R(addr) (void)(addr) /* disabled */ +# define SHANNON__PREFETCH_RW(addr) (void)(addr) /* disabled */ +#else +# if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_I86)) /* _mm_prefetch() is not defined outside of x86/x64 */ +# include /* https://msdn.microsoft.com/fr-fr/library/84szxsww(v=vs.90).aspx */ +# define SHANNON_PREFETCH_L1(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T0) +# define SHANNON_PREFETCH_L2(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T1) +# define SHANNON_PREFETCH_L3(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2) +# define SHANNON_PREFETCH_R(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2) +# define SHANNON__PREFETCH_RW(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2) +# elif defined(__GNUC__) && ( (__GNUC__ >= 4) || ( (__GNUC__ == 3) && (__GNUC_MINOR__ >= 1) ) ) +# define SHANNON_PREFETCH_L1(addr) __builtin_prefetch((addr), 0 /* rw==read */, 3 /* locality */) +# define SHANNON_PREFETCH_L2(addr) __builtin_prefetch((addr), 0 /* rw==read */, 2 /* locality */) +# define SHANNON_PREFETCH_L3(addr) __builtin_prefetch((addr), 0 /* rw==read */, 1 /* locality */) +# define SHANNON_PREFETCH_R(addr) __builtin_prefetch((addr), 0 /* rw==read */, 1 /* locality */) +/* Minimize cache-miss latency by moving data at addr into a cache before it is read or written. */ +# define SHANNON__PREFETCH_RW(addr) __builtin_prefetch((addr), 1 /* rw==write */, 1 /* locality */) +# elif defined(__aarch64__) +# define SHANNON_PREFETCH_L1(addr) __asm__ __volatile__("prfm pldl1keep, %0" ::"Q"(*(addr))) +# define SHANNON_PREFETCH_L2(addr) __asm__ __volatile__("prfm pldl2keep, %0" ::"Q"(*(addr))) +# define SHANNON_PREFETCH_L3(addr) __asm__ __volatile__("prfm pldl3keep, %0" ::"Q"(*(addr))) +# define SHANNON_PREFETCH_R(addr) __asm__ __volatile__("prfm pldl3keep, %0" ::"Q"(*(addr))) +# define SHANNON__PREFETCH_RW(addr) __asm__ __volatile__("prfm pldl3strm, %0" ::"Q"(*(addr))) +# else +# define SHANNON_PREFETCH_L1(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_L2(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_L3(addr) (void)(addr) /* disabled */ +# define SHANNON_PREFETCH_R(addr) (void)(addr) /* disabled */ +# define SHANNON__PREFETCH_RW(addr) (void)(addr) /* disabled */ +# endif +#endif /* NO_PREFETCH */ + #endif //__SHANNON_RAPID_ARCH_INFO_H__ \ No newline at end of file diff --git a/storage/rapid_engine/include/rapid_const.h b/storage/rapid_engine/include/rapid_const.h index 9c3561f1b..5bac4e101 100644 --- a/storage/rapid_engine/include/rapid_const.h +++ b/storage/rapid_engine/include/rapid_const.h @@ -40,15 +40,6 @@ #define SHANNON_ALIGNAS alignas(CACHE_LINE_SIZE) -// THE FOLLOWING FOR PREFETCH CPU INSTRUCTION. -#define SHANNON_PREFETCH_FOR_READ 0 -#define SHANNON_PREFETCH_FOR_WRITE 1 - -#define SHANNON_PREFETCH__NONE_LOCALITY 0 -#define SHANNON_PREFETCH_L3_LOCALITY 1 -#define SHANNON_PREFETCH_L2_LOCALITY 2 -#define SAHNNON_PREFETCH_L1_LOCALITY 3 - namespace ShannonBase { using row_id_t = size_t; /** Handler name for rapid */ diff --git a/storage/rapid_engine/trx/transaction.cpp b/storage/rapid_engine/trx/transaction.cpp index ca27bbf5f..8f8bdbd11 100644 --- a/storage/rapid_engine/trx/transaction.cpp +++ b/storage/rapid_engine/trx/transaction.cpp @@ -189,7 +189,7 @@ bool Transaction::is_auto_commit() { return m_trx_impl->auto_commit; } bool Transaction::is_active() { return trx_is_started(m_trx_impl); } -bool Transaction::is_visible(ID trx_id, const char *table_name) { +bool Transaction::is_visible(Transaction::ID trx_id, const char *table_name) { if (MVCC::is_view_active(m_trx_impl->read_view)) { table_name_t name; name.m_name = const_cast(table_name); @@ -199,4 +199,6 @@ bool Transaction::is_visible(ID trx_id, const char *table_name) { return false; } +Transaction::ID Transaction::get_id() { return m_trx_impl->id; } + } // namespace ShannonBase \ No newline at end of file diff --git a/storage/rapid_engine/trx/transaction.h b/storage/rapid_engine/trx/transaction.h index 79378b1ea..0e23cea35 100644 --- a/storage/rapid_engine/trx/transaction.h +++ b/storage/rapid_engine/trx/transaction.h @@ -23,6 +23,9 @@ The fundmental code for imcs. for transaction. */ +#ifndef __SHANNONBASE_TRANSACTION_H__ +#define __SHANNONBASE_TRANSACTION_H__ + #include "sql/current_thd.h" #include "storage/innobase/include/trx0types.h" //trx_id_t #include "storage/rapid_engine/include/rapid_object.h" @@ -87,7 +90,9 @@ class Transaction : public MemoryObject { virtual bool is_active(); - virtual bool is_visible(ID trx_id, const char *table_name); + virtual bool is_visible(Transaction::ID trx_id, const char *table_name); + + virtual Transaction::ID get_id(); private: Transaction(THD *thd = current_thd); @@ -105,4 +110,6 @@ class Transaction : public MemoryObject { ISOLATION_LEVEL m_iso_level{ISOLATION_LEVEL::READ_REPEATABLE}; }; -} // namespace ShannonBase \ No newline at end of file +} // namespace ShannonBase + +#endif //__SHANNONBASE_TRANSACTION_H__ \ No newline at end of file