feat: unify CLI output handling and enhance theme variables

- Updated `CliStreamMonitorNew`, `CliStreamMonitorLegacy`, and `CliViewerPage` components to prioritize `unitContent` from payloads, falling back to `data` when necessary.
- Enhanced `colorGenerator` to include legacy variables for compatibility with shadcn/ui.
- Refactored orchestrator index to unify node exports under a single module.
- Improved `appStore` to clear both new and legacy CSS variables when applying themes.
- Added new options to CLI execution for raw and final output modes, improving programmatic output handling.
- Enhanced `cli-output-converter` to normalize cumulative delta frames and avoid duplication in streaming outputs.
- Introduced a new unified workflow specification for prompt template-based workflows, replacing the previous multi-type node system.
- Added tests for CLI final output handling and streaming output converter to ensure correct behavior in various scenarios.
This commit is contained in:
catlog22
2026-02-04 22:57:41 +08:00
parent 4ee165119b
commit de989aa038
12 changed files with 899 additions and 107 deletions

View File

@@ -141,6 +141,8 @@ interface CliExecOptions {
// Template/Rules options
rule?: string; // Template name for auto-discovery (defines $PROTO and $TMPL env vars)
// Output options
raw?: boolean; // Raw output only (best for piping)
final?: boolean; // Final agent result only (best for piping)
toFile?: string; // Save output to file
}
@@ -590,7 +592,30 @@ async function statusAction(debug?: boolean): Promise<void> {
* @param {Object} options - CLI options
*/
async function execAction(positionalPrompt: string | undefined, options: CliExecOptions): Promise<void> {
const { prompt: optionPrompt, file, tool: userTool, mode = 'analysis', model, cd, includeDirs, stream, resume, id, noNative, cache, injectMode, debug, uncommitted, base, commit, title, rule, toFile } = options;
const {
prompt: optionPrompt,
file,
tool: userTool,
mode = 'analysis',
model,
cd,
includeDirs,
stream,
resume,
id,
noNative,
cache,
injectMode,
debug,
uncommitted,
base,
commit,
title,
rule,
toFile,
raw,
final: finalOnly,
} = options;
// Determine the tool to use: explicit --tool option, or defaultTool from config
let tool = userTool;
@@ -857,8 +882,17 @@ async function execAction(positionalPrompt: string | undefined, options: CliExec
const nativeMode = noNative ? ' (prompt-concat)' : '';
const idInfo = id ? ` [${id}]` : '';
// Programmatic output mode:
// - `--raw`: stdout/stderr passthrough semantics (minimal noise)
// - `--final`: agent-message only semantics (minimal noise)
// - non-TTY stdout (e.g. called from another process): default to final-only unless `--stream` is used
const programmaticOutput = Boolean(raw || finalOnly) || (!process.stdout.isTTY && !stream);
const showUi = !programmaticOutput;
const useRawOutput = Boolean(raw);
const useFinalOnlyOutput = Boolean(finalOnly) || (!useRawOutput && !process.stdout.isTTY && !stream);
// Show merge details
if (isMerge) {
if (isMerge && showUi) {
console.log(chalk.gray(' Merging conversations:'));
for (const rid of resumeIds) {
console.log(chalk.gray(`${rid}`));
@@ -871,9 +905,11 @@ async function execAction(positionalPrompt: string | undefined, options: CliExec
const startTime = Date.now();
const modelInfo = model ? ` @${model}` : '';
const spinnerBaseText = `Executing ${tool}${modelInfo} (${mode} mode${resumeInfo}${nativeMode})${idInfo}...`;
console.log();
if (showUi) {
console.log();
}
const spinner = stream ? null : createSpinner(` ${spinnerBaseText}`).start();
const spinner = (showUi && !stream) ? createSpinner(` ${spinnerBaseText}`).start() : null;
const elapsedInterval = spinner
? setInterval(() => {
const elapsedSeconds = Math.floor((Date.now() - startTime) / 1000);
@@ -882,7 +918,7 @@ async function execAction(positionalPrompt: string | undefined, options: CliExec
: null;
elapsedInterval?.unref?.();
if (!spinner) {
if (showUi && !spinner) {
console.log(chalk.cyan(` ${spinnerBaseText}\n`));
}
@@ -892,7 +928,7 @@ async function execAction(positionalPrompt: string | undefined, options: CliExec
if (elapsedInterval) clearInterval(elapsedInterval);
if (spinner) {
spinner.warn(`Interrupted by ${signal} (${Math.floor(duration / 1000)}s elapsed)`);
} else {
} else if (showUi) {
console.log(chalk.yellow(`\n Interrupted by ${signal}`));
}
@@ -1028,9 +1064,15 @@ async function execAction(positionalPrompt: string | undefined, options: CliExec
// If not streaming (default), print output now
// Prefer parsedOutput (from stream parser) over raw stdout for better formatting
if (!stream) {
const output = result.parsedOutput || result.stdout;
const output = useRawOutput
? result.stdout
: (useFinalOnlyOutput ? (result.finalOutput || result.parsedOutput || result.stdout) : (result.parsedOutput || result.stdout));
if (output) {
console.log(output);
if (programmaticOutput) {
process.stdout.write(output);
} else {
console.log(output);
}
// Save to file if --to-file is specified
if (toFile) {
@@ -1051,8 +1093,11 @@ async function execAction(positionalPrompt: string | undefined, options: CliExec
}
}
// Print summary with execution ID and turn info
console.log();
// Print summary with execution ID and turn info (interactive mode only)
if (showUi) {
console.log();
}
if (result.success) {
// Save streaming output to file if needed
if (stream && toFile && streamBuffer) {
@@ -1068,31 +1113,34 @@ async function execAction(positionalPrompt: string | undefined, options: CliExec
}
}
if (!spinner) {
if (showUi && !spinner) {
const turnInfo = result.conversation.turn_count > 1
? ` (turn ${result.conversation.turn_count})`
: '';
console.log(chalk.green(` ✓ Completed in ${(result.execution.duration_ms / 1000).toFixed(1)}s${turnInfo}`));
}
console.log(chalk.gray(` ID: ${result.execution.id}`));
if (isMerge && !id) {
// Merge without custom ID: updated all source conversations
console.log(chalk.gray(` Updated ${resumeIds.length} conversations: ${resumeIds.join(', ')}`));
} else if (isMerge && id) {
// Merge with custom ID: created new merged conversation
console.log(chalk.gray(` Created merged conversation from ${resumeIds.length} sources`));
}
if (result.conversation.turn_count > 1) {
console.log(chalk.gray(` Total: ${result.conversation.turn_count} turns, ${(result.conversation.total_duration_ms / 1000).toFixed(1)}s`));
}
console.log(chalk.dim(` Continue: ccw cli -p "..." --resume ${result.execution.id}`));
if (!stream) {
console.log(chalk.dim(` Output (optional): ccw cli output ${result.execution.id}`));
}
if (toFile) {
const { resolve } = await import('path');
const filePath = resolve(cd || process.cwd(), toFile);
console.log(chalk.green(` Saved to: ${filePath}`));
if (showUi) {
console.log(chalk.gray(` ID: ${result.execution.id}`));
if (isMerge && !id) {
// Merge without custom ID: updated all source conversations
console.log(chalk.gray(` Updated ${resumeIds.length} conversations: ${resumeIds.join(', ')}`));
} else if (isMerge && id) {
// Merge with custom ID: created new merged conversation
console.log(chalk.gray(` Created merged conversation from ${resumeIds.length} sources`));
}
if (result.conversation.turn_count > 1) {
console.log(chalk.gray(` Total: ${result.conversation.turn_count} turns, ${(result.conversation.total_duration_ms / 1000).toFixed(1)}s`));
}
console.log(chalk.dim(` Continue: ccw cli -p "..." --resume ${result.execution.id}`));
if (!stream) {
console.log(chalk.dim(` Output (optional): ccw cli output ${result.execution.id}`));
}
if (toFile) {
const { resolve } = await import('path');
const filePath = resolve(cd || process.cwd(), toFile);
console.log(chalk.green(` Saved to: ${filePath}`));
}
}
// Notify dashboard: execution completed (legacy)
@@ -1129,41 +1177,48 @@ async function execAction(positionalPrompt: string | undefined, options: CliExec
process.exit(0);
}, 500);
} else {
if (!spinner) {
console.log(chalk.red(` ✗ Failed (${result.execution.status})`));
}
console.log(chalk.gray(` ID: ${result.execution.id}`));
console.log(chalk.gray(` Duration: ${(result.execution.duration_ms / 1000).toFixed(1)}s`));
console.log(chalk.gray(` Exit Code: ${result.execution.exit_code}`));
// Show stderr with better formatting
if (result.stderr) {
console.log();
console.log(chalk.red.bold(' Error Output:'));
console.log(chalk.gray(' ' + '─'.repeat(60)));
// Indent stderr for better readability
const stderrLines = result.stderr.split('\n');
for (const line of stderrLines.slice(0, 30)) { // Limit to 30 lines
console.error(chalk.red(` ${line}`));
if (!showUi) {
// Programmatic mode: avoid banners/hints; write stderr only if available.
if (result.stderr) {
process.stderr.write(result.stderr);
}
if (stderrLines.length > 30) {
console.log(chalk.yellow(` ... ${stderrLines.length - 30} more lines`));
console.log(chalk.cyan(` 💡 View full output: ccw cli output ${result.execution.id}`));
} else {
if (!spinner) {
console.log(chalk.red(` ✗ Failed (${result.execution.status})`));
}
console.log(chalk.gray(` ID: ${result.execution.id}`));
console.log(chalk.gray(` Duration: ${(result.execution.duration_ms / 1000).toFixed(1)}s`));
console.log(chalk.gray(` Exit Code: ${result.execution.exit_code}`));
// Show stderr with better formatting
if (result.stderr) {
console.log();
console.log(chalk.red.bold(' Error Output:'));
console.log(chalk.gray(' ' + '─'.repeat(60)));
// Indent stderr for better readability
const stderrLines = result.stderr.split('\n');
for (const line of stderrLines.slice(0, 30)) { // Limit to 30 lines
console.error(chalk.red(` ${line}`));
}
if (stderrLines.length > 30) {
console.log(chalk.yellow(` ... ${stderrLines.length - 30} more lines`));
console.log(chalk.cyan(` 💡 View full output: ccw cli output ${result.execution.id}`));
console.log();
}
console.log(chalk.gray(' ' + '─'.repeat(60)));
}
console.log(chalk.gray(' ' + '─'.repeat(60)));
}
// Show troubleshooting hints
console.log();
console.log(chalk.yellow.bold(' Troubleshooting:'));
console.log(chalk.gray(` • Check if ${tool} is properly installed: ccw cli status`));
console.log(chalk.gray(` • Enable debug mode: DEBUG=true ccw cli -p "..." or set DEBUG=true && ccw cli -p "..."`));
if (result.stderr?.includes('API key') || result.stderr?.includes('Authentication')) {
console.log(chalk.gray(` • Check API key configuration for ${tool}`));
}
if (result.stderr?.includes('rate limit')) {
console.log(chalk.gray(` • Wait and retry - rate limit exceeded`));
// Show troubleshooting hints
console.log();
console.log(chalk.yellow.bold(' Troubleshooting:'));
console.log(chalk.gray(` • Check if ${tool} is properly installed: ccw cli status`));
console.log(chalk.gray(` • Enable debug mode: DEBUG=true ccw cli -p "..." or set DEBUG=true && ccw cli -p "..."`));
if (result.stderr?.includes('API key') || result.stderr?.includes('Authentication')) {
console.log(chalk.gray(` • Check API key configuration for ${tool}`));
}
if (result.stderr?.includes('rate limit')) {
console.log(chalk.gray(` • Wait and retry - rate limit exceeded`));
}
}
// Notify dashboard: execution failed (legacy)

View File

@@ -107,6 +107,12 @@ export class PlainTextParser implements IOutputParser {
export class JsonLinesParser implements IOutputParser {
private buffer: string = '';
// Gemini "message" frames may be true deltas OR cumulative content (varies by CLI/version).
// Track cumulative assistant content so we can normalize cumulative frames into true deltas and
// avoid emitting duplicated content downstream (terminal + dashboard + final reconstruction).
private geminiAssistantCumulative: string = '';
private geminiSawAssistantDelta: boolean = false;
/**
* Classify non-JSON content to determine appropriate output type
* Helps distinguish real errors from normal progress/output sent to stderr
@@ -294,12 +300,67 @@ export class JsonLinesParser implements IOutputParser {
if (json.type === 'message' && json.role) {
// Gemini assistant/user message
if (json.role === 'assistant') {
// Delta messages use 'streaming_content' type - aggregated to agent_message later
// Non-delta (final) messages use 'agent_message' type directly
const outputType = json.delta === true ? 'streaming_content' : 'agent_message';
const content = json.content || '';
if (!content) {
return null;
}
// Delta messages use 'streaming_content' type (should be incremental).
// Some CLIs send delta=true with cumulative content; normalize to a suffix-delta when possible.
if (json.delta === true) {
this.geminiSawAssistantDelta = true;
// Duplicate frame
if (content === this.geminiAssistantCumulative) {
return null;
}
// Cumulative frame (new content starts with previous content)
if (this.geminiAssistantCumulative && content.startsWith(this.geminiAssistantCumulative)) {
const delta = content.slice(this.geminiAssistantCumulative.length);
this.geminiAssistantCumulative = content;
if (!delta) {
return null;
}
return {
type: 'streaming_content',
content: delta,
timestamp
};
}
// Unexpected reset/shortening: treat as a fresh stream restart to avoid negative slicing
if (this.geminiAssistantCumulative && this.geminiAssistantCumulative.startsWith(content)) {
this.geminiAssistantCumulative = content;
return {
type: 'streaming_content',
content,
timestamp
};
}
// True delta frame (append-only)
this.geminiAssistantCumulative += content;
return {
type: 'streaming_content',
content,
timestamp
};
}
// Non-delta (final) messages use 'agent_message' type directly.
// If we already streamed deltas for this assistant message, skip this final frame to avoid duplication
// in streaming UIs (frontend already has the assembled content from deltas).
if (this.geminiSawAssistantDelta) {
// Keep cumulative for potential later comparisons but do not emit.
this.geminiAssistantCumulative = content;
return null;
}
this.geminiAssistantCumulative = content;
return {
type: outputType,
content: json.content || '',
type: 'agent_message',
content,
timestamp
};
}
@@ -1141,17 +1202,24 @@ export function flattenOutputUnits(
let processedUnits = units;
const streamingUnits = units.filter(u => u.type === 'streaming_content');
if (streamingUnits.length > 0) {
// Concatenate all streaming_content into one
const concatenatedContent = streamingUnits
.map(u => typeof u.content === 'string' ? u.content : '')
.join('');
const hasAgentMessage = units.some(u => u.type === 'agent_message');
// If a non-delta final agent_message already exists, prefer it and simply drop streaming_content.
// This avoids duplicated final output when providers emit BOTH streaming deltas and a final message frame.
processedUnits = units.filter(u => u.type !== 'streaming_content');
// Add concatenated content as agent_message type for final output
processedUnits.push({
type: 'agent_message',
content: concatenatedContent,
timestamp: streamingUnits[streamingUnits.length - 1].timestamp
});
// If no agent_message exists, synthesize one from streaming_content (delta-only streams).
if (!hasAgentMessage) {
const concatenatedContent = streamingUnits
.map(u => typeof u.content === 'string' ? u.content : '')
.join('');
processedUnits.push({
type: 'agent_message',
content: concatenatedContent,
timestamp: streamingUnits[streamingUnits.length - 1].timestamp
});
}
}
// Filter units by type