diff --git a/velox/buffer/Buffer.h b/velox/buffer/Buffer.h index 14ae371c7142..b7aad5d07530 100644 --- a/velox/buffer/Buffer.h +++ b/velox/buffer/Buffer.h @@ -381,34 +381,19 @@ class AlignedBuffer : public Buffer { auto oldCapacity = checkedPlus(old->capacity(), kPaddedSize); auto preferredSize = pool->preferredSize(checkedPlus(size, kPaddedSize)); - // Make the buffer no longer owned by '*buffer' because reallocate - // may free the old buffer. Reassigning the new buffer to - // '*buffer' would be a double free. + + void* newPtr = pool->reallocate(old, oldCapacity, preferredSize); + + // Make the old buffer no longer owned by '*buffer' because reallocate + // freed the old buffer. Reassigning the new buffer to + // '*buffer' would be a double free if we didn't do this. buffer->detach(); - // Decrement the reference count. No need to check, we just - // checked old->unique(). - old->referenceCount_.fetch_sub(1); - void* newPtr; - try { - newPtr = pool->reallocate(old, oldCapacity, preferredSize); - } catch (const std::exception&) { - *buffer = old; - throw; - } - if (newPtr == reinterpret_cast(old)) { - // The pointer did not change. Put the old pointer back in the - // smart pointer and adjust capacity. - *buffer = old; - (*buffer)->setCapacity(preferredSize - kPaddedSize); - (*buffer)->setSize(size); - reinterpret_cast(buffer->get()) - ->fillNewMemory(oldSize, size, initValue); - return; - } + auto newBuffer = new (newPtr) AlignedBuffer(pool, preferredSize - kPaddedSize); newBuffer->setSize(size); newBuffer->fillNewMemory(oldSize, size, initValue); + *buffer = newBuffer; } diff --git a/velox/buffer/tests/BufferTest.cpp b/velox/buffer/tests/BufferTest.cpp index 90ce32f93170..8756e967591d 100644 --- a/velox/buffer/tests/BufferTest.cpp +++ b/velox/buffer/tests/BufferTest.cpp @@ -17,6 +17,8 @@ #include "velox/buffer/Buffer.h" #include "folly/Range.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/testutil/TestValue.h" #include "velox/type/StringView.h" #include @@ -182,6 +184,117 @@ TEST_F(BufferTest, testReallocate) { EXPECT_GT(numMoved, 0); } +TEST_F(BufferTest, testReallocateNoReuse) { + // This test checks that regardless of how we resize a Buffer in reallocate + // (up, down, the same) as long as we hit MemoryPool::reallocate, the Buffer + // always points to a new location in memory. If this test fails, it's not + // necessarily a problem, but it's worth looking at optimizations we could do + // in reallocate when the pointer doesn't change. + + enum BufferResizeOption { + BIGGER, + SMALLER, + SAME, + }; + + auto test = [&](BufferResizeOption bufferResizeOption, + bool useMmapAllocator) { + memory::MemoryManagerOptions options; + options.useMmapAllocator = useMmapAllocator; + options.allocatorCapacity = 1024 * 1024; + memory::MemoryManager memoryManager(options); + + auto pool = memoryManager.addLeafPool("testReallocateNoReuse"); + + const size_t originalBufferSize = 10; + auto buffer = AlignedBuffer::allocate(originalBufferSize, pool.get()); + auto* originalBufferPtr = buffer.get(); + + size_t newSize; + switch (bufferResizeOption) { + case SMALLER: + newSize = originalBufferSize - 1; + break; + case SAME: + newSize = originalBufferSize; + break; + case BIGGER: + // Make sure the new size is large enough that we hit + // MemoryPoolImpl::reallocate. + newSize = buffer->capacity() + 1; + break; + default: + VELOX_FAIL("Unexpected buffer resize option"); + } + + AlignedBuffer::reallocate(&buffer, newSize); + + EXPECT_NE(buffer.get(), originalBufferPtr); + }; + + test(SMALLER, true); + test(SAME, true); + test(BIGGER, true); + + test(SMALLER, false); + test(SAME, false); + test(BIGGER, false); +} + +DEBUG_ONLY_TEST_F(BufferTest, testReallocateFails) { + // Reallocating a buffer can cause an exception to be thrown e.g. if we + // run out of memory. If the buffer is left in an invalid state this can + // cause crahses, e.g. if VectorSaver attempts to write out a Vector that + // was in the midst of resizing. This test verifies the buffer is valid at + // different points in the exception's lifecycle. + + const size_t bufferSize = 10; + auto buffer = AlignedBuffer::allocate(bufferSize, pool_.get()); + + ::memset(buffer->asMutable(), 'a', bufferSize); + + common::testutil::TestValue::enable(); + + const std::string kErrorMessage = "Expected out of memory exception"; + SCOPED_TESTVALUE_SET( + "facebook::velox::memory::MemoryPoolImpl::reserveThreadSafe", + std::function([&](memory::MemoryPool*) { + VELOX_MEM_POOL_CAP_EXCEEDED(kErrorMessage); + })); + + { + ExceptionContextSetter setter( + {.messageFunc = [](VeloxException::Type, + void* untypedArg) -> std::string { + // Validate that the buffer is still valid at the point + // the exception is thrown. + auto bufferArg = *static_cast(untypedArg); + + const auto* bufferContents = bufferArg->as(); + VELOX_CHECK_EQ(bufferArg->size(), 10); + for (int i = 0; i < 10; i++) { + VELOX_CHECK_EQ(bufferContents[i], 'a'); + } + + return "Exception context message func called."; + }, + .arg = &buffer}); + + VELOX_ASSERT_THROW_CODE( + AlignedBuffer::reallocate( + &buffer, pool_->availableReservation() + 1), + error_code::kMemCapExceeded, + kErrorMessage); + } + + // Validate the buffer is valid after the exception is caught. + const auto* bufferContents = buffer->as(); + VELOX_CHECK_EQ(buffer->size(), bufferSize); + for (int i = 0; i < bufferSize; i++) { + VELOX_CHECK_EQ(bufferContents[i], 'a'); + } +} + struct MockCachePin { void addRef() { ++pinCount; diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h index de8a864a02f4..ff5ec06da2f2 100644 --- a/velox/common/memory/MemoryPool.h +++ b/velox/common/memory/MemoryPool.h @@ -237,9 +237,7 @@ class MemoryPool : public std::enable_shared_from_this { virtual void* allocateZeroFilled(int64_t numEntries, int64_t sizeEach) = 0; /// Re-allocates from an existing buffer with 'newSize' and update memory - /// usage counting accordingly. If 'newSize' is larger than the current buffer - /// 'size', the function will allocate a new buffer and free the old buffer. - /// If the new allocation fails, this method will throw and not free 'p'. + /// usage counting accordingly. virtual void* reallocate(void* p, int64_t size, int64_t newSize) = 0; /// Frees an allocated buffer.