from fastapi import FastAPI, Body, Query, BackgroundTasks
from collections import deque
from datetime import datetime
from uuid import uuid4
import requests
import os
import sys

from dotenv import load_dotenv

# ───────────────────────────────────────────────
# 🔧 Load environment variables
# ───────────────────────────────────────────────
load_dotenv()

SUMMARY_MODEL = os.getenv("SUMMARY_MODEL_NAME", "mistral-7b-instruct-v0.2.Q4_K_M.gguf")
SUMMARY_URL = os.getenv("SUMMARY_API_URL", "http://localhost:8080/v1/completions")
SUMMARY_MAX_TOKENS = int(os.getenv("SUMMARY_MAX_TOKENS", "200"))
SUMMARY_TEMPERATURE = float(os.getenv("SUMMARY_TEMPERATURE", "0.3"))
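# A minimal example .env for local development — a sketch, not required config.
# The SUMMARY_* values simply restate the defaults above; NEOMEM_API and
# NEOMEM_KEY have no defaults, so the values shown here are assumptions:
#
#   SUMMARY_MODEL_NAME=mistral-7b-instruct-v0.2.Q4_K_M.gguf
#   SUMMARY_API_URL=http://localhost:8080/v1/completions
#   SUMMARY_MAX_TOKENS=200
#   SUMMARY_TEMPERATURE=0.3
#   NEOMEM_API=http://localhost:7077
#   NEOMEM_KEY=changeme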
# ───────────────────────────────────────────────
# 🧠 NeoMem connection (session-aware)
# ───────────────────────────────────────────────
NEOMEM_API = os.getenv("NEOMEM_API")
NEOMEM_KEY = os.getenv("NEOMEM_KEY")


def push_summary_to_neomem(summary_text: str, level: str, session_id: str):
    """Send summarized text to NeoMem, tagged by session_id."""
    if not NEOMEM_API:
        print("⚠️ NEOMEM_API not set, skipping NeoMem push")
        return

    payload = {
        "messages": [
            {"role": "assistant", "content": summary_text}
        ],
        "user_id": "brian",
        # optional: uncomment if you want sessions tracked in NeoMem natively
        # "run_id": session_id,
        "metadata": {
            "source": "intake",
            "type": "summary",
            "level": level,
            "session_id": session_id,
            "cortex": {}
        }
    }
    headers = {"Content-Type": "application/json"}
    if NEOMEM_KEY:
        headers["Authorization"] = f"Bearer {NEOMEM_KEY}"
    try:
        r = requests.post(f"{NEOMEM_API}/memories", json=payload, headers=headers, timeout=25)
        r.raise_for_status()
        print(f"🧠 NeoMem updated ({level}, {session_id}, {len(summary_text)} chars)")
    except Exception as e:
        print(f"❌ NeoMem push failed ({level}, {session_id}): {e}")


# ───────────────────────────────────────────────
# ⚙️ FastAPI + buffer setup
# ───────────────────────────────────────────────
app = FastAPI()

# Multiple rolling buffers keyed by session_id
SESSIONS = {}

# Summary trigger points
# → low-tier: quick factual recaps
# → mid-tier: “Reality Check” reflections
# → high-tier: rolling continuity synthesis
LEVELS = [1, 2, 5, 10, 20, 30]


@app.on_event("startup")
def show_boot_banner():
    print("🧩 Intake booting...")
    print(f"   Model: {SUMMARY_MODEL}")
    print(f"   API:   {SUMMARY_URL}")
    print(f"   Max tokens: {SUMMARY_MAX_TOKENS}, Temp: {SUMMARY_TEMPERATURE}")
    sys.stdout.flush()


# ───────────────────────────────────────────────
# 🧠 Hierarchical Summarizer (L10→L20→L30 cascade)
# ───────────────────────────────────────────────
SUMMARIES_CACHE = {"L10": [], "L20": [], "L30": []}  # NOTE: shared across sessions, not per-session


def summarize(exchanges, level):
    """Hierarchical summarizer: builds local and meta summaries."""
    # Join exchanges into readable text
    text = "\n".join(
        f"User: {e['turns'][0]['content']}\nAssistant: {e['turns'][1]['content']}"
        for e in exchanges
    )

    def query_llm(prompt: str):
        try:
            resp = requests.post(
                SUMMARY_URL,
                json={
                    "model": SUMMARY_MODEL,
                    "prompt": prompt,
                    "max_tokens": SUMMARY_MAX_TOKENS,
                    "temperature": SUMMARY_TEMPERATURE,
                },
                timeout=180,
            )
            resp.raise_for_status()
            data = resp.json()
            return data.get("choices", [{}])[0].get("text", "").strip()
        except Exception as e:
            return f"[Error summarizing: {e}]"

    # ───── L10: local “Reality Check” block ─────
    if level == 10:
        prompt = f"""
You are Lyra Intake performing a 'Reality Check' for the last {len(exchanges)} exchanges.
Summarize this block as one coherent paragraph describing the user’s focus,
progress, and tone. Avoid bullet points.

Exchanges:
{text}

Reality Check Summary:
"""
        summary = query_llm(prompt)
        SUMMARIES_CACHE["L10"].append(summary)

    # ───── L20: merge L10s ─────
    elif level == 20:
        # 1️⃣ create fresh L10 for 11–20
        l10_prompt = f"""
You are Lyra Intake generating a second Reality Check for the most recent {len(exchanges)} exchanges.
Summarize them as one paragraph describing what's new or changed since the last block.
Avoid bullet points.

Exchanges:
{text}

Reality Check Summary:
"""
        new_l10 = query_llm(l10_prompt)
        SUMMARIES_CACHE["L10"].append(new_l10)

        # 2️⃣ merge all L10s into a Session Overview
        joined_l10s = "\n\n".join(SUMMARIES_CACHE["L10"])
        l20_prompt = f"""
You are Lyra Intake merging multiple 'Reality Checks' into a single Session Overview.
Summarize the following Reality Checks into one short paragraph capturing
the ongoing goals, patterns, and overall progress.

Reality Checks:
{joined_l10s}

Session Overview:
"""
        l20_summary = query_llm(l20_prompt)
        SUMMARIES_CACHE["L20"].append(l20_summary)
        summary = new_l10 + "\n\n" + l20_summary

    # ───── L30: continuity synthesis ─────
    elif level == 30:
        # 1️⃣ create new L10 for 21–30
        new_l10 = query_llm(f"""
You are Lyra Intake creating a new Reality Check for exchanges 21–30.
Summarize this block in one cohesive paragraph, describing any shifts in focus or tone.

Exchanges:
{text}

Reality Check Summary:
""")
        SUMMARIES_CACHE["L10"].append(new_l10)

        # 2️⃣ merge all lower levels for continuity
        joined = "\n\n".join(SUMMARIES_CACHE["L10"] + SUMMARIES_CACHE["L20"])
        continuity_prompt = f"""
You are Lyra Intake performing a 'Continuity Report' — a high-level reflection
combining all Reality Checks and Session Overviews so far.
Describe how the conversation has evolved, the key insights, and remaining threads.

Reality Checks and Overviews:
{joined}

Continuity Report:
"""
        l30_summary = query_llm(continuity_prompt)
        SUMMARIES_CACHE["L30"].append(l30_summary)
        summary = new_l10 + "\n\n" + l30_summary

    # ───── L1–L5 (standard factual summaries) ─────
    else:
        prompt = f"""
You are Lyra Intake, a background summarization module for an AI assistant.
Your job is to compress recent chat exchanges between a user and an assistant
into a short, factual summary. The user's name is Brian, and the assistant's
name is Lyra. Focus only on the real conversation content. Do NOT invent
names, people, or examples. Summarize clearly what topics were discussed and
what conclusions were reached, avoiding speculation, storytelling, and
bullet points.

Exchanges:
{text}

Summary:
"""
        summary = query_llm(prompt)

    return f"[L{level} Summary of {len(exchanges)} exchanges]: {summary}"
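# summarize() assumes each exchange carries a two-item "turns" list
# (user turn first, assistant turn second). An illustrative exchange dict —
# the field values below are made up, and keys other than "content" are
# ignored by the joiner above:
#
#   {
#       "session_id": "sess-ab12cd34",
#       "turns": [
#           {"role": "user", "content": "How do the summary tiers work?"},
#           {"role": "assistant", "content": "They cascade from L1 up to L30."}
#       ]
#   }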
# ───────────────────────────────────────────────
# 🧾 Persistent summary log
# ───────────────────────────────────────────────
LOG_DIR = "/app/logs"
os.makedirs(LOG_DIR, exist_ok=True)


def log_to_file(level: str, summary: str):
    """Append each summary to a persistent .txt log file."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    filename = os.path.join(LOG_DIR, "summaries.log")
    with open(filename, "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] {level}\n{summary}\n{'='*60}\n\n")


# ───────────────────────────────────────────────
# 🔁 Background summarization helper
# ───────────────────────────────────────────────
def run_summarization_task(exchange, session_id):
    """Async-friendly wrapper for slow summarization work.

    `exchange` is accepted to match the BackgroundTasks call site; the
    summaries themselves are built from the session's full buffer.
    """
    try:
        hopper = SESSIONS.get(session_id)
        if not hopper:
            print(f"⚠️ No hopper found for {session_id}")
            return

        buffer = hopper["buffer"]
        count = len(buffer)
        summaries = {}

        if count < 30:
            # Only fire at the defined trigger points, as in the original
            # tiered logic below; without this gate the lower tiers would be
            # re-summarized on every single exchange.
            if count in LEVELS:
                for lvl in LEVELS:
                    if lvl <= count:
                        s_text = summarize(list(buffer)[-lvl:], lvl)
                        log_to_file(f"L{lvl}", s_text)
                        push_summary_to_neomem(s_text, f"L{lvl}", session_id)
                        summaries[f"L{lvl}"] = s_text
        else:
            # optional: include your existing 30+ logic here
            pass

        if summaries:
            print(f"🧩 [BG] Summaries generated asynchronously at count={count}: {list(summaries.keys())}")
    except Exception as e:
        print(f"💥 [BG] Async summarization failed: {e}")


# ───────────────────────────────────────────────
# 📨 Routes
# ───────────────────────────────────────────────
@app.post("/add_exchange")
def add_exchange(exchange: dict = Body(...), background_tasks: BackgroundTasks = None):
    session_id = exchange.get("session_id") or f"sess-{uuid4().hex[:8]}"
    exchange["session_id"] = session_id

    if session_id not in SESSIONS:
        SESSIONS[session_id] = {"buffer": deque(maxlen=100), "last_update": datetime.now()}
        print(f"🆕 Hopper created: {session_id}")

    hopper = SESSIONS[session_id]
    hopper["buffer"].append(exchange)
    hopper["last_update"] = datetime.now()
    count = len(hopper["buffer"])

    # 🚀 queue background summarization
    if background_tasks:
        background_tasks.add_task(run_summarization_task, exchange, session_id)
        print(f"⏩ Queued async summarization for {session_id}")

    return {"ok": True, "exchange_count": count, "queued": True}


# # ── Normal tiered behavior up to 30 ── commented out for async addon
# if count < 30:
#     if count in LEVELS:
#         for lvl in LEVELS:
#             if lvl <= count:
#                 summaries[f"L{lvl}"] = summarize(list(buffer)[-lvl:], lvl)
#                 log_to_file(f"L{lvl}", summaries[f"L{lvl}"])
#                 push_summary_to_neomem(summaries[f"L{lvl}"], f"L{lvl}", session_id)
#
#     # 🚀 Launch summarization in the background (non-blocking)
#     if background_tasks:
#         background_tasks.add_task(run_summarization_task, exchange, session_id)
#         print(f"⏩ Queued async summarization for {session_id}")
#
# # ── Beyond 30: keep summarizing every +15 exchanges ──
# else:
#     # Find next milestone after 30 (45, 60, 75, ...)
#     milestone = 30 + ((count - 30) // 15) * 15
#     if count == milestone:
#         summaries[f"L{milestone}"] = summarize(list(buffer)[-15:], milestone)
#         log_to_file(f"L{milestone}", summaries[f"L{milestone}"])
#         push_summary_to_neomem(summaries[f"L{milestone}"], f"L{milestone}", session_id)
#
#         # Optional: merge all continuity summaries so far into a running meta-summary
#         joined = "\n\n".join(
#             [s for key, s in summaries.items() if key.startswith("L")]
#         )
#         meta_prompt = f"""
# You are Lyra Intake composing an 'Ongoing Continuity Report' that merges
# all prior continuity summaries into one living narrative.
# Focus on major themes, changes, and lessons so far.
#
# Continuity Summaries:
# {joined}
#
# Ongoing Continuity Report:
# """
#         meta_summary = f"[L∞ Ongoing Continuity Report]: {query_llm(meta_prompt)}"
#         summaries["L∞"] = meta_summary
#         log_to_file("L∞", meta_summary)
#         push_summary_to_neomem(meta_summary, "L∞", session_id)
#         print(f"🌀 L{milestone} continuity summary created (messages {count-14}-{count})")
#
# # ── Log summaries ──
# if summaries:
#     print(f"🧩 Summaries generated at count={count}: {list(summaries.keys())}")
#
# return {
#     "ok": True,
#     "exchange_count": len(buffer),
#     "queued": True
# }
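# A quick smoke test for the /add_exchange route — a sketch; the host/port
# and all message text below are assumptions (adjust to wherever this app
# is actually served):
#
#   curl -X POST http://localhost:8000/add_exchange \
#     -H "Content-Type: application/json" \
#     -d '{"session_id": "sess-demo1234",
#          "turns": [{"role": "user", "content": "hello"},
#                    {"role": "assistant", "content": "hi Brian"}]}'
#
# Expected response shape: {"ok": true, "exchange_count": 1, "queued": true}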
# ───────────────────────────────────────────────
# Clear rubbish from hopper.
# ───────────────────────────────────────────────
def close_session(session_id: str):
    """Run a final summary for the given hopper, post it to NeoMem, then delete it."""
    hopper = SESSIONS.get(session_id)
    if not hopper:
        print(f"⚠️ No active hopper for {session_id}")
        return

    buffer = hopper["buffer"]
    if not buffer:
        print(f"⚠️ Hopper {session_id} is empty, skipping closure")
        del SESSIONS[session_id]
        return

    try:
        print(f"🔒 Closing hopper {session_id} ({len(buffer)} exchanges)")

        # Summarize everything left in the buffer
        final_summary = summarize(list(buffer), 30)  # level 30 = continuity synthesis
        log_to_file("LFinal", final_summary)
        push_summary_to_neomem(final_summary, "LFinal", session_id)

        # Optionally: mark this as a special 'closure' memory
        closure_note = f"[Session {session_id} closed with {len(buffer)} exchanges]"
        push_summary_to_neomem(closure_note, "LFinalNote", session_id)

        print(f"🧹 Hopper {session_id} closed and deleted")
    except Exception as e:
        print(f"💥 Error closing hopper {session_id}: {e}")
    finally:
        del SESSIONS[session_id]


@app.post("/close_session/{session_id}")
def close_session_endpoint(session_id: str):
    close_session(session_id)
    return {"ok": True, "closed": session_id}


# ───────────────────────────────────────────────
# 🧾 Provide recent summary for Cortex /reason calls
# ───────────────────────────────────────────────
@app.get("/summaries")
def get_summary(session_id: str = Query(..., description="Active session ID")):
    """
    Return the most recent summary (L10→L30→LFinal) for a given session.
    If none exist yet, return a placeholder summary.
    """
    try:
        # Find the most recent entries in summaries.log
        log_path = os.path.join(LOG_DIR, "summaries.log")
        if not os.path.exists(log_path):
            return {
                "summary_text": "(none)",
                "last_message_ts": datetime.now().isoformat(),
                "session_id": session_id,
                "exchange_count": 0,
            }

        with open(log_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

        # Grab lines that mention this session_id or start a summary section
        recent_lines = [ln for ln in lines if session_id in ln or ln.startswith("[L")]
        if recent_lines:
            # Keep the last few matching lines as the snippet
            snippet = "".join(recent_lines[-8:]).strip()
        else:
            snippet = "(no summaries yet)"

        return {
            "summary_text": snippet[-1000:],  # truncate to avoid a huge block
            "last_message_ts": datetime.now().isoformat(),
            "session_id": session_id,
            "exchange_count": len(SESSIONS.get(session_id, {}).get("buffer", [])),
        }
    except Exception as e:
        print(f"⚠️ /summaries failed for {session_id}: {e}")
        return {
            "summary_text": f"(error fetching summaries: {e})",
            "last_message_ts": datetime.now().isoformat(),
            "session_id": session_id,
            "exchange_count": 0,
        }


# ───────────────────────────────────────────────
# ✅ Health check
# ───────────────────────────────────────────────
@app.get("/health")
def health():
    return {"ok": True, "model": SUMMARY_MODEL, "url": SUMMARY_URL}
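
# Local entrypoint — a minimal sketch for running outside a container.
# The host/port are assumptions (a Docker setup may invoke uvicorn itself),
# and uvicorn must be installed alongside fastapi.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)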