Skip to content

Commit

Permalink
Make AsyncProcessor::processInteraction pure virtual
Browse files Browse the repository at this point in the history
Summary: `processInteraction()` is part of the critical path for interactions in Resource Pools and can lead to undefined behavior if it is not implemented. Let's make the contract more clear and force custom `AsyncProcessor` implementations to provide an implementation for it.

Reviewed By: tlj77

Differential Revision: D62590107

fbshipit-source-id: 69fe9e9bee745766d891b5fc5f33d9953d6845eb
  • Loading branch information
Akrama Baig Mirza authored and facebook-github-bot committed Oct 3, 2024
1 parent b125f36 commit c9482e5
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 1 deletion.
2 changes: 1 addition & 1 deletion thrift/lib/cpp2/async/AsyncProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ class AsyncProcessor : public TProcessorBase {
virtual void destroyAllInteractions(
Cpp2ConnContext& conn, folly::EventBase&) noexcept;

virtual void processInteraction(ServerRequest&&) {}
virtual void processInteraction(ServerRequest&&) = 0;

// This is the main interface we are migrating to. Eventually it should
// replace all the processSerialized... methods.
Expand Down
6 changes: 6 additions & 0 deletions thrift/lib/cpp2/async/PreprocessingAsyncProcessorWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ class PreprocessingAsyncProcessorWrapper : public AsyncProcessor {

const char* getServiceName() override final;

void processInteraction(ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}

protected:
using ProcessSerializedCompressedRequestReturnT = std::
tuple<ResponseChannelRequest::UniquePtr, SerializedCompressedRequest>;
Expand Down
12 changes: 12 additions & 0 deletions thrift/lib/cpp2/server/test/ParallelConcurrencyControllerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class MockAsyncProcessor : public AsyncProcessor {
}
}

void processInteraction(apache::thrift::ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}

void setFunc(Func func) { executeRequestFunc_ = std::move(func); }

private:
Expand Down Expand Up @@ -593,6 +599,12 @@ TEST(ParallelConcurrencyControllerTest, FinishCallbackExceptionSafe) {
});
}

void processInteraction(apache::thrift::ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}

private:
folly::Baton<>& baton_;
};
Expand Down
6 changes: 6 additions & 0 deletions thrift/lib/cpp2/test/AsyncProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,12 @@ TEST_P(
processor_->executeRequest(std::move(request), methodMetadata);
}

void processInteraction(apache::thrift::ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}

explicit Processor(std::unique_ptr<AsyncProcessor> processor)
: processor_(std::move(processor)) {}

Expand Down
6 changes: 6 additions & 0 deletions thrift/lib/cpp2/test/MultiplexAsyncProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,12 @@ class WildcardThrowsInternalError : public TProcessorFactory {
inner_->destroyAllInteractions(ctx, eb);
}

void processInteraction(ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}

explicit Processor(
std::unique_ptr<AsyncProcessor>&& inner,
const MessageVariant& message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ class MockAsyncProcessor : public AsyncProcessor {
void(
ServerRequest&& request,
const AsyncProcessorFactory::MethodMetadata& methodMetadata));

void processInteraction(apache::thrift::ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}
};

class TestChannelRequest : public ResponseChannelRequest {
Expand Down
12 changes: 12 additions & 0 deletions thrift/lib/cpp2/test/server/ThriftServerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4331,6 +4331,12 @@ class HeaderOrRocketCompression
tm);
}

void processInteraction(apache::thrift::ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}

std::unique_ptr<AsyncProcessor> underlyingProcessor_;
CompressionAlgorithm compression_;
};
Expand Down Expand Up @@ -4365,6 +4371,12 @@ class HeaderOrRocketCompression
tm);
}

void processInteraction(apache::thrift::ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}

std::unique_ptr<AsyncProcessor> underlyingProcessor_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,12 @@ void TransportCompatibilityTest::TestCustomAsyncProcessor() {
tm);
}

void processInteraction(apache::thrift::ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}

private:
std::unique_ptr<apache::thrift::AsyncProcessor> underlyingProcessor_;
};
Expand Down
6 changes: 6 additions & 0 deletions thrift/lib/cpp2/transport/rocket/test/fuzz/FuzzUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ class FakeProcessor final : public apache::thrift::AsyncProcessor {
"place holder"),
"1" /* doesnt matter */);
}

void processInteraction(apache::thrift::ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}
};

class FakeProcessorFactory final
Expand Down
5 changes: 5 additions & 0 deletions thrift/lib/cpp2/util/EmptyAsyncProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ std::unique_ptr<AsyncProcessor> EmptyAsyncProcessorFactory::getProcessor() {
concurrency::ThreadManager*) override {
LOG(FATAL) << "This method should never be called on EmptyAsyncProcessor";
}
void processInteraction(ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}
};

return std::make_unique<EmptyAsyncProcessor>();
Expand Down
6 changes: 6 additions & 0 deletions thrift/lib/py/server/CppServerWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ class PythonAsyncProcessor : public AsyncProcessor {
true /* fromExecuteRequest */);
}

void processInteraction(apache::thrift::ServerRequest&&) override {
LOG(FATAL)
<< "This AsyncProcessor doesn't support Thrift interactions. "
<< "Please implement processInteraction to support interactions.";
}

/**
* Get the priority of the request
* Check the headers directly in C++ since noone seems to override that logic
Expand Down

0 comments on commit c9482e5

Please sign in to comment.