import os
import sys
from collections import deque
from datetime import datetime
from uuid import uuid4

import requests
from fastapi import FastAPI, Body, Query, BackgroundTasks

# ─────────────────────────────
# Config
# ─────────────────────────────
SUMMARY_MODEL = os.getenv("SUMMARY_MODEL_NAME", "mistral-7b-instruct-v0.2.Q4_K_M.gguf")
SUMMARY_URL = os.getenv("SUMMARY_API_URL", "http://localhost:8080/v1/completions")
SUMMARY_MAX_TOKENS = int(os.getenv("SUMMARY_MAX_TOKENS", "200"))
SUMMARY_TEMPERATURE = float(os.getenv("SUMMARY_TEMPERATURE", "0.3"))

NEOMEM_API = os.getenv("NEOMEM_API")
NEOMEM_KEY = os.getenv("NEOMEM_KEY")

# Prefix of the placeholder string llm() returns when a request fails.
# Used to keep failure text out of the history maps and out of NeoMem.
_LLM_ERROR_PREFIX = "[Error summarizing:"

# ─────────────────────────────
# App + session buffer
# ─────────────────────────────
app = FastAPI()

# session_id → {"buffer": deque of exchange dicts, "created_at": datetime}
SESSIONS = {}


@app.on_event("startup")
def banner():
    """Print the configured summary model and endpoint at startup."""
    print("🧩 Intake v0.2 booting...")
    print(f" Model: {SUMMARY_MODEL}")
    print(f" API: {SUMMARY_URL}")
    sys.stdout.flush()


# ─────────────────────────────
# Helper: summarize exchanges
# ─────────────────────────────
def llm(prompt: str):
    """POST *prompt* to SUMMARY_URL and return the first choice's text.

    Never raises: on any failure a string starting with
    ``[Error summarizing:`` is returned instead, so callers can treat this
    as best-effort. A response with no choices or no text yields ``""``.
    """
    try:
        resp = requests.post(
            SUMMARY_URL,
            json={
                "model": SUMMARY_MODEL,
                "prompt": prompt,
                "max_tokens": SUMMARY_MAX_TOKENS,
                "temperature": SUMMARY_TEMPERATURE,
            },
            timeout=30,
        )
        resp.raise_for_status()
        # `or` also covers an empty list / null text, not just a missing key.
        choices = resp.json().get("choices") or [{}]
        return (choices[0].get("text") or "").strip()
    except Exception as e:
        # Deliberately broad: callers rely on this function not raising.
        return f"[Error summarizing: {e}]"


def _is_usable(summary):
    """Return True when *summary* is non-empty and not an llm() error placeholder."""
    return bool(summary) and not summary.startswith(_LLM_ERROR_PREFIX)


def _format_exchanges(exchanges):
    """Render exchange dicts as a 'User: … / Assistant: …' transcript.

    Missing ``user_msg`` / ``assistant_msg`` keys render as empty text
    rather than raising, because exchange bodies arrive unvalidated.
    """
    return "".join(
        f"User: {e.get('user_msg', '')}\nAssistant: {e.get('assistant_msg', '')}\n\n"
        for e in exchanges
    )


def summarize_simple(exchanges):
    """Simple factual summary of recent exchanges."""
    text = _format_exchanges(exchanges)
    prompt = (
        "\n"
        "Summarize the following conversation between Brian (user) and Lyra (assistant).\n"
        "Focus only on factual content. Avoid names, examples, story tone, or invented details.\n"
        "\n"
        f"{text}\n"
        "\n"
        "Summary:\n"
    )
    return llm(prompt)


# ─────────────────────────────
# NeoMem push
# ─────────────────────────────
def push_to_neomem(summary: str, session_id: str):
    """Best-effort POST of *summary* to ``{NEOMEM_API}/memories``.

    Does nothing when NEOMEM_API is unset, or when *summary* is empty or an
    llm() error placeholder (those are not memories worth storing).
    Failures are printed, never raised.
    """
    if not NEOMEM_API:
        return
    if not _is_usable(summary):
        return

    headers = {"Content-Type": "application/json"}
    if NEOMEM_KEY:
        headers["Authorization"] = f"Bearer {NEOMEM_KEY}"

    payload = {
        "messages": [{"role": "assistant", "content": summary}],
        "user_id": "brian",
        "metadata": {
            "source": "intake",
            "session_id": session_id,
        },
    }

    try:
        requests.post(
            f"{NEOMEM_API}/memories",
            json=payload,
            headers=headers,
            timeout=20,
        ).raise_for_status()
        print(f"🧠 NeoMem updated for {session_id}")
    except Exception as e:
        print(f"NeoMem push failed: {e}")


# ───────────────────────────────────────────────
# Multilevel Summaries (L1, L5, L10, L20, L30)
# ───────────────────────────────────────────────

# History maps
L10_HISTORY = {}  # session_id → list of L10 blocks
L20_HISTORY = {}  # session_id → list of merged overviews


def summarize_L1(buf):
    """Factual summary of the last 5 exchanges in *buf* (a list)."""
    return summarize_simple(buf[-5:])


def summarize_L5(buf):
    """Factual summary of the last 10 exchanges in *buf* (a list)."""
    return summarize_simple(buf[-10:])


def summarize_L10(buf):
    """'Reality Check' paragraph for the last 10 exchanges in *buf* (a list)."""
    text = _format_exchanges(buf[-10:])
    prompt = (
        "\n"
        "You are Lyra Intake performing a short 'Reality Check'.\n"
        "Summarize the last block of conversation (up to 10 exchanges)\n"
        "in one clear paragraph focusing on tone, intent, and direction.\n"
        "\n"
        f"{text}\n"
        "\n"
        "Reality Check:\n"
    )
    return llm(prompt)


def summarize_L20(L10_list):
    """Merge Reality Check paragraphs into one 'Session Overview'.

    Returns ``""`` without calling the LLM when *L10_list* is empty, since
    there is nothing to merge.
    """
    if not L10_list:
        return ""
    joined = "\n\n".join(L10_list)
    prompt = (
        "\n"
        "You are Lyra Intake creating a 'Session Overview'.\n"
        "Merge the following Reality Check paragraphs into one short summary\n"
        "capturing progress, themes, and the direction of the conversation.\n"
        "\n"
        f"{joined}\n"
        "\n"
        "Overview:\n"
    )
    return llm(prompt)


def summarize_L30(L20_list):
    """Condense session overviews into one 'Continuity Report'.

    Returns ``""`` without calling the LLM when *L20_list* is empty.
    """
    if not L20_list:
        return ""
    joined = "\n\n".join(L20_list)
    prompt = (
        "\n"
        "You are Lyra Intake generating a 'Continuity Report'.\n"
        "Condense these session overviews into one high-level reflection,\n"
        "noting major themes, persistent goals, and shifts.\n"
        "\n"
        f"{joined}\n"
        "\n"
        "Continuity Report:\n"
    )
    return llm(prompt)


def bg_summarize(session_id: str):
    """Refresh the L10/L20/L30 tiers for a session and push them to NeoMem.

    Runs as a background task after each exchange. L1/L5 are not computed
    here: nothing stores their result, and /context builds them on demand.
    Failed or empty LLM results are neither recorded in the history maps
    nor pushed. All errors are printed, never raised.
    """
    try:
        hopper = SESSIONS.get(session_id)
        if not hopper:
            return
        buf = list(hopper["buffer"])
        if not buf:
            return

        # Hold the lists locally so a concurrent close_session cannot cause
        # a KeyError between the LLM calls below.
        l10_history = L10_HISTORY.setdefault(session_id, [])
        l20_history = L20_HISTORY.setdefault(session_id, [])

        # L10 (append to history)
        s_L10 = summarize_L10(buf)
        if _is_usable(s_L10):
            l10_history.append(s_L10)

        # L20 (merge all L10s)
        s_L20 = summarize_L20(l10_history)
        if _is_usable(s_L20):
            l20_history.append(s_L20)

        # L30 (merge all L20s)
        s_L30 = summarize_L30(l20_history)

        # Push most important tier(s) to NeoMem
        for summary in (s_L10, s_L20, s_L30):
            push_to_neomem(summary, session_id)

        print(f"🧩 L10/L20/L30 updated for {session_id}")
    except Exception as e:
        print(f"💥 Multilevel summarizer error for {session_id}: {e}")


# ─────────────────────────────
# Routes
# ─────────────────────────────
@app.post("/add_exchange")
def add_exchange(exchange: dict = Body(...), background_tasks: BackgroundTasks = None):
    """Append an exchange to its session buffer and queue summarization.

    A session id is generated when the body has none. The exchange dict is
    stamped with ``session_id`` and ``timestamp`` before being stored.
    """
    session_id = exchange.get("session_id") or f"sess-{uuid4().hex[:8]}"
    exchange["session_id"] = session_id
    exchange["timestamp"] = datetime.now().isoformat()

    if session_id not in SESSIONS:
        SESSIONS[session_id] = {
            "buffer": deque(maxlen=200),
            "created_at": datetime.now(),
        }
        print(f"🆕 Hopper created: {session_id}")

    SESSIONS[session_id]["buffer"].append(exchange)

    if background_tasks is not None:
        background_tasks.add_task(bg_summarize, session_id)
        print(f"⏩ Summarization queued for {session_id}")

    return {"ok": True, "session_id": session_id}


@app.post("/close_session/{session_id}")
def close_session(session_id: str):
    """Drop a session's buffer and its summary history.

    The history maps are cleared too, so closed sessions do not leak
    memory and a reused id does not inherit stale summaries.
    """
    SESSIONS.pop(session_id, None)
    L10_HISTORY.pop(session_id, None)
    L20_HISTORY.pop(session_id, None)
    return {"ok": True, "closed": session_id}
@app.get("/summaries")
def get_summary(session_id: str = Query(...)):
    """Return a factual summary of the session's whole buffer.

    Responds with ``"(none)"`` when the session is unknown or its buffer is
    empty, without calling the LLM.
    """
    hopper = SESSIONS.get(session_id)
    exchanges = list(hopper["buffer"]) if hopper else []
    if not exchanges:
        return {"summary_text": "(none)", "session_id": session_id}

    summary = summarize_simple(exchanges)
    return {"summary_text": summary, "session_id": session_id}


@app.get("/context")
def get_context(session_id: str = Query(...)):
    """Return full multilevel summary context for Cortex.

    Every level is built on demand. A level whose input is empty (no
    exchanges, or no L10/L20 history recorded yet) is returned as ``""``
    instead of asking the LLM to summarize nothing.
    """
    # Single lookup: avoids a KeyError if the session is closed between a
    # membership test and a second access.
    hopper = SESSIONS.get(session_id)
    if hopper is None:
        return {
            "session_id": session_id,
            "exchange_count": 0,
            "L1": "",
            "L5": "",
            "L10": "",
            "L20": "",
            "L30": "",
            "last_updated": None,
        }

    buffer = list(hopper["buffer"])
    l10_blocks = L10_HISTORY.get(session_id, [])
    l20_blocks = L20_HISTORY.get(session_id, [])

    # Build levels on demand
    return {
        "session_id": session_id,
        "exchange_count": len(buffer),
        "L1": summarize_L1(buffer) if buffer else "",
        "L5": summarize_L5(buffer) if buffer else "",
        "L10": summarize_L10(buffer) if buffer else "",
        "L20": summarize_L20(l10_blocks) if l10_blocks else "",
        "L30": summarize_L30(l20_blocks) if l20_blocks else "",
        "last_updated": datetime.now().isoformat(),
    }


@app.get("/health")
def health():
    """Liveness probe reporting the configured summary model and endpoint."""
    return {"ok": True, "model": SUMMARY_MODEL, "url": SUMMARY_URL}