Skip to content

Commit

Permalink
🐛 fix update search result even if channel closed
Browse files Browse the repository at this point in the history
  • Loading branch information
rathijitpapon committed Jun 20, 2024
1 parent 09d7c75 commit f2c3faf
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions server/src/llms/summarizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub async fn generate_text_with_llm(
return Err(eyre!("Request failed with status: {:?}", response.status()).into());
}
let mut stream = response.bytes_stream();
let mut buffer = String::new();

while let Some(chunk) = stream.next().await {
// remove `data` from the start of the chunk and `\n\n` from the end
Expand All @@ -104,14 +105,19 @@ pub async fn generate_text_with_llm(
.process(summarizer_api_response.token.text.clone())
.await
.map_err(|e| eyre!("Failed to update result: {e}"))?;
search.result = summarizer_api_response.token.text;

tx.send(api_models::SearchByIdResponse {
search,
sources: vec![],
})
.await
.map_err(|e| eyre!("Failed to send response: {e}"))?;
buffer.push_str(&summarizer_api_response.token.text);
search.result = buffer.clone();
let tx_response = tx
.send(api_models::SearchByIdResponse {
search,
sources: vec![],
})
.await;

if let Ok(_) = tx_response {
buffer.clear();
}
}
}

Expand Down Expand Up @@ -183,6 +189,7 @@ pub async fn generate_text_with_openai(

let mut stream = response.bytes_stream();
let mut stream_data = String::new();
let mut buffer = String::new();

while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(|e| eyre!("Failed to read chunk: {e}"))?;
Expand Down Expand Up @@ -213,14 +220,19 @@ pub async fn generate_text_with_openai(
.process(parsed_chunk.clone())
.await
.map_err(|e| eyre!("Failed to update result: {e}"))?;
search.result = parsed_chunk;

tx.send(api_models::SearchByIdResponse {
search,
sources: vec![],
})
.await
.map_err(|e| eyre!("Failed to send response: {e}"))?;
buffer.push_str(&parsed_chunk);
search.result = buffer.clone();
let tx_response = tx
.send(api_models::SearchByIdResponse {
search,
sources: vec![],
})
.await;

if let Ok(_) = tx_response {
buffer.clear();
}
}

Ok(())
Expand Down

0 comments on commit f2c3faf

Please sign in to comment.