From bff26063438c93079acfd8dd929de14000b5fb58 Mon Sep 17 00:00:00 2001 From: Silvio Traversaro Date: Fri, 18 Oct 2024 08:35:04 +0200 Subject: [PATCH] Add general mutex for ShmemInputStreamImpl To avoid failures like https://github.com/robotology/icub-tech-support/issues/1937 --- src/carriers/shmem_carrier/ShmemInputStream.cpp | 10 ++++++++++ src/carriers/shmem_carrier/ShmemInputStream.h | 3 +++ 2 files changed, 13 insertions(+) diff --git a/src/carriers/shmem_carrier/ShmemInputStream.cpp b/src/carriers/shmem_carrier/ShmemInputStream.cpp index 7ea612ffeda..fb427f5d8f8 100644 --- a/src/carriers/shmem_carrier/ShmemInputStream.cpp +++ b/src/carriers/shmem_carrier/ShmemInputStream.cpp @@ -43,6 +43,8 @@ bool ShmemInputStreamImpl::isOk() const bool ShmemInputStreamImpl::open(int port, ACE_SOCK_Stream* pSock, int size) { + std::lock_guard l_guard(m_generalMutex); + m_pSock = pSock; m_pAccessMutex = m_pWaitDataMutex = nullptr; @@ -102,6 +104,8 @@ bool ShmemInputStreamImpl::open(int port, ACE_SOCK_Stream* pSock, int size) bool ShmemInputStreamImpl::Resize() { + std::lock_guard l_guard(m_generalMutex); + ++m_ResizeNum; ACE_Shared_Memory* pNewMap; @@ -155,6 +159,8 @@ bool ShmemInputStreamImpl::Resize() int ShmemInputStreamImpl::read(char* data, int len) { + std::lock_guard l_guard(m_generalMutex); + m_pAccessMutex->acquire(); if (m_pHeader->close) { @@ -193,6 +199,8 @@ int ShmemInputStreamImpl::read(char* data, int len) yarp::conf::ssize_t ShmemInputStreamImpl::read(yarp::os::Bytes& b) { + std::lock_guard l_guard(m_generalMutex); + m_ReadSerializerMutex.lock(); if (!m_bOpen) { @@ -230,6 +238,8 @@ yarp::conf::ssize_t ShmemInputStreamImpl::read(yarp::os::Bytes& b) void ShmemInputStreamImpl::close() { + std::lock_guard l_guard(m_generalMutex); + if (!m_bOpen) { return; } diff --git a/src/carriers/shmem_carrier/ShmemInputStream.h b/src/carriers/shmem_carrier/ShmemInputStream.h index 96099ec377b..d7b4aa69580 100644 --- a/src/carriers/shmem_carrier/ShmemInputStream.h +++ b/src/carriers/shmem_carrier/ShmemInputStream.h @@ -63,6 +63,9 @@ class ShmemInputStreamImpl std::mutex m_ReadSerializerMutex; + // Mutex for the whole class, to avoid concurrent close and read + std::recursive_mutex m_generalMutex; + ACE_Shared_Memory* m_pMap; char* m_pData; ShmemHeader_t* m_pHeader;