Overview
FastAPIWebsocketTransport provides WebSocket support for FastAPI web applications, enabling real-time audio communication over WebSocket connections. It’s primarily designed for telephony integrations with providers like Twilio, Telnyx, and Plivo, supporting bidirectional audio streams with configurable serializers and voice activity detection.
FastAPIWebsocketTransport is best suited for telephony applications and server-side WebSocket integrations. For general client/server applications, we recommend using WebRTC-based transports for more robust network and media handling.
Installation
To use FastAPIWebsocketTransport, install the required dependencies:
pip install "pipecat-ai[websocket]"
No additional API keys are required for the transport itself, but you’ll need credentials for your chosen telephony provider.
This transport is commonly used with telephony providers. See the Frame Serializers documentation for provider-specific setup.
Frames
Input
- InputAudioRawFrame: Raw audio data from WebSocket client
- Frame: Other frame types based on configured serializer
Output
- OutputAudioRawFrame: Audio data to WebSocket client (with optional WAV headers)
- TransportMessageFrame: Application messages to client
- TransportMessageUrgentFrame: Urgent messages to client
Key Features
- Telephony Integration: Optimized for phone call audio streaming with major providers
- Frame Serialization: Configurable serializers for different telephony protocols
- Session Management: Built-in connection monitoring and timeout handling
- Audio Timing: Simulates audio device timing for proper call flow
- WAV Header Support: Optional WAV header generation for compatibility
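To make the WAV header option concrete, here is a minimal sketch of what wrapping raw audio in a WAV header involves, using only the Python standard library. It is not the transport's own implementation: the helper name `pcm_to_wav_bytes` is hypothetical, and the sketch assumes 16-bit mono PCM at 8 kHz.

```python
import io
import wave


def pcm_to_wav_bytes(pcm: bytes, sample_rate: int = 8000, channels: int = 1) -> bytes:
    """Wrap raw 16-bit PCM samples in a WAV (RIFF) container."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav_file:
        wav_file.setnchannels(channels)
        wav_file.setsampwidth(2)  # Assumption: 16-bit samples
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm)
    return buffer.getvalue()


# 20 ms of silence at 8 kHz mono, 16-bit: 160 samples * 2 bytes each
silence = b"\x00" * 320
wav_bytes = pcm_to_wav_bytes(silence)
```

The result is the original payload preceded by a header describing channel count, sample width, and sample rate, which is what lets a generic audio client decode the bytes without out-of-band configuration.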
Usage Example
Using the Development Runner (Recommended)
The easiest way to use FastAPIWebsocketTransport for telephony is with Pipecat’s development runner:
import os

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.runner.types import RunnerArguments, WebSocketRunnerArguments
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)


async def run_bot(transport):
    """Your core bot logic - works with any transport."""
    # Your services (STT, LLM, TTS, etc.)
    # ...

    # Create pipeline
    pipeline = Pipeline([
        transport.input(),               # Receive audio from caller
        stt,                             # Convert speech to text
        context_aggregator.user(),       # Add user messages to context
        llm,                             # Process text with LLM
        tts,                             # Convert text to speech
        transport.output(),              # Send audio to caller
        context_aggregator.assistant(),  # Add assistant responses to context
    ])

    # Event handlers
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Start conversation when caller connects
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        await task.cancel()

    # Run the pipeline
    # ...


async def bot(runner_args: RunnerArguments):
    """Entry point called by the development runner."""
    if isinstance(runner_args, WebSocketRunnerArguments):
        # Auto-detect telephony provider and create appropriate serializer
        from pipecat.runner.utils import parse_telephony_websocket

        transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)

        # Create serializer based on detected provider
        if transport_type == "twilio":
            from pipecat.serializers.twilio import TwilioFrameSerializer

            serializer = TwilioFrameSerializer(
                stream_sid=call_data["stream_id"],
                call_sid=call_data["call_id"],
                account_sid=os.getenv("TWILIO_ACCOUNT_SID"),
                auth_token=os.getenv("TWILIO_AUTH_TOKEN"),
            )

        transport = FastAPIWebsocketTransport(
            websocket=runner_args.websocket,
            params=FastAPIWebsocketParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                serializer=serializer,
            ),
        )

        await run_bot(transport)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
Run your telephony bot with:
python bot.py -t twilio -x your-ngrok-domain.ngrok.io
python bot.py -t telnyx -x your-ngrok-domain.ngrok.io
python bot.py -t plivo -x your-ngrok-domain.ngrok.io
The development runner automatically:
- Creates FastAPI server with telephony webhook endpoints
- Sets up WebSocket endpoints for audio streaming
- Handles provider-specific message parsing and serialization
- Manages WebSocket connection lifecycle
The -x flag specifies your public domain (like ngrok) that telephony providers can reach for webhooks.
Manual FastAPI Implementation
For custom deployments, you can implement the FastAPI server manually:
1. FastAPI Server Setup
import json

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import HTMLResponse

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post("/")
async def start_call():
    """Handle telephony provider webhook."""
    # Return TwiML/XML response that establishes WebSocket connection
    xml_response = """<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Connect>
    <Stream url="wss://your-domain.com/ws"></Stream>
  </Connect>
  <Pause length="40"/>
</Response>"""
    return HTMLResponse(content=xml_response, media_type="application/xml")


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Handle WebSocket connections for audio streaming."""
    await websocket.accept()

    # Parse initial connection data from provider
    start_data = websocket.iter_text()
    await start_data.__anext__()  # Skip the first message
    call_data = json.loads(await start_data.__anext__())

    # Extract call information
    stream_sid = call_data["start"]["streamSid"]
    call_sid = call_data["start"]["callSid"]

    # run_bot is defined in the bot implementation (step 2)
    await run_bot(websocket, stream_sid, call_sid)
WebSocket message parsing varies based on the telephony provider. Use the parse_telephony_websocket() utility to auto-detect and extract call data.
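The manual server code above reads the call identifiers from the second message by position. As a rough illustration of why parsing is provider-specific, the sketch below looks for a Twilio-style "start" event by name instead. The helper `extract_twilio_call_ids` is hypothetical and the sample identifiers are placeholders; the message shape mirrors the `start.streamSid` and `start.callSid` fields used in the server code.

```python
import json
from typing import Iterable, Optional, Tuple


def extract_twilio_call_ids(messages: Iterable[str]) -> Optional[Tuple[str, str]]:
    """Return (stream_sid, call_sid) from the first "start" event, if any."""
    for raw in messages:
        try:
            message = json.loads(raw)
        except json.JSONDecodeError:
            continue  # Skip anything that is not valid JSON
        if not isinstance(message, dict):
            continue  # Skip JSON values that are not objects
        if message.get("event") == "start":
            start = message.get("start", {})
            stream_sid = start.get("streamSid")
            call_sid = start.get("callSid")
            if stream_sid and call_sid:
                return stream_sid, call_sid
    return None


# Placeholder messages in the order the server code above expects them
sample_messages = [
    json.dumps({"event": "connected", "protocol": "Call", "version": "1.0.0"}),
    json.dumps({"event": "start", "start": {"streamSid": "MZ123", "callSid": "CA456"}}),
]
ids = extract_twilio_call_ids(sample_messages)
```

Matching on the event name rather than message order makes the handler tolerant of extra or malformed messages, at the cost of being tied to one provider's schema.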
2. Bot Implementation
import os

from fastapi import WebSocket
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.runner import PipelineRunner
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)


async def run_bot(websocket: WebSocket, stream_sid: str, call_sid: str):
    """Run the Pipecat bot for a specific call."""
    # Create serializer for telephony provider
    serializer = TwilioFrameSerializer(
        stream_sid=stream_sid,
        call_sid=call_sid,
        account_sid=os.getenv("TWILIO_ACCOUNT_SID"),
        auth_token=os.getenv("TWILIO_AUTH_TOKEN"),
    )

    # Create transport
    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            serializer=serializer,
            session_timeout=30,  # Optional timeout
        ),
    )

    # Your pipeline setup
    # ...

    # Run the pipeline
    runner = PipelineRunner(handle_sigint=False, force_gc=True)
    await runner.run(task)
Event Handling
FastAPIWebsocketTransport provides event callbacks for connection management. Register callbacks using the @transport.event_handler() decorator:
Connection Events
- on_client_connected: Client connects to WebSocket endpoint
- on_client_disconnected: Client disconnects from WebSocket endpoint
- on_session_timeout: Session timeout occurs (if configured)
Example Usage
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, websocket):
    logger.info("Caller connected")
    # Start conversation
    await task.queue_frames([TextFrame("Hello! How can I help you today?")])


@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, websocket):
    logger.info("Call ended")
    await task.cancel()


@transport.event_handler("on_session_timeout")
async def on_session_timeout(transport, websocket):
    logger.info("Call timed out")
    # Handle timeout (e.g., play message, end call)
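For readers unfamiliar with decorator-based callbacks, the following toy class sketches how an event_handler decorator can register coroutines and invoke them later. `ToyTransport` and its `emit` method are hypothetical stand-ins written for illustration; they do not reproduce Pipecat's internals.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, List

Handler = Callable[..., Awaitable[None]]


class ToyTransport:
    """Hypothetical stand-in for a transport; not Pipecat's implementation."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def event_handler(self, event_name: str) -> Callable[[Handler], Handler]:
        """Return a decorator that registers a coroutine for event_name."""
        def decorator(handler: Handler) -> Handler:
            self._handlers[event_name].append(handler)
            return handler
        return decorator

    async def emit(self, event_name: str, *args) -> None:
        """Call every registered handler, passing the transport first."""
        for handler in self._handlers[event_name]:
            await handler(self, *args)


transport = ToyTransport()
events: List[str] = []


@transport.event_handler("on_client_connected")
async def on_client_connected(transport, websocket):
    events.append(f"connected:{websocket}")


@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, websocket):
    events.append(f"disconnected:{websocket}")


async def simulate_call() -> None:
    # "ws-1" is a placeholder for a real WebSocket object
    await transport.emit("on_client_connected", "ws-1")
    await transport.emit("on_client_disconnected", "ws-1")


asyncio.run(simulate_call())
```

The handlers receive the transport as their first argument, which matches the `(transport, websocket)` signature used in the examples above.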
Telephony Integration
FastAPIWebsocketTransport works with major telephony providers through frame serializers:
Supported Providers
Providers covered in this guide include Twilio, Telnyx, and Plivo.
Provider-Specific Setup
Each provider requires specific configuration and webhook URLs. The development runner handles this automatically, or you can configure manually:
# Twilio setup
serializer = TwilioFrameSerializer(
    stream_sid="stream_id_from_webhook",
    call_sid="call_id_from_webhook",
    account_sid=os.getenv("TWILIO_ACCOUNT_SID"),
    auth_token=os.getenv("TWILIO_AUTH_TOKEN"),
)

# Telnyx setup
serializer = TelnyxFrameSerializer(
    stream_id="stream_id_from_webhook",
    call_control_id="call_control_id_from_webhook",
    api_key=os.getenv("TELNYX_API_KEY"),
)
See the Frame Serializers documentation for complete provider setup guides.
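Because os.getenv() returns None for an unset variable, a missing credential can reach a serializer unnoticed. One way to catch this early is a small startup check, sketched below. The helper `missing_credentials` and the `REQUIRED_ENV_VARS` table are hypothetical; the variable names are the ones used in the serializer examples above.

```python
import os
from typing import Dict, List, Mapping

# Environment variables used by the serializer examples in this guide
REQUIRED_ENV_VARS: Dict[str, List[str]] = {
    "twilio": ["TWILIO_ACCOUNT_SID", "TWILIO_AUTH_TOKEN"],
    "telnyx": ["TELNYX_API_KEY"],
}


def missing_credentials(provider: str, env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    if provider not in REQUIRED_ENV_VARS:
        raise ValueError(f"No credential list defined for provider: {provider}")
    return [name for name in REQUIRED_ENV_VARS[provider] if not env.get(name)]


# Example with a placeholder environment that lacks the auth token
example_env = {"TWILIO_ACCOUNT_SID": "ACxxxxxxxx"}
missing = missing_credentials("twilio", example_env)
```

Calling this before constructing a serializer lets the application fail with a clear message instead of a provider-side authentication error mid-call.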
Advanced Configuration
Audio Processing
params = FastAPIWebsocketParams(
    audio_in_enabled=True,
    audio_out_enabled=True,
    audio_in_sample_rate=8000,   # Common for telephony
    audio_out_sample_rate=8000,  # Common for telephony
    add_wav_header=False,        # Depending on provider requirements
    vad_analyzer=SileroVADAnalyzer(),
)
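To show how the sample rate relates to audio timing, here is a minimal sketch that splits audio into fixed-duration chunks and paces their delivery in real time. It is an illustration under stated assumptions, not the transport's implementation: the function names are hypothetical, the 20 ms chunk length is an arbitrary choice, and the audio is assumed to be 16-bit mono PCM.

```python
import asyncio
import time
from typing import List


def chunk_size_bytes(sample_rate: int, chunk_ms: int, channels: int = 1, sample_width: int = 2) -> int:
    """Bytes needed to hold chunk_ms milliseconds of PCM audio."""
    return sample_rate * chunk_ms // 1000 * channels * sample_width


async def send_paced(audio: bytes, sample_rate: int = 8000, chunk_ms: int = 20) -> List[int]:
    """Emit chunks no faster than real time; returns the size of each chunk."""
    size = chunk_size_bytes(sample_rate, chunk_ms)
    sent: List[int] = []
    next_send = time.monotonic()
    for offset in range(0, len(audio), size):
        chunk = audio[offset:offset + size]
        sent.append(len(chunk))  # Stand-in for writing the chunk to a WebSocket
        # Schedule against a running deadline so sleep jitter does not accumulate
        next_send += chunk_ms / 1000
        await asyncio.sleep(max(0.0, next_send - time.monotonic()))
    return sent


# 800 bytes at 8 kHz, 16-bit mono is 50 ms of audio
sizes = asyncio.run(send_paced(b"\x00" * 800))
```

At 8000 Hz, a 20 ms chunk is 160 samples, or 320 bytes of 16-bit audio, so the 800-byte buffer is sent as two full chunks and one partial chunk.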
Session Management
params = FastAPIWebsocketParams(
    session_timeout=300,  # 5 minute timeout
    # Other parameters...
)


@transport.event_handler("on_session_timeout")
async def handle_timeout(transport, websocket):
    # Play timeout message before ending call
    await task.queue_frames([
        TTSTextFrame("I haven't heard from you in a while. Goodbye!"),
        EndFrame(),
    ])
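The general pattern behind a session timeout can be sketched with a plain asyncio task that fires a callback unless it is cancelled first. The `SessionTimer` class below is hypothetical and measures time from when it is started; this page does not state whether the transport measures from connection or from last activity, so treat that as an assumption.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class SessionTimer:
    """Hypothetical helper: calls on_timeout once unless cancelled in time."""

    def __init__(self, timeout_secs: float, on_timeout: Callable[[], Awaitable[None]]) -> None:
        self._timeout_secs = timeout_secs
        self._on_timeout = on_timeout
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    def cancel(self) -> None:
        if self._task is not None:
            self._task.cancel()

    async def _run(self) -> None:
        await asyncio.sleep(self._timeout_secs)
        await self._on_timeout()


fired: List[str] = []


async def on_session_timeout() -> None:
    fired.append("timeout")


async def demo() -> None:
    # The session outlives the timeout, so the handler fires
    timer = SessionTimer(0.05, on_session_timeout)
    timer.start()
    await asyncio.sleep(0.1)

    # The timer is cancelled first (e.g. the client disconnected), so it never fires
    cancelled = SessionTimer(0.05, on_session_timeout)
    cancelled.start()
    cancelled.cancel()
    await asyncio.sleep(0.1)


asyncio.run(demo())
```

Cancelling the timer on disconnect keeps the timeout handler from running against a call that has already ended.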
Additional Notes
- Telephony Focus: Optimized for phone call audio with 8kHz sample rates
- Provider Integration: Works seamlessly with major telephony providers
- Audio Timing: Simulates real audio device timing for proper call flow
- Session Management: Built-in timeout handling for abandoned calls
- Webhook Requirements: Requires publicly accessible endpoints for telephony providers
- Development: Use ngrok or similar tools for local development with real phone numbers
FastAPIWebsocketTransport is the preferred choice for building phone-based voice AI applications with reliable audio streaming and provider compatibility.