Skip to content

Commit

Permalink
Correctly forward stdin to remote procs
Browse files Browse the repository at this point in the history
Find out where the target destination is executing
and the daemon hosting it, then send the stdin to
that host for local relay.

Signed-off-by: Ralph Castain <[email protected]>
  • Loading branch information
rhc54 committed Oct 18, 2023
1 parent 9c88464 commit 62d98ba
Showing 1 changed file with 42 additions and 46 deletions.
88 changes: 42 additions & 46 deletions src/mca/iof/hnp/iof_hnp.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* Copyright (c) 2016-2020 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* Copyright (c) 2020 IBM Corporation. All rights reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -210,7 +210,34 @@ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz)
}
}

/* do we already have this process in our list? */
/* lookup the daemon hosting the target proc */
PMIX_LOAD_PROCID(&p, PRTE_PROC_MY_NAME->nspace, PMIX_RANK_WILDCARD);
p.rank = prte_get_proc_daemon_vpid(dst_name);
if (PMIX_RANK_INVALID == p.rank) {
PRTE_ERROR_LOG(PRTE_ERR_ADDRESSEE_UNKNOWN);
return PRTE_ERR_ADDRESSEE_UNKNOWN;
}

/* if the host isn't me, send it to them */
if (p.rank != PRTE_PROC_MY_NAME->rank) {
/* send the data to the daemon so it can
* write it to the proc's fd - in this case,
* we pass dst_name to indicate who is to
* receive the data. If the connection closed,
* sz will be zero so zero bytes will be
* sent - this will tell the daemon to close
* the fd for stdin to that proc
*/
rc = prte_iof_hnp_send_data_to_endpoint(&p, dst_name,
PRTE_IOF_STDIN,
data, sz);
if (PRTE_SUCCESS != rc) {
PRTE_ERROR_LOG(rc);
}
return rc;
}

/* local proc - see if we have this process in our list */
PMIX_LIST_FOREACH(proct, &prte_mca_iof_hnp_component.procs, prte_iof_proc_t)
{
if (PMIX_CHECK_PROCID(&proct->name, dst_name)) {
Expand All @@ -220,50 +247,19 @@ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz)
continue;
}

/* if the daemon is me, then this is a local sink */
if (proct->stdinev->daemon.rank == PRTE_PROC_MY_NAME->rank) {
PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"%s read %d bytes from stdin - writing to %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz,
PRTE_NAME_PRINT(&proct->name)));
/* send the bytes down the pipe - we even send 0 byte events
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (NULL != proct->stdinev->wev) {
if (PRTE_IOF_MAX_INPUT_BUFFERS < prte_iof_base_write_output(&proct->name,
PRTE_IOF_STDIN, data, sz,
proct->stdinev->wev)) {
/* getting too backed up - stop the read event for now if it is still active */

PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"buffer backed up - holding"));
return PRTE_ERR_OUT_OF_RESOURCE;
}
}
} else {
PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"%s sending %d bytes from stdinev to daemon %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz,
PRTE_NAME_PRINT(&proct->stdinev->daemon)));

/* send the data to the daemon so it can
* write it to the proc's fd - in this case,
* we pass sink->name to indicate who is to
* receive the data. If the connection closed,
* numbytes will be zero so zero bytes will be
* sent - this will tell the daemon to close
* the fd for stdin to that proc
*/
rc = prte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon,
&proct->stdinev->name,
PRTE_IOF_STDIN,
data, sz);
if (PRTE_SUCCESS != rc) {
/* if the addressee is unknown, remove the sink from the list */
if (PRTE_ERR_ADDRESSEE_UNKNOWN == rc) {
PMIX_RELEASE(proct->stdinev);
}
/* send the bytes down the pipe - we even send 0 byte events
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (NULL != proct->stdinev->wev) {
if (PRTE_IOF_MAX_INPUT_BUFFERS < prte_iof_base_write_output(&proct->name,
PRTE_IOF_STDIN, data, sz,
proct->stdinev->wev)) {
/* getting too backed up - stop the read event for now if it is still active */

PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"buffer backed up - holding"));
return PRTE_ERR_OUT_OF_RESOURCE;
}
}
}
Expand Down

0 comments on commit 62d98ba

Please sign in to comment.