Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use global PRRTE event base for sigchld callback #1857

Merged
merged 1 commit into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions src/event/event-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,6 @@ typedef event_callback_fn prte_event_cbfunc_t;

BEGIN_C_DECLS

/* set the number of event priority levels */
#define PRTE_EVENT_NUM_PRI 8

#define PRTE_EV_ERROR_PRI 0
#define PRTE_EV_MSG_HI_PRI 1
#define PRTE_EV_SYS_HI_PRI 2
#define PRTE_EV_MSG_LO_PRI 3
#define PRTE_EV_SYS_LO_PRI 4
#define PRTE_EV_INFO_HI_PRI 5
#define PRTE_EV_INFO_LO_PRI 6
#define PRTE_EV_LOWEST_PRI 7

#define PRTE_EVENT_SIGNAL(ev) prte_event_get_signal(ev)

#define PRTE_TIMEOUT_DEFAULT \
Expand Down Expand Up @@ -102,8 +90,6 @@ PRTE_EXPORT prte_event_t *prte_event_alloc(void);
/* Event priority APIs */
#define prte_event_base_priority_init(b, n) event_base_priority_init((b), (n))

#define prte_event_set_priority(x, n) event_priority_set((x), (n))

/* Basic event APIs */
#define prte_event_enable_debug_mode() event_enable_debug_mode()

Expand Down Expand Up @@ -164,10 +150,9 @@ typedef struct {
PRTE_EXPORT PMIX_CLASS_DECLARATION(prte_event_list_item_t);

/* define a threadshift macro */
#define PRTE_PMIX_THREADSHIFT(x, eb, f, p) \
#define PRTE_PMIX_THREADSHIFT(x, eb, f) \
do { \
prte_event_set((eb), &((x)->ev), -1, PRTE_EV_WRITE, (f), (x)); \
prte_event_set_priority(&((x)->ev), (p)); \
PMIX_POST_OBJECT((x)); \
prte_event_active(&((x)->ev), PRTE_EV_WRITE, 1); \
} while (0)
Expand Down
7 changes: 1 addition & 6 deletions src/event/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* Copyright (c) 2014-2020 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -40,11 +40,6 @@ int prte_event_base_open(void)
* base, so no progress thread is required */
prte_event_base = prte_sync_event_base;

/* set the number of priorities */
if (0 < PRTE_EVENT_NUM_PRI) {
prte_event_base_priority_init(prte_sync_event_base, PRTE_EVENT_NUM_PRI);
}

initialized = true;
return PRTE_SUCCESS;
}
Expand Down
8 changes: 4 additions & 4 deletions src/mca/errmgr/dvm/errmgr_dvm.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* All rights reserved.
* Copyright (c) 2014-2020 Intel, Inc. All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -88,14 +88,14 @@ static void check_send_notification(prte_job_t *jdata,
static int init(void)
{
/* setup state machine to trap job errors */
prte_state.add_job_state(PRTE_JOB_STATE_ERROR, job_errors, PRTE_ERROR_PRI);
prte_state.add_job_state(PRTE_JOB_STATE_ERROR, job_errors);

/* set the lost connection state to run at MSG priority so
* we can process any last messages from the proc
*/
prte_state.add_proc_state(PRTE_PROC_STATE_COMM_FAILED, proc_errors, PRTE_MSG_PRI);
prte_state.add_proc_state(PRTE_PROC_STATE_COMM_FAILED, proc_errors);

prte_state.add_proc_state(PRTE_PROC_STATE_ERROR, proc_errors, PRTE_ERROR_PRI);
prte_state.add_proc_state(PRTE_PROC_STATE_ERROR, proc_errors);

return PRTE_SUCCESS;
}
Expand Down
8 changes: 3 additions & 5 deletions src/mca/errmgr/prted/errmgr_prted.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ static void proc_errors(int fd, short args, void *cbdata);
static int init(void)
{
/* setup state machine to trap job errors */
prte_state.add_job_state(PRTE_JOB_STATE_ERROR, job_errors, PRTE_ERROR_PRI);
prte_state.add_job_state(PRTE_JOB_STATE_ERROR, job_errors);

/* set the lost connection state to run at MSG priority so
* we can process any last messages from the proc
*/
prte_state.add_proc_state(PRTE_PROC_STATE_COMM_FAILED, proc_errors, PRTE_MSG_PRI);
prte_state.add_proc_state(PRTE_PROC_STATE_COMM_FAILED, proc_errors);

/* setup state machine to trap proc errors */
prte_state.add_proc_state(PRTE_PROC_STATE_ERROR, proc_errors, PRTE_ERROR_PRI);
prte_state.add_proc_state(PRTE_PROC_STATE_ERROR, proc_errors);

return PRTE_SUCCESS;
}
Expand Down Expand Up @@ -229,7 +229,6 @@ static void prted_abort(int error_code, char *fmt, ...)
timer->tv.tv_sec = 5;
timer->tv.tv_usec = 0;
prte_event_evtimer_set(prte_event_base, timer->ev, wakeup, NULL);
prte_event_set_priority(timer->ev, PRTE_ERROR_PRI);
PMIX_POST_OBJECT(timer);
prte_event_evtimer_add(timer->ev, &timer->tv);
}
Expand Down Expand Up @@ -414,7 +413,6 @@ static void proc_errors(int fd, short args, void *cbdata)
t2->evb = prte_event_base;
prte_event_set(t2->evb, &t2->ev, -1, PRTE_EV_WRITE,
prte_odls_base_default_wait_local_proc, t2);
prte_event_set_priority(&t2->ev, PRTE_MSG_PRI);
prte_event_active(&t2->ev, PRTE_EV_WRITE, 1);
goto cleanup;
}
Expand Down
3 changes: 1 addition & 2 deletions src/mca/ess/base/ess_base_std_prted.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* Copyright (c) 2017 IBM Corporation. All rights reserved.
* Copyright (c) 2019 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -90,7 +90,6 @@ static prte_event_t *forward_signals_events = NULL;
static void setup_sighandler(int signal, prte_event_t *ev, prte_event_cbfunc_t cbfunc)
{
prte_event_signal_set(prte_event_base, ev, signal, cbfunc, ev);
prte_event_set_priority(ev, PRTE_ERROR_PRI);
prte_event_signal_add(ev, NULL);
}

Expand Down
4 changes: 2 additions & 2 deletions src/mca/filem/raw/filem_raw_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ static int raw_preposition_files(prte_job_t *jdata,
xfer->app_idx = fs->app_idx;
xfer->outbound = outbound;
pmix_list_append(&outbound->xfers, &xfer->super);
PRTE_PMIX_THREADSHIFT(xfer, prte_event_base, send_chunk, PRTE_MSG_PRI);
PRTE_PMIX_THREADSHIFT(xfer, prte_event_base, send_chunk);
PMIX_RELEASE(item);
}
PMIX_DESTRUCT(&fsets);
Expand Down Expand Up @@ -1054,7 +1054,7 @@ static void recv_files(int status, pmix_proc_t *sender, pmix_data_buffer_t *buff
}
free(tmp);
incoming->pending = true;
PRTE_PMIX_THREADSHIFT(incoming, prte_event_base, write_handler, PRTE_MSG_PRI);
PRTE_PMIX_THREADSHIFT(incoming, prte_event_base, write_handler);
}
/* create an output object for this data */
output = PMIX_NEW(prte_filem_raw_output_t);
Expand Down
1 change: 0 additions & 1 deletion src/mca/grpcomm/base/grpcomm_base_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ int prte_grpcomm_API_allgather(prte_pmix_mdx_caddy_t *cd)
/* must push this into the event library to ensure we can
* access framework-global data safely */
prte_event_set(prte_event_base, &cd->ev, -1, PRTE_EV_WRITE, allgather_stub, cd);
prte_event_set_priority(&cd->ev, PRTE_MSG_PRI);
PMIX_POST_OBJECT(cd);
prte_event_active(&cd->ev, PRTE_EV_WRITE, 1);
return PRTE_SUCCESS;
Expand Down
4 changes: 1 addition & 3 deletions src/mca/iof/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* Copyright (c) 2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -184,7 +184,6 @@ static inline bool prte_iof_base_fd_always_ready(int fd)
prte_event_set(prte_event_base, ep->wev->ev, ep->wev->fd, PRTE_EV_WRITE, wrthndlr, \
ep); \
} \
prte_event_set_priority(ep->wev->ev, PRTE_MSG_PRI); \
} \
*(snk) = ep; \
PMIX_POST_OBJECT(ep); \
Expand Down Expand Up @@ -234,7 +233,6 @@ static inline bool prte_iof_base_fd_always_ready(int fd)
} else { \
prte_event_set(prte_event_base, rev->ev, (fid), PRTE_EV_READ, (cbfunc), rev); \
} \
prte_event_set_priority(rev->ev, PRTE_MSG_PRI); \
if ((actv)) { \
PRTE_IOF_READ_ACTIVATE(rev) \
} \
Expand Down
3 changes: 1 addition & 2 deletions src/mca/odls/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* Copyright (c) 2011-2020 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved.
* Copyright (c) 2017-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -122,7 +122,6 @@ PRTE_EXPORT PMIX_CLASS_DECLARATION(prte_odls_launch_local_t);
ll->fork_local = (f); \
prte_event_set(prte_event_base, ll->ev, -1, PRTE_EV_WRITE, \
prte_odls_base_default_launch_local, ll); \
prte_event_set_priority(ll->ev, PRTE_SYS_PRI); \
prte_event_active(ll->ev, PRTE_EV_WRITE, 1); \
} while (0);

Expand Down
10 changes: 4 additions & 6 deletions src/mca/odls/base/odls_base_default_fns.c
Original file line number Diff line number Diff line change
Expand Up @@ -1397,7 +1397,7 @@ void prte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
/* set the waitpid callback here for thread protection and
* to ensure we can capture the callback on shortlived apps */
PRTE_FLAG_SET(child, PRTE_PROC_FLAG_ALIVE);
prte_wait_cb(child, prte_odls_base_default_wait_local_proc, evb, NULL);
prte_wait_cb(child, prte_odls_base_default_wait_local_proc, prte_event_base, NULL);

/* dispatch this child to the next available launch thread */
cd = PMIX_NEW(prte_odls_spawn_caddy_t);
Expand All @@ -1411,8 +1411,8 @@ void prte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
cd->opts.usepty = PRTE_ENABLE_PTY_SUPPORT;

/* do we want to setup stdin? */
if (jobdat->stdin_target == PMIX_RANK_WILDCARD
|| child->name.rank == jobdat->stdin_target) {
if (jobdat->stdin_target == PMIX_RANK_WILDCARD ||
child->name.rank == jobdat->stdin_target) {
cd->opts.connect_stdin = true;
} else {
cd->opts.connect_stdin = false;
Expand All @@ -1439,7 +1439,6 @@ void prte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), PRTE_NAME_PRINT(&child->name),
prte_odls_globals.next_base);
prte_event_set(evb, &cd->ev, -1, PRTE_EV_WRITE, prte_odls_base_spawn_proc, cd);
prte_event_set_priority(&cd->ev, PRTE_MSG_PRI);
prte_event_active(&cd->ev, PRTE_EV_WRITE, 1);
}
}
Expand Down Expand Up @@ -2054,13 +2053,12 @@ int prte_odls_base_default_restart_proc(prte_proc_t *child,
prte_odls_globals.next_base = 0;
}
evb = prte_odls_globals.ev_bases[prte_odls_globals.next_base];
prte_wait_cb(child, prte_odls_base_default_wait_local_proc, evb, NULL);
prte_wait_cb(child, prte_odls_base_default_wait_local_proc, prte_event_base, NULL);

PMIX_OUTPUT_VERBOSE((5, prte_odls_base_framework.framework_output, "%s restarting app %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), app->app));

prte_event_set(evb, &cd->ev, -1, PRTE_EV_WRITE, prte_odls_base_spawn_proc, cd);
prte_event_set_priority(&cd->ev, PRTE_MSG_PRI);
prte_event_active(&cd->ev, PRTE_EV_WRITE, 1);

CLEANUP:
Expand Down
13 changes: 10 additions & 3 deletions src/mca/odls/base/odls_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* Copyright (c) 2014-2019 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -194,6 +194,8 @@ void prte_odls_base_start_threads(prte_job_t *jdata)
}
prte_odls_globals.ev_bases = prte_event_base_ptr;
} else {
pmix_output_verbose(5, prte_odls_base_framework.framework_output,
"START %d LAUNCH THREADS", prte_odls_globals.num_threads);
prte_odls_globals.ev_bases = (prte_event_base_t **) malloc(prte_odls_globals.num_threads
* sizeof(prte_event_base_t *));
for (i = 0; i < prte_odls_globals.num_threads; i++) {
Expand Down Expand Up @@ -337,7 +339,10 @@ static void launch_local_dest(prte_odls_launch_local_t *ptr)
{
prte_event_free(ptr->ev);
}
PMIX_CLASS_INSTANCE(prte_odls_launch_local_t, pmix_object_t, launch_local_const, launch_local_dest);
PMIX_CLASS_INSTANCE(prte_odls_launch_local_t,
pmix_object_t,
launch_local_const,
launch_local_dest);

static void sccon(prte_odls_spawn_caddy_t *p)
{
Expand All @@ -362,4 +367,6 @@ static void scdes(prte_odls_spawn_caddy_t *p)
PMIX_ARGV_FREE_COMPAT(p->env);
}
}
PMIX_CLASS_INSTANCE(prte_odls_spawn_caddy_t, pmix_object_t, sccon, scdes);
PMIX_CLASS_INSTANCE(prte_odls_spawn_caddy_t,
pmix_object_t,
sccon, scdes);
2 changes: 1 addition & 1 deletion src/mca/oob/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ PRTE_EXPORT void prte_oob_base_send_nb(int fd, short args, void *cbdata);
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), __FILE__, __LINE__); \
prte_oob_send_cd = PMIX_NEW(prte_oob_send_t); \
prte_oob_send_cd->msg = (m); \
PRTE_PMIX_THREADSHIFT(prte_oob_send_cd, prte_event_base, prte_oob_base_send_nb, PRTE_MSG_PRI); \
PRTE_PMIX_THREADSHIFT(prte_oob_send_cd, prte_event_base, prte_oob_base_send_nb); \
} while (0)

PRTE_EXPORT prte_oob_base_peer_t *prte_oob_base_get_peer(const pmix_proc_t *pr);
Expand Down
4 changes: 1 addition & 3 deletions src/mca/oob/tcp/oob_tcp_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* Copyright (c) 2016 Mellanox Technologies Ltd. All rights reserved.
* Copyright (c) 2020 Amazon.com, Inc. or its affiliates. All Rights
* reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -606,15 +606,13 @@ static void tcp_peer_event_init(prte_oob_tcp_peer_t *peer)
assert(!peer->send_ev_active && !peer->recv_ev_active);
prte_event_set(prte_event_base, &peer->recv_event, peer->sd, PRTE_EV_READ | PRTE_EV_PERSIST,
prte_oob_tcp_recv_handler, peer);
prte_event_set_priority(&peer->recv_event, PRTE_MSG_PRI);
if (peer->recv_ev_active) {
prte_event_del(&peer->recv_event);
peer->recv_ev_active = false;
}

prte_event_set(prte_event_base, &peer->send_event, peer->sd,
PRTE_EV_WRITE | PRTE_EV_PERSIST, prte_oob_tcp_send_handler, peer);
prte_event_set_priority(&peer->send_event, PRTE_MSG_PRI);
if (peer->send_ev_active) {
prte_event_del(&peer->send_event);
peer->send_ev_active = false;
Expand Down
3 changes: 1 addition & 2 deletions src/mca/oob/tcp/oob_tcp_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,14 @@ PMIX_CLASS_DECLARATION(prte_oob_tcp_conn_op_t);
__FILE__, __LINE__, PRTE_NAME_PRINT((&(p)->name))); \
cop = PMIX_NEW(prte_oob_tcp_conn_op_t); \
cop->peer = (p); \
PRTE_PMIX_THREADSHIFT(cop, prte_event_base, (cbfunc), PRTE_MSG_PRI); \
PRTE_PMIX_THREADSHIFT(cop, prte_event_base, (cbfunc)); \
} while (0);

#define PRTE_ACTIVATE_TCP_ACCEPT_STATE(s, a, cbfunc) \
do { \
prte_oob_tcp_conn_op_t *cop; \
cop = PMIX_NEW(prte_oob_tcp_conn_op_t); \
prte_event_set(prte_event_base, &cop->ev, s, PRTE_EV_READ, (cbfunc), cop); \
prte_event_set_priority(&cop->ev, PRTE_MSG_PRI); \
PMIX_POST_OBJECT(cop); \
prte_event_add(&cop->ev, 0); \
} while (0);
Expand Down
5 changes: 1 addition & 4 deletions src/mca/oob/tcp/oob_tcp_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* Copyright (c) 2013-2020 Intel, Inc. All rights reserved.
* Copyright (c) 2015-2019 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
* Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -160,7 +160,6 @@ int prte_oob_tcp_start_listening(void)
listener->ev_active = true;
prte_event_set(prte_event_base, &listener->event, listener->sd,
PRTE_EV_READ | PRTE_EV_PERSIST, connection_event_handler, 0);
prte_event_set_priority(&listener->event, PRTE_MSG_PRI);
PMIX_POST_OBJECT(listener);
prte_event_add(&listener->event, 0);
}
Expand Down Expand Up @@ -643,7 +642,6 @@ static void *listen_thread(pmix_object_t *obj)
pending_connection = PMIX_NEW(prte_oob_tcp_pending_connection_t);
prte_event_set(prte_event_base, &pending_connection->ev, -1, PRTE_EV_WRITE,
connection_handler, pending_connection);
prte_event_set_priority(&pending_connection->ev, PRTE_MSG_PRI);
pending_connection->fd = accept(sd, (struct sockaddr *) &(pending_connection->addr),
&addrlen);

Expand Down Expand Up @@ -737,7 +735,6 @@ static void *listen_thread(pmix_object_t *obj)
PRTE_EV_READ|PRTE_EV_PERSIST,
connection_event_handler,
0);
prte_event_set_priority(listener->event, PRTE_MSG_PRI);
prte_event_add(listener->event, 0);
}
#endif
Expand Down
12 changes: 6 additions & 6 deletions src/mca/oob/tcp/oob_tcp_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ typedef struct {
} prte_oob_tcp_peer_op_t;
PMIX_CLASS_DECLARATION(prte_oob_tcp_peer_op_t);

#define PRTE_ACTIVATE_TCP_CMP_OP(p, cbfunc) \
do { \
prte_oob_tcp_peer_op_t *pop; \
pop = PMIX_NEW(prte_oob_tcp_peer_op_t); \
PMIX_XFER_PROCID(&pop->peer, &(p)->name); \
PRTE_PMIX_THREADSHIFT(pop, prte_event_base, (cbfunc), PRTE_MSG_PRI); \
#define PRTE_ACTIVATE_TCP_CMP_OP(p, cbfunc) \
do { \
prte_oob_tcp_peer_op_t *pop; \
pop = PMIX_NEW(prte_oob_tcp_peer_op_t); \
PMIX_XFER_PROCID(&pop->peer, &(p)->name); \
PRTE_PMIX_THREADSHIFT(pop, prte_event_base, (cbfunc)); \
} while (0);

#endif /* _MCA_OOB_TCP_PEER_H_ */
Loading