Files
nexusAI/packages/memory-service/src/episodic/index.js
2026-04-14 01:07:59 -07:00

213 lines
6.3 KiB
JavaScript

const {getDB} = require('../db');
const { EPISODIC, getEnv, SERVICES, parseRow, formatEpisodeText } = require('@nexusai/shared');
const semantic = require('../semantic');
// --Sessions --------------------------------------------------
// Creates a new session with the given external ID and optional metadata
function createSession(externalId, metadata = null) {
const db = getDB();
const stmt = db.prepare(`
INSERT INTO sessions (external_id, metadata)
VALUES (?, ?)
`);
const result = stmt.run(externalId, metadata ? JSON.stringify(metadata) : null);
return getSession(result.lastInsertRowid);
}
// Retrieves a session by its external ID
function getSession(id) {
const db = getDB();
const stmt = db.prepare(`SELECT * FROM sessions WHERE id = ?`);
return parseRow(stmt.get(id));
}
function getSessions(limit = EPISODIC.DEFAULT_PAGE_SIZE, offset = 0, projectId = null) {
const db = getDB();
const stmt = projectId
? db.prepare(`
SELECT * FROM sessions
WHERE project_id = ?
ORDER BY updated_at DESC
LIMIT ? OFFSET ?
`)
: db.prepare(`
SELECT * FROM sessions
ORDER BY updated_at DESC
LIMIT ? OFFSET ?
`);
return projectId
? stmt.all(projectId, limit, offset).map(parseRow)
: stmt.all(limit, offset).map(parseRow);
}
// Retrieves a session by its external ID
function getSessionByExternalId(externalId) {
const db = getDB();
const stmt = db.prepare(`SELECT * FROM sessions WHERE external_id = ?`);
return parseRow(stmt.get(externalId));
}
// Updates the updated_at timestamp of a session to the current time
function touchSession(id) {
const db = getDB();
db.prepare(`UPDATE sessions SET updated_at = unixepoch() WHERE id = ?`).run(id);
}
// Deletes a session and all associated episodes, entities, relationships, and summaries
function deleteSession(id) {
const db = getDB();
db.prepare(`DELETE FROM sessions WHERE id = ?`).run(id);
}
function updateSession(id, { name, projectId } = {}) {
const db = getDB();
db.prepare(`
UPDATE sessions
SET name = ?,
project_id = COALESCE(?, project_id),
updated_at = unixepoch()
WHERE id = ?
`).run(name ?? null, projectId ?? null, id);
return getSession(id);
}
function updateSessionByExternalId(externalId, fields) {
const session = getSessionByExternalId(externalId);
if (!session) throw new Error('Session not found');
return updateSession(session.id, fields);
}
function deleteSessionByExternalId(externalId) {
const session = getSessionByExternalId(externalId);
if(!session) throw new Error('Session not found');
deleteSession(session.id);
}
// --Episodes --------------------------------------------------
// Creates a new episode linked to a session, with user message, AI response, optional token count, and metadata
async function createEpisode(sessionId, userMessage, aiResponse, tokenCount = null, metadata = null) {
const db = getDB();
// Wrap insert + session touch in a transaction — both succeed or neither does
const insert = db.transaction(() => {
const stmt = db.prepare(`
INSERT INTO episodes (session_id, user_message, ai_response, token_count, metadata)
VALUES (?, ?, ?, ?, ?)
`);
const result = stmt.run(
sessionId,
userMessage,
aiResponse,
tokenCount,
metadata ? JSON.stringify(metadata) : null
);
touchSession(sessionId);
return getEpisode(result.lastInsertRowid);
});
const episode= insert();
//embed ascynchronously after SQLite completes, non-blocking. If embedding fail, the episode still saved.
getEpisodeEmbedding(userMessage, aiResponse)
.then(vector => semantic.upsertEpisode(episode.id, vector, {
sessionId: episode.session_id,
createdAt: episode.created_at
}))
.catch(err => console.error(`Failed to embed episode ${episode.id}:`, err.message));
return episode;
}
// Retrieves an episode by its ID
function getEpisode(id) {
const db = getDB();
const stmt = db.prepare(`SELECT * FROM episodes WHERE id = ?`);
return parseRow(stmt.get(id));
}
// Retrieves episodes for a given session, ordered by creation time descending, with pagination
function getEpisodesBySession(sessionId, limit = EPISODIC.DEFAULT_PAGE_SIZE, offset = 0) {
const db = getDB();
const stmt = db.prepare(`
SELECT * FROM episodes
WHERE session_id = ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?
`);
return stmt.all(sessionId, limit, offset).map(parseRow);
}
// Retrieves recent episodes across all sessions, ordered by creation time descending, with a limit
function getRecentEpisodes(limit = EPISODIC.DEFAULT_RECENT_LIMIT) {
// Cross-session recent episodes — useful for recency-based retrieval
const db = getDB();
const stmt = db.prepare(`
SELECT * FROM episodes
ORDER BY created_at DESC
LIMIT ?
`);
return stmt.all(limit).map(parseRow);
}
// Searches episodes using FTS5 full-text search, ordered by relevance, with a limit
function searchEpisodes(query, limit = EPISODIC.DEFAULT_SEARCH_LIMIT) {
// FTS5 full-text search across all episodes
const db = getDB();
const stmt = db.prepare(`
SELECT e.* FROM episodes e
JOIN episodes_fts fts ON e.id = fts.rowid
WHERE episodes_fts MATCH ?
ORDER BY rank
LIMIT ?
`);
return stmt.all(query, limit).map(parseRow);
}
// Deletes an episode by its ID
function deleteEpisode(id) {
const db = getDB();
db.prepare(`DELETE FROM episodes WHERE id = ?`).run(id);
}
/******** Embedding Helper ********/
async function getEpisodeEmbedding(userMessage, aiResponse){
const url = getEnv('EMBEDDING_SERVICE_URL', SERVICES.EMBEDDING_URL);
//Combine user message and AI response for embedding
const text = formatEpisodeText(userMessage, aiResponse);
const res = await fetch(`${url}/embed`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ text })
})
if (!res.ok) {
throw new Error(`Embedding service error: ${res.status} ${res.statusText}`);
}
const data = await res.json();
return data.embedding;
}
module.exports = {
createSession,
getSession,
getSessions,
getSessionByExternalId,
deleteSession,
updateSession,
updateSessionByExternalId,
deleteSessionByExternalId,
createEpisode,
getEpisode,
getEpisodesBySession,
getRecentEpisodes,
searchEpisodes,
deleteEpisode
};