Skip to content

Commit

Permalink
dev: clientlib: add macros for clarification
Browse files Browse the repository at this point in the history
  • Loading branch information
dmuhamedagic committed Apr 26, 2018
1 parent 6aaf252 commit 6c9e80e
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions clientlib/fsprotocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ FSTATIC void _fsprotocol_flush_pending_connshut(FsProtoElem* fspe);
#define AUDITFSPE(fspe) { if (fspe) _fsprotocol_auditfspe(fspe, __FUNCTION__, __LINE__); }
#define AUDITIREADY(self) {_fsprotocol_auditiready(__FUNCTION__, __LINE__, self);}

#define outq_len(fspe) (fspe)->outq->_q->length
#define is_outq_empty(fspe) (outq_len(fspe)== 0)
#define set_pending(fspe) { \
(fspe)->parent->unacked = g_list_prepend((fspe)->parent->unacked, fspe); \
}
#define reset_pending(fspe) { \
(fspe)->parent->unacked = g_list_remove((fspe)->parent->unacked, fspe); \
}
#define is_pending(fspe) (g_list_find((fspe)->parent->unacked, fspe) != NULL)


DEBUGDECLARATIONS
/// @defgroup FsProtocol FsProtocol class
Expand Down Expand Up @@ -466,7 +476,7 @@ _fsprotocol_fsa(FsProtoElem* fspe, ///< The FSPE we're processing
fspe->finalizetimer = g_timeout_add_seconds(1+parent->acktimeout/1000000, _fsprotocol_finalizetimer, fspe);
}
// Check for possible errors in our FSA tables...
if (FSPR_NONE == nextstate && curstate != nextstate && fspe->outq->_q->length != 0) {
if (FSPR_NONE == nextstate && curstate != nextstate && !is_outq_empty(fspe)) {
char * deststr = fspe->endpoint->baseclass.toString(&fspe->endpoint->baseclass);
g_critical("%s.%d: Inappropriate transition for %s to state NONE"
": (%s, %s) => %s. Actions=[%s], outq length=%d"
Expand All @@ -475,7 +485,7 @@ _fsprotocol_fsa(FsProtoElem* fspe, ///< The FSPE we're processing
, _fsprotocol_fsa_inputs(input)
, _fsprotocol_fsa_states(nextstate)
, _fsprotocol_fsa_actions(action)
, fspe->outq->_q->length);
, outq_len(fspe));
FREE(deststr); deststr = NULL;
_fsprotocol_fsa_log_history(fspe, curstate, nextstate, input, action);
}
Expand Down Expand Up @@ -514,25 +524,22 @@ _fsprotocol_flush_pending_connshut(FsProtoElem* fspe)
FSTATIC void
_fsprotocol_auditfspe(const FsProtoElem* self, const char * function, int lineno)
{
guint outqlen = self->outq->_q->length;
FsProtocol* parent = self->parent;
gboolean in_unackedlist = (g_list_find(parent->unacked, self) != NULL);
guint64 now = g_get_monotonic_time();

if (outqlen != 0 && !in_unackedlist) {
g_critical("%s:%d: outqlen is %d but not in unacked list"
, function, lineno, outqlen);
if (!is_outq_empty(self) && !is_pending(self)) {
g_critical("%s:%d: outqlen is %d but not in the pending list"
, function, lineno, outq_len(self));
DUMP("WARN: previous unacked warning was for this address",
&self->endpoint->baseclass, NULL);
}
if (outqlen == 0 && in_unackedlist) {
g_critical("%s:%d: outqlen is zero but it IS in the unacked list"
if (is_outq_empty(self) && is_pending(self)) {
g_critical("%s:%d: outqlen is zero but it IS in the pending list"
, function, lineno);
DUMP("WARN: previous unacked warning was for this address",
&self->endpoint->baseclass, NULL);
}
// If something is hung, it should start complaining soon...
if (in_unackedlist && now > (self->nextrexmit + self->parent->rexmit_interval)) {
if (!is_pending(self) && now > (self->nextrexmit + self->parent->rexmit_interval)) {
g_critical("%s:%d: Overdue retransmissions in FSPE %p", function, lineno, self);
DUMP("WARN: previous overdue warning was for this IP addr",
&self->endpoint->baseclass, NULL);
Expand Down Expand Up @@ -780,7 +787,7 @@ _fsprotocol_fspe_reinit(FsProtoElem* self)
if (!g_queue_is_empty(self->outq->_q)) {
DUMP3("REINIT OF OUTQ", &self->outq->baseclass, __FUNCTION__);
self->outq->flush(self->outq);
self->parent->unacked = g_list_remove(self->parent->unacked, self);
reset_pending(self);
self->outq->isready = FALSE;
}
// See the code in _fsqueue_enq and also in seqnoframe_new_init for how all these pieces
Expand Down Expand Up @@ -1143,8 +1150,8 @@ _fsprotocol_receive(FsProtocol* self ///< Self pointer
// and got a duplicate ACK
DUMP3("Received bad ACK from", &fspe->endpoint->baseclass, NULL);
DUMP3(__FUNCTION__, &fs->baseclass, " was ACK received.");
}else if (fspe->outq->_q->length == 0) {
fspe->parent->unacked = g_list_remove(fspe->parent->unacked, fspe);
}else if (is_outq_empty(fspe)) {
reset_pending(fspe);
fspe->nextrexmit = 0;
TRYXMIT(fspe);
fspe->acktimeout = 0;
Expand Down Expand Up @@ -1265,11 +1272,11 @@ _fsprotocol_send1(FsProtocol* self ///< Our object
DEBUGMSG3("%s.%d: calling fsprotocol_fsa(FSPROTO_REQSEND)", __FUNCTION__, __LINE__);
_fsprotocol_fsa(fspe, FSPROTO_REQSEND, NULL);

if (fspe->outq->_q->length == 0) {
if (is_outq_empty(fspe)) {
guint64 now = g_get_monotonic_time();
///@todo: This might be slow if we send a lot of packets to an endpoint
/// before getting a response, but that's not very likely.
fspe->parent->unacked = g_list_prepend(fspe->parent->unacked, fspe);
set_pending(fspe);
fspe->nextrexmit = now + self->rexmit_interval;
fspe->acktimeout = now + self->acktimeout;
}
Expand Down Expand Up @@ -1395,28 +1402,26 @@ FSTATIC void
_fsprotocol_xmitifwecan(FsProtoElem* fspe) ///< The FrameSet protocol element to operate on
{
GList* qelem;
FsQueue* outq;
FsProtocol* parent;
SeqnoFrame* lastseq;
NetIO* io;
guint orig_outstanding;
guint orig_pending;
gint64 now;

g_return_if_fail(fspe != NULL);
outq = fspe->outq;
parent = fspe->parent;
lastseq = fspe->lastseqsent;
io = parent->io;
orig_outstanding = fspe->outq->_q->length;
orig_pending = outq_len(fspe);

AUDITFSPE(fspe);
// Look for any new packets that might have showed up to send
// Check to see if we've exceeded our window size...
if (fspe->outq->_q->length < parent->window_size) {
if (outq_len(fspe) < parent->window_size) {
// Nope. Look for packets that we haven't yet sent.
// This code is sub-optimal when congestion occurs and we have a larger
// window size (i.e. when we have a number of un-ACKed packets)
for (qelem=outq->_q->head; NULL != qelem; qelem=qelem->next) {
for (qelem=fspe->outq->_q->head; NULL != qelem; qelem=qelem->next) {
FrameSet* fs = CASTTOCLASS(FrameSet, qelem->data);
SeqnoFrame* seq = fs->getseqno(fs);
if (NULL != lastseq && NULL != seq && seq->compare(seq, lastseq) <= 0) {
Expand All @@ -1437,20 +1442,20 @@ _fsprotocol_xmitifwecan(FsProtoElem* fspe) ///< The FrameSet protocol element to
}
lastseq = fspe->lastseqsent = seq;
REF2(lastseq);
if (fspe->outq->_q->length >= parent->window_size) {
if (outq_len(fspe) >= parent->window_size) {
break;
}
}
}
AUDITFSPE(fspe);
now = g_get_monotonic_time();

if (fspe->nextrexmit == 0 && fspe->outq->_q->length > 0) {
if (fspe->nextrexmit == 0 && !is_outq_empty(fspe)) {
// Next retransmission time not yet set...
fspe->nextrexmit = now + parent->rexmit_interval;
AUDITFSPE(fspe);
} else if (fspe->nextrexmit != 0 && now > fspe->nextrexmit) {
FrameSet* fs = outq->qhead(outq);
FrameSet* fs = fspe->outq->qhead(fspe->outq);
// It's time to retransmit something. Hurray!
if (NULL != fs) {
// Update next retransmission time...
Expand All @@ -1472,9 +1477,9 @@ _fsprotocol_xmitifwecan(FsProtoElem* fspe) ///< The FrameSet protocol element to
}

// Make sure we remember to check this periodicially for retransmits...
if (orig_outstanding == 0 && fspe->outq->_q->length > 0) {
if (orig_pending == 0 && !is_outq_empty(fspe)) {
// Put 'fspe' on the list of fspe's with unacked packets
fspe->parent->unacked = g_list_prepend(fspe->parent->unacked, fspe);
set_pending(fspe);
// See comment in the _send function regarding eventual efficiency concerns
}
AUDITFSPE(fspe);
Expand Down

0 comments on commit 6c9e80e

Please sign in to comment.