Sending an HTTP stream from the LiveKit agent to a FastAPI endpoint results in the endpoint receiving only empty chunks. This issue occurs in the latest version of `livekit-agents`, which is 0.12.3.
The issue can be reproduced by setting up a basic LiveKit frontend, a voice pipeline agent, and a FastAPI server. The LiveKit agent's `entrypoint` function initiates the stream to the FastAPI server.
Steps to reproduce
Create a frontend:
```
lk app create --template voice-assistant-frontend
```
Install dependencies:
```
cd <frontend_dir>
pnpm install
```
Set the required env vars for the frontend in .env.local.
Open requirements.txt in the agents repository and update the `livekit-agents` dependency to `livekit-agents==0.12.3`, the version where the issue occurs.
Update the agent's .env.local file, filling in the OPENAI_API_KEY and DEEPGRAM_API_KEY.
Create a file called server.py in the agents repository, and copy and paste this code into it:
```python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/my-stream")
async def before_tts_stream(request: Request):
    print("[FASTAPI] my-stream called")

    async def read_chunks():
        async for chunk in request.stream():
            print("[FASTAPI] Got chunk:", chunk)
            yield chunk

    return StreamingResponse(read_chunks(), media_type="text/plain")
```
Open agent.py and copy and paste this code into it:
```python
from dotenv import load_dotenv
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    JobProcess,
    WorkerOptions,
    cli,
    llm,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import openai, deepgram, silero
import asyncio
from typing import AsyncIterable
import httpx
import logging

load_dotenv(dotenv_path=".env.local")

logger = logging.getLogger("voice-agent")


async def sample_text_generator() -> AsyncIterable[str]:
    for i in range(3):
        yield f"chunk {i}\n"
        await asyncio.sleep(0.5)


async def entrypoint(ctx: JobContext):
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoiding usage of unpronouncable punctuation. "
            "You were created as a demo to showcase the capabilities of LiveKit's agents framework."
        ),
    )

    url = "http://localhost:8000/my-stream"

    async def test_production_logic():
        async def input_stream() -> AsyncIterable[bytes]:
            async for chunk in sample_text_generator():
                logger.info(f"[WORKER] Sending chunk: {chunk.strip()}")
                yield chunk.encode("utf-8")

        async def output_stream() -> AsyncIterable[str]:
            async with httpx.AsyncClient() as client:
                async with client.stream(
                    "POST",
                    url,
                    content=input_stream(),
                    headers={"Content-Type": "text/plain"},
                ) as resp:
                    async for out_chunk in resp.aiter_text():
                        yield out_chunk

        logger.info("[WORKER] Starting streaming POST...")
        async for response_chunk in output_stream():
            logger.info(f"[WORKER] Got from server: {response_chunk}")

    await test_production_logic()

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    logger.info(f"connecting to room {ctx.room.name}")

    participant = await ctx.wait_for_participant()
    logger.info(f"starting voice assistant for participant {participant.identity}")

    agent = VoicePipelineAgent(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )

    agent.start(ctx.room, participant)
    await agent.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
        ),
    )
```
Run the FastAPI server:
```
uvicorn server:app --host 0.0.0.0 --port 8000
```
In a different terminal, run the LiveKit server:
```
livekit-server --dev
```
In a different terminal, run the LiveKit agent:
```
python3 agent.py dev
```
In a browser, go to http://localhost:3000/ and click "Start a Conversation".
Expected outcome
The FastAPI server should receive the streamed content, printing "[FASTAPI] my-stream called" followed by a "[FASTAPI] Got chunk: b'chunk 0\n'" line for each of the three chunks.
Actual outcome
The FastAPI server's logs display only empty bytes instead of the streamed content, e.g. "Got chunk: b''" rather than "Got chunk: b'chunk 0\n'".

@sam-goldman it's not clear your example has anything to do with agents; if I'm reading it correctly, it spins up a new task that attempts to post to your API endpoint.
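Following the comment above, one way to narrow this down is to take HTTP out of the path entirely. The sketch below uses only the standard library (the helper names mirror the repro, with the sleep shortened) to consume `input_stream()` directly and confirm that the worker-side generators themselves yield non-empty bytes:

```python
import asyncio
from typing import AsyncIterable

async def sample_text_generator() -> AsyncIterable[str]:
    # Same generator as in the repro, with a shorter sleep
    for i in range(3):
        yield f"chunk {i}\n"
        await asyncio.sleep(0.01)

async def input_stream() -> AsyncIterable[bytes]:
    # Same encoding step the agent applies before the streaming POST
    async for chunk in sample_text_generator():
        yield chunk.encode("utf-8")

async def main() -> list[bytes]:
    # Drain the stream exactly as httpx would, but with no transport involved
    return [chunk async for chunk in input_stream()]

if __name__ == "__main__":
    chunks = asyncio.run(main())
    print(chunks)  # [b'chunk 0\n', b'chunk 1\n', b'chunk 2\n']
    assert all(chunks), "generator produced an empty chunk"
```

If this prints non-empty chunks (as it should, since nothing else consumes the generator), the generators are fine and the investigation shifts to what happens between httpx and the server.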