
Add an option to drop the request #732

Merged: 9 commits, merged Aug 6, 2024
6 changes: 5 additions & 1 deletion src/cpp/include/openvino/genai/generation_handle.hpp
@@ -58,6 +58,8 @@ class GenerationStream;
class OPENVINO_GENAI_EXPORTS GenerationHandleImpl {
std::shared_ptr<GenerationStream> m_generation_stream;
ov::genai::GenerationConfig m_sampling_params;

bool is_dropped();

public:
GenerationHandleImpl(std::shared_ptr<GenerationStream> generation_stream, const ov::genai::GenerationConfig& sampling_params) :
@@ -74,12 +76,14 @@ class OPENVINO_GENAI_EXPORTS GenerationHandleImpl {

bool can_read();

void drop();

GenerationOutputs back();
// Reads result of a generation for single iteration
GenerationOutputs read();
// Reads all generated tokens for all sequences
std::vector<GenerationOutput> read_all();
};

using GenerationHandle = std::unique_ptr<GenerationHandleImpl>;
using GenerationHandle = std::shared_ptr<GenerationHandleImpl>;
Collaborator:
With this change we can have multiple handles pointing to a single stream, and we give an explicit option to drop the generation via a method call. This means that one handle can call drop and invalidate the handle not only for itself but potentially for other handles as well.

I think that if we go this way, we should block any calls on a handle that has been dropped (for example, by throwing errors).

Collaborator Author @dkalinowski (Aug 5, 2024):
I think it is safe to say that the current approach with unique_ptr did not restrict GenAI API users from misusing the handle: one could simply take a reference to the handle and do anything with it.

Changing to shared_ptr and exposing an explicit drop() method gives the flexibility that OVMS needed: dropping the request in the HTTP client-disconnection callback (see the OVMS pull request). Now multiple threads can use the handle (the HTTP thread and the mediapipe thread), and the generation is dropped once all shared references to the handle are gone.

I also agree that we could verify that nobody calls the read/read_all methods after the handle is dropped. Will add that.

}
20 changes: 19 additions & 1 deletion src/cpp/src/continuous_batching_pipeline.cpp
@@ -60,6 +60,15 @@ class ContinuousBatchingPipeline::Impl {
ChatHistory m_history;


void _notify_requests_dropped_by_handle() {
// Notify one last time by pushing an empty output.
// This causes read() to unblock, since adding anything to the queue wakes the reader.
Contributor @ilya-lavrenov (Aug 6, 2024):
Even with this comment I don't fully understand why we need to send empty outputs. If the handle is dropped by the user, the user should not expect any empty outputs from this request/handle.

Collaborator:
It's additional protection in multithreading scenarios. When the generation handle is dropped from thread #1 while thread #2 is blocked on read(), this push unblocks it. That's the case in the model server, where the handle drop is called from a callback triggered by the HTTP server on client disconnect.

for (SequenceGroup::Ptr& request : m_requests) {
if (request->handle_dropped())
request->push_empty_outputs();
}
}

void _free_non_running_requests() {
std::vector<SequenceGroup::Ptr>::iterator requests_iterator = m_requests.begin();
while (requests_iterator != m_requests.end()) {
@@ -136,7 +145,7 @@ class ContinuousBatchingPipeline::Impl {
std::lock_guard<std::mutex> lock{m_awaiting_requests_mutex};
m_awaiting_requests.push_back(sequence_group);
}
return std::make_unique<GenerationHandleImpl>(sequence_group->get_generation_stream(), sampling_params);
return std::make_shared<GenerationHandleImpl>(sequence_group->get_generation_stream(), sampling_params);
}

GenerationHandle add_request(uint64_t request_id, const std::string& prompt, ov::genai::GenerationConfig sampling_params) {
@@ -227,6 +236,15 @@ class ContinuousBatchingPipeline::Impl {
timer.end();
}

// notify requests dropped by handle

{
static ManualTimer timer("notify requests dropped by handle");
timer.start();
_notify_requests_dropped_by_handle();
timer.end();
}

// free non running requests for current step

{
15 changes: 13 additions & 2 deletions src/cpp/src/generation_handle.cpp
@@ -9,22 +9,32 @@
using namespace ov::genai;

GenerationHandleImpl::~GenerationHandleImpl() {
m_generation_stream->drop();
drop();
}

GenerationStatus GenerationHandleImpl::get_status() {
return m_generation_stream->get_status();
}

bool GenerationHandleImpl::can_read() {
return m_generation_stream->can_read();
return !is_dropped() && m_generation_stream->can_read();
}

bool GenerationHandleImpl::is_dropped() {
return get_status() == GenerationStatus::DROPPED_BY_HANDLE;
}

void GenerationHandleImpl::drop() {
m_generation_stream->drop();
}

std::unordered_map<uint64_t, GenerationOutput> GenerationHandleImpl::back() {
Collaborator:

I think we should block all methods, not just read.

Collaborator Author:
done

OPENVINO_ASSERT(!is_dropped(), "GenerationHandle cannot be used after it is dropped.");
return m_generation_stream->back();
}

std::unordered_map<uint64_t, GenerationOutput> GenerationHandleImpl::read() {
OPENVINO_ASSERT(!is_dropped(), "GenerationHandle cannot be used after it is dropped.");
return m_generation_stream->read();
}

@@ -41,6 +51,7 @@ void add_partial_result(std::unordered_map<uint64_t, GenerationOutput>& partial_
}

std::vector<GenerationOutput> GenerationHandleImpl::read_all() {
OPENVINO_ASSERT(!is_dropped(), "GenerationHandle cannot be used after it is dropped.");
std::vector<GenerationOutput> results;
std::unordered_map<uint64_t, GenerationOutput> partial_results;
// We iterate until generation is running or there are tokens we haven't read yet
4 changes: 4 additions & 0 deletions src/cpp/src/sequence_group.hpp
@@ -446,6 +446,10 @@ class SequenceGroup {
return m_generation_stream->get_status() == GenerationStatus::DROPPED_BY_HANDLE;
}

void push_empty_outputs() {
m_generation_stream->push({});
}

void push_outputs() {
GenerationOutputs outputs;
for (auto& sequence: m_sequences) {