
RealTime Pipeline

The Realtime Pipeline provides direct speech-to-speech processing with minimal latency. It uses unified models that handle the entire audio processing pipeline in a single step, offering the fastest possible response times for conversational AI.

Realtime Pipeline Architecture

tip

The RealTimePipeline is specifically designed for real-time AI models that provide end-to-end speech processing, which makes it well suited for conversational agents. For use cases that require more granular control over the individual components (STT, LLM, TTS), for example to manage context or shape responses, the CascadingPipeline is more appropriate.

Basic Usage

Setting up a RealTimePipeline is straightforward. You simply need to initialize your chosen real-time model and pass it to the pipeline's constructor.

from videosdk.agents import RealTimePipeline
from videosdk.plugins.openai import OpenAIRealtime, OpenAIRealtimeConfig

# Initialize the desired real-time model
model = OpenAIRealtime(
    model="gpt-4o-realtime-preview",
    config=OpenAIRealtimeConfig(
        voice="alloy",
        response_modalities=["AUDIO"]
    )
)

# Create the pipeline with the model
pipeline = RealTimePipeline(model=model)

In addition to OpenAI, the Realtime Pipeline also supports other advanced models such as Google Gemini (Live API) and AWS Nova Sonic, each offering unique features for building high-performance conversational agents. See their respective pages for advanced configuration options.
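
Switching providers follows the same pattern as the OpenAI example above: initialize the model and pass it to the pipeline. The sketch below assumes the Google plugin exposes a GeminiRealtime class with a GeminiLiveConfig options object, and the model name and voice are illustrative placeholders; check the Gemini Live API plugin page for the exact names and parameters.

from videosdk.agents import RealTimePipeline
from videosdk.plugins.google import GeminiRealtime, GeminiLiveConfig  # class names assumed

# Initialize a Gemini Live model (model name and voice are placeholders)
model = GeminiRealtime(
    model="gemini-2.0-flash-live-001",
    config=GeminiLiveConfig(
        voice="Leda",
        response_modalities=["AUDIO"]
    )
)

pipeline = RealTimePipeline(model=model)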

tip

Choose a model based on its supported audio sample rate (OpenAI/Nova Sonic: 16 kHz, Gemini: 24 kHz) so that it matches your audio source and playback requirements.

tip

For cloud providers like AWS, select the server region closest to your users to minimize network latency.

Custom Model Integration

To integrate a custom real-time model, implement the RealtimeBaseModel interface, which requires methods such as connect(), handle_audio_input(), send_message(), and interrupt().

from videosdk.agents import RealtimeBaseModel, Agent, CustomAudioStreamTrack
from typing import Literal, Optional

class CustomRealtime(RealtimeBaseModel[Literal["user_speech_started", "error"]]):
    """Custom real-time AI model implementation"""

    def __init__(self, model_name: str, api_key: str):
        super().__init__()
        self.model_name = model_name
        self.api_key = api_key
        self.audio_track: Optional[CustomAudioStreamTrack] = None
        self._instructions = ""
        self._tools = []
        self._connected = False

    def set_agent(self, agent: Agent) -> None:
        """Set agent instructions and tools"""
        self._instructions = agent.instructions
        self._tools = agent.tools

    async def connect(self) -> None:
        """Initialize connection to your AI service"""
        # Your connection logic here
        self._connected = True
        print(f"Connected to {self.model_name}")

    async def handle_audio_input(self, audio_data: bytes) -> None:
        """Process incoming audio from user"""
        if not self._connected:
            return

        # Process audio and generate response
        # Your audio processing logic here

        # Emit user speech detection
        self.emit("user_speech_started", {"type": "detected"})

        # Generate and play response audio
        if self.audio_track:
            response_audio = b"your_generated_audio_bytes"
            await self.audio_track.add_new_bytes(response_audio)

    async def send_message(self, message: str) -> None:
        """Send text message to model"""
        # Your text processing logic here
        pass

    async def interrupt(self) -> None:
        """Interrupt current response"""
        if self.audio_track:
            self.audio_track.interrupt()

    async def aclose(self) -> None:
        """Cleanup resources"""
        self._connected = False
        if self.audio_track:
            await self.audio_track.cleanup()
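
Once implemented, the custom model plugs into the pipeline exactly like the built-in providers. A minimal sketch, where the model name and API key below are placeholders:

# Wire the custom model into the pipeline, same as a built-in provider
custom_model = CustomRealtime(
    model_name="my-speech-to-speech-model",  # placeholder model name
    api_key="YOUR_API_KEY"                   # placeholder credential
)
pipeline = RealTimePipeline(model=custom_model)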

Comparison with Cascading Pipeline

The key architectural difference is that RealTimePipeline uses integrated models that handle the entire speech-to-speech pipeline internally, while cascading pipelines coordinate separate STT, LLM, and TTS components.

| Feature | Realtime Pipeline | Cascading Pipeline |
| --- | --- | --- |
| Latency | Significantly lower latency, ideal for highly interactive, real-time conversations. | Higher latency due to coordinating separate STT, LLM, and TTS components. |
| Control | Less granular control; tools are handled directly by the integrated model. | Granular control over each step (STT, LLM, TTS), allowing for more complex logic. |
| Flexibility | Limited to the capabilities of the single, chosen real-time model. | Allows mixing and matching different providers for each component (e.g., Google STT, OpenAI LLM). |
| Complexity | Simpler to configure as it involves a single, unified model. | More complex to set up due to the coordination of multiple separate components. |
| Cost | Varies depending on the chosen real-time model and usage patterns. | Varies depending on the combination of providers and usage for each component. |
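
For comparison, a cascading setup wires the three stages explicitly. The sketch below assumes the CascadingPipeline constructor accepts separate stt, llm, and tts components and that the named plugin classes exist; see the Cascading Pipeline page for the exact classes and parameters.

from videosdk.agents import CascadingPipeline
from videosdk.plugins.google import GoogleSTT          # class/module names assumed
from videosdk.plugins.openai import OpenAILLM, OpenAITTS  # class names assumed

# Each stage is a separate, swappable component coordinated by the pipeline
pipeline = CascadingPipeline(
    stt=GoogleSTT(),
    llm=OpenAILLM(model="gpt-4o"),
    tts=OpenAITTS(),
)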
