diff --git a/packages/orchestration-service/src/chat/index.js b/packages/orchestration-service/src/chat/index.js
index 6b1d8ea..0e50fd7 100644
--- a/packages/orchestration-service/src/chat/index.js
+++ b/packages/orchestration-service/src/chat/index.js
@@ -1,15 +1,28 @@
 const memory = require('../services/memory');
 const inference = require('../services/inference');
+const embedding = require('../services/embedding');
+const qdrant = require('../services/qdrant');
 
 const SYSTEM_PROMPT = `You are a helpful, context-aware AI assistant.
 You have access to memories of past conversations with the user.
 Use them to provide consistent, personalised responses.`;
 
 const RECENT_EPISODE_LIMIT = 10; // Number of recent episodes to retrieve for context
+const SEMANTIC_LIMIT = 5;
+const SCORE_THRESHOLD = 0.75;
 
-function buildPrompt(recentEpisodes, userMessage) {
+function buildPrompt(recentEpisodes, semanticEpisodes, userMessage) {
   const parts = [SYSTEM_PROMPT];
 
+  if (semanticEpisodes.length > 0 )
+  {
+    parts.push('Here are some relevant memories from earlier conversations:')
+    for (const ep of semanticEpisodes) {
+      parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`);
+    }
+    parts.push('---')
+  }
+
   if (recentEpisodes.length > 0) {
     parts.push(`Here are some relevant memories from your past conversations:`);
     for (const ep of recentEpisodes) {
@@ -24,26 +37,60 @@ function buildPrompt(recentEpisodes, userMessage) {
   return parts.join('\n');
 }
 
+/**
+ * Looks up past episodes semantically similar to the user's message.
+ *
+ * Embeds the message, searches Qdrant (bounded by SEMANTIC_LIMIT and
+ * SCORE_THRESHOLD, filtered by session), skips hits whose id is already in
+ * `recentIds`, then loads each remaining episode from the memory service.
+ * Best-effort: any failure is logged as a warning and yields an empty list.
+ * NOTE(review): assumes Qdrant point ids equal memory episode ids - confirm.
+ *
+ * @param {string} userMessage - Text to embed and search with.
+ * @param {*} sessionId - Session id forwarded to the Qdrant search filter.
+ * @param {Set} recentIds - Ids of episodes already used as recent context.
+ * @returns {Promise<Array>} Loaded episodes; null lookups are dropped.
+ */
+async function getSemanticEpisodes(userMessage, sessionId, recentIds) {
+  try {
+    const vector = await embedding.embed(userMessage);
+    const results = await qdrant.searchEpisodes( vector, {
+      limit: SEMANTIC_LIMIT,
+      scoreThreshold: SCORE_THRESHOLD,
+      sessionId,
+    });
+
+    const fetched = await Promise.all(
+      results
+        .filter(r => !recentIds.has(r.id))
+        .map(r => memory.getEpisodeById(r.id))
+    );
+    return fetched.filter(Boolean);
+  } catch (err) {
+    console.warn(`[orchestration] Semantic search failed, continuing without: `, err.message);
+    return [];
+  }
+}
+
 async function chat(externalId, userMessage, options = {}) {
   // 1. Resolve or create session
   let session = await memory.getSessionByExternalId(externalId);
-  if (!session) {
-    session = await memory.createSession(externalId);
-  }
+  if (!session) session = await memory.createSession(externalId);
 
   // 2. Fetch recent episodes for context
-  const recentEpisodes = await memory.getRecentEpisodes(
-    session.id,
-    RECENT_EPISODE_LIMIT
-  );
+  const recentEpisodes = await memory.getRecentEpisodes(session.id, RECENT_EPISODE_LIMIT );
+  const recentIds = new Set(recentEpisodes.map(e => e.id));
 
-  // 3. Assemble prompt
-  const prompt = buildPrompt(recentEpisodes, userMessage);
+  // 3. Semantic Search
+  const semanticEpisodes = await getSemanticEpisodes(userMessage, session.id, recentIds);
 
-  // 4. Run inference
+  // 4. Assemble prompt
+  const prompt = buildPrompt(recentEpisodes, semanticEpisodes, userMessage);
+
+  // 5. Run inference
   const result = await inference.complete(prompt, options);
 
-  // 5. Write episode back to memory
+  // 6. Write episode back to memory
   memory.createEpisode(
     session.id,
     userMessage,
@@ -51,7 +98,7 @@ async function chat(externalId, userMessage, options = {}) {
     (result.evalCount || 0) + (result.promptEvalCount || 0 )
   ).catch(err => console.error(`[orchestration] Failed to save episode`, err.message));
 
-  // 6. Return response
+  // 7. Return response
   return {
     sessionId: externalId,
     response: result.text,
diff --git a/packages/orchestration-service/src/services/embedding.js b/packages/orchestration-service/src/services/embedding.js
new file mode 100644
index 0000000..7f6bb58
--- /dev/null
+++ b/packages/orchestration-service/src/services/embedding.js
@@ -0,0 +1,25 @@
+const {getEnv, SERVICES } = require('@nexusai/shared')
+
+const BASE_URL = getEnv('EMBEDDING_SERVICE_URL', SERVICES.EMBEDDING_URL);
+
+/**
+ * Requests an embedding for `text` from the embedding service.
+ *
+ * @param {string} text - Text sent as `{ text }` in the POST body.
+ * @returns {Promise<*>} The `embedding` field of the JSON response.
+ * @throws {Error} When the service responds with a non-2xx status.
+ */
+async function embed(text) {
+  const res = await fetch(`${BASE_URL}/embed`, {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json'},
+    body: JSON.stringify({text}),
+  })
+
+  if (!res.ok) throw new Error(`Embedding service error: ${res.status}`);
+
+  const data = await res.json();
+  return data.embedding;
+}
+
+module.exports = { embed };
\ No newline at end of file
diff --git a/packages/orchestration-service/src/services/memory.js b/packages/orchestration-service/src/services/memory.js
index 9be7d4d..baa4e71 100644
--- a/packages/orchestration-service/src/services/memory.js
+++ b/packages/orchestration-service/src/services/memory.js
@@ -40,9 +40,24 @@ async function createEpisode(sessionId, userMessage, aiResponse, tokenCount) {
   return res.json();
 }
 
+/**
+ * Fetches a single episode from the memory service by id.
+ *
+ * @param {*} episodeId - Id interpolated into the `/episodes/<id>` path.
+ * @returns {Promise<Object|null>} Parsed JSON episode, or null on HTTP 404.
+ * @throws {Error} On any other non-2xx response.
+ */
+async function getEpisodeById(episodeId) {
+  const res = await fetch(`${BASE_URL}/episodes/${episodeId}`);
+  if (res.status === 404) return null;
+  if (!res.ok) throw new Error(`Failed to fetch episode: ${res.status}`);
+  return res.json();
+}
+
 module.exports = {
   getSessionByExternalId,
   createSession,
   getRecentEpisodes,
-  createEpisode
+  createEpisode,
+  getEpisodeById
 }
\ No newline at end of file
diff --git a/packages/orchestration-service/src/services/qdrant.js b/packages/orchestration-service/src/services/qdrant.js
new file mode 100644
index 0000000..51b0c59
--- /dev/null
+++ b/packages/orchestration-service/src/services/qdrant.js
@@ -0,0 +1,39 @@
+const {getEnv, QDRANT, COLLECTIONS } = require('@nexusai/shared')
+
+// NOTE(review): env var name is mixed-case ('QDrant_URL'); confirm it matches the deployment config.
+const BASE_URL = getEnv('QDrant_URL', QDRANT.DEFAULT_URL);
+
+/**
+ * Runs a vector similarity search against the episodes collection in Qdrant.
+ *
+ * @param {number[]} vector - Query vector sent as `vector`.
+ * @param {Object} [options]
+ * @param {number} [options.limit=5] - Sent as `limit`.
+ * @param {number} [options.scoreThreshold=0.75] - Sent as `score_threshold`.
+ * @param {*} [options.sessionId] - When truthy, adds a `must` filter on payload key `sessionId`.
+ * @returns {Promise<Array>} The `result` field of Qdrant's JSON response.
+ * @throws {Error} When Qdrant responds with a non-2xx status.
+ */
+async function searchEpisodes( vector, {limit = 5, scoreThreshold = 0.75, sessionId } = {}) {
+  const body = {vector, limit, score_threshold: scoreThreshold, with_payload: true};
+
+  if (sessionId) {
+    body.filter = { must: [{key: 'sessionId', match: {value: sessionId} }] };
+  }
+
+  const res = await fetch (
+    `${BASE_URL}/collections/${COLLECTIONS.EPISODES}/points/search`,
+    {
+      method: 'POST',
+      headers: {'Content-Type': 'application/json'},
+      body: JSON.stringify(body)
+    }
+  );
+
+  if (!res.ok) throw new Error(`QDrant error: ${res.status}`);
+
+  const data = await res.json();
+  return data.result;
+}
+
+module.exports = { searchEpisodes };
\ No newline at end of file