GitHub: github.com/dataspike-io/pipecat-deepfake-bot

The Dataspike integration for Pipecat adds real-time detection of AI‑manipulated (deepfake) content to Pipecat applications. It processes video frames and audio streams from WebRTC connections, sends them to the Dataspike API for analysis, and triggers customizable detection callbacks for immediate action.
Summary
  • Manipulation Recognition: Detects face swaps, compositing, and AI‑generated video/audio overlays.
  • AI Model Detection: Identifies content associated with modern generative systems (e.g., Veo, Sana, Flux, voice cloning).
  • Adaptive Sampling: Intelligently adjusts frame rate based on detection state—normal rate during clear states, burst rate when suspicious activity is detected.
  • Realtime Alerts: Provides instant notification callbacks for ALERT and state transition events.
  • Privacy by Design: No video or metadata is stored; frames are end‑to‑end encrypted in transit; results are ephemeral.

Installation

# Clone the repository
git clone https://github.com/dataspike-io/pipecat-deepfake-bot.git
cd pipecat-deepfake-bot

# Install dependencies using uv (recommended)
uv sync

# Or using pip
pip install -e .

Prerequisites

  • Python 3.10+ — Required for async/await features and type hints.
  • Dataspike API Key — Set as an environment variable:
    export DATASPIKE_API_KEY="your_api_key_here"
    
  • Pipecat Cloud credentials (for cloud deployment):
    # Configured via pcc-deploy.toml or Pipecat Cloud dashboard
    
Optional (for advanced configurations):
export DATASPIKE_WS_URL="wss://api.dataspike.io/api/v4/deepfake/stream"

Quick Start

The example below shows a ready-to-use Pipecat bot that integrates the Dataspike deepfake processor. It creates a pipeline that analyzes incoming video and audio streams in real-time.

Basic Bot Setup

Create a .env file with your API key:
DATASPIKE_API_KEY=your_api_key_here
Run the bot:
# Using uv
uv run bot.py

# Or using pip
python bot.py
🎮 Interactive Playground

When running locally, Pipecat automatically starts an interactive playground at http://localhost:7860 where you can:
  • Test video and audio in real-time with your webcam and microphone
  • See the bot in action with live deepfake detection
  • Monitor notifications and detection state transitions (CLEAR → SUSPICIOUS → ALERT)
  • Experiment with configurations without deploying to production
This makes it easy to develop, test, and debug your deepfake detection integration locally before deploying to Pipecat Cloud.

Quick Testing Workflow:
  1. Start the bot: uv run bot.py
  2. Open http://localhost:7860 in your browser
  3. Grant camera/microphone permissions when prompted
  4. Click “Connect” to establish WebRTC connection
  5. Speak into your microphone and show your face to the camera
  6. Watch the console logs for real-time detection events
  7. Test different scenarios (different lighting, angles, voice tones)
Tip: Open your browser’s developer console to see detailed logs, WebSocket activity, and frame processing metrics.

Code Example

import os
import aiohttp
from pipecat.pipeline.pipeline import Pipeline
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
from deepfake import DataspikeDeepfakeProcessor, VideoParams, AudioParams

async def run_bot(transport):
    async with aiohttp.ClientSession() as session:
        # Create the deepfake detection processor
        deepfake_processor = DataspikeDeepfakeProcessor(
            api_key=os.getenv("DATASPIKE_API_KEY"),
            session=session,
            video_params=VideoParams(
                normal_fps=0.2,   # Baseline frame rate
                burst_fps=1.0,    # Elevated rate during suspicious states
                quality=75        # JPEG quality (0-100)
            ),
            audio_params=AudioParams(
                sample_rate=16000,   # Required: 16kHz
                sample_size=48000,   # 3 seconds of audio
                interval=60          # Seconds between audio samples
            ),
            # notification_cb=custom_handler  # Optional: custom callback
        )

        # Create pipeline: Input → Deepfake Analysis → Output
        pipeline = Pipeline([
            transport.input(),
            deepfake_processor,
            transport.output(),
        ])

        # Start processing when client connects
        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            await deepfake_processor.start()
            deepfake_processor.set_webrtc_connection(client)
            deepfake_processor.set_participant_id(client.pc_id)
            await transport.capture_participant_video("camera")

Default Alert Behavior

By default, the processor logs detection events to the console. Each analyzed stream moves through three detection states:
  • CLEAR — No manipulation detected (normal frame rate)
  • SUSPICIOUS — Potential signs of manipulation (increased frame rate for verification)
  • ALERT — High confidence manipulation detected (notification triggered)
Example default log output:
{
  "type": "deepfake_alert",
  "level": "alert",
  "participant_id": "participant_abc123",
  "track_id": "video_track_xyz",
  "message": "High likelihood of manipulation detected for participant_abc123.",
  "timestamp_ms": 1730123456789
}

Custom Notification Channels

Developers can override the default logging behavior to route alerts anywhere—for example, to a webhook, Slack, a moderation API, a database, or your own analytics pipeline. Provide a custom async callback (notification_cb) when creating the processor:
async def on_deepfake_alert(event):
    """
    Custom notification handler for deepfake detection events.

    Args:
        event: DeepfakeStreamingSchemaResultEvent containing:
            - type: EventType (CLEAR, SUSPICIOUS, ALERT)
            - participant_id: ID of the participant being analyzed
            - track_id: Media track identifier
            - timestamp_ms: Detection timestamp
    """
    if event.type == EventType.ALERT:
        # High confidence deepfake detected
        print(f"🚨 ALERT: Deepfake detected for {event.participant_id}")

        # Send to external services
        await post_to_webhook({
            "event": "deepfake_detected",
            "participant": event.participant_id,
            "track": event.track_id,
            "timestamp": event.timestamp_ms
        })

        # Store in database
        await db.store_incident(event)

        # Send Slack notification
        await slack.send_message(f"⚠️ Deepfake detected: {event.participant_id}")

    elif event.type == EventType.CLEAR:
        # Returned to normal state (after being in ALERT)
        print(f"✅ CLEAR: Manipulation no longer detected for {event.participant_id}")

# Create processor with custom handler
deepfake_processor = DataspikeDeepfakeProcessor(
    api_key=os.getenv("DATASPIKE_API_KEY"),
    session=session,
    notification_cb=on_deepfake_alert
)
Flexible Integration: You have full control over how alerts are processed, logged, or relayed. Route detection events to monitoring systems, moderation queues, compliance logs, or real-time dashboards.
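The callback above calls a post_to_webhook helper that the snippet leaves undefined. A minimal sketch of such a helper is shown below; WEBHOOK_URL and the payload shape are illustrative assumptions, not part of the Dataspike or Pipecat APIs.

```python
WEBHOOK_URL = "https://example.com/hooks/deepfake"  # hypothetical endpoint


def build_alert_payload(event) -> dict:
    """Shape a detection event into a JSON-serializable payload."""
    return {
        "event": "deepfake_detected",
        "participant": event.participant_id,
        "track": event.track_id,
        "timestamp": event.timestamp_ms,
    }


async def post_to_webhook(payload: dict) -> int:
    """POST the payload to the webhook; returns the HTTP status code."""
    import aiohttp  # imported lazily so the payload helper stays dependency-free

    async with aiohttp.ClientSession() as session:
        async with session.post(WEBHOOK_URL, json=payload) as resp:
            return resp.status
```

Keeping payload construction separate from delivery makes the handler easy to unit-test without network access.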

Configuration

Configure the processor via constructor parameters or environment variables.

Video Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| burst_fps | float | 1.0 | Frame rate during SUSPICIOUS state (higher for verification) |
| normal_fps | float | 0.2 | Frame rate during CLEAR state (lower to conserve resources) |
| quality | int | 75 | JPEG compression quality (0–100, where 100 is highest) |
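At these defaults the sampling intervals work out as follows (a quick illustration of the arithmetic, not library code):

```python
def frame_interval_s(fps: float) -> float:
    """Seconds between sampled frames at a given target rate."""
    return 1.0 / fps


print(frame_interval_s(0.2))  # CLEAR state: roughly one frame every 5 seconds
print(frame_interval_s(1.0))  # SUSPICIOUS state: one frame per second
```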

Audio Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| sample_rate | int | 16000 | Required sampling rate in Hz (must be 16000 for the Dataspike API) |
| sample_size | int | 48000 | Samples per chunk (48000 = 3 seconds at 16 kHz) |
| interval | int | 60 | Minimum seconds between audio sample transmissions |

Processor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| api_key | str | env: DATASPIKE_API_KEY | Dataspike API key. Required unless set via environment. |
| session | ClientSession | required | aiohttp ClientSession for WebSocket connections |
| video_params | VideoParams | defaults | Video processing configuration |
| audio_params | AudioParams | defaults | Audio processing configuration |
| notification_cb | Callable | None | Optional async callback for detection events. Defaults to logging. |

Environment Variables
export DATASPIKE_API_KEY="..."        # Required: Your API key
export DATASPIKE_WS_URL="..."         # Optional: Custom WebSocket endpoint

How It Works

Development Flow (Local with Playground)

  1. Start Bot — Run uv run bot.py to launch the bot with the integrated Pipecat playground at http://localhost:7860
  2. Connect Browser — Open the playground in your browser and grant camera/microphone permissions
  3. WebRTC Handshake — The playground establishes a WebRTC connection to the bot
  4. Real-time Processing — Your video/audio streams through the pipeline for live deepfake analysis
  5. Instant Feedback — Detection results appear in console logs and trigger your notification callbacks

Production Flow (Architecture)

  1. WebRTC Connection — When a client connects via SmallWebRTC, the bot establishes a bidirectional media connection.
  2. Frame Capture — The processor intercepts InputImageRawFrame (camera only) and InputAudioRawFrame from the pipeline.
  3. Adaptive Sampling — Video frames are sampled at normal_fps during CLEAR state. When SUSPICIOUS events occur, sampling increases to burst_fps for rapid verification.
  4. Audio Buffering — Audio is only processed when users are speaking (detected via VAD). Samples are accumulated into 3-second chunks and resampled to 16kHz PCM.
  5. Encoding & Queuing — Video frames are JPEG-encoded and queued (max 16 items). Stale frames are dropped to prevent blocking.
  6. Streaming — Frames stream via secure WebSocket to the Dataspike API. Detection results return in real-time.
  7. State Management — The processor maintains detection state (CLEAR/SUSPICIOUS/ALERT) and triggers notifications on state transitions.
  8. Auto-Reconnection — If the WebSocket drops, the processor reconnects with exponential backoff (1s → 10s max) and jitter.
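The reconnection policy in step 8 can be sketched as follows. This is a standalone illustration, not the processor's actual implementation: connect_with_backoff and its connect argument are hypothetical names standing in for whatever opens the WebSocket in your code.

```python
import asyncio
import random


async def connect_with_backoff(connect, base=1.0, cap=10.0):
    """Retry an async `connect` callable with exponential backoff and jitter.

    The delay doubles from `base` up to `cap` seconds (mirroring the
    1s -> 10s policy above), and each wait is jittered so many clients
    reconnecting at once do not hammer the server in lockstep.
    """
    delay = base
    while True:
        try:
            return await connect()
        except Exception:
            # Full jitter: sleep a random fraction of the current delay.
            await asyncio.sleep(random.uniform(0, delay))
            delay = min(delay * 2, cap)
```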

Deployment

Pipecat Cloud

Deploy to Pipecat Cloud for production-grade scaling:
  1. Update pcc-deploy.toml:
agent_name = "dataspike-deepfake-detection"
image = "your_username/dataspike-deepfake-detection:0.1"
image_credentials = "your_dockerhub_image_pull_secret"
secret_set = "dataspike-secrets"

[scaling]
min_agents = 1
  2. Build and push Docker image:
docker build -t your_username/dataspike-deepfake-detection:0.1 .
docker push your_username/dataspike-deepfake-detection:0.1
  3. Create secret set in Pipecat Cloud dashboard with DATASPIKE_API_KEY
  4. Deploy:
pcc deploy
For detailed deployment instructions, see the Pipecat Quickstart Guide.

Docker (Self-Hosted)

# Build the image
docker build -t dataspike-deepfake-bot .

# Run with environment variables
docker run -e DATASPIKE_API_KEY=your_key_here dataspike-deepfake-bot

Privacy & Security

  • No Storage — The processor and Dataspike API do not store video, audio, or metadata.
  • E2E Encryption — All transmissions use TLS/WSS; only ephemeral detection signals are returned.
  • Minimal Data — Only encoded frames and essential metadata (participant ID, track ID, timestamp) are transmitted.
  • Key Management — Store DATASPIKE_API_KEY in environment variables or secret managers. Never hard-code credentials.
  • Camera Only — The processor explicitly filters for camera feeds (transport_source == "camera") and ignores screen sharing to minimize unnecessary data transmission.
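The camera-only filter described in the last bullet can be illustrated with a one-line check (a sketch; `frame` stands in for Pipecat's InputImageRawFrame, and the real processor's internals may differ):

```python
def should_analyze(frame) -> bool:
    """Forward only camera frames for analysis; ignore screen shares.

    Mirrors the transport_source == "camera" filter described above.
    """
    return getattr(frame, "transport_source", None) == "camera"
```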

Troubleshooting

WebSocket Connection Issues

Symptom: “Timeout waiting for WebRTC connection” errors

Solutions:
  • Verify Pipecat Cloud configuration is correct
  • Check that session_manager timeout (120s) is sufficient
  • Ensure DATASPIKE_API_KEY is set correctly
  • Review network connectivity to wss://api.dataspike.io

Frame Rate / Performance Issues

Symptom: High CPU usage or detection lag

Solutions:
  • Reduce video_params.normal_fps (default: 0.2 fps)
  • Lower video_params.quality to reduce encoding overhead (default: 75)
  • Increase audio_params.interval to process audio less frequently (default: 60s)
  • Monitor queue size—frequent QueueFull warnings indicate overload

Audio Detection Not Working

Symptom: No audio analysis results

Checklist:
  • User must be actively speaking (audio only processed during UserStartedSpeakingFrame)
  • Audio buffer must reach sample_size (48,000 samples = 3 seconds)
  • WebRTC connection must have active audio input track
  • VAD (Voice Activity Detection) must be properly configured
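The second checklist item (the buffer must reach sample_size before anything is sent) can be illustrated with a minimal accumulator. This is a sketch assuming already-resampled 16-bit mono PCM at 16 kHz; AudioChunker is an illustrative name, not the processor's actual class.

```python
SAMPLE_RATE = 16000    # required by the Dataspike API
CHUNK_SAMPLES = 48000  # 3 seconds at 16 kHz


class AudioChunker:
    """Accumulate 16-bit mono PCM and emit fixed 3-second chunks."""

    def __init__(self):
        self._buf = bytearray()

    def push(self, pcm_bytes: bytes):
        """Add raw PCM; yield complete 48,000-sample (96,000-byte) chunks."""
        self._buf.extend(pcm_bytes)
        chunk_bytes = CHUNK_SAMPLES * 2  # 2 bytes per 16-bit sample
        while len(self._buf) >= chunk_bytes:
            yield bytes(self._buf[:chunk_bytes])
            del self._buf[:chunk_bytes]
```

Until a full chunk accumulates, push yields nothing, which is why short utterances may produce no audio analysis results.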

Authentication Errors

Symptom: “DATASPIKE_API_KEY must be set” or 401 errors

Solutions:
  • Verify environment variable is set: echo $DATASPIKE_API_KEY
  • Check API key is valid at Dataspike Dashboard
  • Ensure .env file is in the correct directory
  • For Docker: verify -e flag or secret mount is correct

Local Playground Issues

Symptom: Can’t access http://localhost:7860

Solutions:
  • Ensure the bot is running (uv run bot.py or python bot.py)
  • Check no other service is using port 7860
  • Look for “Playground available at http://localhost:7860” in logs
  • Try accessing from the same machine where the bot is running

Symptom: Webcam/microphone not working in playground

Solutions:
  • Grant browser permissions for camera and microphone access
  • Ensure no other application is using your webcam/microphone
  • Try a different browser (Chrome/Edge recommended for WebRTC)
  • Check browser console for detailed error messages

Full Bot Example

Complete implementation showing all features:
"""
Dataspike Deepfake Detection Bot for Pipecat
Real-time video and audio analysis with custom notifications
"""

import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
from pipecat.transports.base_transport import TransportParams

from deepfake import DataspikeDeepfakeProcessor, VideoParams, AudioParams
from schema_pb2 import DeepfakeStreamingSchemaResultEvent, EventType

load_dotenv()


async def custom_notification_handler(event: DeepfakeStreamingSchemaResultEvent):
    """Handle deepfake detection events with custom logic."""

    if event.type == EventType.ALERT:
        logger.critical(f"🚨 DEEPFAKE ALERT: {event.participant_id}")
        # Route to your monitoring/moderation systems
        # await send_to_slack(event)
        # await store_in_database(event)
        # await trigger_moderation_workflow(event)

    elif event.type == EventType.CLEAR:
        logger.info(f"✅ All clear for {event.participant_id}")


async def run_bot(transport, runner_args):
    """Execute the bot with deepfake detection."""

    async with aiohttp.ClientSession() as session:
        # Initialize processor with custom configuration
        deepfake_processor = DataspikeDeepfakeProcessor(
            api_key=os.getenv("DATASPIKE_API_KEY"),
            session=session,
            video_params=VideoParams(
                normal_fps=0.2,    # 1 frame every 5 seconds
                burst_fps=1.0,     # 1 frame per second when suspicious
                quality=75         # Good quality/bandwidth balance
            ),
            audio_params=AudioParams(
                sample_rate=16000,  # Required by API
                sample_size=48000,  # 3-second chunks
                interval=60         # Sample every 60 seconds
            ),
            notification_cb=custom_notification_handler
        )

        # Build pipeline
        pipeline = Pipeline([
            transport.input(),
            deepfake_processor,
            transport.output(),
        ])

        # Configure task
        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                enable_usage_metrics=True,
            ),
        )

        # Event handlers
        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            logger.info(f"Client connected: {client.pc_id}")
            await deepfake_processor.start()
            deepfake_processor.set_webrtc_connection(client)
            deepfake_processor.set_participant_id(client.pc_id)
            await transport.capture_participant_video("camera")

        @transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, participant):
            logger.info(f"Client disconnected: {participant}")
            await deepfake_processor.stop()
            await task.cancel()

        # Run the pipeline
        runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
        await runner.run(task)


# Entry point
if __name__ == "__main__":
    from pipecat.runner.run import main
    main()

Reference Implementation

For a complete, production-ready implementation, see the official repository: 🔗 dataspike-io/pipecat-deepfake-bot

The repository includes:
  • Complete bot implementation with comprehensive documentation
  • Dockerfile and Pipecat Cloud deployment configuration
  • Example custom notification handlers
  • Environment configuration templates
  • Integration tests and examples

API Reference

DataspikeDeepfakeProcessor

Constructor:
DataspikeDeepfakeProcessor(
    api_key: str | None = None,
    session: aiohttp.ClientSession | None = None,
    video_params: VideoParams = VideoParams(),
    audio_params: AudioParams = AudioParams(),
    notification_cb: Callable[[DeepfakeStreamingSchemaResultEvent], Awaitable[None]] | None = None
)
Methods:
  • async start() — Start WebSocket connection and processing loop
  • async stop() — Stop processor and close connections gracefully
  • set_participant_id(str) — Set participant identifier for tracking
  • set_webrtc_connection(SmallWebRTCConnection) — Configure media track access
State Transitions:
  • CLEAR → SUSPICIOUS — Potential manipulation detected, frame rate increases
  • SUSPICIOUS → ALERT — High confidence detection, notification triggered
  • ALERT → CLEAR — Manipulation resolved, notification triggered
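The notification rules above can be sketched as a tiny state table. This is a standalone illustration with my own names (State, NOTIFYING); the library's real event type is DeepfakeStreamingSchemaResultEvent with EventType values.

```python
from enum import Enum


class State(Enum):
    CLEAR = "clear"
    SUSPICIOUS = "suspicious"
    ALERT = "alert"


# Transitions that fire the notification callback, per the list above.
# CLEAR -> SUSPICIOUS only raises the frame rate; it does not notify.
NOTIFYING = {
    (State.SUSPICIOUS, State.ALERT),
    (State.ALERT, State.CLEAR),
}


def should_notify(current: State, new: State) -> bool:
    """Return True if moving from `current` to `new` triggers the callback."""
    return (current, new) in NOTIFYING
```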

Support & Resources

For issues specific to this integration, open an issue on GitHub or contact Dataspike support.