Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple commits #1829

Merged
merged 2 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/docs/show-help-files/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@

OUTDIR = _build
SPHINX_CONFIG = conf.py
#SPHINX_OPTS ?= -W --keep-going -j auto
SPHINX_OPTS ?= --keep-going -j auto
SPHINX_OPTS ?= -W --keep-going -j auto

# All RST source files, including those that are not installed
RST_SOURCE_FILES = \
Expand Down
88 changes: 42 additions & 46 deletions src/mca/iof/hnp/iof_hnp.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* Copyright (c) 2016-2020 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* Copyright (c) 2020 IBM Corporation. All rights reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -210,7 +210,34 @@ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz)
}
}

/* do we already have this process in our list? */
/* lookup the daemon hosting the target proc */
PMIX_LOAD_PROCID(&p, PRTE_PROC_MY_NAME->nspace, PMIX_RANK_WILDCARD);
p.rank = prte_get_proc_daemon_vpid(dst_name);
if (PMIX_RANK_INVALID == p.rank) {
PRTE_ERROR_LOG(PRTE_ERR_ADDRESSEE_UNKNOWN);
return PRTE_ERR_ADDRESSEE_UNKNOWN;
}

/* if the host isn't me, send it to them */
if (p.rank != PRTE_PROC_MY_NAME->rank) {
/* send the data to the daemon so it can
* write it to the proc's fd - in this case,
* we pass dst_name to indicate who is to
* receive the data. If the connection closed,
* numbytes will be zero so zero bytes will be
* sent - this will tell the daemon to close
* the fd for stdin to that proc
*/
rc = prte_iof_hnp_send_data_to_endpoint(&p, dst_name,
PRTE_IOF_STDIN,
data, sz);
if (PRTE_SUCCESS != rc) {
PRTE_ERROR_LOG(rc);
}
return rc;
}

/* local proc - see if we have this process in our list */
PMIX_LIST_FOREACH(proct, &prte_mca_iof_hnp_component.procs, prte_iof_proc_t)
{
if (PMIX_CHECK_PROCID(&proct->name, dst_name)) {
Expand All @@ -220,50 +247,19 @@ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz)
continue;
}

/* if the daemon is me, then this is a local sink */
if (proct->stdinev->daemon.rank == PRTE_PROC_MY_NAME->rank) {
PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"%s read %d bytes from stdin - writing to %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz,
PRTE_NAME_PRINT(&proct->name)));
/* send the bytes down the pipe - we even send 0 byte events
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (NULL != proct->stdinev->wev) {
if (PRTE_IOF_MAX_INPUT_BUFFERS < prte_iof_base_write_output(&proct->name,
PRTE_IOF_STDIN, data, sz,
proct->stdinev->wev)) {
/* getting too backed up - stop the read event for now if it is still active */

PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"buffer backed up - holding"));
return PRTE_ERR_OUT_OF_RESOURCE;
}
}
} else {
PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"%s sending %d bytes from stdinev to daemon %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz,
PRTE_NAME_PRINT(&proct->stdinev->daemon)));

/* send the data to the daemon so it can
* write it to the proc's fd - in this case,
* we pass sink->name to indicate who is to
* receive the data. If the connection closed,
* numbytes will be zero so zero bytes will be
* sent - this will tell the daemon to close
* the fd for stdin to that proc
*/
rc = prte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon,
&proct->stdinev->name,
PRTE_IOF_STDIN,
data, sz);
if (PRTE_SUCCESS != rc) {
/* if the addressee is unknown, remove the sink from the list */
if (PRTE_ERR_ADDRESSEE_UNKNOWN == rc) {
PMIX_RELEASE(proct->stdinev);
}
/* send the bytes down the pipe - we even send 0 byte events
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (NULL != proct->stdinev->wev) {
if (PRTE_IOF_MAX_INPUT_BUFFERS < prte_iof_base_write_output(&proct->name,
PRTE_IOF_STDIN, data, sz,
proct->stdinev->wev)) {
/* getting too backed up - stop the read event for now if it is still active */

PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output,
"buffer backed up - holding"));
return PRTE_ERR_OUT_OF_RESOURCE;
}
}
}
Expand Down