Cortex rework in progress

serversdwn
2025-11-26 18:01:48 -05:00
parent a087de9790
commit 734999e8bb
8 changed files with 468 additions and 593 deletions


@@ -3,348 +3,154 @@ import dotenv from "dotenv";
import cors from "cors";
import fs from "fs";
import path from "path";
import { reflectWithCortex, ingestToCortex } from "./lib/cortex.js";
dotenv.config();
const app = express();
app.use(cors());
app.use(express.json());
const PORT = Number(process.env.PORT || 7078);
const CORTEX_API = process.env.CORTEX_API || "http://cortex:7081";
const CORTEX_INGEST = process.env.CORTEX_URL_INGEST || "http://cortex:7081/ingest";
const sessionsDir = path.join(process.cwd(), "sessions");
if (!fs.existsSync(sessionsDir)) fs.mkdirSync(sessionsDir);
// -----------------------------------------------------
// Helper: fetch with timeout + error detail
// -----------------------------------------------------
async function fetchJSON(url, method = "POST", body = null, timeoutMs = 20000) {
  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const resp = await fetch(url, {
      method,
      headers: { "Content-Type": "application/json" },
      body: body ? JSON.stringify(body) : null,
      signal: controller.signal,
    });
    const text = await resp.text();
    const parsed = text ? JSON.parse(text) : null;
    if (!resp.ok) {
      throw new Error(
        parsed?.detail || parsed?.error || parsed?.message || text || resp.statusText
      );
    }
    return parsed;
  } finally {
    clearTimeout(timeout);
  }
}
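// Illustrative use of the helper above: the defaults mean a bare call issues a
// POST with a JSON body and a 20s timeout. The path and payload shown here
// mirror the /reason call made in the main endpoint below.
//
//   const data = await fetchJSON(`${CORTEX_API}/reason`, "POST", {
//     session_id: "default",
//     user_prompt: "hello",
//   });
//   // data is the parsed JSON body, or null when the response body is empty.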
// -----------------------------------------------------
// Helper: append session turn
// -----------------------------------------------------
async function appendSessionExchange(sessionId, entry) {
  const file = path.join(sessionsDir, `${sessionId}.jsonl`);
  const line = JSON.stringify({
    ts: new Date().toISOString(),
    user: entry.user,
    assistant: entry.assistant,
    raw: entry.raw,
  }) + "\n";
  fs.appendFileSync(file, line, "utf8");
}
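// Example of a line this helper appends to sessions/<sessionId>.jsonl
// (values illustrative):
//
//   {"ts":"2025-11-26T23:01:48.000Z","user":"hello","assistant":"Hi, I'm Lyra.","raw":{...}}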
/* ------------------------------
   Utility to time spans
--------------------------------*/
async function span(name, fn) {
  const start = Date.now();
  try {
    return await fn();
  } finally {
    console.log(`${name} took ${Date.now() - start}ms`);
  }
}
// -----------------------------------------------------
// HEALTHCHECK
// -----------------------------------------------------
app.get("/_health", (_, res) => {
  res.json({ ok: true, time: new Date().toISOString() });
});
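// Quick check from the host, assuming the default port 7078:
//
//   curl http://localhost:7078/_health
//   // → {"ok":true,"time":"2025-11-26T23:01:48.000Z"}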
/* ------------------------------
   Sessions
--------------------------------*/
// List all saved sessions
app.get("/sessions", (_, res) => {
  const list = fs.readdirSync(sessionsDir)
    .filter(f => f.endsWith(".json"))
    .map(f => f.replace(".json", ""));
  res.json(list);
});

// Load a single session
app.get("/sessions/:id", (req, res) => {
  const file = path.join(sessionsDir, `${req.params.id}.json`);
  if (!fs.existsSync(file)) return res.json([]);
  res.json(JSON.parse(fs.readFileSync(file, "utf8")));
});

// Save or update a session
app.post("/sessions/:id", (req, res) => {
  const file = path.join(sessionsDir, `${req.params.id}.json`);
  fs.writeFileSync(file, JSON.stringify(req.body, null, 2));
  res.json({ ok: true });
});
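// Illustrative calls against the session endpoints (default port assumed;
// the POST body is stored verbatim, so its shape is up to the UI):
//
//   curl http://localhost:7078/sessions              // → ["default", ...]
//   curl http://localhost:7078/sessions/default      // → stored JSON, or []
//   curl -X POST http://localhost:7078/sessions/default \
//     -H "Content-Type: application/json" \
//     -d '[{"role":"user","content":"hi"}]'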
// -----------------------------------------------------
// MAIN ENDPOINT
// -----------------------------------------------------
app.post("/v1/chat/completions", async (req, res) => {
  try {
    const { messages, model } = req.body;
    if (!messages?.length) {
      return res.status(400).json({ error: "invalid_messages" });
    }

    const userMsg = messages[messages.length - 1]?.content || "";
    console.log(`🛰️ Relay received message → "${userMsg}"`);
    // -------------------------------------------------
    // Step 1: Ask Cortex to process the prompt
    // -------------------------------------------------
    let cortexResp;
    try {
      cortexResp = await fetchJSON(`${CORTEX_API}/reason`, "POST", {
        session_id: "default",
        user_prompt: userMsg,
      });
    } catch (err) {
      console.error("💥 Relay → Cortex error:", err.message);
      return res.status(500).json({
        error: "cortex_failed",
        detail: err.message,
      });
    }

    const personaText = cortexResp.persona || "(no persona text returned)";
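    // For reference, the equivalent call made directly against Cortex.
    // CORTEX_API defaults to http://cortex:7081 (see config above); the response
    // is assumed to carry at least a `persona` field, which is all the relay
    // reads from it here; the rest is passed through untouched.
    //
    //   curl -X POST http://cortex:7081/reason \
    //     -H "Content-Type: application/json" \
    //     -d '{"session_id":"default","user_prompt":"hello"}'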
    // -------------------------------------------------
    // Step 2: Forward to Cortex ingest (errors are non-fatal)
    // -------------------------------------------------
    try {
      await fetchJSON(CORTEX_INGEST, "POST", cortexResp);
    } catch (err) {
      console.warn("⚠️ Cortex ingest failed:", err.message);
    }
    // -------------------------------------------------
    // Step 3: Local session logging
    // -------------------------------------------------
    try {
      await appendSessionExchange("default", {
        user: userMsg,
        assistant: personaText,
        raw: cortexResp,
      });
    } catch (err) {
      console.warn("⚠️ Relay log write failed:", err.message);
    }
    // -------------------------------------------------
    // Step 4: Return OpenAI-style response to UI
    // -------------------------------------------------
    return res.json({
      id: "relay-" + Date.now(),
      object: "chat.completion",
      model: model || "lyra",
      choices: [
        {
          index: 0,
          message: {
            role: "assistant",
            content: personaText,
          },
          finish_reason: "stop",
        },
      ],
    });
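    // End-to-end shape for reference (values illustrative):
    //
    //   POST /v1/chat/completions
    //   {"model":"lyra","messages":[{"role":"user","content":"hello"}]}
    //
    //   → {"id":"relay-1764201708000","object":"chat.completion","model":"lyra",
    //      "choices":[{"index":0,"message":{"role":"assistant","content":"..."},
    //        "finish_reason":"stop"}]}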
  } catch (err) {
    console.error("💥 relay fatal error", err);
    res.status(500).json({
      error: "relay_failed",
      detail: err?.message || String(err),
    });
  }
});
// -----------------------------------------------------
app.listen(PORT, () => {
  console.log(`Relay is online at port ${PORT}`);
});
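// Environment variables read above, shown with the fallbacks hard-coded in this
// file (illustrative, not a recommended configuration):
//
//   PORT=7078
//   CORTEX_API=http://cortex:7081
//   CORTEX_URL_INGEST=http://cortex:7081/ingest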