From 4104962dfc897f51f0b6e7bfabd2e9766085400e Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 22 Dec 2024 17:17:38 -0800 Subject: [PATCH 1/2] pollers: use atomic bit masking operations to eliminate lockss This is done for kqueue and poll. Others coming soon. --- src/core/platform.h | 5 +++ src/platform/posix/posix_atomic.c | 51 ++++++++++++++++++++++++- src/platform/posix/posix_pollq_kqueue.c | 23 ++++------- src/platform/posix/posix_pollq_kqueue.h | 3 +- src/platform/posix/posix_pollq_poll.c | 22 ++++------- src/platform/posix/posix_pollq_poll.h | 3 +- src/platform/windows/win_thread.c | 12 ++++++ 7 files changed, 84 insertions(+), 35 deletions(-) diff --git a/src/core/platform.h b/src/core/platform.h index 2f5b5c172..8b7aabbc7 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -217,6 +217,11 @@ extern int nni_atomic_get(nni_atomic_int *); extern void nni_atomic_set(nni_atomic_int *, int); extern int nni_atomic_swap(nni_atomic_int *, int); +// These are used with acquire release semantics. +// Used for pollers in the POSIX code. They return the previous value. +extern int nni_atomic_or(nni_atomic_int *, int); +extern int nni_atomic_and(nni_atomic_int *, int); + // These versions are tuned for use as reference // counters. Relaxed order when possible to increase // reference count, acquire-release order for dropping diff --git a/src/platform/posix/posix_atomic.c b/src/platform/posix/posix_atomic.c index 69c2a2c1c..ad608d45a 100644 --- a/src/platform/posix/posix_atomic.c +++ b/src/platform/posix/posix_atomic.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -72,6 +72,21 @@ nni_atomic_sub(nni_atomic_int *v, int bump) (void) atomic_fetch_sub_explicit(&v->v, bump, memory_order_relaxed); } +// atomic or is used to set events in the posix pollers.. no other use at +// present. We use acquire release semantics. +int +nni_atomic_or(nni_atomic_int *v, int mask) +{ + return (atomic_fetch_or_explicit(&v->v, mask, memory_order_acq_rel)); +} + +// and is used in the pollers to clear the events that we have processed +int +nni_atomic_and(nni_atomic_int *v, int mask) +{ + return (atomic_fetch_and_explicit(&v->v, mask, memory_order_acq_rel)); +} + int nni_atomic_get(nni_atomic_int *v) { @@ -338,6 +353,18 @@ nni_atomic_dec_nv(nni_atomic_int *v) return (__atomic_sub_fetch(&v->v, 1, __ATOMIC_SEQ_CST)); } +int +nni_atomic_or(nni_atomic_int *v, int mask) +{ + return (__atomic_fetch_or(&v->v, mask, __ATOMIC_ACQ_REL)); +} + +int +nni_atomic_and(nni_atomic_int *v, int mask) +{ + return (__atomic_fetch_and(&v->v, mask, __ATOMIC_ACQ_REL)); +} + bool nni_atomic_cas(nni_atomic_int *v, int comp, int new) { @@ -571,6 +598,28 @@ nni_atomic_swap(nni_atomic_int *v, int i) return (rv); } +int +nni_atomic_or(nni_atomic_int *v, int mask) +{ + int rv; + pthread_mutex_lock(&plat_atomic_lock); + rv = v->v; + v->v |= mask; + pthread_mutex_unlock(&plat_atomic_lock); + return (rv); +} + +int +nni_atomic_and(nni_atomic_int *v, int mask) +{ + int rv; + pthread_mutex_lock(&plat_atomic_lock); + rv = v->v; + v->v &= mask; + pthread_mutex_unlock(&plat_atomic_lock); + return (rv); +} + void nni_atomic_inc(nni_atomic_int *v) { diff --git a/src/platform/posix/posix_pollq_kqueue.c b/src/platform/posix/posix_pollq_kqueue.c index c754535a2..f1f742dbd 100644 --- a/src/platform/posix/posix_pollq_kqueue.c +++ b/src/platform/posix/posix_pollq_kqueue.c @@ -60,14 +60,13 @@ nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg) pq = &nni_posix_global_pollq; - nni_mtx_init(&pf->mtx); + nni_atomic_init(&pf->events); nni_cv_init(&pf->cv, &pq->mtx); - pf->pq = pq; - pf->fd = fd; - pf->cb = cb; - pf->arg = arg; - pf->events = 0; + pf->pq = pq; + pf->fd = fd; + pf->cb = cb; + pf->arg = arg; nni_atomic_flag_reset(&pf->closed); nni_atomic_flag_reset(&pf->stopped); @@ -150,7 +149,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pf) (void) close(pf->fd); nni_cv_fini(&pf->cv); - nni_mtx_fini(&pf->mtx); } int @@ -167,16 +165,13 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events) unsigned flags = EV_ENABLE | EV_DISPATCH | EV_CLEAR; nni_posix_pollq *pq = pf->pq; - nni_mtx_lock(&pf->mtx); - pf->events |= events; - events = pf->events; - nni_mtx_unlock(&pf->mtx); - if (events == 0) { // No events, and kqueue is oneshot, so nothing to do. return (0); } + nni_atomic_or(&pf->events, (int) events); + if (events & NNI_POLL_IN) { EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf); } @@ -244,9 +239,7 @@ nni_posix_poll_thr(void *arg) revents |= NNI_POLL_HUP; } - nni_mtx_lock(&pf->mtx); - pf->events &= ~(revents); - nni_mtx_unlock(&pf->mtx); + nni_atomic_and(&pf->events, (int) (~revents)); pf->cb(pf->arg, revents); } diff --git a/src/platform/posix/posix_pollq_kqueue.h b/src/platform/posix/posix_pollq_kqueue.h index a153b3639..69cf6dae7 100644 --- a/src/platform/posix/posix_pollq_kqueue.h +++ b/src/platform/posix/posix_pollq_kqueue.h @@ -22,9 +22,8 @@ struct nni_posix_pfd { nni_atomic_flag closed; nni_atomic_flag stopped; bool reaped; - unsigned events; + nni_atomic_int events; nni_cv cv; // signaled when poller has unregistered - nni_mtx mtx; }; #define NNI_POLL_IN (0x0001) diff --git a/src/platform/posix/posix_pollq_poll.c b/src/platform/posix/posix_pollq_poll.c index e5b095ffb..76a9d3bc8 100644 --- a/src/platform/posix/posix_pollq_poll.c +++ b/src/platform/posix/posix_pollq_poll.c @@ -70,13 +70,12 @@ nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg) NNI_LIST_NODE_INIT(&pfd->node); NNI_LIST_NODE_INIT(&pfd->reap); - nni_mtx_init(&pfd->mtx); pfd->fd = fd; - pfd->events = 0; pfd->cb = cb; pfd->arg = arg; pfd->pq = pq; pfd->reaped = false; + nni_atomic_init(&pfd->events); nni_mtx_lock(&pq->mtx); nni_list_append(&pq->addq, pfd); nni_mtx_unlock(&pq->mtx); @@ -137,7 +136,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd) // We're exclusive now. (void) close(pfd->fd); - nni_mtx_fini(&pfd->mtx); } int @@ -145,9 +143,7 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) { nni_posix_pollq *pq = pfd->pq; - nni_mtx_lock(&pfd->mtx); - pfd->events |= events; - nni_mtx_unlock(&pfd->mtx); + (void) nni_atomic_or(&pfd->events, (int) events); // If we're running on the callback, then don't bother to kick // the pollq again. This is necessary because we cannot modify @@ -166,7 +162,7 @@ nni_posix_poll_thr(void *arg) nni_posix_pfd **pfds = pq->pfds; nni_posix_pfd *pfd; int nfds; - unsigned events; + int events; for (;;) { @@ -181,13 +177,11 @@ nni_posix_poll_thr(void *arg) // Set up the poll list. NNI_LIST_FOREACH (&pq->pollq, pfd) { - nni_mtx_lock(&pfd->mtx); - events = pfd->events; - nni_mtx_unlock(&pfd->mtx); + events = nni_atomic_get(&pfd->events); if (events != 0) { fds[nfds].fd = pfd->fd; - fds[nfds].events = events; + fds[nfds].events = (unsigned) events; fds[nfds].revents = 0; nfds++; } @@ -207,7 +201,7 @@ nni_posix_poll_thr(void *arg) bool stop = false; for (int i = 0; i < nfds; i++) { int fd = fds[i].fd; - events = fds[i].revents; + events = (int) fds[i].revents; pfd = pfds[fd]; if (events == 0) { continue; @@ -224,9 +218,7 @@ nni_posix_poll_thr(void *arg) // to finish reading. events &= ~POLLHUP; } - nni_mtx_lock(&pfd->mtx); - pfd->events &= ~events; - nni_mtx_unlock(&pfd->mtx); + (void) nni_atomic_and(&pfd->events, ~events); pfd->cb(pfd->arg, events); } diff --git a/src/platform/posix/posix_pollq_poll.h b/src/platform/posix/posix_pollq_poll.h index 824676428..b96d24514 100644 --- a/src/platform/posix/posix_pollq_poll.h +++ b/src/platform/posix/posix_pollq_poll.h @@ -20,8 +20,7 @@ struct nni_posix_pfd { int fd; nni_list_node node; nni_list_node reap; - nni_mtx mtx; - unsigned events; + nni_atomic_int events; nni_posix_pfd_cb cb; void *arg; bool reaped; diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index 9e74056dd..c76209c1c 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -325,6 +325,18 @@ nni_atomic_init(nni_atomic_int *v) InterlockedExchange(&v->v, 0); } +int +nni_atomic_or(nni_atomic_int *v, int mask) +{ + return (InterlockedOr(&v->v, mask)); +} + +int +nni_atomic_and(nni_atomic_int *v, int mask) +{ + return (InterlockedAnd(&v->v, mask)); +} + void nni_atomic_inc(nni_atomic_int *v) { From be379a29b849b1d3e759e14d97ad9b977ed3da4f Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 22 Dec 2024 17:26:00 -0800 Subject: [PATCH 2/2] epoll: use atomic event mask --- src/platform/posix/posix_pollq_epoll.c | 32 +++++++------------------- src/platform/posix/posix_pollq_epoll.h | 4 +--- 2 files changed, 9 insertions(+), 27 deletions(-) diff --git a/src/platform/posix/posix_pollq_epoll.c b/src/platform/posix/posix_pollq_epoll.c index 675d0b22a..90d109268 100644 --- a/src/platform/posix/posix_pollq_epoll.c +++ b/src/platform/posix/posix_pollq_epoll.c @@ -57,17 +57,15 @@ nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg) (void) fcntl(fd, F_SETFD, FD_CLOEXEC); (void) fcntl(fd, F_SETFL, O_NONBLOCK); - nni_mtx_init(&pfd->mtx); + nni_atomic_init(&pfd->events); nni_atomic_flag_reset(&pfd->stopped); nni_atomic_flag_reset(&pfd->closing); - pfd->pq = pq; - pfd->fd = fd; - pfd->cb = cb; - pfd->arg = arg; - pfd->events = 0; - pfd->closed = false; - pfd->added = false; + pfd->pq = pq; + pfd->fd = fd; + pfd->cb = cb; + pfd->arg = arg; + pfd->added = false; NNI_LIST_NODE_INIT(&pfd->node); } @@ -83,15 +81,8 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) // forth. This turns out to be true both for Linux and the illumos // epoll implementation. - nni_mtx_lock(&pfd->mtx); struct epoll_event ev; - - if (pfd->closed) { - nni_mtx_unlock(&pfd->mtx); - return (NNG_ECLOSED); - } - pfd->events |= events; - events = pfd->events; + events |= nni_atomic_or(&pfd->events, (int) events); memset(&ev, 0, sizeof(ev)); ev.events = events | EPOLLONESHOT; @@ -110,7 +101,6 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events) if (rv != 0) { rv = nni_plat_errno(errno); } - nni_mtx_unlock(&pfd->mtx); return (rv); } @@ -131,13 +121,10 @@ nni_posix_pfd_close(nni_posix_pfd *pfd) return; } - nni_mtx_lock(&pfd->mtx); struct epoll_event ev; // Not actually used. (void) shutdown(pfd->fd, SHUT_RDWR); - pfd->closed = true; (void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev); - nni_mtx_unlock(&pfd->mtx); } void @@ -191,7 +178,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd) } (void) close(pfd->fd); - nni_mtx_fini(&pfd->mtx); } static void @@ -239,9 +225,7 @@ nni_posix_poll_thr(void *arg) ((unsigned) (EPOLLIN | EPOLLOUT | EPOLLERR)); - nni_mtx_lock(&pfd->mtx); - pfd->events &= ~mask; - nni_mtx_unlock(&pfd->mtx); + nni_atomic_and(&pfd->events, (int) ~mask); // Execute the callback with lock released pfd->cb(pfd->arg, mask); diff --git a/src/platform/posix/posix_pollq_epoll.h b/src/platform/posix/posix_pollq_epoll.h index 1ce7e3cb4..a6ef5ad84 100644 --- a/src/platform/posix/posix_pollq_epoll.h +++ b/src/platform/posix/posix_pollq_epoll.h @@ -21,10 +21,8 @@ struct nni_posix_pfd { int fd; nni_posix_pfd_cb cb; void *arg; - unsigned events; - nni_mtx mtx; + nni_atomic_int events; bool added; - bool closed; nni_atomic_flag stopped; nni_atomic_flag closing; };