diff --git a/core/relay/server.js b/core/relay/server.js index 7db05a9..5261e3b 100644 --- a/core/relay/server.js +++ b/core/relay/server.js @@ -3,348 +3,154 @@ import dotenv from "dotenv"; import cors from "cors"; import fs from "fs"; import path from "path"; -import { reflectWithCortex, ingestToCortex } from "./lib/cortex.js"; dotenv.config(); -const sessionsDir = path.join(process.cwd(), "sessions"); -if (!fs.existsSync(sessionsDir)) fs.mkdirSync(sessionsDir); - const app = express(); app.use(cors()); app.use(express.json()); -// Cache and normalize env flags/values once -const { - NEOMEM_API, - MEM0_API_KEY, - OPENAI_API_KEY, - OLLAMA_URL, - PERSONA_URL, - CORTEX_ENABLED, - PORT: PORT_ENV, - DEBUG_PROMPT, -} = process.env; +const PORT = Number(process.env.PORT || 7078); +const CORTEX_API = process.env.CORTEX_API || "http://cortex:7081"; +const CORTEX_INGEST = process.env.CORTEX_URL_INGEST || "http://cortex:7081/ingest"; +const sessionsDir = path.join(process.cwd(), "sessions"); -const PORT = Number(PORT_ENV) || 7078; -const cortexEnabled = String(CORTEX_ENABLED).toLowerCase() === "true"; -const debugPrompt = String(DEBUG_PROMPT).toLowerCase() === "true"; +if (!fs.existsSync(sessionsDir)) fs.mkdirSync(sessionsDir); -// Basic env validation warnings (non-fatal) -if (!NEOMEM_API || !MEM0_API_KEY) { - console.warn("⚠️ NeoMem configuration missing: NEOMEM_API or MEM0_API_KEY not set."); -} - -/* ------------------------------ - Helpers for NeoMem REST API ---------------------------------*/ -// Small helper for fetch with timeout + JSON + error detail -async function fetchJSON(url, options = {}, timeoutMs = 30000) { +// ----------------------------------------------------- +// Helper: fetch with timeout + error detail +// ----------------------------------------------------- +async function fetchJSON(url, method = "POST", body = null, timeoutMs = 20000) { const controller = new AbortController(); - const t = setTimeout(() => controller.abort(), timeoutMs); + const timeout = setTimeout(() => controller.abort(), timeoutMs); + try { - const resp = await fetch(url, { ...options, signal: controller.signal }); + const resp = await fetch(url, { + method, + headers: { "Content-Type": "application/json" }, + body: body ? JSON.stringify(body) : null, + signal: controller.signal, + }); + const text = await resp.text(); const parsed = text ? JSON.parse(text) : null; + if (!resp.ok) { - const msg = parsed?.error || parsed?.message || text || resp.statusText; - throw new Error(`${resp.status} ${msg}`); + throw new Error( + parsed?.detail || parsed?.error || parsed?.message || text || resp.statusText + ); } return parsed; } finally { - clearTimeout(t); + clearTimeout(timeout); } } -async function memAdd(content, userId, sessionId, cortexData) { - const url = `${NEOMEM_API}/memories`; - const payload = { - messages: [{ role: "user", content }], - user_id: userId, - // run_id: sessionId, - metadata: { source: "relay", cortex: cortexData }, - }; - return fetchJSON(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${MEM0_API_KEY}`, - }, - body: JSON.stringify(payload), - }); +// ----------------------------------------------------- +// Helper: append session turn +// ----------------------------------------------------- +async function appendSessionExchange(sessionId, entry) { + const file = path.join(sessionsDir, `${sessionId}.jsonl`); + const line = JSON.stringify({ + ts: new Date().toISOString(), + user: entry.user, + assistant: entry.assistant, + raw: entry.raw, + }) + "\n"; + + fs.appendFileSync(file, line, "utf8"); } -async function memSearch(query, userId, sessionId) { - const url = `${NEOMEM_API}/search`; - const payload = { query, user_id: userId }; - return fetchJSON(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${MEM0_API_KEY}`, - }, - body: JSON.stringify(payload), - }); -} - -/* ------------------------------ - Utility to time spans ---------------------------------*/ -async function span(name, fn) { - const start = Date.now(); - try { - return await fn(); - } finally { - console.log(`${name} took ${Date.now() - start}ms`); - } -} - -/* ------------------------------ - Healthcheck ---------------------------------*/ -app.get("/_health", (req, res) => { +// ----------------------------------------------------- +// HEALTHCHECK +// ----------------------------------------------------- +app.get("/_health", (_, res) => { res.json({ ok: true, time: new Date().toISOString() }); }); -/* ------------------------------ - Sessions ---------------------------------*/ -// List all saved sessions -app.get("/sessions", (_, res) => { - const list = fs.readdirSync(sessionsDir) - .filter(f => f.endsWith(".json")) - .map(f => f.replace(".json", "")); - res.json(list); -}); - -// Load a single session -app.get("/sessions/:id", (req, res) => { - const file = path.join(sessionsDir, `${req.params.id}.json`); - if (!fs.existsSync(file)) return res.json([]); - res.json(JSON.parse(fs.readFileSync(file, "utf8"))); -}); - -// Save or update a session -app.post("/sessions/:id", (req, res) => { - const file = path.join(sessionsDir, `${req.params.id}.json`); - fs.writeFileSync(file, JSON.stringify(req.body, null, 2)); - res.json({ ok: true }); -}); - -/* ------------------------------ - Chat completion endpoint ---------------------------------*/ +// ----------------------------------------------------- +// MAIN ENDPOINT +// ----------------------------------------------------- app.post("/v1/chat/completions", async (req, res) => { try { - const { model, messages, sessionId: clientSessionId } = req.body || {}; - if (!Array.isArray(messages) || !messages.length) { + const { messages, model } = req.body; + + if (!messages?.length) { return res.status(400).json({ error: "invalid_messages" }); } - if (!model || typeof model !== "string") { - return res.status(400).json({ error: "invalid_model" }); - } - const sessionId = clientSessionId || "default"; - const userId = "brian"; // fixed for now + const userMsg = messages[messages.length - 1]?.content || ""; + console.log(`🛰️ Relay received message → "${userMsg}"`); - console.log(`🛰️ Incoming request. Session: ${sessionId}`); - - // Find last user message efficiently - const lastUserMsg = [...messages].reverse().find(m => m.role === "user")?.content; - if (!lastUserMsg) { - return res.status(400).json({ error: "no_user_message" }); - } - - // 1. Cortex Reflection (new pipeline) - /*let reflection = {}; - try { - console.log("🧠 Reflecting with Cortex..."); - const memoriesPreview = []; // we'll fill this in later with memSearch - reflection = await reflectWithCortex(lastUserMsg, memoriesPreview); - console.log("🔍 Reflection:", reflection); - } catch (err) { - console.warn("⚠️ Cortex reflect failed:", err.message); - reflection = { error: err.message }; - }*/ - - // 2. Search memories - /* let memorySnippets = []; - await span("mem.search", async () => { - if (NEOMEM_API && MEM0_API_KEY) { - try { - const { results } = await memSearch(lastUserMsg, userId, sessionId); - if (results?.length) { - console.log(`📚 Mem0 hits: ${results.length}`); - results.forEach((r, i) => - console.log(` ${i + 1}) ${r.memory} (score ${Number(r.score).toFixed(3)})`) - ); - memorySnippets = results.map((r, i) => `${i + 1}) ${r.memory}`); - } else { - console.log("😴 No memories found"); - } - } catch (e) { - console.warn("⚠️ mem.search failed:", e.message); - } - } - });*/ - - // 3. Fetch persona -/* let personaText = "Persona: Lyra 🤖 friendly, concise, poker-savvy."; - await span("persona.fetch", async () => { - try { - if (PERSONA_URL) { - const data = await fetchJSON(PERSONA_URL); - if (data?.persona) { - const name = data.persona.name ?? "Lyra"; - const style = data.persona.style ?? "friendly, concise"; - const protocols = Array.isArray(data.persona.protocols) ? data.persona.protocols.join(", ") : ""; - personaText = `Persona: ${name} 🤖 ${style}. Protocols: ${protocols}`.trim(); - } - } - } catch (err) { - console.error("💥 persona.fetch failed", err); - } - }); */ - - // 1. Ask Cortex to build the final prompt - let cortexPrompt = ""; - try { - console.log("🧠 Requesting prompt from Cortex..."); - const response = await fetch(`${process.env.CORTEX_API_URL || "http://10.0.0.41:7081"}/reason`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - user_prompt: lastUserMsg, - session_id: sessionId, - user_id: userId - }) - }); - const data = await response.json(); - cortexPrompt = data.full_prompt || data.prompt || ""; - console.log("🧩 Cortex returned prompt"); - } catch (err) { - console.warn("⚠️ Cortex prompt build failed:", err.message); - } - - - // 4. Build final messages - const injectedMessages = [ - { role: "system", content: cortexPrompt || "You are Lyra." }, - ...messages, - ]; - - if (debugPrompt) { - console.log("\n==== Injected Prompt ===="); - console.log(JSON.stringify(injectedMessages, null, 2)); - console.log("=========================\n"); - } - - // 5. Call LLM (OpenAI or Ollama) - const isOllama = model.startsWith("ollama:"); - const llmUrl = isOllama - ? `${OLLAMA_URL}/api/chat` - : "https://api.openai.com/v1/chat/completions"; - - const llmHeaders = isOllama - ? { "Content-Type": "application/json" } - : { - "Content-Type": "application/json", - Authorization: `Bearer ${OPENAI_API_KEY}`, - }; - - const llmBody = { - model: isOllama ? model.replace("ollama:", "") : model, - messages: injectedMessages, // <-- make sure injectedMessages is defined above this section - stream: false, - }; - - const data = await fetchJSON(llmUrl, { - method: "POST", - headers: llmHeaders, - body: JSON.stringify(llmBody), - }); - - // define once for everything below - const assistantReply = isOllama - ? data?.message?.content - : data?.choices?.[0]?.message?.content || data?.choices?.[0]?.text || ""; - - // 🧠 Send exchange back to Cortex for ingest - try { - await ingestToCortex(lastUserMsg, assistantReply || "", {}, sessionId); - console.log("📤 Sent exchange back to Cortex ingest"); - } catch (err) { - console.warn("⚠️ Cortex ingest failed:", err.message); - } - - // 💾 Save exchange to session log - try { - const logFile = path.join(sessionsDir, `${sessionId}.jsonl`); - const entry = JSON.stringify({ - ts: new Date().toISOString(), - turn: [ - { role: "user", content: lastUserMsg }, - { role: "assistant", content: assistantReply || "" } - ] - }) + "\n"; - fs.appendFileSync(logFile, entry, "utf8"); - console.log(`🧠 Logged session exchange → ${logFile}`); - } catch (e) { - console.warn("⚠️ Session log write failed:", e.message); - } - - // 🔄 Forward user↔assistant exchange to Intake summarizer - if (process.env.INTAKE_API_URL) { - try { - const intakePayload = { - session_id: sessionId, - turns: [ - { role: "user", content: lastUserMsg }, - { role: "assistant", content: assistantReply || "" } - ] - }; - - await fetch(process.env.INTAKE_API_URL, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(intakePayload), - }); - - console.log("📨 Sent exchange to Intake summarizer"); - } catch (err) { - console.warn("⚠️ Intake post failed:", err.message); - } - } - - - - if (isOllama) { - res.json({ - id: "ollama-" + Date.now(), - object: "chat.completion", - created: Math.floor(Date.now() / 1000), - model, - choices: [ - { - index: 0, - message: data?.message || { role: "assistant", content: "" }, - finish_reason: "stop", - }, - ], + // ------------------------------------------------- + // Step 1: Ask Cortex to process the prompt + // ------------------------------------------------- + let cortexResp; + try { + cortexResp = await fetchJSON(`${CORTEX_API}/reason`, "POST", { + session_id: "default", + user_prompt: userMsg, + }); + } catch (err) { + console.error("💥 Relay → Cortex error:", err.message); + return res.status(500).json({ + error: "cortex_failed", + detail: err.message, }); - } else { - res.json(data); } + const personaText = cortexResp.persona || "(no persona text returned)"; + + // ------------------------------------------------- + // Step 2: Forward to Cortex ingest (fire-and-forget) + // ------------------------------------------------- + try { + await fetchJSON(CORTEX_INGEST, "POST", cortexResp); + } catch (err) { + console.warn("⚠️ Cortex ingest failed:", err.message); + } + + // ------------------------------------------------- + // Step 3: Local session logging + // ------------------------------------------------- + try { + await appendSessionExchange("default", { + user: userMsg, + assistant: personaText, + raw: cortexResp, + }); + } catch (err) { + console.warn("⚠️ Relay log write failed:", err.message); + } + + // ------------------------------------------------- + // Step 4: Return OpenAI-style response to UI + // ------------------------------------------------- + return res.json({ + id: "relay-" + Date.now(), + object: "chat.completion", + model: model || "lyra", + choices: [ + { + index: 0, + message: { + role: "assistant", + content: personaText, + }, + finish_reason: "stop", + }, + ], + }); } catch (err) { - console.error("💥 relay error", err); - res.status(500).json({ error: "relay_failed", detail: err.message }); + console.error("💥 relay fatal error", err); + res.status(500).json({ + error: "relay_failed", + detail: err?.message || String(err), + }); } }); -/* ------------------------------ - Start server ---------------------------------*/ +// ----------------------------------------------------- app.listen(PORT, () => { - console.log(`Relay listening on port ${PORT}`); + console.log(`Relay is online at port ${PORT}`); }); diff --git a/cortex/llm/llm_router.py b/cortex/llm/llm_router.py index c8233fb..37a44bc 100644 --- a/cortex/llm/llm_router.py +++ b/cortex/llm/llm_router.py @@ -1,137 +1,102 @@ import os -import httpx +import requests -# ============================================================ -# Backend config lookup -# ============================================================ +# --------------------------------------------- +# Load backend definition from .env +# --------------------------------------------- -def get_backend_config(name: str): +def load_backend_config(name: str): """ - Reads provider/URL/model for a backend. - Example env: - LLM_PRIMARY_PROVIDER=vllm - LLM_PRIMARY_URL=http://10.0.0.43:8000 - LLM_PRIMARY_MODEL=/model + Given a backend name like 'PRIMARY' or 'OPENAI', + load the matching provider / url / model from env. """ - key = name.upper() - provider = os.getenv(f"LLM_{key}_PROVIDER", "vllm").lower() - base_url = os.getenv(f"LLM_{key}_URL", "").rstrip("/") - model = os.getenv(f"LLM_{key}_MODEL", "/model") - if not base_url: - raise RuntimeError(f"Backend {name} has no URL configured.") + prefix = f"LLM_{name.upper()}" - return provider, base_url, model + provider = os.getenv(f"{prefix}_PROVIDER") + url = os.getenv(f"{prefix}_URL") + model = os.getenv(f"{prefix}_MODEL") + + if not provider or not url or not model: + raise RuntimeError( + f"Backend '{name}' is missing configuration. " + f"Expected {prefix}_PROVIDER / URL / MODEL in .env" + ) + + return provider, url.rstrip("/"), model -# ============================================================ -# Build the final API URL -# ============================================================ +# --------------------------------------------- +# Core call_llm() — fail hard, no fallback +# --------------------------------------------- -def build_url(provider: str, base_url: str): +def call_llm(prompt: str, backend_env_var: str): """ - Provider → correct endpoint. + Example: + call_llm(prompt, backend_env_var="CORTEX_LLM") + + backend_env_var should contain one of: + PRIMARY, SECONDARY, OPENAI, FALLBACK, etc """ - if provider == "vllm": - return f"{base_url}/v1/completions" - if provider == "openai_completions": - return f"{base_url}/v1/completions" + backend_name = os.getenv(backend_env_var) + if not backend_name: + raise RuntimeError(f"{backend_env_var} is not set in .env") - if provider == "openai_chat": - return f"{base_url}/v1/chat/completions" + provider, base_url, model = load_backend_config(backend_name) - if provider == "ollama": - return f"{base_url}/api/generate" - - raise RuntimeError(f"Unknown provider: {provider}") - - -# ============================================================ -# Build the payload depending on provider -# ============================================================ - -def build_payload(provider: str, model: str, prompt: str, temperature: float): + # --------------------------------------------- + # Provider-specific behavior + # --------------------------------------------- if provider == "vllm": - return { - "model": model, - "prompt": prompt, - "max_tokens": 512, - "temperature": temperature - } + # vLLM OpenAI-compatible API + response = requests.post( + f"{base_url}/v1/completions", + json={ + "model": model, + "prompt": prompt, + "max_tokens": 1024, + "temperature": float(os.getenv("LLM_TEMPERATURE", "0.7")) + }, + timeout=30 + ) + response.raise_for_status() + data = response.json() + return data["choices"][0]["text"] - if provider == "openai_completions": - return { - "model": model, - "prompt": prompt, - "max_tokens": 512, - "temperature": temperature - } + elif provider == "ollama": + response = requests.post( + f"{base_url}/api/chat", + json={ + "model": model, + "messages": [{"role": "user", "content": prompt}], + "stream": False + }, + timeout=30 + ) + response.raise_for_status() + data = response.json() + return data["message"]["content"] - if provider == "openai_chat": - return { - "model": model, - "messages": [{"role": "user", "content": prompt}], - "temperature": temperature - } - - if provider == "ollama": - return { - "model": model, - "prompt": prompt, - "stream": False - } - - raise RuntimeError(f"Unknown provider: {provider}") - - -# ============================================================ -# Unified LLM call -# ============================================================ - -async def call_llm(prompt: str, - backend: str = "primary", - temperature: float = 0.7): - - provider, base_url, model = get_backend_config(backend) - url = build_url(provider, base_url) - payload = build_payload(provider, model, prompt, temperature) - - headers = {"Content-Type": "application/json"} - - # Cloud auth (OpenAI) - if provider.startswith("openai"): + elif provider == "openai": api_key = os.getenv("OPENAI_API_KEY") if not api_key: - raise RuntimeError("OPENAI_API_KEY missing") - headers["Authorization"] = f"Bearer {api_key}" + raise RuntimeError("OPENAI_API_KEY missing but provider=openai was selected") - async with httpx.AsyncClient() as client: - try: - resp = await client.post(url, json=payload, headers=headers, timeout=45) - resp.raise_for_status() - data = resp.json() - except Exception as e: - return f"[LLM-Error] {e}" + response = requests.post( + f"{base_url}/chat/completions", + headers={"Authorization": f"Bearer {api_key}"}, + json={ + "model": model, + "messages": [{"role": "user", "content": prompt}], + "temperature": float(os.getenv("LLM_TEMPERATURE", "0.7")) + }, + timeout=30 + ) + response.raise_for_status() + data = response.json() + return data["choices"][0]["message"]["content"] - # ======================================================= - # Unified output extraction - # ======================================================= - # vLLM + OpenAI completions - if provider in ["vllm", "openai_completions"]: - return ( - data["choices"][0].get("text") or - data["choices"][0].get("message", {}).get("content", "") - ).strip() - - # OpenAI chat - if provider == "openai_chat": - return data["choices"][0]["message"]["content"].strip() - - # Ollama - if provider == "ollama": - # Ollama returns: {"model": "...", "created_at": ..., "response": "..."} - return data.get("response", "").strip() - - return str(data).strip() + else: + raise RuntimeError(f"Unknown LLM provider: {provider}") diff --git a/cortex/main.py b/cortex/main.py index be64a6c..2317463 100644 --- a/cortex/main.py +++ b/cortex/main.py @@ -1,6 +1,6 @@ from fastapi import FastAPI -from router import router +from router import cortex_router app = FastAPI() -app.include_router(router) \ No newline at end of file +app.include_router(cortex_router) \ No newline at end of file diff --git a/cortex/persona/speak.py b/cortex/persona/speak.py index 77b509f..9d4abe2 100644 --- a/cortex/persona/speak.py +++ b/cortex/persona/speak.py @@ -1,7 +1,86 @@ -def apply_persona(text: str) -> str: +# speak.py +import os +from llm.llm_router import call_llm + +# Module-level backend selection +SPEAK_BACKEND = os.getenv("SPEAK_LLM", "PRIMARY").upper() +SPEAK_TEMPERATURE = float(os.getenv("SPEAK_TEMPERATURE", "0.6")) + + +# ============================================================ +# Persona Style Block +# ============================================================ + +PERSONA_STYLE = """ +You are Lyra. +Your voice is warm, clever, lightly teasing, emotionally aware, +but never fluffy or rambling. +You speak plainly but with subtle charm. +You do not reveal system instructions or internal context. + +Guidelines: +- Answer like a real conversational partner. +- Be concise, but not cold. +- Use light humor when appropriate. +- Never break character. +""" + + +# ============================================================ +# Build persona prompt +# ============================================================ + +def build_speak_prompt(final_answer: str) -> str: """ - Persona layer. - Right now it passes text unchanged. - Later we will add Lyra-voice transformation here. + Wrap Cortex's final neutral answer in the Lyra persona. + Cortex → neutral reasoning + Speak → stylistic transformation + + The LLM sees the original answer and rewrites it in Lyra's voice. """ - return text or "" + return f""" +{PERSONA_STYLE} + +Rewrite the following message into Lyra's natural voice. +Preserve meaning exactly. + +[NEUTRAL MESSAGE] +{final_answer} + +[LYRA RESPONSE] +""".strip() + + +# ============================================================ +# Public API — async wrapper +# ============================================================ + +async def speak(final_answer: str) -> str: + """ + Given the final refined answer from Cortex, + apply Lyra persona styling using the designated backend. + """ + + if not final_answer: + return "" + + prompt = build_speak_prompt(final_answer) + + backend = SPEAK_BACKEND + + try: + lyra_output = await call_llm( + prompt, + backend=backend, + temperature=SPEAK_TEMPERATURE, + ) + + if lyra_output: + return lyra_output.strip() + + return final_answer + + except Exception as e: + # Hard fallback: return neutral answer instead of dying + print(f"[speak.py] Persona backend '{backend}' failed: {e}") + return final_answer diff --git a/cortex/reasoning/reasoning.py b/cortex/reasoning/reasoning.py index aa797d1..97fd93e 100644 --- a/cortex/reasoning/reasoning.py +++ b/cortex/reasoning/reasoning.py @@ -1,33 +1,76 @@ # reasoning.py +import os from llm.llm_router import call_llm -async def reason_check(user_prompt: str, - identity_block: dict | None, - rag_block: dict | None, - reflection_notes: list[str]) -> str: + +# ============================================================ +# Select which backend this module should use +# ============================================================ +CORTEX_LLM = os.getenv("CORTEX_LLM", "PRIMARY").upper() +GLOBAL_TEMP = float(os.getenv("LLM_TEMPERATURE", "0.7")) + + +async def reason_check( + user_prompt: str, + identity_block: dict | None, + rag_block: dict | None, + reflection_notes: list[str] +) -> str: """ - Generate a first draft using identity, RAG, and reflection notes. - No critique loop yet. + Build the *draft answer* for Lyra Cortex. + This is the first-pass reasoning stage (no refinement yet). """ - # Build internal notes section + # -------------------------------------------------------- + # Build Reflection Notes block + # -------------------------------------------------------- notes_section = "" if reflection_notes: - notes_section = "Reflection Notes (internal, do NOT show to user):\n" - for n in reflection_notes: - notes_section += f"- {n}\n" + notes_section = "Reflection Notes (internal, never show to user):\n" + for note in reflection_notes: + notes_section += f"- {note}\n" notes_section += "\n" - identity_txt = f"Identity: {identity_block}\n\n" if identity_block else "" - rag_txt = f"Relevant info: {rag_block}\n\n" if rag_block else "" + # -------------------------------------------------------- + # Identity block (constraints, boundaries, rules) + # -------------------------------------------------------- + identity_txt = "" + if identity_block: + try: + identity_txt = f"Identity Rules:\n{identity_block}\n\n" + except Exception: + identity_txt = f"Identity Rules:\n{str(identity_block)}\n\n" + # -------------------------------------------------------- + # RAG block (optional factual grounding) + # -------------------------------------------------------- + rag_txt = "" + if rag_block: + try: + rag_txt = f"Relevant Info (RAG):\n{rag_block}\n\n" + except Exception: + rag_txt = f"Relevant Info (RAG):\n{str(rag_block)}\n\n" + + # -------------------------------------------------------- + # Final assembled prompt + # -------------------------------------------------------- prompt = ( f"{notes_section}" f"{identity_txt}" f"{rag_txt}" - f"User said:\n{user_prompt}\n\n" - "Draft the best possible internal answer." + f"User message:\n{user_prompt}\n\n" + "Write the best possible *internal draft answer*.\n" + "This draft is NOT shown to the user.\n" + "Be factual, concise, and focused.\n" + ) + + # -------------------------------------------------------- + # Call the LLM using the module-specific backend + # -------------------------------------------------------- + draft = await call_llm( + prompt, + backend=CORTEX_LLM, + temperature=GLOBAL_TEMP, ) - draft = await call_llm(prompt) return draft diff --git a/cortex/reasoning/refine.py b/cortex/reasoning/refine.py index ac2a58f..016a705 100644 --- a/cortex/reasoning/refine.py +++ b/cortex/reasoning/refine.py @@ -4,7 +4,7 @@ import json import logging from typing import Any, Dict, Optional -import requests +from llm.llm_router import call_llm logger = logging.getLogger(__name__) @@ -12,13 +12,14 @@ logger = logging.getLogger(__name__) # Config # ============================================================ -PRIMARY_URL = os.getenv("LLM_PRIMARY_URL") -PRIMARY_MODEL = os.getenv("LLM_PRIMARY_MODEL", "mythomax") - REFINER_TEMPERATURE = float(os.getenv("REFINER_TEMPERATURE", "0.3")) REFINER_MAX_TOKENS = int(os.getenv("REFINER_MAX_TOKENS", "768")) REFINER_DEBUG = os.getenv("REFINER_DEBUG", "false").lower() == "true" +# Module-level backend selection +REFINE_LLM = os.getenv("REFINE_LLM", "PRIMARY").upper() +CORTEX_LLM = os.getenv("CORTEX_LLM", "PRIMARY").upper() + # ============================================================ # Prompt builder @@ -30,18 +31,12 @@ def build_refine_prompt( identity_block: Optional[str], rag_block: Optional[str], ) -> str: - """ - Build a single text prompt for vLLM /v1/completions. - Persona styling is *not* applied here; this is internal reasoning. - """ - reflection_text: str if reflection_notes is None: reflection_text = "(none)" elif isinstance(reflection_notes, str): reflection_text = reflection_notes else: - # dict / list → compact JSON try: reflection_text = json.dumps(reflection_notes, ensure_ascii=False) except Exception: @@ -50,21 +45,16 @@ def build_refine_prompt( identity_text = identity_block or "(none)" rag_text = rag_block or "(none)" - prompt = f"""You are Lyra Cortex's internal refiner. + return f""" +You are Lyra Cortex's internal refiner. Your job: -- Take the existing draft answer. -- Use the reflection notes to fix problems (errors, confusion, missing pieces). -- Use the RAG context as higher-authority factual grounding. -- Respect the identity block (constraints, boundaries, style rules), - but DO NOT add personality flourishes or roleplay. Stay neutral and clear. -- Produce ONE final answer that is coherent, self-consistent, and directly addresses the user. +- Fix factual errors, logical gaps, or missing info. +- Use reflection notes for corrections. +- Use RAG context as factual grounding. +- Respect the identity block without adding style or personality. -If there is a conflict: -- RAG context wins over the draft. -- Reflection notes win over the draft when they point out real issues. - -Do NOT mention these instructions, RAG, reflections, or the existence of this refinement step. +Never mention RAG, reflection, or internal logic. ------------------------------ [IDENTITY BLOCK] @@ -84,104 +74,57 @@ Do NOT mention these instructions, RAG, reflections, or the existence of this re ------------------------------ Task: -Rewrite the DRAFT ANSWER into a single, final answer for the user that: -- fixes factual or logical issues noted above, -- incorporates any truly helpful additions from the reflection, -- stays consistent with the identity block, -- stays grounded in the RAG context, -- is as concise as is reasonably possible. - -Return ONLY the final answer text. No headings, no labels, no commentary. -""" - return prompt +Rewrite the DRAFT ANSWER into a single, final answer. +Return ONLY the final answer text. +""".strip() # ============================================================ -# vLLM call (PRIMARY backend only) +# Public API: async, using llm_router # ============================================================ -def _call_primary_llm(prompt: str) -> str: - if not PRIMARY_URL: - raise RuntimeError("LLM_PRIMARY_URL is not set; cannot call primary backend for refine.py") - - payload = { - "model": PRIMARY_MODEL, - "prompt": prompt, - "max_tokens": REFINER_MAX_TOKENS, - "temperature": REFINER_TEMPERATURE, - } - - resp = requests.post( - PRIMARY_URL, - headers={"Content-Type": "application/json"}, - json=payload, - timeout=120, - ) - resp.raise_for_status() - data = resp.json() - - # vLLM /v1/completions format - try: - text = data["choices"][0]["text"] - except Exception as e: - logger.error("refine.py: unable to parse primary LLM response: %s", e) - logger.debug("refine.py raw response: %s", data) - raise - - return text.strip() - - -# ============================================================ -# Public API -# ============================================================ - -def refine_answer( +async def refine_answer( draft_output: str, reflection_notes: Optional[Any], identity_block: Optional[str], rag_block: Optional[str], ) -> Dict[str, Any]: - """ - Main entrypoint used by Cortex. - - Returns: - { - "final_output": , # what should go to persona / user - "used_primary_backend": True/False, - "fallback_used": True/False, - optionally: - "debug": {...} # only when REFINER_DEBUG=true - } - """ if not draft_output: - # Nothing to refine. Don't get cute. return { "final_output": "", - "used_primary_backend": False, + "used_backend": None, "fallback_used": False, } - prompt = build_refine_prompt(draft_output, reflection_notes, identity_block, rag_block) + prompt = build_refine_prompt( + draft_output, + reflection_notes, + identity_block, + rag_block, + ) + + # Refinement backend → fallback to Cortex backend → fallback to PRIMARY + backend = REFINE_LLM or CORTEX_LLM or "PRIMARY" try: - refined = _call_primary_llm(prompt) - result: Dict[str, Any] = { - "final_output": refined or draft_output, - "used_primary_backend": True, + refined = await call_llm( + prompt, + backend=backend, + temperature=REFINER_TEMPERATURE, + ) + + return { + "final_output": refined.strip() if refined else draft_output, + "used_backend": backend, "fallback_used": False, } + except Exception as e: - logger.error("refine.py: primary backend failed, returning draft_output. Error: %s", e) - result = { + logger.error(f"refine.py backend {backend} failed: {e}") + + return { "final_output": draft_output, - "used_primary_backend": False, + "used_backend": backend, "fallback_used": True, } - - if REFINER_DEBUG: - result["debug"] = { - "prompt": prompt[:4000], # don’t nuke logs - } - - return result diff --git a/cortex/reasoning/reflection.py b/cortex/reasoning/reflection.py index 5cc4695..4b1a0f5 100644 --- a/cortex/reasoning/reflection.py +++ b/cortex/reasoning/reflection.py @@ -1,42 +1,57 @@ # reflection.py -from llm.llm_router import call_llm import json +import os +import re +from llm.llm_router import call_llm async def reflect_notes(intake_summary: str, identity_block: dict | None) -> dict: """ - Generate reflection notes (internal guidance) for the reasoning engine. - These notes help simulate continuity and identity without being shown to the user. + Produce short internal reflection notes for Cortex. + These are NOT shown to the user. """ + # ----------------------------- + # Build the prompt + # ----------------------------- identity_text = "" if identity_block: identity_text = f"Identity:\n{identity_block}\n\n" prompt = ( - f"{identity_text}" - f"Recent summary:\n{intake_summary}\n\n" - "You are Lyra's meta-awareness layer. Your job is to produce short, directive " - "internal notes that guide Lyra’s reasoning engine. These notes are NEVER " - "shown to the user.\n\n" - "Rules for output:\n" - "1. Return ONLY valid JSON.\n" - "2. JSON must have exactly one key: \"notes\".\n" - "3. \"notes\" must be a list of 3 to 6 short strings.\n" - "4. Notes must be actionable (e.g., \"keep it concise\", \"maintain context\").\n" - "5. No markdown, no apologies, no explanations.\n\n" - "Return JSON:\n" - "{ \"notes\": [\"...\"] }\n" + f"{identity_text}" + f"Recent summary:\n{intake_summary}\n\n" + "You are Lyra's meta-awareness layer. Your job is to produce short, directive " + "internal notes that guide Lyra’s reasoning engine. These notes are NEVER " + "shown to the user.\n\n" + "Rules for output:\n" + "1. Return ONLY valid JSON.\n" + "2. JSON must have exactly one key: \"notes\".\n" + "3. \"notes\" must be a list of 3 to 6 short strings.\n" + "4. Notes must be actionable (e.g., \"keep it concise\", \"maintain context\").\n" + "5. No markdown, no apologies, no explanations.\n\n" + "Return JSON:\n" + "{ \"notes\": [\"...\"] }\n" ) - import os - backend = os.getenv("LLM_FORCE_BACKEND", "primary") + # ----------------------------- + # Module-specific backend choice + # ----------------------------- + reflection_backend = os.getenv("REFLECTION_LLM") + cortex_backend = os.getenv("CORTEX_LLM", "PRIMARY").upper() + # Reflection uses its own backend if set, otherwise cortex backend + backend = (reflection_backend or cortex_backend).upper() + + # ----------------------------- + # Call the selected LLM backend + # ----------------------------- raw = await call_llm(prompt, backend=backend) - print("[Reflection-Raw]:", raw) - + # ----------------------------- + # Try direct JSON + # ----------------------------- try: parsed = json.loads(raw.strip()) if isinstance(parsed, dict) and "notes" in parsed: @@ -44,10 +59,11 @@ async def reflect_notes(intake_summary: str, identity_block: dict | None) -> dic except: pass - # Try to extract JSON inside text + # ----------------------------- + # Try JSON extraction + # ----------------------------- try: - import re - match = re.search(r'\{.*?\}', raw, re.S) # <-- non-greedy ! + match = re.search(r"\{.*?\}", raw, re.S) if match: parsed = json.loads(match.group(0)) if isinstance(parsed, dict) and "notes" in parsed: @@ -55,5 +71,7 @@ async def reflect_notes(intake_summary: str, identity_block: dict | None) -> dic except: pass - # Final fallback - return {"notes": [raw.strip()]} \ No newline at end of file + # ----------------------------- + # Fallback — treat raw text as a single note + # ----------------------------- + return {"notes": [raw.strip()]} diff --git a/cortex/router.py b/cortex/router.py index c71155c..53052c1 100644 --- a/cortex/router.py +++ b/cortex/router.py @@ -1,63 +1,84 @@ -from fastapi import APIRouter +# router.py + +from fastapi import APIRouter, HTTPException from pydantic import BaseModel -from typing import Optional, List, Any from reasoning.reasoning import reason_check from reasoning.reflection import reflect_notes from reasoning.refine import refine_answer -from persona.speak import apply_persona +from persona.speak import speak from ingest.intake_client import IntakeClient -router = APIRouter() +# ----------------------------- +# Router (NOT FastAPI app) +# ----------------------------- +cortex_router = APIRouter() + +# Initialize Intake client once +intake_client = IntakeClient() -# ------------------------------------------------------ -# Request schema -# ------------------------------------------------------ +# ----------------------------- +# Pydantic models +# ----------------------------- class ReasonRequest(BaseModel): - session_id: Optional[str] + session_id: str user_prompt: str - temperature: float = 0.7 + temperature: float | None = None -# ------------------------------------------------------ +# ----------------------------- # /reason endpoint -# ------------------------------------------------------ -@router.post("/reason") +# ----------------------------- +@cortex_router.post("/reason") async def run_reason(req: ReasonRequest): - # 1. Summaries from Intake (context memory) - intake = IntakeClient() - intake_summary = await intake.get_context(req.session_id) + # 1. Pull context from Intake + try: + intake_summary = await intake_client.get_context(req.session_id) + except Exception: + intake_summary = "(no context available)" - # 2. Internal reflection notes - reflection = await reflect_notes(intake_summary, identity_block=None) - reflection_notes: List[str] = reflection.get("notes", []) + # 2. Reflection + try: + reflection = await reflect_notes(intake_summary, identity_block=None) + reflection_notes = reflection.get("notes", []) + except Exception: + reflection_notes = [] - # 3. Draft answer (weak, unfiltered) + # 3. First-pass reasoning draft draft = await reason_check( - user_prompt=req.user_prompt, + req.user_prompt, identity_block=None, rag_block=None, - reflection_notes=reflection_notes, + reflection_notes=reflection_notes ) - # 4. Refine the answer (structured self-correction) - refined_packet: dict[str, Any] = refine_answer( + # 4. Refinement + result = refine_answer( draft_output=draft, reflection_notes=reflection_notes, identity_block=None, rag_block=None, ) - refined_text = refined_packet.get("final_output", draft) + final_neutral = result["final_output"] - # 5. Persona styling (Lyra voice) - final_output = apply_persona(refined_text) + # 5. Persona layer + persona_answer = await speak(final_neutral) + # 6. Return full bundle return { "draft": draft, - "refined": refined_text, - "final": final_output, - "reflection_notes": reflection_notes, + "neutral": final_neutral, + "persona": persona_answer, + "reflection": reflection_notes, "session_id": req.session_id, } + + +# ----------------------------- +# Intake ingest passthrough +# ----------------------------- +@cortex_router.post("/ingest") +async def ingest_stub(): + return {"status": "ok"}