diff --git a/docs/architecture/overview.md b/docs/architecture/overview.md index f7d3d89..9f69906 100644 --- a/docs/architecture/overview.md +++ b/docs/architecture/overview.md @@ -74,7 +74,7 @@ service by ID after the vector search. The core four-service architecture is complete and operational. Key capabilities: - **Hybrid memory retrieval** — recent episodes + semantic search combined into every prompt -- **Entity layer** — automatic extraction of named entities from conversations via qwen2.5:3b, stored in SQLite and Qdrant, injected into every prompt as structured knowledge +- **Entity layer + Knowledge graph** — automatic extraction of named entities and relationships from conversations via qwen2.5:3b. Entities and relationships are stored in SQLite with `mention_count` tracking. A graph traversal layer expands Qdrant entity search hits into a 1-hop neighborhood subgraph, injecting structured connected knowledge into every prompt - **Projects** — sessions grouped with shared or isolated memory pools - **Auto-naming** — sessions named automatically from first exchange via inference - **Project-scoped semantic search** — Qdrant filtered by project session IDs diff --git a/docs/reference/API-routes.md b/docs/reference/API-routes.md index a406a36..c580238 100644 --- a/docs/reference/API-routes.md +++ b/docs/reference/API-routes.md @@ -360,13 +360,34 @@ Same request/response shape as orchestration `/projects` above. **DELETE /relationships — body:** ```json -{ "fromId": 1, "toId": 2, "label": "uses" } +{ "fromId": 1, "toId": 2, "label": "works_on", "notes": "Alice is the primary developer.", "metadata": {} } ``` +notes is optional. label should be a snake_case verb. Relationship is identified by the composite key (fromId, toId, label) — re-submitting with the same key increments mention_count and preserves existing notes if the new value is null. Relationships are identified by the composite key `(fromId, toId, label)`. Delete uses request body rather than URL params since this three-part key is awkward to encode in a path. +### Graph + +| Method | Path | Description | +|---|---|---| +| GET | /graph/neighborhood/:entityId | Entity neighborhood — nodes + edges within N hops | +| POST | /graph/neighbors | Bulk 1-hop neighborhood for a set of entity IDs | + +**GET /graph/neighborhood/:entityId — query params:** + +| Param | Default | Max | Description | +|---|---|---|---| +| depth | 1 | 3 | Traversal depth | + +Returns `{ entity, neighborhood: { nodes, edges } }`. Returns `404` if entity not found. + +**POST /graph/neighbors — body:** +```json +{ "entityIds": [5, 8, 12] } +Returns { nodes: [...], edges: [...] }. Used internally by orchestration — not a client-facing endpoint. + --- ## Embedding Service — port 3003 diff --git a/docs/roadmap.md b/docs/roadmap.md index 8adf7bd..4b3269e 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -59,10 +59,10 @@ ### 1. Knowledge Graph (SQLite) The highest-leverage memory upgrade. Transforms NexusAI from "remembers conversations" to "understands relationships between things." -- [ ] Graph schema — `nodes` and `edges` tables with typed relationships -- [ ] Entity → node promotion pipeline -- [ ] Relationship traversal queries -- [ ] Graph-aware context assembly in orchestration +- [x] Graph schema — `nodes` and `edges` tables with typed relationships +- [x] Entity → node promotion pipeline (`mention_count` tracked; threshold gating deferred to Phase 2) +- [x] Relationship traversal queries +- [x] Graph-aware context assembly in orchestration ### 2. Retrieval Fusion + Full-Text Search Multi-strategy retrieval merged into a single ranked result set. diff --git a/docs/services/entity-extraction.md b/docs/services/entity-extraction.md index 2f6ea12..6e43a0e 100644 --- a/docs/services/entity-extraction.md +++ b/docs/services/entity-extraction.md @@ -1,178 +1,140 @@ -# Memory Service +# Entity Extraction -**Package:** `@nexusai/memory-service` -**Location:** `packages/memory-service` -**Deployed on:** Mini PC 1 (192.168.0.81) -**Port:** 3002 +**Location:** `packages/memory-service/src/entities/extraction.js` +**Triggered by:** Episode creation (`POST /episodes`) +**Model:** `qwen2.5:3b` via Ollama (configurable via `EXTRACTION_MODEL` env var) ## Purpose -Responsible for all reading and writing of long-term memory. Acts as the -sole interface to both SQLite and Qdrant — no other service accesses these -stores directly. On episode creation, automatically calls the embedding -service to generate and store a vector in Qdrant. +After each episode is saved to SQLite, the extraction pipeline runs +asynchronously in the background to identify named entities and the +relationships between them. Results are written back to SQLite and +embedded into Qdrant — the episode response is never delayed. -## Dependencies +## Trigger -- `express` — HTTP API -- `better-sqlite3` — SQLite driver -- `@qdrant/js-client-rest` — Qdrant vector store client -- `dotenv` — environment variable loading -- `@nexusai/shared` — shared utilities and constants - -## Environment Variables - -| Variable | Required | Default | Description | -|---|---|---|---| -| PORT | No | 3002 | Port to listen on | -| SQLITE_PATH | Yes | — | Path to SQLite database file | -| QDRANT_URL | No | http://localhost:6333 | Qdrant instance URL | -| EMBEDDING_SERVICE_URL | No | http://localhost:3003 | Embedding service URL | -| EXTRACTION_URL | No | http://localhost:11434 | Ollama URL for entity extraction | -| EXTRACTION_MODEL | No | qwen2.5:3b | Ollama model used for entity extraction | - -## Internal Structure - -``` -src/ -├── db/ -│ ├── index.js # SQLite connection + initialization + migrations -│ ├── schema.js # Table definitions, indexes, FTS5, triggers -│ ├── projects.js # Project CRUD functions -│ └── summaries.js # Summary CRUD functions -├── episodic/ -│ └── index.js # Session + episode CRUD, FTS search, embedding write path -├── semantic/ -│ └── index.js # Qdrant collection management, upsert, search, delete -├── entities/ -│ ├── index.js # Entity + relationship CRUD -│ └── extraction.js # Automatic entity extraction via qwen2.5:3b on Ollama -└── index.js # Express app + all route definitions -``` - -## SQLite Schema - -Seven core tables: - -- **sessions** — top-level conversation containers. Fields: `external_id`, `name`, `project_id`, `metadata` -- **episodes** — individual exchanges (user message + AI response) tied to a session -- **entities** — named things the system learns about (people, places, concepts) -- **relationships** — directional labeled links between entities -- **summaries** — condensed episode groups for efficient context retrieval -- **projects** — named groupings of sessions with `name`, `description`, `colour`, `icon`, `isolated`, `notes`, `system_prompt` - -### Migrations - -Schema changes that cannot use `CREATE TABLE IF NOT EXISTS` are applied as -idempotent migrations in `db/index.js` at startup: +`createEpisode()` in `episodic/index.js` calls `extractAndStoreEntities()` +immediately after the SQLite insert, without awaiting it: ```js -try { db.exec(`ALTER TABLE sessions ADD COLUMN name TEXT`); } catch {} -try { db.exec(`ALTER TABLE sessions ADD COLUMN project_id INTEGER REFERENCES projects(id)`); } catch {} -try { db.exec(`CREATE INDEX IF NOT EXISTS idx_sessions_project ON sessions(project_id)`); } catch {} -try { db.exec(`ALTER TABLE projects ADD COLUMN isolated INTEGER NOT NULL DEFAULT 0`); } catch {} -try { db.exec(`ALTER TABLE projects ADD COLUMN notes TEXT`); } catch {} -try { db.exec(`ALTER TABLE projects ADD COLUMN system_prompt TEXT`); } catch {} +extractAndStoreEntities(userMessage, aiResponse, episode.id, projectId) + .catch(err => logger.error(`Failed to extract entities for episode ${episode.id}:`, err.message)); ``` -New migrations are always appended here — never modify the schema file for -existing tables since `ALTER TABLE` cannot use `IF NOT EXISTS`. +If extraction throws, the episode is unaffected — the error is logged and +swallowed. -### FTS5 Full-Text Search +## Model Settings -An `episodes_fts` virtual table enables keyword search across all episodes. -Three triggers (`episodes_fts_insert`, `episodes_fts_update`, `episodes_fts_delete`) -keep the FTS index automatically in sync with the episodes table. +| Setting | Value | Notes | +|---|---|---| +| Model | `qwen2.5:3b` | Ollama, configurable via `EXTRACTION_MODEL` | +| Temperature | 0.1 | Low for consistent, deterministic output | +| `num_predict` | 1500 | Higher ceiling to accommodate entity + relationship JSON | +| `format` | `'json'` | Ollama constrained decoding — enforces valid JSON output | +| Prompt format | ChatML | `<\|im_start\|>` / `<\|im_end\|>` tokens | -### SQLite Configuration +## Prompt Structure -- `journal_mode = WAL` — non-blocking reads during writes -- `foreign_keys = ON` — enforces referential integrity and cascade deletes -- PRAGMAs set via `db.pragma()`, not `db.exec()` +The prompt is built by `buildExtractionPrompt()`. It includes: -### Dynamic Updates +1. **System message** — declares the model's role as an entity and relationship extractor +2. **Instructions** — entity types, field rules, relationship label format, required JSON schema +3. **Known entities block** — last 20 entities from SQLite, by `rowid DESC`, used to encourage consistent name/type pairs across conversations +4. **Conversation** — the raw user message and AI response, delimited clearly -Both `updateSession` and `updateProject` build their `SET` clause dynamically -from only the fields passed — prevents partial updates from overwriting fields -that weren't touched. +``` +<|im_start|>system +You are a named entity and relationship extractor. You output only valid JSON. +<|im_end|> +<|im_start|>user +Read the conversation below and extract all named entities and the relationships between them. +Entity types: person, place, project, technology, concept, organization +... +Return this exact JSON structure: +{ "entities": [...], "relationships": [...] } -`updateProject` allowlist: -```js -const allowed = ['name', 'description', 'colour', 'icon', 'isolated', 'notes', 'system_prompt']; +Already known entities (use these exact name and type values if the same entity appears): +- "NexusAI" (project) +- "Alice" (person) + +--- CONVERSATION --- +User: ... +Assistant: ... +--- END CONVERSATION --- +<|im_end|> +<|im_start|>assistant ``` -## Qdrant / Semantic Layer +## Expected JSON Output -Three Qdrant collections are initialized on service startup via `semantic.initCollections()`: - -| Collection | Purpose | -|---|---| -| `episodes` | Embeddings for individual conversation exchanges | -| `entities` | Embeddings for named entities | -| `summaries` | Embeddings for condensed episode summaries | - -All collections use **768-dimension vectors** with **Cosine similarity**, -matching `nomic-embed-text` via Ollama. Vector size and distance metric are -defined in `@nexusai/shared` — not hardcoded here. - -`initCollections()` iterates `Object.values(COLLECTIONS)` and creates any -collection that doesn't already exist at startup — all three collections are -guaranteed to exist before any requests are handled, avoiding race conditions -between the first entity embed and an entity search. - -Each collection exposes upsert, search (with optional Qdrant filter), and -delete operations. The `wait: true` flag is used on all writes. - -## Embedding Write Path - -When a new episode is created: - -1. Episode saved to SQLite synchronously — response returned immediately -2. User message + AI response combined: `User: ...\nAssistant: ...` -3. Text sent to embedding service (`POST /embed`) -4. Vector upserted into `episodes` Qdrant collection with payload `{ sessionId, createdAt }` - -This step is **fire-and-forget** — if embedding fails, the episode is still -saved and searchable via FTS. The error is logged but not surfaced. - -> The Qdrant payload stores `sessionId` (the internal integer ID). See -> `memory-isolation.md` for how project-level filtering works. - -## Entity Layer - -Entities and relationships use upsert semantics with composite unique -constraints to prevent duplicates: - -- `UNIQUE(name, type)` on entities -- `UNIQUE(from_id, to_id, label)` on relationships -- `ON DELETE CASCADE` on relationship foreign keys - -After each episode is saved, `extraction.js` automatically extracts named -entities from the conversation using `qwen2.5:3b` on Ollama — fire-and-forget. - -> For full details on the extraction pipeline, prompt format, constrained -> decoding, stoplist, and Qdrant storage, see `entity-extraction.md`. - -## Summaries Layer - -Session summaries are generated by `orchestration-service/src/services/summarization.js` -after each episode write and stored here via `POST /summaries`. The memory -service is responsible only for CRUD — generation logic lives in orchestration. - -> For full details on trigger conditions, prompt format, cumulative updates, -> and ChatML token stripping, see `summarization.md`. - -## Project Delete Behaviour - -Deleting a project runs as a transaction — it first nulls out `project_id` -on all assigned sessions, then deletes the project. This avoids a foreign -key constraint failure since `sessions.project_id` has no `ON DELETE` rule: - -```js -const doDelete = db.transaction(() => { - db.prepare(`UPDATE sessions SET project_id = NULL WHERE project_id = ?`).run(id); - db.prepare(`DELETE FROM projects WHERE id = ?`).run(id); -}); +```json +{ + "entities": [ + { "name": "Alice", "type": "person", "notes": "Software engineer working on NexusAI." }, + { "name": "NexusAI", "type": "project", "notes": "A modular AI assistant with persistent memory." } + ], + "relationships": [ + { + "from": "Alice", "fromType": "person", + "to": "NexusAI", "toType": "project", + "label": "works_on", + "notes": "Alice is the primary developer." + } + ] +} ``` -For all HTTP endpoints, see `api-routes.md`. \ No newline at end of file +Relationship labels use **snake_case verbs** (e.g. `works_on`, `manages`, `uses`, +`knows`, `located_in`, `part_of`, `created_by`). + +## JSON Parsing + +The raw model response is matched with `/\{[\s\S]*\}/` before parsing — this +tolerates any preamble or trailing prose the model emits alongside the JSON. +If the match fails or `JSON.parse` throws, the function logs a warning and +returns without writing anything. + +## Entity Processing + +For each entity in `parsed.entities`: + +1. Validate `name`, `type` (must be in `ENTITY_TYPES`), and not in `IGNORED_NAMES` +2. Call `upsertEntity(name, type, notes)`: + - **Insert**: creates new row with `mention_count = 1`, `source = 'extraction'` + - **Conflict** on `(name, type)`: increments `mention_count`, updates `last_seen_at`, preserves existing `notes` if new extraction returns null +3. Add to `entityMap` keyed by `"${name}::${type}"` — used for relationship resolution below +4. Call `linkEntityToEpisode(entity.id, episodeId)` — writes to `entity_episodes` join table +5. Fire-and-forget: embed as `"${name} (${type}): ${notes}"` → store to Qdrant `entities` collection with `{ name, type, notes, projectId }` in payload + +**Valid entity types:** `person`, `place`, `project`, `technology`, `concept`, `organization` + +**Stoplist (ignored names):** `good morning`, `good night`, `hello`, `goodbye`, `thanks`, `thank you` + +## Relationship Processing + +After all entities are saved, relationships are processed: + +1. For each entry in `parsed.relationships`, look up both endpoints in `entityMap` using `"${from}::${fromType}"` and `"${to}::${toType}"` as keys +2. If either endpoint is missing (filtered out, invalid type, or not in this extraction), the relationship is silently skipped +3. Call `upsertRelationship(fromId, toId, label, notes)`: + - **Insert**: creates new row with `mention_count = 1` + - **Conflict** on `(from_id, to_id, label)`: increments `mention_count`, preserves existing `notes` if new is null + +Relationships are unidirectional in storage. Bidirectionality is handled at +query time by the graph traversal layer. + +## Project Scoping + +`projectId` is threaded through from the episode creation call. It is stored +in the Qdrant entity payload, which enables project-scoped entity search in +orchestration. SQLite entities and relationships are global — scoping only +applies at the Qdrant retrieval layer. + +## Error Behaviour + +All steps after the initial model call are wrapped in a single outer try/catch. +If Ollama is unreachable, returns a non-200 status, or the JSON cannot be +parsed, the function logs at `warn` level and returns. There is no retry logic. +Individual entity embedding failures are caught per-entity and logged at `warn` +level without affecting other entities in the same batch. diff --git a/docs/services/knowledge-graph.md b/docs/services/knowledge-graph.md new file mode 100644 index 0000000..be3e760 --- /dev/null +++ b/docs/services/knowledge-graph.md @@ -0,0 +1,213 @@ +# Knowledge Graph + +**Location:** `packages/memory-service/src/graph/index.js` +**Schema additions:** `entity_episodes` table; new columns on `entities` and `relationships` +**Exposed via:** `GET /graph/neighborhood/:entityId`, `POST /graph/neighbors` +**Consumed by:** Orchestration service context assembly + +## Purpose + +The knowledge graph transforms NexusAI from "remembers conversations" to +"understands relationships between things." Rather than injecting a flat +list of entity facts into every prompt, orchestration now retrieves a +1-hop subgraph of connected entities and their relationships, giving the +model structured, linked knowledge about people, projects, technologies, +and concepts that have appeared across conversations. + +## Schema + +### `entity_episodes` (join table) + +Tracks which episodes contributed to each entity's knowledge. Defined in +`schema.js` — exists on all installs. + +```sql +CREATE TABLE IF NOT EXISTS entity_episodes ( + entity_id INTEGER NOT NULL REFERENCES entities(id) ON DELETE CASCADE, + episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE, + PRIMARY KEY (entity_id, episode_id) +); +``` + +Both FKs cascade on delete — removing an entity or episode automatically +cleans up its join rows. + +### New columns on `entities` + +Added via migration in `db/index.js`: + +| Column | Type | Default | Description | +|---|---|---|---| +| `mention_count` | INTEGER | 1 | How many times this entity has been extracted across conversations | +| `confidence` | REAL | 1.0 | Reserved for future confidence scoring | +| `source` | TEXT | `'extraction'` | `'extraction'` (auto) or `'manual'` | +| `last_seen_at` | INTEGER | NULL | Unix timestamp of most recent extraction hit | + +### New columns on `relationships` + +| Column | Type | Default | Description | +|---|---|---|---| +| `mention_count` | INTEGER | 1 | How many times this edge has been extracted | +| `notes` | TEXT | NULL | Relationship context sentence from extraction | + +## Entity Promotion Model + +Entities are not created equal — some are mentioned once in passing, others +recur across many conversations. `mention_count` is the signal: + +- Every time `upsertEntity` is called for an existing `(name, type)` pair, `mention_count` is incremented and `last_seen_at` is updated. +- `ENTITIES.PROMOTION_THRESHOLD` (default: **3**) is the `mention_count` at which an entity is considered "well-established" — referenced in the codebase for future filtering and scoring logic. +- Currently `mention_count` is stored and incremented but not yet used to gate retrieval. It provides the foundation for future features such as orphan cleanup (entities never re-extracted) and confidence-weighted graph traversal. + +The same pattern applies to relationships — `mention_count` rises each time +the same `(from_id, to_id, label)` triple is extracted. + +## Graph Traversal + +`src/graph/index.js` exports two functions built on SQLite's `WITH RECURSIVE` +CTE support. No external graph database is needed. + +### `getNeighborhood(entityId, depth)` + +Traverses the graph from a single entity, following edges in **both directions**, +up to `depth` hops. Returns `{ nodes: [...entities], edges: [...relationships] }`. + +Default depth: `ENTITIES.GRAPH_HOP_DEPTH` (1). Maximum enforced at HTTP layer: 3. + +**SQLite query:** + +```sql +WITH RECURSIVE traverse(entity_id, depth) AS ( + SELECT ?, 0 + UNION + SELECT + CASE WHEN r.from_id = t.entity_id THEN r.to_id ELSE r.from_id END, + t.depth + 1 + FROM relationships r + JOIN traverse t ON (r.from_id = t.entity_id OR r.to_id = t.entity_id) + WHERE t.depth < ? +) +SELECT DISTINCT entity_id FROM traverse +``` + +`UNION` (not `UNION ALL`) eliminates duplicate visits and naturally handles +cycles — a node already in the traversal set is not re-visited. + +After collecting node IDs, two follow-up queries fetch: +- All entity rows for those IDs +- All relationship rows where both `from_id` and `to_id` are in the node set + +This ensures edges between neighbors are included even if they aren't on the +traversal path from the seed. + +### `getEntityNeighbors(entityIds[])` + +Bulk 1-hop version designed for orchestration. Given multiple seed entity IDs +(the results of Qdrant semantic search), returns the combined 1-hop subgraph. + +1. Finds all neighbor IDs via one query using `IN (...)` on both `from_id` and `to_id` +2. Deduplicates seeds + neighbors using a JavaScript `Set` +3. Fetches all entity rows and all relationship rows within the combined node set + +This is intentionally simpler than the recursive version — orchestration always +uses depth=1, and the bulk query avoids N separate CTE calls. + +## Graph-Aware Context Assembly + +Orchestration's `assembleContext` (in `src/chat/index.js`) integrates the +graph at step 7 of the chat pipeline: + +1. Qdrant entity search returns up to `ORCHESTRATION.ENTITIES_LIMIT` results, each including `r.id` (the SQLite entity ID) alongside the Qdrant payload +2. `graph.getNeighbors(entityIds)` is called with those IDs → `POST /graph/neighbors` on memory-service +3. The returned `{ nodes, edges }` is passed to `formatGraphContext()` +4. On failure, falls back to using the Qdrant payload data directly as flat nodes with no edges + +### Prompt Format + +`formatGraphContext(nodes, edges)` in `chat/index.js` formats the subgraph as: + +``` +Here is what you know about entities relevant to this conversation and their connections: +- Alice (person): software engineer working on NexusAI + → works_on NexusAI (project) + → knows Bob (person) +- NexusAI (project): AI assistant framework +- Bob (person): Alice's colleague +``` + +- One line per node: `- {name} ({type}): {notes}` +- Outbound edges indented below: ` → {label} {target_name} ({target_type})` +- Nodes with only inbound edges (pulled in as neighbors) appear without connection lines +- Only outbound edges are shown — each relationship appears once, from the `from_id` side + +## Project Scoping + +The knowledge graph respects project boundaries at the **entry point**, not +during traversal: + +- Qdrant entity search is filtered by `projectId` — only entities tagged with this project are returned as seeds +- Graph traversal in SQLite is unfiltered — neighbors can be from any project or no project +- This is intentional: the graph entry is project-scoped, but traversal follows the global relationship graph to discover connected knowledge + +Entities are tagged with `projectId` in the Qdrant payload at extraction time. +Entities extracted from non-project sessions have `projectId: null` and only +appear in unfiltered global searches. + +## API Reference + +### `GET /graph/neighborhood/:entityId` + +Returns the neighborhood of a single entity. + +**Query params:** + +| Param | Default | Max | Description | +|---|---|---|---| +| `depth` | `ENTITIES.GRAPH_HOP_DEPTH` (1) | 3 | Traversal depth | + +**Response:** +```json +{ + "entity": { "id": 5, "name": "Alice", "type": "person", "notes": "...", "mention_count": 4 }, + "neighborhood": { + "nodes": [ + { "id": 5, "name": "Alice", "type": "person", "notes": "..." }, + { "id": 8, "name": "NexusAI", "type": "project", "notes": "..." } + ], + "edges": [ + { "id": 2, "from_id": 5, "to_id": 8, "label": "works_on", "notes": "...", "mention_count": 3 } + ] + } +} +``` + +Returns 404 if the entity does not exist. + +### `POST /graph/neighbors` + +Bulk 1-hop neighborhood for a set of entity IDs. Used internally by +orchestration — not intended for direct client use. + +**Request body:** +```json +{ "entityIds": [5, 8, 12] } +``` + +**Response:** +```json +{ + "nodes": [ ...entity objects... ], + "edges": [ ...relationship objects... ] +} +``` + +Returns 400 if `entityIds` is missing or empty. + +## Constants (`packages/shared/src/config/constants.js`) + +| Constant | Value | Description | +|---|---|---| +| `ENTITIES.PROMOTION_THRESHOLD` | 3 | `mention_count` at which an entity is considered well-established | +| `ENTITIES.GRAPH_HOP_DEPTH` | 1 | Default traversal depth for neighborhood queries | +| `ORCHESTRATION.ENTITIES_LIMIT` | 5 | Max entity seeds returned from Qdrant search | +| `ORCHESTRATION.ENTITIES_THRESHOLD` | 0.55 | Minimum similarity score for entity Qdrant search | diff --git a/docs/services/memory-service.md b/docs/services/memory-service.md index 2f6ea12..a5c7974 100644 --- a/docs/services/memory-service.md +++ b/docs/services/memory-service.md @@ -9,8 +9,8 @@ Responsible for all reading and writing of long-term memory. Acts as the sole interface to both SQLite and Qdrant — no other service accesses these -stores directly. On episode creation, automatically calls the embedding -service to generate and store a vector in Qdrant. +stores directly. On episode creation, automatically triggers entity and +relationship extraction and embeds results into Qdrant. ## Dependencies @@ -45,19 +45,22 @@ src/ ├── semantic/ │ └── index.js # Qdrant collection management, upsert, search, delete ├── entities/ -│ ├── index.js # Entity + relationship CRUD -│ └── extraction.js # Automatic entity extraction via qwen2.5:3b on Ollama +│ ├── index.js # Entity + relationship CRUD (upsert, mention tracking) +│ └── extraction.js # Automatic entity + relationship extraction via qwen2.5:3b +├── graph/ +│ └── index.js # Knowledge graph traversal (neighborhood queries, recursive CTE) └── index.js # Express app + all route definitions ``` ## SQLite Schema -Seven core tables: +Eight core tables: - **sessions** — top-level conversation containers. Fields: `external_id`, `name`, `project_id`, `metadata` - **episodes** — individual exchanges (user message + AI response) tied to a session -- **entities** — named things the system learns about (people, places, concepts) -- **relationships** — directional labeled links between entities +- **entities** — named things the system learns about (people, places, concepts, etc.). Fields include `mention_count`, `confidence`, `source`, `last_seen_at` +- **relationships** — directional labeled links between entities (`from_id`, `to_id`, `label`). Fields include `mention_count`, `notes` +- **entity_episodes** — join table linking entities to the episodes where they were extracted. Used for provenance and orphan cleanup - **summaries** — condensed episode groups for efficient context retrieval - **projects** — named groupings of sessions with `name`, `description`, `colour`, `icon`, `isolated`, `notes`, `system_prompt` @@ -73,10 +76,18 @@ try { db.exec(`CREATE INDEX IF NOT EXISTS idx_sessions_project ON sessions(proje try { db.exec(`ALTER TABLE projects ADD COLUMN isolated INTEGER NOT NULL DEFAULT 0`); } catch {} try { db.exec(`ALTER TABLE projects ADD COLUMN notes TEXT`); } catch {} try { db.exec(`ALTER TABLE projects ADD COLUMN system_prompt TEXT`); } catch {} +// Knowledge graph columns: +try { db.exec(`ALTER TABLE entities ADD COLUMN mention_count INTEGER NOT NULL DEFAULT 1`) } catch {} +try { db.exec(`ALTER TABLE entities ADD COLUMN confidence REAL NOT NULL DEFAULT 1.0`) } catch {} +try { db.exec(`ALTER TABLE entities ADD COLUMN source TEXT NOT NULL DEFAULT 'extraction'`) } catch {} +try { db.exec(`ALTER TABLE entities ADD COLUMN last_seen_at INTEGER`) } catch {} +try { db.exec(`ALTER TABLE relationships ADD COLUMN mention_count INTEGER NOT NULL DEFAULT 1`) } catch {} +try { db.exec(`ALTER TABLE relationships ADD COLUMN notes TEXT`) } catch {} ``` -New migrations are always appended here — never modify the schema file for -existing tables since `ALTER TABLE` cannot use `IF NOT EXISTS`. +`entity_episodes` is defined in `schema.js` itself (not a migration) since it is a new table. + +New migrations are always appended — never modify the schema file for existing tables since `ALTER TABLE` cannot use `IF NOT EXISTS`. ### FTS5 Full-Text Search @@ -117,8 +128,7 @@ defined in `@nexusai/shared` — not hardcoded here. `initCollections()` iterates `Object.values(COLLECTIONS)` and creates any collection that doesn't already exist at startup — all three collections are -guaranteed to exist before any requests are handled, avoiding race conditions -between the first entity embed and an entity search. +guaranteed to exist before any requests are handled. Each collection exposes upsert, search (with optional Qdrant filter), and delete operations. The `wait: true` flag is used on all writes. @@ -143,15 +153,27 @@ saved and searchable via FTS. The error is logged but not surfaced. Entities and relationships use upsert semantics with composite unique constraints to prevent duplicates: -- `UNIQUE(name, type)` on entities -- `UNIQUE(from_id, to_id, label)` on relationships +- `UNIQUE(name, type)` on entities — conflict increments `mention_count` and updates `last_seen_at` +- `UNIQUE(from_id, to_id, label)` on relationships — conflict increments `mention_count` and preserves existing `notes` - `ON DELETE CASCADE` on relationship foreign keys After each episode is saved, `extraction.js` automatically extracts named -entities from the conversation using `qwen2.5:3b` on Ollama — fire-and-forget. +entities **and relationships** from the conversation using `qwen2.5:3b` on +Ollama — fire-and-forget. Each saved entity is also linked to the episode +via the `entity_episodes` join table. -> For full details on the extraction pipeline, prompt format, constrained -> decoding, stoplist, and Qdrant storage, see `entity-extraction.md`. +> For full details on the extraction pipeline and JSON format, see `entity-extraction.md`. +> For the knowledge graph traversal layer, see `knowledge-graph.md`. + +## Knowledge Graph Layer + +`src/graph/index.js` provides SQLite-based graph traversal over the entities +and relationships tables. Two functions are exposed via HTTP: + +- **`getNeighborhood(entityId, depth)`** — recursive CTE traversal, bidirectional, returns `{ nodes, edges }` +- **`getEntityNeighbors(entityIds[])`** — bulk 1-hop traversal for orchestration context assembly + +> For design rationale, traversal queries, and integration with orchestration, see `knowledge-graph.md`. ## Summaries Layer @@ -175,4 +197,4 @@ const doDelete = db.transaction(() => { }); ``` -For all HTTP endpoints, see `api-routes.md`. \ No newline at end of file +For all HTTP endpoints, see `api-routes.md`. diff --git a/docs/services/orchestration-service.md b/docs/services/orchestration-service.md index 89aebfd..8e03319 100644 --- a/docs/services/orchestration-service.md +++ b/docs/services/orchestration-service.md @@ -42,9 +42,10 @@ src/ │ ├── inference.js # HTTP client for inference service │ ├── embedding.js # HTTP client for embedding service │ ├── qdrant.js # HTTP client for Qdrant (direct vector search) +│ ├── graph.js # HTTP client for memory-service graph endpoints │ └── summarization.js # Session summarisation — triggers after each episode ├── chat/ -│ └── index.js # Core pipeline — context assembly, isolation, auto-naming +│ └── index.js # Core pipeline — context assembly, graph expansion, auto-naming ├── config/ │ └── settings.js # Settings load/save — reads/writes data/settings.json ├── routes/ @@ -71,7 +72,7 @@ via `appSettings.load()` — changes apply immediately without a service restart |---|---|---| | `recentEpisodeLimit` | 5 | Recent episodes injected into prompt | | `semanticLimit` | 5 | Semantic search results injected into prompt | -| `scoreThreshold` | 0.75 | Minimum similarity score for semantic results | +| `scoreThreshold` | 0.5 | Minimum similarity score for semantic results | | `modelsFolderPath` | `/mnt/nexus-models` | Path to folder containing .gguf files | | `temperature` | 0.7 | Inference temperature | | `repeatPenalty` | 1.1 | Repeat token penalty | @@ -104,20 +105,27 @@ difference is how the inference response is delivered to the client. episodes. Deduplicated against recent episodes. Non-critical. 6. **Entity search** — query `entities` Qdrant collection filtered by - `projectId`. Non-project sessions receive no entity context. Non-critical. + `projectId`. Returns entity IDs alongside Qdrant payload data (the Qdrant + point ID equals the SQLite entity ID). Non-critical. -7. **Prompt assembly** — combine system prompt, entity context, semantic +7. **Graph neighborhood expansion** — call `POST /graph/neighbors` on + memory-service with the entity IDs from step 6. Returns a 1-hop subgraph + `{ nodes, edges }` — entity objects plus the relationships connecting them. + If no entities were found or the graph call fails, falls back to flat entity + list (no edges). Non-critical. + +8. **Prompt assembly** — combine system prompt, graph context, semantic episodes, recent episodes, and user message. -8. **Inference** — send to inference service. `/chat` awaits full response; +9. **Inference** — send to inference service. `/chat` awaits full response; `/chat/stream` pipes SSE chunks to the client. -9. **Episode write** — write exchange back to memory with `projectId`. +10. **Episode write** — write exchange back to memory with `projectId`. -10. **Summarisation trigger** — `triggerSummary(session, allEpisodes)` called +11. **Summarisation trigger** — `triggerSummary(session, allEpisodes)` called fire-and-forget. See `summarization.md` for full details. -11. **Auto-naming** — on first message with no session name, fires a secondary +12. **Auto-naming** — on first message with no session name, fires a secondary inference call (max 20 tokens, temperature 0.3) to generate a session name. ### Prompt Structure @@ -125,8 +133,9 @@ difference is how the inference response is delivered to the client. ``` [Resolved system prompt] -Here is what you know about entities relevant to this conversation: +Here is what you know about entities relevant to this conversation and their connections: - {name} ({type}): {notes} + → {label} {neighbor_name} ({neighbor_type}) --- Here are some relevant memories from earlier conversations: User: {past user message} @@ -141,6 +150,12 @@ User: {current message} Assistant: ``` +The entity block renders the full graph neighborhood — seed entities matched +by Qdrant search plus any neighbors pulled in by 1-hop traversal. Each entity +shows its `notes` and any outbound relationships with their targets. Neighbor +nodes that have no outbound edges within the subgraph appear without connection +lines. + ## Summarisation After each episode write, `triggerSummary` is called fire-and-forget. It @@ -199,4 +214,7 @@ handle /health* { reverse_proxy localhost:4000 } After updating: `caddy reload --config /path/to/Caddyfile` -For all HTTP endpoints, see `api-routes.md`. \ No newline at end of file +> Note: `/graph` routes are on the memory-service (port 3002) and are called +> internally by orchestration — they do not need a Caddy entry. + +For all HTTP endpoints, see `api-routes.md`. diff --git a/packages/memory-service/CLAUDE.md b/packages/memory-service/CLAUDE.md index d3a725e..24e40ed 100644 --- a/packages/memory-service/CLAUDE.md +++ b/packages/memory-service/CLAUDE.md @@ -19,15 +19,26 @@ Default port: **3002**. Requires Qdrant and the embedding-service to be reachabl - `sessions` and `episodes` are linked by FK with cascade delete — deleting a session removes all its episodes automatically. - `episodes_fts` is an FTS5 virtual table that mirrors `user_message` and `ai_response`. It is kept in sync via SQL triggers on INSERT/UPDATE/DELETE. On service startup, the FTS index is fully rebuilt from live episode data. -- Several columns (`sessions.name`, `sessions.project_id`, `projects.isolated`, etc.) were added as migrations using `ALTER TABLE` wrapped in individual try-catch blocks. Failures are silently swallowed — if a column already exists, the alter fails and the service continues. The `idx_summaries_project` index is defined twice (benign duplicate). +- Several columns (`sessions.name`, `sessions.project_id`, `entities.mention_count`, etc.) were added as migrations using `ALTER TABLE` wrapped in individual try-catch blocks. Failures are silently swallowed — if a column already exists, the alter fails and the service continues. The `idx_summaries_project` index is defined twice (benign duplicate). - `summaries` rows with `session_id IS NULL` and a `project_id` represent project-level overviews, not session summaries. This distinction is how `GET /projects/:id/overview` works. +- `entity_episodes` is a join table linking entities to the episodes where they were first extracted. Used for provenance tracking and future orphan cleanup. Defined in `schema.js` (not a migration), so it exists on all installs. + +**New columns on `entities` (added via migration):** +- `mention_count INTEGER DEFAULT 1` — incremented every time this entity is re-extracted +- `confidence REAL DEFAULT 1.0` — reserved for future confidence scoring +- `source TEXT DEFAULT 'extraction'` — `'extraction'` or `'manual'` +- `last_seen_at INTEGER` — Unix timestamp of most recent extraction hit + +**New columns on `relationships` (added via migration):** +- `mention_count INTEGER DEFAULT 1` — incremented every time this edge is re-extracted +- `notes TEXT` — relationship context sentence from extraction ## Async Pipeline: Episode Creation `POST /episodes` returns a 201 as soon as the SQLite insert succeeds. Two background tasks run after without blocking the response: 1. **Embedding** — Fetches a vector from embedding-service, stores to Qdrant with `{sessionId, createdAt}` as payload metadata. -2. **Entity extraction** — Sends the episode text to Ollama (`qwen2.5:3b`, temp 0.1, 200 tokens) and upserts any recognized entities to both SQLite and Qdrant. +2. **Entity + relationship extraction** — Sends the episode text to Ollama (`qwen2.5:3b`, temp 0.1, 1500 tokens) and upserts any recognized entities and relationships to both SQLite and Qdrant. Also links each entity to the episode via `entity_episodes`. Both tasks catch and log errors silently. An episode can exist in SQLite with no corresponding Qdrant point if either step fails. @@ -36,26 +47,39 @@ Both tasks catch and log errors silently. An episode can exist in SQLite with no `src/entities/extraction.js`: - Fetches the last 20 known entities from SQLite before prompting the model, so the prompt can ask for name/type consistency with existing entries. -- Recognized types: `person`, `place`, `project`, `technology`, `concept`, `organization` — anything else is discarded. +- Recognized entity types: `person`, `place`, `project`, `technology`, `concept`, `organization` — anything else is discarded. - Ignores a hardcoded list of low-value names (`hello`, `thanks`, `good morning`, etc.). - Extracts JSON using a regex (`{...}`) applied to raw model output, so surrounding prose doesn't break parsing. -- Entity upsert uses `ON CONFLICT(name, type) DO UPDATE` — preserves existing `notes` if the new extraction returns null (`COALESCE(entities.notes, excluded.notes)`). +- The model is asked to return both entities and relationships in a single JSON response: `{ "entities": [...], "relationships": [...] }`. +- Entity upsert uses `ON CONFLICT(name, type) DO UPDATE` — preserves existing `notes` if the new extraction returns null, increments `mention_count`, updates `last_seen_at`. +- Relationship upsert uses `ON CONFLICT(from_id, to_id, label) DO UPDATE` — increments `mention_count`, preserves existing `notes` if new is null. +- Relationships are resolved by looking up both endpoints in the `entityMap` built during entity processing — if either entity wasn't saved (filtered out or invalid type), the relationship is silently dropped. - After upsert, embeds each entity as `"${name} (${type}): ${notes}"` and stores to Qdrant with `projectId` in the payload for project-scoped filtering. +> For full details see `docs/services/entity-extraction.md` and `docs/services/knowledge-graph.md`. + +## Knowledge Graph + +`src/graph/index.js` provides two SQLite traversal functions: + +- **`getNeighborhood(entityId, depth)`** — Single-entity recursive CTE traversal. Bidirectional (follows edges in both directions). Returns `{ nodes: [...entities], edges: [...relationships] }`. Depth defaults to `ENTITIES.GRAPH_HOP_DEPTH` (1), max enforced to 3 at the HTTP layer. + +- **`getEntityNeighbors(entityIds[])`** — Bulk 1-hop version for orchestration. Given a set of seed entity IDs, returns their immediate neighbors plus all edges within the combined node set. + +The recursive CTE uses `UNION` (not `UNION ALL`) to eliminate cycles and duplicate visits automatically. + +> For full design rationale and usage see `docs/services/knowledge-graph.md`. + ## Summarization Strategy `src/summarization/project.js`: - Preferred path: generate a project overview from existing **session-level summaries** (higher-level abstraction, shorter input). - Fallback path: if no session summaries exist, summarize raw episodes directly (up to `SUMMARIES.MAX_PROJECT_EPISODE_LIMIT`). -- Both paths truncate input at `SUMMARIES.MAX_SUMMARY_CHARS` (30,000 chars) by slicing from the end (most recent content wins). +- Both paths truncate input at `SUMMARIES.MAX_SUMMARY_CHARS` (8,000 chars) by slicing from the end (most recent content wins). - Strips ChatML tokens from the Ollama response (`<|im_start|>`, `<|im_end|>`). - Uses temp 0.2 and `num_predict 1200`. -## Known Quirk: `getRecentEpisodes` - -`src/episodic/index.js` `getRecentEpisodes(sessionId, limit)` has a parameter mismatch — the SQLite query binds only `limit`, not `sessionId`, so it returns recent episodes across **all sessions**. Orchestration-service uses `getEpisodesBySession()` (the paginated route) instead, so this bug is not visible in normal operation. Don't rely on `getRecentEpisodes` when you need session-scoped results. - ## Qdrant Client `src/semantic/index.js` creates the Qdrant client lazily on first use and reuses it. All three collections (`episodes`, `entities`, `summaries`) are created at startup if missing. There is no connection health check — if Qdrant is unreachable, semantic operations throw at call time. @@ -68,16 +92,18 @@ Both tasks catch and log errors silently. An episode can exist in SQLite with no | GET/POST | `/sessions` | POST requires `externalId`; duplicate → 409 | | GET/PATCH | `/sessions/by-external/:externalId` | PATCH accepts `name`, `projectId` | | DELETE | `/sessions/by-external/:externalId` | Cascades to episodes, summaries, relationships | -| GET/POST | `/episodes` | POST triggers async embedding + entity extraction | +| GET/POST | `/episodes` | POST triggers async embedding + entity/relationship extraction | | GET | `/episodes/search` | FTS5 search; route must precede `/:id` | | GET | `/sessions/:id/episodes` | Paginated, ordered `created_at DESC` | | DELETE | `/episodes/:id` | Removes from SQLite + async Qdrant delete | -| POST | `/entities` | Upsert by `(name, type)` | +| POST | `/entities` | Upsert by `(name, type)`; increments `mention_count` on conflict | | GET | `/entities/by-type/:type` | All entities of given type | | GET/DELETE | `/entities/:id` | | -| POST | `/relationships` | Upsert by `(fromId, toId, label)`; conflict = no-op | +| POST | `/relationships` | Upsert by `(fromId, toId, label)`; increments `mention_count` on conflict. Body: `fromId`, `toId`, `label`, `notes` (optional) | | GET | `/entities/:id/relationships` | Outbound only | | DELETE | `/relationships` | Body: `fromId`, `toId`, `label` | +| GET | `/graph/neighborhood/:entityId` | Single-entity neighborhood; `?depth=` (default 1, max 3) | +| POST | `/graph/neighbors` | Bulk 1-hop neighborhood; body: `{ entityIds: [...] }` | | GET/POST | `/projects` | POST requires non-empty `name` | | GET/PATCH/DELETE | `/projects/:id` | | | POST | `/projects/:id/summarize` | On-demand overview generation; 422 if no data | diff --git a/packages/memory-service/src/db/index.js b/packages/memory-service/src/db/index.js index d77b6c6..20ac118 100644 --- a/packages/memory-service/src/db/index.js +++ b/packages/memory-service/src/db/index.js @@ -54,9 +54,14 @@ function getDB() { db.exec(`CREATE INDEX IF NOT EXISTS idx_summaries_session ON summaries(session_id)`); } catch {} - try { - db.exec(`CREATE INDEX IF NOT EXISTS idx_summaries_project ON summaries(project_id)`); - } catch {} + try { db.exec(`ALTER TABLE entities ADD COLUMN mention_count INTEGER NOT NULL DEFAULT 1`) } catch {} + try { db.exec(`ALTER TABLE entities ADD COLUMN confidence REAL NOT NULL DEFAULT 1.0`) } catch {} + try { db.exec(`ALTER TABLE entities ADD COLUMN source TEXT NOT NULL DEFAULT 'extraction'`) } catch {} + try { db.exec(`ALTER TABLE entities ADD COLUMN last_seen_at INTEGER`) } catch {} + + try { db.exec(`ALTER TABLE relationships ADD COLUMN mention_count INTEGER NOT NULL DEFAULT 1`) } catch {} + try { db.exec(`ALTER TABLE relationships ADD COLUMN notes TEXT`) } catch {} + // Sync FTS index with any existing episodes data db.exec(`INSERT OR REPLACE INTO episodes_fts(rowid, user_message, ai_response) diff --git a/packages/memory-service/src/db/schema.js b/packages/memory-service/src/db/schema.js index 40d619b..e96e688 100644 --- a/packages/memory-service/src/db/schema.js +++ b/packages/memory-service/src/db/schema.js @@ -38,6 +38,20 @@ const schema = ` UNIQUE(from_id, to_id, label) ); + CREATE INDEX IF NOT EXISTS idx_relationships_from ON relationships(from_id); + CREATE INDEX IF NOT EXISTS idx_relationships_to ON relationships(to_id); + + CREATE TABLE IF NOT EXISTS entity_episodes ( + entity_id INTEGER NOT NULL REFERENCES entities(id) ON DELETE CASCADE, + episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE, + PRIMARY KEY (entity_id, episode_id) + ); + + CREATE INDEX IF NOT EXISTS idx_entity_episodes_entity ON entity_episodes(entity_id); + CREATE INDEX IF NOT EXISTS idx_entity_episodes_episode ON entity_episodes(episode_id); + + + CREATE TABLE IF NOT EXISTS projects ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, diff --git a/packages/memory-service/src/entities/extraction.js b/packages/memory-service/src/entities/extraction.js index 22503eb..ecd11c9 100644 --- a/packages/memory-service/src/entities/extraction.js +++ b/packages/memory-service/src/entities/extraction.js @@ -1,14 +1,18 @@ const semantic = require('../semantic') const { getEnv, SERVICES, formatEpisodeText, ENTITIES, logger } = require('@nexusai/shared'); -const { upsertEntity } = require('./index'); +const { upsertEntity, upsertRelationship, linkEntityToEpisode } = require('./index'); const EXTRACTION_URL = getEnv('EXTRACTION_URL', 'http://localhost:11434'); -const EXTRACTION_MODEL = getEnv('EXTRACTION_MODEL', 'qwen2.5:3b'); +const EXTRACTION_MODEL = getEnv('EXTRACTION_MODEL', 'qwen2.5:3b'); // ChatML format — see buildExtractionPrompt const EMBEDDING_SERVICE_URL = getEnv('EMBEDDING_SERVICE_URL', SERVICES.EMBEDDING_URL); const ENTITY_TYPES = ['person', 'place', 'project', 'technology', 'concept', 'organization']; const IGNORED_NAMES = ['good morning', 'good night', 'hello', 'goodbye', 'thanks', 'thank you']; +// NOTE: This prompt uses ChatML format (<|im_start|> / <|im_end|> tags), which is +// specific to qwen-family models. If EXTRACTION_MODEL is changed to a Llama-family +// or other model, this format will need to change — most alternatives use either +// plain text or [INST] / <> tags. Silent degradation is likely if mismatched. function buildExtractionPrompt(userMessage, aiResponse, knownEntities = []) { const knownBlock = knownEntities.length > 0 ? [ @@ -20,21 +24,22 @@ function buildExtractionPrompt(userMessage, aiResponse, knownEntities = []) { return [ '<|im_start|>system', - 'You are a named entity extractor. You output only valid JSON.', + 'You are a named entity and relationship extractor. You output only valid JSON.', '<|im_end|>', '<|im_start|>user', - 'Read the conversation below and extract every named entity mentioned.', - `Entity types to extract: ${ENTITY_TYPES.join(', ')}`, - 'For each entity found, provide: name, type, and a one-sentence notes field.', - 'Return your answer as: { "entities": [ ... ] }', - 'For each entity found, you MUST provide a non-empty notes field describing it based on the conversation.', - 'For each entity found, provide:', - ' "name": short proper noun only (max 4 words, e.g. "Sydney", "NexusAI", "Tim")', + 'Read the conversation below and extract all named entities and the relationships between them.', + `Entity types: ${ENTITY_TYPES.join(', ')}`, + 'For each entity provide:', + ' "name": short proper noun only (max 4 words)', ' "type": one of the valid types', - ' "notes": one specific sentence about this entity based on the conversation (not generic)', + ' "notes": one specific sentence about this entity based on the conversation', + 'For relationships, use snake_case verb labels (e.g. works_on, manages, uses, knows, located_in, part_of, created_by).', + 'Only include relationships between entities you have listed above.', + 'Return this exact JSON structure:', + '{ "entities": [{"name": "...", "type": "...", "notes": "..."}], "relationships": [{"from": "...", "fromType": "...", "to": "...", "toType": "...", "label": "...", "notes": "..."}] }', '', knownBlock, - '--- CONVERSATION ---', // clear delimiter helps smaller models + '--- CONVERSATION ---', `User: ${userMessage}`, `Assistant: ${aiResponse}`, '--- END CONVERSATION ---', @@ -58,7 +63,7 @@ async function embedEntity(entity) { return data.embedding; } -async function extractAndStoreEntities(userMessage, aiResponse, projectId=null) { +async function extractAndStoreEntities(userMessage, aiResponse, episodeId=null, projectId=null) { logger.info('[entities] Extraction triggered') try { // Fetch existing entities to guide the model toward consistent name/type pairs @@ -105,21 +110,23 @@ async function extractAndStoreEntities(userMessage, aiResponse, projectId=null) const entities = Array.isArray(parsed.entities) ? parsed.entities : []; if (entities.length === 0) { logger.debug('[entities] No entities found in this exchange — skipping'); - return; // not an error, just nothing to extract + return; } + // Map of "name::type" → saved entity, used for relationship resolution below + const entityMap = new Map(); let saved = 0; - for (const { name, type, notes } of entities) { - if (!name || !type || !ENTITY_TYPES.includes(type)) continue; if (IGNORED_NAMES.includes(name.toLowerCase())) continue; const entity = upsertEntity(name, type, notes ?? null); + entityMap.set(`${name}::${type}`, entity); logger.info('[entities] Upserted entity:', entity); - // Embed and upsert to Qdrant fire-and-forget + if (episodeId) linkEntityToEpisode(entity.id, episodeId); + embedEntity(entity) .then(vector => semantic.upsertEntity(entity.id, vector, { name: entity.name, @@ -136,6 +143,23 @@ async function extractAndStoreEntities(userMessage, aiResponse, projectId=null) if (saved > 0) logger.info(`[entities] Extracted and stored ${saved} entities`); + // Process extracted relationships — both entities must have been saved above + const relationships = Array.isArray(parsed.relationships) ? parsed.relationships : []; + let relSaved = 0; + + for (const { from, fromType, to, toType, label, notes } of relationships) { + if (!from || !fromType || !to || !toType || !label) continue; + + const fromEntity = entityMap.get(`${from}::${fromType}`); + const toEntity = entityMap.get(`${to}::${toType}`); + if (!fromEntity || !toEntity) continue; + + upsertRelationship(fromEntity.id, toEntity.id, label, notes ?? null); + relSaved++; + } + + if (relSaved > 0) logger.info(`[entities] Extracted and stored ${relSaved} relationships`); + } catch (err) { // Non-critical — log and move on, episode is already saved logger.warn('[entities] Extraction failed:', err.message); diff --git a/packages/memory-service/src/entities/index.js b/packages/memory-service/src/entities/index.js index 55c4479..fe1de87 100644 --- a/packages/memory-service/src/entities/index.js +++ b/packages/memory-service/src/entities/index.js @@ -4,18 +4,23 @@ const { parseRow } = require ('@nexusai/shared') /******* Entities ********/ // Upsert an entity - insert or update if (name, type) already exists -function upsertEntity(name, type, notes = null, metadata = null) { +function upsertEntity(name, type, notes = null, metadata = null, source = 'extraction') { const db = getDB(); - const stmt = db.prepare(` - INSERT INTO entities (name, type, notes, metadata) - VALUES (?, ?, ?, ?) - ON CONFLICT(name, type) DO UPDATE SET - notes = COALESCE(entities.notes, excluded.notes), - metadata = excluded.metadata, - updated_at = unixepoch() - `); - const result = stmt.run(name, type, notes, metadata ? JSON.stringify(metadata) : null); - +const stmt = db.prepare(` + INSERT INTO entities (name, type, notes, metadata, source) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(name, type) DO UPDATE SET + -- First extraction wins: notes are never overwritten once set. + -- Revisit during Memory Consolidation Lifecycle (Phase 2) — once entity + -- quality scoring exists, a higher-confidence extraction should be able + -- to replace stale notes rather than being silently dropped. + notes = COALESCE(entities.notes, excluded.notes), + metadata = excluded.metadata, + mention_count = entities.mention_count + 1, + last_seen_at = unixepoch(), + updated_at = unixepoch() +`); + stmt.run(name, type, notes, metadata ? JSON.stringify(metadata) : null, source); return getEntityByNameType(name, type); } @@ -40,15 +45,17 @@ function deleteEntity(id) { /********* Relationships *********/ // Upsert a relationship, insert or ignore if (from_id, to_id, label) already exists -function upsertRelationship(fromId, toId, label, metadata = null){ +function upsertRelationship(fromId, toId, label, notes = null, metadata = null) { const db = getDB(); const stmt = db.prepare(` - INSERT INTO relationships (from_id, to_id, label, metadata) - VALUES (?, ?, ?, ?) - ON CONFLICT(from_id, to_id, label) DO NOTHING + INSERT INTO relationships (from_id, to_id, label, notes, metadata) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(from_id, to_id, label) DO UPDATE SET + mention_count = relationships.mention_count + 1, + -- First extraction wins for notes — same policy as entities. + notes = COALESCE(relationships.notes, excluded.notes) `); - - const result = stmt.run(fromId, toId, label, metadata ?JSON.stringify(metadata) : null); + stmt.run(fromId, toId, label, notes, metadata ? JSON.stringify(metadata) : null); return getRelationship(fromId, toId, label); } @@ -69,7 +76,7 @@ function getEntityByNameType(name, type) { } // Retrive all relationships originating from a given entity -function getRelationshipsByEntity(entityId) { +function getOutboundRelationships(entityId) { const db = getDB(); return db.prepare(`SELECT * FROM relationships WHERE from_id = ?`).all(entityId).map(parseRow); } @@ -81,14 +88,23 @@ function deleteRelationship(fromId, toId, label) { db.prepare(`DELETE FROM relationships WHERE from_id = ? AND to_id = ? AND label = ?`).run(fromId, toId, label); } +function linkEntityToEpisode(entityId, episodeId) { + const db = getDB(); + db.prepare(` + INSERT OR IGNORE INTO entity_episodes (entity_id, episode_id) + VALUES (?, ?) + `).run(entityId, episodeId); +} + module.exports = { upsertEntity, getEntity, getEntitiesByType, getEntityByNameType, deleteEntity, + linkEntityToEpisode, upsertRelationship, getRelationship, - getRelationshipsByEntity, + getOutboundRelationships, deleteRelationship } \ No newline at end of file diff --git a/packages/memory-service/src/episodic/index.js b/packages/memory-service/src/episodic/index.js index 622d430..5be71ce 100644 --- a/packages/memory-service/src/episodic/index.js +++ b/packages/memory-service/src/episodic/index.js @@ -127,7 +127,7 @@ async function createEpisode(sessionId, userMessage, aiResponse, tokenCount = nu })) .catch(err => logger.error(`Failed to embed episode ${episode.id}:`, err.message)); - extractAndStoreEntities(userMessage, aiResponse, projectId) + extractAndStoreEntities(userMessage, aiResponse, episode.id, projectId) .catch(err => logger.error(`Failed to extract entities for episode ${episode.id}:`, err.message)); diff --git a/packages/memory-service/src/graph/index.js b/packages/memory-service/src/graph/index.js new file mode 100644 index 0000000..32e05f4 --- /dev/null +++ b/packages/memory-service/src/graph/index.js @@ -0,0 +1,67 @@ +const { getDB } = require('../db'); +const { parseRow, ENTITIES } = require('@nexusai/shared'); + +// Single-entity neighborhood via recursive CTE — bidirectional, configurable depth +function getNeighborhood(entityId, depth = ENTITIES.GRAPH_HOP_DEPTH) { + const db = getDB(); + + const nodeRows = db.prepare(` + WITH RECURSIVE traverse(entity_id, depth) AS ( + SELECT ?, 0 + UNION + SELECT + CASE WHEN r.from_id = t.entity_id THEN r.to_id ELSE r.from_id END, + t.depth + 1 + FROM relationships r + JOIN traverse t ON (r.from_id = t.entity_id OR r.to_id = t.entity_id) + WHERE t.depth < ? + ) + SELECT DISTINCT entity_id FROM traverse + `).all(entityId, depth); + + const nodeIds = nodeRows.map(r => r.entity_id); + if (nodeIds.length === 0) return { nodes: [], edges: [] }; + + const ph = nodeIds.map(() => '?').join(','); + const nodes = db.prepare( + `SELECT * FROM entities WHERE id IN (${ph})` + ).all(...nodeIds).map(parseRow); + + const edges = db.prepare( + `SELECT * FROM relationships WHERE from_id IN (${ph}) AND to_id IN (${ph})` + ).all(...nodeIds, ...nodeIds).map(parseRow); + + return { nodes, edges }; +} + +// Bulk 1-hop neighborhood for orchestration — seeds are entity IDs from Qdrant search +function getEntityNeighbors(entityIds) { + if (!entityIds.length) return { nodes: [], edges: [] }; + const db = getDB(); + + const ph = entityIds.map(() => '?').join(','); + + // entityIds appears three times — once for the CASE (finding the neighbor), + // and once each for the FROM and TO sides of the WHERE clause + const neighborRows = db.prepare(` + SELECT DISTINCT + CASE WHEN from_id IN (${ph}) THEN to_id ELSE from_id END AS entity_id + FROM relationships + WHERE from_id IN (${ph}) OR to_id IN (${ph}) + `).all(...entityIds, ...entityIds, ...entityIds); + + const allIds = [...new Set([...entityIds, ...neighborRows.map(r => r.entity_id)])]; + const allPh = allIds.map(() => '?').join(','); + + const nodes = db.prepare( + `SELECT * FROM entities WHERE id IN (${allPh})` + ).all(...allIds).map(parseRow); + + const edges = db.prepare( + `SELECT * FROM relationships WHERE from_id IN (${allPh}) AND to_id IN (${allPh})` + ).all(...allIds, ...allIds).map(parseRow); + + return { nodes, edges }; +} + +module.exports = { getNeighborhood, getEntityNeighbors }; diff --git a/packages/memory-service/src/index.js b/packages/memory-service/src/index.js index 8786217..f6da6c0 100644 --- a/packages/memory-service/src/index.js +++ b/packages/memory-service/src/index.js @@ -5,6 +5,7 @@ const { getDB } = require('./db'); const { createProject, getProjects, getProject, updateProject, deleteProject } = require('./db/projects'); const { createSummary, getSummary, getSummariesBySession, getSummariesByProject, updateSummary, deleteSummary } = require('./db/summaries'); const { generateAndStoreProjectSummary } = require('./summarization/project'); +const graph = require('./graph'); const episodic = require('./episodic'); const semantic = require('./semantic'); @@ -202,17 +203,17 @@ app.delete('/entities/:id', (req, res) => { // Upsert a relationship between two entities app.post('/relationships', (req, res) => { - const {fromId, toId, label, metadata } = req.body; + const { fromId, toId, label, notes, metadata } = req.body; if (!fromId || !toId || !label) { return res.status(400).json({ error: 'fromId, toId and label are required' }); } - const relationship = entities.upsertRelationship(fromId, toId, label, metadata); + const relationship = entities.upsertRelationship(fromId, toId, label, notes, metadata); res.status(201).json(relationship); }); // Get all relationships for a given entity ID app.get('/entities/:id/relationships', (req, res) => { - res.json(entities.getRelationshipsByEntity(req.params.id)); + res.json(entities.getOutboundRelationships(req.params.id)); }); // Delete a specific relationship @@ -225,6 +226,29 @@ app.delete('/relationships', (req, res) => { res.status(204).send(); }) +/********************************* */ +/********** Graph Routes ********** */ +/********************************* */ + +// Single-entity neighborhood — depth defaults to ENTITIES.GRAPH_HOP_DEPTH +app.get('/graph/neighborhood/:entityId', (req, res) => { + const entity = entities.getEntity(req.params.entityId); + if (!entity) return res.status(404).json({ error: 'Entity not found' }); + + const depth = req.query.depth ? Math.min(Number(req.query.depth), 3) : undefined; + const neighborhood = graph.getNeighborhood(Number(req.params.entityId), depth); + res.json({ entity, neighborhood }); +}); + +// Bulk 1-hop neighborhood — body: { entityIds: [...] } +app.post('/graph/neighbors', (req, res) => { + const { entityIds } = req.body; + if (!Array.isArray(entityIds) || entityIds.length === 0) { + return res.status(400).json({ error: 'entityIds array is required' }); + } + res.json(graph.getEntityNeighbors(entityIds.map(Number))); +}); + /*********************************** */ /********** Project Routes ********** */ /*********************************** */ diff --git a/packages/orchestration-service/CLAUDE.md b/packages/orchestration-service/CLAUDE.md index dab26c1..6cd5967 100644 --- a/packages/orchestration-service/CLAUDE.md +++ b/packages/orchestration-service/CLAUDE.md @@ -24,10 +24,25 @@ Default port: **4000**. Depends on memory-service, embedding-service, inference- - No project: `must: [sessionId == this session]` - Project: `should: [sessionId == s1, sessionId == s2, ...]` across all project sessions - Dedup against recent episode IDs before including. -5. Embed and search Qdrant ENTITIES; filter by `projectId` if applicable. -6. Build prompt in this fixed order: **system prompt → entities → semantic episodes → recent episodes → user message → "Assistant:"** +5. Embed and search Qdrant ENTITIES (filtered by `projectId` if in a project). Returns entity IDs alongside payload — the Qdrant point ID equals the SQLite entity ID. +6. Expand matched entities into a 1-hop graph neighborhood via `POST /graph/neighbors` on the memory-service. Returns `{ nodes, edges }` — the full entity objects plus connecting relationships. Falls back to flat entity list (no edges) if the graph call fails. +7. Build prompt in this fixed order: **system prompt → graph context → semantic episodes → recent episodes → user message → "Assistant:"** -The ordering prioritizes established facts (entities) and relevant past context (semantic) over pure recency. +The ordering prioritizes established facts (graph context) and relevant past context (semantic) over pure recency. + +## Graph Context Format + +`formatGraphContext(nodes, edges)` in `src/chat/index.js` formats the neighborhood as: + +``` +- Alice (person): software engineer working on NexusAI + → works_on NexusAI (project) + → knows Bob (person) +- NexusAI (project): AI assistant framework +- Bob (person): Alice's colleague +``` + +Each node shows its notes on the first line. Outbound edges are indented below with `→ label target (type)`. Nodes with only inbound edges (neighbors pulled in by traversal) appear without connection lines. ## System Prompt Resolution @@ -85,6 +100,12 @@ When the existing summary's token count exceeds `SUMMARIES.MAX_SUMMARY_TOKENS`, `searchEntities` checks `projectId !== null && projectId !== undefined` before applying the filter — a session with no project skips the filter entirely and searches globally. +## Graph Service Client (`src/services/graph.js`) + +Thin HTTP client for memory-service graph endpoints. One function: + +- **`getNeighbors(entityIds[])`** — POSTs to `memory-service/graph/neighbors` with the entity IDs from Qdrant entity search. Returns `{ nodes, edges }`. Throws on non-2xx — caller wraps in try/catch with graceful fallback. + ## Models Endpoint `GET /models` scans `modelsFolderPath` for `.gguf` files and optionally reads a `models.json` manifest (keyed by filename) for labels and descriptions. File size is reported in GB. Returns 500 if the folder is inaccessible. @@ -96,9 +117,7 @@ When the existing summary's token count exceeds `SUMMARIES.MAX_SUMMARY_TOKENS`, `GET /health/services` runs parallel fetch calls to all four dependent services with a 3-second `AbortSignal.timeout` each. Results are returned as an array — the endpoint never returns a non-2xx itself regardless of downstream status. ## Background Model (qwen2.5:3b) -Used for entity extraction and summarization via Ollama on Mini PC 1. Uses **ChatML -format** (`<|im_start|>` / `<|im_end|>`) — not Phi3 format. Use `format: 'json'` -only for structured extraction, never for free-text summarization. +Used for entity/relationship extraction and summarization via Ollama on Mini PC 1. Uses **ChatML format** (`<|im_start|>` / `<|im_end|>`) — not Phi3 format. Use `format: 'json'` only for structured extraction, never for free-text summarization. ## API Endpoints Quick Reference diff --git a/packages/orchestration-service/src/chat/index.js b/packages/orchestration-service/src/chat/index.js index 097d4c4..5c585b2 100644 --- a/packages/orchestration-service/src/chat/index.js +++ b/packages/orchestration-service/src/chat/index.js @@ -5,22 +5,20 @@ const qdrant = require("../services/qdrant"); const { ORCHESTRATION, logger } = require("@nexusai/shared"); const appSettings = require("../config/settings"); const {triggerSummary} = require('../services/summarization') +const graph = require('../services/graph'); -function buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage, systemPrompt) { +function buildPrompt(recentEpisodes, semanticEpisodes, neighborhood, userMessage, systemPrompt) { const parts = [systemPrompt ?? ORCHESTRATION.SYSTEM_PROMPT]; - if (entities.length > 0) { - parts.push( - "Here is what you know about entities relevant to this conversation:", - ); - for (const e of entities) { - parts.push(`- ${e.name} (${e.type}): ${e.notes}`); + const graphText = formatGraphContext(neighborhood.nodes ?? [], neighborhood.edges ?? []); + if (graphText) { + parts.push("Here is what you know about entities relevant to this conversation and their connections:"); + parts.push(graphText); + parts.push("---"); } - parts.push("---"); - } if (semanticEpisodes.length > 0) { - parts.push("Here are some relevant memories from earlier conversations:"); + parts.push("Long-term memory (semantically relevant to this message):"); for (const ep of semanticEpisodes) { parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`); } @@ -28,7 +26,7 @@ function buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage, sy } if (recentEpisodes.length > 0) { - parts.push(`Here are some relevant memories from your past conversations:`); + parts.push("Recent conversation history (most recent exchanges):"); for (const ep of recentEpisodes) { parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`); } @@ -54,6 +52,28 @@ function buildNamingPrompt(userMessage, aiResponse) { ].join("\n"); } +function formatGraphContext(nodes, edges) { + if (!nodes.length) return null; + + const nodeMap = new Map(nodes.map(n => [n.id, n])); + + // Build outbound adjacency + const outbound = new Map(nodes.map(n => [n.id, []])); + for (const edge of edges) { + if (outbound.has(edge.from_id) && nodeMap.has(edge.to_id)) { + const target = nodeMap.get(edge.to_id); + outbound.get(edge.from_id).push(`${edge.label} ${target.name} (${target.type})`); + } + } + + return nodes.map(n => { + const lines = [`- ${n.name} (${n.type}): ${n.notes ?? '(no notes)'}`]; + for (const conn of outbound.get(n.id) ?? []) lines.push(` → ${conn}`); + return lines.join('\n'); + }).join('\n'); +} + + async function autoNameSession(externalId, userMessage, aiResponse) { try { const prompt = buildNamingPrompt(userMessage, aiResponse); @@ -107,22 +127,20 @@ async function getSemanticEpisodes( } } -async function getRelevantEntities(userMessage, projectId=null) { - try { - const vector = await embedding.embed(userMessage); - const results = await qdrant.searchEntities(vector, { projectId }); - logger.info( - "[orchestration] Entity search results:", - results.map((r) => ({ name: r.payload?.name, score: r.score })), - ); - return results.map((r) => r.payload).filter(Boolean); - } catch (err) { - logger.debug( - "[orchestration] Entity search failed, continuing without:", - err.message, - ); - return []; - } +async function getRelevantEntities(userMessage, projectId = null) { + try { + const vector = await embedding.embed(userMessage); + const results = await qdrant.searchEntities(vector, { projectId }); + logger.info( + '[orchestration] Entity search results:', + results.map((r) => ({ name: r.payload?.name, score: r.score })), + ); + // Include the Qdrant point ID (== SQLite entity ID) for graph traversal + return results.map((r) => r.payload ? { id: r.id, ...r.payload } : null).filter(Boolean); + } catch (err) { + logger.debug('[orchestration] Entity search failed, continuing without:', err.message); + return []; + } } async function assembleContext(externalId, userMessage) { @@ -159,10 +177,22 @@ async function assembleContext(externalId, userMessage) { const semanticEpisodes = await getSemanticEpisodes( userMessage, session.id, recentIds, projectSessionIds, { semanticLimit, scoreThreshold } ); - const entities = await getRelevantEntities(userMessage, session.project_id ?? null); + const entityResults = await getRelevantEntities(userMessage, session.project_id ?? null); - // 5. Assemble prompt - const prompt = buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage, activeSystemPrompt); + // 5. Expand matched entities into 1-hop graph neighborhood + let neighborhood = { nodes: [], edges: [] }; + if (entityResults.length > 0) { + try { + neighborhood = await graph.getNeighbors(entityResults.map(e => e.id)); + } catch (err) { + logger.warn('[orchestration] Graph neighborhood fetch failed, falling back to flat entities:', err.message); + // Graceful fallback: use Qdrant payload data as flat nodes, no edges + neighborhood = { nodes: entityResults, edges: [] }; + } + } + + // 6. Assemble prompt + const prompt = buildPrompt(recentEpisodes, semanticEpisodes, neighborhood, userMessage, activeSystemPrompt); return { session, diff --git a/packages/orchestration-service/src/services/graph.js b/packages/orchestration-service/src/services/graph.js new file mode 100644 index 0000000..ec7f14c --- /dev/null +++ b/packages/orchestration-service/src/services/graph.js @@ -0,0 +1,15 @@ +const { getEnv, SERVICES } = require('@nexusai/shared'); + +const MEMORY_URL = getEnv('MEMORY_SERVICE_URL', SERVICES.MEMORY_URL); + +async function getNeighbors(entityIds) { + const res = await fetch(`${MEMORY_URL}/graph/neighbors`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ entityIds }), + }); + if (!res.ok) throw new Error(`Graph neighbors error: ${res.status}`); + return res.json(); +} + +module.exports = { getNeighbors }; diff --git a/packages/shared/src/config/constants.js b/packages/shared/src/config/constants.js index 65b5a93..0bc84db 100644 --- a/packages/shared/src/config/constants.js +++ b/packages/shared/src/config/constants.js @@ -79,8 +79,10 @@ const SUMMARIES = { const ENTITIES = { TEMPERATURE: 0.1, // Low temperature, more precise extraction, less creative - NUM_PREDICT: 1024, // Max tokens to consider for entity extraction (e.g. recent conversation) + NUM_PREDICT: 1500, // Max tokens to consider for entity extraction (e.g. recent conversation) THRESHOLD: 0.55, // Minimum confidence score for an extracted entity to be included in the results + PROMOTION_THRESHOLD: 3, // mention_count threshold before entity is considered well-established + GRAPH_HOP_DEPTH: 1, // Default traversal depth for neighborhood queries } module.exports = {