1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ The application templates provided in this repo scale up to **millions of pages
| [`Adaptive RAG App`](templates/adaptive_rag/) | A RAG application using Adaptive RAG, a technique developed by Pathway to reduce token cost in RAG up to 4x while maintaining accuracy. |
| [`Private RAG App with Mistral and Ollama`](templates/private_rag/) | A fully private (local) version of the `question_answering_rag` RAG pipeline using Pathway Live Data Framework, Mistral, and Ollama. |
| [`Slides AI Search App`](templates/slides_ai_search/) | An indexing pipeline for retrieving slides. It performs multi-modal parsing of PowerPoint and PDF files and maintains a live index of your slides. |
| [`Video RAG with TwelveLabs`](templates/video_rag_twelvelabs/) | A RAG pipeline over **video**. It uses [TwelveLabs](https://twelvelabs.io) Pegasus to turn videos into rich text descriptions and Marengo multimodal embeddings to index them, so you can ask questions about your videos on a live connected data source (files, Google Drive,...). |


## How do these AI Pipelines work?
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -57,7 +57,7 @@ profile = "black"

[tool.mypy]
python_version = "3.10"
-exclude = ["templates/adaptive_rag", "templates/private_rag", "templates/document_indexing", "templates/multimodal_rag", "templates/question_answering_rag"]
+exclude = ["templates/adaptive_rag", "templates/private_rag", "templates/document_indexing", "templates/multimodal_rag", "templates/question_answering_rag", "templates/video_rag_twelvelabs"]
ignore_missing_imports = true
check_untyped_defs = true
warn_redundant_casts = true
6 changes: 6 additions & 0 deletions templates/video_rag_twelvelabs/.env.example
@@ -0,0 +1,6 @@
# TwelveLabs API key, used by the Pegasus video parser and the Marengo embedder.
# Get a free key at https://twelvelabs.io
TWELVELABS_API_KEY=

# OpenAI API key, used by the question-answering LLM.
OPENAI_API_KEY=
12 changes: 12 additions & 0 deletions templates/video_rag_twelvelabs/Dockerfile
@@ -0,0 +1,12 @@
FROM pathwaycom/pathway:latest

WORKDIR /app

COPY requirements.txt .
RUN pip install -U --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["python", "app.py"]
122 changes: 122 additions & 0 deletions templates/video_rag_twelvelabs/README.md
@@ -0,0 +1,122 @@
<p align="center" class="flex items-center gap-1 justify-center flex-wrap">
<img src="../../assets/gcp-logo.svg?raw=true" alt="GCP Logo" height="20" width="20">
<a href="https://pathway.com/developers/user-guide/deployment/gcp-deploy">Deploy with GCP</a> |
<img src="../../assets/aws-fargate-logo.svg?raw=true" alt="AWS Logo" height="20" width="20">
<a href="https://pathway.com/developers/user-guide/deployment/aws-fargate-deploy">Deploy with AWS</a> |
<img src="../../assets/azure-logo.svg?raw=true" alt="Azure Logo" height="20" width="20">
<a href="https://pathway.com/developers/user-guide/deployment/azure-aci-deploy">Deploy with Azure</a> |
<img src="../../assets/render.png?raw=true" alt="Render Logo" height="20" width="20">
<a href="https://pathway.com/developers/user-guide/deployment/render-deploy"> Deploy with Render </a>
</p>

# Video RAG with Pathway Live Data Framework and TwelveLabs

## Overview

This app template shows how to build a RAG application **over video** using the
Pathway Live Data Framework together with [TwelveLabs](https://twelvelabs.io).

Video is hard to fit into a RAG pipeline: many stacks index only a transcript
of the audio and discard everything that happens on screen. This template
indexes the *whole* video instead, using two TwelveLabs models:

- **[Pegasus](https://docs.twelvelabs.io/docs/concepts/models/pegasus)** — a
video-understanding model that turns each video into a rich text description
(what happens, who and what appears, the setting, on-screen and spoken text,
the overall topic). Pathway indexes that text exactly like it would index a PDF.
- **[Marengo](https://docs.twelvelabs.io/docs/concepts/models/marengo)** — a
multimodal embedding model that produces 512-dimensional vectors in a shared
space for text, image, audio and video. It is used here as the retriever
embedder.

Because the template uses the standard Pathway `DocumentStore` and
`BaseRAGQuestionAnswerer`, everything else works out of the box: live sync with
your data source, the in-memory vector index, caching, and the HTTP API. Drop a
new video into the connected folder and it becomes queryable automatically.

## Architecture

```
video files ─▶ pw.io.fs.read (bytes)
            ─▶ TwelveLabsVideoParser (Pegasus: video → text)
            ─▶ TokenCountSplitter (chunking)
            ─▶ DocumentStore + UsearchKnnFactory
               with MarengoEmbedder (text → 512-dim vectors)
            ─▶ BaseRAGQuestionAnswerer (OpenAI LLM over retrieved context)
            ─▶ REST API on :8000
```

The TwelveLabs components live in the local
[`pathway_twelvelabs`](pathway_twelvelabs/__init__.py) package:

- `TwelveLabsVideoParser` — a Pathway parser (`pw.UDF`) that uploads the video
bytes as a TwelveLabs asset and asks Pegasus to describe it.
- `MarengoEmbedder` — a Pathway embedder (`BaseEmbedder`) that calls the Marengo
embedding endpoint.

Both are wired in entirely through [`app.yaml`](app.yaml), so you can swap models,
prompts, the data source, or the LLM without touching any Python.
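
To make the chunking and retrieval stages of the diagram concrete, here is a minimal, dependency-free sketch. It is not how Pathway's `TokenCountSplitter` or `UsearchKnnFactory` are implemented: whitespace-separated words stand in for tokens, small hand-written vectors stand in for Marengo embeddings, and the function names (`split_by_token_count`, `cosine_similarity`, `top_k`) are hypothetical.

```python
import math


def split_by_token_count(text: str, max_tokens: int) -> list[str]:
    """Split text into chunks of at most max_tokens whitespace-separated words.

    Assumption: words approximate tokens; a real splitter uses a tokenizer.
    """
    if max_tokens < 1:
        raise ValueError("max_tokens must be positive")
    words = text.split()
    return [
        " ".join(words[i : i + max_tokens])
        for i in range(0, len(words), max_tokens)
    ]


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(
    query_vec: list[float],
    indexed: list[tuple[str, list[float]]],
    k: int,
) -> list[str]:
    """Return the k chunks whose vectors are most similar to query_vec."""
    ranked = sorted(
        indexed,
        key=lambda item: cosine_similarity(query_vec, item[1]),
        reverse=True,
    )
    return [chunk for chunk, _ in ranked[:k]]
```

In the template, the equivalent work is done by the components configured in `app.yaml`; this sketch only illustrates the idea of ranking description chunks by cosine similarity to the query.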

## Customizing the pipeline

- **Change what is extracted from the video.** Set the `prompt` field of
`$parser` in `app.yaml`, e.g. `"Describe this video, focusing on the products
that appear and any prices shown."`
- **Change the data source.** Replace the `!pw.io.fs.read` source with the Google
Drive, SharePoint, or S3 connector (a commented Google Drive example is
included in `app.yaml`).
- **Change the answering LLM.** Edit the `$llm` block — any
[Pathway LLM wrapper](https://pathway.com/developers/api-docs/pathway-xpacks-llm/llms)
works.

## Running the app

### Prerequisites

- A TwelveLabs API key. You can get one at [twelvelabs.io](https://twelvelabs.io);
a free tier is available.
- An OpenAI API key for the question-answering LLM.

Copy `.env.example` to `.env` and fill in your keys:

```bash
cp .env.example .env
# edit .env and set TWELVELABS_API_KEY and OPENAI_API_KEY
```
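
If you want to fail fast on a missing key, a small check like the following can help. This is only a sketch and not part of the template: `missing_keys` and `REQUIRED_KEYS` are hypothetical names, and the function inspects whatever mapping you pass in, so call it with `os.environ` only after `.env` has been loaded (for example by `load_dotenv`, or by Docker's `--env-file`).

```python
from collections.abc import Mapping

# The two variables named in .env.example.
REQUIRED_KEYS = ("TWELVELABS_API_KEY", "OPENAI_API_KEY")


def missing_keys(env: Mapping[str, str]) -> list[str]:
    """Return the required variable names that are unset or blank in env."""
    return [name for name in REQUIRED_KEYS if not env.get(name, "").strip()]
```

For example, `missing_keys(os.environ)` returns an empty list when both keys are present, and the names of the missing ones otherwise.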

Put one or more videos (e.g. `.mp4`, `.mov`) into the `data/` directory.
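
To confirm which files in `data/` look like videos before starting the app, a quick listing such as the one below may be useful. It is a sketch under assumptions: the suffix set only mirrors the two examples above, `find_videos` is a hypothetical helper, and the template itself does not necessarily filter files by extension.

```python
from pathlib import Path

# Assumption: only the two extensions mentioned above; extend as needed.
VIDEO_SUFFIXES = {".mp4", ".mov"}


def find_videos(data_dir: Path) -> list[Path]:
    """Return video files directly inside data_dir, sorted by path."""
    return sorted(
        p
        for p in data_dir.iterdir()
        if p.is_file() and p.suffix.lower() in VIDEO_SUFFIXES
    )
```

Calling `find_videos(Path("data"))` lists the matching files; the suffix comparison is case-insensitive, so `clip.MOV` is included.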

### With Docker

```bash
docker build -t video-rag-twelvelabs .
docker run -v "$(pwd)/data:/app/data" --env-file .env -p 8000:8000 video-rag-twelvelabs
```

### Locally

```bash
pip install -r requirements.txt
python app.py
```

### Querying

Once the server is up, ask questions about your videos:

```bash
curl -X POST http://localhost:8000/v1/pw_ai_answer \
  -H "Content-Type: application/json" \
  -d '{"prompt": "What products are shown in the videos?"}'
```
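
The same request can be sent from Python using only the standard library. This is a sketch mirroring the `curl` call above: the endpoint path and the `prompt` field come from that command, while `build_answer_request`, `ask`, and the 60-second timeout are hypothetical choices. The shape of the JSON response is not assumed here; `ask` returns whatever the server sends back.

```python
import json
import urllib.request


def build_answer_request(
    prompt: str, base_url: str = "http://localhost:8000"
) -> urllib.request.Request:
    """Build the POST request for the question-answering endpoint."""
    body = json.dumps({"prompt": prompt}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/v1/pw_ai_answer",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(prompt: str, base_url: str = "http://localhost:8000", timeout: float = 60.0):
    """Send the question and return the decoded JSON response as-is."""
    request = build_answer_request(prompt, base_url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, `ask("What products are shown in the videos?")` returns the parsed response body.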

## Tests

A small test suite lives in [`test_twelvelabs.py`](test_twelvelabs.py). The
no-network tests run without any credentials; the live Marengo embedding test is
skipped unless `TWELVELABS_API_KEY` is set:

```bash
pip install -r requirements.txt pytest
TWELVELABS_API_KEY=... pytest templates/video_rag_twelvelabs/test_twelvelabs.py
```
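
The "skipped unless the key is set" behaviour can be illustrated with the standard library's `unittest`. This is not the content of `test_twelvelabs.py` (which is run with pytest); the class and test names are hypothetical, and the body only marks where a real call to the embedding endpoint would go.

```python
import os
import unittest


class LiveEmbeddingTest(unittest.TestCase):
    """Hypothetical live test that needs real TwelveLabs credentials."""

    def test_live_embedding(self):
        # Skip at run time, so the test suite still passes without a key.
        if not os.environ.get("TWELVELABS_API_KEY"):
            self.skipTest("TWELVELABS_API_KEY is not set")
        # A real test would call the Marengo embedding endpoint here.
        self.assertTrue(os.environ["TWELVELABS_API_KEY"])
```

Checking the variable inside the test, rather than at import time, means the decision reflects the environment at the moment the test runs.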
88 changes: 88 additions & 0 deletions templates/video_rag_twelvelabs/app.py
@@ -0,0 +1,88 @@
#!/usr/bin/env python

# Copyright © 2026 Pathway

import logging
from pathlib import Path
from warnings import warn

import pathway as pw

# `pathway_twelvelabs` registers the TwelveLabsVideoParser and MarengoEmbedder
# classes so they can be referenced from `app.yaml` via the `!` YAML tags.
import pathway_twelvelabs # noqa: F401
from dotenv import load_dotenv
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer
from pathway.xpacks.llm.servers import QASummaryRestServer
from pydantic import BaseModel, ConfigDict, InstanceOf

# To use advanced features with Pathway Live Data Framework Scale, get your free license key from
# https://pathway.com/features and paste it below.
# To use Pathway Live Data Framework Community, comment out the line below.
pw.set_license_key("demo-license-key-with-telemetry")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)


class App(BaseModel):
    question_answerer: InstanceOf[BaseRAGQuestionAnswerer]
    host: str = "0.0.0.0"
    port: int = 8000

    with_cache: bool | None = None  # deprecated
    persistence_backend: pw.persistence.Backend | None = None
    persistence_mode: pw.PersistenceMode | None = pw.PersistenceMode.UDF_CACHING
    terminate_on_error: bool = False

    def run(self) -> None:
        server = QASummaryRestServer(  # noqa: F841
            self.host, self.port, self.question_answerer
        )

        if self.persistence_mode is None:
            if self.with_cache is True:
                warn(
                    "`with_cache` is deprecated. Please use `persistence_mode` instead.",
                    DeprecationWarning,
                )
                persistence_mode = pw.PersistenceMode.UDF_CACHING
            else:
                persistence_mode = None
        else:
            persistence_mode = self.persistence_mode

        if persistence_mode is not None:
            if self.persistence_backend is None:
                persistence_backend = pw.persistence.Backend.filesystem("./Cache")
            else:
                persistence_backend = self.persistence_backend
            persistence_config = pw.persistence.Config(
                persistence_backend,
                persistence_mode=persistence_mode,
            )
        else:
            persistence_config = None

        pw.run(
            persistence_config=persistence_config,
            terminate_on_error=self.terminate_on_error,
            monitoring_level=pw.MonitoringLevel.NONE,
        )

    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)


if __name__ == "__main__":
    base_dir = Path(__file__).resolve().parent

    load_dotenv(base_dir / ".env")

    with open(base_dir / "app.yaml") as f:
        config = pw.load_yaml(f)

    app = App(**config)
    app.run()
92 changes: 92 additions & 0 deletions templates/video_rag_twelvelabs/app.yaml
@@ -0,0 +1,92 @@
# This YAML configuration file sets up the TwelveLabs video RAG template.
# It indexes videos by turning them into text with the TwelveLabs Pegasus model,
# embedding that text with the TwelveLabs Marengo model, and serving a RAG
# question-answering endpoint on top.
# You can learn more about the YAML syntax here:
# https://pathway.com/developers/templates/configure-yaml


# $sources defines the data sources read and indexed in the RAG.
# Here we read video files from the local `data` directory. The connector emits
# raw bytes, which the TwelveLabs parser sends to Pegasus for analysis.
# You can learn more about configuring data sources here:
# https://pathway.com/developers/templates/yaml-examples/data-sources-examples
$sources:
  - !pw.io.fs.read
    path: data
    format: binary
    with_metadata: true

  # Uncomment to use the Google Drive connector to index videos from a Drive folder.
  # - !pw.io.gdrive.read
  #   object_id: $DRIVE_ID
  #   service_user_credentials_file: gdrive_indexer.json
  #   file_name_pattern:
  #     - "*.mp4"
  #     - "*.mov"
  #   object_size_limit: null
  #   with_metadata: true
  #   refresh_interval: 30


# The parser turns each video into text using the TwelveLabs Pegasus model.
# The TWELVELABS_API_KEY environment variable must be set (see .env.example).
# You can customize the prompt to extract exactly what your application needs.
$parser: !pathway_twelvelabs.TwelveLabsVideoParser
  model: "pegasus1.5"
  max_tokens: 2048
  cache_strategy: !pw.udfs.DefaultCache {}
  # prompt: "Describe this video, focusing on the products that appear and any prices shown."

# The embedder converts the parsed text into 512-dimensional multimodal
# embeddings using the TwelveLabs Marengo model.
$embedder: !pathway_twelvelabs.MarengoEmbedder
  model: "marengo3.0"
  cache_strategy: !pw.udfs.DefaultCache {}
  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy {}

# Splits the parsed video descriptions into chunks for indexing.
$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
  max_tokens: 400

# The LLM used to answer questions over the indexed video descriptions.
# The list of available Pathway LLM wrappers is available here:
# https://pathway.com/developers/api-docs/pathway-xpacks-llm/llms
$llm: !pw.xpacks.llm.llms.OpenAIChat
  model: "gpt-4.1-mini"
  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
    max_retries: 6
  cache_strategy: !pw.udfs.DefaultCache {}
  temperature: 0
  capacity: 8
  async_mode: "fully_async"

# Builds the in-memory vector index over the Marengo embeddings.
$retriever_factory: !pw.indexing.UsearchKnnFactory
  reserved_space: 1000
  embedder: $embedder
  metric: !pw.indexing.USearchMetricKind.COS

# Stores and retrieves the indexed video descriptions.
$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources
  parser: $parser
  splitter: $splitter
  retriever_factory: $retriever_factory

# The RAG question-answering component served over HTTP.
question_answerer: !pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer
  llm: $llm
  indexer: $document_store
  # You can set the number of documents to be included as the context of the query
  # search_topk: 6
  # You can use your own prompt for querying.
  # prompt_template: "Given these documents: {context}, please answer the question: {query}"

# Change host and port of the webserver by uncommenting these lines
# host: "0.0.0.0"
# port: 8000

# By default, caching is enabled for UDFs with cache_strategy set.
# You can disable it by uncommenting the following line.
# persistence_mode: null
Empty file.