e623d42eb8
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
388 lines
11 KiB
JavaScript
388 lines
11 KiB
JavaScript
'use strict';
|
|
|
|
const express = require('express');
|
|
const { WebSocketServer } = require('ws');
|
|
const fs = require('fs');
|
|
const path = require('path');
|
|
const readline = require('readline');
|
|
const http = require('http');
|
|
const { URL } = require('url');
|
|
|
|
const PORT = parseInt(process.env.PORT || '3850', 10);
|
|
const SESSIONS_DIR = process.env.SESSIONS_DIR || '/home/node/.openclaw/agents';
|
|
|
|
const app = express();
|
|
const server = http.createServer(app);
|
|
const wss = new WebSocketServer({ noServer: true });
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Security helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/** Prevent path traversal: only allow alphanumeric, dots, hyphens, underscores */
|
|
function isValidName(name) {
|
|
return (
|
|
typeof name === 'string' &&
|
|
name.length > 0 &&
|
|
name.length <= 256 &&
|
|
/^[a-zA-Z0-9._-]+$/.test(name) &&
|
|
!name.includes('..')
|
|
);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// File discovery
|
|
// ---------------------------------------------------------------------------
|
|
|
|
function getAgentDirs() {
|
|
try {
|
|
return fs.readdirSync(SESSIONS_DIR, { withFileTypes: true })
|
|
.filter(d => d.isDirectory())
|
|
.map(d => d.name);
|
|
} catch {
|
|
return [];
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Returns all session file descriptors for an agent.
|
|
* Handles both plain `uuid.jsonl` and archived `uuid.jsonl.deleted.*` / `uuid.jsonl.reset.*`.
|
|
*/
|
|
function getSessionFiles(agent) {
|
|
if (!isValidName(agent)) return [];
|
|
const sessionsPath = path.join(SESSIONS_DIR, agent, 'sessions');
|
|
try {
|
|
const entries = fs.readdirSync(sessionsPath);
|
|
return entries
|
|
.filter(f => f.includes('.jsonl'))
|
|
.map(f => {
|
|
// The session ID is the part before the first `.jsonl`
|
|
const jsonlIdx = f.indexOf('.jsonl');
|
|
const fileSessionId = f.slice(0, jsonlIdx);
|
|
|
|
let status = null;
|
|
if (f.includes('.jsonl.deleted')) status = 'deleted';
|
|
else if (f.includes('.jsonl.reset')) status = 'reset';
|
|
|
|
return {
|
|
filename: f,
|
|
filePath: path.join(sessionsPath, f),
|
|
agent,
|
|
fileSessionId,
|
|
status, // null = active or idle (determined later by mtime)
|
|
};
|
|
});
|
|
} catch {
|
|
return [];
|
|
}
|
|
}
|
|
|
|
/** Find the file path for a given (agent, sessionId) pair */
|
|
function findSessionFile(agent, sessionId) {
|
|
if (!isValidName(agent) || !isValidName(sessionId)) return null;
|
|
const files = getSessionFiles(agent);
|
|
const match = files.find(f => f.fileSessionId === sessionId);
|
|
return match ? match.filePath : null;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// JSONL parsing helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/** Read all non-empty lines from a file, returns string[] */
|
|
function readLines(filePath) {
|
|
return new Promise((resolve, reject) => {
|
|
const lines = [];
|
|
const stream = fs.createReadStream(filePath, { encoding: 'utf8' });
|
|
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
|
|
rl.on('line', line => { if (line.trim()) lines.push(line); });
|
|
rl.on('close', () => resolve(lines));
|
|
rl.on('error', reject);
|
|
stream.on('error', reject);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Parse a session file and return a summary object.
|
|
* Skips malformed lines silently.
|
|
*/
|
|
async function parseSessionSummary(fileInfo) {
|
|
const { filePath, fileSessionId, agent, status: fileStatus } = fileInfo;
|
|
try {
|
|
const lines = await readLines(filePath);
|
|
|
|
let sessionId = null;
|
|
let cwd = null;
|
|
let model = null;
|
|
let messageCount = 0;
|
|
let userMessages = 0;
|
|
let assistantMessages = 0;
|
|
let totalTokens = 0;
|
|
let totalCost = 0;
|
|
let firstTimestamp = null;
|
|
let lastTimestamp = null;
|
|
|
|
for (const line of lines) {
|
|
try {
|
|
const event = JSON.parse(line);
|
|
|
|
// Session start event
|
|
if (event.type === 'session') {
|
|
sessionId = event.id;
|
|
cwd = event.cwd;
|
|
if (!firstTimestamp && event.timestamp) firstTimestamp = event.timestamp;
|
|
continue;
|
|
}
|
|
|
|
// Model snapshot (most reliable model source)
|
|
if (event.type === 'custom' && event.customType === 'model-snapshot' && event.data?.modelId) {
|
|
model = event.data.modelId;
|
|
continue;
|
|
}
|
|
|
|
// Model change event
|
|
if (event.type === 'model_change' && event.modelId) {
|
|
if (!model) model = event.modelId;
|
|
continue;
|
|
}
|
|
|
|
// Message events
|
|
if (event.type === 'message') {
|
|
const msg = event.message || {};
|
|
messageCount++;
|
|
|
|
if (event.timestamp) {
|
|
if (!firstTimestamp) firstTimestamp = event.timestamp;
|
|
lastTimestamp = event.timestamp;
|
|
}
|
|
|
|
if (msg.usage) {
|
|
totalTokens += msg.usage.totalTokens || 0;
|
|
totalCost += msg.usage.cost?.total || 0;
|
|
}
|
|
|
|
// Extract model from assistant messages (skip proxy names)
|
|
if (!model && msg.model && msg.model !== 'delivery-mirror') {
|
|
model = msg.model;
|
|
}
|
|
|
|
if (msg.role === 'user') userMessages++;
|
|
if (msg.role === 'assistant') assistantMessages++;
|
|
}
|
|
} catch {
|
|
// Skip malformed line
|
|
}
|
|
}
|
|
|
|
// Determine status: file suffix wins, otherwise check mtime
|
|
let status = fileStatus;
|
|
if (!status) {
|
|
try {
|
|
const stat = fs.statSync(filePath);
|
|
const ageMs = Date.now() - stat.mtimeMs;
|
|
status = ageMs < 5 * 60 * 1000 ? 'active' : 'idle';
|
|
} catch {
|
|
status = 'idle';
|
|
}
|
|
}
|
|
|
|
return {
|
|
agent,
|
|
fileSessionId,
|
|
sessionId: sessionId || fileSessionId,
|
|
cwd,
|
|
model,
|
|
status,
|
|
messageCount,
|
|
userMessages,
|
|
assistantMessages,
|
|
totalTokens,
|
|
totalCost,
|
|
firstTimestamp,
|
|
lastTimestamp,
|
|
};
|
|
} catch {
|
|
return null;
|
|
}
|
|
}
|
|
|
|
/** Parse all events from a session file (for the detail view) */
|
|
async function parseAllEvents(filePath) {
|
|
const lines = await readLines(filePath);
|
|
const events = [];
|
|
for (const line of lines) {
|
|
try {
|
|
events.push(JSON.parse(line));
|
|
} catch {
|
|
// Skip malformed line
|
|
}
|
|
}
|
|
return events;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// API routes
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/** GET /api/sessions — list all sessions across all agents */
|
|
app.get('/api/sessions', async (req, res) => {
|
|
try {
|
|
const agents = getAgentDirs();
|
|
const fileInfos = agents.flatMap(agent => getSessionFiles(agent));
|
|
|
|
// Parse all session files in parallel
|
|
const results = await Promise.all(fileInfos.map(info => parseSessionSummary(info)));
|
|
const sessions = results.filter(Boolean);
|
|
|
|
// Sort by last activity desc (most recent first)
|
|
sessions.sort((a, b) => {
|
|
const at = a.lastTimestamp || a.firstTimestamp || '';
|
|
const bt = b.lastTimestamp || b.firstTimestamp || '';
|
|
return bt.localeCompare(at);
|
|
});
|
|
|
|
res.json(sessions);
|
|
} catch (err) {
|
|
console.error('GET /api/sessions error:', err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
/** GET /api/sessions/:agent/:id — full event log for one session */
|
|
app.get('/api/sessions/:agent/:id', async (req, res) => {
|
|
const { agent, id } = req.params;
|
|
|
|
if (!isValidName(agent) || !isValidName(id)) {
|
|
return res.status(400).json({ error: 'Invalid parameters' });
|
|
}
|
|
|
|
const filePath = findSessionFile(agent, id);
|
|
if (!filePath) {
|
|
return res.status(404).json({ error: 'Session not found' });
|
|
}
|
|
|
|
try {
|
|
const events = await parseAllEvents(filePath);
|
|
res.json(events);
|
|
} catch (err) {
|
|
console.error(`GET /api/sessions/${agent}/${id} error:`, err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// WebSocket — real-time tail of a session file
|
|
// ---------------------------------------------------------------------------
|
|
|
|
server.on('upgrade', (request, socket, head) => {
|
|
try {
|
|
const { pathname } = new URL(request.url, 'http://localhost');
|
|
if (pathname.startsWith('/ws/sessions/')) {
|
|
wss.handleUpgrade(request, socket, head, ws => {
|
|
wss.emit('connection', ws, request);
|
|
});
|
|
} else {
|
|
socket.destroy();
|
|
}
|
|
} catch {
|
|
socket.destroy();
|
|
}
|
|
});
|
|
|
|
wss.on('connection', (ws, request) => {
|
|
try {
|
|
const { pathname } = new URL(request.url, 'http://localhost');
|
|
// Pattern: /ws/sessions/:agent/:id
|
|
const match = pathname.match(/^\/ws\/sessions\/([^/]+)\/(.+)$/);
|
|
|
|
if (!match) {
|
|
ws.close(1008, 'Invalid path');
|
|
return;
|
|
}
|
|
|
|
const [, agent, sessionId] = match;
|
|
const filePath = findSessionFile(agent, sessionId);
|
|
|
|
if (!filePath) {
|
|
ws.close(1008, 'Session not found');
|
|
return;
|
|
}
|
|
|
|
let lastPosition = 0;
|
|
let remainder = '';
|
|
|
|
/** Read any new bytes since last read and emit parsed events */
|
|
const sendNewContent = () => {
|
|
if (ws.readyState !== 1 /* OPEN */) return;
|
|
try {
|
|
const stat = fs.statSync(filePath);
|
|
if (stat.size <= lastPosition) return;
|
|
|
|
const chunkSize = stat.size - lastPosition;
|
|
const buf = Buffer.alloc(chunkSize);
|
|
const fd = fs.openSync(filePath, 'r');
|
|
fs.readSync(fd, buf, 0, chunkSize, lastPosition);
|
|
fs.closeSync(fd);
|
|
lastPosition = stat.size;
|
|
|
|
remainder += buf.toString('utf8');
|
|
const lines = remainder.split('\n');
|
|
remainder = lines.pop() || ''; // keep incomplete last line
|
|
|
|
for (const line of lines) {
|
|
if (!line.trim()) continue;
|
|
// Validate JSON before sending
|
|
try {
|
|
JSON.parse(line); // throws if invalid
|
|
ws.send(line);
|
|
} catch {
|
|
// Skip malformed line
|
|
}
|
|
}
|
|
} catch {
|
|
// File read error (deleted, etc.) — ignore
|
|
}
|
|
};
|
|
|
|
// Send all existing content immediately
|
|
sendNewContent();
|
|
|
|
// Watch for new writes
|
|
let watcher = null;
|
|
try {
|
|
watcher = fs.watch(filePath, () => sendNewContent());
|
|
} catch {
|
|
// fs.watch unavailable — fall back to 1s polling
|
|
const interval = setInterval(sendNewContent, 1000);
|
|
ws.on('close', () => clearInterval(interval));
|
|
return;
|
|
}
|
|
|
|
ws.on('close', () => {
|
|
try { watcher?.close(); } catch { /* ignore */ }
|
|
});
|
|
} catch (err) {
|
|
console.error('WebSocket error:', err);
|
|
try { ws.close(1011, 'Internal error'); } catch { /* ignore */ }
|
|
}
|
|
});
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Static files + SPA fallback
|
|
// ---------------------------------------------------------------------------
|
|
|
|
app.use(express.static(path.join(__dirname, 'public')));
|
|
|
|
app.get('*', (_req, res) => {
|
|
res.sendFile(path.join(__dirname, 'public', 'index.html'));
|
|
});
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Start
|
|
// ---------------------------------------------------------------------------
|
|
|
|
server.listen(PORT, () => {
|
|
console.log(`OpenClaw Dashboard → http://localhost:${PORT}`);
|
|
console.log(`Sessions directory → ${SESSIONS_DIR}`);
|
|
});
|