GitHub: github.com/dataspike-io/pipecat-deepfake-bot
The Dataspike integration for Pipecat adds real-time detection of AI‑manipulated (deepfake) content to Pipecat applications.
It processes video frames and audio streams from WebRTC connections, sends them to the Dataspike API for analysis, and triggers customizable detection callbacks for immediate action.
Summary
- Manipulation Recognition: Detects face swaps, compositing, and AI‑generated video/audio overlays.
- AI Model Detection: Identifies content associated with modern generative systems (e.g., Veo, Sana, Flux, voice cloning).
- Adaptive Sampling: Samples frames at the normal rate while the state is clear and switches to the burst rate when suspicious activity is detected.
- Realtime Alerts: Provides instant notification callbacks for ALERT and state transition events.
- Privacy by Design: No video, audio, or metadata is stored; frames are encrypted in transit (TLS/WSS); results are ephemeral.
Installation
# Clone the repository
git clone https://github.com/dataspike-io/pipecat-deepfake-bot.git
cd pipecat-deepfake-bot
# Install dependencies using uv (recommended)
uv sync
# Or using pip
pip install -e .
Prerequisites
- Python 3.10+: the API uses async/await and union type hints such as str | None, which require Python 3.10.
- Dataspike API Key — Set as an environment variable:
export DATASPIKE_API_KEY="your_api_key_here"
- Pipecat Cloud credentials (for cloud deployment):
# Configured via pcc-deploy.toml or Pipecat Cloud dashboard
Optional (for advanced configurations):
export DATASPIKE_WS_URL="wss://api.dataspike.io/api/v4/deepfake/stream"
Quick Start
The example below shows a ready-to-use Pipecat bot that integrates the Dataspike deepfake processor.
It creates a pipeline that analyzes incoming video and audio streams in real-time.
Basic Bot Setup
Create a .env file with your API key:
DATASPIKE_API_KEY=your_api_key_here
Run the bot:
# Using uv
uv run bot.py
# Or using pip
python bot.py
🎮 Interactive Playground
When running locally, Pipecat automatically starts an interactive playground at http://localhost:7860 where you can:
- Test video and audio in real-time with your webcam and microphone
- See the bot in action with live deepfake detection
- Monitor notifications and detection state transitions (CLEAR → SUSPICIOUS → ALERT)
- Experiment with configurations without deploying to production
This lets you develop, test, and debug your deepfake detection integration locally before deploying to Pipecat Cloud.
Quick Testing Workflow:
- Start the bot:
uv run bot.py
- Open http://localhost:7860 in your browser
- Grant camera/microphone permissions when prompted
- Click “Connect” to establish WebRTC connection
- Speak into your microphone and show your face to the camera
- Watch the console logs for real-time detection events
- Test different scenarios (different lighting, angles, voice tones)
Tip: Open your browser’s developer console to see detailed logs, WebSocket activity, and frame processing metrics.
Code Example
import os

import aiohttp
from pipecat.pipeline.pipeline import Pipeline
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport

from deepfake import DataspikeDeepfakeProcessor, VideoParams, AudioParams


async def run_bot(transport):
    async with aiohttp.ClientSession() as session:
        # Create the deepfake detection processor
        deepfake_processor = DataspikeDeepfakeProcessor(
            api_key=os.getenv("DATASPIKE_API_KEY"),
            session=session,
            video_params=VideoParams(
                normal_fps=0.2,  # Baseline frame rate
                burst_fps=1.0,   # Elevated rate during suspicious states
                quality=75,      # JPEG quality (0-100)
            ),
            audio_params=AudioParams(
                sample_rate=16000,  # Required: 16kHz
                sample_size=48000,  # 3 seconds of audio
                interval=60,        # Seconds between audio samples
            ),
            # notification_cb=custom_handler  # Optional: custom callback
        )

        # Create pipeline: Input → Deepfake Analysis → Output
        pipeline = Pipeline([
            transport.input(),
            deepfake_processor,
            transport.output(),
        ])

        # Start processing when client connects
        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            await deepfake_processor.start()
            deepfake_processor.set_webrtc_connection(client)
            deepfake_processor.set_participant_id(client.pc_id)
            await transport.capture_participant_video("camera")
Default Alert Behavior
By default, the processor logs detection events to the console. When the system detects potential manipulation, it transitions through three states:
- CLEAR — No manipulation detected (normal frame rate)
- SUSPICIOUS — Potential signs of manipulation (increased frame rate for verification)
- ALERT — High confidence manipulation detected (notification triggered)
Example default log output:
{
"type": "deepfake_alert",
"level": "alert",
"participant_id": "participant_abc123",
"track_id": "video_track_xyz",
"message": "High likelihood of manipulation detected for participant_abc123.",
"timestamp_ms": 1730123456789
}
Custom Notification Channels
Developers can override the default logging behavior to route alerts elsewhere, for example to a webhook, Slack, a moderation API, a database, or an analytics pipeline.
Provide a custom async callback (notification_cb) when creating the processor:
import os

from schema_pb2 import EventType  # same module the full bot example imports from


async def on_deepfake_alert(event):
    """
    Custom notification handler for deepfake detection events.

    Args:
        event: DeepfakeStreamingSchemaResultEvent containing:
            - type: EventType (CLEAR, SUSPICIOUS, ALERT)
            - participant_id: ID of the participant being analyzed
            - track_id: Media track identifier
            - timestamp_ms: Detection timestamp
    """
    if event.type == EventType.ALERT:
        # High confidence deepfake detected
        print(f"🚨 ALERT: Deepfake detected for {event.participant_id}")
        # Send to external services.
        # post_to_webhook, db, and slack are placeholders for your own clients.
        await post_to_webhook({
            "event": "deepfake_detected",
            "participant": event.participant_id,
            "track": event.track_id,
            "timestamp": event.timestamp_ms,
        })
        # Store in database
        await db.store_incident(event)
        # Send Slack notification
        await slack.send_message(f"⚠️ Deepfake detected: {event.participant_id}")
    elif event.type == EventType.CLEAR:
        # Returned to normal state (after being in ALERT)
        print(f"✅ CLEAR: Manipulation no longer detected for {event.participant_id}")


# Create processor with custom handler
# (session is the aiohttp.ClientSession opened in run_bot)
deepfake_processor = DataspikeDeepfakeProcessor(
    api_key=os.getenv("DATASPIKE_API_KEY"),
    session=session,
    notification_cb=on_deepfake_alert,
)
Flexible Integration: You have full control over how alerts are processed, logged, or relayed. Route detection events to monitoring systems, moderation queues, compliance logs, or real-time dashboards.
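One way to keep such a callback testable is to inject the delivery function rather than calling a webhook client directly. The following is a minimal sketch under stated assumptions: EventType and FakeEvent are stand-ins defined here so the code runs on its own (the real types come from schema_pb2), and make_alert_handler and send are hypothetical names, not part of the integration.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable


class EventType(Enum):
    """Stand-in for schema_pb2.EventType so the sketch is self-contained."""
    CLEAR = 0
    SUSPICIOUS = 1
    ALERT = 2


@dataclass
class FakeEvent:
    """Stand-in carrying the fields listed in the callback docstring."""
    type: EventType
    participant_id: str
    track_id: str
    timestamp_ms: int


def make_alert_handler(send: Callable[[dict], Awaitable[None]]):
    """Build a notification_cb-style coroutine that forwards only ALERT events."""
    async def handler(event) -> None:
        if event.type != EventType.ALERT:
            return
        await send({
            "event": "deepfake_detected",
            "participant": event.participant_id,
            "track": event.track_id,
            "timestamp": event.timestamp_ms,
        })
    return handler


async def demo() -> list[dict]:
    sent: list[dict] = []

    async def fake_send(payload: dict) -> None:
        sent.append(payload)  # a real sender would deliver this to a webhook

    handler = make_alert_handler(fake_send)
    await handler(FakeEvent(EventType.SUSPICIOUS, "p1", "t1", 1))
    await handler(FakeEvent(EventType.ALERT, "p1", "t1", 2))
    return sent


sent_payloads = asyncio.run(demo())
```

In a real bot the returned handler would be passed as notification_cb, with send wrapping whatever HTTP or chat client you already use.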
Configuration
Configure the processor via constructor parameters or environment variables.
Video Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| burst_fps | float | 1.0 | Frame rate during SUSPICIOUS state (higher for verification) |
| normal_fps | float | 0.2 | Frame rate during CLEAR state (lower to conserve resources) |
| quality | int | 75 | JPEG compression quality (0–100, where 100 is highest) |
Audio Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| sample_rate | int | 16000 | Required sampling rate in Hz (must be 16000 for Dataspike API) |
| sample_size | int | 48000 | Samples per chunk (48000 = 3 seconds at 16kHz) |
| interval | int | 60 | Minimum seconds between audio sample transmissions |
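The relationship between these three values is simple arithmetic, worked through below. The chunk size in bytes assumes 16-bit mono PCM, which is an assumption made for illustration; the duty-cycle figure is an upper bound that treats interval as the spacing between chunks.

```python
SAMPLE_RATE_HZ = 16_000   # sample_rate default
SAMPLE_SIZE = 48_000      # sample_size default (samples per chunk)
INTERVAL_S = 60           # interval default (minimum seconds between chunks)
BYTES_PER_SAMPLE = 2      # assumption: 16-bit mono PCM

# Duration of one chunk in seconds.
chunk_seconds = SAMPLE_SIZE / SAMPLE_RATE_HZ

# Raw size of one chunk before any transport framing.
chunk_bytes = SAMPLE_SIZE * BYTES_PER_SAMPLE

# At most this fraction of wall-clock time is covered by analyzed audio.
duty_cycle = chunk_seconds / INTERVAL_S

print(f"{chunk_seconds:.0f} s per chunk, {chunk_bytes} bytes, "
      f"{duty_cycle:.0%} of each interval")
```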
Processor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| api_key | str | env:DATASPIKE_API_KEY | Dataspike API key. Required unless set via environment. |
| session | ClientSession | required | aiohttp ClientSession for WebSocket connections |
| video_params | VideoParams | defaults | Video processing configuration |
| audio_params | AudioParams | defaults | Audio processing configuration |
| notification_cb | Callable | None | Optional async callback for detection events. Defaults to logging. |
Environment Variables
export DATASPIKE_API_KEY="..." # Required: Your API key
export DATASPIKE_WS_URL="..." # Optional: Custom WebSocket endpoint
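A bot can validate these variables once at startup and fail early when the key is missing. This is a minimal sketch: load_settings is a hypothetical helper, and using the URL from the Prerequisites section as the fallback is an assumption rather than documented behavior.

```python
import os
from typing import Mapping

# Assumption: the URL shown under Prerequisites serves as the fallback here.
DEFAULT_WS_URL = "wss://api.dataspike.io/api/v4/deepfake/stream"


def load_settings(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Read Dataspike settings from an environment-like mapping."""
    api_key = env.get("DATASPIKE_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("DATASPIKE_API_KEY must be set")
    return {
        "api_key": api_key,
        "ws_url": env.get("DATASPIKE_WS_URL", DEFAULT_WS_URL),
    }


# Passing a plain dict instead of os.environ keeps the helper easy to test.
settings = load_settings({"DATASPIKE_API_KEY": "your_api_key_here"})
```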
How It Works
Development Flow (Local with Playground)
- Start Bot: Run uv run bot.py to launch the bot with the integrated Pipecat playground at http://localhost:7860
- Connect Browser: Open the playground in your browser and grant camera/microphone permissions
- WebRTC Handshake: The playground establishes a WebRTC connection to the bot
- Real-time Processing: Your video/audio streams through the pipeline for live deepfake analysis
- Instant Feedback: Detection results appear in console logs and trigger your notification callbacks
Production Flow (Architecture)
- WebRTC Connection: When a client connects via SmallWebRTC, the bot establishes a bidirectional media connection.
- Frame Capture: The processor intercepts InputImageRawFrame (camera only) and InputAudioRawFrame from the pipeline.
- Adaptive Sampling: Video frames are sampled at normal_fps during CLEAR state. When SUSPICIOUS events occur, sampling increases to burst_fps for rapid verification.
- Audio Buffering: Audio is processed only while users are speaking (detected via VAD). Samples are accumulated into 3-second chunks and resampled to 16kHz PCM.
- Encoding & Queuing: Video frames are JPEG-encoded and queued (max 16 items). Stale frames are dropped to prevent blocking.
- Streaming: Frames stream via secure WebSocket to the Dataspike API. Detection results return in real-time.
- State Management: The processor maintains detection state (CLEAR/SUSPICIOUS/ALERT) and triggers notifications on state transitions.
- Auto-Reconnection: If the WebSocket drops, the processor reconnects with exponential backoff (1s → 10s max) and jitter.
Deployment
Pipecat Cloud
Deploy to Pipecat Cloud for production-grade scaling:
- Update pcc-deploy.toml:
agent_name = "dataspike-deepfake-detection"
image = "your_username/dataspike-deepfake-detection:0.1"
image_credentials = "your_dockerhub_image_pull_secret"
secret_set = "dataspike-secrets"
[scaling]
min_agents = 1
- Build and push Docker image:
docker build -t your_username/dataspike-deepfake-detection:0.1 .
docker push your_username/dataspike-deepfake-detection:0.1
- Create a secret set in the Pipecat Cloud dashboard containing DATASPIKE_API_KEY.
- Deploy the agent to Pipecat Cloud.
For detailed deployment instructions, see the Pipecat Quickstart Guide.
Docker (Self-Hosted)
# Build the image
docker build -t dataspike-deepfake-bot .
# Run with environment variables
docker run -e DATASPIKE_API_KEY=your_key_here dataspike-deepfake-bot
Privacy & Security
- No Storage: The processor and Dataspike API do not store video, audio, or metadata.
- Encryption in Transit: All transmissions use TLS/WSS; only ephemeral detection signals are returned.
- Minimal Data: Only encoded frames and essential metadata (participant ID, track ID, timestamp) are transmitted.
- Key Management: Store DATASPIKE_API_KEY in environment variables or secret managers. Never hard-code credentials.
- Camera Only: The processor explicitly filters for camera feeds (transport_source == "camera") and ignores screen sharing to minimize unnecessary data transmission.
Troubleshooting
WebSocket Connection Issues
Symptom: “Timeout waiting for WebRTC connection” errors
Solutions:
- Verify Pipecat Cloud configuration is correct
- Check that session_manager timeout (120s) is sufficient
- Ensure DATASPIKE_API_KEY is set correctly
- Review network connectivity to wss://api.dataspike.io
Symptom: High CPU usage or detection lag
Solutions:
- Reduce video_params.normal_fps (default: 0.2 fps)
- Lower video_params.quality to reduce encoding overhead (default: 75)
- Increase audio_params.interval to process audio less frequently (default: 60s)
- Monitor queue size; frequent QueueFull warnings indicate overload
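To see why a full queue signals overload, consider a bounded queue of 16 items fed faster than it is drained. The sketch below uses a drop-oldest policy, which is one possible way to discard stale frames; the processor's actual policy is not specified here, and put_latest is a hypothetical helper.

```python
import asyncio


def put_latest(queue: asyncio.Queue, item) -> bool:
    """Enqueue `item`; if the queue is full, evict the oldest entry first.

    Returns True when an older item had to be dropped.
    """
    dropped = False
    if queue.full():
        queue.get_nowait()   # discard the stalest frame
        dropped = True
    queue.put_nowait(item)
    return dropped


async def demo() -> tuple[int, list[int]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=16)
    # Produce 20 frames with no consumer running, so the queue overflows.
    drops = sum(put_latest(queue, frame_no) for frame_no in range(20))
    remaining = [queue.get_nowait() for _ in range(queue.qsize())]
    return drops, remaining


drops, remaining = asyncio.run(demo())
```

Counting drops over time gives a simple overload metric: a steadily rising count suggests lowering the frame rate or JPEG quality.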
Audio Detection Not Working
Symptom: No audio analysis results
Checklist:
- User must be actively speaking (audio only processed during UserStartedSpeakingFrame)
- Audio buffer must reach sample_size (48,000 samples = 3 seconds)
- WebRTC connection must have active audio input track
- VAD (Voice Activity Detection) must be properly configured
Authentication Errors
Symptom: “DATASPIKE_API_KEY must be set” or 401 errors
Solutions:
- Verify environment variable is set:
echo $DATASPIKE_API_KEY
- Check API key is valid at Dataspike Dashboard
- Ensure .env file is in the correct directory
- For Docker: verify -e flag or secret mount is correct
Local Playground Issues
Symptom: Can’t access http://localhost:7860
Solutions:
- Ensure the bot is running (uv run bot.py or python bot.py)
- Check no other service is using port 7860
- Look for “Playground available at http://localhost:7860” in logs
- Try accessing from the same machine where the bot is running
Symptom: Webcam/microphone not working in playground
Solutions:
- Grant browser permissions for camera and microphone access
- Ensure no other application is using your webcam/microphone
- Try a different browser (Chrome/Edge recommended for WebRTC)
- Check browser console for detailed error messages
Full Bot Example
Complete implementation showing all features:
"""
Dataspike Deepfake Detection Bot for Pipecat
Real-time video and audio analysis with custom notifications
"""
import os

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
from pipecat.transports.base_transport import TransportParams

from deepfake import DataspikeDeepfakeProcessor, VideoParams, AudioParams
from schema_pb2 import DeepfakeStreamingSchemaResultEvent, EventType

load_dotenv()


async def custom_notification_handler(event: DeepfakeStreamingSchemaResultEvent):
    """Handle deepfake detection events with custom logic."""
    if event.type == EventType.ALERT:
        logger.critical(f"🚨 DEEPFAKE ALERT: {event.participant_id}")
        # Route to your monitoring/moderation systems
        # await send_to_slack(event)
        # await store_in_database(event)
        # await trigger_moderation_workflow(event)
    elif event.type == EventType.CLEAR:
        logger.info(f"✅ All clear for {event.participant_id}")


async def run_bot(transport, runner_args):
    """Execute the bot with deepfake detection."""
    async with aiohttp.ClientSession() as session:
        # Initialize processor with custom configuration
        deepfake_processor = DataspikeDeepfakeProcessor(
            api_key=os.getenv("DATASPIKE_API_KEY"),
            session=session,
            video_params=VideoParams(
                normal_fps=0.2,  # 1 frame every 5 seconds
                burst_fps=1.0,   # 1 frame per second when suspicious
                quality=75,      # Good quality/bandwidth balance
            ),
            audio_params=AudioParams(
                sample_rate=16000,  # Required by API
                sample_size=48000,  # 3-second chunks
                interval=60,        # Sample every 60 seconds
            ),
            notification_cb=custom_notification_handler,
        )

        # Build pipeline
        pipeline = Pipeline([
            transport.input(),
            deepfake_processor,
            transport.output(),
        ])

        # Configure task
        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                enable_usage_metrics=True,
            ),
        )

        # Event handlers
        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            logger.info(f"Client connected: {client.pc_id}")
            await deepfake_processor.start()
            deepfake_processor.set_webrtc_connection(client)
            deepfake_processor.set_participant_id(client.pc_id)
            await transport.capture_participant_video("camera")

        @transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, participant):
            logger.info(f"Client disconnected: {participant}")
            await deepfake_processor.stop()
            await task.cancel()

        # Run the pipeline
        runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
        await runner.run(task)


# Entry point
if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
Reference Implementation
For a complete, production-ready implementation, see the official repository:
🔗 dataspike-io/pipecat-deepfake-bot
The repository includes:
- Complete bot implementation with comprehensive documentation
- Dockerfile and Pipecat Cloud deployment configuration
- Example custom notification handlers
- Environment configuration templates
- Integration tests and examples
API Reference
DataspikeDeepfakeProcessor
Constructor:
DataspikeDeepfakeProcessor(
    api_key: str | None = None,
    session: aiohttp.ClientSession | None = None,
    video_params: VideoParams = VideoParams(),
    audio_params: AudioParams = AudioParams(),
    notification_cb: Callable[[DeepfakeStreamingSchemaResultEvent], Awaitable[None]] | None = None
)
Methods:
async start() — Start WebSocket connection and processing loop
async stop() — Stop processor and close connections gracefully
set_participant_id(str) — Set participant identifier for tracking
set_webrtc_connection(SmallWebRTCConnection) — Configure media track access
State Transitions:
CLEAR → SUSPICIOUS — Potential manipulation detected, frame rate increases
SUSPICIOUS → ALERT — High confidence detection, notification triggered
ALERT → CLEAR — Manipulation resolved, notification triggered
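The transitions listed above can be modeled as a small lookup that decides when a notification fires. This is a sketch only: State, NOTIFYING, and step are hypothetical names, and treating every transition not listed above (for example SUSPICIOUS → CLEAR) as silent is an assumption.

```python
from enum import Enum


class State(Enum):
    CLEAR = "clear"
    SUSPICIOUS = "suspicious"
    ALERT = "alert"


# Transitions documented as triggering a notification.
NOTIFYING = {
    (State.SUSPICIOUS, State.ALERT),
    (State.ALERT, State.CLEAR),
}


def step(current: State, incoming: State) -> tuple[State, bool]:
    """Return the new state and whether a notification should fire."""
    if incoming == current:
        return current, False
    return incoming, (current, incoming) in NOTIFYING


state = State.CLEAR
fired = []
for incoming in [State.SUSPICIOUS, State.SUSPICIOUS, State.ALERT, State.CLEAR]:
    state, notify = step(state, incoming)
    if notify:
        fired.append(state)
```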
Support & Resources
For issues specific to this integration, open an issue on GitHub or contact Dataspike support.