# Pipecat Integration

> Integrate Dataspike deepfake detection into Pipecat bots for real-time video and audio analysis with intelligent alerts.

**GitHub**: [github.com/dataspike-io/pipecat-deepfake-bot](https://github.com/dataspike-io/pipecat-deepfake-bot)

The **Dataspike integration for Pipecat** adds **real-time detection of AI‑manipulated (deepfake) content** to Pipecat applications.
It processes video frames and audio streams from WebRTC connections, sends them to the Dataspike API for analysis, and triggers **customizable detection callbacks** for immediate action.

> **Summary**
>
> * **Manipulation Recognition:** Detects face swaps, compositing, and AI‑generated video/audio overlays.
> * **AI Model Detection:** Identifies content associated with modern generative systems (e.g., Veo, Sana, Flux, voice cloning).
> * **Adaptive Sampling:** Adjusts the video sampling rate to the detection state: `normal_fps` while the state is CLEAR, `burst_fps` once suspicious activity is detected.
> * **Real-time Alerts:** Provides instant notification callbacks for ALERT and state transition events.
> * **Privacy by Design:** No video, audio, or metadata is stored; frames are encrypted in transit (TLS/WSS); results are ephemeral.

***

## Installation

```bash theme={null}
# Clone the repository
git clone https://github.com/dataspike-io/pipecat-deepfake-bot.git
cd pipecat-deepfake-bot

# Install dependencies using uv (recommended)
uv sync

# Or using pip
pip install -e .
```

### Prerequisites

* **Python 3.10+** — Required for the type-hint syntax used in the code (for example `str | None`).
* **Dataspike API Key** — Set as an environment variable:
  ```bash theme={null}
  export DATASPIKE_API_KEY="your_api_key_here"
  ```
* **Pipecat Cloud credentials** (for cloud deployment):
  ```bash theme={null}
  # Configured via pcc-deploy.toml or Pipecat Cloud dashboard
  ```

Optional (for advanced configurations):

```bash theme={null}
export DATASPIKE_WS_URL="wss://api.dataspike.io/api/v4/deepfake/stream"
```

***

## Quick Start

The example below shows a **ready-to-use Pipecat bot** that integrates the Dataspike deepfake processor.
It creates a pipeline that analyzes incoming video and audio streams in real time.

### Basic Bot Setup

Create a `.env` file with your API key:

```bash theme={null}
DATASPIKE_API_KEY=your_api_key_here
```

Run the bot:

```bash theme={null}
# Using uv
uv run bot.py

# Or using pip
python bot.py
```

**🎮 Interactive Playground**

When running locally, Pipecat automatically starts an interactive playground at **[http://localhost:7860](http://localhost:7860)** where you can:

* **Test video and audio** in real time with your webcam and microphone
* **See the bot in action** with live deepfake detection
* **Monitor notifications** and detection state transitions (CLEAR → SUSPICIOUS → ALERT)
* **Experiment with configurations** without deploying to production

The playground lets you develop, test, and debug your deepfake detection integration locally before deploying to Pipecat Cloud.

**Quick Testing Workflow:**

1. Start the bot: `uv run bot.py`
2. Open [http://localhost:7860](http://localhost:7860) in your browser
3. Grant camera/microphone permissions when prompted
4. Click "Connect" to establish WebRTC connection
5. Speak into your microphone and show your face to the camera
6. Watch the console logs for real-time detection events
7. Test different scenarios (different lighting, angles, voice tones)

> **Tip:** Open your browser's developer console to see detailed logs, WebSocket activity, and frame processing metrics.

### Code Example

```python theme={null}
import os
import aiohttp
from pipecat.pipeline.pipeline import Pipeline
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
from deepfake import DataspikeDeepfakeProcessor, VideoParams, AudioParams

async def run_bot(transport):
    async with aiohttp.ClientSession() as session:
        # Create the deepfake detection processor
        deepfake_processor = DataspikeDeepfakeProcessor(
            api_key=os.getenv("DATASPIKE_API_KEY"),
            session=session,
            video_params=VideoParams(
                normal_fps=0.2,   # Baseline frame rate
                burst_fps=1.0,    # Elevated rate during suspicious states
                quality=75        # JPEG quality (0-100)
            ),
            audio_params=AudioParams(
                sample_rate=16000,   # Required: 16kHz
                sample_size=48000,   # 3 seconds of audio
                interval=60          # Seconds between audio samples
            ),
            # notification_cb=custom_handler  # Optional: custom callback
        )

        # Create pipeline: Input → Deepfake Analysis → Output
        pipeline = Pipeline([
            transport.input(),
            deepfake_processor,
            transport.output(),
        ])

        # Start processing when client connects
        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            await deepfake_processor.start()
            deepfake_processor.set_webrtc_connection(client)
            deepfake_processor.set_participant_id(client.pc_id)
            await transport.capture_participant_video("camera")
```

### Default Alert Behavior

By default, the processor logs detection events to the console. When the system detects potential manipulation, it transitions through three states:

* **CLEAR** — No manipulation detected (normal frame rate)
* **SUSPICIOUS** — Potential signs of manipulation (increased frame rate for verification)
* **ALERT** — High confidence manipulation detected (notification triggered)

**Example default log output:**

```json theme={null}
{
  "type": "deepfake_alert",
  "level": "alert",
  "participant_id": "participant_abc123",
  "track_id": "video_track_xyz",
  "message": "High likelihood of manipulation detected for participant_abc123.",
  "timestamp_ms": 1730123456789
}
```

***

## Custom Notification Channels

Developers can override the default logging behavior to route alerts elsewhere, for example to a webhook, Slack, a moderation API, a database, or your own analytics pipeline.

Provide a custom async callback (`notification_cb`) when creating the processor:

```python theme={null}
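import os

from schema_pb2 import EventType  # same module the full bot example imports from

# NOTE: post_to_webhook, db, and slack are placeholders for your own
# integrations; they are not provided by this package.


async def on_deepfake_alert(event):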
    """
    Custom notification handler for deepfake detection events.

    Args:
        event: DeepfakeStreamingSchemaResultEvent containing:
            - type: EventType (CLEAR, SUSPICIOUS, ALERT)
            - participant_id: ID of the participant being analyzed
            - track_id: Media track identifier
            - timestamp_ms: Detection timestamp
    """
    if event.type == EventType.ALERT:
        # High confidence deepfake detected
        print(f"🚨 ALERT: Deepfake detected for {event.participant_id}")

        # Send to external services
        await post_to_webhook({
            "event": "deepfake_detected",
            "participant": event.participant_id,
            "track": event.track_id,
            "timestamp": event.timestamp_ms
        })

        # Store in database
        await db.store_incident(event)

        # Send Slack notification
        await slack.send_message(f"⚠️ Deepfake detected: {event.participant_id}")

    elif event.type == EventType.CLEAR:
        # Returned to normal state (after being in ALERT)
        print(f"✅ CLEAR: Manipulation no longer detected for {event.participant_id}")

# Create processor with custom handler
deepfake_processor = DataspikeDeepfakeProcessor(
    api_key=os.getenv("DATASPIKE_API_KEY"),
    session=session,
    notification_cb=on_deepfake_alert
)
```

> **Flexible Integration:** You have full control over how alerts are processed, logged, or relayed. Route detection events to monitoring systems, moderation queues, compliance logs, or real-time dashboards.
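
The handler above calls `post_to_webhook`, which the integration does not provide. A minimal sketch of such a helper, using only the standard library and assuming a hypothetical `WEBHOOK_URL` endpoint that accepts JSON POST requests; the helper names are illustrative. The blocking HTTP call is moved to a worker thread so it does not stall the pipeline's event loop:

```python
import asyncio
import json
import urllib.request

# Hypothetical endpoint; replace with your own webhook receiver.
WEBHOOK_URL = "https://example.com/hooks/deepfake"


def build_webhook_request(payload: dict, url: str = WEBHOOK_URL) -> urllib.request.Request:
    """Serialize the payload as JSON and wrap it in a POST request."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def _send(request: urllib.request.Request, timeout: float) -> int:
    """Blocking send; returns the HTTP status code."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


async def post_to_webhook(payload: dict, url: str = WEBHOOK_URL, timeout: float = 5.0) -> int:
    """Send the payload from a worker thread so the event loop stays free."""
    request = build_webhook_request(payload, url)
    return await asyncio.to_thread(_send, request, timeout)
```

In a bot that already holds an `aiohttp.ClientSession`, calling `session.post(url, json=payload)` would achieve the same result without a worker thread.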

***

## Configuration

Configure the processor via constructor parameters or environment variables.

### Video Parameters

|    Parameter | Type    | Default | Description                                                  |
| -----------: | :------ | :-----: | :----------------------------------------------------------- |
|  `burst_fps` | `float` |  `1.0`  | Frame rate during SUSPICIOUS state (higher for verification) |
| `normal_fps` | `float` |  `0.2`  | Frame rate during CLEAR state (lower to conserve resources)  |
|    `quality` | `int`   |   `75`  | JPEG compression quality (0–100, where 100 is highest)       |
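
To make the two frame rates concrete: a rate of `0.2` fps means one frame every `1 / 0.2 = 5` seconds, and `1.0` fps means one frame per second. The sketch below shows one way such a sampler could be written. `AdaptiveSampler` and its `suspicious` flag are hypothetical names for illustration, not the processor's actual internals:

```python
import time
from typing import Optional


class AdaptiveSampler:
    """Decide which frames to forward based on the current detection state."""

    def __init__(self, normal_fps: float = 0.2, burst_fps: float = 1.0) -> None:
        self.normal_interval = 1.0 / normal_fps  # 5.0 s at 0.2 fps
        self.burst_interval = 1.0 / burst_fps    # 1.0 s at 1.0 fps
        self.suspicious = False                  # set True while verifying
        self._last_sent: Optional[float] = None

    def should_send(self, now: Optional[float] = None) -> bool:
        """Return True if enough time has passed since the last forwarded frame."""
        now = time.monotonic() if now is None else now
        interval = self.burst_interval if self.suspicious else self.normal_interval
        if self._last_sent is None or now - self._last_sent >= interval:
            self._last_sent = now
            return True
        return False
```

Frames for which `should_send()` returns `False` are simply skipped, so the cost of a higher camera frame rate is limited to the comparison itself.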

### Audio Parameters

|     Parameter | Type  | Default | Description                                                    |
| ------------: | :---- | :-----: | :------------------------------------------------------------- |
| `sample_rate` | `int` | `16000` | Required sampling rate in Hz (must be 16000 for Dataspike API) |
| `sample_size` | `int` | `48000` | Samples per chunk (48000 = 3 seconds at 16kHz)                 |
|    `interval` | `int` |   `60`  | Minimum seconds between audio sample transmissions             |
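
The relationship between these values is simple arithmetic: `48000 / 16000 = 3` seconds per chunk. The sketch below illustrates how a buffer could combine `sample_size` and `interval`. It assumes 16-bit mono PCM (2 bytes per sample) and discards audio that arrives during the waiting period; both are assumptions, and `AudioChunker` is a hypothetical name rather than part of the package:

```python
from typing import Optional

SAMPLE_RATE = 16_000   # Hz
SAMPLE_SIZE = 48_000   # samples per chunk
BYTES_PER_SAMPLE = 2   # assumption: 16-bit mono PCM

CHUNK_SECONDS = SAMPLE_SIZE / SAMPLE_RATE     # 3.0 seconds
CHUNK_BYTES = SAMPLE_SIZE * BYTES_PER_SAMPLE  # 96000 bytes


class AudioChunker:
    """Accumulate PCM bytes and release at most one chunk per interval."""

    def __init__(self, chunk_bytes: int = CHUNK_BYTES, interval: float = 60.0) -> None:
        self.chunk_bytes = chunk_bytes
        self.interval = interval
        self._buffer = bytearray()
        self._last_emit: Optional[float] = None

    def push(self, pcm: bytes, now: float) -> Optional[bytes]:
        """Add audio; return a full chunk when one is ready and due."""
        if self._last_emit is not None and now - self._last_emit < self.interval:
            return None  # still waiting out the interval: discard this audio
        self._buffer.extend(pcm)
        if len(self._buffer) < self.chunk_bytes:
            return None  # not enough audio for a full chunk yet
        chunk = bytes(self._buffer[: self.chunk_bytes])
        self._buffer.clear()
        self._last_emit = now
        return chunk
```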

### Processor Parameters

|         Parameter | Type            |         Default         | Description                                                        |
| ----------------: | :-------------- | :---------------------: | :----------------------------------------------------------------- |
|         `api_key` | `str`           | env:`DATASPIKE_API_KEY` | Dataspike API key. Required unless set via environment.            |
|         `session` | `ClientSession` |        *required*       | aiohttp ClientSession for WebSocket connections                    |
|    `video_params` | `VideoParams`   |         defaults        | Video processing configuration                                     |
|    `audio_params` | `AudioParams`   |         defaults        | Audio processing configuration                                     |
| `notification_cb` | `Callable`      |          `None`         | Optional async callback for detection events. Defaults to logging. |

**Environment Variables**

```bash theme={null}
export DATASPIKE_API_KEY="..."        # Required: Your API key
export DATASPIKE_WS_URL="..."         # Optional: Custom WebSocket endpoint
```
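
If you read these variables yourself before constructing the processor, a small loader that fails fast on a missing key can save debugging time. The sketch below is illustrative: `load_dataspike_config` is a hypothetical helper, and falling back to the WebSocket URL shown in the Installation section is an assumption about the default endpoint:

```python
import os
from typing import Mapping, Optional

# Assumption: the URL from the Installation section is the default endpoint.
DEFAULT_WS_URL = "wss://api.dataspike.io/api/v4/deepfake/stream"


def load_dataspike_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Read Dataspike settings, raising early if the API key is missing."""
    env = os.environ if env is None else env
    api_key = env.get("DATASPIKE_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("DATASPIKE_API_KEY must be set")
    return {
        "api_key": api_key,
        "ws_url": env.get("DATASPIKE_WS_URL", DEFAULT_WS_URL),
    }
```

Passing a mapping instead of relying on `os.environ` keeps the helper easy to test.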

***

## How It Works

### Development Flow (Local with Playground)

1. **Start Bot** — Run `uv run bot.py` to launch the bot with the integrated Pipecat playground at [http://localhost:7860](http://localhost:7860)
2. **Connect Browser** — Open the playground in your browser and grant camera/microphone permissions
3. **WebRTC Handshake** — The playground establishes a WebRTC connection to the bot
4. **Real-time Processing** — Your video/audio streams through the pipeline for live deepfake analysis
5. **Instant Feedback** — Detection results appear in console logs and trigger your notification callbacks

### Production Flow (Architecture)

1. **WebRTC Connection** — When a client connects via SmallWebRTC, the bot establishes a bidirectional media connection.
2. **Frame Capture** — The processor intercepts `InputImageRawFrame` (camera only) and `InputAudioRawFrame` from the pipeline.
3. **Adaptive Sampling** — Video frames are sampled at `normal_fps` during CLEAR state. When SUSPICIOUS events occur, sampling increases to `burst_fps` for rapid verification.
4. **Audio Buffering** — Audio is only processed when users are speaking (detected via VAD). Samples are accumulated into 3-second chunks and resampled to 16kHz PCM.
5. **Encoding & Queuing** — Video frames are JPEG-encoded and queued (max 16 items). Stale frames are dropped to prevent blocking.
6. **Streaming** — Frames are streamed over a secure WebSocket (WSS) to the Dataspike API, and detection results are returned in real time.
7. **State Management** — The processor maintains detection state (CLEAR/SUSPICIOUS/ALERT) and triggers notifications on state transitions.
8. **Auto-Reconnection** — If the WebSocket drops, the processor reconnects with exponential backoff (1s → 10s max) and jitter.
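
The reconnection schedule in the last step can be sketched as a generator. The text only states the bounds (1s to 10s) and that jitter is applied; the doubling factor and the ±20% jitter range used here are assumptions, and `reconnect_delays` is a hypothetical name:

```python
import itertools
import random


def reconnect_delays(base: float = 1.0, cap: float = 10.0, jitter: float = 0.2, rng=random.random):
    """Yield an endless sequence of reconnect delays in seconds."""
    delay = base
    while True:
        # Spread clients out by up to +/- `jitter` of the current delay.
        offset = delay * jitter * (2.0 * rng() - 1.0)
        yield max(0.0, delay + offset)
        # Double the delay for the next attempt, never exceeding the cap.
        delay = min(cap, delay * 2.0)


# First six delays with the jitter term neutralized (rng always returns 0.5).
preview = list(itertools.islice(reconnect_delays(rng=lambda: 0.5), 6))
# preview == [1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

Jitter matters when many bots lose their connection at once: without it, they would all retry at the same instants.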

***

## Deployment

### Pipecat Cloud

Deploy to [Pipecat Cloud](https://www.pipecat.ai/) for production-grade scaling:

1. **Update `pcc-deploy.toml`**:

```toml theme={null}
agent_name = "dataspike-deepfake-detection"
image = "your_username/dataspike-deepfake-detection:0.1"
image_credentials = "your_dockerhub_image_pull_secret"
secret_set = "dataspike-secrets"

[scaling]
min_agents = 1
```

2. **Build and push Docker image**:

```bash theme={null}
docker build -t your_username/dataspike-deepfake-detection:0.1 .
docker push your_username/dataspike-deepfake-detection:0.1
```

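3. **Create a secret set** in the Pipecat Cloud dashboard that contains `DATASPIKE_API_KEY`. Its name must match the `secret_set` value in `pcc-deploy.toml` (`dataspike-secrets` in the example above).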

4. **Deploy**:

```bash theme={null}
pcc deploy
```

For detailed deployment instructions, see the [Pipecat Quickstart Guide](https://docs.pipecat.ai/getting-started/quickstart#step-2%3A-deploy-to-production).

### Docker (Self-Hosted)

```bash theme={null}
# Build the image
docker build -t dataspike-deepfake-bot .

# Run with environment variables
docker run -e DATASPIKE_API_KEY=your_key_here dataspike-deepfake-bot
```

***

## Privacy & Security

* **No Storage** — The processor and Dataspike API **do not store** video, audio, or metadata.
* **Encryption in Transit** — All transmissions use TLS/WSS; only ephemeral detection signals are returned.
* **Minimal Data** — Only encoded frames and essential metadata (participant ID, track ID, timestamp) are transmitted.
* **Key Management** — Store `DATASPIKE_API_KEY` in environment variables or secret managers. Never hard-code credentials.
* **Camera Only** — The processor explicitly filters for camera feeds (`transport_source == "camera"`) and ignores screen sharing to minimize unnecessary data transmission.

***

## Troubleshooting

### Connection Issues

**Symptom:** "Timeout waiting for WebRTC connection" errors

**Solutions:**

* Verify Pipecat Cloud configuration is correct
* Check that `session_manager` timeout (120s) is sufficient
* Ensure `DATASPIKE_API_KEY` is set correctly
* Review network connectivity to `wss://api.dataspike.io`

### Frame Rate / Performance Issues

**Symptom:** High CPU usage or detection lag

**Solutions:**

* Reduce `video_params.normal_fps` (default: 0.2 fps)
* Lower `video_params.quality` to reduce encoding overhead (default: 75)
* Increase `audio_params.interval` to process audio less frequently (default: 60s)
* Monitor queue size—frequent `QueueFull` warnings indicate overload
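
For context on the `QueueFull` warnings: the Encoding & Queuing step describes a bounded queue (max 16 items) from which stale frames are dropped. The sketch below shows one non-blocking way to do that with `asyncio.Queue`. Whether the processor evicts the oldest frame or rejects the newest is not stated; this sketch assumes the former, and `enqueue_latest` is a hypothetical helper:

```python
import asyncio


def enqueue_latest(queue: asyncio.Queue, item) -> bool:
    """Put `item` without blocking; evict the oldest entry if the queue is full.

    Returns True if an older item had to be dropped.
    """
    dropped = False
    if queue.full():
        try:
            queue.get_nowait()  # discard the stalest frame
            dropped = True
        except asyncio.QueueEmpty:
            pass
    queue.put_nowait(item)
    return dropped


async def demo():
    queue = asyncio.Queue(maxsize=2)  # the processor's limit is 16
    results = [enqueue_latest(queue, frame) for frame in (1, 2, 3)]
    remaining = [queue.get_nowait(), queue.get_nowait()]
    return results, remaining
```

Counting how often the helper returns `True` gives a simple overload metric: a steadily rising count means frames are produced faster than they can be sent.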

### Audio Detection Not Working

**Symptom:** No audio analysis results

**Checklist:**

* User must be actively speaking (audio is only processed after a `UserStartedSpeakingFrame` has been received)
* Audio buffer must reach `sample_size` (48,000 samples = 3 seconds)
* WebRTC connection must have active audio input track
* VAD (Voice Activity Detection) must be properly configured

### Authentication Errors

**Symptom:** "DATASPIKE\_API\_KEY must be set" or 401 errors

**Solutions:**

* Verify environment variable is set: `echo $DATASPIKE_API_KEY`
* Check API key is valid at [Dataspike Dashboard](https://dataspike.io/)
* Ensure the `.env` file is in the same directory as `bot.py`
* For Docker: verify `-e` flag or secret mount is correct

### Local Playground Issues

**Symptom:** Can't access [http://localhost:7860](http://localhost:7860)

**Solutions:**

* Ensure the bot is running (`uv run bot.py` or `python bot.py`)
* Check no other service is using port 7860
* Look for "Playground available at [http://localhost:7860](http://localhost:7860)" in logs
* Try accessing from the same machine where the bot is running

**Symptom:** Webcam/microphone not working in playground

**Solutions:**

* Grant browser permissions for camera and microphone access
* Ensure no other application is using your webcam/microphone
* Try a different browser (Chrome/Edge recommended for WebRTC)
* Check browser console for detailed error messages

***

## Full Bot Example

Complete implementation showing all features:

```python theme={null}
"""
Dataspike Deepfake Detection Bot for Pipecat
Real-time video and audio analysis with custom notifications
"""

import os
import aiohttp
from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
from pipecat.transports.base_transport import TransportParams

from deepfake import DataspikeDeepfakeProcessor, VideoParams, AudioParams
from schema_pb2 import DeepfakeStreamingSchemaResultEvent, EventType

load_dotenv()


async def custom_notification_handler(event: DeepfakeStreamingSchemaResultEvent):
    """Handle deepfake detection events with custom logic."""

    if event.type == EventType.ALERT:
        logger.critical(f"🚨 DEEPFAKE ALERT: {event.participant_id}")
        # Route to your monitoring/moderation systems
        # await send_to_slack(event)
        # await store_in_database(event)
        # await trigger_moderation_workflow(event)

    elif event.type == EventType.CLEAR:
        logger.info(f"✅ All clear for {event.participant_id}")


async def run_bot(transport, runner_args):
    """Execute the bot with deepfake detection."""

    async with aiohttp.ClientSession() as session:
        # Initialize processor with custom configuration
        deepfake_processor = DataspikeDeepfakeProcessor(
            api_key=os.getenv("DATASPIKE_API_KEY"),
            session=session,
            video_params=VideoParams(
                normal_fps=0.2,    # 1 frame every 5 seconds
                burst_fps=1.0,     # 1 frame per second when suspicious
                quality=75         # Good quality/bandwidth balance
            ),
            audio_params=AudioParams(
                sample_rate=16000,  # Required by API
                sample_size=48000,  # 3-second chunks
                interval=60         # Sample every 60 seconds
            ),
            notification_cb=custom_notification_handler
        )

        # Build pipeline
        pipeline = Pipeline([
            transport.input(),
            deepfake_processor,
            transport.output(),
        ])

        # Configure task
        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                enable_usage_metrics=True,
            ),
        )

        # Event handlers
        @transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            logger.info(f"Client connected: {client.pc_id}")
            await deepfake_processor.start()
            deepfake_processor.set_webrtc_connection(client)
            deepfake_processor.set_participant_id(client.pc_id)
            await transport.capture_participant_video("camera")

        @transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, participant):
            logger.info(f"Client disconnected: {participant}")
            await deepfake_processor.stop()
            await task.cancel()

        # Run the pipeline
        runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
        await runner.run(task)


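# Entry point discovered by the Pipecat runner.
# NOTE: this function is an assumed sketch that follows the Pipecat runner
# convention; see bot.py in the repository for the actual version.
async def bot(runner_args):
    from pipecat.runner.utils import create_transport

    transport_params = {
        "webrtc": lambda: TransportParams(
            audio_in_enabled=True,
            video_in_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
    }
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main
    main()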
```

***

## Reference Implementation

For a complete, production-ready implementation, see the **official repository**:

**🔗 [dataspike-io/pipecat-deepfake-bot](https://github.com/dataspike-io/pipecat-deepfake-bot)**

The repository includes:

* Complete bot implementation with comprehensive documentation
* Dockerfile and Pipecat Cloud deployment configuration
* Example custom notification handlers
* Environment configuration templates
* Integration tests and examples

***

## API Reference

### DataspikeDeepfakeProcessor

**Constructor:**

```python theme={null}
DataspikeDeepfakeProcessor(
    api_key: str | None = None,
    session: aiohttp.ClientSession | None = None,
    video_params: VideoParams = VideoParams(),
    audio_params: AudioParams = AudioParams(),
    notification_cb: Callable[[DeepfakeStreamingSchemaResultEvent], Awaitable[None]] | None = None
)
```

**Methods:**

* `async start()` — Start WebSocket connection and processing loop
* `async stop()` — Stop processor and close connections gracefully
* `set_participant_id(str)` — Set participant identifier for tracking
* `set_webrtc_connection(SmallWebRTCConnection)` — Configure media track access

**State Transitions:**

* `CLEAR` → `SUSPICIOUS` — Potential manipulation detected, frame rate increases
* `SUSPICIOUS` → `ALERT` — High confidence detection, notification triggered
* `ALERT` → `CLEAR` — Manipulation resolved, notification triggered
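
The notification rule implied by these transitions can be written as a small predicate. This is a sketch of the behavior as listed above, not the processor's actual code; the `State` enum is a hypothetical stand-in for `EventType`:

```python
from enum import Enum


class State(Enum):
    CLEAR = "clear"
    SUSPICIOUS = "suspicious"
    ALERT = "alert"


def should_notify(previous: State, current: State) -> bool:
    """Notify on entering ALERT and on recovering from ALERT to CLEAR."""
    if previous == current:
        return False  # no transition, nothing to report
    if current is State.ALERT:
        return True
    return previous is State.ALERT and current is State.CLEAR
```

Under this rule a `CLEAR` → `SUSPICIOUS` transition only raises the sampling rate and stays silent, which keeps callbacks from firing on every brief suspicion.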

***

## Support & Resources

* **Dataspike API Documentation:** [docs.dataspike.io](https://docs.dataspike.io)
* **Dataspike Support:** [dataspike.io/contact-us](https://dataspike.io/contact-us?lang=en)
* **Pipecat Framework:** [github.com/pipecat-ai/pipecat](https://github.com/pipecat-ai/pipecat)
* **Pipecat Documentation:** [docs.pipecat.ai](https://docs.pipecat.ai/)
* **This Integration:** [github.com/dataspike-io/pipecat-deepfake-bot](https://github.com/dataspike-io/pipecat-deepfake-bot)

For issues specific to this integration, open an issue on GitHub or contact Dataspike support.
