#!/usr/bin/env python3
"""JSON-driven modular installer.

Keep it simple: validate config, expand paths, run the supported operation
types, and record what happened. Designed to be small, readable, and predictable.
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

try:
    import jsonschema
except ImportError:  # pragma: no cover
    jsonschema = None

DEFAULT_INSTALL_DIR = "~/.claude"
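
# Illustrative config.json shape, inferred from the keys this script reads
# (load_config, select_modules, and the op_* handlers below); the module name,
# description, and values here are examples only, and the authoritative
# structure is whatever config.schema.json defines:
#
# {
#   "version": "1.0",
#   "install_dir": "~/.claude",
#   "log_file": "install.log",
#   "modules": {
#     "core": {
#       "enabled": true,
#       "description": "Example module",
#       "operations": [
#         {"type": "copy_dir", "source": "core", "target": "core"},
#         {"type": "run_command", "command": "bash install.sh",
#          "env": {"TARGET": "${install_dir}"}}
#       ]
#     }
#   }
# }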


def _ensure_list(ctx: Dict[str, Any], key: str) -> List[Any]:
    ctx.setdefault(key, [])
    return ctx[key]


def parse_args(argv: Optional[Iterable[str]] = None) -> argparse.Namespace:
    """Parse CLI arguments.

    The default install dir must remain "~/.claude" to match docs/tests.
    """

    parser = argparse.ArgumentParser(
        description="JSON-driven modular installation system"
    )
    parser.add_argument(
        "--install-dir",
        default=DEFAULT_INSTALL_DIR,
        help="Installation directory (defaults to ~/.claude)",
    )
    parser.add_argument(
        "--module",
        help="Comma-separated modules to install, or 'all' for all enabled",
    )
    parser.add_argument(
        "--config",
        default="config.json",
        help="Path to configuration file",
    )
    parser.add_argument(
        "--list-modules",
        action="store_true",
        help="List available modules and exit",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force overwrite existing files",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose output to terminal",
    )
    return parser.parse_args(argv)


def _load_json(path: Path) -> Any:
    try:
        with path.open("r", encoding="utf-8") as fh:
            return json.load(fh)
    except FileNotFoundError as exc:
        raise FileNotFoundError(f"File not found: {path}") from exc
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON in {path}: {exc}") from exc


def load_config(path: str) -> Dict[str, Any]:
    """Load config and validate against JSON Schema.

    Schema is searched in the config directory first, then alongside this file.
    """

    config_path = Path(path).expanduser().resolve()
    config = _load_json(config_path)

    if jsonschema is None:
        print(
            "WARNING: python package 'jsonschema' is not installed; "
            "skipping config validation. To enable validation run:\n"
            " python3 -m pip install jsonschema\n",
            file=sys.stderr,
        )

        if not isinstance(config, dict):
            raise ValueError(
                f"Config must be a dict, got {type(config).__name__}. "
                "Check your config.json syntax."
            )

        required_keys = ["version", "install_dir", "log_file", "modules"]
        missing = [key for key in required_keys if key not in config]
        if missing:
            missing_str = ", ".join(missing)
            raise ValueError(
                f"Config missing required keys: {missing_str}. "
                "Install jsonschema for better validation: "
                "python3 -m pip install jsonschema"
            )

        return config

    schema_candidates = [
        config_path.parent / "config.schema.json",
        Path(__file__).resolve().with_name("config.schema.json"),
    ]
    schema_path = next((p for p in schema_candidates if p.exists()), None)
    if schema_path is None:
        raise FileNotFoundError("config.schema.json not found")

    schema = _load_json(schema_path)
    try:
        jsonschema.validate(config, schema)
    except jsonschema.ValidationError as exc:
        raise ValueError(f"Config validation failed: {exc.message}") from exc

    return config


def resolve_paths(config: Dict[str, Any], args: argparse.Namespace) -> Dict[str, Any]:
    """Resolve all filesystem paths to absolute Path objects."""

    config_dir = Path(args.config).expanduser().resolve().parent

    if args.install_dir and args.install_dir != DEFAULT_INSTALL_DIR:
        install_dir_raw = args.install_dir
    elif config.get("install_dir"):
        install_dir_raw = config.get("install_dir")
    else:
        install_dir_raw = DEFAULT_INSTALL_DIR

    install_dir = Path(install_dir_raw).expanduser().resolve()

    log_file_raw = config.get("log_file", "install.log")
    log_file = Path(log_file_raw).expanduser()
    if not log_file.is_absolute():
        log_file = install_dir / log_file

    return {
        "install_dir": install_dir,
        "log_file": log_file,
        "status_file": install_dir / "installed_modules.json",
        "config_dir": config_dir,
        "force": bool(getattr(args, "force", False)),
        "verbose": bool(getattr(args, "verbose", False)),
        "applied_paths": [],
        "status_backup": None,
    }


def list_modules(config: Dict[str, Any]) -> None:
    print("Available Modules:")
    print(f"{'Name':<15} {'Default':<8} Description")
    print("-" * 60)
    for name, cfg in config.get("modules", {}).items():
        default = "✓" if cfg.get("enabled", False) else "✗"
        desc = cfg.get("description", "")
        print(f"{name:<15} {default:<8} {desc}")
    print("\n✓ = installed by default when no --module specified")


def select_modules(config: Dict[str, Any], module_arg: Optional[str]) -> Dict[str, Any]:
    modules = config.get("modules", {})
    if not module_arg:
        return {k: v for k, v in modules.items() if v.get("enabled", False)}

    if module_arg.strip().lower() == "all":
        return {k: v for k, v in modules.items() if v.get("enabled", False)}

    selected: Dict[str, Any] = {}
    for name in (part.strip() for part in module_arg.split(",")):
        if not name:
            continue
        if name not in modules:
            raise ValueError(f"Module '{name}' not found")
        selected[name] = modules[name]
    return selected


def ensure_install_dir(path: Path) -> None:
    path = Path(path)
    if path.exists() and not path.is_dir():
        raise NotADirectoryError(f"Install path exists and is not a directory: {path}")
    path.mkdir(parents=True, exist_ok=True)
    if not os.access(path, os.W_OK):
        raise PermissionError(f"No write permission for install dir: {path}")


def execute_module(name: str, cfg: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]:
    result: Dict[str, Any] = {
        "module": name,
        "status": "success",
        "operations": [],
        "installed_at": datetime.now().isoformat(),
    }

    for op in cfg.get("operations", []):
        op_type = op.get("type")
        try:
            if op_type == "copy_dir":
                op_copy_dir(op, ctx)
            elif op_type == "copy_file":
                op_copy_file(op, ctx)
            elif op_type == "merge_dir":
                op_merge_dir(op, ctx)
            elif op_type == "merge_json":
                op_merge_json(op, ctx)
            elif op_type == "run_command":
                op_run_command(op, ctx)
            else:
                raise ValueError(f"Unknown operation type: {op_type}")

            result["operations"].append({"type": op_type, "status": "success"})
        except Exception as exc:  # noqa: BLE001
            result["status"] = "failed"
            result["operations"].append(
                {"type": op_type, "status": "failed", "error": str(exc)}
            )
            write_log(
                {
                    "level": "ERROR",
                    "message": f"Module {name} failed on {op_type}: {exc}",
                },
                ctx,
            )
            raise

    return result
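
# An execute_module result is a plain dict; an illustrative example (module
# name and timestamp invented) of what ends up under "modules" in
# installed_modules.json via write_status:
#
# {
#   "module": "core",
#   "status": "success",
#   "operations": [{"type": "copy_dir", "status": "success"}],
#   "installed_at": "2025-01-01T12:00:00"
# }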


def _source_path(op: Dict[str, Any], ctx: Dict[str, Any]) -> Path:
    return (ctx["config_dir"] / op["source"]).expanduser().resolve()


def _target_path(op: Dict[str, Any], ctx: Dict[str, Any]) -> Path:
    return (ctx["install_dir"] / op["target"]).expanduser().resolve()


def _record_created(path: Path, ctx: Dict[str, Any]) -> None:
    install_dir = Path(ctx["install_dir"]).resolve()
    resolved = Path(path).resolve()
    if resolved == install_dir or install_dir not in resolved.parents:
        return
    applied = _ensure_list(ctx, "applied_paths")
    if resolved not in applied:
        applied.append(resolved)


def op_copy_dir(op: Dict[str, Any], ctx: Dict[str, Any]) -> None:
    """Copy a source directory into the install dir; skip if it exists unless --force."""
    src = _source_path(op, ctx)
    dst = _target_path(op, ctx)

    existed_before = dst.exists()
    if existed_before and not ctx.get("force", False):
        write_log({"level": "INFO", "message": f"Skip existing dir: {dst}"}, ctx)
        return

    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.copytree(src, dst, dirs_exist_ok=True)
    if not existed_before:
        _record_created(dst, ctx)
    write_log({"level": "INFO", "message": f"Copied dir {src} -> {dst}"}, ctx)


def op_merge_dir(op: Dict[str, Any], ctx: Dict[str, Any]) -> None:
    """Merge source dir's subdirs (commands/, agents/, etc.) into install_dir."""
    src = _source_path(op, ctx)
    install_dir = ctx["install_dir"]
    force = ctx.get("force", False)
    merged = []

    for subdir in src.iterdir():
        if not subdir.is_dir():
            continue
        target_subdir = install_dir / subdir.name
        target_subdir.mkdir(parents=True, exist_ok=True)
        for f in subdir.iterdir():
            if f.is_file():
                dst = target_subdir / f.name
                if dst.exists() and not force:
                    continue
                shutil.copy2(f, dst)
                merged.append(f"{subdir.name}/{f.name}")

    write_log({"level": "INFO", "message": f"Merged {src.name}: {', '.join(merged) or 'no files'}"}, ctx)
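
# For example (illustrative paths): given a module source tree like
#   <source>/commands/foo.md
#   <source>/agents/bar.md
# op_merge_dir copies those files into <install_dir>/commands/ and
# <install_dir>/agents/, leaving files that already exist untouched unless
# --force is given. Unlike copy_dir, merged files are not recorded via
# _record_created, so rollback() does not remove them.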


def op_copy_file(op: Dict[str, Any], ctx: Dict[str, Any]) -> None:
    """Copy a single file into the install dir; skip if it exists unless --force."""
    src = _source_path(op, ctx)
    dst = _target_path(op, ctx)

    existed_before = dst.exists()
    if existed_before and not ctx.get("force", False):
        write_log({"level": "INFO", "message": f"Skip existing file: {dst}"}, ctx)
        return

    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(src, dst)
    if not existed_before:
        _record_created(dst, ctx)
    write_log({"level": "INFO", "message": f"Copied file {src} -> {dst}"}, ctx)


def op_merge_json(op: Dict[str, Any], ctx: Dict[str, Any]) -> None:
    """Merge JSON from source into target, supporting nested key paths."""
    src = _source_path(op, ctx)
    dst = _target_path(op, ctx)
    merge_key = op.get("merge_key")

    if not src.exists():
        raise FileNotFoundError(f"Source JSON not found: {src}")

    src_data = _load_json(src)

    dst.parent.mkdir(parents=True, exist_ok=True)
    if dst.exists():
        dst_data = _load_json(dst)
    else:
        dst_data = {}
        _record_created(dst, ctx)

    if merge_key:
        # Merge into a specific (possibly nested) key
        keys = merge_key.split(".")
        target = dst_data
        for key in keys[:-1]:
            target = target.setdefault(key, {})

        last_key = keys[-1]
        if isinstance(src_data, dict) and isinstance(target.get(last_key), dict):
            # Shallow-merge dicts at this key; source values win on conflicts
            target[last_key] = {**target.get(last_key, {}), **src_data}
        else:
            target[last_key] = src_data
    else:
        # Merge at root level
        if isinstance(src_data, dict) and isinstance(dst_data, dict):
            dst_data = {**dst_data, **src_data}
        else:
            dst_data = src_data

    with dst.open("w", encoding="utf-8") as fh:
        json.dump(dst_data, fh, indent=2, ensure_ascii=False)
        fh.write("\n")

    write_log({"level": "INFO", "message": f"Merged JSON {src} -> {dst} (key: {merge_key or 'root'})"}, ctx)
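
# For example (illustrative values): with merge_key "env.permissions" and a
# source file containing {"allow": ["Bash"]}, the object is written under
# target["env"]["permissions"], shallow-merged with whatever dict is already
# there; without merge_key, the source object is merged into the target's
# top level.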


def op_run_command(op: Dict[str, Any], ctx: Dict[str, Any]) -> None:
    """Run a shell command from the config dir, streaming and logging its output."""
    env = os.environ.copy()
    for key, value in op.get("env", {}).items():
        env[key] = value.replace("${install_dir}", str(ctx["install_dir"]))

    command = op.get("command", "")
    if sys.platform == "win32" and command.strip() == "bash install.sh":
        command = "cmd /c install.bat"

    # Stream output in real-time while capturing for logging
    process = subprocess.Popen(
        command,
        shell=True,
        cwd=ctx["config_dir"],
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    stdout_lines: List[str] = []
    stderr_lines: List[str] = []

    # Read stdout and stderr in real-time
    if sys.platform == "win32":
        # On Windows, use threads instead of selectors (pipes aren't selectable)
        import threading

        def read_output(pipe, lines, file=None):
            for line in iter(pipe.readline, ''):
                lines.append(line)
                print(line, end="", flush=True, file=file)
            pipe.close()

        stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_lines))
        stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_lines, sys.stderr))

        stdout_thread.start()
        stderr_thread.start()

        stdout_thread.join()
        stderr_thread.join()
        process.wait()
    else:
        # On Unix, use selectors for more efficient I/O
        import selectors
        sel = selectors.DefaultSelector()
        sel.register(process.stdout, selectors.EVENT_READ)  # type: ignore[arg-type]
        sel.register(process.stderr, selectors.EVENT_READ)  # type: ignore[arg-type]

        while process.poll() is None or sel.get_map():
            for key, _ in sel.select(timeout=0.1):
                line = key.fileobj.readline()  # type: ignore[union-attr]
                if not line:
                    sel.unregister(key.fileobj)
                    continue
                if key.fileobj == process.stdout:
                    stdout_lines.append(line)
                    print(line, end="", flush=True)
                else:
                    stderr_lines.append(line)
                    print(line, end="", file=sys.stderr, flush=True)

        sel.close()
        process.wait()

    write_log(
        {
            "level": "INFO",
            "message": f"Command: {command}",
            "stdout": "".join(stdout_lines),
            "stderr": "".join(stderr_lines),
            "returncode": process.returncode,
        },
        ctx,
    )

    if process.returncode != 0:
        raise RuntimeError(f"Command failed with code {process.returncode}: {command}")
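
# For example (illustrative operation; the env variable name is invented):
# {"type": "run_command", "command": "bash install.sh",
#  "env": {"INSTALL_DIR": "${install_dir}"}}
# runs the command via the shell with the config directory as its working
# directory, after substituting the literal token ${install_dir} in each env
# value with the resolved install path.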


def write_log(entry: Dict[str, Any], ctx: Dict[str, Any]) -> None:
    log_path = Path(ctx["log_file"])
    log_path.parent.mkdir(parents=True, exist_ok=True)

    ts = datetime.now().isoformat()
    level = entry.get("level", "INFO")
    message = entry.get("message", "")

    with log_path.open("a", encoding="utf-8") as fh:
        fh.write(f"[{ts}] {level}: {message}\n")
        for key in ("stdout", "stderr", "returncode"):
            if key in entry and entry[key] not in (None, ""):
                fh.write(f" {key}: {entry[key]}\n")

    # Terminal output when verbose
    if ctx.get("verbose"):
        prefix = {"INFO": "ℹ️ ", "WARNING": "⚠️ ", "ERROR": "❌"}.get(level, "")
        print(f"{prefix}[{level}] {message}")
        if entry.get("stdout"):
            print(f" stdout: {entry['stdout'][:500]}")
        if entry.get("stderr"):
            print(f" stderr: {entry['stderr'][:500]}", file=sys.stderr)
        if entry.get("returncode") is not None:
            print(f" returncode: {entry['returncode']}")


def write_status(results: List[Dict[str, Any]], ctx: Dict[str, Any]) -> None:
    status = {
        "installed_at": datetime.now().isoformat(),
        "modules": {item["module"]: item for item in results},
    }

    status_path = Path(ctx["status_file"])
    status_path.parent.mkdir(parents=True, exist_ok=True)
    with status_path.open("w", encoding="utf-8") as fh:
        json.dump(status, fh, indent=2, ensure_ascii=False)


def prepare_status_backup(ctx: Dict[str, Any]) -> None:
    status_path = Path(ctx["status_file"])
    if status_path.exists():
        backup = status_path.with_suffix(".json.bak")
        backup.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(status_path, backup)
        ctx["status_backup"] = backup


def rollback(ctx: Dict[str, Any]) -> None:
    write_log({"level": "WARNING", "message": "Rolling back installation"}, ctx)

    install_dir = Path(ctx["install_dir"]).resolve()
    for path in reversed(ctx.get("applied_paths", [])):
        resolved = Path(path).resolve()
        try:
            if resolved == install_dir or install_dir not in resolved.parents:
                continue
            if resolved.is_dir():
                shutil.rmtree(resolved, ignore_errors=True)
            else:
                resolved.unlink(missing_ok=True)
        except Exception as exc:  # noqa: BLE001
            write_log(
                {
                    "level": "ERROR",
                    "message": f"Rollback skipped {resolved}: {exc}",
                },
                ctx,
            )

    backup = ctx.get("status_backup")
    if backup and Path(backup).exists():
        shutil.copy2(backup, ctx["status_file"])

    write_log({"level": "INFO", "message": "Rollback completed"}, ctx)


def main(argv: Optional[Iterable[str]] = None) -> int:
    args = parse_args(argv)
    try:
        config = load_config(args.config)
    except Exception as exc:  # noqa: BLE001
        print(f"Error loading config: {exc}", file=sys.stderr)
        return 1

    ctx = resolve_paths(config, args)

    if getattr(args, "list_modules", False):
        list_modules(config)
        return 0

    modules = select_modules(config, args.module)

    try:
        ensure_install_dir(ctx["install_dir"])
    except Exception as exc:  # noqa: BLE001
        print(f"Failed to prepare install dir: {exc}", file=sys.stderr)
        return 1

    prepare_status_backup(ctx)

    total = len(modules)
    print(f"Installing {total} module(s) to {ctx['install_dir']}...")

    results: List[Dict[str, Any]] = []
    for idx, (name, cfg) in enumerate(modules.items(), 1):
        print(f"[{idx}/{total}] Installing module: {name}...")
        try:
            results.append(execute_module(name, cfg, ctx))
            print(f" ✓ {name} installed successfully")
        except Exception as exc:  # noqa: BLE001
            print(f" ✗ {name} failed: {exc}", file=sys.stderr)
            if not args.force:
                rollback(ctx)
                return 1
            rollback(ctx)
            results.append(
                {
                    "module": name,
                    "status": "failed",
                    "operations": [],
                    "installed_at": datetime.now().isoformat(),
                }
            )
            break

    write_status(results, ctx)

    # Summary
    success = sum(1 for r in results if r.get("status") == "success")
    failed = len(results) - success
    if failed == 0:
        print(f"\n✓ Installation complete: {success} module(s) installed")
        print(f" Log file: {ctx['log_file']}")
    else:
        print(f"\n⚠ Installation finished with errors: {success} success, {failed} failed")
        print(f" Check log file for details: {ctx['log_file']}")
        if not args.force:
            return 1

    return 0


if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())