mirror of
https://github.com/GuDaStudio/codexmcp.git
synced 2026-02-05 02:00:24 +08:00
v0.0:测试版发布
This commit is contained in:
4
src/codexmcp/__init__.py
Normal file
4
src/codexmcp/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
"""Codex MCP server package."""
|
||||
|
||||
__all__ = ["__version__"]
|
||||
__version__ = "0.1.0"
|
||||
12
src/codexmcp/cli.py
Normal file
12
src/codexmcp/cli.py
Normal file
@@ -0,0 +1,12 @@
|
||||
"""Console entry point for the Codex MCP server."""
|
||||
|
||||
from codexmcp.server import run
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Start the Codex MCP server."""
|
||||
run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
173
src/codexmcp/server.py
Normal file
173
src/codexmcp/server.py
Normal file
@@ -0,0 +1,173 @@
|
||||
"""FastMCP server implementation for the Codex MCP project."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import queue
|
||||
import subprocess
|
||||
import threading
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Annotated, Any, Dict, Generator, Literal, Optional
|
||||
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
from pydantic import BeforeValidator, Field
|
||||
|
||||
mcp = FastMCP("Codex MCP Server-from guda.studio")
|
||||
|
||||
|
||||
def _empty_str_to_none(value: str | None) -> str | None:
|
||||
"""Convert empty strings to None for optional UUID parameters."""
|
||||
if isinstance(value, str) and not value.strip():
|
||||
return None
|
||||
return value
|
||||
|
||||
|
||||
def run_shell_command(cmd: list[str]) -> Generator[str, None, None]:
|
||||
"""Execute a command and stream its output line-by-line.
|
||||
|
||||
Args:
|
||||
cmd: Command and arguments as a list (e.g., ["codex", "exec", "prompt"])
|
||||
|
||||
Yields:
|
||||
Output lines from the command
|
||||
"""
|
||||
process = subprocess.Popen(
|
||||
cmd,
|
||||
shell=False, # Safer: no shell injection
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
universal_newlines=True,
|
||||
bufsize=1,
|
||||
)
|
||||
|
||||
output_queue: queue.Queue[str] = queue.Queue()
|
||||
|
||||
def read_output() -> None:
|
||||
"""Read process output in a separate thread."""
|
||||
if process.stdout:
|
||||
for line in iter(process.stdout.readline, ""):
|
||||
output_queue.put(line.strip())
|
||||
process.stdout.close()
|
||||
|
||||
thread = threading.Thread(target=read_output)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
# Yield lines while process is running
|
||||
while process.poll() is None:
|
||||
try:
|
||||
yield output_queue.get(timeout=0.1)
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
process.wait()
|
||||
|
||||
# Drain remaining output from queue
|
||||
while not output_queue.empty():
|
||||
try:
|
||||
yield output_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="codex",
|
||||
description="""
|
||||
Executes a non-interactive Codex session via CLI to perform AI-assisted coding tasks in a secure workspace.
|
||||
This tool wraps the `codex exec` command, enabling model-driven code generation, debugging, or automation based on natural language prompts.
|
||||
It supports resuming ongoing sessions for continuity and enforces sandbox policies to prevent unsafe operations. Ideal for integrating Codex into MCP servers for agentic workflows, such as code reviews or repo modifications.
|
||||
|
||||
**Key Features:**
|
||||
- **Prompt-Driven Execution:** Send task instructions to Codex for step-by-step code handling.
|
||||
- **Workspace Isolation:** Operate within a specified directory, with optional Git repo skipping.
|
||||
- **Security Controls:** Three sandbox levels balance functionality and safety.
|
||||
- **Session Persistence:** Resume prior conversations via `SESSION_ID` for iterative tasks.
|
||||
|
||||
**Edge Cases & Best Practices:**
|
||||
- Ensure `cd` exists and is accessible; tool fails silently on invalid paths.
|
||||
- For most repos, prefer "read-only" to avoid accidental changes.
|
||||
- If needed, set `return_all_messages` to `True` to parse "all_messages" for detailed tracing (e.g., reasoning, tool calls, etc.).
|
||||
""",
|
||||
meta={"version": "0.0.0", "author": "guda.studio"},
|
||||
)
|
||||
async def codex(
|
||||
PROMPT: Annotated[str, "Instruction for the task to send to codex."],
|
||||
cd: Annotated[Path, "Set the workspace root for codex before executing the task."],
|
||||
sandbox: Annotated[
|
||||
Literal["read-only", "workspace-write", "danger-full-access"],
|
||||
Field(
|
||||
description="Sandbox policy for model-generated commands. Defaults to `read-only`."
|
||||
),
|
||||
] = "read-only",
|
||||
SESSION_ID: Annotated[
|
||||
Optional[uuid.UUID],
|
||||
BeforeValidator(_empty_str_to_none),
|
||||
"Resume the specified session of the codex. Defaults to `None`, start a new session.",
|
||||
] = None,
|
||||
skip_git_repo_check: Annotated[
|
||||
bool,
|
||||
"Allow codex running outside a Git repository (useful for one-off directories).",
|
||||
] = False,
|
||||
return_all_messages: Annotated[
|
||||
bool,
|
||||
"Return all messages (e.g. reasoning, tool calls, etc.) from the codex session. Set to `False` by default, only the agent's final reply message is returned.",
|
||||
] = False,
|
||||
) -> Dict[str, Any]:
|
||||
"""Execute a Codex CLI session and return the results."""
|
||||
# Build command as list to avoid injection
|
||||
cmd = ["codex", "exec", PROMPT, "--sandbox", sandbox, "--cd", str(cd), "--json"]
|
||||
|
||||
if skip_git_repo_check:
|
||||
cmd.append("--skip-git-repo-check")
|
||||
|
||||
if SESSION_ID is not None:
|
||||
cmd.extend(["resume", str(SESSION_ID)])
|
||||
|
||||
all_messages: list[Dict[str, Any]] = []
|
||||
agent_messages = ""
|
||||
success = True
|
||||
err_message = ""
|
||||
thread_id: Optional[str] = None
|
||||
|
||||
for line in run_shell_command(cmd):
|
||||
try:
|
||||
line_dict = json.loads(line.strip())
|
||||
all_messages.append(line_dict)
|
||||
item = line_dict.get("item", {})
|
||||
item_type = item.get("type")
|
||||
if item_type == "agent_message":
|
||||
agent_messages = agent_messages + item.get("text", "")
|
||||
if line_dict.get("thread_id") is not None:
|
||||
thread_id = line_dict.get("thread_id")
|
||||
except json.JSONDecodeError as error:
|
||||
# Improved error handling: include problematic line
|
||||
err_message = line
|
||||
success = False
|
||||
break
|
||||
except Exception as error:
|
||||
err_message = f"Unexpected error: {error}. Line: {line!r}"
|
||||
success = False
|
||||
break
|
||||
|
||||
if thread_id is None:
|
||||
success = False
|
||||
err_message = "Failed to get `SESSION_ID` from the codex session. \n\n" + err_message
|
||||
|
||||
if success:
|
||||
result: Dict[str, Any] = {
|
||||
"success": True,
|
||||
"SESSION_ID": thread_id,
|
||||
"agent_messages": agent_messages,
|
||||
}
|
||||
if return_all_messages:
|
||||
result["all_messages"] = all_messages
|
||||
else:
|
||||
result = {"success": False, "error": err_message}
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def run() -> None:
|
||||
"""Start the MCP server over stdio transport."""
|
||||
mcp.run(transport="stdio")
|
||||
Reference in New Issue
Block a user