feat: add orchestrator execution engine, observability panel, and LSP document caching

Wire FlowExecutor into orchestrator routes for actual flow execution with
pause/resume/stop lifecycle management. Add CLI session audit system with
audit-routes backend and Observability tab in IssueHub frontend. Introduce
cli-session-mux for cross-workspace session routing and QueueSendToOrchestrator
UI component. Normalize frontend API response handling for { data: ... }
wrapper format and propagate projectPath through flow hooks.

In codex-lens, add per-server opened-document cache in StandaloneLspManager
to avoid redundant didOpen notifications (using didChange for updates), and
skip warmup delay for already-warmed LSP server instances in ChainSearchEngine.
This commit is contained in:
catlog22
2026-02-11 15:38:33 +08:00
parent d0cdee2e68
commit 5a9e54fd70
35 changed files with 5325 additions and 77 deletions

View File

@@ -0,0 +1,154 @@
/**
* Audit Routes Module
* Read-only APIs for audit/observability panels.
*
* Currently supported:
* - GET /api/audit/cli-sessions - Read CLI session (PTY) audit events (JSONL)
*/
import { existsSync } from 'fs';
import { readFile } from 'fs/promises';
import { join } from 'path';
import { validatePath as validateAllowedPath } from '../../utils/path-validator.js';
import type { CliSessionAuditEvent, CliSessionAuditEventType } from '../services/cli-session-audit.js';
import type { RouteContext } from './types.js';
function clampInt(value: number, min: number, max: number): number {
if (!Number.isFinite(value)) return min;
return Math.min(max, Math.max(min, Math.trunc(value)));
}
function parseCsvParam(value: string | null): string[] {
if (!value) return [];
return value
.split(',')
.map((v) => v.trim())
.filter(Boolean);
}
function isCliSessionAuditEventType(value: string): value is CliSessionAuditEventType {
return [
'session_created',
'session_closed',
'session_send',
'session_execute',
'session_resize',
'session_share_created',
'session_share_revoked',
'session_idle_reaped',
].includes(value);
}
function matchesSearch(event: CliSessionAuditEvent, qLower: string): boolean {
if (!qLower) return true;
const haystacks: string[] = [];
if (event.type) haystacks.push(event.type);
if (event.timestamp) haystacks.push(event.timestamp);
if (event.sessionKey) haystacks.push(event.sessionKey);
if (event.tool) haystacks.push(event.tool);
if (event.resumeKey) haystacks.push(event.resumeKey);
if (event.workingDir) haystacks.push(event.workingDir);
if (event.ip) haystacks.push(event.ip);
if (event.userAgent) haystacks.push(event.userAgent);
if (event.details) {
try {
haystacks.push(JSON.stringify(event.details));
} catch {
// Ignore non-serializable details
}
}
return haystacks.some((h) => h.toLowerCase().includes(qLower));
}
/**
* Handle audit routes
* @returns true if route was handled, false otherwise
*/
export async function handleAuditRoutes(ctx: RouteContext): Promise<boolean> {
const { pathname, url, req, res, initialPath } = ctx;
// GET /api/audit/cli-sessions
if (pathname === '/api/audit/cli-sessions' && req.method === 'GET') {
const projectPathParam = url.searchParams.get('path') || initialPath;
const limit = clampInt(parseInt(url.searchParams.get('limit') || '200', 10), 1, 1000);
const offset = clampInt(parseInt(url.searchParams.get('offset') || '0', 10), 0, Number.MAX_SAFE_INTEGER);
const sessionKey = url.searchParams.get('sessionKey');
const qLower = (url.searchParams.get('q') || '').trim().toLowerCase();
const typeFilters = parseCsvParam(url.searchParams.get('type'))
.filter(isCliSessionAuditEventType);
const typeFilterSet = typeFilters.length > 0 ? new Set<CliSessionAuditEventType>(typeFilters) : null;
try {
const projectRoot = await validateAllowedPath(projectPathParam, {
mustExist: true,
allowedDirectories: [initialPath],
});
const filePath = join(projectRoot, '.workflow', 'audit', 'cli-sessions.jsonl');
if (!existsSync(filePath)) {
res.writeHead(200, { 'Content-Type': 'application/json; charset=utf-8' });
res.end(JSON.stringify({
success: true,
data: { events: [], total: 0, limit, offset, hasMore: false },
}));
return true;
}
const raw = await readFile(filePath, 'utf-8');
const parsed: CliSessionAuditEvent[] = [];
for (const line of raw.split('\n')) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
parsed.push(JSON.parse(trimmed) as CliSessionAuditEvent);
} catch {
// Skip invalid JSONL line
}
}
const filtered = parsed.filter((ev) => {
if (sessionKey && ev.sessionKey !== sessionKey) return false;
if (typeFilterSet && !typeFilterSet.has(ev.type)) return false;
if (qLower && !matchesSearch(ev, qLower)) return false;
return true;
});
// Best-effort: file is append-only, so reverse for newest-first.
filtered.reverse();
const total = filtered.length;
const page = filtered.slice(offset, offset + limit);
res.writeHead(200, { 'Content-Type': 'application/json; charset=utf-8' });
res.end(JSON.stringify({
success: true,
data: {
events: page,
total,
limit,
offset,
hasMore: offset + limit < total,
},
}));
return true;
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
const lowered = message.toLowerCase();
const status = lowered.includes('access denied') ? 403 : 400;
res.writeHead(status, { 'Content-Type': 'application/json; charset=utf-8' });
res.end(JSON.stringify({
success: false,
error: status === 403 ? 'Access denied' : 'Invalid request',
}));
return true;
}
}
return false;
}

View File

@@ -33,11 +33,16 @@ import { join, dirname } from 'path';
import { randomBytes } from 'crypto';
import { fileURLToPath } from 'url';
import type { RouteContext } from './types.js';
import { FlowExecutor } from '../services/flow-executor.js';
import { validatePath as validateAllowedPath } from '../../utils/path-validator.js';
// ES Module __dirname equivalent
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
// In-memory execution engines for pause/resume/stop (best-effort; resets on server restart)
const activeExecutors = new Map<string, FlowExecutor>();
// ============================================================================
// TypeScript Interfaces
// ============================================================================
@@ -847,8 +852,25 @@ function flowToTemplate(
export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boolean> {
const { pathname, req, res, initialPath, handlePostRequest, broadcastToClients } = ctx;
// Get workflow directory from initialPath
const workflowDir = initialPath || process.cwd();
// Get workflow directory from initialPath, optionally overridden by ?path= (scoped to allowed dirs)
const allowedRoot = initialPath || process.cwd();
let workflowDir = allowedRoot;
const projectPathParam = ctx.url.searchParams.get('path');
if (projectPathParam && projectPathParam.trim()) {
try {
workflowDir = await validateAllowedPath(projectPathParam, {
mustExist: true,
allowedDirectories: [allowedRoot],
});
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
const status = message.toLowerCase().includes('access denied') ? 403 : 400;
res.writeHead(status, { 'Content-Type': 'application/json; charset=utf-8' });
res.end(JSON.stringify({ success: false, error: status === 403 ? 'Access denied' : 'Invalid path' }));
return true;
}
}
// ==== LIST FLOWS ====
// GET /api/orchestrator/flows
@@ -1209,9 +1231,24 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
// Broadcast execution created
broadcastExecutionStateUpdate(execution);
// TODO: Trigger actual flow executor (future enhancement)
// For now, just create the execution in pending state
// The executor will be implemented in a later task
// Trigger actual flow executor (best-effort, async)
// Execution state is persisted by FlowExecutor and updates are broadcast via WebSocket.
try {
const executor = new FlowExecutor(flow, execId, workflowDir);
activeExecutors.set(execId, executor);
void executor.execute(inputVariables).then((finalState) => {
// Keep executor instance if paused, so it can be resumed.
if (finalState.status !== 'paused') {
activeExecutors.delete(execId);
}
}).catch(() => {
// Best-effort cleanup on unexpected failures.
activeExecutors.delete(execId);
});
} catch {
// If executor bootstrap fails, keep the pending execution for inspection.
}
return {
success: true,
@@ -1241,6 +1278,19 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
}
try {
const executor = activeExecutors.get(execId);
if (executor) {
executor.pause();
const execution = await readExecutionStorage(workflowDir, execId);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
success: true,
data: execution ?? executor.getState(),
message: 'Pause requested'
}));
return true;
}
const execution = await readExecutionStorage(workflowDir, execId);
if (!execution) {
res.writeHead(404, { 'Content-Type': 'application/json' });
@@ -1294,6 +1344,36 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
}
try {
const executor = activeExecutors.get(execId);
if (executor) {
const current = executor.getState();
if (current.status !== 'paused') {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
success: false,
error: `Cannot resume execution with status: ${current.status}`
}));
return true;
}
void executor.resume().then((finalState) => {
if (finalState.status !== 'paused') {
activeExecutors.delete(execId);
}
}).catch(() => {
// Best-effort: keep executor for inspection/resume retries.
});
const execution = await readExecutionStorage(workflowDir, execId);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
success: true,
data: execution ?? executor.getState(),
message: 'Resume requested'
}));
return true;
}
const execution = await readExecutionStorage(workflowDir, execId);
if (!execution) {
res.writeHead(404, { 'Content-Type': 'application/json' });
@@ -1347,6 +1427,36 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
}
try {
const executor = activeExecutors.get(execId);
if (executor) {
executor.stop();
// If currently paused, mark as failed immediately (no running loop to observe stop flag).
const current = executor.getState();
if (current.status === 'paused') {
const now = new Date().toISOString();
current.status = 'failed';
current.completedAt = now;
current.logs.push({
timestamp: now,
level: 'warn',
message: 'Execution manually stopped by user'
});
await writeExecutionStorage(workflowDir, current);
broadcastExecutionStateUpdate(current);
activeExecutors.delete(execId);
}
const execution = await readExecutionStorage(workflowDir, execId);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
success: true,
data: execution ?? current,
message: 'Stop requested'
}));
return true;
}
const execution = await readExecutionStorage(workflowDir, execId);
if (!execution) {
res.writeHead(404, { 'Content-Type': 'application/json' });

View File

@@ -9,6 +9,7 @@ import { handleStatusRoutes } from './routes/status-routes.js';
import { handleCliRoutes, cleanupStaleExecutions } from './routes/cli-routes.js';
import { handleCliSettingsRoutes } from './routes/cli-settings-routes.js';
import { handleCliSessionsRoutes } from './routes/cli-sessions-routes.js';
import { handleAuditRoutes } from './routes/audit-routes.js';
import { handleProviderRoutes } from './routes/provider-routes.js';
import { handleMemoryRoutes } from './routes/memory-routes.js';
import { handleCoreMemoryRoutes } from './routes/core-memory-routes.js';
@@ -615,6 +616,11 @@ export async function startServer(options: ServerOptions = {}): Promise<http.Ser
if (await handleCliSessionsRoutes(routeContext)) return;
}
// Audit routes (/api/audit/*)
if (pathname.startsWith('/api/audit')) {
if (await handleAuditRoutes(routeContext)) return;
}
// CLI routes (/api/cli/*)
if (pathname.startsWith('/api/cli/')) {
// CLI Settings routes first (more specific path /api/cli/settings/*)

View File

@@ -177,6 +177,14 @@ export class CliSessionManager {
return Array.from(this.sessions.values()).map(({ pty: _pty, buffer: _buffer, bufferBytes: _bytes, ...rest }) => rest);
}
getProjectRoot(): string {
return this.projectRoot;
}
hasSession(sessionKey: string): boolean {
return this.sessions.has(sessionKey);
}
getSession(sessionKey: string): CliSession | null {
const session = this.sessions.get(sessionKey);
if (!session) return null;
@@ -398,3 +406,15 @@ export function getCliSessionManager(projectRoot: string = process.cwd()): CliSe
managersByRoot.set(resolved, created);
return created;
}
/**
* Find the manager that owns a given sessionKey.
* Useful for cross-workspace routing (tmux-like send) where the executor
* may not share the same workflowDir/projectRoot as the target session.
*/
export function findCliSessionManager(sessionKey: string): CliSessionManager | null {
for (const manager of managersByRoot.values()) {
if (manager.hasSession(sessionKey)) return manager;
}
return null;
}

View File

@@ -0,0 +1,24 @@
/**
* CliSessionMux
*
* A tiny indirection layer used by FlowExecutor (and potentially others) to
* route commands to existing PTY sessions in a testable way.
*
* Why this exists:
* - ESM module namespace exports are immutable, which makes it hard to mock
* named exports in node:test without special loaders.
* - Exporting a mutable object lets tests override behavior by swapping
* functions on the object.
*/
import type { CliSessionManager } from './cli-session-manager.js';
import { findCliSessionManager, getCliSessionManager } from './cli-session-manager.js';
export const cliSessionMux: {
findCliSessionManager: (sessionKey: string) => CliSessionManager | null;
getCliSessionManager: (projectRoot?: string) => CliSessionManager;
} = {
findCliSessionManager,
getCliSessionManager,
};

View File

@@ -19,7 +19,8 @@ import { existsSync } from 'fs';
import { join } from 'path';
import { broadcastToClients } from '../websocket.js';
import { executeCliTool } from '../../tools/cli-executor-core.js';
import { getCliSessionManager } from './cli-session-manager.js';
import { cliSessionMux } from './cli-session-mux.js';
import { appendCliSessionAudit } from './cli-session-audit.js';
import type {
Flow,
FlowNode,
@@ -255,16 +256,46 @@ export class NodeRunner {
};
}
const manager = getCliSessionManager(this.context.workingDir || process.cwd());
const manager = cliSessionMux.findCliSessionManager(targetSessionKey)
?? cliSessionMux.getCliSessionManager(this.context.workingDir || process.cwd());
if (!manager.hasSession(targetSessionKey)) {
return {
success: false,
error: `Target session not found: ${targetSessionKey}`
};
}
const routed = manager.execute(targetSessionKey, {
tool,
prompt: instruction,
mode,
workingDir: this.context.workingDir,
resumeKey: data.resumeKey,
resumeStrategy: data.resumeStrategy === 'promptConcat' ? 'promptConcat' : 'nativeResume'
});
// Best-effort: record audit event so Observability panel includes orchestrator-routed executions.
try {
const session = manager.getSession(targetSessionKey);
appendCliSessionAudit({
type: 'session_execute',
timestamp: new Date().toISOString(),
projectRoot: manager.getProjectRoot(),
sessionKey: targetSessionKey,
tool,
resumeKey: data.resumeKey,
workingDir: session?.workingDir,
details: {
executionId: routed.executionId,
mode,
resumeStrategy: data.resumeStrategy ?? 'nativeResume',
delivery: 'sendToSession',
flowId: this.context.flowId,
nodeId: node.id
}
});
} catch {
// ignore
}
const outputKey = data.outputName || `${node.id}_output`;
this.context.variables[outputKey] = {
delivery: 'sendToSession',