Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into dev/worker_segment
Browse files Browse the repository at this point in the history
  • Loading branch information
tvegas1 committed Nov 9, 2023
2 parents 3846444 + 9a72fae commit 71567a0
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 124 deletions.
4 changes: 2 additions & 2 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ AC_ARG_ENABLE([debug],AS_HELP_STRING([--enable-debug], [Enable extra debugging c

if test $enable_debug = yes; then
AC_DEFINE([ENABLE_DEBUG], [1], [Enable debugging code])
CFLAGS="$CFLAGS -O0 -g3 -Werror"
CFLAGS="$CFLAGS -O0 -g3 -Wall -Werror"
else
CFLAGS="$CFLAGS -O3 -DNDEBUG -Werror"
CFLAGS="$CFLAGS -O3 -DNDEBUG -Wall -Werror"
fi

#check for cuda
Expand Down
2 changes: 0 additions & 2 deletions include/p2p_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

#include <stdint.h>
#include <unistd.h>
#define ENABLE_TIMER 0
#include "timer.h"
#include <assert.h>

#include "nccl.h"
Expand Down
6 changes: 3 additions & 3 deletions include/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ static double startTimes[8];
printf("\n"); \
} while (0);
#else
/*
 * No-op stand-ins for the timing macros, defined in the #else branch of this
 * header. Each TIME_START/TIME_STOP/TIME_CANCEL expansion already ends in ';',
 * so a call written as `TIME_START(0);` produces an extra empty statement;
 * do not use these as the unbraced body of an if/else.
 * TIME_PRINT expands to nothing.
 * NOTE(review): presumably selected when ENABLE_TIMER is 0 - the governing
 * #if is not visible here, confirm.
 */
#define TIME_START(index) while(0);
#define TIME_STOP(index) while(0);
#define TIME_CANCEL(index) while(0);
#define TIME_START(index) do {} while(0);
#define TIME_STOP(index) do {} while(0);
#define TIME_CANCEL(index) do {} while(0);
#define TIME_PRINT(name)
#endif
#endif
1 change: 0 additions & 1 deletion include/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ struct netIf {

int parseStringList(const char* string, struct netIf* ifList, int maxList);
int matchIfList(const char* string, int port, struct netIf* ifList, int listSize, int matchExact);
int readFileNumber(long *value, const char *filename_fmt, ...);
const char *get_plugin_lib_path();

#endif
24 changes: 2 additions & 22 deletions src/ib_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>

#define ENABLE_TIMER 0
#include "timer.h"

#include "p2p_plugin.h"
#include "core.h"
#include "socket.h"
Expand Down Expand Up @@ -46,25 +46,11 @@ NCCL_PARAM(IbTc, "IB_TC", 0);
NCCL_PARAM(IbArThreshold, "IB_AR_THRESHOLD", 8192);
NCCL_PARAM(IbPciRelaxedOrdering, "IB_PCI_RELAXED_ORDERING", 2);

pthread_t ncclIbAsyncThread;
static void* ncclIbAsyncThreadMain(void* args) {
struct ibv_context* context = (struct ibv_context*)args;
while (1) {
struct ibv_async_event event;
if (ncclSuccess != wrap_ibv_get_async_event(context, &event)) { break; }
char *str;
if (ncclSuccess != wrap_ibv_event_type_str(&str, event.event_type)) { break; }
if (event.event_type != IBV_EVENT_COMM_EST)
WARN("NET/IB : Got async event : %s", str);
if (ncclSuccess != wrap_ibv_ack_async_event(&event)) { break; }
}
return NULL;
}
static pthread_t ncclIbAsyncThread;

// Determine whether RELAXED_ORDERING is enabled and possible
int ncclIbRelaxedOrderingCapable(void) {
int roMode = ncclParamIbPciRelaxedOrdering();
ncclResult_t r = ncclInternalError;
if (roMode == 1 || roMode == 2) {
if (!IBV_ACCESS_RELAXED_ORDERING) {
if(roMode == 1)
Expand Down Expand Up @@ -110,11 +96,6 @@ ncclResult_t ncclIbGetProperties_v6(int dev, ncclNetProperties_v6_t* props_v6)
return ncclSuccess;
}

/*
 * Copy the file-visible interface address ncclIbIfAddr into *addr.
 * Exactly sizeof(*addr) bytes are copied; always returns ncclSuccess.
 */
static ncclResult_t GetSocketAddr(union ncclSocketAddress* addr) {
  const void* ifAddr = &ncclIbIfAddr;

  memcpy(addr, ifAddr, sizeof(union ncclSocketAddress));
  return ncclSuccess;
}

#define NCCL_IB_MAX_QPS 128

typedef struct ncclIbQpInfo {
Expand Down Expand Up @@ -365,7 +346,6 @@ ncclResult_t ncclIbListen(int dev, void* opaqueHandle, void** listenComm) {

ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNetDeviceHandle_t** sendDevComm) {
struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle;
enum ncclSocketState conState;
struct ncclIbCommStage* stage = &handle->stage;
struct ncclIbSendComm* comm = (struct ncclIbSendComm*)stage->comm;
struct ncclIbQpInfo remQpInfo;
Expand Down
12 changes: 5 additions & 7 deletions src/p2p_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ static nccl_p2p_plugin_t p2p_plugin = NCCL_P2P_LAST;

static void pluginSetup()
{

p2p_plugin = NCCL_P2P_IB;
const char *plugin_path = get_plugin_lib_path();
if (plugin_path != NULL) {
Expand All @@ -84,11 +83,6 @@ static void pluginSetup()
}
}
switch (p2p_plugin) {
case NCCL_P2P_IB:
ncclNetPlugin_v7 = ibPlugin_v7;
ncclNetPlugin_v6 = ibPlugin_v6;
ncclNetPlugin_v5 = ibPlugin_v5;
break;
#ifdef HAVE_UCX_PLUGIN
case NCCL_P2P_UCX:
ncclNetPlugin_v7 = ucxPlugin_v7;
Expand All @@ -101,6 +95,11 @@ static void pluginSetup()
ncclNetPlugin_v5 = ucxRmaPlugin_v5;
break;
#endif
default:
ncclNetPlugin_v7 = ibPlugin_v7;
ncclNetPlugin_v6 = ibPlugin_v6;
ncclNetPlugin_v5 = ibPlugin_v5;
break;
}

}
Expand Down Expand Up @@ -413,4 +412,3 @@ nccl_p2p_plugin_t nccl_p2p_get_plugin_type()

struct ncclIbDev ncclIbDevs[MAX_IB_DEVS];
struct ncclIbDev userIbDevs[MAX_IB_DEVS];

2 changes: 1 addition & 1 deletion src/sharp_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ int ncclSharpAllGather(void *context, void *buf, int len) {
if (rrequest == NULL) NCCLCHECK(ncclNetPlugin_v7.irecv(cComm->recvComm, 1, &rbuf, &len, &tag, &rMhandle, &rrequest));
}
while (srequest || rrequest) {
int done;
int done = 0; /* silent uninitialized false positive */
if (rrequest) NCCLCHECK(ncclNetPlugin_v7.test(rrequest, &done, NULL));
if (done) rrequest = NULL;
if (srequest) NCCLCHECK(ncclNetPlugin_v7.test(srequest, &done, NULL));
Expand Down
27 changes: 9 additions & 18 deletions src/ucx_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,6 @@ static void recv_handler_nbx(void *request, ucs_status_t status,
static union ncclSocketAddress nccl_ucx_if_addr;
static char if_name[MAX_IF_NAME_SIZE];

/*
 * Copy the file-scope interface address nccl_ucx_if_addr into *addr.
 * Exactly sizeof(*addr) bytes are copied; always returns ncclSuccess.
 */
static ncclResult_t GetSocketAddr(union ncclSocketAddress *addr) {
  const void *if_addr = &nccl_ucx_if_addr;

  memcpy(addr, if_addr, sizeof(union ncclSocketAddress));
  return ncclSuccess;
}

static ncclResult_t ucx_init_context(ucp_context_h *ctx, int dev) {
ucp_params_t ucp_params;
ucp_config_t *config;
Expand Down Expand Up @@ -330,7 +325,7 @@ static ncclResult_t ucx_get_ctx_and_worker(int dev, ucp_context_h *ctx,
}

static ncclResult_t nccl_ucx_free_worker(nccl_ucx_worker_t *ucx_worker) {
int i, dev, dummy, count, done = 0;
int dev, dummy, done = 0;
struct ep_list *ep, *cur;
struct nccl_ucx_worker *next;
ncclResult_t result;
Expand Down Expand Up @@ -431,14 +426,14 @@ static void ucx_request_init(ucx_comm_t *comm) {
}

ncclResult_t nccl_ucx_connect(int dev, void *handle, void **send_comm, ncclNetDeviceHandle_t** sendDevComm) {
ucx_listen_handle_t *recv_handle = (ucx_listen_handle_t*)handle;
struct ncclUCXCommStage* stage = &recv_handle->stage;
ucx_comm_t *comm = stage->comm;
ucp_address_t *my_addr;
size_t local_addr_len;
enum ncclSocketState conState;
ucx_listen_handle_t *recv_handle = (ucx_listen_handle_t*)handle;
struct ncclUCXCommStage *stage = &recv_handle->stage;
ucx_comm_t *comm = stage->comm;
ucp_address_t *my_addr;
size_t local_addr_len;
int ready;

*send_comm = NULL;
int ready;

if (stage->state == ncclUCXCommStateConnect) goto ucx_connect_check;

Expand Down Expand Up @@ -478,13 +473,11 @@ ncclResult_t nccl_ucx_connect_v6(int dev, void *handle, void **send_comm) {
ncclResult_t nccl_ucx_accept(void *listen_comm, void **recv_comm, ncclNetDeviceHandle_v7_t** recvDevComm)
{
ucx_listen_comm_t *l_comm = (ucx_listen_comm_t *)listen_comm;
socklen_t socklen = sizeof(struct sockaddr_in);
struct ncclUCXCommStage* stage = &l_comm->stage;
ucx_comm_t *r_comm = (ucx_comm_t *)stage->comm;
size_t peer_addr_len;
ucp_address_t *peer_addr;
ucp_ep_params_t ep_params;
struct sockaddr_in sockaddr;
int ready;

*recv_comm = NULL;
Expand Down Expand Up @@ -596,10 +589,8 @@ ncclResult_t nccl_ucx_regmr_dmabuf(void* comm, void* data, size_t size, int type
}

static ucx_request_t *ucx_request_get(ucx_comm_t *comm) {
static const size_t entries = sizeof(comm->reqs) / sizeof(*comm->reqs);
ucx_request_t *req;
ucx_request_t *req = comm->free_req;

req = comm->free_req;
if (req == NULL) {
WARN("NET/UCX: unable to allocate NCCL request");
return NULL;
Expand Down
26 changes: 12 additions & 14 deletions src/ucx_rma_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,6 @@ typedef struct nccl_ucx_rma_recv_comm {
static union ncclSocketAddress nccl_ucx_if_addr;
static char if_name[MAX_IF_NAME_SIZE];

/*
 * Copy the file-scope interface address nccl_ucx_if_addr into *addr.
 * Exactly sizeof(*addr) bytes are copied; always returns ncclSuccess.
 */
static ncclResult_t GetSocketAddr(union ncclSocketAddress *addr) {
  const void *if_addr = &nccl_ucx_if_addr;

  memcpy(addr, if_addr, sizeof(union ncclSocketAddress));
  return ncclSuccess;
}

typedef struct nccl_ucx_am_request {
nccl_ucx_rma_request_t *req;
} nccl_ucx_am_request_t;
Expand Down Expand Up @@ -425,22 +420,28 @@ static ucs_status_t nccl_ucx_rma_am_rkey_cb(void *arg, void *data, size_t length
{
nccl_ucx_rma_send_comm_t *comm = (nccl_ucx_rma_send_comm_t*)arg;
nccl_ucx_rma_rkey_buf_t *rkey_buf = (nccl_ucx_rma_rkey_buf_t*)data;
ucs_status_t status;

if (comm->rkeys[rkey_buf->index].rkey) {
ucp_rkey_destroy(comm->rkeys[rkey_buf->index].rkey);
}
comm->rkeys[rkey_buf->index].id = rkey_buf->id;
UCXCHECK(ucp_ep_rkey_unpack(comm->ep, rkey_buf->buf,
&comm->rkeys[rkey_buf->index].rkey));
status = ucp_ep_rkey_unpack(comm->ep, rkey_buf->buf,
&comm->rkeys[rkey_buf->index].rkey);
if (status != UCS_OK) {
WARN("Failed: UCX am rkey cb: rkey unpack error %s",
ucs_status_string(status));
}

return UCS_OK;
}


ncclResult_t nccl_ucx_rma_connect(int dev, void *handle, void **send_comm, ncclNetDeviceHandle_t** sendDevComm)
{
ucx_rma_listen_handle_t *recv_handle = (ucx_rma_listen_handle_t*)handle;
struct ncclUCXCommStage* stage = &recv_handle->stage;
nccl_ucx_rma_send_comm_t *comm = stage->comm;
struct ncclUCXCommStage* stage = &recv_handle->stage;
nccl_ucx_rma_send_comm_t *comm = stage->comm;
ucp_mem_map_params_t mmap_params;
size_t rkey_buf_size;
void *rkey_buf;
Expand Down Expand Up @@ -549,10 +550,8 @@ static ncclResult_t nccl_ucx_rma_init_ep(struct ncclSocket *sock, ucp_worker_h w
ncclResult_t nccl_ucx_rma_accept(void *listen_comm, void **recv_comm, ncclNetDeviceHandle_v7_t** recvDevComm)
{
nccl_ucx_rma_listen_comm_t *l_comm = (nccl_ucx_rma_listen_comm_t *)listen_comm;
socklen_t socklen = sizeof(struct sockaddr_in);
struct ncclUCXCommStage* stage = &l_comm->stage;
nccl_ucx_rma_recv_comm_t *r_comm;
struct sockaddr_in sockaddr;
struct ncclUCXCommStage* stage = &l_comm->stage;
nccl_ucx_rma_recv_comm_t *r_comm = stage->comm;
void *rkey_buf;
size_t rkey_buf_size;
int ready;
Expand Down Expand Up @@ -1126,7 +1125,6 @@ ncclResult_t nccl_ucx_rma_close_recv(void *recv_comm)
{
nccl_ucx_rma_recv_comm_t *comm = (nccl_ucx_rma_recv_comm_t*)recv_comm;
void *close_req;
int debug = 1;
int close = 1;

if (recv_comm) {
Expand Down
54 changes: 0 additions & 54 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,60 +110,6 @@ int matchIfList(const char* string, int port, struct netIf* ifList, int listSize
return 0;
}

/*
 * Format a path from filename_fmt/ap, open it read-only and read up to
 * max - 1 bytes into buffer. On success the buffer is always NUL-terminated.
 *
 * Returns the number of bytes read (0 for an empty file), or -1 when max is
 * 0, the formatted path does not fit in PATH_MAX, or open()/read() fails.
 * The descriptor is closed on every path that follows a successful open().
 */
static ssize_t readFileVarArg(char *buffer, size_t max,
                              const char *filename_fmt, va_list ap)
{
    char filename[PATH_MAX];
    ssize_t read_bytes;
    int path_len;
    int fd;

    if (max == 0) {
        /* no room even for the terminating NUL */
        return -1;
    }

    path_len = vsnprintf(filename, sizeof(filename), filename_fmt, ap);
    if ((path_len < 0) || ((size_t)path_len >= sizeof(filename))) {
        /* formatting error or truncated path: do not open a wrong file */
        return -1;
    }

    fd = open(filename, O_RDONLY);
    if (fd < 0) {
        return -1;
    }

    read_bytes = read(fd, buffer, max - 1);
    close(fd); /* close before inspecting the result so no path leaks fd */
    if (read_bytes < 0) {
        return -1;
    }

    /* read() returned at most max - 1 bytes, so this index is in bounds */
    buffer[read_bytes] = '\0';
    return read_bytes;
}

/*
 * Read an integer from the file named by the printf-style filename_fmt.
 *
 * The text is parsed by strtol() with base 0, so decimal, 0x-prefixed hex and
 * 0-prefixed octal are accepted. The number may be followed only by
 * whitespace or the end of the data.
 *
 * On success stores the number in *value and returns 0. Returns -1, leaving
 * *value untouched, when the file cannot be read or holds no parsable number.
 */
int readFileNumber(long *value, const char *filename_fmt, ...)
{
    char buffer[64], *tail;
    ssize_t read_bytes;
    va_list ap;
    long n;

    va_start(ap, filename_fmt);
    /* the helper reserves the byte for the terminating NUL itself */
    read_bytes = readFileVarArg(buffer, sizeof(buffer), filename_fmt, ap);
    va_end(ap);

    if (read_bytes < 0) {
        /* read error */
        return -1;
    }

    n = strtol(buffer, &tail, 0);
    if (tail == buffer) {
        /* nothing was consumed: empty file or no leading number */
        return -1;
    }
    if ((*tail != '\0') && !isspace((unsigned char)*tail)) {
        /* parse error: trailing garbage after the number */
        return -1;
    }

    *value = n;
    return 0;
}

const char *get_plugin_lib_path()
{
Dl_info dl_info;
Expand Down

0 comments on commit 71567a0

Please sign in to comment.