> ## Documentation Index
> Fetch the complete documentation index at: https://docs.dataspike.io/llms.txt
> Use this file to discover all available pages before exploring further.

# LiveKit Plugin

> Integrate Dataspike deepfake detection into LiveKit Agents for realtime video analysis and in-room alerts.

The **Dataspike plugin for LiveKit** adds **real-time detection of AI‑manipulated (deepfake) content** to LiveKit Agent applications.\
It samples frames from remote participants’ video tracks, sends them to the Dataspike API for analysis, and publishes **lightweight detection events** back into the room or to a custom callback.

> **Summary**
>
> * **Manipulation Recognition:** Detects face swaps, compositing, and AI‑generated overlays.
> * **AI Model Detection:** Identifies content associated with modern generative systems (e.g., Veo, Sana, Flux).
> * **Adaptive Sampling:** Selectively samples frames to maintain real‑time responsiveness while minimizing bandwidth.
> * **Realtime Alerts:** Emits compact detection signals for immediate action.
> * **Privacy by Design:** No video or metadata is stored; frames are end‑to‑end encrypted in transit; results are short‑lived.

***

## Installation

```bash theme={null}
pip install dataspike-livekit-plugins
```

### Prerequisites

* **Dataspike API Key** — set as an environment variable:
  ```bash theme={null}
  export DATASPIKE_API_KEY="your_api_key_here"
  ```
* **LiveKit credentials** (for your room/agent):
  ```bash theme={null}
  export LIVEKIT_API_KEY="..."
  export LIVEKIT_API_SECRET="..."
  export LIVEKIT_URL="..."
  ```

Optional (if your Agent uses an LLM/voice stack):

```bash theme={null}
export OPENAI_API_KEY="..."
```

***

## Quick Start (Agent Worker)

The example below shows a **ready-to-use LiveKit Agent Worker** that integrates the Dataspike detector.\
It connects to a LiveKit room, attaches the deepfake detection plugin, and streams frames for real-time analysis.

Simply copy the snippet into a new file (for example, `agent_worker.py`), set your environment variables, and run it.

```python theme={null}
from livekit.plugins import dataspike
from livekit.agents import AgentSession, Agent

async def entrypoint(ctx):
    # Connect AgentSession and join the room
    await ctx.connect()
    session = AgentSession(...)

    # Create and start the Dataspike detector
    detector = dataspike.DataspikeDetector(
        # api_key="YOUR_API_KEY",  # optional; defaults to DATASPIKE_API_KEY env
        # notification_cb=on_notification,  # optional; custom result handling
        # normal_fps=0.2,  # baseline sampling (frames/sec)
        # burst_fps=1.0,   # elevated sampling during suspicious states
        # quality=75,      # JPEG encode quality (0–100)
    )
    await detector.start(session, room=ctx.room)

    # Start your agent logic
    await session.start(agent=Agent(instructions="Talk to me!"), room=ctx.room)
```

**Start the worker**

```bash theme={null}
python examples/deepfake-detection/dataspike/agent_worker.py dev
```

### Default Alert Behavior

By default, the detector automatically subscribes to remote participant video tracks and begins adaptive frame sampling.
Detection results are published back into the LiveKit room’s data channel as lightweight JSON alerts under the topic deepfake\_alert.

**Example default payload:**

```json theme={null}
{
  "type": "deepfake_alert",
  "level": "suspicious",
  "participant_id": "PA_abc123",
  "track_id": "TRK_xyz789",
  "message": "Possible manipulation indicators detected.",
  "timestamp_ms": 1730123456789
}
```

***

## Custom Notification Channels

Developers can override this default behavior to route alerts anywhere —
for example, sending them to a webhook, Slack, a moderation API, or your own analytics pipeline.

You can achieve this by passing a custom async callback (notification\_cb) when creating the detector:

```python theme={null}
async def on_notification(event):
    # event is a DeepfakeStreamingSchemaResultEvent
    # You can push this to any external service
    print(f"[ALERT] {event.level.upper()} from {event.participant_id}")
    # Example: await post_to_webhook(event.dict())

detector = dataspike.DataspikeDetector(notification_cb=on_notification)
```

> Flexible integration: You have full control over how alerts are processed, logged, or relayed. For example, you can store results in a database, call an external moderation API, or send messages to your in-house alerting system.

***

## Configuration

You can configure the detector via constructor parameters or environment variables.

|         Parameter | Type       |         Default         | Description                                                                                                       |
| ----------------: | :--------- | :---------------------: | :---------------------------------------------------------------------------------------------------------------- |
|         `api_key` | `str`      | env:`DATASPIKE_API_KEY` | Dataspike API key. Required unless set via env.                                                                   |
|      `normal_fps` | `float`    |          `0.2`          | Baseline sampling rate (frames/second).                                                                           |
|       `burst_fps` | `float`    |          `1.0`          | Elevated sampling rate when state is suspicious/alert.                                                            |
|         `quality` | `int`      |           `75`          | JPEG quality used for encoded frames (0–100).                                                                     |
| `notification_cb` | `Callable` |          `None`         | Optional async callback invoked for each result event. If omitted, events are published to the room data channel. |

**Environment Variables**

```bash theme={null}
export DATASPIKE_API_KEY="..."   # plugin reads this by default
export LIVEKIT_API_KEY="..."
export LIVEKIT_API_SECRET="..."
export LIVEKIT_URL="..."
```

***

## How It Works

1. **Track discovery** — When a remote participant’s video track is subscribed, the detector creates an internal `InputTrack` for sampling.
2. **Adaptive sampling** — Frames are emitted at `normal_fps` during CLEAR state; if results indicate elevated risk (SUSPICIOUS/ALERT), sampling increases up to `burst_fps`.
3. **Encoding** — Frames are JPEG‑encoded (configurable `quality`) and placed on a bounded async queue to avoid backpressure.
4. **Streaming** — Frames are streamed via a secure WebSocket to the Dataspike API; detection signals are returned in real time.
5. **Notifications** — Incoming results update per‑track state and trigger either the default data‑channel publisher or your custom callback.

***

## Privacy & Security

* **No storage** — The plugin and server **do not store** video data or metadata.
* **E2E encryption in transit** — All frame transmissions are encrypted; only short‑lived detection signals are returned.
* **Principle of least data** — Only the encoded frame and minimal metadata required for analysis are transmitted.
* **Key management** — Provide `DATASPIKE_API_KEY` via environment variables or a secret manager; never hard‑code secrets.

***

## Troubleshooting

* **No events received** — Confirm the agent joined the room and remote participants are publishing video tracks. Check data channel permissions.
* **High latency** — Reduce `quality` or `burst_fps`; ensure the network path to the Dataspike API is low‑latency.
* **Rate/bandwidth constraints** — Lower `normal_fps` and/or enable selective subscription to fewer tracks.
* **Auth errors** — Ensure `DATASPIKE_API_KEY` is set in the environment of the worker process.
* **Reconnect loops** — The detector auto‑reconnects with exponential backoff; inspect logs for repeated auth or network failures.

***

## Full Agent Worker Example

```python theme={null}
import logging

from dotenv import load_dotenv

from livekit.agents import (
    Agent,
    AgentSession,
    JobContext,
    JobRequest,
    WorkerOptions,
    WorkerType,
    cli,
)
from livekit.plugins import dataspike, openai

logger = logging.getLogger("dataspike-deepfake-example")
logger.setLevel(logging.DEBUG)

load_dotenv()


async def entrypoint(ctx: JobContext):
    session = AgentSession(
        llm=openai.realtime.RealtimeModel(voice="alloy"),
        resume_false_interruption=False,
    )

    # Configure the Dataspike detector.
    # - api_key: optional; if omitted, uses DATASPIKE_API_KEY from environment
    # - notification_cb: optional; callback for handling notifications
    #   (defaults to publishing results in the room data channel)
    # - other config: see plugin docs for available parameters
    detector = dataspike.DataspikeDetector(
        # api_key="YOUR_API_KEY",
        # notification_cb=on_notification,
    )

    # Start the Dataspike detector and attach it to the current session and room.
    await detector.start(session, room=ctx.room)

    # Launch the main agent, which will automatically join the same room.
    await session.start(
        agent=Agent(instructions="Talk to me!"),
        room=ctx.room,
    )


async def on_request(req: JobRequest):
    logger.info(f"[worker] job request for room={req.room.name} id={req.id}")
    # you can set the agent's participant name/identity here:
    await req.accept(name="dataspike-agent")


if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            request_fnc=on_request,
            worker_type=WorkerType.ROOM,
        )
    )
```
