Added orchestration layer

This commit is contained in:
Storme-bit
2026-04-05 05:27:04 -07:00
parent 4b3f6455f9
commit eccda21992
5 changed files with 161 additions and 8 deletions

View File

@@ -0,0 +1,63 @@
const memory = require('./memory');
const inference = require('./inference');
const SYSTEM_PROMPT = `You are a helpful, context-aware AI assistant.
You have access to memories of past conversations with the user.
Use them to provide consistent, personalised responses.`;
const RECENT_EPISODE_LIMIT = 10; // Number of recent episodes to retrieve for context

/**
 * Assemble the full prompt sent to the inference service.
 *
 * BUG FIX: the parameter was previously named `getRecentEpisodes` while the
 * for-of loop iterated `recentEpisodes` — an undefined name that threw a
 * ReferenceError whenever at least one episode was present.
 *
 * @param {Array<{user_message: string, ai_response: string}>} recentEpisodes
 *   Past conversation turns to inject as context (may be empty).
 * @param {string} userMessage - The current user message.
 * @returns {string} Newline-joined prompt: system prompt, optional memory
 *   section, then the user message and an "Assistant:" cue.
 */
function buildPrompt(recentEpisodes, userMessage) {
  const parts = [SYSTEM_PROMPT];
  if (recentEpisodes.length > 0) {
    parts.push(`Here are some relevant memories from your past conversations:`);
    for (const ep of recentEpisodes) {
      parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`);
    }
    parts.push('--- End of recent memories ---\n');
  }
  parts.push(`User: ${userMessage}`);
  parts.push('Assistant:');
  return parts.join('\n');
}
/**
 * Run one full chat turn: resolve the session, gather memory context,
 * call the inference service, and persist the new episode.
 *
 * @param {string} externalId - Caller-supplied session identifier.
 * @param {string} userMessage - The user's message for this turn.
 * @param {object} [options={}] - Passthrough inference options (e.g. model,
 *   temperature) forwarded to `inference.complete`.
 * @returns {Promise<{sessionId: string, response: string, model: string, tokenCount: number}>}
 * @throws Propagates errors from the memory lookup/creation, episode fetch,
 *   and inference call; episode *saving* failures are logged, not thrown.
 */
async function chat(externalId, userMessage, options = {}) {
  // 1. Resolve or create session
  let session = await memory.getSessionByExternalId(externalId);
  if (!session) {
    session = await memory.createSession(externalId);
  }

  // 2. Fetch recent episodes for context
  const recentEpisodes = await memory.getRecentEpisodes(
    session.id,
    RECENT_EPISODE_LIMIT
  );

  // 3. Assemble prompt
  const prompt = buildPrompt(recentEpisodes, userMessage);

  // 4. Run inference
  const result = await inference.complete(prompt, options);

  // Total tokens for this turn (completion + prompt); previously this
  // expression was duplicated in the save call and the response object.
  const tokenCount = (result.evalCount || 0) + (result.promptEvalCount || 0);

  // 5. Write episode back to memory — deliberately fire-and-forget: a memory
  //    service failure must not fail the user-facing response, so only log it.
  memory
    .createEpisode(session.id, userMessage, result.text, tokenCount)
    .catch(err => console.error(`[orchestration] Failed to save episode`, err.message));

  // 6. Return response
  return {
    sessionId: externalId,
    response: result.text,
    model: result.model,
    tokenCount,
  };
}

module.exports = { chat };

View File

@@ -1,27 +1,27 @@
require('dotenv').config();
const express = require('express');
const { getEnv } = require('@nexusai/shared');
const chatRouter = require('./routes/chat');

const app = express();
app.use(express.json());

// BUG FIX: PORT was declared twice with `const` (defaults '3000' and '4000'),
// which is a SyntaxError. Keeping the later declaration's default of 4000.
const PORT = getEnv('PORT', '4000');

// Service URLs - loaded from .env on each node
const MEMORY_URL = getEnv('MEMORY_SERVICE_URL', 'http://localhost:3002');
const INFERENCE_URL = getEnv('INFERENCE_SERVICE_URL', 'http://localhost:3001');
const EMBEDDING_URL = getEnv('EMBEDDING_SERVICE_URL', 'http://localhost:3003');

// Health check endpoint: reports status plus the downstream service URLs this
// node is configured to talk to. (The original object literal was broken by a
// leftover `connections: {...}` line sitting next to the per-service keys.)
app.get('/health', (req, res) => {
  res.json({
    service: 'Orchestration Service',
    status: 'healthy',
    memoryService: MEMORY_URL,
    embeddingService: EMBEDDING_URL,
    inferenceService: INFERENCE_URL,
  });
});

app.use('/chat', chatRouter);

/******* Start the server ************/
app.listen(PORT, () => {
  console.log(`Orchestration Service is running on port ${PORT}`);
});

View File

@@ -0,0 +1,24 @@
const { Router } = require('express');
const { chat } = require('../chat/index');

const router = Router();

// POST /chat — run one chat turn. `sessionId` and `message` are required;
// `model` and `temperature` are optional passthrough inference options.
// NOTE(review): index.js mounts this router at '/chat', making the full path
// '/chat/chat' — confirm that nesting is intended.
router.post('/chat', async (req, res) => {
  const { sessionId, message, model, temperature } = req.body;

  // Validate required fields up front with 400s.
  if (!sessionId) {
    return res.status(400).json({ error: 'sessionId is required' });
  }
  if (!message) {
    return res.status(400).json({ error: 'message is required' });
  }

  try {
    const result = await chat(sessionId, message, { model, temperature });
    res.json(result);
  } catch (err) {
    // Surface the failure to the client; full stack stays server-side.
    console.error(`[orchestration] chat error: `, err.message);
    res.status(500).json({ error: err.message });
  }
});

module.exports = router;

View File

@@ -0,0 +1,18 @@
const fetch = require('node-fetch');
const { getEnv } = require('@nexusai/shared');
const BASE_URL = getEnv('INFERENCE_SERVICE_URL', 'http://localhost:3001');
async function complete(prompt, options ={}) {
const res = await fetch(`${BASE_URL}/complete`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt, ...options })
})
if (!res.ok) throw new Error(`Inference service error: ${res.status} ${res.statusText}`);
return res.json();
}
module.exports = {
complete
}

View File

@@ -0,0 +1,48 @@
const fetch = require('node-fetch');
const { getEnv } = require('@nexusai/shared');
// Base URL of the memory service (overridable via env).
const BASE_URL = getEnv('MEMORY_SERVICE_URL', 'http://localhost:3002');

/**
 * Look up a session by its external ID.
 *
 * BUG FIX: the original guard was `if (!res.status === 400)` — `!res.status`
 * is a boolean, so the comparison to 400 was always false and "not found"
 * was never detected; a missing session then fell through to `res.json()` or
 * the generic error. A missing resource is signalled by HTTP 404.
 *
 * @param {string} externalId - Caller-supplied session identifier.
 * @returns {Promise<object|null>} The session, or null when not found (404).
 * @throws {Error} for any other non-2xx response.
 */
async function getSessionByExternalId(externalId) {
  const res = await fetch(`${BASE_URL}/sessions/by-external/${externalId}`);
  if (res.status === 404) return null; // Not found
  if (!res.ok) throw new Error(`Memory service error: ${res.status} ${res.statusText}`); // Other errors
  return res.json();
}
/**
 * Create a new session keyed by an external ID.
 *
 * @param {string} externalId - Identifier supplied by the caller.
 * @returns {Promise<object>} The created session as returned by the service.
 * @throws {Error} on any non-2xx response.
 */
async function createSession(externalId) {
  const payload = JSON.stringify({ externalId });

  const res = await fetch(`${BASE_URL}/sessions`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: payload,
  });

  if (!res.ok) {
    throw new Error(`Failed to create sessions: ${res.status} ${res.statusText}`);
  }
  return res.json();
}
/**
 * Fetch up to `limit` most recent episodes for a session.
 *
 * @param {string|number} sessionId - Internal session ID.
 * @param {number} [limit=10] - Maximum number of episodes to return.
 * @returns {Promise<object[]>} Episodes as returned by the memory service.
 * @throws {Error} on any non-2xx response.
 */
async function getRecentEpisodes(sessionId, limit = 10) {
  const url = `${BASE_URL}/sessions/${sessionId}/episodes/recent?limit=${limit}`;
  const res = await fetch(url);

  if (!res.ok) {
    throw new Error(`Failed to fetch episodes: ${res.status} ${res.statusText}`);
  }
  return res.json();
}
async function createEpisode(sessionId, userMessage, aiResponse, tokenCount) {
const res = await fetch(`${BASE_URL}/episodes`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ sessionId, userMessage, aiResponse, tokenCount })
});
if (!res.ok) throw new Error(`Failed to create episode: ${res.status} ${res.statusText}`);
return res.json();
}
module.exports = {
getSessionByExternalId,
createSession,
getRecentEpisodes,
createEpisode
}