Fix teardown deadlock between receive close and send close
tvegas1 authored and bureddy committed Oct 31, 2023
1 parent 4ccb98a commit 17b32e3
Showing 1 changed file with 37 additions and 19 deletions.
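
The diff below (src/ucx_plugin.c) replaces immediate per-worker destruction with a deferred teardown: when a connection releases its worker and that worker's count reaches zero, the worker is parked on worker_gc_list instead of being torn down on the spot, and the blocking close handshake plus ucp_worker_destroy()/ucp_cleanup() run only once the global worker_count drops to zero. The close-message receive is also downgraded from NCCLCHECK() to a warning, so a failed receive no longer aborts cleanup halfway through. The following is a minimal, self-contained sketch of the deferred-destroy idea only, assuming hypothetical toy_* names and a printf in place of the real UCX cleanup and oob-socket handshake; it illustrates the pattern, not the plugin's code.

/*
 * Minimal sketch of the deferred-destroy pattern used by the commit.
 * All toy_* names are hypothetical; printf stands in for the real
 * ucp_worker_destroy()/ucp_cleanup() calls and the socket close handshake.
 */
#include <assert.h>
#include <pthread.h>
#include <stdio.h>

#define TOY_MAX_WORKERS 4

struct toy_worker {
  int count;               /* connections still using this worker */
  int in_use;              /* worker has been initialized */
  struct toy_worker *next; /* teardown (garbage) list linkage */
};

static struct toy_worker g_workers[TOY_MAX_WORKERS];
static struct toy_worker *g_gc_list = NULL; /* workers awaiting destruction */
static int g_worker_count = 0;              /* workers still referenced */
static pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER;

static struct toy_worker *toy_get(int dev) {
  pthread_mutex_lock(&g_lock);
  if (!g_workers[dev].in_use) {
    g_workers[dev].in_use = 1;
    g_worker_count++;
  }
  g_workers[dev].count++;
  pthread_mutex_unlock(&g_lock);
  return &g_workers[dev];
}

static void toy_release(struct toy_worker *w) {
  struct toy_worker *gc, *next;
  int done = 0;

  pthread_mutex_lock(&g_lock);
  if (--w->count == 0) {
    /* Park the worker on the garbage list instead of destroying it now. */
    w->next   = g_gc_list;
    g_gc_list = w;
    g_worker_count--;
    done = (g_worker_count == 0);
  }
  pthread_mutex_unlock(&g_lock);

  if (!done) {
    return; /* other workers are still referenced: defer blocking cleanup */
  }

  /* Last reference in the process is gone: drain the garbage list once. */
  for (gc = g_gc_list; gc != NULL; gc = next) {
    next = gc->next;
    assert(gc->count == 0);
    gc->in_use = 0;
    printf("destroying worker %ld\n", (long)(gc - g_workers));
  }
  g_gc_list = NULL;
}

int main(void) {
  struct toy_worker *a = toy_get(0);
  struct toy_worker *b = toy_get(1);

  toy_release(a); /* parked on g_gc_list, nothing destroyed yet */
  toy_release(b); /* last release: both parked workers are destroyed here */
  return 0;
}

Presumably the benefit is that no connection blocks in the close-message receive while other local connections still hold worker references and have not yet sent their own close, which is the circular wait between receive close and send close that the commit title describes.
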
src/ucx_plugin.c (56 changes: 37 additions & 19 deletions)
@@ -148,9 +148,14 @@ struct nccl_ucx_worker {
   int count; /* number of connections that uses this worker */
   struct ep_list *eps; /* oob conection to all endpoints that were opened on this worker */
   ucp_tag_t last_tag; /* tag that last created connection uses */
+
+  struct nccl_ucx_worker *next; /* teardown list */
 };
 static struct nccl_ucx_worker workers[MAX_IB_DEVS];
 
+static struct nccl_ucx_worker *worker_gc_list = NULL;
+static int worker_count = 0;
+
 typedef struct ucx_gpu_flush {
   int enabled;
   int hostMem;
@@ -270,12 +275,10 @@ static ncclResult_t ucx_worker_get_netaddress(ucp_worker_h worker,
   return ncclSuccess;
 }
 
-#define UCX_SHARED_WORKER
 static ncclResult_t ucx_get_ctx_and_worker(int dev, ucp_context_h *ctx,
                                            ucp_worker_h *worker,
                                            ucp_tag_t *newtag) {
   pthread_mutex_lock(&nccl_ucx_lock);
-#ifdef UCX_SHARED_WORKER
   if (ncclNIbDevs < dev) {
     WARN("Device index is too large");
     return ncclSystemError;
@@ -285,6 +288,7 @@ static ncclResult_t ucx_get_ctx_and_worker(int dev, ucp_context_h *ctx,
     ucx_init_context(&workers[dev].ctx, dev);
     ucx_init_worker(workers[dev].ctx, &workers[dev].worker);
     workers[dev].last_tag = tag;
+    worker_count++;
   }
 
   *ctx = workers[dev].ctx;
@@ -297,46 +301,60 @@ static ncclResult_t ucx_get_ctx_and_worker(int dev, ucp_context_h *ctx,
 
   ucp_worker_progress(*worker);
   workers[dev].count++;
-#else
-  ucx_init_context(ctx, dev);
-  ucx_init_worker(*ctx, worker);
-  if (newtag != NULL) {
-    *newtag = tag;
-  }
-#endif
   pthread_mutex_unlock(&nccl_ucx_lock);
   return ncclSuccess;
 }
 
 static ncclResult_t nccl_ucx_free_worker(ucp_worker_h worker) {
-  int i, dummy, count;
+  int i, dummy, count, done = 0;
   struct ep_list *ep, *cur;
+  struct nccl_ucx_worker *ucx_worker, *next;
+  ncclResult_t result;
 
   pthread_mutex_lock(&nccl_ucx_lock);
   for(i = 0; i < ncclNIbDevs; i++) {
     if (workers[i].count > 0 && worker == workers[i].worker) {
       count = --workers[i].count;
+      if (count == 0) {
+        workers[i].next = worker_gc_list;
+        worker_gc_list = &workers[i];
+        worker_count--;
+        done = worker_count == 0;
+      }
       break;
     }
   }
   pthread_mutex_unlock(&nccl_ucx_lock);
 
-  if (i < ncclNIbDevs && count == 0) {
-    ep = workers[i].eps;
-    while(ep){
+  if (!done) {
+    return ncclSuccess;
+  }
+
+  for (ucx_worker = worker_gc_list; ucx_worker != NULL; ucx_worker = next) {
+    next = ucx_worker->next;
+    assert(ucx_worker->count == 0);
+
+    ep = ucx_worker->eps;
+    while (ep) {
       cur = ep;
-      NCCLCHECK(ncclSocketRecv(ep->sock, &dummy, sizeof(int)));
+      result = ncclSocketRecv(ep->sock, &dummy, sizeof(int));
+      if (result != ncclSuccess) {
+        WARN("Failed to receive close for worker cleanup (res:%d)", result);
+      }
+
       ep = ep->next;
       close(cur->sock->fd);
      free(cur);
    }
-    ucp_worker_destroy(workers[i].worker);
-    ucp_cleanup(workers[i].ctx);
-    workers[i].eps = NULL;
-    workers[i].worker = NULL;
-    workers[i].ctx = NULL;
+    ucp_worker_destroy(ucx_worker->worker);
+    ucp_cleanup(ucx_worker->ctx);
+    ucx_worker->eps = NULL;
+    ucx_worker->worker = NULL;
+    ucx_worker->ctx = NULL;
   }
 
+  worker_gc_list = NULL;
+
   return ncclSuccess;
 }
 