Skip to content

Commit

Permalink
feat(shannon): refine refetch and minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ShannonBase committed Nov 17, 2024
1 parent 3ebefc4 commit 2d13fc6
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 32 deletions.
52 changes: 46 additions & 6 deletions storage/rapid_engine/imcs/chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,28 @@ int Chunk::is_deleted(const Rapid_load_context *context, row_id_t pos) {
return Utils::Util::bit_array_get(m_header->m_del_mask.get(), pos);
}

/**
 * Saves one old version of a row into the chunk's snapshot meta unit (SMU).
 *
 * @param rowid row whose value is being preserved; key into m_version_info.
 * @param id    transaction id stored together with the saved version.
 * @param data  bytes of the value to save; not read when len == UNIV_SQL_NULL.
 * @param len   number of bytes copied from data, or UNIV_SQL_NULL to record a
 *              version that carries no data buffer.
 */
void Chunk::build_version(row_id_t rowid, Transaction::ID id, const uchar *data, size_t len) {
  // The item is created with a buffer of m_normailzed_pack_length bytes.
  // NOTE(review): len is presumably never larger than that buffer — confirm at the call sites.
  smu_item_t si(m_header->m_normailzed_pack_length);
  if (len == UNIV_SQL_NULL) {
    si.data = nullptr;  // NULL value: the version keeps no buffer.
  } else {
    std::memcpy(si.data.get(), data, len);
  }
  si.trxid = id;

  // operator[] creates an empty version list the first time a row is seen, so a
  // single hash lookup covers both the "existing row" and the "new row" case.
  m_header->m_smu->m_version_info[rowid].emplace_back(std::move(si));
}

uchar *Chunk::read(const Rapid_load_context *context, uchar *data, size_t len) {
ut_a((!data && len == UNIV_SQL_NULL) || (data && len != UNIV_SQL_NULL));
check_data_type(len);

std::scoped_lock lk(m_data_mutex);
if (unlikely(m_rdata.load() + len > m_end.load())) {
m_rdata.store(m_base.load());
return nullptr;
Expand Down Expand Up @@ -248,7 +265,8 @@ uchar *Chunk::write(const Rapid_load_context *context, uchar *data, size_t len)
m_data.store(m_base.load());
return nullptr;
}
auto ret = reinterpret_cast<uchar *>(std::memcpy(m_data, data, len));
std::scoped_lock data_guard(m_data_mutex);
auto ret = reinterpret_cast<uchar *>(std::memcpy(m_data.load(), data, len));
m_data.fetch_add(len);

update_meta_info(ShannonBase::OPER_TYPE::OPER_INSERT, data);
Expand All @@ -264,7 +282,10 @@ uchar *Chunk::update(const Rapid_load_context *context, row_id_t where, uchar *n

ut_a(where < m_header->m_prows);

std::scoped_lock data_guard(m_data_mutex);
auto where_ptr = m_base + where * m_header->m_normailzed_pack_length;

build_version(where, context->m_extra_info.m_trxid, where_ptr, len);
if (len == UNIV_SQL_NULL) {
Utils::Util::bit_array_set(m_header->m_null_mask.get(), where);
len = m_header->m_normailzed_pack_length;
Expand All @@ -290,12 +311,17 @@ uchar *Chunk::del(const Rapid_load_context *context, uchar *data, size_t len) {
if (m_data <= m_base) return m_base;
std::atomic<uchar *> start_pos{m_base.load()};
size_t row_index{0};

std::scoped_lock data_guard(m_data_mutex);
while (start_pos < m_data.load()) {
if (!memcmp(start_pos, data, len)) { // same
Utils::Util::bit_array_set(m_header->m_del_mask.get(), row_index);
// to set the mem to blank holder.
update_meta_info(ShannonBase::OPER_TYPE::OPER_DELETE, start_pos);
auto is_null = Utils::Util::bit_array_get(m_header->m_null_mask.get(), row_index);
build_version(row_index, context->m_extra_info.m_trxid, data, is_null ? UNIV_SQL_NULL : len);

memcpy(start_pos, reinterpret_cast<void *>((uchar *)SHANNON_BLANK_PLACEHOLDER), len);
update_meta_info(ShannonBase::OPER_TYPE::OPER_DELETE, start_pos);
}

start_pos += m_header->m_source_fld->pack_length();
Expand All @@ -317,17 +343,23 @@ uchar *Chunk::del(const Rapid_load_context *context, row_id_t rowid) {

Utils::Util::bit_array_set(m_header->m_del_mask.get(), rowid);

std::scoped_lock data_guard(m_data_mutex);
del_from = m_base + rowid * m_header->m_normailzed_pack_length;
ut_a(del_from <= m_data);
// TODO: add to smu ptr. but, now we just replace with SHANNON_PLACEHOLDER
if (del_from) update_meta_info(ShannonBase::OPER_TYPE::OPER_DELETE, del_from);

// get the old data and insert smu ptr link. should check whether data is null.
auto is_null = Utils::Util::bit_array_get(m_header->m_null_mask.get(), rowid);
auto data_len = is_null ? UNIV_SQL_NULL : m_header->m_normailzed_pack_length;
build_version(rowid, context->m_extra_info.m_trxid, del_from, data_len);

del_from = (uchar *)memcpy(del_from, SHANNON_BLANK_PLACEHOLDER, m_header->m_normailzed_pack_length);

if (del_from) update_meta_info(ShannonBase::OPER_TYPE::OPER_DELETE, del_from);
return del_from;
}

void Chunk::truncate() {
std::scoped_lock lk(m_header_mutex);
std::scoped_lock lk(m_data_mutex);
if (m_base) {
ut::aligned_free(m_base);
m_base = m_data = nullptr;
Expand All @@ -347,5 +379,13 @@ uchar *Chunk::seek(row_id_t rowid) {
return m_base + rowid * m_header->m_normailzed_pack_length;
}

// Returns the physical row count stored in the chunk header (m_prows).
row_id_t Chunk::prows() {
  return m_header->m_prows;
}

// Placeholder: ut_a(false) is hit unconditionally before the fallback return of
// the header's m_prows. `context` is not used yet.
// NOTE(review): ut_a presumably aborts on a false condition — callers should not
// rely on this function until the assertion is replaced by a real implementation.
row_id_t Chunk::rows(Rapid_load_context *context) {
  ut_a(false);
  return m_header->m_prows;
}

} // namespace Imcs
} // namespace ShannonBase
57 changes: 47 additions & 10 deletions storage/rapid_engine/imcs/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@
#include <atomic>
#include <chrono>
#include <tuple>
#include <vector>
#include <unordered_map>

#include "field_types.h" //for MYSQL_TYPE_XXX
#include "my_inttypes.h"
#include "sql/field.h" //Field
#include "trx0types.h" //trx_id_t
//#include "trx0types.h" //trx_id_t

#include "sql/sql_class.h"
#include "storage/rapid_engine/include/rapid_arch_inf.h" //cache line sz
#include "storage/rapid_engine/include/rapid_const.h"
#include "storage/rapid_engine/include/rapid_object.h"
#include "storage/rapid_engine/trx/transaction.h" //Transaction

namespace ShannonBase {
class Rapid_load_context;
Expand All @@ -62,18 +63,37 @@ struct SHANNON_ALIGNAS chunk_deleter_helper {
// SMU. So if a trx can see the latest version data, it should travers the version
// link to check whether there's some visible data or not. if yes, return the old ver
// data or otherwise, go to check the next item.
struct smu_item_t {
struct SHANNON_ALIGNAS smu_item_t {
// trxid of old version value.
trx_id_t trxid;
Transaction::ID trxid;

// timestamp of the modification.
std::chrono::time_point<std::chrono::high_resolution_clock> tm_stamp;

// the old version of data.
uchar *data;
// the old version of data. all var data types were encoded.
std::unique_ptr<uchar[]> data;

// Allocates a `size`-byte buffer for the saved value and stamps the creation time.
// trxid is value-initialized so it never holds an indeterminate value; the code
// that builds a version assigns the real transaction id afterwards.
smu_item_t(size_t size) : trxid{}, tm_stamp(std::chrono::high_resolution_clock::now()), data(new uchar[size]) {}
smu_item_t() = delete;
// Disable copying
smu_item_t(const smu_item_t &) = delete;
smu_item_t &operator=(const smu_item_t &) = delete;

// Move constructor: member-wise move, i.e. the id and timestamp are copied and
// ownership of the data buffer is transferred from `other`.
smu_item_t(smu_item_t &&other) noexcept = default;

// Move assignment: takes over other's id, timestamp and data buffer.
// Assigning an item to itself leaves it untouched.
smu_item_t &operator=(smu_item_t &&other) noexcept {
  if (this == &other) return *this;

  trxid = other.trxid;
  tm_stamp = other.tm_stamp;
  data = std::move(other.data);
  return *this;
}
};

using smu_item = struct smu_item_t;
using smu_item_vec = std::vector<smu_item_t>;

class Chunk : public MemoryObject {
public:
Expand All @@ -82,11 +102,19 @@ class Chunk : public MemoryObject {

class Snapshot_meta_unit {
public:
// an item of SMU. consist of <trxid, new_data>.
std::vector<smu_item> m_version_info;
/** Version info of the SMU. Each item consists of <trxid, new_data>. The pair of row_id_t and
* smu_item_vec indicates that each row has its own version link. If a row has not been
* modified, it does not have any old version.
* |__|
* |__|<----->rowidN: {[{trxid:value1} | {trxid:value2} | {trxid:value3} | ...| {trxid:valueN}]}
* |__| rowidM: {[{trxid:value1} | {trxid:value2} | {trxid:value3} | ...| {trxid:valueN}]}
* |__|<-----/|\
* |__|
*/
std::unordered_map<row_id_t, smu_item_vec> m_version_info;
};

using Chunk_header = struct alignas(CACHE_LINE_SIZE) Chunk_header_t {
using Chunk_header = struct SHANNON_ALIGNAS Chunk_header_t {
public:
// a copy of source field info, only use its meta info. do NOT use it
// directly.
Expand Down Expand Up @@ -216,6 +244,12 @@ class Chunk : public MemoryObject {

uchar *seek(row_id_t rowid);

// gets the physical row count.
row_id_t prows();

// gets
row_id_t rows(Rapid_load_context *context);

private:
std::mutex m_header_mutex;
std::unique_ptr<Chunk_header> m_header{nullptr};
Expand Down Expand Up @@ -250,6 +284,9 @@ class Chunk : public MemoryObject {

// check the data type is leagal or not.
void check_data_type(size_t type_size);

// build up an old version.
inline void build_version(row_id_t rowid, Transaction::ID id, const uchar *data, size_t len);
};

} // namespace Imcs
Expand Down
10 changes: 9 additions & 1 deletion storage/rapid_engine/imcs/cu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,15 @@ row_id_t Cu::prows() {
return m_header->m_prows.load(std::memory_order_seq_cst);
}

row_id_t Cu::rows(Rapid_load_context *context) { return 0; }
/**
 * Returns the row count of this CU for the given load context.
 *
 * For now this is the physical row count kept in the CU header; the
 * MVCC-versioned row count is not implemented yet.
 *
 * @param context load context; must carry a transaction (m_trx).
 * @return m_header->m_prows, read with sequentially consistent ordering.
 */
row_id_t Cu::rows(Rapid_load_context *context) {
  ut_a(context->m_trx);

  // TODO: return the MVCC-versioned row count. The per-chunk summation over
  // Chunk::rows() is intentionally not performed: its result was never used and
  // Chunk::rows() currently asserts unconditionally.
  return m_header->m_prows.load(std::memory_order_seq_cst);
}

size_t Cu::normalized_pack_length() {
ut_a(m_chunks.size());
Expand Down
6 changes: 3 additions & 3 deletions storage/rapid_engine/imcs/data_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ int DataTable::next(uchar *buf) {
auto trx_id_ptr = m_context->m_trx_id_cu->chunk(current_chunk)->seek(offset_in_chunk);
// more info for __builtin_prefetch: https://gcc.gnu.org/onlinedocs/gcc/Other-Builtins.html
if ((trx_id_ptr + CACHE_LINE_SIZE) < m_context->m_trx_id_cu->chunk(current_chunk)->where())
__builtin_prefetch(trx_id_ptr + CACHE_LINE_SIZE, SHANNON_PREFETCH_FOR_READ, SHANNON_PREFETCH_L3_LOCALITY);
SHANNON_PREFETCH_R(trx_id_ptr + CACHE_LINE_SIZE);

Transaction::ID trx_id = mach_read_from_6(trx_id_ptr);

for (auto idx = 0u; idx < m_field_cus.size(); idx++) {
Expand All @@ -135,8 +136,7 @@ int DataTable::next(uchar *buf) {

// prefetch data for cpu to reduce the data cache miss.
if (cu->chunk(current_chunk)->seek(offset_in_chunk) + CACHE_LINE_SIZE < cu->chunk(current_chunk)->where())
__builtin_prefetch(cu->chunk(current_chunk)->seek(offset_in_chunk) + CACHE_LINE_SIZE, SHANNON_PREFETCH_FOR_READ,
SHANNON_PREFETCH_L3_LOCALITY);
SHANNON_PREFETCH_R(cu->chunk(current_chunk)->seek(offset_in_chunk) + CACHE_LINE_SIZE);

// visibility check. if it's not visibile and does not have old version, go to next.
if (!m_context->m_trx->is_visible(trx_id, m_context->m_table_name) &&
Expand Down
40 changes: 40 additions & 0 deletions storage/rapid_engine/include/rapid_arch_inf.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,44 @@
#define CACHE_L2_SIZE 524288
#define CACHE_L3_SIZE 8388608

// clang-format off
/* Prefetch helper macros.
 * Prefetching can be disabled by defining the SHANNON_NO_PREFETCH build macro; each
 * macro then only evaluates its argument and discards the result.
 * NOTE(review): SHANNON__PREFETCH_RW is spelled with a double underscore, unlike the
 * other macros here; identifiers containing `__` are reserved in C++ — consider
 * providing a SHANNON_PREFETCH_RW spelling as well. */
#if defined(SHANNON_NO_PREFETCH)
# define SHANNON_PREFETCH_L1(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_L2(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_L3(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_R(addr) (void)(addr) /* disabled */
# define SHANNON__PREFETCH_RW(addr) (void)(addr) /* disabled */
#else
# if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_I86)) /* _mm_prefetch() is not defined outside of x86/x64 */
# include <mmintrin.h> /* https://msdn.microsoft.com/fr-fr/library/84szxsww(v=vs.90).aspx */
# define SHANNON_PREFETCH_L1(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T0)
# define SHANNON_PREFETCH_L2(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T1)
# define SHANNON_PREFETCH_L3(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2)
# define SHANNON_PREFETCH_R(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2)
# define SHANNON__PREFETCH_RW(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2)
# elif defined(__GNUC__) && ( (__GNUC__ >= 4) || ( (__GNUC__ == 3) && (__GNUC_MINOR__ >= 1) ) )
# define SHANNON_PREFETCH_L1(addr) __builtin_prefetch((addr), 0 /* rw==read */, 3 /* locality */)
# define SHANNON_PREFETCH_L2(addr) __builtin_prefetch((addr), 0 /* rw==read */, 2 /* locality */)
# define SHANNON_PREFETCH_L3(addr) __builtin_prefetch((addr), 0 /* rw==read */, 1 /* locality */)
# define SHANNON_PREFETCH_R(addr) __builtin_prefetch((addr), 0 /* rw==read */, 1 /* locality */)
/* Minimize cache-miss latency by moving data at addr into a cache before it is read or written. */
# define SHANNON__PREFETCH_RW(addr) __builtin_prefetch((addr), 1 /* rw==write */, 1 /* locality */)
# elif defined(__aarch64__)
# define SHANNON_PREFETCH_L1(addr) __asm__ __volatile__("prfm pldl1keep, %0" ::"Q"(*(addr)))
# define SHANNON_PREFETCH_L2(addr) __asm__ __volatile__("prfm pldl2keep, %0" ::"Q"(*(addr)))
# define SHANNON_PREFETCH_L3(addr) __asm__ __volatile__("prfm pldl3keep, %0" ::"Q"(*(addr)))
# define SHANNON_PREFETCH_R(addr) __asm__ __volatile__("prfm pldl3keep, %0" ::"Q"(*(addr)))
# define SHANNON__PREFETCH_RW(addr) __asm__ __volatile__("prfm pldl3strm, %0" ::"Q"(*(addr)))
# else
# define SHANNON_PREFETCH_L1(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_L2(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_L3(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_R(addr) (void)(addr) /* disabled */
# define SHANNON__PREFETCH_RW(addr) (void)(addr) /* disabled */
# endif
#endif /* SHANNON_NO_PREFETCH */
// clang-format on

#endif //__SHANNON_RAPID_ARCH_INFO_H__
40 changes: 40 additions & 0 deletions storage/rapid_engine/include/rapid_arch_inf.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,44 @@
#define CACHE_L2_SIZE @CACHE_L2_SIZE@
#define CACHE_L3_SIZE @CACHE_L3_SIZE@

// clang-format off
/* Prefetch helper macros.
 * Prefetching can be disabled by defining the SHANNON_NO_PREFETCH build macro; each
 * macro then only evaluates its argument and discards the result.
 * NOTE(review): SHANNON__PREFETCH_RW is spelled with a double underscore, unlike the
 * other macros here; identifiers containing `__` are reserved in C++ — consider
 * providing a SHANNON_PREFETCH_RW spelling as well. */
#if defined(SHANNON_NO_PREFETCH)
# define SHANNON_PREFETCH_L1(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_L2(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_L3(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_R(addr) (void)(addr) /* disabled */
# define SHANNON__PREFETCH_RW(addr) (void)(addr) /* disabled */
#else
# if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_I86)) /* _mm_prefetch() is not defined outside of x86/x64 */
# include <mmintrin.h> /* https://msdn.microsoft.com/fr-fr/library/84szxsww(v=vs.90).aspx */
# define SHANNON_PREFETCH_L1(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T0)
# define SHANNON_PREFETCH_L2(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T1)
# define SHANNON_PREFETCH_L3(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2)
# define SHANNON_PREFETCH_R(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2)
# define SHANNON__PREFETCH_RW(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T2)
# elif defined(__GNUC__) && ( (__GNUC__ >= 4) || ( (__GNUC__ == 3) && (__GNUC_MINOR__ >= 1) ) )
# define SHANNON_PREFETCH_L1(addr) __builtin_prefetch((addr), 0 /* rw==read */, 3 /* locality */)
# define SHANNON_PREFETCH_L2(addr) __builtin_prefetch((addr), 0 /* rw==read */, 2 /* locality */)
# define SHANNON_PREFETCH_L3(addr) __builtin_prefetch((addr), 0 /* rw==read */, 1 /* locality */)
# define SHANNON_PREFETCH_R(addr) __builtin_prefetch((addr), 0 /* rw==read */, 1 /* locality */)
/* Minimize cache-miss latency by moving data at addr into a cache before it is read or written. */
# define SHANNON__PREFETCH_RW(addr) __builtin_prefetch((addr), 1 /* rw==write */, 1 /* locality */)
# elif defined(__aarch64__)
# define SHANNON_PREFETCH_L1(addr) __asm__ __volatile__("prfm pldl1keep, %0" ::"Q"(*(addr)))
# define SHANNON_PREFETCH_L2(addr) __asm__ __volatile__("prfm pldl2keep, %0" ::"Q"(*(addr)))
# define SHANNON_PREFETCH_L3(addr) __asm__ __volatile__("prfm pldl3keep, %0" ::"Q"(*(addr)))
# define SHANNON_PREFETCH_R(addr) __asm__ __volatile__("prfm pldl3keep, %0" ::"Q"(*(addr)))
# define SHANNON__PREFETCH_RW(addr) __asm__ __volatile__("prfm pldl3strm, %0" ::"Q"(*(addr)))
# else
# define SHANNON_PREFETCH_L1(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_L2(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_L3(addr) (void)(addr) /* disabled */
# define SHANNON_PREFETCH_R(addr) (void)(addr) /* disabled */
# define SHANNON__PREFETCH_RW(addr) (void)(addr) /* disabled */
# endif
#endif /* SHANNON_NO_PREFETCH */
// clang-format on

#endif //__SHANNON_RAPID_ARCH_INFO_H__
9 changes: 0 additions & 9 deletions storage/rapid_engine/include/rapid_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@

#define SHANNON_ALIGNAS alignas(CACHE_LINE_SIZE)

// THE FOLLOWING FOR PREFETCH CPU INSTRUCTION.
#define SHANNON_PREFETCH_FOR_READ 0
#define SHANNON_PREFETCH_FOR_WRITE 1

#define SHANNON_PREFETCH__NONE_LOCALITY 0
#define SHANNON_PREFETCH_L3_LOCALITY 1
#define SHANNON_PREFETCH_L2_LOCALITY 2
#define SAHNNON_PREFETCH_L1_LOCALITY 3

namespace ShannonBase {
using row_id_t = size_t;
/** Handler name for rapid */
Expand Down
4 changes: 3 additions & 1 deletion storage/rapid_engine/trx/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ bool Transaction::is_auto_commit() { return m_trx_impl->auto_commit; }

bool Transaction::is_active() { return trx_is_started(m_trx_impl); }

bool Transaction::is_visible(ID trx_id, const char *table_name) {
bool Transaction::is_visible(Transaction::ID trx_id, const char *table_name) {
if (MVCC::is_view_active(m_trx_impl->read_view)) {
table_name_t name;
name.m_name = const_cast<char *>(table_name);
Expand All @@ -199,4 +199,6 @@ bool Transaction::is_visible(ID trx_id, const char *table_name) {
return false;
}

// Returns the id field of the wrapped transaction object (m_trx_impl->id).
Transaction::ID Transaction::get_id() {
  return m_trx_impl->id;
}

} // namespace ShannonBase
Loading

0 comments on commit 2d13fc6

Please sign in to comment.