diff --git a/include/raft.h b/include/raft.h
index 5da6435c..865bda8f 100644
--- a/include/raft.h
+++ b/include/raft.h
@@ -206,7 +206,7 @@ typedef int (
);
/** Callback for saving who we voted for to disk.
- * This callback MUST flush the change to disk.
+ * For safety reasons this callback MUST flush the change to disk.
* @param[in] raft The Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
* @param[in] voted_for The node we voted for
@@ -220,16 +220,21 @@ typedef int (
);
/** Callback for saving log entry changes.
+ *
* This callback is used for:
*
* - Adding entries to the log (ie. offer)
*
- Removing the first entry from the log (ie. polling)
*
- Removing the last entry from the log (ie. popping)
*
- * This callback MUST flush the change to disk.
+ *
+ * For safety reasons this callback MUST flush the change to disk.
+ *
* @param[in] raft The Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
- * @param[in] entry The entry that the event is happening to
+ * @param[in] entry The entry that the event is happening to.
+ * The user is allowed to change the memory pointed to in the
+ * raft_entry_data_t struct. This MUST be done if the memory is temporary.
* @param[in] entry_idx The entries index in the log
* @return 0 on success */
typedef int (
@@ -253,23 +258,27 @@ typedef struct
func_applylog_f applylog;
/** Callback for persisting vote data
- * This callback MUST flush the change to disk. */
+ * For safety reasons this callback MUST flush the change to disk. */
func_persist_int_f persist_vote;
/** Callback for persisting term data
- * This callback MUST flush the change to disk. */
+ * For safety reasons this callback MUST flush the change to disk. */
func_persist_int_f persist_term;
/** Callback for adding an entry to the log
- * This callback MUST flush the change to disk. */
+ * For safety reasons this callback MUST flush the change to disk. */
func_logentry_event_f log_offer;
/** Callback for removing the oldest entry from the log
- * This callback MUST flush the change to disk. */
+ * For safety reasons this callback MUST flush the change to disk.
+ * @note If memory was malloc'd in log_offer then this should be the right
+ * time to free the memory. */
func_logentry_event_f log_poll;
/** Callback for removing the youngest entry from the log
- * This callback MUST flush the change to disk. */
+ * For safety reasons this callback MUST flush the change to disk.
+ * @note If memory was malloc'd in log_offer then this should be the right
+ * time to free the memory. */
func_logentry_event_f log_pop;
/** Callback for catching debugging log messages
@@ -318,11 +327,11 @@ __attribute__ ((deprecated));
/** Add node.
*
* @note This library does not yet support membership changes.
- * Once raft_periodic has been run this will fail.
+ * Once raft_periodic has been run this will fail.
*
* @note The order this call is made is important.
- * This call MUST be made in the same order as the other raft nodes.
- * This is because the node ID is assigned depending on when this call is made
+ * This call MUST be made in the same order as the other raft nodes.
+ * This is because the node ID is assigned depending on when this call is made
*
* @param[in] user_data The user data for the node.
* This is obtained using raft_node_get_udata.
@@ -351,7 +360,18 @@ void raft_set_request_timeout(raft_server_t* me, int msec);
int raft_periodic(raft_server_t* me, int msec_elapsed);
/** Receive an appendentries message.
- * This function will block if it needs to append the message.
+ *
+ * Will block (ie. by syncing to disk) if we need to append a message.
+ *
+ * Might call malloc once to increase the log entry array size.
+ *
+ * The log_offer callback will be called.
+ *
+ * @note The memory pointer (ie. raft_entry_data_t) for each msg_entry_t is
+ * copied directly. If the memory is temporary you MUST either make the
+ * memory permanent (ie. via malloc) OR re-assign the memory within the
+ * log_offer callback.
+ *
* @param[in] node Index of the node who sent us this message
* @param[in] ae The appendentries message
* @param[out] r The resulting response
@@ -387,11 +407,20 @@ int raft_recv_requestvote_response(raft_server_t* me,
int node,
msg_requestvote_response_t* r);
-/** Receive an entry message from client.
+/** Receive an entry message from the client.
*
* Append the entry to the log and send appendentries to followers.
*
- * This function will block if it needs to append the message.
+ * Will block (ie. by syncing to disk) if we need to append a message.
+ *
+ * Might call malloc once to increase the log entry array size.
+ *
+ * The log_offer callback will be called.
+ *
+ * @note The memory pointer (ie. raft_entry_data_t) in msg_entry_t is
+ * copied directly. If the memory is temporary you MUST either make the
+ * memory permanent (ie. via malloc) OR re-assign the memory within the
+ * log_offer callback.
*
* Will fail:
*
diff --git a/src/raft_server.c b/src/raft_server.c
index a1d1c83a..5682de96 100644
--- a/src/raft_server.c
+++ b/src/raft_server.c
@@ -314,14 +314,11 @@ int raft_recv_appendentries(
{
msg_entry_t* cmd = &ae->entries[i];
- /* TODO: replace malloc with mempoll/arena */
- raft_entry_t* c = (raft_entry_t*)malloc(sizeof(raft_entry_t));
- c->term = cmd->term;
- memcpy(&c->data, &cmd->data, sizeof(raft_entry_data_t));
- c->data.buf = (unsigned char*)malloc(cmd->data.len);
- memcpy(c->data.buf, cmd->data.buf, cmd->data.len);
- c->id = cmd->id;
- int e = raft_append_entry(me_, c);
+ raft_entry_t ety;
+ ety.term = cmd->term;
+ ety.id = cmd->id;
+ memcpy(&ety.data, &cmd->data, sizeof(raft_entry_data_t));
+ int e = raft_append_entry(me_, &ety);
if (-1 == e)
{
__log(me_, "AE failure; couldn't append entry");
@@ -423,7 +420,6 @@ int raft_recv_entry(raft_server_t* me_, int node, msg_entry_t* e,
msg_entry_response_t *r)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
- raft_entry_t ety;
int i;
if (!raft_is_leader(me_))
@@ -431,11 +427,10 @@ int raft_recv_entry(raft_server_t* me_, int node, msg_entry_t* e,
__log(me_, "received entry from: %d", node);
+ raft_entry_t ety;
ety.term = me->current_term;
ety.id = e->id;
- ety.data.len = e->data.len;
- ety.data.buf = malloc(e->data.len);
- memcpy(ety.data.buf, e->data.buf, e->data.len);
+ memcpy(&ety.data, &e->data, sizeof(raft_entry_data_t));
raft_append_entry(me_, &ety);
for (i = 0; i < me->num_nodes; i++)
if (me->nodeid != i)
@@ -463,10 +458,10 @@ int raft_send_requestvote(raft_server_t* me_, int node)
return 0;
}
-int raft_append_entry(raft_server_t* me_, raft_entry_t* c)
+int raft_append_entry(raft_server_t* me_, raft_entry_t* ety)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
- return log_append_entry(me->log, c);
+ return log_append_entry(me->log, ety);
}
int raft_apply_entry(raft_server_t* me_)