roadmap phase 1 complete
This commit is contained in:
@@ -19,15 +19,26 @@ Default port: **3002**. Requires Qdrant and the embedding-service to be reachabl
|
||||
|
||||
- `sessions` and `episodes` are linked by FK with cascade delete — deleting a session removes all its episodes automatically.
|
||||
- `episodes_fts` is an FTS5 virtual table that mirrors `user_message` and `ai_response`. It is kept in sync via SQL triggers on INSERT/UPDATE/DELETE. On service startup, the FTS index is fully rebuilt from live episode data.
|
||||
- Several columns (`sessions.name`, `sessions.project_id`, `projects.isolated`, etc.) were added as migrations using `ALTER TABLE` wrapped in individual try-catch blocks. Failures are silently swallowed — if a column already exists, the alter fails and the service continues. The `idx_summaries_project` index is defined twice (benign duplicate).
|
||||
- Several columns (`sessions.name`, `sessions.project_id`, `entities.mention_count`, etc.) were added as migrations using `ALTER TABLE` wrapped in individual try-catch blocks. Failures are silently swallowed — if a column already exists, the alter fails and the service continues. The `idx_summaries_project` index is defined twice (benign duplicate).
|
||||
- `summaries` rows with `session_id IS NULL` and a `project_id` represent project-level overviews, not session summaries. This distinction is how `GET /projects/:id/overview` works.
|
||||
- `entity_episodes` is a join table linking entities to the episodes where they were first extracted. Used for provenance tracking and future orphan cleanup. Defined in `schema.js` (not a migration), so it exists on all installs.
|
||||
|
||||
**New columns on `entities` (added via migration):**
|
||||
- `mention_count INTEGER DEFAULT 1` — incremented every time this entity is re-extracted
|
||||
- `confidence REAL DEFAULT 1.0` — reserved for future confidence scoring
|
||||
- `source TEXT DEFAULT 'extraction'` — `'extraction'` or `'manual'`
|
||||
- `last_seen_at INTEGER` — Unix timestamp of most recent extraction hit
|
||||
|
||||
**New columns on `relationships` (added via migration):**
|
||||
- `mention_count INTEGER DEFAULT 1` — incremented every time this edge is re-extracted
|
||||
- `notes TEXT` — relationship context sentence from extraction
|
||||
|
||||
## Async Pipeline: Episode Creation
|
||||
|
||||
`POST /episodes` returns a 201 as soon as the SQLite insert succeeds. Two background tasks run after without blocking the response:
|
||||
|
||||
1. **Embedding** — Fetches a vector from embedding-service, stores to Qdrant with `{sessionId, createdAt}` as payload metadata.
|
||||
2. **Entity extraction** — Sends the episode text to Ollama (`qwen2.5:3b`, temp 0.1, 200 tokens) and upserts any recognized entities to both SQLite and Qdrant.
|
||||
2. **Entity + relationship extraction** — Sends the episode text to Ollama (`qwen2.5:3b`, temp 0.1, 1500 tokens) and upserts any recognized entities and relationships to both SQLite and Qdrant. Also links each entity to the episode via `entity_episodes`.
|
||||
|
||||
Both tasks catch and log errors silently. An episode can exist in SQLite with no corresponding Qdrant point if either step fails.
|
||||
|
||||
@@ -36,26 +47,39 @@ Both tasks catch and log errors silently. An episode can exist in SQLite with no
|
||||
`src/entities/extraction.js`:
|
||||
|
||||
- Fetches the last 20 known entities from SQLite before prompting the model, so the prompt can ask for name/type consistency with existing entries.
|
||||
- Recognized types: `person`, `place`, `project`, `technology`, `concept`, `organization` — anything else is discarded.
|
||||
- Recognized entity types: `person`, `place`, `project`, `technology`, `concept`, `organization` — anything else is discarded.
|
||||
- Ignores a hardcoded list of low-value names (`hello`, `thanks`, `good morning`, etc.).
|
||||
- Extracts JSON using a regex (`{...}`) applied to raw model output, so surrounding prose doesn't break parsing.
|
||||
- Entity upsert uses `ON CONFLICT(name, type) DO UPDATE` — preserves existing `notes` if the new extraction returns null (`COALESCE(entities.notes, excluded.notes)`).
|
||||
- The model is asked to return both entities and relationships in a single JSON response: `{ "entities": [...], "relationships": [...] }`.
|
||||
- Entity upsert uses `ON CONFLICT(name, type) DO UPDATE` — preserves existing `notes` if the new extraction returns null, increments `mention_count`, updates `last_seen_at`.
|
||||
- Relationship upsert uses `ON CONFLICT(from_id, to_id, label) DO UPDATE` — increments `mention_count`, preserves existing `notes` if new is null.
|
||||
- Relationships are resolved by looking up both endpoints in the `entityMap` built during entity processing — if either entity wasn't saved (filtered out or invalid type), the relationship is silently dropped.
|
||||
- After upsert, embeds each entity as `"${name} (${type}): ${notes}"` and stores to Qdrant with `projectId` in the payload for project-scoped filtering.
|
||||
|
||||
> For full details see `docs/services/entity-extraction.md` and `docs/services/knowledge-graph.md`.
|
||||
|
||||
## Knowledge Graph
|
||||
|
||||
`src/graph/index.js` provides two SQLite traversal functions:
|
||||
|
||||
- **`getNeighborhood(entityId, depth)`** — Single-entity recursive CTE traversal. Bidirectional (follows edges in both directions). Returns `{ nodes: [...entities], edges: [...relationships] }`. Depth defaults to `ENTITIES.GRAPH_HOP_DEPTH` (1), max enforced to 3 at the HTTP layer.
|
||||
|
||||
- **`getEntityNeighbors(entityIds[])`** — Bulk 1-hop version for orchestration. Given a set of seed entity IDs, returns their immediate neighbors plus all edges within the combined node set.
|
||||
|
||||
The recursive CTE uses `UNION` (not `UNION ALL`) to eliminate cycles and duplicate visits automatically.
|
||||
|
||||
> For full design rationale and usage see `docs/services/knowledge-graph.md`.
|
||||
|
||||
## Summarization Strategy
|
||||
|
||||
`src/summarization/project.js`:
|
||||
|
||||
- Preferred path: generate a project overview from existing **session-level summaries** (higher-level abstraction, shorter input).
|
||||
- Fallback path: if no session summaries exist, summarize raw episodes directly (up to `SUMMARIES.MAX_PROJECT_EPISODE_LIMIT`).
|
||||
- Both paths truncate input at `SUMMARIES.MAX_SUMMARY_CHARS` (30,000 chars) by slicing from the end (most recent content wins).
|
||||
- Both paths truncate input at `SUMMARIES.MAX_SUMMARY_CHARS` (8,000 chars) by slicing from the end (most recent content wins).
|
||||
- Strips ChatML tokens from the Ollama response (`<|im_start|>`, `<|im_end|>`).
|
||||
- Uses temp 0.2 and `num_predict 1200`.
|
||||
|
||||
## Known Quirk: `getRecentEpisodes`
|
||||
|
||||
`src/episodic/index.js` `getRecentEpisodes(sessionId, limit)` has a parameter mismatch — the SQLite query binds only `limit`, not `sessionId`, so it returns recent episodes across **all sessions**. Orchestration-service uses `getEpisodesBySession()` (the paginated route) instead, so this bug is not visible in normal operation. Don't rely on `getRecentEpisodes` when you need session-scoped results.
|
||||
|
||||
## Qdrant Client
|
||||
|
||||
`src/semantic/index.js` creates the Qdrant client lazily on first use and reuses it. All three collections (`episodes`, `entities`, `summaries`) are created at startup if missing. There is no connection health check — if Qdrant is unreachable, semantic operations throw at call time.
|
||||
@@ -68,16 +92,18 @@ Both tasks catch and log errors silently. An episode can exist in SQLite with no
|
||||
| GET/POST | `/sessions` | POST requires `externalId`; duplicate → 409 |
|
||||
| GET/PATCH | `/sessions/by-external/:externalId` | PATCH accepts `name`, `projectId` |
|
||||
| DELETE | `/sessions/by-external/:externalId` | Cascades to episodes, summaries, relationships |
|
||||
| GET/POST | `/episodes` | POST triggers async embedding + entity extraction |
|
||||
| GET/POST | `/episodes` | POST triggers async embedding + entity/relationship extraction |
|
||||
| GET | `/episodes/search` | FTS5 search; route must precede `/:id` |
|
||||
| GET | `/sessions/:id/episodes` | Paginated, ordered `created_at DESC` |
|
||||
| DELETE | `/episodes/:id` | Removes from SQLite + async Qdrant delete |
|
||||
| POST | `/entities` | Upsert by `(name, type)` |
|
||||
| POST | `/entities` | Upsert by `(name, type)`; increments `mention_count` on conflict |
|
||||
| GET | `/entities/by-type/:type` | All entities of given type |
|
||||
| GET/DELETE | `/entities/:id` | |
|
||||
| POST | `/relationships` | Upsert by `(fromId, toId, label)`; conflict = no-op |
|
||||
| POST | `/relationships` | Upsert by `(fromId, toId, label)`; increments `mention_count` on conflict. Body: `fromId`, `toId`, `label`, `notes` (optional) |
|
||||
| GET | `/entities/:id/relationships` | Outbound only |
|
||||
| DELETE | `/relationships` | Body: `fromId`, `toId`, `label` |
|
||||
| GET | `/graph/neighborhood/:entityId` | Single-entity neighborhood; `?depth=` (default 1, max 3) |
|
||||
| POST | `/graph/neighbors` | Bulk 1-hop neighborhood; body: `{ entityIds: [...] }` |
|
||||
| GET/POST | `/projects` | POST requires non-empty `name` |
|
||||
| GET/PATCH/DELETE | `/projects/:id` | |
|
||||
| POST | `/projects/:id/summarize` | On-demand overview generation; 422 if no data |
|
||||
|
||||
@@ -54,9 +54,14 @@ function getDB() {
|
||||
db.exec(`CREATE INDEX IF NOT EXISTS idx_summaries_session ON summaries(session_id)`);
|
||||
} catch {}
|
||||
|
||||
try {
|
||||
db.exec(`CREATE INDEX IF NOT EXISTS idx_summaries_project ON summaries(project_id)`);
|
||||
} catch {}
|
||||
try { db.exec(`ALTER TABLE entities ADD COLUMN mention_count INTEGER NOT NULL DEFAULT 1`) } catch {}
|
||||
try { db.exec(`ALTER TABLE entities ADD COLUMN confidence REAL NOT NULL DEFAULT 1.0`) } catch {}
|
||||
try { db.exec(`ALTER TABLE entities ADD COLUMN source TEXT NOT NULL DEFAULT 'extraction'`) } catch {}
|
||||
try { db.exec(`ALTER TABLE entities ADD COLUMN last_seen_at INTEGER`) } catch {}
|
||||
|
||||
try { db.exec(`ALTER TABLE relationships ADD COLUMN mention_count INTEGER NOT NULL DEFAULT 1`) } catch {}
|
||||
try { db.exec(`ALTER TABLE relationships ADD COLUMN notes TEXT`) } catch {}
|
||||
|
||||
|
||||
// Sync FTS index with any existing episodes data
|
||||
db.exec(`INSERT OR REPLACE INTO episodes_fts(rowid, user_message, ai_response)
|
||||
|
||||
@@ -38,6 +38,20 @@ const schema = `
|
||||
UNIQUE(from_id, to_id, label)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_relationships_from ON relationships(from_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_relationships_to ON relationships(to_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS entity_episodes (
|
||||
entity_id INTEGER NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
|
||||
episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
|
||||
PRIMARY KEY (entity_id, episode_id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_entity_episodes_entity ON entity_episodes(entity_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_entity_episodes_episode ON entity_episodes(episode_id);
|
||||
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS projects (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
const semantic = require('../semantic')
|
||||
const { getEnv, SERVICES, formatEpisodeText, ENTITIES, logger } = require('@nexusai/shared');
|
||||
const { upsertEntity } = require('./index');
|
||||
const { upsertEntity, upsertRelationship, linkEntityToEpisode } = require('./index');
|
||||
|
||||
const EXTRACTION_URL = getEnv('EXTRACTION_URL', 'http://localhost:11434');
|
||||
const EXTRACTION_MODEL = getEnv('EXTRACTION_MODEL', 'qwen2.5:3b');
|
||||
const EXTRACTION_MODEL = getEnv('EXTRACTION_MODEL', 'qwen2.5:3b'); // ChatML format — see buildExtractionPrompt
|
||||
const EMBEDDING_SERVICE_URL = getEnv('EMBEDDING_SERVICE_URL', SERVICES.EMBEDDING_URL);
|
||||
|
||||
const ENTITY_TYPES = ['person', 'place', 'project', 'technology', 'concept', 'organization'];
|
||||
const IGNORED_NAMES = ['good morning', 'good night', 'hello', 'goodbye', 'thanks', 'thank you'];
|
||||
|
||||
// NOTE: This prompt uses ChatML format (<|im_start|> / <|im_end|> tags), which is
|
||||
// specific to qwen-family models. If EXTRACTION_MODEL is changed to a Llama-family
|
||||
// or other model, this format will need to change — most alternatives use either
|
||||
// plain text or [INST] / <<SYS>> tags. Silent degradation is likely if mismatched.
|
||||
function buildExtractionPrompt(userMessage, aiResponse, knownEntities = []) {
|
||||
const knownBlock = knownEntities.length > 0
|
||||
? [
|
||||
@@ -20,21 +24,22 @@ function buildExtractionPrompt(userMessage, aiResponse, knownEntities = []) {
|
||||
|
||||
return [
|
||||
'<|im_start|>system',
|
||||
'You are a named entity extractor. You output only valid JSON.',
|
||||
'You are a named entity and relationship extractor. You output only valid JSON.',
|
||||
'<|im_end|>',
|
||||
'<|im_start|>user',
|
||||
'Read the conversation below and extract every named entity mentioned.',
|
||||
`Entity types to extract: ${ENTITY_TYPES.join(', ')}`,
|
||||
'For each entity found, provide: name, type, and a one-sentence notes field.',
|
||||
'Return your answer as: { "entities": [ ... ] }',
|
||||
'For each entity found, you MUST provide a non-empty notes field describing it based on the conversation.',
|
||||
'For each entity found, provide:',
|
||||
' "name": short proper noun only (max 4 words, e.g. "Sydney", "NexusAI", "Tim")',
|
||||
'Read the conversation below and extract all named entities and the relationships between them.',
|
||||
`Entity types: ${ENTITY_TYPES.join(', ')}`,
|
||||
'For each entity provide:',
|
||||
' "name": short proper noun only (max 4 words)',
|
||||
' "type": one of the valid types',
|
||||
' "notes": one specific sentence about this entity based on the conversation (not generic)',
|
||||
' "notes": one specific sentence about this entity based on the conversation',
|
||||
'For relationships, use snake_case verb labels (e.g. works_on, manages, uses, knows, located_in, part_of, created_by).',
|
||||
'Only include relationships between entities you have listed above.',
|
||||
'Return this exact JSON structure:',
|
||||
'{ "entities": [{"name": "...", "type": "...", "notes": "..."}], "relationships": [{"from": "...", "fromType": "...", "to": "...", "toType": "...", "label": "...", "notes": "..."}] }',
|
||||
'',
|
||||
knownBlock,
|
||||
'--- CONVERSATION ---', // clear delimiter helps smaller models
|
||||
'--- CONVERSATION ---',
|
||||
`User: ${userMessage}`,
|
||||
`Assistant: ${aiResponse}`,
|
||||
'--- END CONVERSATION ---',
|
||||
@@ -58,7 +63,7 @@ async function embedEntity(entity) {
|
||||
return data.embedding;
|
||||
}
|
||||
|
||||
async function extractAndStoreEntities(userMessage, aiResponse, projectId=null) {
|
||||
async function extractAndStoreEntities(userMessage, aiResponse, episodeId=null, projectId=null) {
|
||||
logger.info('[entities] Extraction triggered')
|
||||
try {
|
||||
// Fetch existing entities to guide the model toward consistent name/type pairs
|
||||
@@ -105,21 +110,23 @@ async function extractAndStoreEntities(userMessage, aiResponse, projectId=null)
|
||||
const entities = Array.isArray(parsed.entities) ? parsed.entities : [];
|
||||
if (entities.length === 0) {
|
||||
logger.debug('[entities] No entities found in this exchange — skipping');
|
||||
return; // not an error, just nothing to extract
|
||||
return;
|
||||
}
|
||||
|
||||
// Map of "name::type" → saved entity, used for relationship resolution below
|
||||
const entityMap = new Map();
|
||||
let saved = 0;
|
||||
|
||||
|
||||
for (const { name, type, notes } of entities) {
|
||||
|
||||
if (!name || !type || !ENTITY_TYPES.includes(type)) continue;
|
||||
if (IGNORED_NAMES.includes(name.toLowerCase())) continue;
|
||||
|
||||
const entity = upsertEntity(name, type, notes ?? null);
|
||||
entityMap.set(`${name}::${type}`, entity);
|
||||
logger.info('[entities] Upserted entity:', entity);
|
||||
|
||||
// Embed and upsert to Qdrant fire-and-forget
|
||||
if (episodeId) linkEntityToEpisode(entity.id, episodeId);
|
||||
|
||||
embedEntity(entity)
|
||||
.then(vector => semantic.upsertEntity(entity.id, vector, {
|
||||
name: entity.name,
|
||||
@@ -136,6 +143,23 @@ async function extractAndStoreEntities(userMessage, aiResponse, projectId=null)
|
||||
|
||||
if (saved > 0) logger.info(`[entities] Extracted and stored ${saved} entities`);
|
||||
|
||||
// Process extracted relationships — both entities must have been saved above
|
||||
const relationships = Array.isArray(parsed.relationships) ? parsed.relationships : [];
|
||||
let relSaved = 0;
|
||||
|
||||
for (const { from, fromType, to, toType, label, notes } of relationships) {
|
||||
if (!from || !fromType || !to || !toType || !label) continue;
|
||||
|
||||
const fromEntity = entityMap.get(`${from}::${fromType}`);
|
||||
const toEntity = entityMap.get(`${to}::${toType}`);
|
||||
if (!fromEntity || !toEntity) continue;
|
||||
|
||||
upsertRelationship(fromEntity.id, toEntity.id, label, notes ?? null);
|
||||
relSaved++;
|
||||
}
|
||||
|
||||
if (relSaved > 0) logger.info(`[entities] Extracted and stored ${relSaved} relationships`);
|
||||
|
||||
} catch (err) {
|
||||
// Non-critical — log and move on, episode is already saved
|
||||
logger.warn('[entities] Extraction failed:', err.message);
|
||||
|
||||
@@ -4,18 +4,23 @@ const { parseRow } = require ('@nexusai/shared')
|
||||
/******* Entities ********/
|
||||
|
||||
// Upsert an entity - insert or update if (name, type) already exists
|
||||
function upsertEntity(name, type, notes = null, metadata = null) {
|
||||
function upsertEntity(name, type, notes = null, metadata = null, source = 'extraction') {
|
||||
const db = getDB();
|
||||
const stmt = db.prepare(`
|
||||
INSERT INTO entities (name, type, notes, metadata)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(name, type) DO UPDATE SET
|
||||
notes = COALESCE(entities.notes, excluded.notes),
|
||||
metadata = excluded.metadata,
|
||||
updated_at = unixepoch()
|
||||
`);
|
||||
const result = stmt.run(name, type, notes, metadata ? JSON.stringify(metadata) : null);
|
||||
|
||||
const stmt = db.prepare(`
|
||||
INSERT INTO entities (name, type, notes, metadata, source)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(name, type) DO UPDATE SET
|
||||
-- First extraction wins: notes are never overwritten once set.
|
||||
-- Revisit during Memory Consolidation Lifecycle (Phase 2) — once entity
|
||||
-- quality scoring exists, a higher-confidence extraction should be able
|
||||
-- to replace stale notes rather than being silently dropped.
|
||||
notes = COALESCE(entities.notes, excluded.notes),
|
||||
metadata = excluded.metadata,
|
||||
mention_count = entities.mention_count + 1,
|
||||
last_seen_at = unixepoch(),
|
||||
updated_at = unixepoch()
|
||||
`);
|
||||
stmt.run(name, type, notes, metadata ? JSON.stringify(metadata) : null, source);
|
||||
return getEntityByNameType(name, type);
|
||||
}
|
||||
|
||||
@@ -40,15 +45,17 @@ function deleteEntity(id) {
|
||||
/********* Relationships *********/
|
||||
|
||||
// Upsert a relationship, insert or ignore if (from_id, to_id, label) already exists
|
||||
function upsertRelationship(fromId, toId, label, metadata = null){
|
||||
function upsertRelationship(fromId, toId, label, notes = null, metadata = null) {
|
||||
const db = getDB();
|
||||
const stmt = db.prepare(`
|
||||
INSERT INTO relationships (from_id, to_id, label, metadata)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(from_id, to_id, label) DO NOTHING
|
||||
INSERT INTO relationships (from_id, to_id, label, notes, metadata)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(from_id, to_id, label) DO UPDATE SET
|
||||
mention_count = relationships.mention_count + 1,
|
||||
-- First extraction wins for notes — same policy as entities.
|
||||
notes = COALESCE(relationships.notes, excluded.notes)
|
||||
`);
|
||||
|
||||
const result = stmt.run(fromId, toId, label, metadata ?JSON.stringify(metadata) : null);
|
||||
stmt.run(fromId, toId, label, notes, metadata ? JSON.stringify(metadata) : null);
|
||||
return getRelationship(fromId, toId, label);
|
||||
}
|
||||
|
||||
@@ -69,7 +76,7 @@ function getEntityByNameType(name, type) {
|
||||
}
|
||||
|
||||
// Retrive all relationships originating from a given entity
|
||||
function getRelationshipsByEntity(entityId) {
|
||||
function getOutboundRelationships(entityId) {
|
||||
const db = getDB();
|
||||
return db.prepare(`SELECT * FROM relationships WHERE from_id = ?`).all(entityId).map(parseRow);
|
||||
}
|
||||
@@ -81,14 +88,23 @@ function deleteRelationship(fromId, toId, label) {
|
||||
db.prepare(`DELETE FROM relationships WHERE from_id = ? AND to_id = ? AND label = ?`).run(fromId, toId, label);
|
||||
}
|
||||
|
||||
function linkEntityToEpisode(entityId, episodeId) {
|
||||
const db = getDB();
|
||||
db.prepare(`
|
||||
INSERT OR IGNORE INTO entity_episodes (entity_id, episode_id)
|
||||
VALUES (?, ?)
|
||||
`).run(entityId, episodeId);
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
upsertEntity,
|
||||
getEntity,
|
||||
getEntitiesByType,
|
||||
getEntityByNameType,
|
||||
deleteEntity,
|
||||
linkEntityToEpisode,
|
||||
upsertRelationship,
|
||||
getRelationship,
|
||||
getRelationshipsByEntity,
|
||||
getOutboundRelationships,
|
||||
deleteRelationship
|
||||
}
|
||||
@@ -127,7 +127,7 @@ async function createEpisode(sessionId, userMessage, aiResponse, tokenCount = nu
|
||||
}))
|
||||
.catch(err => logger.error(`Failed to embed episode ${episode.id}:`, err.message));
|
||||
|
||||
extractAndStoreEntities(userMessage, aiResponse, projectId)
|
||||
extractAndStoreEntities(userMessage, aiResponse, episode.id, projectId)
|
||||
.catch(err => logger.error(`Failed to extract entities for episode ${episode.id}:`, err.message));
|
||||
|
||||
|
||||
|
||||
67
packages/memory-service/src/graph/index.js
Normal file
67
packages/memory-service/src/graph/index.js
Normal file
@@ -0,0 +1,67 @@
|
||||
const { getDB } = require('../db');
|
||||
const { parseRow, ENTITIES } = require('@nexusai/shared');
|
||||
|
||||
// Single-entity neighborhood via recursive CTE — bidirectional, configurable depth
|
||||
function getNeighborhood(entityId, depth = ENTITIES.GRAPH_HOP_DEPTH) {
|
||||
const db = getDB();
|
||||
|
||||
const nodeRows = db.prepare(`
|
||||
WITH RECURSIVE traverse(entity_id, depth) AS (
|
||||
SELECT ?, 0
|
||||
UNION
|
||||
SELECT
|
||||
CASE WHEN r.from_id = t.entity_id THEN r.to_id ELSE r.from_id END,
|
||||
t.depth + 1
|
||||
FROM relationships r
|
||||
JOIN traverse t ON (r.from_id = t.entity_id OR r.to_id = t.entity_id)
|
||||
WHERE t.depth < ?
|
||||
)
|
||||
SELECT DISTINCT entity_id FROM traverse
|
||||
`).all(entityId, depth);
|
||||
|
||||
const nodeIds = nodeRows.map(r => r.entity_id);
|
||||
if (nodeIds.length === 0) return { nodes: [], edges: [] };
|
||||
|
||||
const ph = nodeIds.map(() => '?').join(',');
|
||||
const nodes = db.prepare(
|
||||
`SELECT * FROM entities WHERE id IN (${ph})`
|
||||
).all(...nodeIds).map(parseRow);
|
||||
|
||||
const edges = db.prepare(
|
||||
`SELECT * FROM relationships WHERE from_id IN (${ph}) AND to_id IN (${ph})`
|
||||
).all(...nodeIds, ...nodeIds).map(parseRow);
|
||||
|
||||
return { nodes, edges };
|
||||
}
|
||||
|
||||
// Bulk 1-hop neighborhood for orchestration — seeds are entity IDs from Qdrant search
|
||||
function getEntityNeighbors(entityIds) {
|
||||
if (!entityIds.length) return { nodes: [], edges: [] };
|
||||
const db = getDB();
|
||||
|
||||
const ph = entityIds.map(() => '?').join(',');
|
||||
|
||||
// entityIds appears three times — once for the CASE (finding the neighbor),
|
||||
// and once each for the FROM and TO sides of the WHERE clause
|
||||
const neighborRows = db.prepare(`
|
||||
SELECT DISTINCT
|
||||
CASE WHEN from_id IN (${ph}) THEN to_id ELSE from_id END AS entity_id
|
||||
FROM relationships
|
||||
WHERE from_id IN (${ph}) OR to_id IN (${ph})
|
||||
`).all(...entityIds, ...entityIds, ...entityIds);
|
||||
|
||||
const allIds = [...new Set([...entityIds, ...neighborRows.map(r => r.entity_id)])];
|
||||
const allPh = allIds.map(() => '?').join(',');
|
||||
|
||||
const nodes = db.prepare(
|
||||
`SELECT * FROM entities WHERE id IN (${allPh})`
|
||||
).all(...allIds).map(parseRow);
|
||||
|
||||
const edges = db.prepare(
|
||||
`SELECT * FROM relationships WHERE from_id IN (${allPh}) AND to_id IN (${allPh})`
|
||||
).all(...allIds, ...allIds).map(parseRow);
|
||||
|
||||
return { nodes, edges };
|
||||
}
|
||||
|
||||
module.exports = { getNeighborhood, getEntityNeighbors };
|
||||
@@ -5,6 +5,7 @@ const { getDB } = require('./db');
|
||||
const { createProject, getProjects, getProject, updateProject, deleteProject } = require('./db/projects');
|
||||
const { createSummary, getSummary, getSummariesBySession, getSummariesByProject, updateSummary, deleteSummary } = require('./db/summaries');
|
||||
const { generateAndStoreProjectSummary } = require('./summarization/project');
|
||||
const graph = require('./graph');
|
||||
|
||||
const episodic = require('./episodic');
|
||||
const semantic = require('./semantic');
|
||||
@@ -202,17 +203,17 @@ app.delete('/entities/:id', (req, res) => {
|
||||
|
||||
// Upsert a relationship between two entities
|
||||
app.post('/relationships', (req, res) => {
|
||||
const {fromId, toId, label, metadata } = req.body;
|
||||
const { fromId, toId, label, notes, metadata } = req.body;
|
||||
if (!fromId || !toId || !label) {
|
||||
return res.status(400).json({ error: 'fromId, toId and label are required' });
|
||||
}
|
||||
const relationship = entities.upsertRelationship(fromId, toId, label, metadata);
|
||||
const relationship = entities.upsertRelationship(fromId, toId, label, notes, metadata);
|
||||
res.status(201).json(relationship);
|
||||
});
|
||||
|
||||
// Get all relationships for a given entity ID
|
||||
app.get('/entities/:id/relationships', (req, res) => {
|
||||
res.json(entities.getRelationshipsByEntity(req.params.id));
|
||||
res.json(entities.getOutboundRelationships(req.params.id));
|
||||
});
|
||||
|
||||
// Delete a specific relationship
|
||||
@@ -225,6 +226,29 @@ app.delete('/relationships', (req, res) => {
|
||||
res.status(204).send();
|
||||
})
|
||||
|
||||
/********************************* */
|
||||
/********** Graph Routes ********** */
|
||||
/********************************* */
|
||||
|
||||
// Single-entity neighborhood — depth defaults to ENTITIES.GRAPH_HOP_DEPTH
|
||||
app.get('/graph/neighborhood/:entityId', (req, res) => {
|
||||
const entity = entities.getEntity(req.params.entityId);
|
||||
if (!entity) return res.status(404).json({ error: 'Entity not found' });
|
||||
|
||||
const depth = req.query.depth ? Math.min(Number(req.query.depth), 3) : undefined;
|
||||
const neighborhood = graph.getNeighborhood(Number(req.params.entityId), depth);
|
||||
res.json({ entity, neighborhood });
|
||||
});
|
||||
|
||||
// Bulk 1-hop neighborhood — body: { entityIds: [...] }
|
||||
app.post('/graph/neighbors', (req, res) => {
|
||||
const { entityIds } = req.body;
|
||||
if (!Array.isArray(entityIds) || entityIds.length === 0) {
|
||||
return res.status(400).json({ error: 'entityIds array is required' });
|
||||
}
|
||||
res.json(graph.getEntityNeighbors(entityIds.map(Number)));
|
||||
});
|
||||
|
||||
/*********************************** */
|
||||
/********** Project Routes ********** */
|
||||
/*********************************** */
|
||||
|
||||
@@ -24,10 +24,25 @@ Default port: **4000**. Depends on memory-service, embedding-service, inference-
|
||||
- No project: `must: [sessionId == this session]`
|
||||
- Project: `should: [sessionId == s1, sessionId == s2, ...]` across all project sessions
|
||||
- Dedup against recent episode IDs before including.
|
||||
5. Embed and search Qdrant ENTITIES; filter by `projectId` if applicable.
|
||||
6. Build prompt in this fixed order: **system prompt → entities → semantic episodes → recent episodes → user message → "Assistant:"**
|
||||
5. Embed and search Qdrant ENTITIES (filtered by `projectId` if in a project). Returns entity IDs alongside payload — the Qdrant point ID equals the SQLite entity ID.
|
||||
6. Expand matched entities into a 1-hop graph neighborhood via `POST /graph/neighbors` on the memory-service. Returns `{ nodes, edges }` — the full entity objects plus connecting relationships. Falls back to flat entity list (no edges) if the graph call fails.
|
||||
7. Build prompt in this fixed order: **system prompt → graph context → semantic episodes → recent episodes → user message → "Assistant:"**
|
||||
|
||||
The ordering prioritizes established facts (entities) and relevant past context (semantic) over pure recency.
|
||||
The ordering prioritizes established facts (graph context) and relevant past context (semantic) over pure recency.
|
||||
|
||||
## Graph Context Format
|
||||
|
||||
`formatGraphContext(nodes, edges)` in `src/chat/index.js` formats the neighborhood as:
|
||||
|
||||
```
|
||||
- Alice (person): software engineer working on NexusAI
|
||||
→ works_on NexusAI (project)
|
||||
→ knows Bob (person)
|
||||
- NexusAI (project): AI assistant framework
|
||||
- Bob (person): Alice's colleague
|
||||
```
|
||||
|
||||
Each node shows its notes on the first line. Outbound edges are indented below with `→ label target (type)`. Nodes with only inbound edges (neighbors pulled in by traversal) appear without connection lines.
|
||||
|
||||
## System Prompt Resolution
|
||||
|
||||
@@ -85,6 +100,12 @@ When the existing summary's token count exceeds `SUMMARIES.MAX_SUMMARY_TOKENS`,
|
||||
|
||||
`searchEntities` checks `projectId !== null && projectId !== undefined` before applying the filter — a session with no project skips the filter entirely and searches globally.
|
||||
|
||||
## Graph Service Client (`src/services/graph.js`)
|
||||
|
||||
Thin HTTP client for memory-service graph endpoints. One function:
|
||||
|
||||
- **`getNeighbors(entityIds[])`** — POSTs to `memory-service/graph/neighbors` with the entity IDs from Qdrant entity search. Returns `{ nodes, edges }`. Throws on non-2xx — caller wraps in try/catch with graceful fallback.
|
||||
|
||||
## Models Endpoint
|
||||
|
||||
`GET /models` scans `modelsFolderPath` for `.gguf` files and optionally reads a `models.json` manifest (keyed by filename) for labels and descriptions. File size is reported in GB. Returns 500 if the folder is inaccessible.
|
||||
@@ -96,9 +117,7 @@ When the existing summary's token count exceeds `SUMMARIES.MAX_SUMMARY_TOKENS`,
|
||||
`GET /health/services` runs parallel fetch calls to all four dependent services with a 3-second `AbortSignal.timeout` each. Results are returned as an array — the endpoint never returns a non-2xx itself regardless of downstream status.
|
||||
|
||||
## Background Model (qwen2.5:3b)
|
||||
Used for entity extraction and summarization via Ollama on Mini PC 1. Uses **ChatML
|
||||
format** (`<|im_start|>` / `<|im_end|>`) — not Phi3 format. Use `format: 'json'`
|
||||
only for structured extraction, never for free-text summarization.
|
||||
Used for entity/relationship extraction and summarization via Ollama on Mini PC 1. Uses **ChatML format** (`<|im_start|>` / `<|im_end|>`) — not Phi3 format. Use `format: 'json'` only for structured extraction, never for free-text summarization.
|
||||
|
||||
## API Endpoints Quick Reference
|
||||
|
||||
|
||||
@@ -5,22 +5,20 @@ const qdrant = require("../services/qdrant");
|
||||
const { ORCHESTRATION, logger } = require("@nexusai/shared");
|
||||
const appSettings = require("../config/settings");
|
||||
const {triggerSummary} = require('../services/summarization')
|
||||
const graph = require('../services/graph');
|
||||
|
||||
function buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage, systemPrompt) {
|
||||
function buildPrompt(recentEpisodes, semanticEpisodes, neighborhood, userMessage, systemPrompt) {
|
||||
const parts = [systemPrompt ?? ORCHESTRATION.SYSTEM_PROMPT];
|
||||
|
||||
if (entities.length > 0) {
|
||||
parts.push(
|
||||
"Here is what you know about entities relevant to this conversation:",
|
||||
);
|
||||
for (const e of entities) {
|
||||
parts.push(`- ${e.name} (${e.type}): ${e.notes}`);
|
||||
const graphText = formatGraphContext(neighborhood.nodes ?? [], neighborhood.edges ?? []);
|
||||
if (graphText) {
|
||||
parts.push("Here is what you know about entities relevant to this conversation and their connections:");
|
||||
parts.push(graphText);
|
||||
parts.push("---");
|
||||
}
|
||||
parts.push("---");
|
||||
}
|
||||
|
||||
if (semanticEpisodes.length > 0) {
|
||||
parts.push("Here are some relevant memories from earlier conversations:");
|
||||
parts.push("Long-term memory (semantically relevant to this message):");
|
||||
for (const ep of semanticEpisodes) {
|
||||
parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`);
|
||||
}
|
||||
@@ -28,7 +26,7 @@ function buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage, sy
|
||||
}
|
||||
|
||||
if (recentEpisodes.length > 0) {
|
||||
parts.push(`Here are some relevant memories from your past conversations:`);
|
||||
parts.push("Recent conversation history (most recent exchanges):");
|
||||
for (const ep of recentEpisodes) {
|
||||
parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`);
|
||||
}
|
||||
@@ -54,6 +52,28 @@ function buildNamingPrompt(userMessage, aiResponse) {
|
||||
].join("\n");
|
||||
}
|
||||
|
||||
function formatGraphContext(nodes, edges) {
|
||||
if (!nodes.length) return null;
|
||||
|
||||
const nodeMap = new Map(nodes.map(n => [n.id, n]));
|
||||
|
||||
// Build outbound adjacency
|
||||
const outbound = new Map(nodes.map(n => [n.id, []]));
|
||||
for (const edge of edges) {
|
||||
if (outbound.has(edge.from_id) && nodeMap.has(edge.to_id)) {
|
||||
const target = nodeMap.get(edge.to_id);
|
||||
outbound.get(edge.from_id).push(`${edge.label} ${target.name} (${target.type})`);
|
||||
}
|
||||
}
|
||||
|
||||
return nodes.map(n => {
|
||||
const lines = [`- ${n.name} (${n.type}): ${n.notes ?? '(no notes)'}`];
|
||||
for (const conn of outbound.get(n.id) ?? []) lines.push(` → ${conn}`);
|
||||
return lines.join('\n');
|
||||
}).join('\n');
|
||||
}
|
||||
|
||||
|
||||
async function autoNameSession(externalId, userMessage, aiResponse) {
|
||||
try {
|
||||
const prompt = buildNamingPrompt(userMessage, aiResponse);
|
||||
@@ -107,22 +127,20 @@ async function getSemanticEpisodes(
|
||||
}
|
||||
}
|
||||
|
||||
async function getRelevantEntities(userMessage, projectId=null) {
|
||||
try {
|
||||
const vector = await embedding.embed(userMessage);
|
||||
const results = await qdrant.searchEntities(vector, { projectId });
|
||||
logger.info(
|
||||
"[orchestration] Entity search results:",
|
||||
results.map((r) => ({ name: r.payload?.name, score: r.score })),
|
||||
);
|
||||
return results.map((r) => r.payload).filter(Boolean);
|
||||
} catch (err) {
|
||||
logger.debug(
|
||||
"[orchestration] Entity search failed, continuing without:",
|
||||
err.message,
|
||||
);
|
||||
return [];
|
||||
}
|
||||
async function getRelevantEntities(userMessage, projectId = null) {
|
||||
try {
|
||||
const vector = await embedding.embed(userMessage);
|
||||
const results = await qdrant.searchEntities(vector, { projectId });
|
||||
logger.info(
|
||||
'[orchestration] Entity search results:',
|
||||
results.map((r) => ({ name: r.payload?.name, score: r.score })),
|
||||
);
|
||||
// Include the Qdrant point ID (== SQLite entity ID) for graph traversal
|
||||
return results.map((r) => r.payload ? { id: r.id, ...r.payload } : null).filter(Boolean);
|
||||
} catch (err) {
|
||||
logger.debug('[orchestration] Entity search failed, continuing without:', err.message);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async function assembleContext(externalId, userMessage) {
|
||||
@@ -159,10 +177,22 @@ async function assembleContext(externalId, userMessage) {
|
||||
const semanticEpisodes = await getSemanticEpisodes(
|
||||
userMessage, session.id, recentIds, projectSessionIds, { semanticLimit, scoreThreshold }
|
||||
);
|
||||
const entities = await getRelevantEntities(userMessage, session.project_id ?? null);
|
||||
const entityResults = await getRelevantEntities(userMessage, session.project_id ?? null);
|
||||
|
||||
// 5. Assemble prompt
|
||||
const prompt = buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage, activeSystemPrompt);
|
||||
// 5. Expand matched entities into 1-hop graph neighborhood
|
||||
let neighborhood = { nodes: [], edges: [] };
|
||||
if (entityResults.length > 0) {
|
||||
try {
|
||||
neighborhood = await graph.getNeighbors(entityResults.map(e => e.id));
|
||||
} catch (err) {
|
||||
logger.warn('[orchestration] Graph neighborhood fetch failed, falling back to flat entities:', err.message);
|
||||
// Graceful fallback: use Qdrant payload data as flat nodes, no edges
|
||||
neighborhood = { nodes: entityResults, edges: [] };
|
||||
}
|
||||
}
|
||||
|
||||
// 6. Assemble prompt
|
||||
const prompt = buildPrompt(recentEpisodes, semanticEpisodes, neighborhood, userMessage, activeSystemPrompt);
|
||||
|
||||
return {
|
||||
session,
|
||||
|
||||
15
packages/orchestration-service/src/services/graph.js
Normal file
15
packages/orchestration-service/src/services/graph.js
Normal file
@@ -0,0 +1,15 @@
|
||||
const { getEnv, SERVICES } = require('@nexusai/shared');
|
||||
|
||||
const MEMORY_URL = getEnv('MEMORY_SERVICE_URL', SERVICES.MEMORY_URL);
|
||||
|
||||
async function getNeighbors(entityIds) {
|
||||
const res = await fetch(`${MEMORY_URL}/graph/neighbors`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ entityIds }),
|
||||
});
|
||||
if (!res.ok) throw new Error(`Graph neighbors error: ${res.status}`);
|
||||
return res.json();
|
||||
}
|
||||
|
||||
module.exports = { getNeighbors };
|
||||
@@ -79,8 +79,10 @@ const SUMMARIES = {
|
||||
|
||||
const ENTITIES = {
|
||||
TEMPERATURE: 0.1, // Low temperature, more precise extraction, less creative
|
||||
NUM_PREDICT: 1024, // Max tokens to consider for entity extraction (e.g. recent conversation)
|
||||
NUM_PREDICT: 1500, // Max tokens to consider for entity extraction (e.g. recent conversation)
|
||||
THRESHOLD: 0.55, // Minimum confidence score for an extracted entity to be included in the results
|
||||
PROMOTION_THRESHOLD: 3, // mention_count threshold before entity is considered well-established
|
||||
GRAPH_HOP_DEPTH: 1, // Default traversal depth for neighborhood queries
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
|
||||
Reference in New Issue
Block a user