Skip to content

Commit

Permalink
FEATURE: Add an upsert commnad for map structure
Browse files Browse the repository at this point in the history
  • Loading branch information
ing-eoking committed Jun 24, 2024
1 parent 3e4ba98 commit c1dfbac
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 24 deletions.
20 changes: 13 additions & 7 deletions engines/default/coll_map.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ static void do_map_elem_replace(map_meta_info *info,
}

static ENGINE_ERROR_CODE do_map_elem_link(map_meta_info *info, map_elem_item *elem,
const bool replace_if_exist, const void *cookie)
const bool replace_if_exist, bool *replaced,
const void *cookie)
{
assert(info->root != NULL);
map_hash_node *node = info->root;
Expand All @@ -339,6 +340,8 @@ static ENGINE_ERROR_CODE do_map_elem_link(map_meta_info *info, map_elem_item *el
map_prev_info pinfo;
ENGINE_ERROR_CODE res = ENGINE_SUCCESS;

if (replaced) *replaced = false;

int hidx = -1;

/* map hash value */
Expand Down Expand Up @@ -373,6 +376,7 @@ static ENGINE_ERROR_CODE do_map_elem_link(map_meta_info *info, map_elem_item *el
pinfo.prev = prev;
pinfo.hidx = hidx;
do_map_elem_replace(info, &pinfo, elem);
if (replaced) *replaced = true;
return ENGINE_SUCCESS;
} else {
return ENGINE_ELEM_EEXISTS;
Expand Down Expand Up @@ -678,7 +682,8 @@ static uint32_t do_map_elem_get(map_meta_info *info,
}

static ENGINE_ERROR_CODE do_map_elem_insert(hash_item *it, map_elem_item *elem,
const bool replace_if_exist, const void *cookie)
const bool replace_if_exist,
bool *replaced, const void *cookie)
{
map_meta_info *info = (map_meta_info *)item_get_meta(it);
uint32_t real_mcnt = (info->mcnt > 0 ? info->mcnt : config->max_map_size);
Expand All @@ -702,7 +707,7 @@ static ENGINE_ERROR_CODE do_map_elem_insert(hash_item *it, map_elem_item *elem,
}

/* insert the element */
ret = do_map_elem_link(info, elem, replace_if_exist, cookie);
ret = do_map_elem_link(info, elem, replace_if_exist, replaced, cookie);
if (ret != ENGINE_SUCCESS) {
if (new_root_flag) {
do_map_node_unlink(info, NULL, 0);
Expand Down Expand Up @@ -776,8 +781,8 @@ void map_elem_release(map_elem_item **elem_array, const int elem_count)
}

ENGINE_ERROR_CODE map_elem_insert(const char *key, const uint32_t nkey,
map_elem_item *elem, item_attr *attrp,
bool *created, const void *cookie)
map_elem_item *elem, const bool replace_if_exist, item_attr *attrp,
bool *replaced, bool *created, const void *cookie)
{
hash_item *it = NULL;
ENGINE_ERROR_CODE ret;
Expand All @@ -799,7 +804,7 @@ ENGINE_ERROR_CODE map_elem_insert(const char *key, const uint32_t nkey,
}
}
if (ret == ENGINE_SUCCESS) {
ret = do_map_elem_insert(it, elem, false /* replace_if_exist */, cookie);
ret = do_map_elem_insert(it, elem, replace_if_exist, replaced, cookie);
if (ret != ENGINE_SUCCESS && *created) {
do_item_unlink(it, ITEM_UNLINK_NORMAL);
}
Expand Down Expand Up @@ -1111,6 +1116,7 @@ ENGINE_ERROR_CODE map_apply_elem_insert(void *engine, hash_item *it,
{
const char *key = item_get_key(it);
map_elem_item *elem;
bool replaced;
ENGINE_ERROR_CODE ret;

logger->log(ITEM_APPLY_LOG_LEVEL, NULL,
Expand All @@ -1133,7 +1139,7 @@ ENGINE_ERROR_CODE map_apply_elem_insert(void *engine, hash_item *it,
}
memcpy(elem->data, field, nfield + nbytes);

ret = do_map_elem_insert(it, elem, true /* replace_if_exist */, NULL);
ret = do_map_elem_insert(it, elem, true /* replace_if_exist */, &replaced, NULL);
if (ret != ENGINE_SUCCESS) {
do_map_elem_free(elem);
logger->log(EXTENSION_LOG_WARNING, NULL, "map_apply_elem_insert failed."
Expand Down
6 changes: 3 additions & 3 deletions engines/default/coll_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ void map_elem_free(map_elem_item *elem);
void map_elem_release(map_elem_item **elem_array, const int elem_count);

ENGINE_ERROR_CODE map_elem_insert(const char *key, const uint32_t nkey,
map_elem_item *elem,
item_attr *attrp,
bool *created, const void *cookie);
map_elem_item *elem, const bool replace_if_exist,
item_attr *attrp, bool *replaced, bool *created,
const void *cookie);

ENGINE_ERROR_CODE map_elem_update(const char *key, const uint32_t nkey,
const field_t *field,
Expand Down
7 changes: 5 additions & 2 deletions engines/default/default_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -736,14 +736,17 @@ default_map_elem_release(ENGINE_HANDLE* handle, const void *cookie,
static ENGINE_ERROR_CODE
default_map_elem_insert(ENGINE_HANDLE* handle, const void* cookie,
const void* key, const int nkey, eitem *eitem,
item_attr *attrp, bool *created, uint16_t vbucket)
const bool replace_if_exist, item_attr *attrp,
bool *replaced, bool *created, uint16_t vbucket)
{
struct default_engine *engine = get_handle(handle);
ENGINE_ERROR_CODE ret;
VBUCKET_GUARD(engine, vbucket);

ACTION_BEFORE_WRITE(cookie, key, nkey);
ret = map_elem_insert(key, nkey, (map_elem_item*)eitem, attrp, created, cookie);
ret = map_elem_insert(key, nkey, (map_elem_item*)eitem,
replace_if_exist, attrp,
replaced, created, cookie);
ACTION_AFTER_WRITE(cookie, engine, ret);
return ret;
}
Expand Down
3 changes: 2 additions & 1 deletion engines/demo/demo_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ Demo_map_elem_release(ENGINE_HANDLE* handle, const void *cookie,
static ENGINE_ERROR_CODE
Demo_map_elem_insert(ENGINE_HANDLE* handle, const void* cookie,
const void* key, const int nkey, eitem *eitem,
item_attr *attrp, bool *created, uint16_t vbucket)
const bool replace_if_exist, item_attr *attrp,
bool *replaced, bool *created, uint16_t vbucket)
{
return ENGINE_ENOTSUP;
}
Expand Down
2 changes: 2 additions & 0 deletions include/memcached/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,9 @@ extern "C" {
const void* key,
const int nkey,
eitem *eitem,
const bool replace_if_exist,
item_attr *attrp,
bool *replaced,
bool *created,
uint16_t vbucket);
ENGINE_ERROR_CODE (*map_elem_update)(ENGINE_HANDLE* handle,
Expand Down
1 change: 1 addition & 0 deletions include/memcached/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ extern "C" {
/* map operation */
OPERATION_MOP_CREATE = 0x70, /**< Map operation with create structure semantics */
OPERATION_MOP_INSERT, /**< Map operation with insert element semantics */
OPERATION_MOP_UPSERT, /**< Map operation with upsert element semantics */
OPERATION_MOP_UPDATE, /**< Map operation with update element semantics */
OPERATION_MOP_DELETE, /**< Map operation with delete element semantics */
OPERATION_MOP_GET, /**< Map operation with get element semantics */
Expand Down
33 changes: 22 additions & 11 deletions memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -1839,7 +1839,8 @@ static int make_mop_elem_response(char *bufptr, eitem_info *einfo)

static void process_mop_insert_complete(conn *c)
{
assert(c->coll_op == OPERATION_MOP_INSERT);
assert(c->coll_op == OPERATION_MOP_INSERT ||
c->coll_op == OPERATION_MOP_UPSERT);
assert(c->coll_eitem != NULL);
ENGINE_ERROR_CODE ret;

Expand All @@ -1853,8 +1854,11 @@ static void process_mop_insert_complete(conn *c)
out_string(c, "CLIENT_ERROR bad data chunk");
} else {
bool created;
bool replaced;
bool replace_if_exist = (c->coll_op == OPERATION_MOP_UPSERT ? true : false);
ret = mc_engine.v1->map_elem_insert(mc_engine.v0, c, c->coll_key, c->coll_nkey,
c->coll_eitem, c->coll_attrp, &created, 0);
c->coll_eitem, replace_if_exist, c->coll_attrp,
&replaced, &created, 0);
CONN_CHECK_AND_SET_EWOULDBLOCK(ret, c);
if (settings.detail_enabled) {
stats_prefix_record_mop_insert(c->coll_key, c->coll_nkey, (ret==ENGINE_SUCCESS));
Expand All @@ -1863,8 +1867,12 @@ static void process_mop_insert_complete(conn *c)
switch (ret) {
case ENGINE_SUCCESS:
STATS_HITS(c, mop_insert, c->coll_key, c->coll_nkey);
if (created) out_string(c, "CREATED_STORED");
else out_string(c, "STORED");
if (replaced) {
out_string(c, "REPLACED");
} else {
if (created) out_string(c, "CREATED_STORED");
else out_string(c, "STORED");
}
break;
case ENGINE_KEY_ENOENT:
STATS_MISSES(c, mop_insert, c->coll_key, c->coll_nkey);
Expand Down Expand Up @@ -3165,7 +3173,8 @@ static void complete_update_ascii(conn *c)
else if (c->coll_op == OPERATION_SOP_INSERT) process_sop_insert_complete(c);
else if (c->coll_op == OPERATION_SOP_DELETE) process_sop_delete_complete(c);
else if (c->coll_op == OPERATION_SOP_EXIST) process_sop_exist_complete(c);
else if (c->coll_op == OPERATION_MOP_INSERT) process_mop_insert_complete(c);
else if (c->coll_op == OPERATION_MOP_INSERT ||
c->coll_op == OPERATION_MOP_UPSERT) process_mop_insert_complete(c);
else if (c->coll_op == OPERATION_MOP_UPDATE) process_mop_update_complete(c);
else if (c->coll_op == OPERATION_MOP_DELETE) process_mop_delete_complete(c);
else if (c->coll_op == OPERATION_MOP_GET) process_mop_get_complete(c);
Expand Down Expand Up @@ -11826,13 +11835,12 @@ static inline int get_efilter_from_tokens(token_t *tokens, const int ntokens, ef

static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey, field_t *field, size_t vlen)
{
assert(cmd == (int)OPERATION_MOP_INSERT || (int)OPERATION_MOP_UPDATE);
eitem *elem = NULL;
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;

if (vlen > settings.max_element_bytes) {
ret = ENGINE_E2BIG;
} else if (cmd == OPERATION_MOP_INSERT) {
} else if (cmd == OPERATION_MOP_INSERT || cmd == OPERATION_MOP_UPSERT) {
ret = mc_engine.v1->map_elem_alloc(mc_engine.v0, c, key, nkey, field->length, vlen, &elem);
} else {
if ((elem = (eitem *)malloc(sizeof(value_item) + vlen)) == NULL)
Expand All @@ -11841,7 +11849,7 @@ static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey,
((value_item*)elem)->len = vlen;
}
if (ret == ENGINE_SUCCESS) {
if (cmd == OPERATION_MOP_INSERT) {
if (cmd == OPERATION_MOP_INSERT || cmd == OPERATION_MOP_UPSERT) {
mc_engine.v1->get_elem_info(mc_engine.v0, c, ITEM_TYPE_MAP, elem, &c->einfo);
ritem_set_first(c, CONN_RTYPE_EINFO, vlen);
} else {
Expand All @@ -11858,7 +11866,7 @@ static void process_mop_prepare_nread(conn *c, int cmd, char *key, size_t nkey,
if (settings.detail_enabled) {
stats_prefix_record_mop_insert(key, nkey, false);
}
if (cmd == OPERATION_MOP_INSERT) {
if (cmd == OPERATION_MOP_INSERT || cmd == OPERATION_MOP_UPSERT) {
STATS_CMD_NOKEY(c, mop_insert);
} else if (cmd == OPERATION_MOP_UPDATE) {
STATS_CMD_NOKEY(c, mop_update);
Expand Down Expand Up @@ -11944,6 +11952,7 @@ static void process_mop_command(conn *c, token_t *tokens, const size_t ntokens)
char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
char *key = tokens[MOP_KEY_TOKEN].value;
size_t nkey = tokens[MOP_KEY_TOKEN].length;
int subcommid;

if (nkey > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format");
Expand All @@ -11952,7 +11961,9 @@ static void process_mop_command(conn *c, token_t *tokens, const size_t ntokens)
c->coll_key = key;
c->coll_nkey = nkey;

if ((ntokens >= 6 && ntokens <= 13) && (strcmp(subcommand,"insert") == 0))
if ((ntokens >= 6 && ntokens <= 13) &&
((strcmp(subcommand,"insert") == 0 && (subcommid = (int)OPERATION_MOP_INSERT)) ||
(strcmp(subcommand,"upsert") == 0 && (subcommid = (int)OPERATION_MOP_UPSERT)) ))
{
field_t field;
int32_t vlen;
Expand Down Expand Up @@ -11997,7 +12008,7 @@ static void process_mop_command(conn *c, token_t *tokens, const size_t ntokens)
}

if (check_and_handle_pipe_state(c)) {
process_mop_prepare_nread(c, (int)OPERATION_MOP_INSERT, key, nkey, &field, vlen);
process_mop_prepare_nread(c, subcommid, key, nkey, &field, vlen);
} else { /* pipe error */
c->sbytes = vlen;
conn_set_state(c, conn_swallow);
Expand Down

0 comments on commit c1dfbac

Please sign in to comment.