From 62d98ba941b33bb448797b27880d4f439033aaed Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 18 Oct 2023 09:46:38 -0600 Subject: [PATCH] Correctly forward stdin to remote procs Find out where the target destination is executing and the daemon hosting it, then send the stdin to that host for local relay. Signed-off-by: Ralph Castain --- src/mca/iof/hnp/iof_hnp.c | 88 +++++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 46 deletions(-) diff --git a/src/mca/iof/hnp/iof_hnp.c b/src/mca/iof/hnp/iof_hnp.c index 622fd2d360..3b9c082235 100644 --- a/src/mca/iof/hnp/iof_hnp.c +++ b/src/mca/iof/hnp/iof_hnp.c @@ -18,7 +18,7 @@ * Copyright (c) 2016-2020 Intel, Inc. All rights reserved. * Copyright (c) 2017 Mellanox Technologies. All rights reserved. * Copyright (c) 2020 IBM Corporation. All rights reserved. - * Copyright (c) 2021-2022 Nanook Consulting. All rights reserved. + * Copyright (c) 2021-2023 Nanook Consulting. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -210,7 +210,34 @@ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz) } } - /* do we already have this process in our list? */ + /* lookup the daemon hosting the target proc */ + PMIX_LOAD_PROCID(&p, PRTE_PROC_MY_NAME->nspace, PMIX_RANK_WILDCARD); + p.rank = prte_get_proc_daemon_vpid(dst_name); + if (PMIX_RANK_INVALID == p.rank) { + PRTE_ERROR_LOG(PRTE_ERR_ADDRESSEE_UNKNOWN); + return PRTE_ERR_ADDRESSEE_UNKNOWN; + } + + /* if the host isn't me, send it to them */ + if (p.rank != PRTE_PROC_MY_NAME->rank) { + /* send the data to the daemon so it can + * write it to the proc's fd - in this case, + * we pass dst_name to indicate who is to + * receive the data. If the connection closed, + * numbytes will be zero so zero bytes will be + * sent - this will tell the daemon to close + * the fd for stdin to that proc + */ + rc = prte_iof_hnp_send_data_to_endpoint(&p, dst_name, + PRTE_IOF_STDIN, + data, sz); + if (PRTE_SUCCESS != rc) { + PRTE_ERROR_LOG(rc); + } + return rc; + } + + /* local proc - see if we have this process in our list */ PMIX_LIST_FOREACH(proct, &prte_mca_iof_hnp_component.procs, prte_iof_proc_t) { if (PMIX_CHECK_PROCID(&proct->name, dst_name)) { @@ -220,50 +247,19 @@ static int push_stdin(const pmix_proc_t *dst_name, uint8_t *data, size_t sz) continue; } - /* if the daemon is me, then this is a local sink */ - if (proct->stdinev->daemon.rank == PRTE_PROC_MY_NAME->rank) { - PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output, - "%s read %d bytes from stdin - writing to %s", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz, - PRTE_NAME_PRINT(&proct->name))); - /* send the bytes down the pipe - we even send 0 byte events - * down the pipe so it forces out any preceding data before - * closing the output stream - */ - if (NULL != proct->stdinev->wev) { - if (PRTE_IOF_MAX_INPUT_BUFFERS < prte_iof_base_write_output(&proct->name, - PRTE_IOF_STDIN, data, sz, - proct->stdinev->wev)) { - /* getting too backed up - stop the read event for now if it is still active */ - - PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output, - "buffer backed up - holding")); - return PRTE_ERR_OUT_OF_RESOURCE; - } - } - } else { - PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output, - "%s sending %d bytes from stdinev to daemon %s", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) sz, - PRTE_NAME_PRINT(&proct->stdinev->daemon))); - - /* send the data to the daemon so it can - * write it to the proc's fd - in this case, - * we pass sink->name to indicate who is to - * receive the data. If the connection closed, - * numbytes will be zero so zero bytes will be - * sent - this will tell the daemon to close - * the fd for stdin to that proc - */ - rc = prte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon, - &proct->stdinev->name, - PRTE_IOF_STDIN, - data, sz); - if (PRTE_SUCCESS != rc) { - /* if the addressee is unknown, remove the sink from the list */ - if (PRTE_ERR_ADDRESSEE_UNKNOWN == rc) { - PMIX_RELEASE(proct->stdinev); - } + /* send the bytes down the pipe - we even send 0 byte events + * down the pipe so it forces out any preceding data before + * closing the output stream + */ + if (NULL != proct->stdinev->wev) { + if (PRTE_IOF_MAX_INPUT_BUFFERS < prte_iof_base_write_output(&proct->name, + PRTE_IOF_STDIN, data, sz, + proct->stdinev->wev)) { + /* getting too backed up - stop the read event for now if it is still active */ + + PMIX_OUTPUT_VERBOSE((1, prte_iof_base_framework.framework_output, + "buffer backed up - holding")); + return PRTE_ERR_OUT_OF_RESOURCE; } } }