diff --git a/apps/memos-copaw-plugin/memos_client.py b/apps/memos-copaw-plugin/memos_client.py new file mode 100644 index 00000000..5936b0ef --- /dev/null +++ b/apps/memos-copaw-plugin/memos_client.py @@ -0,0 +1,191 @@ +"""Async HTTP client for MemOS Cloud API.""" + +import asyncio +import logging + +from typing import Any + +import aiohttp + + +logger = logging.getLogger(__name__) + +_DEFAULT_BASE_URL = "https://memos.memtensor.cn/api/openmem/v1" +_DEFAULT_TIMEOUT = 8.0 +_DEFAULT_RETRIES = 1 + + +class MemOSClient: + """Async client for MemOS Cloud API. + + Handles authentication, retries, and graceful degradation for + the two core endpoints: /search/memory and /add/message. + """ + + def __init__( + self, + base_url: str = _DEFAULT_BASE_URL, + api_key: str = "", + timeout: float = _DEFAULT_TIMEOUT, + retries: int = _DEFAULT_RETRIES, + ): + self.base_url = base_url.rstrip("/") + self.api_key = api_key + self.timeout = timeout + self.retries = retries + self._session: aiohttp.ClientSession | None = None + + async def _ensure_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession( + headers={ + "Authorization": f"Token {self.api_key}", + "Content-Type": "application/json", + }, + timeout=aiohttp.ClientTimeout(total=self.timeout), + ) + return self._session + + async def close(self) -> None: + if self._session and not self._session.closed: + await self._session.close() + self._session = None + + async def _post( + self, + path: str, + payload: dict[str, Any], + ) -> dict[str, Any] | None: + """POST with retry. Returns parsed JSON or None on failure.""" + url = f"{self.base_url}{path}" + last_err: Exception | None = None + + for attempt in range(1 + self.retries): + try: + session = await self._ensure_session() + async with session.post(url, json=payload) as resp: + if resp.status == 200: + return await resp.json() + body = await resp.text() + logger.warning( + "MemOS API %s returned %s: %s", + path, + resp.status, + body[:300], + ) + except (aiohttp.ClientError, asyncio.TimeoutError) as exc: + last_err = exc + logger.warning( + "MemOS API %s attempt %d failed: %s", + path, + attempt + 1, + exc, + ) + if attempt < self.retries: + await asyncio.sleep(0.5 * (attempt + 1)) + + logger.error("MemOS API %s exhausted retries: %s", path, last_err) + return None + + # ------------------------------------------------------------------ # + # Search / Recall + # ------------------------------------------------------------------ # + + async def search_memory( + self, + user_id: str, + query: str, + *, + source: str = "copaw", + conversation_id: str = "", + memory_limit_number: int = 9, + include_preference: bool = True, + preference_limit_number: int = 6, + include_tool_memory: bool = False, + tool_memory_limit_number: int = 6, + relativity: float = 0.45, + knowledgebase_ids: list[str] | None = None, + filter_obj: dict[str, Any] | None = None, + ) -> dict[str, Any] | None: + """Call POST /search/memory. + + Returns the ``data`` dict from MemOS response, or *None* on failure. + """ + payload: dict[str, Any] = { + "user_id": user_id, + "query": query, + "source": source, + "memory_limit_number": memory_limit_number, + "include_preference": include_preference, + "preference_limit_number": preference_limit_number, + "include_tool_memory": include_tool_memory, + "tool_memory_limit_number": tool_memory_limit_number, + "relativity": relativity, + } + if conversation_id: + payload["conversation_id"] = conversation_id + if knowledgebase_ids: + payload["knowledgebase_ids"] = knowledgebase_ids + if filter_obj: + payload["filter"] = filter_obj + + result = await self._post("/search/memory", payload) + if result is None: + return None + return result.get("data", result) + + # ------------------------------------------------------------------ # + # Add / Store + # ------------------------------------------------------------------ # + + async def add_message( + self, + user_id: str, + messages: list[dict[str, str]], + *, + conversation_id: str = "", + source: str = "copaw", + agent_id: str = "", + async_mode: bool = True, + tags: list[str] | None = None, + ) -> bool: + """Call POST /add/message. + + Returns True on success, False on failure. + """ + payload: dict[str, Any] = { + "user_id": user_id, + "messages": messages, + "source": source, + "async_mode": async_mode, + "tags": tags or ["copaw"], + } + if conversation_id: + payload["conversation_id"] = conversation_id + if agent_id: + payload["agent_id"] = agent_id + + result = await self._post("/add/message", payload) + return result is not None + + # ------------------------------------------------------------------ # + # Health check + # ------------------------------------------------------------------ # + + async def ping(self) -> bool: + """Lightweight connectivity check via a minimal search call. + + Returns True only for 2xx responses. 401/403 (bad key) and + other client errors are treated as failures so that ``start()`` + does not falsely report a healthy connection. + """ + try: + session = await self._ensure_session() + async with session.post( + f"{self.base_url}/search/memory", + json={"user_id": "_ping", "query": "ping"}, + timeout=aiohttp.ClientTimeout(total=5), + ) as resp: + return 200 <= resp.status < 300 + except Exception: + return False diff --git a/apps/memos-copaw-plugin/memos_memory_manager.py b/apps/memos-copaw-plugin/memos_memory_manager.py new file mode 100644 index 00000000..b1ab4281 --- /dev/null +++ b/apps/memos-copaw-plugin/memos_memory_manager.py @@ -0,0 +1,247 @@ +"""MemOS Cloud memory manager for CoPaw agents. + +Extends ReMeLightMemoryManager — all local operations (context compaction, +token counting, tool result truncation, in-memory memory) are delegated +to the parent class unchanged. Only ``memory_search`` is overridden to +query MemOS Cloud instead of the local vector index. + +This ensures full compatibility with CoPaw's MemoryCompactionHook and +force_memory_search auto-recall mechanism. +""" + +import contextlib +import datetime +import logging +import os + +from agentscope.message import TextBlock +from agentscope.tool import ToolResponse +from copaw.agents.memory.reme_light_memory_manager import ( + ReMeLightMemoryManager, +) +from copaw.constant import EnvVarLoader + + +logger = logging.getLogger(__name__) + + +# ------------------------------------------------------------------ # +# Helper: format MemOS search results into readable text +# ------------------------------------------------------------------ # + + +def _format_search_results(data: dict) -> str: + """Convert MemOS /search/memory response into plain-text blocks.""" + parts: list[str] = [] + + for item in data.get("memory_detail_list", []): + ts = item.get("update_time") or item.get("create_time") + date_str = "" + if ts: + with contextlib.suppress(OSError, ValueError): + date_str = datetime.datetime.fromtimestamp( + ts, + tz=datetime.timezone.utc, + ).strftime("[%Y-%m-%d %H:%M] ") + value = (item.get("memory_value") or "")[:8000] + rel = item.get("relativity", 0) + parts.append(f"{date_str}{value} (score={rel:.2f})") + + for item in data.get("preference_detail_list", []): + ptype = item.get("preference_type", "Preference") + pref = item.get("preference", "") + parts.append(f"[{ptype}] {pref}") + + return "\n---\n".join(parts) if parts else "" + + +# ------------------------------------------------------------------ # +# MemOSMemoryManager +# ------------------------------------------------------------------ # + + +class MemOSMemoryManager(ReMeLightMemoryManager): + """Memory manager that combines ReMeLight local ops with MemOS Cloud. + + Inherits all ReMeLight capabilities: + - compact_memory / compact_tool_result / check_context (token-aware) + - get_in_memory_memory (with as_token_counter support) + - summary_memory (file-based with toolkit) + + Overrides one method: + - memory_search → POST /search/memory to MemOS Cloud, + with automatic fallback to local ReMeLight search on failure. + + Configuration is read from ``running.memos_config`` in agent config, + with env-var fallbacks: MEMOS_API_KEY, MEMOS_BASE_URL, MEMOS_USER_ID. + """ + + def __init__(self, working_dir: str, agent_id: str): + super().__init__(working_dir=working_dir, agent_id=agent_id) + self._memos_client = None + self._memos_cfg: dict | None = None + + # ------------------------------------------------------------------ # + # Config resolution + # ------------------------------------------------------------------ # + + def _load_memos_config(self) -> dict: + """Resolve MemOS config: agent config > env var > default.""" + if self._memos_cfg is not None: + return self._memos_cfg + + cfg: dict = {} + try: + from copaw.config.config import load_agent_config + + ac = load_agent_config(self.agent_id) + if hasattr(ac.running, "memos_config"): + mc = ac.running.memos_config + cfg = { + "base_url": mc.base_url, + "api_key": mc.api_key, + "user_id": mc.user_id, + "memory_limit_number": mc.memory_limit_number, + "include_preference": mc.include_preference, + "preference_limit_number": mc.preference_limit_number, + "relativity": mc.relativity, + "timeout": mc.timeout, + "conversation_id": mc.conversation_id, + "knowledgebase_ids": mc.knowledgebase_ids, + # async_mode reserved for future add flow + } + except Exception as e: + logger.debug( + "Could not load memos_config from agent config: %s", + e, + ) + + result = { + "base_url": cfg.get("base_url") + or EnvVarLoader.get_str( + "MEMOS_BASE_URL", + "https://memos.memtensor.cn/api/openmem/v1", + ), + "api_key": cfg.get("api_key") or EnvVarLoader.get_str("MEMOS_API_KEY", ""), + "user_id": cfg.get("user_id") or EnvVarLoader.get_str("MEMOS_USER_ID", "copaw-user"), + "memory_limit_number": cfg.get("memory_limit_number", 9), + "include_preference": cfg.get("include_preference", True), + "preference_limit_number": cfg.get("preference_limit_number", 6), + "relativity": cfg.get("relativity", 0.45), + "timeout": cfg.get("timeout", 8.0), + "conversation_id": cfg.get("conversation_id", ""), + "knowledgebase_ids": cfg.get("knowledgebase_ids", []), + } + self._memos_cfg = result + return result + + # ------------------------------------------------------------------ # + # Lifecycle (extend parent) + # ------------------------------------------------------------------ # + + async def start(self) -> None: + """Start ReMeLight, then initialise the MemOS HTTP client.""" + # ReMeLight local memory first + await super().start() + + # MemOS Cloud client + import importlib.util + + _spec = importlib.util.spec_from_file_location( + "memos_client", + os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "memos_client.py", + ), + ) + _mod = importlib.util.module_from_spec(_spec) + _spec.loader.exec_module(_mod) + + mc = self._load_memos_config() + if not mc["api_key"]: + logger.warning( + "MemOS API key not configured. Set MEMOS_API_KEY env var " + "or add memos_config.api_key to agent config.", + ) + + self._memos_client = _mod.MemOSClient( + base_url=mc["base_url"], + api_key=mc["api_key"], + timeout=mc["timeout"], + ) + + masked = mc["api_key"][:5] + "***" if len(mc["api_key"]) > 5 else "***" + ok = await self._memos_client.ping() + if ok: + logger.info( + "MemOS Cloud connected: %s (key=%s)", + mc["base_url"], + masked, + ) + else: + logger.warning( + "MemOS Cloud unreachable at %s — will fall back to local ReMeLight search.", + mc["base_url"], + ) + + async def close(self) -> bool: + """Close MemOS client, then close ReMeLight.""" + if self._memos_client: + await self._memos_client.close() + self._memos_client = None + return await super().close() + + # ------------------------------------------------------------------ # + # memory_search → MemOS Cloud (fallback: ReMeLight local) + # ------------------------------------------------------------------ # + + async def memory_search( + self, + query: str, + max_results: int = 5, + min_score: float = 0.1, + ) -> ToolResponse: + """Search MemOS Cloud; fall back to local ReMeLight on failure.""" + if not self._memos_client: + return await super().memory_search(query, max_results, min_score) + + mc = self._load_memos_config() + # Use config default when caller doesn't override + limit = max(max_results, mc["memory_limit_number"]) + data = await self._memos_client.search_memory( + user_id=mc["user_id"], + query=query, + memory_limit_number=limit, + include_preference=mc["include_preference"], + preference_limit_number=mc["preference_limit_number"], + relativity=max(min_score, mc["relativity"]), + conversation_id=mc["conversation_id"], + knowledgebase_ids=mc["knowledgebase_ids"] or None, + ) + + # Fallback to local search if cloud is unreachable + if data is None: + logger.warning( + "MemOS Cloud search failed, falling back to local search.", + ) + return await super().memory_search(query, max_results, min_score) + + text = _format_search_results(data) + if not text: + # Cloud returned empty — try local as supplement + return await super().memory_search(query, max_results, min_score) + + return ToolResponse( + content=[TextBlock(type="text", text=text)], + ) + + # summary_memory / compact_memory / compact_tool_result / check_context + # → fully inherited from ReMeLightMemoryManager, no override needed. + # + # MemOS Cloud "add" (uploading conversations) is intentionally NOT done + # here. CoPaw's memory manager interface has no per-turn callback; + # summary_memory only fires during context compaction, which is too + # infrequent for reliable cloud sync. A proper add flow requires + # CoPaw to expose a post-turn hook (similar to OpenClaw's agent_end). + # Until then, users can populate MemOS Cloud via the MemOS dashboard + # or other clients, and this plugin provides cloud-based recall. diff --git a/apps/memos-copaw-plugin/plugin.json b/apps/memos-copaw-plugin/plugin.json new file mode 100644 index 00000000..934468ca --- /dev/null +++ b/apps/memos-copaw-plugin/plugin.json @@ -0,0 +1,16 @@ +{ + "id": "memos-copaw-plugin", + "name": "MemOS Cloud Memory", + "version": "0.1.0", + "description": "MemOS Cloud memory backend for CoPaw — cloud-based long-term memory recall via MemOS search API.", + "author": "MemTensor", + "entry_point": "plugin.py", + "dependencies": [ + "aiohttp>=3.9.0" + ], + "min_copaw_version": "0.1.0", + "meta": { + "api_key_url": "https://memos.memtensor.cn", + "api_key_hint": "Get your API key from MemOS Cloud dashboard" + } +} diff --git a/apps/memos-copaw-plugin/plugin.py b/apps/memos-copaw-plugin/plugin.py new file mode 100644 index 00000000..061a392c --- /dev/null +++ b/apps/memos-copaw-plugin/plugin.py @@ -0,0 +1,53 @@ +"""MemOS Cloud memory plugin for CoPaw. + +Registers MemOSMemoryManager as a pluggable memory backend so that +CoPaw agents can use MemOS Cloud for long-term memory. + +Installation: + copaw plugin install + +Then set ``memory_manager_backend: "memos"`` in agent config and +provide MEMOS_API_KEY (env var or config). +""" + +import importlib.util +import logging +import os + + +logger = logging.getLogger(__name__) + +# Load sibling module without mutating sys.path +_plugin_dir = os.path.dirname(os.path.abspath(__file__)) +_spec = importlib.util.spec_from_file_location( + "memos_memory_manager", + os.path.join(_plugin_dir, "memos_memory_manager.py"), +) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) +MemOSMemoryManager = _mod.MemOSMemoryManager + + +class _MemOSPlugin: + """Plugin definition following CoPaw's plugin contract.""" + + def register(self, api): + """Register the MemOS memory manager backend with CoPaw.""" + logger.info("MemOS Cloud plugin registering...") + + api.register_memory_manager( + backend_id="memos", + manager_class=MemOSMemoryManager, + ) + logger.info("Registered MemOS memory manager backend 'memos'") + + api.register_startup_hook( + hook_name="memos_cloud_init", + callback=lambda: logger.info( + "MemOS Cloud plugin ready. Set memory_manager_backend='memos' to activate." + ), + priority=90, + ) + + +plugin = _MemOSPlugin()