Skip to content

Commit

Permalink
Add CallbackOnLastSignal (facebookincubator#9506)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#9506

Utility to emit a signal just once, and make sure that it's emitted (from the destructor) even if never called.

Reviewed By: bikramSingh91, helfman

Differential Revision: D56210938

fbshipit-source-id: a877473f318c03c836dc937e8ae016b818104711
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed Apr 19, 2024
1 parent e18a4cf commit f8fde93
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 0 deletions.
83 changes: 83 additions & 0 deletions velox/dwio/common/UnitLoaderTools.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,87 @@ inline std::optional<Measure> measureBlockedOnIo(
return std::nullopt;
}

// This class can create many callbacks that can be distributed to unit loader
// factories. Only when the last created callback is activated, this class will
// emit the original callback.
// If the callbacks created are never activated (because of an exception for
// example), this will still work, since they will emit the signal on the
// destructor.
// The class expects that all callbacks are created before they start emiting
// signals.
class CallbackOnLastSignal {
class EnsureCallOnlyOnce {
public:
EnsureCallOnlyOnce(std::function<void()> cb) : cb_{std::move(cb)} {}

void callOriginalIfNotCalled() {
if (cb_) {
cb_();
cb_ = nullptr;
}
}

~EnsureCallOnlyOnce() {
callOriginalIfNotCalled();
}

private:
std::function<void()> cb_;
};

class CallWhenOneInstanceLeft {
public:
CallWhenOneInstanceLeft(
std::shared_ptr<EnsureCallOnlyOnce> callWrapper,
std::shared_ptr<size_t> factoryExists)
: factoryExists_{std::move(factoryExists)},
callOnce_{std::move(callWrapper)} {}

CallWhenOneInstanceLeft(const CallWhenOneInstanceLeft& other)
: factoryExists_{other.factoryExists_}, callOnce_{other.callOnce_} {}

CallWhenOneInstanceLeft(CallWhenOneInstanceLeft&& other) noexcept {
factoryExists_ = std::move(other.factoryExists_);
callOnce_ = std::move(other.callOnce_);
}

void operator()() {
if (!callOnce_) {
return;
}
// If this is the last callback created out callOnce_, call the
// original callback.
if (callOnce_.use_count() <= (1 + *factoryExists_)) {
callOnce_->callOriginalIfNotCalled();
}
callOnce_ = nullptr;
}

~CallWhenOneInstanceLeft() {
(*this)();
}

private:
std::shared_ptr<size_t> factoryExists_;
std::shared_ptr<EnsureCallOnlyOnce> callOnce_;
};

public:
CallbackOnLastSignal(std::function<void()> cb)
: factoryExists_{std::make_shared<size_t>(1)},
callWrapper_{std::make_shared<EnsureCallOnlyOnce>(std::move(cb))} {}

~CallbackOnLastSignal() {
*factoryExists_ = 0;
}

std::function<void()> getCallback() const {
return CallWhenOneInstanceLeft{callWrapper_, factoryExists_};
}

private:
std::shared_ptr<size_t> factoryExists_;
std::shared_ptr<EnsureCallOnlyOnce> callWrapper_;
};

} // namespace facebook::velox::dwio::common::unit_loader_tools
1 change: 1 addition & 0 deletions velox/dwio/common/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ add_executable(
RetryTests.cpp
TestBufferedInput.cpp
TypeTests.cpp
UnitLoaderTestToolsTests.cpp
WriterTest.cpp)
add_test(velox_dwio_common_test velox_dwio_common_test)
target_link_libraries(
Expand Down
137 changes: 137 additions & 0 deletions velox/dwio/common/tests/UnitLoaderTestToolsTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gtest/gtest.h>

#include "velox/dwio/common/UnitLoaderTools.h"

using namespace ::testing;
using namespace ::facebook::velox::dwio::common;
using namespace ::facebook::velox::dwio::common::unit_loader_tools;

TEST(UnitLoaderTestToolsTests, NoCallbacksCreated) {
std::atomic_size_t callCount = 0;
{
CallbackOnLastSignal callback([&callCount]() { ++callCount; });
EXPECT_EQ(callCount, 0);
}
EXPECT_EQ(callCount, 1);
}

TEST(UnitLoaderTestToolsTests, NoExplicitCalls) {
std::atomic_size_t callCount = 0;
{
CallbackOnLastSignal callback([&callCount]() { ++callCount; });
EXPECT_EQ(callCount, 0);
{
auto c1 = callback.getCallback();
auto c4 = callback.getCallback();
EXPECT_EQ(callCount, 0);

auto c2 = std::move(c1);
auto c3(c2);
EXPECT_EQ(callCount, 0);

auto c5 = std::move(c4);
auto c6(c5);
EXPECT_EQ(callCount, 0);
}
EXPECT_EQ(callCount, 1);
}
EXPECT_EQ(callCount, 1);
}

TEST(UnitLoaderTestToolsTests, NoExplicitCallsFactoryDeletedFirst) {
std::atomic_size_t callCount = 0;
{
std::function<void()> c1, c2;
{
CallbackOnLastSignal callback([&callCount]() { ++callCount; });
EXPECT_EQ(callCount, 0);

c1 = callback.getCallback();
c2 = callback.getCallback();
EXPECT_EQ(callCount, 0);
}
EXPECT_EQ(callCount, 0);
}
EXPECT_EQ(callCount, 1);
}

TEST(UnitLoaderTestToolsTests, ExplicitCalls) {
std::atomic_size_t callCount = 0;
{
CallbackOnLastSignal callback([&callCount]() { ++callCount; });
EXPECT_EQ(callCount, 0);
{
auto c1 = callback.getCallback();
auto c4 = callback.getCallback();
EXPECT_EQ(callCount, 0);

c1();
auto c2 = std::move(c1);
c2();
auto c3(c2);
c3();
EXPECT_EQ(callCount, 0);

c4();
EXPECT_EQ(callCount, 1);
auto c5 = std::move(c4);
c5();
auto c6(c2);
c6();
EXPECT_EQ(callCount, 1);
}
EXPECT_EQ(callCount, 1);
}
EXPECT_EQ(callCount, 1);
}

TEST(UnitLoaderTestToolsTests, WillOnlyCallbackOnce) {
std::atomic_size_t callCount = 0;
{
CallbackOnLastSignal callback([&callCount]() { ++callCount; });
EXPECT_EQ(callCount, 0);
{
auto c1 = callback.getCallback();
auto c4 = callback.getCallback();
EXPECT_EQ(callCount, 0);

c1();
auto c2 = std::move(c1);
c2();
auto c3(c2);
c3();
EXPECT_EQ(callCount, 0);

c4();
EXPECT_EQ(callCount, 1);
auto c5 = std::move(c4);
c5();
auto c6(c2);
c6();
EXPECT_EQ(callCount, 1);

// This won't emit a new call
auto c7 = callback.getCallback();
c7();
EXPECT_EQ(callCount, 1);
}
EXPECT_EQ(callCount, 1);
}
EXPECT_EQ(callCount, 1);
}

0 comments on commit f8fde93

Please sign in to comment.