From f83e37f5c775de04d382ea88a6c629e07c8c8ecb Mon Sep 17 00:00:00 2001 From: Storme-bit Date: Wed, 15 Apr 2026 02:36:37 -0700 Subject: [PATCH] wired in project isolation --- .../orchestration-service/src/chat/index.js | 334 +++++++++++------- .../src/services/memory.js | 20 ++ .../src/services/qdrant.js | 10 +- 3 files changed, 237 insertions(+), 127 deletions(-) diff --git a/packages/orchestration-service/src/chat/index.js b/packages/orchestration-service/src/chat/index.js index 82986e9..30add87 100644 --- a/packages/orchestration-service/src/chat/index.js +++ b/packages/orchestration-service/src/chat/index.js @@ -1,164 +1,242 @@ -const memory = require('../services/memory'); -const inference = require('../services/inference'); -const embedding = require('../services/embedding'); -const qdrant = require('../services/qdrant'); -const { ORCHESTRATION } = require('@nexusai/shared') +const memory = require("../services/memory"); +const inference = require("../services/inference"); +const embedding = require("../services/embedding"); +const qdrant = require("../services/qdrant"); +const { ORCHESTRATION } = require("@nexusai/shared"); -const { RECENT_EPISODE_LIMIT, SEMANTIC_LIMIT, SCORE_THRESHOLD, SYSTEM_PROMPT } = ORCHESTRATION; +const { RECENT_EPISODE_LIMIT, SEMANTIC_LIMIT, SCORE_THRESHOLD, SYSTEM_PROMPT } = + ORCHESTRATION; function buildPrompt(recentEpisodes, semanticEpisodes, userMessage) { - const parts = [SYSTEM_PROMPT]; + const parts = [SYSTEM_PROMPT]; - if (semanticEpisodes.length > 0 ) - { - parts.push('Here are some relevant memories from earlier conversations:') - for (const ep of semanticEpisodes) { - parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`); - } - parts.push('---') + if (semanticEpisodes.length > 0) { + parts.push("Here are some relevant memories from earlier conversations:"); + for (const ep of semanticEpisodes) { + parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`); } + parts.push("---"); + } - if (recentEpisodes.length > 0) { - parts.push(`Here are some relevant memories from your past conversations:`); - for (const ep of recentEpisodes) { - parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`); - } - parts.push('--- End of recent memories ---\n'); + if (recentEpisodes.length > 0) { + parts.push(`Here are some relevant memories from your past conversations:`); + for (const ep of recentEpisodes) { + parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`); } + parts.push("--- End of recent memories ---\n"); + } - parts.push(`User: ${userMessage}`); - parts.push('Assistant:'); + parts.push(`User: ${userMessage}`); + parts.push("Assistant:"); - return parts.join('\n'); + return parts.join("\n"); } function buildNamingPrompt(userMessage, aiResponse) { - return [ - 'Your task is to generate a short title for a conversation based on its first exchange.', - 'Rules: maximum 5 words, no punctuation, no quotes, plain text only.', - 'Examples: "Setting up a Raspberry Pi", "Help with Python list comprehension", "Planning a trip to Japan"', - '', - `User: ${userMessage}`, - `Assistant: ${aiResponse}`, - '', - 'Title:', - ].join('\n'); + return [ + "Your task is to generate a short title for a conversation based on its first exchange.", + "Rules: maximum 5 words, no punctuation, no quotes, plain text only.", + 'Examples: "Setting up a Raspberry Pi", "Help with Python list comprehension", "Planning a trip to Japan"', + "", + `User: ${userMessage}`, + `Assistant: ${aiResponse}`, + "", + "Title:", + ].join("\n"); } async function autoNameSession(externalId, userMessage, aiResponse) { - try { - const prompt = buildNamingPrompt(userMessage, aiResponse); - const result = await inference.complete(prompt, { - maxTokens: 20, // title only needs a handful of tokens - temperature: 0.3, // low temperature for consistent, factual naming - }); - const name = result.text?.trim().replace(/^["']|["']$/g, ''); // strip any quotes the model adds - if (name) { - await memory.updateSession(externalId, { name }); - console.log(`[orchestration] Auto-named session "${externalId}": "${name}"`); - } - } catch (err) { - console.warn('[orchestration] Auto-naming failed (non-critical):', err.message); + try { + const prompt = buildNamingPrompt(userMessage, aiResponse); + const result = await inference.complete(prompt, { + maxTokens: 20, // title only needs a handful of tokens + temperature: 0.3, // low temperature for consistent, factual naming + }); + const name = result.text?.trim().replace(/^["']|["']$/g, ""); // strip any quotes the model adds + if (name) { + await memory.updateSession(externalId, { name }); + console.log( + `[orchestration] Auto-named session "${externalId}": "${name}"`, + ); } + } catch (err) { + console.warn( + "[orchestration] Auto-naming failed (non-critical):", + err.message, + ); + } } -async function getSemanticEpisodes(userMessage, sessionId, recentIds) { - try { - const vector = await embedding.embed(userMessage); - const results = await qdrant.searchEpisodes( vector, { - limit: SEMANTIC_LIMIT, - scoreThreshold: SCORE_THRESHOLD, - sessionId, - }); +async function getSemanticEpisodes( + userMessage, + sessionId, + recentIds, + projectSessionIds = null, +) { + try { + const vector = await embedding.embed(userMessage); + const results = await qdrant.searchEpisodes(vector, { + limit: SEMANTIC_LIMIT, + scoreThreshold: SCORE_THRESHOLD, + sessionId: projectSessionIds ? null : sessionId, + projectSessionIds, + }); - const fetched = await Promise.all( - results - .filter(r => !recentIds.has(r.id)) - .map(r => memory.getEpisodeById(r.id)) - ); - return fetched.filter(Boolean); - } catch (err) { - console.warn(`[orchestration] Semantic search failed, continuing without: `, err.message); - return []; - } + const fetched = await Promise.all( + results + .filter((r) => !recentIds.has(r.id)) + .map((r) => memory.getEpisodeById(r.id)), + ); + return fetched.filter(Boolean); + } catch (err) { + console.warn( + `[orchestration] Semantic search failed, continuing without: `, + err.message, + ); + return []; + } } async function chat(externalId, userMessage, options = {}) { - // 1. Resolve or create session - let session = await memory.getSessionByExternalId(externalId); - if (!session) session = await memory.createSession(externalId); - - // 2. Fetch recent episodes for context - const recentEpisodes = await memory.getRecentEpisodes(session.id, RECENT_EPISODE_LIMIT ); - const isFirstMessage = recentEpisodes.length === 0; - const recentIds = new Set(recentEpisodes.map(e => e.id)); - - // 3. Semantic Search - const semanticEpisodes = await getSemanticEpisodes(userMessage, session.id, recentIds); - - // 4. Assemble prompt - const prompt = buildPrompt(recentEpisodes, semanticEpisodes, userMessage); - - // 5. Run inference - const result = await inference.complete(prompt, options); - - // 6. Write episode back to memory - memory.createEpisode( - session.id, - userMessage, - result.text, - (result.evalCount || 0) + (result.promptEvalCount || 0 ) - ).catch(err => console.error(`[orchestration] Failed to save episode`, err.message)); - - // 7. Auto-name on first message - if (isFirstMessage && !session.name) { - autoNameSession(externalId, userMessage, result.text) - .catch(() => {}); // already logged inside autoNameSession - } - - // 8. Return response - return { - sessionId: externalId, - response: result.text, - model: result.model, - tokenCount: (result.evalCount || 0 ) + (result.promptEvalCount || 0 ), - }; -} - -async function chatStream(externalId, userMessage, onChunk, options = {}) { - console.log('[orchestration] chatStream called:', { externalId, userMessage: userMessage.slice(0, 50) }); + // 1. Resolve or create session let session = await memory.getSessionByExternalId(externalId); if (!session) session = await memory.createSession(externalId); - const recentEpisodes = await memory.getRecentEpisodes(session.id, RECENT_EPISODE_LIMIT); + let projectSessionIds = null; + if (session.project_id) { + try { + const project = await memory.getProject(session.project_id); + if (project?.isolated === 1) { + const projectSessions = await memory.getProjectSessions( + session.project_id, + ); + projectSessionIds = projectSessions.map((s) => s.id); + console.log( + `[orchestration] Isolated project ${session.project_id} — restricting search to ${projectSessionIds.length} sessions`, + ); + } + } catch (err) { + console.warn( + "[orchestration] Failed to resolve isolation context:", + err.message, + ); + } + } + + // 2. Fetch recent episodes for context + const recentEpisodes = await memory.getRecentEpisodes( + session.id, + RECENT_EPISODE_LIMIT, + ); const isFirstMessage = recentEpisodes.length === 0; - const recentIds = new Set(recentEpisodes.map(e => e.id)); - const semanticEpisodes = await getSemanticEpisodes(userMessage, session.id, recentIds); + const recentIds = new Set(recentEpisodes.map((e) => e.id)); + + // 3. Semantic Search + const semanticEpisodes = await getSemanticEpisodes( + userMessage, + session.id, + recentIds, + projectSessionIds + ); + + // 4. Assemble prompt + const prompt = buildPrompt(recentEpisodes, semanticEpisodes, userMessage); + + // 5. Run inference + const result = await inference.complete(prompt, options); + + // 6. Write episode back to memory + memory + .createEpisode( + session.id, + userMessage, + result.text, + (result.evalCount || 0) + (result.promptEvalCount || 0), + ) + .catch((err) => + console.error(`[orchestration] Failed to save episode`, err.message), + ); + + // 7. Auto-name on first message + if (isFirstMessage && !session.name) { + autoNameSession(externalId, userMessage, result.text).catch(() => {}); // already logged inside autoNameSession + } + + // 8. Return response + return { + sessionId: externalId, + response: result.text, + model: result.model, + tokenCount: (result.evalCount || 0) + (result.promptEvalCount || 0), + }; +} + +async function chatStream(externalId, userMessage, onChunk, options = {}) { + console.log("[orchestration] chatStream called:", { + externalId, + userMessage: userMessage.slice(0, 50), + }); + let session = await memory.getSessionByExternalId(externalId); + if (!session) session = await memory.createSession(externalId); + + let projectSessionIds = null; + if (session.project_id) { + try { + const project = await memory.getProject(session.project_id); + if (project?.isolated === 1) { + const projectSessions = await memory.getProjectSessions( + session.project_id, + ); + projectSessionIds = projectSessions.map((s) => s.id); + console.log( + `[orchestration] Isolated project ${session.project_id} — restricting search to ${projectSessionIds.length} sessions`, + ); + } + } catch (err) { + console.warn( + "[orchestration] Failed to resolve isolation context:", + err.message, + ); + } + } + + const recentEpisodes = await memory.getRecentEpisodes( + session.id, + RECENT_EPISODE_LIMIT, + ); + const isFirstMessage = recentEpisodes.length === 0; + const recentIds = new Set(recentEpisodes.map((e) => e.id)); + const semanticEpisodes = await getSemanticEpisodes( + userMessage, + session.id, + recentIds, + projectSessionIds + ); const prompt = buildPrompt(recentEpisodes, semanticEpisodes, userMessage); const res = await inference.completeStream(prompt, options); - let fullText = ''; - let model = ''; + let fullText = ""; + let model = ""; let tokenCount = 0; - let buffer = ''; + let buffer = ""; for await (const chunk of res.body) { - buffer += Buffer.from(chunk).toString('utf8'); + buffer += Buffer.from(chunk).toString("utf8"); - const events = buffer.split('\n\n'); - buffer = events.pop() || ''; + const events = buffer.split("\n\n"); + buffer = events.pop() || ""; for (const event of events) { - const lines = event.split('\n'); + const lines = event.split("\n"); const dataLines = lines - .filter(line => line.startsWith('data: ')) - .map(line => line.slice(6)); + .filter((line) => line.startsWith("data: ")) + .map((line) => line.slice(6)); if (dataLines.length === 0) continue; - const raw = dataLines.join('\n').trim(); - if (raw === '[DONE]') continue; + const raw = dataLines.join("\n").trim(); + if (raw === "[DONE]") continue; try { const data = JSON.parse(raw); @@ -177,17 +255,23 @@ async function chatStream(externalId, userMessage, onChunk, options = {}) { throw new Error(data.error); } } catch (err) { - console.error('[orchestration] Failed to parse inference SSE event:', raw, err.message); + console.error( + "[orchestration] Failed to parse inference SSE event:", + raw, + err.message, + ); } } } - console.log('[orchestration] final streamed text length:', fullText.length); + console.log("[orchestration] final streamed text length:", fullText.length); if (fullText.trim()) { await memory.createEpisode(session.id, userMessage, fullText, tokenCount); } else { - console.warn('[orchestration] Stream finished with no assistant text; episode not saved'); + console.warn( + "[orchestration] Stream finished with no assistant text; episode not saved", + ); } if (isFirstMessage && !session.name) { @@ -196,4 +280,4 @@ async function chatStream(externalId, userMessage, onChunk, options = {}) { return { model, tokenCount }; } -module.exports = { chat, chatStream }; \ No newline at end of file +module.exports = { chat, chatStream }; diff --git a/packages/orchestration-service/src/services/memory.js b/packages/orchestration-service/src/services/memory.js index 1174d12..3f2b1a4 100644 --- a/packages/orchestration-service/src/services/memory.js +++ b/packages/orchestration-service/src/services/memory.js @@ -115,6 +115,24 @@ async function deleteProject(id) { if (!res.ok) throw new Error(`Failed to delete project: ${res.status}`); } +async function getProjectSessions(projectId) { + const url = new URL(`${BASE_URL}/sessions`); + url.searchParams.set('limit', 200); // generous upper bound + url.searchParams.set('offset', 0); + url.searchParams.set('projectId', projectId); + + const res = await fetch(url.toString()); + if (!res.ok) throw new Error(`Failed to fetch project sessions: ${res.status}`); + return res.json(); // returns array of session objects +} + +async function getProject(id) { + const res = await fetch(`${BASE_URL}/projects/${id}`); + if (res.status === 404) return null; + if (!res.ok) throw new Error(`Failed to fetch project: ${res.status}`); + return res.json(); +} + module.exports = { getSessionByExternalId, createSession, @@ -129,4 +147,6 @@ module.exports = { getProjects, updateProject, deleteProject, + getProjectSessions, + getProject, } \ No newline at end of file diff --git a/packages/orchestration-service/src/services/qdrant.js b/packages/orchestration-service/src/services/qdrant.js index 53b542c..e9e5838 100644 --- a/packages/orchestration-service/src/services/qdrant.js +++ b/packages/orchestration-service/src/services/qdrant.js @@ -2,10 +2,16 @@ const {getEnv, QDRANT, COLLECTIONS, ORCHESTRATION } = require('@nexusai/shared') const BASE_URL = getEnv('QDRANT_URL', QDRANT.DEFAULT_URL); -async function searchEpisodes( vector, {limit = ORCHESTRATION.RECENT_EPISODE_LIMIT, scoreThreshold = ORCHESTRATION.SCORE_THRESHOLD, sessionId } = {}) { +async function searchEpisodes( vector, {limit = ORCHESTRATION.RECENT_EPISODE_LIMIT, scoreThreshold = ORCHESTRATION.SCORE_THRESHOLD, sessionId, projectSessionIds } = {}) { const body = {vector, limit, score_threshold: scoreThreshold, with_payload: true}; - if (sessionId) { + if(projectSessionIds) { + body.filter = { + should: projectSessionIds.map(id => ({ + key: 'sessionId', match: { value: id } + })) + }; + } else if (sessionId) { body.filter = { must: [{key: 'sessionId', match: {value: sessionId} }] }; }