237 lines
7.2 KiB
JavaScript
237 lines
7.2 KiB
JavaScript
const {getDB} = require('../db');
|
|
const { EPISODIC, getEnv, SERVICES, parseRow, formatEpisodeText, SUMMARIES, logger } = require('@nexusai/shared');
|
|
const semantic = require('../semantic');
|
|
const { extractAndStoreEntities } = require('../entities/extraction')
|
|
|
|
// --Sessions --------------------------------------------------
|
|
|
|
// Creates a new session with the given external ID and optional metadata
|
|
function createSession(externalId, metadata = null) {
|
|
const db = getDB();
|
|
const stmt = db.prepare(`
|
|
INSERT INTO sessions (external_id, metadata)
|
|
VALUES (?, ?)
|
|
`);
|
|
|
|
const result = stmt.run(externalId, metadata ? JSON.stringify(metadata) : null);
|
|
return getSession(result.lastInsertRowid);
|
|
}
|
|
|
|
// Retrieves a session by its external ID
|
|
function getSession(id) {
|
|
const db = getDB();
|
|
const stmt = db.prepare(`SELECT * FROM sessions WHERE id = ?`);
|
|
return parseRow(stmt.get(id));
|
|
}
|
|
|
|
|
|
function getSessions(limit = EPISODIC.DEFAULT_PAGE_SIZE, offset = EPISODIC.DEFAULT_OFFSET, projectId = null) {
|
|
const db = getDB();
|
|
const stmt = projectId
|
|
? db.prepare(`
|
|
SELECT * FROM sessions
|
|
WHERE project_id = ?
|
|
ORDER BY updated_at DESC
|
|
LIMIT ? OFFSET ?
|
|
`)
|
|
: db.prepare(`
|
|
SELECT * FROM sessions
|
|
ORDER BY updated_at DESC
|
|
LIMIT ? OFFSET ?
|
|
`);
|
|
|
|
return projectId
|
|
? stmt.all(projectId, limit, offset).map(parseRow)
|
|
: stmt.all(limit, offset).map(parseRow);
|
|
}
|
|
|
|
// Retrieves a session by its external ID
|
|
function getSessionByExternalId(externalId) {
|
|
const db = getDB();
|
|
const stmt = db.prepare(`SELECT * FROM sessions WHERE external_id = ?`);
|
|
return parseRow(stmt.get(externalId));
|
|
}
|
|
|
|
// Updates the updated_at timestamp of a session to the current time
|
|
function touchSession(id) {
|
|
const db = getDB();
|
|
db.prepare(`UPDATE sessions SET updated_at = unixepoch() WHERE id = ?`).run(id);
|
|
}
|
|
|
|
// Deletes a session and all associated episodes, entities, relationships, and summaries
|
|
function deleteSession(id) {
|
|
const db = getDB();
|
|
db.prepare(`DELETE FROM sessions WHERE id = ?`).run(id);
|
|
}
|
|
|
|
function updateSession(id, { name, projectId } = {}) {
|
|
const db = getDB();
|
|
|
|
// Build update dynamically based on what was provided
|
|
const updates = [];
|
|
const values = [];
|
|
|
|
if (name !== undefined) { updates.push('name = ?'); values.push(name ?? null); }
|
|
if (projectId !== undefined) { updates.push('project_id = ?'); values.push(projectId ?? null); }
|
|
|
|
if (updates.length === 0) return getSession(id);
|
|
|
|
updates.push('updated_at = unixepoch()');
|
|
values.push(id);
|
|
|
|
db.prepare(`UPDATE sessions SET ${updates.join(', ')} WHERE id = ?`).run(...values);
|
|
return getSession(id);
|
|
}
|
|
|
|
function updateSessionByExternalId(externalId, fields) {
|
|
const session = getSessionByExternalId(externalId);
|
|
if (!session) throw new Error('Session not found');
|
|
return updateSession(session.id, fields);
|
|
}
|
|
|
|
function deleteSessionByExternalId(externalId) {
|
|
const session = getSessionByExternalId(externalId);
|
|
if(!session) throw new Error('Session not found');
|
|
deleteSession(session.id);
|
|
}
|
|
|
|
|
|
// --Episodes --------------------------------------------------
|
|
// Creates a new episode linked to a session, with user message, AI response, optional token count, and metadata
|
|
async function createEpisode(sessionId, userMessage, aiResponse, tokenCount = null, projectId=null) {
|
|
const db = getDB();
|
|
|
|
// Wrap insert + session touch in a transaction — both succeed or neither does
|
|
const insert = db.transaction(() => {
|
|
const stmt = db.prepare(`
|
|
INSERT INTO episodes (session_id, user_message, ai_response, token_count)
|
|
VALUES (?, ?, ?, ?)
|
|
`);
|
|
const result = stmt.run(
|
|
sessionId,
|
|
userMessage,
|
|
aiResponse,
|
|
tokenCount,
|
|
);
|
|
touchSession(sessionId);
|
|
return getEpisode(result.lastInsertRowid);
|
|
});
|
|
|
|
const episode = insert();
|
|
|
|
//embed ascynchronously after SQLite completes, non-blocking. If embedding fail, the episode still saved.
|
|
getEpisodeEmbedding(userMessage, aiResponse)
|
|
.then(vector => semantic.upsertEpisode(episode.id, vector, {
|
|
sessionId: episode.session_id,
|
|
createdAt: episode.created_at
|
|
}))
|
|
.catch(err => logger.error(`Failed to embed episode ${episode.id}:`, err.message));
|
|
|
|
extractAndStoreEntities(userMessage, aiResponse, projectId)
|
|
.catch(err => logger.error(`Failed to extract entities for episode ${episode.id}:`, err.message));
|
|
|
|
|
|
return episode;
|
|
}
|
|
|
|
// Retrieves an episode by its ID
|
|
function getEpisode(id) {
|
|
const db = getDB();
|
|
const stmt = db.prepare(`SELECT * FROM episodes WHERE id = ?`);
|
|
return parseRow(stmt.get(id));
|
|
}
|
|
|
|
// Retrieves episodes for a given session, ordered by creation time descending, with pagination
|
|
function getEpisodesBySession(sessionId, limit = EPISODIC.DEFAULT_PAGE_SIZE, offset = EPISODIC.DEFAULT_OFFSET) {
|
|
const db = getDB();
|
|
const stmt = db.prepare(`
|
|
SELECT * FROM episodes
|
|
WHERE session_id = ?
|
|
ORDER BY created_at DESC
|
|
LIMIT ? OFFSET ?
|
|
`);
|
|
return stmt.all(sessionId, limit, offset).map(parseRow);
|
|
}
|
|
|
|
// Retrieves recent episodes across all sessions, ordered by creation time descending, with a limit
|
|
function getRecentEpisodes(sessionId, limit = EPISODIC.DEFAULT_RECENT_LIMIT) {
|
|
// Cross-session recent episodes — useful for recency-based retrieval
|
|
const db = getDB();
|
|
const stmt = db.prepare(`
|
|
SELECT * FROM episodes
|
|
WHERE session_id = ?
|
|
ORDER BY created_at DESC
|
|
LIMIT ?
|
|
`);
|
|
return stmt.all(limit).map(parseRow);
|
|
}
|
|
|
|
|
|
// Searches episodes using FTS5 full-text search, ordered by relevance, with a limit
|
|
function searchEpisodes(query, limit = EPISODIC.DEFAULT_SEARCH_LIMIT) {
|
|
// FTS5 full-text search across all episodes
|
|
const db = getDB();
|
|
const stmt = db.prepare(`
|
|
SELECT e.* FROM episodes e
|
|
JOIN episodes_fts fts ON e.id = fts.rowid
|
|
WHERE episodes_fts MATCH ?
|
|
ORDER BY rank
|
|
LIMIT ?
|
|
`);
|
|
return stmt.all(query, limit).map(parseRow);
|
|
}
|
|
|
|
// Deletes an episode by its ID
|
|
function deleteEpisode(id) {
|
|
const db = getDB();
|
|
db.prepare(`DELETE FROM episodes WHERE id = ?`).run(id);
|
|
}
|
|
|
|
/******** Embedding Helper ********/
|
|
async function getEpisodeEmbedding(userMessage, aiResponse){
|
|
const url = getEnv('EMBEDDING_SERVICE_URL', SERVICES.EMBEDDING_URL);
|
|
|
|
//Combine user message and AI response for embedding
|
|
const text = formatEpisodeText(userMessage, aiResponse);
|
|
|
|
const res = await fetch(`${url}/embed`, {
|
|
method: 'POST',
|
|
headers: { 'Content-Type': 'application/json' },
|
|
body: JSON.stringify({ text })
|
|
})
|
|
|
|
if (!res.ok) {
|
|
throw new Error(`Embedding service error: ${res.status} ${res.statusText}`);
|
|
}
|
|
const data = await res.json();
|
|
return data.embedding;
|
|
}
|
|
|
|
function getEpisodesByProject(projectId, limit = SUMMARIES.MAX_PROJECT_EPISODE_LIMIT) {
|
|
const db = getDB();
|
|
return db.prepare(`
|
|
SELECT e.* FROM episodes e
|
|
JOIN sessions s ON s.id = e.session_id
|
|
WHERE s.project_id = ?
|
|
ORDER BY e.created_at ASC
|
|
LIMIT ?
|
|
`).all(projectId, limit).map(parseRow);
|
|
}
|
|
|
|
module.exports = {
|
|
createSession,
|
|
getSession,
|
|
getSessions,
|
|
getSessionByExternalId,
|
|
deleteSession,
|
|
updateSession,
|
|
updateSessionByExternalId,
|
|
deleteSessionByExternalId,
|
|
createEpisode,
|
|
getEpisode,
|
|
getEpisodesBySession,
|
|
getRecentEpisodes,
|
|
searchEpisodes,
|
|
deleteEpisode,
|
|
getEpisodesByProject
|
|
}; |