From 2c98308b4563d0c58ab016708b835bb7fce4a9ce Mon Sep 17 00:00:00 2001 From: Kevin Wilfong Date: Fri, 10 May 2024 17:56:07 -0700 Subject: [PATCH] Clean up legacy code from Buffer::reallocate and reduce window where Buffer is invalid (#9755) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/9755 When running Velox with the save input on expression setting enabled, I'm occasionally seeing SIGSEGVs writing out the Vectors when the exception was thrown while resizing the offset or length Buffer in a MapVector or an ArrayVector, e.g. because of an OOM. When this happens in Buffer::reallocate it can leave the Buffer in an invalid state before the exception is caught (when the exception message is being generated) triggering the crash. While I doubt we can 100% guarantee VectorSaver won't trigger a crash we can at least fix this case. Instead of detaching buffer before reallocating it, if we detach it after we still avoid a double free and the buffer is (almost) always valid. There are windows after the old buffer is freed before MemoryPool::reallocate returns, and while setting up newBuffer where buffer would still be invalid, but these are much less likely to run into issues (these would mainly be bugs in the code). While digging through this I also noticed that the comments on MemoryPool::reallocate have gotten out of date and Buffer::reallocate is handling some unnecessary cases (newPtr can never be equal to old), so I've cleaned those up. Reviewed By: xiaoxmeng Differential Revision: D57139357 fbshipit-source-id: b2a0391a77d6ccbdee9e96f3a85bde9ab815b2e9 --- velox/buffer/Buffer.h | 31 +++----- velox/buffer/tests/BufferTest.cpp | 113 ++++++++++++++++++++++++++++++ velox/common/memory/MemoryPool.h | 4 +- 3 files changed, 122 insertions(+), 26 deletions(-) diff --git a/velox/buffer/Buffer.h b/velox/buffer/Buffer.h index 14ae371c7142..b7aad5d07530 100644 --- a/velox/buffer/Buffer.h +++ b/velox/buffer/Buffer.h @@ -381,34 +381,19 @@ class AlignedBuffer : public Buffer { auto oldCapacity = checkedPlus(old->capacity(), kPaddedSize); auto preferredSize = pool->preferredSize(checkedPlus(size, kPaddedSize)); - // Make the buffer no longer owned by '*buffer' because reallocate - // may free the old buffer. Reassigning the new buffer to - // '*buffer' would be a double free. + + void* newPtr = pool->reallocate(old, oldCapacity, preferredSize); + + // Make the old buffer no longer owned by '*buffer' because reallocate + // freed the old buffer. Reassigning the new buffer to + // '*buffer' would be a double free if we didn't do this. buffer->detach(); - // Decrement the reference count. No need to check, we just - // checked old->unique(). - old->referenceCount_.fetch_sub(1); - void* newPtr; - try { - newPtr = pool->reallocate(old, oldCapacity, preferredSize); - } catch (const std::exception&) { - *buffer = old; - throw; - } - if (newPtr == reinterpret_cast(old)) { - // The pointer did not change. Put the old pointer back in the - // smart pointer and adjust capacity. - *buffer = old; - (*buffer)->setCapacity(preferredSize - kPaddedSize); - (*buffer)->setSize(size); - reinterpret_cast(buffer->get()) - ->fillNewMemory(oldSize, size, initValue); - return; - } + auto newBuffer = new (newPtr) AlignedBuffer(pool, preferredSize - kPaddedSize); newBuffer->setSize(size); newBuffer->fillNewMemory(oldSize, size, initValue); + *buffer = newBuffer; } diff --git a/velox/buffer/tests/BufferTest.cpp b/velox/buffer/tests/BufferTest.cpp index 90ce32f93170..8756e967591d 100644 --- a/velox/buffer/tests/BufferTest.cpp +++ b/velox/buffer/tests/BufferTest.cpp @@ -17,6 +17,8 @@ #include "velox/buffer/Buffer.h" #include "folly/Range.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/testutil/TestValue.h" #include "velox/type/StringView.h" #include @@ -182,6 +184,117 @@ TEST_F(BufferTest, testReallocate) { EXPECT_GT(numMoved, 0); } +TEST_F(BufferTest, testReallocateNoReuse) { + // This test checks that regardless of how we resize a Buffer in reallocate + // (up, down, the same) as long as we hit MemoryPool::reallocate, the Buffer + // always points to a new location in memory. If this test fails, it's not + // necessarily a problem, but it's worth looking at optimizations we could do + // in reallocate when the pointer doesn't change. + + enum BufferResizeOption { + BIGGER, + SMALLER, + SAME, + }; + + auto test = [&](BufferResizeOption bufferResizeOption, + bool useMmapAllocator) { + memory::MemoryManagerOptions options; + options.useMmapAllocator = useMmapAllocator; + options.allocatorCapacity = 1024 * 1024; + memory::MemoryManager memoryManager(options); + + auto pool = memoryManager.addLeafPool("testReallocateNoReuse"); + + const size_t originalBufferSize = 10; + auto buffer = AlignedBuffer::allocate(originalBufferSize, pool.get()); + auto* originalBufferPtr = buffer.get(); + + size_t newSize; + switch (bufferResizeOption) { + case SMALLER: + newSize = originalBufferSize - 1; + break; + case SAME: + newSize = originalBufferSize; + break; + case BIGGER: + // Make sure the new size is large enough that we hit + // MemoryPoolImpl::reallocate. + newSize = buffer->capacity() + 1; + break; + default: + VELOX_FAIL("Unexpected buffer resize option"); + } + + AlignedBuffer::reallocate(&buffer, newSize); + + EXPECT_NE(buffer.get(), originalBufferPtr); + }; + + test(SMALLER, true); + test(SAME, true); + test(BIGGER, true); + + test(SMALLER, false); + test(SAME, false); + test(BIGGER, false); +} + +DEBUG_ONLY_TEST_F(BufferTest, testReallocateFails) { + // Reallocating a buffer can cause an exception to be thrown e.g. if we + // run out of memory. If the buffer is left in an invalid state this can + // cause crahses, e.g. if VectorSaver attempts to write out a Vector that + // was in the midst of resizing. This test verifies the buffer is valid at + // different points in the exception's lifecycle. + + const size_t bufferSize = 10; + auto buffer = AlignedBuffer::allocate(bufferSize, pool_.get()); + + ::memset(buffer->asMutable(), 'a', bufferSize); + + common::testutil::TestValue::enable(); + + const std::string kErrorMessage = "Expected out of memory exception"; + SCOPED_TESTVALUE_SET( + "facebook::velox::memory::MemoryPoolImpl::reserveThreadSafe", + std::function([&](memory::MemoryPool*) { + VELOX_MEM_POOL_CAP_EXCEEDED(kErrorMessage); + })); + + { + ExceptionContextSetter setter( + {.messageFunc = [](VeloxException::Type, + void* untypedArg) -> std::string { + // Validate that the buffer is still valid at the point + // the exception is thrown. + auto bufferArg = *static_cast(untypedArg); + + const auto* bufferContents = bufferArg->as(); + VELOX_CHECK_EQ(bufferArg->size(), 10); + for (int i = 0; i < 10; i++) { + VELOX_CHECK_EQ(bufferContents[i], 'a'); + } + + return "Exception context message func called."; + }, + .arg = &buffer}); + + VELOX_ASSERT_THROW_CODE( + AlignedBuffer::reallocate( + &buffer, pool_->availableReservation() + 1), + error_code::kMemCapExceeded, + kErrorMessage); + } + + // Validate the buffer is valid after the exception is caught. + const auto* bufferContents = buffer->as(); + VELOX_CHECK_EQ(buffer->size(), bufferSize); + for (int i = 0; i < bufferSize; i++) { + VELOX_CHECK_EQ(bufferContents[i], 'a'); + } +} + struct MockCachePin { void addRef() { ++pinCount; diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h index de8a864a02f4..ff5ec06da2f2 100644 --- a/velox/common/memory/MemoryPool.h +++ b/velox/common/memory/MemoryPool.h @@ -237,9 +237,7 @@ class MemoryPool : public std::enable_shared_from_this { virtual void* allocateZeroFilled(int64_t numEntries, int64_t sizeEach) = 0; /// Re-allocates from an existing buffer with 'newSize' and update memory - /// usage counting accordingly. If 'newSize' is larger than the current buffer - /// 'size', the function will allocate a new buffer and free the old buffer. - /// If the new allocation fails, this method will throw and not free 'p'. + /// usage counting accordingly. virtual void* reallocate(void* p, int64_t size, int64_t newSize) = 0; /// Frees an allocated buffer.