Overview
WebSocket transports provide both client and server WebSocket implementations for real-time bidirectional communication. These transports support audio streaming, frame serialization, and connection management, making them ideal for prototyping and lightweight client-server applications where WebRTC might be overkill.
WebSocket transports are best suited for prototyping and controlled network environments. For production client-server applications, we recommend WebRTC-based transports for more robust network handling, NAT traversal, and media optimization.
Installation
To use WebSocket transports, install the required dependencies:
pip install "pipecat-ai[websocket]"
No additional API keys are required for WebSocket communication.
WebSocket transports use Protobuf serialization by default for efficient
binary messaging, but support custom serializers for different protocols.
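To make the idea of a pluggable serializer concrete, here is a minimal, self-contained sketch of a JSON text serializer. It does not use Pipecat's classes: TextMessage, AudioChunk, and JsonSerializer are hypothetical stand-ins, and the serialize/deserialize method pair is an assumed shape chosen for illustration. Check the API reference for the actual serializer base class before writing a real one.

```python
import base64
import json
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class TextMessage:
    """Hypothetical stand-in for an application message frame."""
    text: str


@dataclass
class AudioChunk:
    """Hypothetical stand-in for a raw audio frame (PCM bytes)."""
    audio: bytes
    sample_rate: int


Frame = Union[TextMessage, AudioChunk]


class JsonSerializer:
    """Illustrative serializer that maps frames to and from JSON strings."""

    def serialize(self, frame: object) -> Optional[str]:
        if isinstance(frame, TextMessage):
            return json.dumps({"type": "text", "text": frame.text})
        if isinstance(frame, AudioChunk):
            # JSON cannot carry raw bytes, so audio is base64-encoded
            payload = base64.b64encode(frame.audio).decode("ascii")
            return json.dumps(
                {"type": "audio", "audio": payload, "sample_rate": frame.sample_rate}
            )
        return None  # Unknown frame types are skipped

    def deserialize(self, data: str) -> Optional[Frame]:
        try:
            message = json.loads(data)
        except json.JSONDecodeError:
            return None  # Malformed input is ignored rather than raised
        if not isinstance(message, dict):
            return None
        if message.get("type") == "text":
            return TextMessage(text=message["text"])
        if message.get("type") == "audio":
            return AudioChunk(
                audio=base64.b64decode(message["audio"]),
                sample_rate=message["sample_rate"],
            )
        return None
```

A text format like this is easier to debug from a browser console, at the cost of larger payloads than a binary encoding such as Protobuf.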
Frames
Input
- InputAudioRawFrame: Raw audio data from WebSocket peer
- Frame: Other frame types based on configured serializer
Output
- OutputAudioRawFrame: Audio data to WebSocket peer (with optional WAV headers)
- TransportMessageFrame: Application messages to peer
- TransportMessageUrgentFrame: Urgent messages to peer
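The optional WAV headers noted for output audio can be illustrated with the standard library alone. The sketch below wraps raw PCM bytes in a WAV container. The helper name pcm_to_wav and the 16-bit sample width are assumptions made for illustration; this is not the transport's own implementation.

```python
import io
import wave


def pcm_to_wav(pcm: bytes, sample_rate: int = 24000, channels: int = 1) -> bytes:
    """Wrap raw PCM audio in a WAV container (hypothetical helper)."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav_file:
        wav_file.setnchannels(channels)
        wav_file.setsampwidth(2)  # Assumption: 16-bit samples
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm)
    # The wave module leaves a caller-supplied buffer open, so it can still be read
    return buffer.getvalue()
```

A receiver that expects self-describing audio (for example, a browser decoding each message independently) can use the header to learn the sample rate and channel count; a receiver that already knows the format can skip the header and take raw PCM.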
Key Features
- Client & Server Support: Both WebSocket client and server implementations
- Frame Serialization: Configurable serializers including Protobuf by default
- Audio Timing: Simulates audio device timing for proper streaming flow
- Session Management: Built-in connection monitoring and timeout handling
- WAV Header Support: Optional WAV header generation for audio compatibility
- Single Connection: Server supports one active client at a time (new connections replace existing)
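The audio timing feature above can be pictured as pacing: chunks are written to the socket no faster than a playback device would consume them. The following is a rough sketch of that idea, not the transport's actual code. The function names and the 16-bit sample width are assumptions.

```python
import asyncio
import time
from typing import Awaitable, Callable, Iterable


def chunk_duration(
    num_bytes: int, sample_rate: int, channels: int = 1, sample_width: int = 2
) -> float:
    """Return the seconds of audio contained in num_bytes of PCM data."""
    return num_bytes / (sample_rate * channels * sample_width)


async def send_paced(
    chunks: Iterable[bytes],
    send: Callable[[bytes], Awaitable[None]],
    sample_rate: int,
) -> None:
    """Send chunks at roughly real-time speed (illustrative helper)."""
    next_send_time = time.monotonic()
    for chunk in chunks:
        delay = next_send_time - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
        await send(chunk)
        # Advance a running deadline so small sleep errors do not accumulate
        next_send_time += chunk_duration(len(chunk), sample_rate)
```

For example, a 960-byte chunk of 24 kHz mono 16-bit audio holds 20 ms of sound, so consecutive chunks of that size go out about 20 ms apart.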
Usage Example
WebSocket Server Transport
The server transport creates a WebSocket server that clients can connect to:
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.transports.network.websocket_server import (
    WebsocketServerParams,
    WebsocketServerTransport,
)


async def run_websocket_server():
    # Create WebSocket server transport
    transport = WebsocketServerTransport(
        host="localhost",
        port=8765,
        params=WebsocketServerParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            serializer=ProtobufFrameSerializer(),
            session_timeout=180,  # 3 minutes
        ),
    )

    # Your services (STT, LLM, TTS, etc.)
    # ...

    # Create pipeline
    pipeline = Pipeline([
        transport.input(),   # Receive data from WebSocket clients
        # ... your processing chain
        transport.output(),  # Send data to WebSocket clients
    ])

    # Wrap the pipeline in a task so the handlers below can control it
    task = PipelineTask(pipeline)

    # Event handlers
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, websocket):
        logger.info(f"Client connected from {websocket.remote_address}")
        # Start conversation (context_aggregator comes from the service setup elided above)
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, websocket):
        logger.info(f"Client disconnected: {websocket.remote_address}")
        await task.cancel()

    @transport.event_handler("on_session_timeout")
    async def on_session_timeout(transport, websocket):
        logger.info(f"Session timeout for {websocket.remote_address}")
        await task.cancel()

    # Run the pipeline
    runner = PipelineRunner()
    await runner.run(task)
WebSocket Client Transport
The client transport connects to an existing WebSocket server:
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.transports.network.websocket_client import (
    WebsocketClientParams,
    WebsocketClientTransport,
)


async def run_websocket_client():
    # Create WebSocket client transport
    transport = WebsocketClientTransport(
        uri="ws://localhost:8765",
        params=WebsocketClientParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            serializer=ProtobufFrameSerializer(),
            add_wav_header=True,  # Optional WAV headers
        ),
    )

    # Your pipeline setup
    pipeline = Pipeline([
        transport.input(),
        # ... your processing chain
        transport.output(),
    ])

    # Wrap the pipeline in a task so the handlers below can control it
    task = PipelineTask(pipeline)

    # Event handlers
    @transport.event_handler("on_connected")
    async def on_connected(transport, websocket):
        logger.info("Connected to WebSocket server")

    @transport.event_handler("on_disconnected")
    async def on_disconnected(transport, websocket):
        logger.info("Disconnected from WebSocket server")
        await task.cancel()

    # Run the pipeline
    runner = PipelineRunner()
    await runner.run(task)
Combined Server Setup
For a complete application, you might run both a FastAPI web server and a WebSocket server in the same process:
import asyncio

import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.post("/connect")
async def get_websocket_url():
    return {"wsUrl": "ws://localhost:8765"}


async def main():
    # Run both WebSocket bot and web server concurrently
    tasks = [
        run_websocket_server(),  # Your WebSocket bot
        uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=7860)).serve(),
    ]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
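On the client side, the /connect endpoint above can be used to discover the WebSocket URL before opening the socket. The sketch below uses only the standard library. The helper names, the default base URL, and the scheme check are assumptions for illustration, not part of Pipecat.

```python
import json
import urllib.request


def parse_connect_response(body: bytes) -> str:
    """Extract the WebSocket URL from the JSON body returned by /connect."""
    payload = json.loads(body)
    ws_url = payload["wsUrl"]
    if not ws_url.startswith(("ws://", "wss://")):
        raise ValueError(f"Unexpected WebSocket URL: {ws_url!r}")
    return ws_url


def fetch_ws_url(base_url: str = "http://localhost:7860") -> str:
    """POST to /connect and return the advertised WebSocket URL."""
    request = urllib.request.Request(f"{base_url}/connect", data=b"", method="POST")
    with urllib.request.urlopen(request, timeout=5) as response:
        return parse_connect_response(response.read())
```

Keeping the parsing separate from the network call makes the response handling easy to test without a running server.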
Event Handling
WebSocket transports provide event callbacks for connection management. Register callbacks using the @transport.event_handler() decorator:
Server Events
- on_client_connected: Client connects to the server
- on_client_disconnected: Client disconnects from the server
- on_session_timeout: Client session times out (if configured)
- on_websocket_ready: WebSocket server is ready to accept connections
Client Events
- on_connected: Successfully connected to WebSocket server
- on_disconnected: Disconnected from WebSocket server
For complete event details, including callback signatures and usage examples, see the API reference documentation.
Example Usage
# Server event handling
@server_transport.event_handler("on_websocket_ready")
async def on_server_ready(transport):
    logger.info("WebSocket server ready for connections")


@server_transport.event_handler("on_client_connected")
async def on_client_connected(transport, websocket):
    logger.info(f"New client: {websocket.remote_address}")
    # Initialize conversation or send welcome message


# Client event handling
@client_transport.event_handler("on_connected")
async def on_connected(transport, websocket):
    logger.info("Connected to server")
    # Start sending data or audio


@client_transport.event_handler("on_disconnected")
async def on_disconnected(transport, websocket):
    logger.info("Server connection lost")
    # Handle reconnection or cleanup
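For readers curious how decorator-based registration like this can work, here is a small stand-alone sketch. EventEmitter is a hypothetical class written for illustration; it mirrors the handler-receives-the-emitter-first convention seen above but is not Pipecat's implementation.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List

Handler = Callable[..., Awaitable[None]]


class EventEmitter:
    """Hypothetical stand-in showing decorator-based handler registration."""

    def __init__(self, *event_names: str) -> None:
        self._known = set(event_names)
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def event_handler(self, event_name: str) -> Callable[[Handler], Handler]:
        # Reject typos early instead of silently never calling the handler
        if event_name not in self._known:
            raise ValueError(f"Unknown event: {event_name}")

        def decorator(handler: Handler) -> Handler:
            self._handlers[event_name].append(handler)
            return handler

        return decorator

    async def emit(self, event_name: str, *args: Any) -> None:
        # The emitter passes itself first, matching the (transport, ...) signature
        for handler in self._handlers[event_name]:
            await handler(self, *args)
```

In this sketch, registering a handler for a name the emitter does not know raises immediately, which is one way to catch a misspelled event name at startup rather than at runtime.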
Advanced Configuration
Audio Processing
# Server configuration
server_params = WebsocketServerParams(
    audio_in_enabled=True,
    audio_out_enabled=True,
    audio_out_sample_rate=24000,
    audio_out_channels=1,
    add_wav_header=False,  # Binary audio data
    vad_analyzer=SileroVADAnalyzer(),
)

# Client configuration
client_params = WebsocketClientParams(
    audio_in_enabled=True,
    audio_out_enabled=True,
    add_wav_header=True,  # WAV headers for compatibility
    serializer=ProtobufFrameSerializer(),
)
Session Management
params = WebsocketServerParams(
    session_timeout=300,  # 5 minute timeout
    # ... other parameters
)


@transport.event_handler("on_session_timeout")
async def handle_timeout(transport, websocket):
    logger.info("Session timed out, cleaning up")
    await task.cancel()
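Conceptually, a session timeout is a timer that starts when a client connects and fires a callback if the session is still open when it expires. The sketch below shows that pattern with asyncio. SessionTimer is a hypothetical helper, and it assumes the limit is counted from connection time rather than from the last activity; confirm the transport's actual behavior in the API reference.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class SessionTimer:
    """Hypothetical helper: fire a callback once a session exceeds its limit."""

    def __init__(
        self, timeout: float, on_timeout: Callable[[], Awaitable[None]]
    ) -> None:
        self._timeout = timeout
        self._on_timeout = on_timeout
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        # Call when a client connects (requires a running event loop)
        self._task = asyncio.create_task(self._run())

    def cancel(self) -> None:
        # Call when the client disconnects before the limit is reached
        if self._task is not None:
            self._task.cancel()
            self._task = None

    async def _run(self) -> None:
        await asyncio.sleep(self._timeout)
        await self._on_timeout()
```

Cancelling the timer on disconnect matters: without it, a stale timer from an earlier client could fire during a later session.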
Use Cases
Server Transport
- Voice AI Applications: Host voice assistants that clients connect to
- Real-time Audio Processing: Server-side audio analysis and response
- Prototyping: Quick setup for testing conversational AI flows
- Controlled Environments: Internal networks where WebRTC complexity isn’t needed
Client Transport
- Bot Clients: Connect bots to existing WebSocket servers
- Testing Tools: Automated testing of WebSocket-based services
- Integration: Connect Pipecat to third-party WebSocket APIs
- Monitoring: Client bots that connect to monitor or interact with services