Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomic bits #2010

Merged
merged 2 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/core/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ extern int nni_atomic_get(nni_atomic_int *);
extern void nni_atomic_set(nni_atomic_int *, int);
extern int nni_atomic_swap(nni_atomic_int *, int);

// These are used with acquire release semantics.
// Used for pollers in the POSIX code. They return the previous value.
extern int nni_atomic_or(nni_atomic_int *, int);
extern int nni_atomic_and(nni_atomic_int *, int);

// These versions are tuned for use as reference
// counters. Relaxed order when possible to increase
// reference count, acquire-release order for dropping
Expand Down
51 changes: 50 additions & 1 deletion src/platform/posix/posix_atomic.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -72,6 +72,21 @@ nni_atomic_sub(nni_atomic_int *v, int bump)
(void) atomic_fetch_sub_explicit(&v->v, bump, memory_order_relaxed);
}

// atomic or is used to set events in the posix pollers.. no other use at
// present. We use acquire release semantics.
int
nni_atomic_or(nni_atomic_int *v, int mask)
{
return (atomic_fetch_or_explicit(&v->v, mask, memory_order_acq_rel));
}

// and is used in the pollers to clear the events that we have processed
int
nni_atomic_and(nni_atomic_int *v, int mask)
{
return (atomic_fetch_and_explicit(&v->v, mask, memory_order_acq_rel));
}

int
nni_atomic_get(nni_atomic_int *v)
{
Expand Down Expand Up @@ -338,6 +353,18 @@ nni_atomic_dec_nv(nni_atomic_int *v)
return (__atomic_sub_fetch(&v->v, 1, __ATOMIC_SEQ_CST));
}

int
nni_atomic_or(nni_atomic_int *v, int mask)
{
return (__atomic_fetch_or(&v->v, mask, __ATOMIC_ACQ_REL));
}

int
nni_atomic_and(nni_atomic_int *v, int mask)
{
return (__atomic_fetch_and(&v->v, mask, __ATOMIC_ACQ_REL));
}

bool
nni_atomic_cas(nni_atomic_int *v, int comp, int new)
{
Expand Down Expand Up @@ -571,6 +598,28 @@ nni_atomic_swap(nni_atomic_int *v, int i)
return (rv);
}

int
nni_atomic_or(nni_atomic_int *v, int mask)
{
int rv;
pthread_mutex_lock(&plat_atomic_lock);
rv = v->v;
v->v |= mask;
pthread_mutex_unlock(&plat_atomic_lock);
return (rv);
}

int
nni_atomic_and(nni_atomic_int *v, int mask)
{
int rv;
pthread_mutex_lock(&plat_atomic_lock);
rv = v->v;
v->v &= mask;
pthread_mutex_unlock(&plat_atomic_lock);
return (rv);
}

void
nni_atomic_inc(nni_atomic_int *v)
{
Expand Down
32 changes: 8 additions & 24 deletions src/platform/posix/posix_pollq_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,15 @@ nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);

nni_mtx_init(&pfd->mtx);
nni_atomic_init(&pfd->events);
nni_atomic_flag_reset(&pfd->stopped);
nni_atomic_flag_reset(&pfd->closing);

pfd->pq = pq;
pfd->fd = fd;
pfd->cb = cb;
pfd->arg = arg;
pfd->events = 0;
pfd->closed = false;
pfd->added = false;
pfd->pq = pq;
pfd->fd = fd;
pfd->cb = cb;
pfd->arg = arg;
pfd->added = false;

NNI_LIST_NODE_INIT(&pfd->node);
}
Expand All @@ -83,15 +81,8 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
// forth. This turns out to be true both for Linux and the illumos
// epoll implementation.

nni_mtx_lock(&pfd->mtx);
struct epoll_event ev;

if (pfd->closed) {
nni_mtx_unlock(&pfd->mtx);
return (NNG_ECLOSED);
}
pfd->events |= events;
events = pfd->events;
events |= nni_atomic_or(&pfd->events, (int) events);

memset(&ev, 0, sizeof(ev));
ev.events = events | EPOLLONESHOT;
Expand All @@ -110,7 +101,6 @@ nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
if (rv != 0) {
rv = nni_plat_errno(errno);
}
nni_mtx_unlock(&pfd->mtx);
return (rv);
}

Expand All @@ -131,13 +121,10 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
return;
}

nni_mtx_lock(&pfd->mtx);
struct epoll_event ev; // Not actually used.

(void) shutdown(pfd->fd, SHUT_RDWR);
pfd->closed = true;
(void) epoll_ctl(pq->epfd, EPOLL_CTL_DEL, pfd->fd, &ev);
nni_mtx_unlock(&pfd->mtx);
}

void
Expand Down Expand Up @@ -191,7 +178,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)
}

(void) close(pfd->fd);
nni_mtx_fini(&pfd->mtx);
}

static void
Expand Down Expand Up @@ -239,9 +225,7 @@ nni_posix_poll_thr(void *arg)
((unsigned) (EPOLLIN | EPOLLOUT |
EPOLLERR));

nni_mtx_lock(&pfd->mtx);
pfd->events &= ~mask;
nni_mtx_unlock(&pfd->mtx);
nni_atomic_and(&pfd->events, (int) ~mask);

// Execute the callback with lock released
pfd->cb(pfd->arg, mask);
Expand Down
4 changes: 1 addition & 3 deletions src/platform/posix/posix_pollq_epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ struct nni_posix_pfd {
int fd;
nni_posix_pfd_cb cb;
void *arg;
unsigned events;
nni_mtx mtx;
nni_atomic_int events;
bool added;
bool closed;
nni_atomic_flag stopped;
nni_atomic_flag closing;
};
Expand Down
23 changes: 8 additions & 15 deletions src/platform/posix/posix_pollq_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,13 @@ nni_posix_pfd_init(nni_posix_pfd *pf, int fd, nni_posix_pfd_cb cb, void *arg)

pq = &nni_posix_global_pollq;

nni_mtx_init(&pf->mtx);
nni_atomic_init(&pf->events);
nni_cv_init(&pf->cv, &pq->mtx);

pf->pq = pq;
pf->fd = fd;
pf->cb = cb;
pf->arg = arg;
pf->events = 0;
pf->pq = pq;
pf->fd = fd;
pf->cb = cb;
pf->arg = arg;

nni_atomic_flag_reset(&pf->closed);
nni_atomic_flag_reset(&pf->stopped);
Expand Down Expand Up @@ -150,7 +149,6 @@ nni_posix_pfd_fini(nni_posix_pfd *pf)

(void) close(pf->fd);
nni_cv_fini(&pf->cv);
nni_mtx_fini(&pf->mtx);
}

int
Expand All @@ -167,16 +165,13 @@ nni_posix_pfd_arm(nni_posix_pfd *pf, unsigned events)
unsigned flags = EV_ENABLE | EV_DISPATCH | EV_CLEAR;
nni_posix_pollq *pq = pf->pq;

nni_mtx_lock(&pf->mtx);
pf->events |= events;
events = pf->events;
nni_mtx_unlock(&pf->mtx);

if (events == 0) {
// No events, and kqueue is oneshot, so nothing to do.
return (0);
}

nni_atomic_or(&pf->events, (int) events);

if (events & NNI_POLL_IN) {
EV_SET(&ev[nev++], pf->fd, EVFILT_READ, flags, 0, 0, pf);
}
Expand Down Expand Up @@ -244,9 +239,7 @@ nni_posix_poll_thr(void *arg)
revents |= NNI_POLL_HUP;
}

nni_mtx_lock(&pf->mtx);
pf->events &= ~(revents);
nni_mtx_unlock(&pf->mtx);
nni_atomic_and(&pf->events, (int) (~revents));

pf->cb(pf->arg, revents);
}
Expand Down
3 changes: 1 addition & 2 deletions src/platform/posix/posix_pollq_kqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ struct nni_posix_pfd {
nni_atomic_flag closed;
nni_atomic_flag stopped;
bool reaped;
unsigned events;
nni_atomic_int events;
nni_cv cv; // signaled when poller has unregistered
nni_mtx mtx;
};

#define NNI_POLL_IN (0x0001)
Expand Down
22 changes: 7 additions & 15 deletions src/platform/posix/posix_pollq_poll.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,12 @@ nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)

NNI_LIST_NODE_INIT(&pfd->node);
NNI_LIST_NODE_INIT(&pfd->reap);
nni_mtx_init(&pfd->mtx);
pfd->fd = fd;
pfd->events = 0;
pfd->cb = cb;
pfd->arg = arg;
pfd->pq = pq;
pfd->reaped = false;
nni_atomic_init(&pfd->events);
nni_mtx_lock(&pq->mtx);
nni_list_append(&pq->addq, pfd);
nni_mtx_unlock(&pq->mtx);
Expand Down Expand Up @@ -137,17 +136,14 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)

// We're exclusive now.
(void) close(pfd->fd);
nni_mtx_fini(&pfd->mtx);
}

int
nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
{
nni_posix_pollq *pq = pfd->pq;

nni_mtx_lock(&pfd->mtx);
pfd->events |= events;
nni_mtx_unlock(&pfd->mtx);
(void) nni_atomic_or(&pfd->events, (int) events);

// If we're running on the callback, then don't bother to kick
// the pollq again. This is necessary because we cannot modify
Expand All @@ -166,7 +162,7 @@ nni_posix_poll_thr(void *arg)
nni_posix_pfd **pfds = pq->pfds;
nni_posix_pfd *pfd;
int nfds;
unsigned events;
int events;

for (;;) {

Expand All @@ -181,13 +177,11 @@ nni_posix_poll_thr(void *arg)
// Set up the poll list.
NNI_LIST_FOREACH (&pq->pollq, pfd) {

nni_mtx_lock(&pfd->mtx);
events = pfd->events;
nni_mtx_unlock(&pfd->mtx);
events = nni_atomic_get(&pfd->events);

if (events != 0) {
fds[nfds].fd = pfd->fd;
fds[nfds].events = events;
fds[nfds].events = (unsigned) events;
fds[nfds].revents = 0;
nfds++;
}
Expand All @@ -207,7 +201,7 @@ nni_posix_poll_thr(void *arg)
bool stop = false;
for (int i = 0; i < nfds; i++) {
int fd = fds[i].fd;
events = fds[i].revents;
events = (int) fds[i].revents;
pfd = pfds[fd];
if (events == 0) {
continue;
Expand All @@ -224,9 +218,7 @@ nni_posix_poll_thr(void *arg)
// to finish reading.
events &= ~POLLHUP;
}
nni_mtx_lock(&pfd->mtx);
pfd->events &= ~events;
nni_mtx_unlock(&pfd->mtx);
(void) nni_atomic_and(&pfd->events, ~events);

pfd->cb(pfd->arg, events);
}
Expand Down
3 changes: 1 addition & 2 deletions src/platform/posix/posix_pollq_poll.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ struct nni_posix_pfd {
int fd;
nni_list_node node;
nni_list_node reap;
nni_mtx mtx;
unsigned events;
nni_atomic_int events;
nni_posix_pfd_cb cb;
void *arg;
bool reaped;
Expand Down
12 changes: 12 additions & 0 deletions src/platform/windows/win_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,18 @@ nni_atomic_init(nni_atomic_int *v)
InterlockedExchange(&v->v, 0);
}

int
nni_atomic_or(nni_atomic_int *v, int mask)
{
return (InterlockedOr(&v->v, mask));
}

int
nni_atomic_and(nni_atomic_int *v, int mask)
{
return (InterlockedAnd(&v->v, mask));
}

void
nni_atomic_inc(nni_atomic_int *v)
{
Expand Down
Loading