diff --git a/CHANGELOG.md b/CHANGELOG.md index 4625397..b634cc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,106 @@ All notable changes to Project Lyra are organized by component. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and adheres to [Semantic Versioning](https://semver.org/). -# Last Updated: 11-26-25 +# Last Updated: 11-28-25 --- ## 🧠 Lyra-Core ############################################################################## +## [Project Lyra v0.5.0] - 2025-11-28 + +### πŸ”§ Fixed - Critical API Wiring & Integration +After the major architectural rewire (v0.4.x), this release fixes all critical endpoint mismatches and ensures end-to-end system connectivity. + +#### Cortex β†’ Intake Integration βœ… +- **Fixed** `IntakeClient` to use correct Intake v0.2 API endpoints + - Changed `GET /context/{session_id}` β†’ `GET /summaries?session_id={session_id}` + - Updated JSON response parsing to extract `summary_text` field + - Fixed environment variable name: `INTAKE_API` β†’ `INTAKE_API_URL` + - Corrected default port: `7083` β†’ `7080` + - Added deprecation warning to `summarize_turn()` method (endpoint removed in Intake v0.2) + +#### Relay β†’ UI Compatibility βœ… +- **Added** OpenAI-compatible endpoint `POST /v1/chat/completions` + - Accepts standard OpenAI format with `messages[]` array + - Returns OpenAI-compatible response structure with `choices[]` + - Extracts last message content from messages array + - Includes usage metadata (stub values for compatibility) +- **Refactored** Relay to use shared `handleChatRequest()` function + - Both `/chat` and `/v1/chat/completions` use same core logic + - Eliminates code duplication + - Consistent error handling across endpoints + +#### Relay β†’ Intake Connection βœ… +- **Fixed** Intake URL fallback in Relay server configuration + - Corrected port: `7082` β†’ `7080` + - Updated endpoint: `/summary` β†’ `/add_exchange` + - Now properly sends exchanges to Intake for summarization + +#### Code Quality & Python Package Structure βœ… +- **Added** missing `__init__.py` files to all Cortex subdirectories + - `cortex/llm/__init__.py` + - `cortex/reasoning/__init__.py` + - `cortex/persona/__init__.py` + - `cortex/ingest/__init__.py` + - `cortex/utils/__init__.py` + - Improves package imports and IDE support +- **Removed** unused import in `cortex/router.py`: `from unittest import result` +- **Deleted** empty file `cortex/llm/resolve_llm_url.py` (was 0 bytes, never implemented) + +### βœ… Verified Working +Complete end-to-end message flow now operational: +``` +UI β†’ Relay (/v1/chat/completions) + ↓ +Relay β†’ Cortex (/reason) + ↓ +Cortex β†’ Intake (/summaries) [retrieves context] + ↓ +Cortex 4-stage pipeline: + 1. reflection.py β†’ meta-awareness notes + 2. reasoning.py β†’ draft answer + 3. refine.py β†’ polished answer + 4. 
persona/speak.py β†’ Lyra personality + ↓ +Cortex β†’ Relay (returns persona response) + ↓ +Relay β†’ Intake (/add_exchange) [async summary] + ↓ +Intake β†’ NeoMem (background memory storage) + ↓ +Relay β†’ UI (final response) +``` + +### πŸ“ Documentation +- **Added** this CHANGELOG entry with comprehensive v0.5.0 notes +- **Updated** README.md to reflect v0.5.0 architecture + - Documented new endpoints + - Updated data flow diagrams + - Clarified Intake v0.2 changes + - Corrected service descriptions + +### πŸ› Issues Resolved +- ❌ Cortex could not retrieve context from Intake (wrong endpoint) +- ❌ UI could not send messages to Relay (endpoint mismatch) +- ❌ Relay could not send summaries to Intake (wrong port/endpoint) +- ❌ Python package imports were implicit (missing __init__.py) + +### ⚠️ Known Issues (Non-Critical) +- Session management endpoints not implemented in Relay (`GET/POST /sessions/:id`) +- RAG service currently disabled in docker-compose.yml +- Cortex `/ingest` endpoint is a stub returning `{"status": "ok"}` + +### 🎯 Migration Notes +If upgrading from v0.4.x: +1. Pull latest changes from git +2. Verify environment variables in `.env` files: + - Check `INTAKE_API_URL=http://intake:7080` (not `INTAKE_API`) + - Verify all service URLs use correct ports +3. Restart Docker containers: `docker-compose down && docker-compose up -d` +4. Test with a simple message through the UI + +--- + ## [Infrastructure v1.0.0] - 2025-11-26 ### Changed diff --git a/README.md b/README.md index f8a1eed..74bf62d 100644 --- a/README.md +++ b/README.md @@ -1,73 +1,178 @@ -##### Project Lyra - README v0.3.0 - needs fixing ##### +# Project Lyra - README v0.5.0 -Lyra is a modular persistent AI companion system. -It provides memory-backed chat using **NeoMem** + **Relay** + **Persona Sidecar**, -with optional subconscious annotation powered by **Cortex VM** running local LLMs. +Lyra is a modular persistent AI companion system with advanced reasoning capabilities. +It provides memory-backed chat using **NeoMem** + **Relay** + **Cortex**, +with multi-stage reasoning pipeline powered by distributed LLM backends. -## Mission Statement ## - The point of project lyra is to give an AI chatbot more abilities than a typical chatbot. typical chat bots are essentially amnesic and forget everything about your project. Lyra helps keep projects organized and remembers everything you have done. Think of her abilities as a notepad/schedule/data base/ co-creator/collaborattor all with its own executive function. Say something in passing, Lyra remembers it then reminds you of it later. +## Mission Statement + +The point of Project Lyra is to give an AI chatbot more abilities than a typical chatbot. Typical chatbots are essentially amnesic and forget everything about your project. Lyra helps keep projects organized and remembers everything you have done. Think of her abilities as a notepad/schedule/database/co-creator/collaborator all with its own executive function. Say something in passing, Lyra remembers it then reminds you of it later. --- - -## Structure ## - Project Lyra exists as a series of docker containers that run independentally of each other but are all networked together. Think of it as how the brain has regions, Lyra has modules: - ## A. VM 100 - lyra-core: - 1. ** Core v0.3.1 - Docker Stack - - Relay - (docker container) - The main harness that connects the modules together and accepts input from the user. - - UI - (HTML) - This is how the user communicates with lyra. 
ATM its a typical instant message interface, but plans are to make it much more than that. - - Persona - (docker container) - This is the personality of lyra, set how you want her to behave. Give specific instructions for output. Basically prompt injection. - - All of this is built and controlled by a single .env and docker-compose.lyra.yml. - 2. **NeoMem v0.1.0 - (docker stack) - - NeoMem is Lyra's main long term memory data base. It is a fork of mem0 oss. Uses vector databases and graph. - - NeoMem launches with a single separate docker-compose.neomem.yml. - - ## B. VM 101 - lyra - cortex - 3. ** Cortex - VM containing docker stack - - This is the working reasoning layer of Lyra. - - Built to be flexible in deployment. Run it locally or remotely (via wan/lan) - - Intake v0.1.0 - (docker Container) gives conversations context and purpose - - Intake takes the last N exchanges and summarizes them into coherrent short term memories. - - Uses a cascading summarization setup that quantizes the exchanges. Summaries occur at L2, L5, L10, L15, L20 etc. - - Keeps the bot aware of what is going on with out having to send it the whole chat every time. - - Cortex - Docker container containing: - - Reasoning Layer - - TBD - - Reflect - (docker continer) - Not yet implemented, road map. - - Calls back to NeoMem after N exchanges and N summaries and edits memories created during the initial messaging step. This helps contain memories to coherrent thoughts, reduces the noise. - - Can be done actively and asynchronously, or on a time basis (think human sleep and dreams). - - This stage is not yet built, this is just an idea. - - ## C. Remote LLM APIs: - 3. **AI Backends - - Lyra doesnt run models her self, she calls up APIs. - - Endlessly customizable as long as it outputs to the same schema. + +## Architecture Overview + +Project Lyra operates as a series of Docker containers networked together in a microservices architecture. Like how the brain has regions, Lyra has modules: + +### A. VM 100 - lyra-core (Core Services) + +**1. Relay** (Node.js/Express) - Port 7078 +- Main orchestrator and message router +- Coordinates all module interactions +- OpenAI-compatible endpoint: `POST /v1/chat/completions` +- Internal endpoint: `POST /chat` +- Routes messages through Cortex reasoning pipeline +- Manages async calls to Intake and NeoMem + +**2. UI** (Static HTML) +- Browser-based chat interface with cyberpunk theme +- Connects to Relay at `http://10.0.0.40:7078` +- Saves and loads sessions +- OpenAI-compatible message format + +**3. NeoMem** (Python/FastAPI) - Port 7077 +- Long-term memory database (fork of Mem0 OSS) +- Vector storage (PostgreSQL + pgvector) + Graph storage (Neo4j) +- RESTful API: `/memories`, `/search` +- Semantic memory updates and retrieval +- No external SDK dependencies - fully local + +### B. VM 101 - lyra-cortex (Reasoning Layer) + +**4. Cortex** (Python/FastAPI) - Port 7081 +- Primary reasoning engine with multi-stage pipeline +- **4-Stage Processing:** + 1. **Reflection** - Generates meta-awareness notes about conversation + 2. **Reasoning** - Creates initial draft answer using context + 3. **Refinement** - Polishes and improves the draft + 4. **Persona** - Applies Lyra's personality and speaking style +- Integrates with Intake for short-term context +- Flexible LLM router supporting multiple backends + +**5. 
Intake v0.2** (Python/FastAPI) - Port 7080 +- Simplified short-term memory summarization +- Session-based circular buffer (deque, maxlen=200) +- Single-level simple summarization (no cascading) +- Background async processing with FastAPI BackgroundTasks +- Pushes summaries to NeoMem automatically +- **API Endpoints:** + - `POST /add_exchange` - Add conversation exchange + - `GET /summaries?session_id={id}` - Retrieve session summary + - `POST /close_session/{id}` - Close and cleanup session + +### C. LLM Backends (Remote/Local APIs) + +**Multi-Backend Strategy:** +- **PRIMARY**: vLLM on AMD MI50 GPU (`http://10.0.0.43:8000`) - Cortex reasoning, Intake +- **SECONDARY**: Ollama on RTX 3090 (`http://10.0.0.3:11434`) - Configurable per-module +- **CLOUD**: OpenAI API (`https://api.openai.com/v1`) - Cortex persona layer +- **FALLBACK**: Local backup (`http://10.0.0.41:11435`) - Emergency fallback --- +## Data Flow Architecture (v0.5.0) -## πŸš€ Features ## +### Normal Message Flow: -# Lyra-Core VM (VM100) -- **Relay **: - - The main harness and orchestrator of Lyra. - - OpenAI-compatible endpoint: `POST /v1/chat/completions` - - Injects persona + relevant memories into every LLM call - - Routes all memory storage/retrieval through **NeoMem** - - Logs spans (`neomem.add`, `neomem.search`, `persona.fetch`, `llm.generate`) +``` +User (UI) β†’ POST /v1/chat/completions + ↓ +Relay (7078) + ↓ POST /reason +Cortex (7081) + ↓ GET /summaries?session_id=xxx +Intake (7080) [RETURNS SUMMARY] + ↓ +Cortex processes (4 stages): + 1. reflection.py β†’ meta-awareness notes + 2. reasoning.py β†’ draft answer (uses LLM) + 3. refine.py β†’ refined answer (uses LLM) + 4. persona/speak.py β†’ Lyra personality (uses LLM) + ↓ +Returns persona answer to Relay + ↓ +Relay β†’ Cortex /ingest (async, stub) +Relay β†’ Intake /add_exchange (async) + ↓ +Intake β†’ Background summarize β†’ NeoMem + ↓ +Relay β†’ UI (returns final response) +``` -- **NeoMem (Memory Engine)**: - - Forked from Mem0 OSS and fully independent. - - Drop-in compatible API (`/memories`, `/search`). - - Local-first: runs on FastAPI with Postgres + Neo4j. - - No external SDK dependencies. - - Default service: `neomem-api` (port 7077). - - Capable of adding new memories and updating previous memories. Compares existing embeddings and performs in place updates when a memory is judged to be a semantic match. +### Cortex 4-Stage Reasoning Pipeline: -- **UI**: - - Lightweight static HTML chat page. - - Connects to Relay at `http://:7078`. - - Nice cyberpunk theme! - - Saves and loads sessions, which then in turn send to relay. +1. **Reflection** (`reflection.py`) - Cloud backend (OpenAI) + - Analyzes user intent and conversation context + - Generates meta-awareness notes + - "What is the user really asking?" + +2. **Reasoning** (`reasoning.py`) - Primary backend (vLLM) + - Retrieves short-term context from Intake + - Creates initial draft answer + - Integrates context, reflection notes, and user prompt + +3. **Refinement** (`refine.py`) - Primary backend (vLLM) + - Polishes the draft answer + - Improves clarity and coherence + - Ensures factual consistency + +4. 
**Persona** (`speak.py`) - Cloud backend (OpenAI) + - Applies Lyra's personality and speaking style + - Natural, conversational output + - Final answer returned to user + +--- + +## Features + +### Lyra-Core (VM 100) + +**Relay**: +- Main orchestrator and message router +- OpenAI-compatible endpoint: `POST /v1/chat/completions` +- Internal endpoint: `POST /chat` +- Health check: `GET /_health` +- Async non-blocking calls to Cortex and Intake +- Shared request handler for code reuse +- Comprehensive error handling + +**NeoMem (Memory Engine)**: +- Forked from Mem0 OSS - fully independent +- Drop-in compatible API (`/memories`, `/search`) +- Local-first: runs on FastAPI with Postgres + Neo4j +- No external SDK dependencies +- Semantic memory updates - compares embeddings and performs in-place updates +- Default service: `neomem-api` (port 7077) + +**UI**: +- Lightweight static HTML chat interface +- Cyberpunk theme +- Session save/load functionality +- OpenAI message format support + +### Cortex (VM 101) + +**Cortex** (v0.5): +- Multi-stage reasoning pipeline (reflection β†’ reasoning β†’ refine β†’ persona) +- Flexible LLM backend routing +- Per-stage backend selection +- Async processing throughout +- IntakeClient integration for short-term context +- `/reason`, `/ingest` (stub), `/health` endpoints + +**Intake** (v0.2): +- Simplified single-level summarization +- Session-based circular buffer (200 exchanges max) +- Background async summarization +- Automatic NeoMem push +- No persistent log files (memory-only) +- **Breaking change from v0.1**: Removed cascading summaries (L1, L2, L5, L10, L20, L30) + +**LLM Router**: +- Dynamic backend selection +- Environment-driven configuration +- Support for vLLM, Ollama, OpenAI, custom endpoints +- Per-module backend preferences # Beta Lyrae (RAG Memory DB) - added 11-3-25 - **RAG Knowledge DB - Beta Lyrae (sheliak)** @@ -159,7 +264,85 @@ with optional subconscious annotation powered by **Cortex VM** running local LLM └── Future: sends summaries β†’ Cortex for reflection -# Additional information available in the trilium docs. # +--- + +## Version History + +### v0.5.0 (2025-11-28) - Current Release +- βœ… Fixed all critical API wiring issues +- βœ… Added OpenAI-compatible endpoint to Relay (`/v1/chat/completions`) +- βœ… Fixed Cortex β†’ Intake integration +- βœ… Added missing Python package `__init__.py` files +- βœ… End-to-end message flow verified and working + +### v0.4.x (Major Rewire) +- Cortex multi-stage reasoning pipeline +- Intake v0.2 simplification +- LLM router with multi-backend support +- Major architectural restructuring + +### v0.3.x +- Beta Lyrae RAG system +- NeoMem integration +- Basic Cortex reasoning loop + +--- + +## Known Issues (v0.5.0) + +### Non-Critical +- Session management endpoints not fully implemented in Relay +- RAG service currently disabled in docker-compose.yml +- Cortex `/ingest` endpoint is a stub + +### Future Enhancements +- Re-enable RAG service integration +- Implement full session persistence +- Add request correlation IDs for tracing +- Comprehensive health checks + +--- + +## Quick Start + +### Prerequisites +- Docker + Docker Compose +- PostgreSQL 13+, Neo4j 4.4+ (for NeoMem) +- At least one LLM API endpoint (vLLM, Ollama, or OpenAI) + +### Setup +1. Configure environment variables in `.env` files +2. Start services: `docker-compose up -d` +3. Check health: `curl http://localhost:7078/_health` +4. 
Access UI: `http://localhost:7078` + +### Test +```bash +curl -X POST http://localhost:7078/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "messages": [{"role": "user", "content": "Hello Lyra!"}], + "session_id": "test" + }' +``` + +--- + +## Documentation + +- See [CHANGELOG.md](CHANGELOG.md) for detailed version history +- See `ENVIRONMENT_VARIABLES.md` for environment variable reference +- Additional information available in the Trilium docs + +--- + +## License + +NeoMem is a derivative work based on Mem0 OSS (Apache 2.0). +Β© 2025 Terra-Mechanics / ServersDown Labs. All modifications released under Apache 2.0. + +**Built with Claude Code** + --- ## πŸ“¦ Requirements diff --git a/core/relay/server.js b/core/relay/server.js index 32c3bec..71e5dca 100644 --- a/core/relay/server.js +++ b/core/relay/server.js @@ -13,7 +13,7 @@ const PORT = Number(process.env.PORT || 7078); // core endpoints const CORTEX_REASON = process.env.CORTEX_REASON_URL || "http://cortex:7081/reason"; const CORTEX_INGEST = process.env.CORTEX_INGEST_URL || "http://cortex:7081/ingest"; -const INTAKE_URL = process.env.INTAKE_URL || "http://intake:7082/summary"; +const INTAKE_URL = process.env.INTAKE_URL || "http://intake:7080/add_exchange"; // ----------------------------------------------------- // Helper request wrapper @@ -41,6 +41,45 @@ async function postJSON(url, data) { return json; } +// ----------------------------------------------------- +// Shared chat handler logic +// ----------------------------------------------------- +async function handleChatRequest(session_id, user_msg) { + // 1. β†’ Cortex.reason + let reason; + try { + reason = await postJSON(CORTEX_REASON, { + session_id, + user_prompt: user_msg + }); + } catch (e) { + console.error("Relay β†’ Cortex.reason error:", e.message); + throw new Error(`cortex_reason_failed: ${e.message}`); + } + + const persona = reason.final_output || reason.persona || "(no persona text)"; + + // 2. β†’ Cortex.ingest (async, non-blocking) + postJSON(CORTEX_INGEST, { + session_id, + user_msg, + assistant_msg: persona + }).catch(e => console.warn("Relay β†’ Cortex.ingest failed:", e.message)); + + // 3. β†’ Intake summary (async, non-blocking) + postJSON(INTAKE_URL, { + session_id, + user_msg, + assistant_msg: persona + }).catch(e => console.warn("Relay β†’ Intake failed:", e.message)); + + // 4. 
Return result + return { + session_id, + reply: persona + }; +} + // ----------------------------------------------------- // HEALTHCHECK // ----------------------------------------------------- @@ -48,6 +87,59 @@ app.get("/_health", (_, res) => { res.json({ ok: true }); }); +// ----------------------------------------------------- +// OPENAI-COMPATIBLE ENDPOINT (for UI) +// ----------------------------------------------------- +app.post("/v1/chat/completions", async (req, res) => { + try { + // Extract from OpenAI format + const session_id = req.body.session_id || req.body.user || "default"; + const messages = req.body.messages || []; + const lastMessage = messages[messages.length - 1]; + const user_msg = lastMessage?.content || ""; + + if (!user_msg) { + return res.status(400).json({ error: "No message content provided" }); + } + + console.log(`Relay (v1) β†’ received: "${user_msg}"`); + + // Call the same logic as /chat + const result = await handleChatRequest(session_id, user_msg); + + // Return in OpenAI format + return res.json({ + id: `chatcmpl-${Date.now()}`, + object: "chat.completion", + created: Math.floor(Date.now() / 1000), + model: "lyra", + choices: [{ + index: 0, + message: { + role: "assistant", + content: result.reply + }, + finish_reason: "stop" + }], + usage: { + prompt_tokens: 0, + completion_tokens: 0, + total_tokens: 0 + } + }); + + } catch (err) { + console.error("Relay v1 endpoint fatal:", err); + res.status(500).json({ + error: { + message: err.message || String(err), + type: "server_error", + code: "relay_failed" + } + }); + } +}); + // ----------------------------------------------------- // MAIN ENDPOINT (new canonical) // ----------------------------------------------------- @@ -58,39 +150,8 @@ app.post("/chat", async (req, res) => { console.log(`Relay β†’ received: "${user_msg}"`); - // 1. β†’ Cortex.reason - let reason; - try { - reason = await postJSON(CORTEX_REASON, { - session_id, - user_prompt: user_msg - }); - } catch (e) { - console.error("Relay β†’ Cortex.reason error:", e.message); - return res.status(500).json({ error: "cortex_reason_failed", detail: e.message }); - } - - const persona = reason.final_output || reason.persona || "(no persona text)"; - - // 2. β†’ Cortex.ingest - postJSON(CORTEX_INGEST, { - session_id, - user_msg, - assistant_msg: persona - }).catch(e => console.warn("Relay β†’ Cortex.ingest failed:", e.message)); - - // 3. β†’ Intake summary - postJSON(INTAKE_URL, { - session_id, - user_msg, - assistant_msg: persona - }).catch(e => console.warn("Relay β†’ Intake failed:", e.message)); - - // 4. 
β†’ Return to UI - return res.json({ - session_id, - reply: persona - }); + const result = await handleChatRequest(session_id, user_msg); + return res.json(result); } catch (err) { console.error("Relay fatal:", err); diff --git a/cortex/ingest/__init__.py b/cortex/ingest/__init__.py new file mode 100644 index 0000000..0b058b3 --- /dev/null +++ b/cortex/ingest/__init__.py @@ -0,0 +1 @@ +# Ingest module - handles communication with Intake service diff --git a/cortex/ingest/intake_client.py b/cortex/ingest/intake_client.py index a0b85f3..f0b1760 100644 --- a/cortex/ingest/intake_client.py +++ b/cortex/ingest/intake_client.py @@ -8,9 +8,14 @@ class IntakeClient: """Handles short-term / episodic summaries from Intake service.""" def __init__(self): - self.base_url = os.getenv("INTAKE_API", "http://intake:7083") + self.base_url = os.getenv("INTAKE_API_URL", "http://intake:7080") async def summarize_turn(self, session_id: str, user_msg: str, assistant_msg: Optional[str] = None) -> Dict[str, Any]: + """ + DEPRECATED: Intake v0.2 removed the /summarize endpoint. + Use add_exchange() instead, which auto-summarizes in the background. + This method is kept for backwards compatibility but will fail. + """ payload = { "session_id": session_id, "turns": [{"role": "user", "content": user_msg}] @@ -24,15 +29,17 @@ class IntakeClient: r.raise_for_status() return r.json() except Exception as e: - logger.warning(f"Intake summarize_turn failed: {e}") + logger.warning(f"Intake summarize_turn failed (endpoint removed in v0.2): {e}") return {} async def get_context(self, session_id: str) -> str: + """Get summarized context for a session from Intake.""" async with httpx.AsyncClient(timeout=15) as client: try: - r = await client.get(f"{self.base_url}/context/{session_id}") + r = await client.get(f"{self.base_url}/summaries", params={"session_id": session_id}) r.raise_for_status() - return r.text + data = r.json() + return data.get("summary_text", "") except Exception as e: logger.warning(f"Intake get_context failed: {e}") return "" diff --git a/cortex/llm/__init__.py b/cortex/llm/__init__.py new file mode 100644 index 0000000..4113107 --- /dev/null +++ b/cortex/llm/__init__.py @@ -0,0 +1 @@ +# LLM module - provides LLM routing and backend abstraction diff --git a/cortex/llm/resolve_llm_url.py b/cortex/llm/resolve_llm_url.py deleted file mode 100644 index e69de29..0000000 diff --git a/cortex/persona/__init__.py b/cortex/persona/__init__.py new file mode 100644 index 0000000..07910ce --- /dev/null +++ b/cortex/persona/__init__.py @@ -0,0 +1 @@ +# Persona module - applies Lyra's personality and speaking style diff --git a/cortex/reasoning/__init__.py b/cortex/reasoning/__init__.py new file mode 100644 index 0000000..0931e2c --- /dev/null +++ b/cortex/reasoning/__init__.py @@ -0,0 +1 @@ +# Reasoning module - multi-stage reasoning pipeline diff --git a/cortex/router.py b/cortex/router.py index 5fd9b41..339a971 100644 --- a/cortex/router.py +++ b/cortex/router.py @@ -1,6 +1,5 @@ # router.py -from unittest import result from fastapi import APIRouter, HTTPException from pydantic import BaseModel diff --git a/cortex/utils/__init__.py b/cortex/utils/__init__.py new file mode 100644 index 0000000..1d96dc7 --- /dev/null +++ b/cortex/utils/__init__.py @@ -0,0 +1 @@ +# Utilities module diff --git a/intake/intake.py b/intake/intake.py index 1724ece..fb2a564 100644 --- a/intake/intake.py +++ b/intake/intake.py @@ -1,430 +1,160 @@ from fastapi import FastAPI, Body, Query, BackgroundTasks from collections import deque from datetime 
import datetime +from uuid import uuid4 import requests import os import sys -import asyncio -from dotenv import load_dotenv - -# ─────────────────────────────────────────────── -# πŸ”§ Load environment variables -# ─────────────────────────────────────────────── -load_dotenv() +# ───────────────────────────── +# Config +# ───────────────────────────── SUMMARY_MODEL = os.getenv("SUMMARY_MODEL_NAME", "mistral-7b-instruct-v0.2.Q4_K_M.gguf") SUMMARY_URL = os.getenv("SUMMARY_API_URL", "http://localhost:8080/v1/completions") SUMMARY_MAX_TOKENS = int(os.getenv("SUMMARY_MAX_TOKENS", "200")) SUMMARY_TEMPERATURE = float(os.getenv("SUMMARY_TEMPERATURE", "0.3")) -# ─────────────────────────────────────────────── -# 🧠 NeoMem connection (session-aware) -# ─────────────────────────────────────────────── -from uuid import uuid4 - NEOMEM_API = os.getenv("NEOMEM_API") NEOMEM_KEY = os.getenv("NEOMEM_KEY") -def push_summary_to_neomem(summary_text: str, level: str, session_id: str): - """Send summarized text to NeoMem, tagged by session_id.""" - if not NEOMEM_API: - print("⚠️ NEOMEM_API not set, skipping NeoMem push") - return +# ───────────────────────────── +# App + session buffer +# ───────────────────────────── +app = FastAPI() +SESSIONS = {} - payload = { - "messages": [ - {"role": "assistant", "content": summary_text} - ], - "user_id": "brian", - # optional: uncomment if you want sessions tracked in NeoMem natively - # "run_id": session_id, - "metadata": { - "source": "intake", - "type": "summary", - "level": level, - "session_id": session_id, - "cortex": {} - } - } +@app.on_event("startup") +def banner(): + print("🧩 Intake v0.2 booting...") + print(f" Model: {SUMMARY_MODEL}") + print(f" API: {SUMMARY_URL}") + sys.stdout.flush() + +# ───────────────────────────── +# Helper: summarize exchanges +# ───────────────────────────── +def llm(prompt: str): + try: + resp = requests.post( + SUMMARY_URL, + json={ + "model": SUMMARY_MODEL, + "prompt": prompt, + "max_tokens": SUMMARY_MAX_TOKENS, + "temperature": SUMMARY_TEMPERATURE, + }, + timeout=30, + ) + resp.raise_for_status() + return resp.json().get("choices", [{}])[0].get("text", "").strip() + except Exception as e: + return f"[Error summarizing: {e}]" + +def summarize_simple(exchanges): + """Simple factual summary of recent exchanges.""" + text = "" + for e in exchanges: + text += f"User: {e['user_msg']}\nAssistant: {e['assistant_msg']}\n\n" + + prompt = f""" + Summarize the following conversation between Brian (user) and Lyra (assistant). + Focus only on factual content. Avoid names, examples, story tone, or invented details. 
+ + {text} + + Summary: + """ + return llm(prompt) + +# ───────────────────────────── +# NeoMem push +# ───────────────────────────── +def push_to_neomem(summary: str, session_id: str): + if not NEOMEM_API: + return headers = {"Content-Type": "application/json"} if NEOMEM_KEY: headers["Authorization"] = f"Bearer {NEOMEM_KEY}" + payload = { + "messages": [{"role": "assistant", "content": summary}], + "user_id": "brian", + "metadata": { + "source": "intake", + "session_id": session_id + } + } + try: - r = requests.post(f"{NEOMEM_API}/memories", json=payload, headers=headers, timeout=25) - r.raise_for_status() - print(f"🧠 NeoMem updated ({level}, {session_id}, {len(summary_text)} chars)") + requests.post( + f"{NEOMEM_API}/memories", + json=payload, + headers=headers, + timeout=20 + ).raise_for_status() + print(f"🧠 NeoMem updated for {session_id}") except Exception as e: - print(f"❌ NeoMem push failed ({level}, {session_id}): {e}") + print(f"NeoMem push failed: {e}") - -# ─────────────────────────────────────────────── -# βš™οΈ FastAPI + buffer setup -# ─────────────────────────────────────────────── -app = FastAPI() - -# Multiple rolling buffers keyed by session_id -SESSIONS = {} - - -# Summary trigger points -# β†’ low-tier: quick factual recaps -# β†’ mid-tier: β€œReality Check” reflections -# β†’ high-tier: rolling continuity synthesis -LEVELS = [1, 2, 5, 10, 20, 30] - -@app.on_event("startup") -def show_boot_banner(): - print("🧩 Intake booting...") - print(f" Model: {SUMMARY_MODEL}") - print(f" API: {SUMMARY_URL}") - print(f" Max tokens: {SUMMARY_MAX_TOKENS}, Temp: {SUMMARY_TEMPERATURE}") - sys.stdout.flush() - -# ─────────────────────────────────────────────── -# 🧠 Hierarchical Summarizer (L10β†’L20β†’L30 cascade) -# ─────────────────────────────────────────────── -SUMMARIES_CACHE = {"L10": [], "L20": [], "L30": []} - -def summarize(exchanges, level): - """Hierarchical summarizer: builds local and meta summaries.""" - # Join exchanges into readable text - text = "\n".join( - f"User: {e['turns'][0]['content']}\nAssistant: {e['turns'][1]['content']}" - for e in exchanges - ) - - def query_llm(prompt: str): - try: - resp = requests.post( - SUMMARY_URL, - json={ - "model": SUMMARY_MODEL, - "prompt": prompt, - "max_tokens": SUMMARY_MAX_TOKENS, - "temperature": SUMMARY_TEMPERATURE, - }, - timeout=180, - ) - resp.raise_for_status() - data = resp.json() - return data.get("choices", [{}])[0].get("text", "").strip() - except Exception as e: - return f"[Error summarizing: {e}]" - - # ───── L10: local β€œReality Check” block ───── - if level == 10: - prompt = f""" - You are Lyra Intake performing a 'Reality Check' for the last {len(exchanges)} exchanges. - Summarize this block as one coherent paragraph describing the user’s focus, progress, and tone. - Avoid bullet points. - - Exchanges: - {text} - - Reality Check Summary: - """ - summary = query_llm(prompt) - SUMMARIES_CACHE["L10"].append(summary) - - # ───── L20: merge L10s ───── - elif level == 20: - # 1️⃣ create fresh L10 for 11–20 - l10_prompt = f""" - You are Lyra Intake generating a second Reality Check for the most recent {len(exchanges)} exchanges. - Summarize them as one paragraph describing what's new or changed since the last block. - Avoid bullet points. 
- - Exchanges: - {text} - - Reality Check Summary: - """ - new_l10 = query_llm(l10_prompt) - SUMMARIES_CACHE["L10"].append(new_l10) - - # 2️⃣ merge all L10s into a Session Overview - joined_l10s = "\n\n".join(SUMMARIES_CACHE["L10"]) - l20_prompt = f""" - You are Lyra Intake merging multiple 'Reality Checks' into a single Session Overview. - Summarize the following Reality Checks into one short paragraph capturing the ongoing goals, - patterns, and overall progress. - - Reality Checks: - {joined_l10s} - - Session Overview: - """ - l20_summary = query_llm(l20_prompt) - SUMMARIES_CACHE["L20"].append(l20_summary) - summary = new_l10 + "\n\n" + l20_summary - - # ───── L30: continuity synthesis ───── - elif level == 30: - # 1️⃣ create new L10 for 21–30 - new_l10 = query_llm(f""" - You are Lyra Intake creating a new Reality Check for exchanges 21–30. - Summarize this block in one cohesive paragraph, describing any shifts in focus or tone. - - Exchanges: - {text} - - Reality Check Summary: - """) - - SUMMARIES_CACHE["L10"].append(new_l10) - - # 2️⃣ merge all lower levels for continuity - joined = "\n\n".join(SUMMARIES_CACHE["L10"] + SUMMARIES_CACHE["L20"]) - continuity_prompt = f""" - You are Lyra Intake performing a 'Continuity Report' β€” a high-level reflection combining all Reality Checks - and Session Overviews so far. Describe how the conversation has evolved, the key insights, and remaining threads. - - Reality Checks and Overviews: - {joined} - - Continuity Report: - """ - l30_summary = query_llm(continuity_prompt) - SUMMARIES_CACHE["L30"].append(l30_summary) - summary = new_l10 + "\n\n" + l30_summary - - # ───── L1–L5 (standard factual summaries) ───── - else: - prompt = f""" - You are Lyra Intake, a background summarization module for an AI assistant. - - Your job is to compress recent chat exchanges between a user and an assistant - into a short, factual summary. The user's name is Brian, and the assistant's name is Lyra. - Focus only on the real conversation content. - Do NOT invent names, people, or examples. Avoid speculation or storytelling. - - Summarize clearly what topics were discussed and what conclusions were reached. - Avoid speculation, names, or bullet points. 
- - Exchanges: - {text} - - Summary: - """ - summary = query_llm(prompt) - - return f"[L{level} Summary of {len(exchanges)} exchanges]: {summary}" - -from datetime import datetime - -LOG_DIR = "/app/logs" -os.makedirs(LOG_DIR, exist_ok=True) - -def log_to_file(level: str, summary: str): - """Append each summary to a persistent .txt log file.""" - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - filename = os.path.join(LOG_DIR, "summaries.log") - with open(filename, "a", encoding="utf-8") as f: - f.write(f"[{timestamp}] {level}\n{summary}\n{'='*60}\n\n") - -# ─────────────────────────────────────────────── -# πŸ” Background summarization helper -# ─────────────────────────────────────────────── -def run_summarization_task(exchange, session_id): - """Async-friendly wrapper for slow summarization work.""" +# ───────────────────────────── +# Background summarizer +# ───────────────────────────── +def bg_summarize(session_id: str): try: hopper = SESSIONS.get(session_id) if not hopper: - print(f"⚠️ No hopper found for {session_id}") return - buffer = hopper["buffer"] - count = len(buffer) - summaries = {} - - if count < 30: - for lvl in LEVELS: - if lvl <= count: - s_text = summarize(list(buffer)[-lvl:], lvl) - log_to_file(f"L{lvl}", s_text) - push_summary_to_neomem(s_text, f"L{lvl}", session_id) - summaries[f"L{lvl}"] = s_text - else: - # optional: include your existing 30+ logic here - pass - - if summaries: - print(f"🧩 [BG] Summaries generated asynchronously at count={count}: {list(summaries.keys())}") + buf = list(hopper["buffer"]) + summary = summarize_simple(buf) + push_to_neomem(summary, session_id) + print(f"🧩 Summary generated for {session_id}") except Exception as e: - print(f"πŸ’₯ [BG] Async summarization failed: {e}") + print(f"Summarizer error: {e}") + +# ───────────────────────────── +# Routes +# ───────────────────────────── -# ─────────────────────────────────────────────── -# πŸ“¨ Routes -# ─────────────────────────────────────────────── @app.post("/add_exchange") def add_exchange(exchange: dict = Body(...), background_tasks: BackgroundTasks = None): + session_id = exchange.get("session_id") or f"sess-{uuid4().hex[:8]}" exchange["session_id"] = session_id + exchange["timestamp"] = datetime.now().isoformat() if session_id not in SESSIONS: - SESSIONS[session_id] = {"buffer": deque(maxlen=100), "last_update": datetime.now()} + SESSIONS[session_id] = { + "buffer": deque(maxlen=200), + "created_at": datetime.now() + } print(f"πŸ†• Hopper created: {session_id}") - hopper = SESSIONS[session_id] - hopper["buffer"].append(exchange) - hopper["last_update"] = datetime.now() - count = len(hopper["buffer"]) + SESSIONS[session_id]["buffer"].append(exchange) - # πŸš€ queue background summarization if background_tasks: - background_tasks.add_task(run_summarization_task, exchange, session_id) - print(f"⏩ Queued async summarization for {session_id}") + background_tasks.add_task(bg_summarize, session_id) + print(f"⏩ Summarization queued for {session_id}") - return {"ok": True, "exchange_count": count, "queued": True} + return {"ok": True, "session_id": session_id} - - # # ── Normal tiered behavior up to 30 ── commented out for aysnc addon - # if count < 30: - # if count in LEVELS: - # for lvl in LEVELS: - # if lvl <= count: - # summaries[f"L{lvl}"] = summarize(list(buffer)[-lvl:], lvl) - # log_to_file(f"L{lvl}", summaries[f"L{lvl}"]) - # push_summary_to_neomem(summaries[f"L{lvl}"], f"L{lvl}", session_id) - -# # πŸš€ Launch summarization in the background (non-blocking) -# if 
background_tasks: - # background_tasks.add_task(run_summarization_task, exchange, session_id) - # print(f"⏩ Queued async summarization for {session_id}") - - - # # ── Beyond 30: keep summarizing every +15 exchanges ── - # else: - # # Find next milestone after 30 (45, 60, 75, ...) - # milestone = 30 + ((count - 30) // 15) * 15 - # if count == milestone: - # summaries[f"L{milestone}"] = summarize(list(buffer)[-15:], milestone) - # log_to_file(f"L{milestone}", summaries[f"L{milestone}"]) - # push_summary_to_neomem(summaries[f"L{milestone}"], f"L{milestone}", session_id) - - # # Optional: merge all continuity summaries so far into a running meta-summary - # joined = "\n\n".join( - # [s for key, s in summaries.items() if key.startswith("L")] - # ) - # meta_prompt = f""" - # You are Lyra Intake composing an 'Ongoing Continuity Report' that merges - # all prior continuity summaries into one living narrative. - # Focus on major themes, changes, and lessons so far. - - # Continuity Summaries: - # {joined} - - # Ongoing Continuity Report: - # """ - # meta_summary = f"[L∞ Ongoing Continuity Report]: {query_llm(meta_prompt)}" - # summaries["L∞"] = meta_summary - # log_to_file("L∞", meta_summary) - # push_summary_to_neomem(meta_summary, "L∞", session_id) - - # print(f"πŸŒ€ L{milestone} continuity summary created (messages {count-14}-{count})") - - # # ── Log summaries ── - # if summaries: - # print(f"🧩 Summaries generated at count={count}: {list(summaries.keys())}") - - # return { - # "ok": True, - # "exchange_count": len(buffer), - # "queued": True - # } - -# ─────────────────────────────────────────────── -# Clear rubbish from hopper. -# ─────────────────────────────────────────────── -def close_session(session_id: str): - """Run a final summary for the given hopper, post it to NeoMem, then delete it.""" - hopper = SESSIONS.get(session_id) - if not hopper: - print(f"⚠️ No active hopper for {session_id}") - return - - buffer = hopper["buffer"] - if not buffer: - print(f"⚠️ Hopper {session_id} is empty, skipping closure") - del SESSIONS[session_id] - return - - try: - print(f"πŸ”’ Closing hopper {session_id} ({len(buffer)} exchanges)") - - # Summarize everything left in the buffer - final_summary = summarize(list(buffer), 30) # level 30 = continuity synthesis - log_to_file("LFinal", final_summary) - push_summary_to_neomem(final_summary, "LFinal", session_id) - - # Optionally: mark this as a special 'closure' memory - closure_note = f"[Session {session_id} closed with {len(buffer)} exchanges]" - push_summary_to_neomem(closure_note, "LFinalNote", session_id) - - print(f"🧹 Hopper {session_id} closed and deleted") - except Exception as e: - print(f"πŸ’₯ Error closing hopper {session_id}: {e}") - finally: - del SESSIONS[session_id] - @app.post("/close_session/{session_id}") -def close_session_endpoint(session_id: str): - close_session(session_id) +def close_session(session_id: str): + if session_id in SESSIONS: + del SESSIONS[session_id] return {"ok": True, "closed": session_id} -# ─────────────────────────────────────────────── -# 🧾 Provide recent summary for Cortex /reason calls -# ─────────────────────────────────────────────── @app.get("/summaries") -def get_summary(session_id: str = Query(..., description="Active session ID")): - """ - Return the most recent summary (L10β†’L30β†’LFinal) for a given session. - If none exist yet, return a placeholder summary. 
- """ - try: - # Find the most recent file entry in summaries.log - log_path = os.path.join(LOG_DIR, "summaries.log") - if not os.path.exists(log_path): - return { - "summary_text": "(none)", - "last_message_ts": datetime.now().isoformat(), - "session_id": session_id, - "exchange_count": 0, - } +def get_summary(session_id: str = Query(...)): + hopper = SESSIONS.get(session_id) + if not hopper: + return {"summary_text": "(none)", "session_id": session_id} - with open(log_path, "r", encoding="utf-8") as f: - lines = f.readlines() + summary = summarize_simple(list(hopper["buffer"])) + return {"summary_text": summary, "session_id": session_id} - # Grab the last summary section that mentions this session_id - recent_lines = [ln for ln in lines if session_id in ln or ln.startswith("[L")] - if recent_lines: - # Find the last non-empty summary text - snippet = "".join(recent_lines[-8:]).strip() - else: - snippet = "(no summaries yet)" - - return { - "summary_text": snippet[-1000:], # truncate to avoid huge block - "last_message_ts": datetime.now().isoformat(), - "session_id": session_id, - "exchange_count": len(SESSIONS.get(session_id, {}).get("buffer", [])), - } - - except Exception as e: - print(f"⚠️ /summaries failed for {session_id}: {e}") - return { - "summary_text": f"(error fetching summaries: {e})", - "last_message_ts": datetime.now().isoformat(), - "session_id": session_id, - "exchange_count": 0, - } - -# ─────────────────────────────────────────────── -# βœ… Health check -# ─────────────────────────────────────────────── @app.get("/health") def health(): return {"ok": True, "model": SUMMARY_MODEL, "url": SUMMARY_URL}