Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/langbot/pkg/plugin/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ async def vector_search(data: dict[str, Any]) -> handler.ActionResponse:
filters = data.get('filters')
search_type = data.get('search_type', 'vector')
query_text = data.get('query_text', '')
vector_weight = data.get('vector_weight')
try:
results = await self.ap.rag_runtime_service.vector_search(
collection_id,
Expand All @@ -539,6 +540,7 @@ async def vector_search(data: dict[str, Any]) -> handler.ActionResponse:
filters,
search_type,
query_text,
vector_weight=vector_weight,
)
return handler.ActionResponse.success(data={'results': results})
except Exception as e:
Expand Down
2 changes: 2 additions & 0 deletions src/langbot/pkg/rag/service/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async def vector_search(
filters: dict[str, Any] | None = None,
search_type: str = 'vector',
query_text: str = '',
vector_weight: float | None = None,
) -> list[dict[str, Any]]:
"""Handle VECTOR_SEARCH action."""
return await self.ap.vector_db_mgr.search(
Expand All @@ -50,6 +51,7 @@ async def vector_search(
filter=filters,
search_type=search_type,
query_text=query_text,
vector_weight=vector_weight,
)

async def vector_delete(
Expand Down
2 changes: 2 additions & 0 deletions src/langbot/pkg/vector/mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ async def search(
filter: dict | None = None,
search_type: str = 'vector',
query_text: str = '',
vector_weight: float | None = None,
) -> list[dict]:
"""Proxy: Search vectors.

Expand All @@ -111,6 +112,7 @@ async def search(
search_type=search_type,
query_text=query_text,
filter=filter,
vector_weight=vector_weight,
)

if not results or 'ids' not in results or not results['ids']:
Expand Down
3 changes: 3 additions & 0 deletions src/langbot/pkg/vector/vdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async def search(
search_type: str = 'vector',
query_text: str = '',
filter: dict[str, Any] | None = None,
vector_weight: float | None = None,
) -> Dict[str, Any]:
"""Search for the most similar vectors in the specified collection.

Expand All @@ -70,6 +71,8 @@ async def search(
{"file_id": "abc"}
{"created_at": {"$gte": 1700000000}}
{"file_type": {"$in": ["pdf", "docx"]}}
vector_weight: Weight for vector search in hybrid mode (0.0–1.0).
``None`` means use equal weights (backward compatible).
"""
pass

Expand Down
30 changes: 25 additions & 5 deletions src/langbot/pkg/vector/vdbs/chroma.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ async def search(
search_type: str = 'vector',
query_text: str = '',
filter: dict[str, Any] | None = None,
vector_weight: float | None = None,
) -> dict[str, Any]:
col = await self.get_or_create_collection(collection)

if search_type == SearchType.FULL_TEXT:
return await self._full_text_search(col, collection, k, query_text, filter)
elif search_type == SearchType.HYBRID:
return await self._hybrid_search(col, collection, query_embedding, k, query_text, filter)
return await self._hybrid_search(
col, collection, query_embedding, k, query_text, filter, vector_weight=vector_weight
)

# Default: vector search
return await self._vector_search(col, collection, query_embedding, k, filter)
Expand Down Expand Up @@ -127,6 +130,7 @@ async def _hybrid_search(
k: int,
query_text: str,
filter: dict[str, Any] | None,
vector_weight: float | None = None,
) -> dict[str, Any]:
# Fall back to pure vector search when no text is provided
if not query_text:
Expand All @@ -144,7 +148,15 @@ async def _hybrid_search(
return {'ids': [[]], 'metadatas': [[]], 'distances': [[]], 'documents': [[]]}

# RRF fusion
fused = self._rrf_fuse([vector_ids, text_ids], k)
weights = None
if vector_weight is not None:
weights = [vector_weight, 1.0 - vector_weight]
self.ap.logger.info(
f"Chroma hybrid fusion config in '{collection}': "
f'vector_weight={vector_weight}, weights={weights or [1.0, 1.0]}, '
f'vector_hits={len(vector_ids)}, text_hits={len(text_ids)}'
)
fused = self._rrf_fuse([vector_ids, text_ids], k, weights=weights)
if not fused:
return {'ids': [[]], 'metadatas': [[]], 'distances': [[]], 'documents': [[]]}

Expand Down Expand Up @@ -197,16 +209,24 @@ async def _hybrid_search(
}

@staticmethod
def _rrf_fuse(result_lists: list[list[str]], k: int, weights: list[float] | None = None) -> list[tuple[str, float]]:
    """Reciprocal Rank Fusion over multiple ranked ID lists.

    Each list contributes ``w / (_RRF_K + rank + 1)`` to a document's
    score, where *rank* is the document's zero-based position in that
    list and *w* is the list's weight. Documents appearing in several
    lists accumulate score from each.

    Args:
        result_lists: Ranked ID lists from different search methods
            (e.g. vector hits and full-text hits).
        k: Number of results to return.
        weights: Per-list weights, parallel to ``result_lists``.
            ``None`` means equal weight (1.0 each) — the classic,
            backward-compatible RRF.

    Returns:
        A list of ``(doc_id, rrf_score)`` tuples sorted by descending
        score, truncated to *k* entries.
    """
    if weights is None:
        weights = [1.0] * len(result_lists)
    scores: dict[str, float] = {}
    for list_idx, ranked_ids in enumerate(result_lists):
        w = weights[list_idx]
        for rank, doc_id in enumerate(ranked_ids):
            # +1 keeps the denominator positive for rank 0.
            scores[doc_id] = scores.get(doc_id, 0.0) + w / (_RRF_K + rank + 1)
    sorted_results = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return sorted_results[:k]

Expand Down
1 change: 1 addition & 0 deletions src/langbot/pkg/vector/vdbs/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ async def search(
search_type: str = 'vector',
query_text: str = '',
filter: dict[str, Any] | None = None,
vector_weight: float | None = None,
) -> Dict[str, Any]:
"""Search for similar vectors in Milvus collection

Expand Down
1 change: 1 addition & 0 deletions src/langbot/pkg/vector/vdbs/pgvector_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ async def search(
search_type: str = 'vector',
query_text: str = '',
filter: dict[str, Any] | None = None,
vector_weight: float | None = None,
) -> Dict[str, Any]:
"""Search for similar vectors using cosine distance

Expand Down
1 change: 1 addition & 0 deletions src/langbot/pkg/vector/vdbs/qdrant.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ async def search(
search_type: str = 'vector',
query_text: str = '',
filter: dict[str, Any] | None = None,
vector_weight: float | None = None,
) -> dict[str, Any]:
exists = await self.client.collection_exists(collection)
if not exists:
Expand Down
45 changes: 45 additions & 0 deletions src/langbot/pkg/vector/vdbs/seekdb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import asyncio
from decimal import Decimal
import re
from typing import Any, Dict, List


Expand Down Expand Up @@ -101,8 +103,28 @@ def __init__(self, ap: app.Application):
}
)

def _normalize_collection_name(self, collection: str) -> str:
"""SeekDB only accepts [a-zA-Z0-9_], while LangBot uses UUID-like KB IDs."""
normalized = re.sub(r'[^A-Za-z0-9_]', '_', collection)
if normalized != collection:
self.ap.logger.info(f"Normalized SeekDB collection name: '{collection}' -> '{normalized}'")
return normalized

def _json_safe(self, value: Any) -> Any:
"""Convert SeekDB result values into JSON-serializable Python primitives."""
if isinstance(value, Decimal):
return float(value)
if isinstance(value, dict):
return {k: self._json_safe(v) for k, v in value.items()}
if isinstance(value, list):
return [self._json_safe(v) for v in value]
if isinstance(value, tuple):
return [self._json_safe(v) for v in value]
return value

async def _get_or_create_collection_internal(self, collection: str, vector_size: int = None) -> Any:
"""Internal method to get or create a collection with proper configuration."""
collection = self._normalize_collection_name(collection)
if collection in self._collections:
return self._collections[collection]

Expand Down Expand Up @@ -173,6 +195,7 @@ async def add_embeddings(
if not embeddings_list:
return

collection = self._normalize_collection_name(collection)
# Ensure collection exists with correct dimension
vector_size = len(embeddings_list[0])
coll = await self._get_or_create_collection_internal(collection, vector_size)
Expand All @@ -194,6 +217,7 @@ async def search(
search_type: str = 'vector',
query_text: str = '',
filter: Dict[str, Any] | None = None,
vector_weight: float | None = None,
) -> Dict[str, Any]:
"""Search for the most similar vectors in the specified collection.

Expand All @@ -210,6 +234,7 @@ async def search(
Returns:
Dictionary with 'ids', 'metadatas', 'distances' keys
"""
collection = self._normalize_collection_name(collection)
# Check if collection exists
exists = await asyncio.to_thread(self.client.has_collection, collection)
if not exists:
Expand Down Expand Up @@ -271,6 +296,17 @@ async def search(
query_cfg['where'] = filter
knn_cfg['where'] = filter

# Apply vector_weight via pyseekdb's native boost parameter
if vector_weight is not None:
knn_cfg['boost'] = vector_weight
query_cfg['boost'] = 1.0 - vector_weight
self.ap.logger.info(
f"SeekDB hybrid fusion config in '{collection}': "
f'vector_weight={vector_weight}, '
f'knn_boost={knn_cfg.get("boost", 1.0)}, '
f'query_boost={query_cfg.get("boost", 1.0)}'
)

results = await asyncio.to_thread(
coll.hybrid_search,
query=query_cfg,
Expand All @@ -279,13 +315,17 @@ async def search(
n_results=k,
include=['documents', 'metadatas'],
)
self.ap.logger.info(
f"SeekDB hybrid search in '{collection}' returned {len(results.get('ids', [[]])[0])} results."
)
else:
# Default: vector search via query()
query_kwargs = {'n_results': k, 'query_embeddings': query_embedding}
if filter:
query_kwargs['where'] = filter
results = await asyncio.to_thread(coll.query, **query_kwargs)

results = self._json_safe(results)
self.ap.logger.info(
f"SeekDB {search_type} search in '{collection}' returned {len(results.get('ids', [[]])[0])} results"
)
Expand All @@ -299,6 +339,7 @@ async def delete_by_file_id(self, collection: str, file_id: str) -> None:
collection: Collection name
file_id: File ID to delete
"""
collection = self._normalize_collection_name(collection)
# Check if collection exists
exists = await asyncio.to_thread(self.client.has_collection, collection)
if not exists:
Expand All @@ -325,6 +366,7 @@ async def delete_by_filter(self, collection: str, filter: Dict[str, Any]) -> int
collection: Collection name
filter: Chroma-style ``where`` filter dict
"""
collection = self._normalize_collection_name(collection)
exists = await asyncio.to_thread(self.client.has_collection, collection)
if not exists:
self.ap.logger.warning(f"SeekDB collection '{collection}' not found for deletion")
Expand All @@ -347,6 +389,7 @@ async def list_by_filter(
limit: int = 20,
offset: int = 0,
) -> tuple[list[Dict[str, Any]], int]:
collection = self._normalize_collection_name(collection)
exists = await asyncio.to_thread(self.client.has_collection, collection)
if not exists:
return [], 0
Expand All @@ -367,6 +410,7 @@ async def list_by_filter(

results = await asyncio.to_thread(coll.get, **get_kwargs)

results = self._json_safe(results)
ids = results.get('ids', [])
metadatas = results.get('metadatas', []) or [None] * len(ids)
documents = results.get('documents', []) or [None] * len(ids)
Expand All @@ -390,6 +434,7 @@ async def delete_collection(self, collection: str):
Args:
collection: Collection name
"""
collection = self._normalize_collection_name(collection)
# Remove from cache
if collection in self._collections:
del self._collections[collection]
Expand Down
Loading