Skip to content

Commit

Permalink
Refactor the grpcomm support
Browse files Browse the repository at this point in the history
Separate the collectives that back the fence and group operations
so we can deal more easily with their unique needs. In particular,
the group construct operation is responsible for assigning
context IDs, collecting endpoint and group info, and generating
event notifications during release so that "add members" are
released.

Signed-off-by: Ralph Castain <[email protected]>
  • Loading branch information
rhc54 committed Oct 28, 2024
1 parent 177e861 commit bc3c11e
Show file tree
Hide file tree
Showing 34 changed files with 3,119 additions and 2,775 deletions.
73 changes: 54 additions & 19 deletions examples/dynamic.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved.
* Copyright (c) 2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2021 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2024 Nanook Consulting All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -51,9 +51,9 @@ int main(int argc, char **argv)
pmix_app_t *app;
char hostname[1024], dir[1024];
pmix_proc_t *peers;
size_t npeers, ntmp = 0;
size_t npeers, ntmp = 0, n;
char *nodelist;
char *cmd;
char *cmd, *tmp;

if (0 > gethostname(hostname, sizeof(hostname))) {
exit(1);
Expand Down Expand Up @@ -96,6 +96,51 @@ int main(int argc, char **argv)
PMIX_VALUE_RELEASE(val);
fprintf(stderr, "Client %s:%d universe size %d\n", myproc.nspace, myproc.rank, nprocs);


// put some values
if (0 > asprintf(&tmp, "%s-%d-remote1", myproc.nspace, myproc.rank)) {
exit(1);
}
value.type = PMIX_UINT64;
value.data.uint64 = 1234;
if (PMIX_SUCCESS != (rc = PMIx_Put(PMIX_REMOTE, tmp, &value))) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Put internal failed: %d\n", myproc.nspace,
myproc.rank, rc);
goto done;
}
free(tmp);

if (0 > asprintf(&tmp, "%s-%d-remote2", myproc.nspace, myproc.rank)) {
exit(1);
}
value.type = PMIX_UINT64;
value.data.uint64 = 12345;
if (PMIX_SUCCESS != (rc = PMIx_Put(PMIX_REMOTE, tmp, &value))) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Put internal failed: %d\n", myproc.nspace,
myproc.rank, rc);
goto done;
}
free(tmp);

if (0 > asprintf(&tmp, "%s-%d-remote3", myproc.nspace, myproc.rank)) {
exit(1);
}
value.type = PMIX_UINT64;
value.data.uint64 = 123456;
if (PMIX_SUCCESS != (rc = PMIx_Put(PMIX_REMOTE, tmp, &value))) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Put internal failed: %d\n", myproc.nspace,
myproc.rank, rc);
goto done;
}
free(tmp);

/* push the data to our PMIx server */
if (PMIX_SUCCESS != (rc = PMIx_Commit())) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Commit failed: %d\n", myproc.nspace,
myproc.rank, rc);
goto done;
}

/* call fence to sync */
(void) strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN);
proc.rank = PMIX_RANK_WILDCARD;
Expand Down Expand Up @@ -173,15 +218,10 @@ int main(int argc, char **argv)
exitcode = rc;
goto done;
}
if ((nprocs + ntmp) != npeers) {
fprintf(stderr,
"Client ns %s rank %d: PMIx_Resolve_peers returned incorrect npeers: %d vs %d\n",
myproc.nspace, myproc.rank, (int) (nprocs + ntmp), (int) npeers);
exitcode = 1;
goto done;
fprintf(stderr, "Client ns %s rank %d PEERS:\n", myproc.nspace, myproc.rank);
for (n=0; n < npeers; n++) {
fprintf(stderr, "\t%s:%d\n", peers[n].nspace, peers[n].rank);
}
fprintf(stderr, "Client ns %s rank %d: PMIx_Resolve_peers returned %d npeers\n",
myproc.nspace, myproc.rank, (int) npeers);
if (PMIX_SUCCESS != (rc = PMIx_Resolve_nodes(nsp2, &nodelist))) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Resolve_nodes failed for nspace %s: %d\n",
myproc.nspace, myproc.rank, nsp2, rc);
Expand All @@ -197,15 +237,10 @@ int main(int argc, char **argv)
exitcode = rc;
goto done;
}
if (nprocs != npeers) {
fprintf(stderr,
"Client ns %s rank %d: PMIx_Resolve_peers returned incorrect npeers: %d vs %d\n",
myproc.nspace, myproc.rank, nprocs, (int) npeers);
exitcode = rc;
goto done;
fprintf(stderr, "Client ns %s rank %d PEERS:\n", myproc.nspace, myproc.rank);
for (n=0; n < npeers; n++) {
fprintf(stderr, "\t%s:%d\n", peers[n].nspace, peers[n].rank);
}
fprintf(stderr, "Client ns %s rank %d: PMIx_Resolve_peers returned %d npeers\n",
myproc.nspace, myproc.rank, (int) npeers);
if (PMIX_SUCCESS != (rc = PMIx_Resolve_nodes(NULL, &nodelist))) {
fprintf(stderr, "Client ns %s rank %d: PMIx_Resolve_nodes failed: %d\n", myproc.nspace,
myproc.rank, rc);
Expand Down
9 changes: 1 addition & 8 deletions src/mca/errmgr/dvm/errmgr_dvm.c
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,6 @@ static void check_send_notification(prte_job_t *jdata,
prte_proc_t *proc,
pmix_status_t event)
{
prte_grpcomm_signature_t sig;
int rc;
pmix_info_t *info;
size_t ninfo;
Expand Down Expand Up @@ -673,14 +672,8 @@ static void check_send_notification(prte_job_t *jdata,
PMIX_INFO_FREE(info, ninfo);

/* xcast it to everyone */
PMIX_CONSTRUCT(&sig, prte_grpcomm_signature_t);
sig.signature = (pmix_proc_t *) malloc(sizeof(pmix_proc_t));
PMIX_LOAD_PROCID(&sig.signature[0], PRTE_PROC_MY_NAME->nspace, PMIX_RANK_WILDCARD);
sig.sz = 1;

if (PRTE_SUCCESS != (rc = prte_grpcomm.xcast(&sig, PRTE_RML_TAG_NOTIFICATION, &pbkt))) {
if (PRTE_SUCCESS != (rc = prte_grpcomm.xcast(PRTE_RML_TAG_NOTIFICATION, &pbkt))) {
PRTE_ERROR_LOG(rc);
}
PMIX_DESTRUCT(&sig);
PMIX_DATA_BUFFER_DESTRUCT(&pbkt);
}
8 changes: 1 addition & 7 deletions src/mca/filem/raw/filem_raw_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,6 @@ static void send_chunk(int xxx, short argc, void *cbdata)
int32_t numbytes;
int rc;
pmix_data_buffer_t chunk;
prte_grpcomm_signature_t *sig;
PRTE_HIDE_UNUSED_PARAMS(xxx, argc);

PMIX_ACQUIRE_OBJECT(rev);
Expand Down Expand Up @@ -786,18 +785,13 @@ static void send_chunk(int xxx, short argc, void *cbdata)
}

/* goes to all daemons */
sig = PMIX_NEW(prte_grpcomm_signature_t);
sig->signature = (pmix_proc_t *) malloc(sizeof(pmix_proc_t));
sig->sz = 1;
PMIX_LOAD_PROCID(&sig->signature[0], PRTE_PROC_MY_NAME->nspace, PMIX_RANK_WILDCARD);
if (PRTE_SUCCESS != (rc = prte_grpcomm.xcast(sig, PRTE_RML_TAG_FILEM_BASE, &chunk))) {
if (PRTE_SUCCESS != (rc = prte_grpcomm.xcast(PRTE_RML_TAG_FILEM_BASE, &chunk))) {
PRTE_ERROR_LOG(rc);
PMIX_DATA_BUFFER_DESTRUCT(&chunk);
close(fd);
return;
}
PMIX_DATA_BUFFER_DESTRUCT(&chunk);
PMIX_RELEASE(sig);
rev->nchunk++;

/* if num_bytes was zero, then we need to terminate the event
Expand Down
5 changes: 2 additions & 3 deletions src/mca/grpcomm/base/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# Copyright (c) 2011-2013 Los Alamos National Security, LLC.
# All rights reserved.
# Copyright (c) 2019 Intel, Inc. All rights reserved.
# Copyright (c) 2022 Nanook Consulting. All rights reserved.
# Copyright (c) 2022-2024 Nanook Consulting All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
Expand All @@ -25,5 +25,4 @@ headers += \

libprtemca_grpcomm_la_SOURCES += \
base/grpcomm_base_select.c \
base/grpcomm_base_frame.c \
base/grpcomm_base_stubs.c
base/grpcomm_base_frame.c
26 changes: 1 addition & 25 deletions src/mca/grpcomm/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* Copyright (c) 2017-2020 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting All rights reserved.
* Copyright (c) 2021-2024 Nanook Consulting All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -61,34 +61,10 @@ PRTE_EXPORT int prte_grpcomm_base_select(void);
* globals that might be needed
*/
typedef struct {
pmix_list_item_t super;
int pri;
prte_grpcomm_base_module_t *module;
pmix_mca_base_component_t *component;
} prte_grpcomm_base_active_t;
PMIX_CLASS_DECLARATION(prte_grpcomm_base_active_t);

typedef struct {
pmix_list_t actives;
pmix_list_t ongoing;
pmix_hash_table_t sig_table;
char *transports;
uint32_t context_id;
} prte_grpcomm_base_t;

PRTE_EXPORT extern prte_grpcomm_base_t prte_grpcomm_base;

/* Public API stubs */
PRTE_EXPORT int prte_grpcomm_API_xcast(prte_grpcomm_signature_t *sig, prte_rml_tag_t tag,
pmix_data_buffer_t *buf);

PRTE_EXPORT int prte_grpcomm_API_allgather(prte_pmix_mdx_caddy_t *cd);

PRTE_EXPORT prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig,
bool create);

PRTE_EXPORT int prte_pack_ctrl_options(pmix_byte_object_t *bo,
const pmix_info_t *info, size_t ninfo);

END_C_DECLS
#endif
134 changes: 25 additions & 109 deletions src/mca/grpcomm/base/grpcomm_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,56 +47,17 @@
* Global variables
*/
prte_grpcomm_base_t prte_grpcomm_base = {
.actives = PMIX_LIST_STATIC_INIT,
.ongoing = PMIX_LIST_STATIC_INIT,
.sig_table = PMIX_HASH_TABLE_STATIC_INIT,
.transports = NULL,
.context_id = 0
.context_id = UINT32_MAX
};

prte_grpcomm_API_module_t prte_grpcomm = {
.xcast = prte_grpcomm_API_xcast,
.allgather = prte_grpcomm_API_allgather
};

static int base_register(pmix_mca_base_register_flag_t flags)
{
PRTE_HIDE_UNUSED_PARAMS(flags);
prte_grpcomm_base_module_t prte_grpcomm = {0};

prte_grpcomm_base.context_id = 1;
pmix_mca_base_var_register("prte", "grpcomm", "base", "starting_context_id",
"Starting value for assigning context id\'s",
PMIX_MCA_BASE_VAR_TYPE_INT,
&prte_grpcomm_base.context_id);

return PRTE_SUCCESS;
}

static int prte_grpcomm_base_close(void)
{
prte_grpcomm_base_active_t *active;
void *key;
size_t size;
uint32_t *seq_number;

PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_XCAST);

/* Close the active modules */
PMIX_LIST_FOREACH(active, &prte_grpcomm_base.actives, prte_grpcomm_base_active_t)
{
if (NULL != active->module->finalize) {
active->module->finalize();
}
}
PMIX_LIST_DESTRUCT(&prte_grpcomm_base.actives);
PMIX_LIST_DESTRUCT(&prte_grpcomm_base.ongoing);
for (void *_nptr = NULL;
PRTE_SUCCESS
== pmix_hash_table_get_next_key_ptr(&prte_grpcomm_base.sig_table, &key, &size,
(void **) &seq_number, _nptr, &_nptr);) {
free(seq_number);
if (NULL != prte_grpcomm.finalize) {
prte_grpcomm.finalize();
}
PMIX_DESTRUCT(&prte_grpcomm_base.sig_table);

return pmix_mca_base_framework_components_close(&prte_grpcomm_base_framework, NULL);
}
Expand All @@ -107,82 +68,37 @@ static int prte_grpcomm_base_close(void)
*/
static int prte_grpcomm_base_open(pmix_mca_base_open_flag_t flags)
{
PMIX_CONSTRUCT(&prte_grpcomm_base.actives, pmix_list_t);
PMIX_CONSTRUCT(&prte_grpcomm_base.ongoing, pmix_list_t);
PMIX_CONSTRUCT(&prte_grpcomm_base.sig_table, pmix_hash_table_t);
pmix_hash_table_init(&prte_grpcomm_base.sig_table, 128);
prte_grpcomm_base.context_id = UINT32_MAX;

return pmix_mca_base_framework_components_open(&prte_grpcomm_base_framework, flags);
}

PMIX_MCA_BASE_FRAMEWORK_DECLARE(prte, grpcomm, "GRPCOMM", base_register, prte_grpcomm_base_open,
PMIX_MCA_BASE_FRAMEWORK_DECLARE(prte, grpcomm, "GRPCOMM", NULL, prte_grpcomm_base_open,
prte_grpcomm_base_close, prte_grpcomm_base_static_components,
PMIX_MCA_BASE_FRAMEWORK_FLAG_DEFAULT);

PMIX_CLASS_INSTANCE(prte_grpcomm_base_active_t,
pmix_list_item_t,
NULL, NULL);

static void scon(prte_grpcomm_signature_t *p)
{
p->groupID = NULL;
p->ctxid = 0;
p->ctxid_assigned = false;
p->signature = NULL;
p->sz = 0;
p->addmembers = NULL;
p->nmembers = 0;
p->bootstrap = 0;
p->finalmembership = NULL;
p->nfinal = 0;
}
static void sdes(prte_grpcomm_signature_t *p)
{
if (NULL != p->groupID) {
free(p->groupID);
}
if (NULL != p->signature) {
free(p->signature);
}
if (NULL != p->addmembers) {
free(p->addmembers);
}
if (NULL != p->finalmembership) {
free(p->finalmembership);
}
}
PMIX_CLASS_INSTANCE(prte_grpcomm_signature_t,
pmix_object_t,
scon, sdes);

static void ccon(prte_grpcomm_coll_t *p)
static void grpcon(prte_pmix_grp_caddy_t *p)
{
p->sig = NULL;
p->status = PMIX_SUCCESS;
PMIX_DATA_BUFFER_CONSTRUCT(&p->bucket);
p->dmns = NULL;
p->ndmns = 0;
p->nexpected = 0;
p->nreported = 0;
p->assignID = false;
p->timeout = 0;
p->memsize = 0;
PMIX_CONSTRUCT(&p->addmembers, pmix_list_t);
PMIX_CONSTRUCT_LOCK(&p->lock);
p->op = PMIX_GROUP_NONE;
p->grpid = NULL;
p->procs = NULL;
p->nprocs = 0;
p->directives = NULL;
p->ndirs = 0;
p->info = NULL;
p->ninfo = 0;
p->cbfunc = NULL;
p->cbdata = NULL;
p->buffers = NULL;
}
static void cdes(prte_grpcomm_coll_t *p)
static void grpdes(prte_pmix_grp_caddy_t *p)
{
if (NULL != p->sig) {
PMIX_RELEASE(p->sig);
PMIX_DESTRUCT_LOCK(&p->lock);
if (NULL != p->grpid) {
free(p->grpid);
}
if (NULL != p->info) {
PMIX_INFO_FREE(p->info, p->ninfo);
}
PMIX_DATA_BUFFER_DESTRUCT(&p->bucket);
PMIX_LIST_DESTRUCT(&p->addmembers);
free(p->dmns);
free(p->buffers);
}
PMIX_CLASS_INSTANCE(prte_grpcomm_coll_t,
pmix_list_item_t,
ccon, cdes);
PMIX_CLASS_INSTANCE(prte_pmix_grp_caddy_t,
pmix_object_t,
grpcon, grpdes);
Loading

0 comments on commit bc3c11e

Please sign in to comment.