Skip to content

Commit

Permalink
perf/lua: introduce sparse_mode option for column_insert.lua
Browse files Browse the repository at this point in the history
It allows choosing how the non-NULL columns are distributed across the
dataset — sequentially or randomly (the new default mode).

NO_DOC=perf test
NO_TEST=perf test
NO_CHANGELOG=perf test
  • Loading branch information
Gumix authored and sergepetrenko committed Dec 16, 2024
1 parent 399de7d commit 4adb68a
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 41 deletions.
13 changes: 11 additions & 2 deletions perf/lua/column_insert.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ local benchmark = require('benchmark')
local USAGE = [[
engine <string, 'memtx'> - space engine to use for the test
wal_mode <string, 'write'> - write-ahead log mode to use for the test
sparse_mode <string, 'rand'> -
* seq - the first 10% (column_count_batch / column_count_total) columns are
filled in sequential order;
* rand - sparse columns are randomly distributed:
in non-Arrow mode columns are randomly chosen for every row;
in Arrow mode columns are randomly chosen for every batch.
column_count_total <number, 1000> - number of columns in the test space
column_count_batch <number, 10> - number of columns in the record batch
column_count_batch <number, 100> - number of columns in the record batch
row_count_total <number, 1000000> - number of inserted rows
row_count_batch <number, 1000> - number of rows per record batch
use_arrow_api <boolean, false> - use the Arrow API for batch insertion
Expand All @@ -29,6 +35,7 @@ local USAGE = [[
local params = benchmark.argparse(arg, {
{'engine', 'string'},
{'wal_mode', 'string'},
{'sparse_mode', 'string'},
{'column_count_total', 'number'},
{'column_count_batch', 'number'},
{'row_count_total', 'number'},
Expand All @@ -38,13 +45,15 @@ local params = benchmark.argparse(arg, {

local DEFAULT_ENGINE = 'memtx'
local DEFAULT_WAL_MODE = 'write'
local DEFAULT_SPARSE_MODE = 'rand'
local DEFAULT_COLUMN_COUNT_TOTAL = 1000
local DEFAULT_COLUMN_COUNT_BATCH = 10
local DEFAULT_COLUMN_COUNT_BATCH = 100
local DEFAULT_ROW_COUNT_TOTAL = 1000 * 1000
local DEFAULT_ROW_COUNT_BATCH = 1000

params.engine = params.engine or DEFAULT_ENGINE
params.wal_mode = params.wal_mode or DEFAULT_WAL_MODE
params.sparse_mode = params.sparse_mode or DEFAULT_SPARSE_MODE
params.column_count_total = params.column_count_total or
DEFAULT_COLUMN_COUNT_TOTAL
params.column_count_batch = params.column_count_batch or
Expand Down
179 changes: 140 additions & 39 deletions perf/lua/column_insert_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
# define ENABLE_BATCH_INSERT 1
#endif

/*
 * How the non-NULL columns are distributed across a sparse row;
 * selected by the sparse_mode option of column_insert.lua.
 */
enum sparse_mode {
	SPARSE_MODE_SEQ,
	SPARSE_MODE_RAND,
};

static struct {
int64_t row_count;
int64_t column_count;
Expand All @@ -23,35 +28,87 @@ static struct {
} *columns;
} dataset;

/*
 * Return true with the given probability, expressed in percent.
 * Values outside [0, 100] are clamped: <= 0 is always false,
 * >= 100 is always true, and no random number is consumed.
 */
static bool
rand_bool_with_probability(int probability)
{
	if (probability >= 100)
		return true;
	if (probability <= 0)
		return false;
	return rand() % 100 < probability;
}

/*
 * Map an option string ("seq" or "rand") to enum sparse_mode.
 * Any other value is a caller bug — abort.
 */
static enum sparse_mode
sparse_mode_from_str(const char *sparse_mode)
{
	if (strcmp(sparse_mode, "rand") == 0)
		return SPARSE_MODE_RAND;
	if (strcmp(sparse_mode, "seq") == 0)
		return SPARSE_MODE_SEQ;
	abort();
}

static char *
encode_mp_data(char *data, int row, int column_count,
enum sparse_mode sparse_mode)
{
char *data_end = data;
data_end = mp_encode_array(data_end, dataset.column_count);
switch (sparse_mode) {
case SPARSE_MODE_SEQ: {
int i;
for (i = 0; i < column_count; i++) {
uint64_t val = dataset.columns[i].data[row];
data_end = mp_encode_uint(data_end, val);
}
for (; i < dataset.column_count; i++)
data_end = mp_encode_nil(data_end);
break;
}
case SPARSE_MODE_RAND: {
int prob = 100 * column_count / dataset.column_count;
for (int i = 0; i < dataset.column_count; i++) {
if (i == 0 || rand_bool_with_probability(prob)) {
uint64_t val = dataset.columns[i].data[row];
data_end = mp_encode_uint(data_end, val);
} else {
data_end = mp_encode_nil(data_end);
}
}
break;
}
default:
abort();
}
return data_end;
}

static int
insert_serial_lua_func(struct lua_State *L)
{
uint32_t space_id = luaL_checkinteger(L, 1);
luaL_checktype(L, 2, LUA_TTABLE);
lua_getfield(L, 2, "sparse_mode");
enum sparse_mode sparse_mode = sparse_mode_from_str(
luaL_checklstring(L, -1, NULL));
lua_getfield(L, 2, "column_count_batch");
int column_count = luaL_checkinteger(L, -1);
lua_getfield(L, 2, "row_count_batch");
int batch_row_count = luaL_checkinteger(L, -1);
lua_getfield(L, 2, "column_count_total");
int total_column_count = luaL_checkinteger(L, -1);
lua_pop(L, 2);
static char tuple_data[1000 * 1000];
int row_count = luaL_checkinteger(L, -1);
lua_pop(L, 3);

static char mp_data[1000 * 1000];
VERIFY(box_txn_begin() == 0);
for (int64_t i = 0; i < dataset.row_count; i++) {
char *data_end = tuple_data;
data_end = mp_encode_array(data_end, total_column_count);
int j;
for (j = 0; j < dataset.column_count; j++) {
uint64_t val = dataset.columns[j].data[i];
data_end = mp_encode_uint(data_end, val);
}
for (; j < total_column_count; j++)
data_end = mp_encode_nil(data_end);
size_t tuple_size = data_end - tuple_data;
if (tuple_size > sizeof(tuple_data))
char *mp_data_end = encode_mp_data(
mp_data, i, column_count, sparse_mode);
size_t data_size = mp_data_end - mp_data;
if (data_size > sizeof(mp_data))
abort();
if (box_insert(space_id, tuple_data, data_end, NULL) != 0)
if (box_insert(space_id, mp_data, mp_data_end, NULL) != 0)
return luaT_error(L);
if (i % batch_row_count == 0) {
if (i % row_count == 0) {
VERIFY(box_txn_commit() == 0);
VERIFY(box_txn_begin() == 0);
}
Expand All @@ -75,25 +132,28 @@ arrow_schema_destroy(struct ArrowSchema *schema)
}

static void
arrow_schema_init(struct ArrowSchema *schema)
arrow_schema_init(struct ArrowSchema *schema, int *column_numbers,
int column_count)
{
*schema = (struct ArrowSchema) {
.format = "+s",
.name = NULL,
.metadata = NULL,
.flags = 0,
.n_children = dataset.column_count,
.n_children = column_count,
.children = xmalloc(sizeof(struct ArrowSchema *) *
dataset.column_count),
column_count),
.dictionary = NULL,
.release = arrow_schema_destroy,
.private_data = NULL,
};
for (int i = 0; i < dataset.column_count; i++) {
for (int i = 0; i < column_count; i++) {
int num = column_numbers[i];
assert(num < dataset.column_count);
schema->children[i] = xmalloc(sizeof(*schema->children[i]));
*schema->children[i] = (struct ArrowSchema) {
.format = dataset.columns[i].type,
.name = dataset.columns[i].name,
.format = dataset.columns[num].type,
.name = dataset.columns[num].name,
.metadata = NULL,
.flags = 0,
.n_children = 0,
Expand Down Expand Up @@ -122,22 +182,22 @@ arrow_array_destroy(struct ArrowArray *array)
}

static void
arrow_array_init(struct ArrowArray *array, int row_count)
arrow_array_init(struct ArrowArray *array, int *column_numbers,
int column_count, int row_count, int row_offset)
{
*array = (struct ArrowArray) {
.length = row_count,
.null_count = 0,
.offset = 0,
.n_buffers = 1,
.n_children = dataset.column_count,
.n_children = column_count,
.buffers = xcalloc(1, sizeof(void *)),
.children = xmalloc(sizeof(struct ArrowArray *)
* dataset.column_count),
.children = xmalloc(sizeof(struct ArrowArray *) * column_count),
.dictionary = NULL,
.release = arrow_array_destroy,
.private_data = NULL,
};
for (int i = 0; i < dataset.column_count; i++) {
for (int i = 0; i < column_count; i++) {
array->children[i] = xmalloc(sizeof(*array->children[i]));
*array->children[i] = (struct ArrowArray) {
.length = row_count,
Expand All @@ -151,46 +211,87 @@ arrow_array_init(struct ArrowArray *array, int row_count)
.release = arrow_array_destroy,
.private_data = NULL,
};
int num = column_numbers[i];
assert(num < dataset.column_count);
array->children[i]->buffers[1] =
&dataset.columns[num].data[row_offset];
};
}

/*
 * Initialize an Arrow schema/array pair describing one record batch:
 * @batch_column_count columns by @batch_row_count rows, with the row
 * data taken starting at row number batch * batch_row_count.
 *
 * Column 0 (the PK) is always included; the remaining columns are
 * either the sequential prefix of the dataset (SPARSE_MODE_SEQ) or a
 * duplicate-free random subset re-drawn for every batch
 * (SPARSE_MODE_RAND).
 */
static void
arrow_batch_init(struct ArrowSchema *schema, struct ArrowArray *array,
		 int batch, int batch_column_count, int batch_row_count,
		 enum sparse_mode sparse_mode)
{
	/*
	 * More batch columns than dataset columns cannot be satisfied:
	 * SEQ would index past the dataset and RAND would loop forever
	 * looking for enough distinct column numbers.
	 */
	assert(batch_column_count <= dataset.column_count);

	int column_numbers[batch_column_count];
	column_numbers[0] = 0; /* PK */

	switch (sparse_mode) {
	case SPARSE_MODE_SEQ:
		for (int i = 1; i < batch_column_count; i++)
			column_numbers[i] = i;
		break;
	case SPARSE_MODE_RAND:
		/* Rejection sampling: re-draw on a duplicate. */
		for (int i = 1; i < batch_column_count; i++) {
			int r;
			bool is_dup;
			do {
				r = rand() % dataset.column_count;
				is_dup = false;
				for (int j = 0; j < i; j++) {
					if (column_numbers[j] == r) {
						is_dup = true;
						break;
					}
				}
			} while (is_dup);
			column_numbers[i] = r;
		}
		break;
	default:
		abort();
	}

	arrow_schema_init(schema, column_numbers, batch_column_count);
	arrow_array_init(array, column_numbers, batch_column_count,
			 batch_row_count, batch * batch_row_count);
}

static int
insert_batch_lua_func(struct lua_State *L)
{
uint32_t space_id = luaL_checkinteger(L, 1);
luaL_checktype(L, 2, LUA_TTABLE);
lua_getfield(L, 2, "sparse_mode");
enum sparse_mode sparse_mode = sparse_mode_from_str(
luaL_checklstring(L, -1, NULL));
lua_getfield(L, 2, "column_count_batch");
int batch_column_count = luaL_checkinteger(L, -1);
lua_getfield(L, 2, "row_count_batch");
int batch_row_count = luaL_checkinteger(L, -1);
lua_pop(L, 1);
lua_pop(L, 3);

struct ArrowSchema schema;
arrow_schema_init(&schema);
struct ArrowArray array;
arrow_array_init(&array, batch_row_count);

assert(dataset.row_count % batch_row_count == 0);
for (int i = 0; i < dataset.row_count / batch_row_count; i++) {
for (int j = 0; j < dataset.column_count; j++) {
array.children[j]->buffers[1] =
&dataset.columns[j].data[i * batch_row_count];
}
arrow_batch_init(&schema, &array, i, batch_column_count,
batch_row_count, sparse_mode);
if (box_insert_arrow(space_id, &array, &schema) != 0)
return luaT_error(L);
schema.release(&schema);
array.release(&array);
}
schema.release(&schema);
array.release(&array);
return 0;
}
#endif /* defined(ENABLE_BATCH_INSERT) */

static int
init_lua_func(struct lua_State *L)
{
srand(time(NULL));
say_info("Generating the test data set...");
luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L, 1, "row_count_total");
dataset.row_count = luaL_checkinteger(L, -1);
lua_getfield(L, 1, "column_count_batch");
lua_getfield(L, 1, "column_count_total");
dataset.column_count = luaL_checkinteger(L, -1);
lua_pop(L, 2);
dataset.columns = xmalloc(dataset.column_count *
Expand Down

0 comments on commit 4adb68a

Please sign in to comment.