memory settings implementation
This commit is contained in:
@@ -3,20 +3,21 @@ const inference = require("../services/inference");
|
||||
const embedding = require("../services/embedding");
|
||||
const qdrant = require("../services/qdrant");
|
||||
const { ORCHESTRATION } = require("@nexusai/shared");
|
||||
|
||||
const { RECENT_EPISODE_LIMIT, SEMANTIC_LIMIT, SCORE_THRESHOLD, SYSTEM_PROMPT } =
|
||||
ORCHESTRATION;
|
||||
const appSettings = require("../config/settings");
|
||||
const { SYSTEM_PROMPT } = ORCHESTRATION;
|
||||
|
||||
function buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage) {
|
||||
const parts = [SYSTEM_PROMPT];
|
||||
|
||||
if (entities.length > 0) {
|
||||
parts.push('Here is what you know about entities relevant to this conversation:');
|
||||
for (const e of entities) {
|
||||
parts.push(`- ${e.name} (${e.type}): ${e.notes}`);
|
||||
}
|
||||
parts.push('---');
|
||||
parts.push(
|
||||
"Here is what you know about entities relevant to this conversation:",
|
||||
);
|
||||
for (const e of entities) {
|
||||
parts.push(`- ${e.name} (${e.type}): ${e.notes}`);
|
||||
}
|
||||
parts.push("---");
|
||||
}
|
||||
|
||||
if (semanticEpisodes.length > 0) {
|
||||
parts.push("Here are some relevant memories from earlier conversations:");
|
||||
@@ -80,12 +81,13 @@ async function getSemanticEpisodes(
|
||||
sessionId,
|
||||
recentIds,
|
||||
projectSessionIds = null,
|
||||
{ semanticLimit, scoreThreshold } = {},
|
||||
) {
|
||||
try {
|
||||
const vector = await embedding.embed(userMessage);
|
||||
const results = await qdrant.searchEpisodes(vector, {
|
||||
limit: SEMANTIC_LIMIT,
|
||||
scoreThreshold: SCORE_THRESHOLD,
|
||||
limit: semanticLimit,
|
||||
scoreThreshold: scoreThreshold,
|
||||
sessionId: projectSessionIds ? null : sessionId,
|
||||
projectSessionIds,
|
||||
});
|
||||
@@ -106,45 +108,60 @@ async function getSemanticEpisodes(
|
||||
}
|
||||
|
||||
async function getRelevantEntities(userMessage) {
|
||||
try {
|
||||
const vector = await embedding.embed(userMessage);
|
||||
const results = await qdrant.searchEntities(vector);
|
||||
console.log('[orchestration] Entity search results:',
|
||||
results.map(r => ({ name: r.payload?.name, score: r.score }))
|
||||
);
|
||||
return results.map(r => r.payload).filter(Boolean);
|
||||
} catch (err) {
|
||||
console.warn('[orchestration] Entity search failed, continuing without:', err.message);
|
||||
return [];
|
||||
}
|
||||
try {
|
||||
const vector = await embedding.embed(userMessage);
|
||||
const results = await qdrant.searchEntities(vector);
|
||||
console.log(
|
||||
"[orchestration] Entity search results:",
|
||||
results.map((r) => ({ name: r.payload?.name, score: r.score })),
|
||||
);
|
||||
return results.map((r) => r.payload).filter(Boolean);
|
||||
} catch (err) {
|
||||
console.warn(
|
||||
"[orchestration] Entity search failed, continuing without:",
|
||||
err.message,
|
||||
);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async function chat(externalId, userMessage, options = {}) {
|
||||
const { recentEpisodeLimit, semanticLimit, scoreThreshold } =
|
||||
appSettings.load();
|
||||
// 1. Resolve or create session
|
||||
let session = await memory.getSessionByExternalId(externalId);
|
||||
if (!session) session = await memory.createSession(externalId);
|
||||
|
||||
let projectSessionIds = null;
|
||||
if (session.project_id) {
|
||||
try {
|
||||
const project = await memory.getProject(session.project_id);
|
||||
if (project) {
|
||||
const projectSessions = await memory.getProjectSessions(session.project_id);
|
||||
projectSessionIds = projectSessions.map(s => s.id);
|
||||
if (project.isolated === 1) {
|
||||
console.log(`[orchestration] Isolated project — restricting to ${projectSessionIds.length} sessions`);
|
||||
} else {
|
||||
console.log(`[orchestration] Non-isolated project — expanding search to ${projectSessionIds.length} sessions`);
|
||||
let projectSessionIds = null;
|
||||
if (session.project_id) {
|
||||
try {
|
||||
const project = await memory.getProject(session.project_id);
|
||||
if (project) {
|
||||
const projectSessions = await memory.getProjectSessions(
|
||||
session.project_id,
|
||||
);
|
||||
projectSessionIds = projectSessions.map((s) => s.id);
|
||||
if (project.isolated === 1) {
|
||||
console.log(
|
||||
`[orchestration] Isolated project — restricting to ${projectSessionIds.length} sessions`,
|
||||
);
|
||||
} else {
|
||||
console.log(
|
||||
`[orchestration] Non-isolated project — expanding search to ${projectSessionIds.length} sessions`,
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.warn(
|
||||
"[orchestration] Failed to resolve project context:",
|
||||
err.message,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
console.warn('[orchestration] Failed to resolve project context:', err.message);
|
||||
}
|
||||
}
|
||||
// 2. Fetch recent episodes for context
|
||||
const recentEpisodes = await memory.getRecentEpisodes(
|
||||
session.id,
|
||||
RECENT_EPISODE_LIMIT,
|
||||
recentEpisodeLimit,
|
||||
);
|
||||
const isFirstMessage = recentEpisodes.length === 0;
|
||||
const recentIds = new Set(recentEpisodes.map((e) => e.id));
|
||||
@@ -154,14 +171,20 @@ if (session.project_id) {
|
||||
userMessage,
|
||||
session.id,
|
||||
recentIds,
|
||||
projectSessionIds
|
||||
projectSessionIds,
|
||||
{ semanticLimit, scoreThreshold },
|
||||
);
|
||||
|
||||
// 3b. Entity Search
|
||||
const entities = await getRelevantEntities(userMessage)
|
||||
const entities = await getRelevantEntities(userMessage);
|
||||
|
||||
// 4. Assemble prompt
|
||||
const prompt = buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage);
|
||||
const prompt = buildPrompt(
|
||||
recentEpisodes,
|
||||
semanticEpisodes,
|
||||
entities,
|
||||
userMessage,
|
||||
);
|
||||
|
||||
// 5. Run inference
|
||||
const result = await inference.complete(prompt, options);
|
||||
@@ -193,118 +216,131 @@ if (session.project_id) {
|
||||
}
|
||||
|
||||
async function chatStream(externalId, userMessage, onChunk, options = {}) {
|
||||
|
||||
console.log('[orchestration] chatStream entry');
|
||||
try {
|
||||
const { recentEpisodeLimit, semanticLimit, scoreThreshold } = appSettings.load();
|
||||
let session = await memory.getSessionByExternalId(externalId);
|
||||
if (!session) session = await memory.createSession(externalId);
|
||||
|
||||
console.log("[orchestration] chatStream called:", {
|
||||
externalId,
|
||||
userMessage: userMessage.slice(0, 50),
|
||||
});
|
||||
let session = await memory.getSessionByExternalId(externalId);
|
||||
if (!session) session = await memory.createSession(externalId);
|
||||
|
||||
let projectSessionIds = null;
|
||||
if (session.project_id) {
|
||||
try {
|
||||
const project = await memory.getProject(session.project_id);
|
||||
if (project) {
|
||||
const projectSessions = await memory.getProjectSessions(session.project_id);
|
||||
projectSessionIds = projectSessions.map(s => s.id);
|
||||
if (project.isolated === 1) {
|
||||
console.log(`[orchestration] Isolated project — restricting to ${projectSessionIds.length} sessions`);
|
||||
} else {
|
||||
console.log(`[orchestration] Non-isolated project — expanding search to ${projectSessionIds.length} sessions`);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.warn('[orchestration] Failed to resolve project context:', err.message);
|
||||
}
|
||||
}
|
||||
|
||||
const recentEpisodes = await memory.getRecentEpisodes(
|
||||
session.id,
|
||||
RECENT_EPISODE_LIMIT,
|
||||
);
|
||||
const isFirstMessage = recentEpisodes.length === 0;
|
||||
const recentIds = new Set(recentEpisodes.map((e) => e.id));
|
||||
const semanticEpisodes = await getSemanticEpisodes(
|
||||
userMessage,
|
||||
session.id,
|
||||
recentIds,
|
||||
projectSessionIds
|
||||
);
|
||||
|
||||
const entities = await getRelevantEntities(userMessage);
|
||||
|
||||
const prompt = buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage);
|
||||
const res = await inference.completeStream(prompt, options);
|
||||
|
||||
let fullText = "";
|
||||
let model = "";
|
||||
let tokenCount = 0;
|
||||
let buffer = "";
|
||||
|
||||
for await (const chunk of res.body) {
|
||||
buffer += Buffer.from(chunk).toString("utf8");
|
||||
|
||||
const events = buffer.split("\n\n");
|
||||
buffer = events.pop() || "";
|
||||
|
||||
for (const event of events) {
|
||||
const lines = event.split("\n");
|
||||
const dataLines = lines
|
||||
.filter((line) => line.startsWith("data: "))
|
||||
.map((line) => line.slice(6));
|
||||
|
||||
if (dataLines.length === 0) continue;
|
||||
|
||||
const raw = dataLines.join("\n").trim();
|
||||
if (raw === "[DONE]") continue;
|
||||
|
||||
let projectSessionIds = null;
|
||||
if (session.project_id) {
|
||||
try {
|
||||
const data = JSON.parse(raw);
|
||||
|
||||
if (data.response) {
|
||||
fullText += data.response;
|
||||
onChunk(data.response);
|
||||
}
|
||||
|
||||
if (data.model) model = data.model;
|
||||
if (data.done && data.tokenCount !== undefined) {
|
||||
tokenCount = data.tokenCount;
|
||||
}
|
||||
|
||||
if (data.error) {
|
||||
throw new Error(data.error);
|
||||
const project = await memory.getProject(session.project_id);
|
||||
if (project) {
|
||||
const projectSessions = await memory.getProjectSessions(
|
||||
session.project_id,
|
||||
);
|
||||
projectSessionIds = projectSessions.map((s) => s.id);
|
||||
if (project.isolated === 1) {
|
||||
console.log(
|
||||
`[orchestration] Isolated project — restricting to ${projectSessionIds.length} sessions`,
|
||||
);
|
||||
} else {
|
||||
console.log(
|
||||
`[orchestration] Non-isolated project — expanding search to ${projectSessionIds.length} sessions`,
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(
|
||||
"[orchestration] Failed to parse inference SSE event:",
|
||||
raw,
|
||||
console.warn(
|
||||
"[orchestration] Failed to resolve project context:",
|
||||
err.message,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
console.log("[orchestration] final streamed text length:", fullText.length);
|
||||
|
||||
if (fullText.trim()) {
|
||||
await memory.createEpisode(session.id, userMessage, fullText, tokenCount);
|
||||
} else {
|
||||
console.warn(
|
||||
"[orchestration] Stream finished with no assistant text; episode not saved",
|
||||
const recentEpisodes = await memory.getRecentEpisodes(
|
||||
session.id,
|
||||
recentEpisodeLimit,
|
||||
);
|
||||
const isFirstMessage = recentEpisodes.length === 0;
|
||||
const recentIds = new Set(recentEpisodes.map((e) => e.id));
|
||||
const semanticEpisodes = await getSemanticEpisodes(
|
||||
userMessage,
|
||||
session.id,
|
||||
recentIds,
|
||||
projectSessionIds,
|
||||
{semanticLimit, scoreThreshold }
|
||||
);
|
||||
}
|
||||
|
||||
if (isFirstMessage && !session.name) {
|
||||
autoNameSession(externalId, userMessage, fullText).catch(() => {});
|
||||
}
|
||||
const entities = await getRelevantEntities(userMessage);
|
||||
|
||||
return { model, tokenCount };
|
||||
} catch (err) {
|
||||
console.error('[orchestration] chatStream fatal error:', err.message, err.stack);
|
||||
const prompt = buildPrompt(
|
||||
recentEpisodes,
|
||||
semanticEpisodes,
|
||||
entities,
|
||||
userMessage,
|
||||
);
|
||||
const res = await inference.completeStream(prompt, options);
|
||||
|
||||
let fullText = "";
|
||||
let model = "";
|
||||
let tokenCount = 0;
|
||||
let buffer = "";
|
||||
|
||||
for await (const chunk of res.body) {
|
||||
buffer += Buffer.from(chunk).toString("utf8");
|
||||
|
||||
const events = buffer.split("\n\n");
|
||||
buffer = events.pop() || "";
|
||||
|
||||
for (const event of events) {
|
||||
const lines = event.split("\n");
|
||||
const dataLines = lines
|
||||
.filter((line) => line.startsWith("data: "))
|
||||
.map((line) => line.slice(6));
|
||||
|
||||
if (dataLines.length === 0) continue;
|
||||
|
||||
const raw = dataLines.join("\n").trim();
|
||||
if (raw === "[DONE]") continue;
|
||||
|
||||
try {
|
||||
const data = JSON.parse(raw);
|
||||
|
||||
if (data.response) {
|
||||
fullText += data.response;
|
||||
onChunk(data.response);
|
||||
}
|
||||
|
||||
if (data.model) model = data.model;
|
||||
if (data.done && data.tokenCount !== undefined) {
|
||||
tokenCount = data.tokenCount;
|
||||
}
|
||||
|
||||
if (data.error) {
|
||||
throw new Error(data.error);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(
|
||||
"[orchestration] Failed to parse inference SSE event:",
|
||||
raw,
|
||||
err.message,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
console.log("[orchestration] final streamed text length:", fullText.length);
|
||||
|
||||
if (fullText.trim()) {
|
||||
await memory.createEpisode(session.id, userMessage, fullText, tokenCount);
|
||||
} else {
|
||||
console.warn(
|
||||
"[orchestration] Stream finished with no assistant text; episode not saved",
|
||||
);
|
||||
}
|
||||
|
||||
if (isFirstMessage && !session.name) {
|
||||
autoNameSession(externalId, userMessage, fullText).catch(() => {});
|
||||
}
|
||||
|
||||
return { model, tokenCount };
|
||||
} catch (err) {
|
||||
console.error(
|
||||
"[orchestration] chatStream fatal error:",
|
||||
err.message,
|
||||
err.stack,
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user