Skip to content

Commit

Permalink
FEATURE: Add an upsert command for map structure
Browse files Browse the repository at this point in the history
  • Loading branch information
ing-eoking authored and jhpark816 committed Jul 3, 2024
1 parent 8e82283 commit f1995f0
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 33 deletions.
36 changes: 21 additions & 15 deletions engines/default/coll_map.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ static void do_map_elem_replace(map_meta_info *info,
}

static ENGINE_ERROR_CODE do_map_elem_link(map_meta_info *info, map_elem_item *elem,
const bool replace_if_exist, const void *cookie)
const bool replace_if_exist, bool *replaced,
const void *cookie)
{
assert(info->root != NULL);
map_hash_node *node = info->root;
Expand Down Expand Up @@ -373,6 +374,7 @@ static ENGINE_ERROR_CODE do_map_elem_link(map_meta_info *info, map_elem_item *el
pinfo.prev = prev;
pinfo.hidx = hidx;
do_map_elem_replace(info, &pinfo, elem);
if (replaced) *replaced = true;
return ENGINE_SUCCESS;
} else {
return ENGINE_ELEM_EEXISTS;
Expand All @@ -387,6 +389,12 @@ static ENGINE_ERROR_CODE do_map_elem_link(map_meta_info *info, map_elem_item *el
}
#endif

/* overflow check */
assert(info->ovflact == OVFL_ERROR);
if (info->ccnt >= (info->mcnt > 0 ? info->mcnt : config->max_map_size)) {
return ENGINE_EOVERFLOW;
}

if (node->hcnt[hidx] >= MAP_MAX_HASHCHAIN_SIZE) {
map_hash_node *n_node = do_map_node_alloc(node->hdepth+1, cookie);
if (n_node == NULL) {
Expand All @@ -399,6 +407,8 @@ static ENGINE_ERROR_CODE do_map_elem_link(map_meta_info *info, map_elem_item *el
hidx = MAP_GET_HASHIDX(elem->hval, node->hdepth);
}

CLOG_MAP_ELEM_INSERT(info, NULL, elem);

elem->next = node->htab[hidx];
node->htab[hidx] = elem;
node->hcnt[hidx] += 1;
Expand Down Expand Up @@ -678,18 +688,12 @@ static uint32_t do_map_elem_get(map_meta_info *info,
}

static ENGINE_ERROR_CODE do_map_elem_insert(hash_item *it, map_elem_item *elem,
const bool replace_if_exist, const void *cookie)
const bool replace_if_exist, bool *replaced,
const void *cookie)
{
map_meta_info *info = (map_meta_info *)item_get_meta(it);
uint32_t real_mcnt = (info->mcnt > 0 ? info->mcnt : config->max_map_size);
ENGINE_ERROR_CODE ret;

/* overflow check */
assert(info->ovflact == OVFL_ERROR);
if (info->ccnt >= real_mcnt) {
return ENGINE_EOVERFLOW;
}

/* create the root hash node if it does not exist */
bool new_root_flag = false;
if (info->root == NULL) { /* empty map */
Expand All @@ -702,15 +706,14 @@ static ENGINE_ERROR_CODE do_map_elem_insert(hash_item *it, map_elem_item *elem,
}

/* insert the element */
ret = do_map_elem_link(info, elem, replace_if_exist, cookie);
ret = do_map_elem_link(info, elem, replace_if_exist, replaced, cookie);
if (ret != ENGINE_SUCCESS) {
if (new_root_flag) {
do_map_node_unlink(info, NULL, 0);
}
return ret;
}

CLOG_MAP_ELEM_INSERT(info, NULL, elem);
return ENGINE_SUCCESS;
}

Expand Down Expand Up @@ -776,14 +779,16 @@ void map_elem_release(map_elem_item **elem_array, const int elem_count)
}

ENGINE_ERROR_CODE map_elem_insert(const char *key, const uint32_t nkey,
map_elem_item *elem, item_attr *attrp,
bool *created, const void *cookie)
map_elem_item *elem, const bool replace_if_exist,
item_attr *attrp, bool *replaced, bool *created,
const void *cookie)
{
hash_item *it = NULL;
ENGINE_ERROR_CODE ret;
PERSISTENCE_ACTION_BEGIN(cookie, UPD_MAP_ELEM_INSERT);

*created = false;
*replaced = false;

LOCK_CACHE();
ret = do_map_item_find(key, nkey, DONT_UPDATE, &it);
Expand All @@ -799,7 +804,7 @@ ENGINE_ERROR_CODE map_elem_insert(const char *key, const uint32_t nkey,
}
}
if (ret == ENGINE_SUCCESS) {
ret = do_map_elem_insert(it, elem, false /* replace_if_exist */, cookie);
ret = do_map_elem_insert(it, elem, replace_if_exist, replaced, cookie);
if (ret != ENGINE_SUCCESS && *created) {
do_item_unlink(it, ITEM_UNLINK_NORMAL);
}
Expand Down Expand Up @@ -1111,6 +1116,7 @@ ENGINE_ERROR_CODE map_apply_elem_insert(void *engine, hash_item *it,
{
const char *key = item_get_key(it);
map_elem_item *elem;
bool replaced;
ENGINE_ERROR_CODE ret;

logger->log(ITEM_APPLY_LOG_LEVEL, NULL,
Expand All @@ -1133,7 +1139,7 @@ ENGINE_ERROR_CODE map_apply_elem_insert(void *engine, hash_item *it,
}
memcpy(elem->data, field, nfield + nbytes);

ret = do_map_elem_insert(it, elem, true /* replace_if_exist */, NULL);
ret = do_map_elem_insert(it, elem, true /* replace_if_exist */, &replaced, NULL);
if (ret != ENGINE_SUCCESS) {
do_map_elem_free(elem);
logger->log(EXTENSION_LOG_WARNING, NULL, "map_apply_elem_insert failed."
Expand Down
6 changes: 3 additions & 3 deletions engines/default/coll_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ void map_elem_free(map_elem_item *elem);
void map_elem_release(map_elem_item **elem_array, const int elem_count);

ENGINE_ERROR_CODE map_elem_insert(const char *key, const uint32_t nkey,
map_elem_item *elem,
item_attr *attrp,
bool *created, const void *cookie);
map_elem_item *elem, const bool replace_if_exist,
item_attr *attrp, bool *replaced, bool *created,
const void *cookie);

ENGINE_ERROR_CODE map_elem_update(const char *key, const uint32_t nkey,
const field_t *field,
Expand Down
7 changes: 5 additions & 2 deletions engines/default/default_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -736,14 +736,17 @@ default_map_elem_release(ENGINE_HANDLE* handle, const void *cookie,
static ENGINE_ERROR_CODE
default_map_elem_insert(ENGINE_HANDLE* handle, const void* cookie,
const void* key, const int nkey, eitem *eitem,
item_attr *attrp, bool *created, uint16_t vbucket)
const bool replace_if_exist, item_attr *attrp,
bool *replaced, bool *created, uint16_t vbucket)
{
struct default_engine *engine = get_handle(handle);
ENGINE_ERROR_CODE ret;
VBUCKET_GUARD(engine, vbucket);

ACTION_BEFORE_WRITE(cookie, key, nkey);
ret = map_elem_insert(key, nkey, (map_elem_item*)eitem, attrp, created, cookie);
ret = map_elem_insert(key, nkey, (map_elem_item*)eitem,
replace_if_exist, attrp,
replaced, created, cookie);
ACTION_AFTER_WRITE(cookie, engine, ret);
return ret;
}
Expand Down
3 changes: 2 additions & 1 deletion engines/demo/demo_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ Demo_map_elem_release(ENGINE_HANDLE* handle, const void *cookie,
static ENGINE_ERROR_CODE
Demo_map_elem_insert(ENGINE_HANDLE* handle, const void* cookie,
const void* key, const int nkey, eitem *eitem,
item_attr *attrp, bool *created, uint16_t vbucket)
const bool replace_if_exist, item_attr *attrp,
bool *replaced, bool *created, uint16_t vbucket)
{
return ENGINE_ENOTSUP;
}
Expand Down
2 changes: 2 additions & 0 deletions include/memcached/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,9 @@ extern "C" {
const void* key,
const int nkey,
eitem *eitem,
const bool replace_if_exist,
item_attr *attrp,
bool *replaced,
bool *created,
uint16_t vbucket);
ENGINE_ERROR_CODE (*map_elem_update)(ENGINE_HANDLE* handle,
Expand Down
1 change: 1 addition & 0 deletions include/memcached/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ extern "C" {
/* map operation */
OPERATION_MOP_CREATE = 0x70, /**< Map operation with create structure semantics */
OPERATION_MOP_INSERT, /**< Map operation with insert element semantics */
OPERATION_MOP_UPSERT, /**< Map operation with upsert element semantics */
OPERATION_MOP_UPDATE, /**< Map operation with update element semantics */
OPERATION_MOP_DELETE, /**< Map operation with delete element semantics */
OPERATION_MOP_GET, /**< Map operation with get element semantics */
Expand Down
39 changes: 27 additions & 12 deletions memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ static void conn_coll_eitem_free(conn *c)
break;
/* mop */
case OPERATION_MOP_INSERT:
case OPERATION_MOP_UPSERT:
mc_engine.v1->map_elem_free(mc_engine.v0, c, c->coll_eitem);
break;
case OPERATION_MOP_UPDATE:
Expand Down Expand Up @@ -1841,7 +1842,8 @@ static int make_mop_elem_response(char *bufptr, eitem_info *einfo)

static void process_mop_insert_complete(conn *c)
{
assert(c->coll_op == OPERATION_MOP_INSERT);
assert(c->coll_op == OPERATION_MOP_INSERT ||
c->coll_op == OPERATION_MOP_UPSERT);
assert(c->coll_eitem != NULL);
ENGINE_ERROR_CODE ret;

Expand All @@ -1855,8 +1857,11 @@ static void process_mop_insert_complete(conn *c)
out_string(c, "CLIENT_ERROR bad data chunk");
} else {
bool created;
bool replaced;
bool replace_if_exist = (c->coll_op == OPERATION_MOP_UPSERT ? true : false);
ret = mc_engine.v1->map_elem_insert(mc_engine.v0, c, c->coll_key, c->coll_nkey,
c->coll_eitem, c->coll_attrp, &created, 0);
c->coll_eitem, replace_if_exist, c->coll_attrp,
&replaced, &created, 0);
CONN_CHECK_AND_SET_EWOULDBLOCK(ret, c);
if (settings.detail_enabled) {
stats_prefix_record_mop_insert(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS));
Expand All @@ -1865,8 +1870,12 @@ static void process_mop_insert_complete(conn *c)
switch (ret) {
case ENGINE_SUCCESS:
STATS_HITS(c, mop_insert, c->coll_key, c->coll_nkey);
if (created) out_string(c, "CREATED_STORED");
else out_string(c, "STORED");
if (replaced) {
out_string(c, "REPLACED");
} else {
if (created) out_string(c, "CREATED_STORED");
else out_string(c, "STORED");
}
break;
case ENGINE_KEY_ENOENT:
STATS_MISSES(c, mop_insert, c->coll_key, c->coll_nkey);
Expand Down Expand Up @@ -3167,7 +3176,8 @@ static void complete_update_ascii(conn *c)
else if (c->coll_op == OPERATION_SOP_INSERT) process_sop_insert_complete(c);
else if (c->coll_op == OPERATION_SOP_DELETE) process_sop_delete_complete(c);
else if (c->coll_op == OPERATION_SOP_EXIST) process_sop_exist_complete(c);
else if (c->coll_op == OPERATION_MOP_INSERT) process_mop_insert_complete(c);
else if (c->coll_op == OPERATION_MOP_INSERT ||
c->coll_op == OPERATION_MOP_UPSERT) process_mop_insert_complete(c);
else if (c->coll_op == OPERATION_MOP_UPDATE) process_mop_update_complete(c);
else if (c->coll_op == OPERATION_MOP_DELETE) process_mop_delete_complete(c);
else if (c->coll_op == OPERATION_MOP_GET) process_mop_get_complete(c);
Expand Down Expand Up @@ -11828,13 +11838,15 @@ static inline int get_efilter_from_tokens(token_t *tokens, const int ntokens, ef

static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, field_t *field, size_t vlen)
{
assert(cmd == (int)OPERATION_MOP_INSERT || cmd == (int)OPERATION_MOP_UPDATE);
assert(cmd == (int)OPERATION_MOP_INSERT ||
cmd == (int)OPERATION_MOP_UPSERT ||
cmd == (int)OPERATION_MOP_UPDATE);
eitem *elem = NULL;
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;

if (vlen > settings.max_element_bytes) {
ret = ENGINE_E2BIG;
} else if (cmd == OPERATION_MOP_INSERT) {
} else if (cmd == OPERATION_MOP_INSERT || cmd == OPERATION_MOP_UPSERT) {
ret = mc_engine.v1->map_elem_alloc(mc_engine.v0, c, key, nkey, field->length, vlen, &elem);
} else {
if ((elem = (eitem *)malloc(sizeof(value_item) + vlen)) == NULL)
Expand All @@ -11843,7 +11855,7 @@ static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey,
((value_item*)elem)->len = vlen;
}
if (ret == ENGINE_SUCCESS) {
if (cmd == OPERATION_MOP_INSERT) {
if (cmd == OPERATION_MOP_INSERT || cmd == OPERATION_MOP_UPSERT) {
mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_MAP, elem, &c->einfo);
ritem_set_first(c, CONN_RTYPE_EINFO, vlen);
} else {
Expand All @@ -11857,12 +11869,12 @@ static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey,
c->coll_field = *field;
conn_set_state(c, conn_nread);
} else {
if (cmd == OPERATION_MOP_INSERT) {
if (cmd == OPERATION_MOP_INSERT || cmd == OPERATION_MOP_UPSERT) {
if (settings.detail_enabled) {
stats_prefix_record_mop_insert(key, nkey, false);
}
STATS_CMD_NOKEY(c, mop_insert);
} else if (cmd == OPERATION_MOP_UPDATE) {
} else { /* OPERATION_MOP_UPDATE */
if (settings.detail_enabled) {
stats_prefix_record_mop_update(key, nkey, false);
}
Expand Down Expand Up @@ -11949,6 +11961,7 @@ static void process_mop_command(conn *c, token_t *tokens, const size_t ntokens)
char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
char *key = tokens[MOP_KEY_TOKEN].value;
size_t nkey = tokens[MOP_KEY_TOKEN].length;
int subcommid;

if (nkey > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format");
Expand All @@ -11957,7 +11970,9 @@ static void process_mop_command(conn *c, token_t *tokens, const size_t ntokens)
c->coll_key = key;
c->coll_nkey = nkey;

if ((ntokens >= 6 && ntokens <= 13) && (strcmp(subcommand,"insert") == 0))
if ((ntokens >= 6 && ntokens <= 13) &&
((strcmp(subcommand,"insert") == 0 && (subcommid = (int)OPERATION_MOP_INSERT)) ||
(strcmp(subcommand,"upsert") == 0 && (subcommid = (int)OPERATION_MOP_UPSERT)) ))
{
field_t field;
int32_t vlen;
Expand Down Expand Up @@ -12002,7 +12017,7 @@ static void process_mop_command(conn *c, token_t *tokens, const size_t ntokens)
}

if (check_and_handle_pipe_state(c)) {
process_mop_prepare_nread(c, (int)OPERATION_MOP_INSERT, key, nkey, &field, vlen);
process_mop_prepare_nread(c, subcommid, key, nkey, &field, vlen);
} else { /* pipe error */
c->sbytes = vlen;
conn_set_state(c, conn_swallow);
Expand Down
86 changes: 86 additions & 0 deletions t/coll_mop_upsert.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/usr/bin/perl

use strict;
use Test::More tests => 14;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;

=head
get kvkey
get mkey1
mop upsert mkey1 field1 6 create 11 0 0
datum9
mop upsert mkey1 field2 6
datum7
setattr mkey1 maxcount=2
mop upsert mkey1 field1 6
datum5
mop get mkey1 13 2
field1 field2
mop upsert mkey2 field1 6
datum2
set kvkey 0 0 6
datumx
mop upsert kvkey field1 6
datum2
mop upsert mkey1 field1 6
datum4
setattr mkey1 maxcount=4000
delete kvkey
delete mkey1
=cut

my $engine = shift;
my $server = get_memcached($engine);
my $sock = $server->sock;

my $cmd;
my $val;
my $rst;

# Initialize
$cmd = "get kvkey"; $rst = "END";
mem_cmd_is($sock, $cmd, "", $rst);
$cmd = "get mkey1"; $rst = "END";
mem_cmd_is($sock, $cmd, "", $rst);

# Success Cases
$cmd = "mop upsert mkey1 field1 6 create 11 0 0"; $val = "datum9"; $rst = "CREATED_STORED";
mem_cmd_is($sock, $cmd, $val, $rst);
$cmd = "mop upsert mkey1 field2 6"; $val = "datum7"; $rst = "STORED";
mem_cmd_is($sock, $cmd, $val, $rst);
$cmd = "setattr mkey1 maxcount=2"; $rst = "OK";
mem_cmd_is($sock, $cmd, "", $rst);
$cmd = "mop upsert mkey1 field1 6"; $val = "datum0"; $rst = "REPLACED";
mem_cmd_is($sock, $cmd, $val, $rst);
$cmd = "mop get mkey1 13 2"; $val = "field1 field2";
$rst = "VALUE 11 2
field1 6 datum0
field2 6 datum7
END";
mem_cmd_is($sock, $cmd, $val, $rst);

# Fail Cases
$cmd = "mop upsert mkey2 field1 6"; $val = "datum2"; $rst = "NOT_FOUND";
mem_cmd_is($sock, $cmd, $val, $rst);
$cmd = "set kvkey 0 0 6"; $val = "datumx"; $rst = "STORED";
mem_cmd_is($sock, $cmd, $val, $rst);
$cmd = "mop upsert kvkey field1 6"; $val = "datum2"; $rst = "TYPE_MISMATCH";
mem_cmd_is($sock, $cmd, $val, $rst);
$cmd = "mop upsert mkey1 field3 6"; $val = "datum5"; $rst = "OVERFLOWED";
mem_cmd_is($sock, $cmd, $val, $rst);
$cmd = "setattr mkey1 maxcount=4000"; $rst = "OK";
mem_cmd_is($sock, $cmd, "", $rst);

# Finalize
$cmd = "delete kvkey"; $rst = "DELETED";
mem_cmd_is($sock, $cmd, "", $rst);
$cmd = "delete mkey1"; $rst = "DELETED";
mem_cmd_is($sock, $cmd, "", $rst);

# after test
release_memcached($engine, $server);

Loading

0 comments on commit f1995f0

Please sign in to comment.