#!/usr/bin/env python3 """JSON-driven modular installer. Keep it simple: validate config, expand paths, run three operation types, and record what happened. Designed to be small, readable, and predictable. """ from __future__ import annotations import argparse import json import os import shutil import subprocess import sys from datetime import datetime from pathlib import Path from typing import Any, Dict, Iterable, List, Optional try: import jsonschema except ImportError: # pragma: no cover jsonschema = None DEFAULT_INSTALL_DIR = "~/.claude" SETTINGS_FILE = "settings.json" def _ensure_list(ctx: Dict[str, Any], key: str) -> List[Any]: ctx.setdefault(key, []) return ctx[key] def parse_args(argv: Optional[Iterable[str]] = None) -> argparse.Namespace: """Parse CLI arguments. The default install dir must remain "~/.claude" to match docs/tests. """ parser = argparse.ArgumentParser( description="JSON-driven modular installation system" ) parser.add_argument( "--install-dir", default=DEFAULT_INSTALL_DIR, help="Installation directory (defaults to ~/.claude)", ) parser.add_argument( "--module", help="Comma-separated modules to install/uninstall, or 'all'", ) parser.add_argument( "--config", default="config.json", help="Path to configuration file", ) parser.add_argument( "--list-modules", action="store_true", help="List available modules and exit", ) parser.add_argument( "--status", action="store_true", help="Show installation status of all modules", ) parser.add_argument( "--uninstall", action="store_true", help="Uninstall specified modules", ) parser.add_argument( "--update", action="store_true", help="Update already installed modules", ) parser.add_argument( "--force", action="store_true", help="Force overwrite existing files", ) parser.add_argument( "--verbose", "-v", action="store_true", help="Enable verbose output to terminal", ) return parser.parse_args(argv) def _load_json(path: Path) -> Any: try: with path.open("r", encoding="utf-8") as fh: return json.load(fh) except FileNotFoundError as exc: raise FileNotFoundError(f"File not found: {path}") from exc except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON in {path}: {exc}") from exc def _save_json(path: Path, data: Any) -> None: """Save data to JSON file with proper formatting.""" path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as fh: json.dump(data, fh, indent=2, ensure_ascii=False) fh.write("\n") # ============================================================================= # Hooks Management # ============================================================================= def load_settings(ctx: Dict[str, Any]) -> Dict[str, Any]: """Load settings.json from install directory.""" settings_path = ctx["install_dir"] / SETTINGS_FILE if settings_path.exists(): try: return _load_json(settings_path) except (ValueError, FileNotFoundError): return {} return {} def save_settings(ctx: Dict[str, Any], settings: Dict[str, Any]) -> None: """Save settings.json to install directory.""" settings_path = ctx["install_dir"] / SETTINGS_FILE _save_json(settings_path, settings) def find_module_hooks(module_name: str, cfg: Dict[str, Any], ctx: Dict[str, Any]) -> List[tuple]: """Find all hooks.json files for a module. Returns list of tuples (hooks_config, plugin_root_path). Searches in order for each copy_dir operation: 1. {target_dir}/hooks/hooks.json (for skills with hooks subdirectory) 2. {target_dir}/hooks.json (for hooks directory itself) """ results = [] seen_paths = set() # Check for hooks in operations (copy_dir targets) for op in cfg.get("operations", []): if op.get("type") == "copy_dir": target_dir = ctx["install_dir"] / op["target"] source_dir = ctx["config_dir"] / op["source"] # Check both target and source directories for base_dir, plugin_root in [(target_dir, str(target_dir)), (source_dir, str(target_dir))]: # First check {dir}/hooks/hooks.json (for skills) hooks_file = base_dir / "hooks" / "hooks.json" if hooks_file.exists() and str(hooks_file) not in seen_paths: try: results.append((_load_json(hooks_file), plugin_root)) seen_paths.add(str(hooks_file)) except (ValueError, FileNotFoundError): pass # Then check {dir}/hooks.json (for hooks directory itself) hooks_file = base_dir / "hooks.json" if hooks_file.exists() and str(hooks_file) not in seen_paths: try: results.append((_load_json(hooks_file), plugin_root)) seen_paths.add(str(hooks_file)) except (ValueError, FileNotFoundError): pass return results def _create_hook_marker(module_name: str) -> str: """Create a marker to identify hooks from a specific module.""" return f"__module:{module_name}__" def _replace_hook_variables(obj: Any, plugin_root: str) -> Any: """Recursively replace ${CLAUDE_PLUGIN_ROOT} in hook config.""" if isinstance(obj, str): return obj.replace("${CLAUDE_PLUGIN_ROOT}", plugin_root) elif isinstance(obj, dict): return {k: _replace_hook_variables(v, plugin_root) for k, v in obj.items()} elif isinstance(obj, list): return [_replace_hook_variables(item, plugin_root) for item in obj] return obj def merge_hooks_to_settings(module_name: str, hooks_config: Dict[str, Any], ctx: Dict[str, Any], plugin_root: str = "") -> None: """Merge module hooks into settings.json.""" settings = load_settings(ctx) settings.setdefault("hooks", {}) module_hooks = hooks_config.get("hooks", {}) marker = _create_hook_marker(module_name) # Replace ${CLAUDE_PLUGIN_ROOT} with actual path if plugin_root: module_hooks = _replace_hook_variables(module_hooks, plugin_root) for hook_type, hook_entries in module_hooks.items(): settings["hooks"].setdefault(hook_type, []) for entry in hook_entries: # Add marker to identify this hook's source module entry_copy = dict(entry) entry_copy["__module__"] = module_name # Check if already exists (avoid duplicates) exists = False for existing in settings["hooks"][hook_type]: if existing.get("__module__") == module_name: # Same module, check if same hook if _hooks_equal(existing, entry_copy): exists = True break if not exists: settings["hooks"][hook_type].append(entry_copy) save_settings(ctx, settings) write_log({"level": "INFO", "message": f"Merged hooks for module: {module_name}"}, ctx) def unmerge_hooks_from_settings(module_name: str, ctx: Dict[str, Any]) -> None: """Remove module hooks from settings.json.""" settings = load_settings(ctx) if "hooks" not in settings: return modified = False for hook_type in list(settings["hooks"].keys()): original_len = len(settings["hooks"][hook_type]) settings["hooks"][hook_type] = [ entry for entry in settings["hooks"][hook_type] if entry.get("__module__") != module_name ] if len(settings["hooks"][hook_type]) < original_len: modified = True # Remove empty hook type arrays if not settings["hooks"][hook_type]: del settings["hooks"][hook_type] if modified: save_settings(ctx, settings) write_log({"level": "INFO", "message": f"Removed hooks for module: {module_name}"}, ctx) def merge_agents_to_models(module_name: str, agents: Dict[str, Any], ctx: Dict[str, Any]) -> None: """Merge module agent configs into ~/.codeagent/models.json.""" models_path = Path.home() / ".codeagent" / "models.json" models_path.parent.mkdir(parents=True, exist_ok=True) if models_path.exists(): with models_path.open("r", encoding="utf-8") as fh: models = json.load(fh) else: template = ctx["config_dir"] / "templates" / "models.json.example" if template.exists(): with template.open("r", encoding="utf-8") as fh: models = json.load(fh) # Clear template agents so modules populate with __module__ tags models["agents"] = {} else: models = { "default_backend": "codex", "default_model": "gpt-4.1", "backends": {}, "agents": {}, } models.setdefault("agents", {}) for agent_name, agent_cfg in agents.items(): entry = dict(agent_cfg) entry["__module__"] = module_name existing = models["agents"].get(agent_name, {}) if not existing or existing.get("__module__"): models["agents"][agent_name] = entry with models_path.open("w", encoding="utf-8") as fh: json.dump(models, fh, indent=2, ensure_ascii=False) write_log( { "level": "INFO", "message": ( f"Merged {len(agents)} agent(s) from {module_name} " "into models.json" ), }, ctx, ) def unmerge_agents_from_models(module_name: str, ctx: Dict[str, Any]) -> None: """Remove module's agent configs from ~/.codeagent/models.json. If another installed module also declares a removed agent, restore that module's version so shared agents (e.g. 'develop') are not lost. """ models_path = Path.home() / ".codeagent" / "models.json" if not models_path.exists(): return with models_path.open("r", encoding="utf-8") as fh: models = json.load(fh) agents = models.get("agents", {}) to_remove = [ name for name, cfg in agents.items() if isinstance(cfg, dict) and cfg.get("__module__") == module_name ] if not to_remove: return # Load config to find other modules that declare the same agents config_path = ctx["config_dir"] / "config.json" config = _load_json(config_path) if config_path.exists() else {} installed = load_installed_status(ctx).get("modules", {}) for name in to_remove: del agents[name] # Check if another installed module also declares this agent for other_mod, other_status in installed.items(): if other_mod == module_name: continue if other_status.get("status") != "success": continue other_cfg = config.get("modules", {}).get(other_mod, {}) other_agents = other_cfg.get("agents", {}) if name in other_agents: restored = dict(other_agents[name]) restored["__module__"] = other_mod agents[name] = restored break with models_path.open("w", encoding="utf-8") as fh: json.dump(models, fh, indent=2, ensure_ascii=False) write_log( { "level": "INFO", "message": ( f"Removed {len(to_remove)} agent(s) from {module_name} " "in models.json" ), }, ctx, ) def _hooks_equal(hook1: Dict[str, Any], hook2: Dict[str, Any]) -> bool: """Compare two hooks ignoring the __module__ marker.""" h1 = {k: v for k, v in hook1.items() if k != "__module__"} h2 = {k: v for k, v in hook2.items() if k != "__module__"} return h1 == h2 def load_config(path: str) -> Dict[str, Any]: """Load config and validate against JSON Schema. Schema is searched in the config directory first, then alongside this file. """ config_path = Path(path).expanduser().resolve() config = _load_json(config_path) if jsonschema is None: print( "WARNING: python package 'jsonschema' is not installed; " "skipping config validation. To enable validation run:\n" " python3 -m pip install jsonschema\n", file=sys.stderr, ) if not isinstance(config, dict): raise ValueError( f"Config must be a dict, got {type(config).__name__}. " "Check your config.json syntax." ) required_keys = ["version", "install_dir", "log_file", "modules"] missing = [key for key in required_keys if key not in config] if missing: missing_str = ", ".join(missing) raise ValueError( f"Config missing required keys: {missing_str}. " "Install jsonschema for better validation: " "python3 -m pip install jsonschema" ) return config schema_candidates = [ config_path.parent / "config.schema.json", Path(__file__).resolve().with_name("config.schema.json"), ] schema_path = next((p for p in schema_candidates if p.exists()), None) if schema_path is None: raise FileNotFoundError("config.schema.json not found") schema = _load_json(schema_path) try: jsonschema.validate(config, schema) except jsonschema.ValidationError as exc: raise ValueError(f"Config validation failed: {exc.message}") from exc return config def resolve_paths(config: Dict[str, Any], args: argparse.Namespace) -> Dict[str, Any]: """Resolve all filesystem paths to absolute Path objects.""" config_dir = Path(args.config).expanduser().resolve().parent if args.install_dir and args.install_dir != DEFAULT_INSTALL_DIR: install_dir_raw = args.install_dir elif config.get("install_dir"): install_dir_raw = config.get("install_dir") else: install_dir_raw = DEFAULT_INSTALL_DIR install_dir = Path(install_dir_raw).expanduser().resolve() log_file_raw = config.get("log_file", "install.log") log_file = Path(log_file_raw).expanduser() if not log_file.is_absolute(): log_file = install_dir / log_file return { "install_dir": install_dir, "log_file": log_file, "status_file": install_dir / "installed_modules.json", "config_dir": config_dir, "force": bool(getattr(args, "force", False)), "verbose": bool(getattr(args, "verbose", False)), "applied_paths": [], "status_backup": None, } def list_modules(config: Dict[str, Any]) -> None: print("Available Modules:") print(f"{'#':<3} {'Name':<15} {'Default':<8} Description") print("-" * 65) for idx, (name, cfg) in enumerate(config.get("modules", {}).items(), 1): default = "✓" if cfg.get("enabled", False) else "✗" desc = cfg.get("description", "") print(f"{idx:<3} {name:<15} {default:<8} {desc}") print("\n✓ = installed by default when no --module specified") def load_installed_status(ctx: Dict[str, Any]) -> Dict[str, Any]: """Load installed modules status from status file.""" status_path = Path(ctx["status_file"]) if status_path.exists(): try: return _load_json(status_path) except (ValueError, FileNotFoundError): return {"modules": {}} return {"modules": {}} def check_module_installed(name: str, cfg: Dict[str, Any], ctx: Dict[str, Any]) -> bool: """Check if a module is installed by verifying its files exist.""" install_dir = ctx["install_dir"] for op in cfg.get("operations", []): op_type = op.get("type") if op_type in ("copy_dir", "copy_file"): target = (install_dir / op["target"]).expanduser().resolve() if target.exists(): return True elif op_type == "merge_dir": src = (ctx["config_dir"] / op["source"]).expanduser().resolve() if not src.exists() or not src.is_dir(): continue for subdir in src.iterdir(): if not subdir.is_dir(): continue for f in subdir.iterdir(): if not f.is_file(): continue candidate = (install_dir / subdir.name / f.name).expanduser().resolve() if candidate.exists(): return True return False def get_installed_modules(config: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, bool]: """Get installation status of all modules by checking files.""" result = {} modules = config.get("modules", {}) # First check status file status = load_installed_status(ctx) status_modules = status.get("modules", {}) for name, cfg in modules.items(): # Check both status file and filesystem in_status = name in status_modules files_exist = check_module_installed(name, cfg, ctx) result[name] = in_status or files_exist return result def list_modules_with_status(config: Dict[str, Any], ctx: Dict[str, Any]) -> None: """List modules with installation status.""" installed_status = get_installed_modules(config, ctx) status_data = load_installed_status(ctx) status_modules = status_data.get("modules", {}) print("\n" + "=" * 70) print("Module Status") print("=" * 70) print(f"{'#':<3} {'Name':<15} {'Status':<15} {'Installed At':<20} Description") print("-" * 70) for idx, (name, cfg) in enumerate(config.get("modules", {}).items(), 1): desc = cfg.get("description", "")[:25] if installed_status.get(name, False): status = "✅ Installed" installed_at = status_modules.get(name, {}).get("installed_at", "")[:16] else: status = "⬚ Not installed" installed_at = "" print(f"{idx:<3} {name:<15} {status:<15} {installed_at:<20} {desc}") total = len(config.get("modules", {})) installed_count = sum(1 for v in installed_status.values() if v) print(f"\nTotal: {installed_count}/{total} modules installed") print(f"Install dir: {ctx['install_dir']}") def select_modules(config: Dict[str, Any], module_arg: Optional[str]) -> Dict[str, Any]: modules = config.get("modules", {}) if not module_arg: # No --module specified: show interactive selection return interactive_select_modules(config) if module_arg.strip().lower() == "all": return dict(modules.items()) selected: Dict[str, Any] = {} for name in (part.strip() for part in module_arg.split(",")): if not name: continue if name not in modules: raise ValueError(f"Module '{name}' not found") selected[name] = modules[name] return selected def interactive_select_modules(config: Dict[str, Any]) -> Dict[str, Any]: """Interactive module selection when no --module is specified.""" modules = config.get("modules", {}) module_names = list(modules.keys()) print("\n" + "=" * 65) print("Welcome to Claude Plugin Installer") print("=" * 65) print("\nNo modules specified. Please select modules to install:\n") list_modules(config) print("\nEnter module numbers or names (comma-separated), or:") print(" 'all' - Install all modules") print(" 'q' - Quit without installing") print() while True: try: user_input = input("Select modules: ").strip() except (EOFError, KeyboardInterrupt): print("\nInstallation cancelled.") sys.exit(0) if not user_input: print("No input. Please enter module numbers, names, 'all', or 'q'.") continue if user_input.lower() == "q": print("Installation cancelled.") sys.exit(0) if user_input.lower() == "all": print(f"\nSelected all {len(modules)} modules.") return dict(modules.items()) # Parse selection selected: Dict[str, Any] = {} parts = [p.strip() for p in user_input.replace(" ", ",").split(",") if p.strip()] try: for part in parts: # Try as number first if part.isdigit(): idx = int(part) - 1 if 0 <= idx < len(module_names): name = module_names[idx] selected[name] = modules[name] else: print(f"Invalid number: {part}. Valid range: 1-{len(module_names)}") selected = {} break # Try as name elif part in modules: selected[part] = modules[part] else: print(f"Module not found: '{part}'") selected = {} break if selected: names = ", ".join(selected.keys()) print(f"\nSelected {len(selected)} module(s): {names}") return selected except ValueError: print("Invalid input. Please try again.") continue def uninstall_module(name: str, cfg: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]: """Uninstall a module by removing its files and hooks.""" result: Dict[str, Any] = { "module": name, "status": "success", "uninstalled_at": datetime.now().isoformat(), } install_dir = ctx["install_dir"] removed_paths = [] status = load_installed_status(ctx) module_status = status.get("modules", {}).get(name, {}) merge_dir_files = module_status.get("merge_dir_files", []) if not isinstance(merge_dir_files, list): merge_dir_files = [] for op in cfg.get("operations", []): op_type = op.get("type") try: if op_type in ("copy_dir", "copy_file"): target = (install_dir / op["target"]).expanduser().resolve() if target.exists(): if target.is_dir(): shutil.rmtree(target) else: target.unlink() removed_paths.append(str(target)) write_log({"level": "INFO", "message": f"Removed: {target}"}, ctx) # Clean up empty parent directories up to install_dir parent = target.parent while parent != install_dir and parent.exists(): try: parent.rmdir() except OSError: break parent = parent.parent elif op_type == "merge_dir": if not merge_dir_files: write_log( { "level": "WARNING", "message": f"No merge_dir_files recorded for {name}; skip merge_dir uninstall", }, ctx, ) continue for rel in dict.fromkeys(merge_dir_files): rel_path = Path(str(rel)) if rel_path.is_absolute() or ".." in rel_path.parts: write_log( { "level": "WARNING", "message": f"Skip unsafe merge_dir path for {name}: {rel}", }, ctx, ) continue target = (install_dir / rel_path).resolve() if target == install_dir or install_dir not in target.parents: write_log( { "level": "WARNING", "message": f"Skip out-of-tree merge_dir path for {name}: {rel}", }, ctx, ) continue if target.exists(): if target.is_dir(): shutil.rmtree(target) else: target.unlink() removed_paths.append(str(target)) write_log({"level": "INFO", "message": f"Removed: {target}"}, ctx) parent = target.parent while parent != install_dir and parent.exists(): try: parent.rmdir() except OSError: break parent = parent.parent except Exception as exc: write_log({"level": "WARNING", "message": f"Failed to remove {op.get('target', 'unknown')}: {exc}"}, ctx) # Remove module hooks from settings.json try: unmerge_hooks_from_settings(name, ctx) result["hooks_removed"] = True except Exception as exc: write_log({"level": "WARNING", "message": f"Failed to remove hooks for {name}: {exc}"}, ctx) # Remove module agents from ~/.codeagent/models.json try: unmerge_agents_from_models(name, ctx) result["agents_removed"] = True except Exception as exc: write_log({"level": "WARNING", "message": f"Failed to remove agents for {name}: {exc}"}, ctx) result["removed_paths"] = removed_paths return result def update_status_after_uninstall(uninstalled_modules: List[str], ctx: Dict[str, Any]) -> None: """Remove uninstalled modules from status file.""" status = load_installed_status(ctx) modules = status.get("modules", {}) for name in uninstalled_modules: if name in modules: del modules[name] status["modules"] = modules status["updated_at"] = datetime.now().isoformat() status_path = Path(ctx["status_file"]) with status_path.open("w", encoding="utf-8") as fh: json.dump(status, fh, indent=2, ensure_ascii=False) def interactive_manage(config: Dict[str, Any], ctx: Dict[str, Any]) -> int: """Interactive module management menu. Returns 0 on success, 1 on error. Sets ctx['_did_install'] = True if any module was installed.""" ctx.setdefault("_did_install", False) while True: installed_status = get_installed_modules(config, ctx) modules = config.get("modules", {}) module_names = list(modules.keys()) print("\n" + "=" * 70) print("Claude Plugin Manager") print("=" * 70) print(f"{'#':<3} {'Name':<15} {'Status':<15} Description") print("-" * 70) for idx, (name, cfg) in enumerate(modules.items(), 1): desc = cfg.get("description", "")[:30] if installed_status.get(name, False): status = "✅ Installed" else: status = "⬚ Not installed" print(f"{idx:<3} {name:<15} {status:<15} {desc}") total = len(modules) installed_count = sum(1 for v in installed_status.values() if v) print(f"\nInstalled: {installed_count}/{total} | Dir: {ctx['install_dir']}") print("\nCommands:") print(" i - Install module(s)") print(" u - Uninstall module(s)") print(" q - Quit") print() try: user_input = input("Enter command: ").strip() except (EOFError, KeyboardInterrupt): print("\nExiting.") return 0 if not user_input: continue if user_input.lower() == "q": print("Goodbye!") return 0 parts = user_input.split(maxsplit=1) cmd = parts[0].lower() args = parts[1] if len(parts) > 1 else "" if cmd == "i": # Install selected = _parse_module_selection(args, modules, module_names) if selected: # Filter out already installed to_install = {k: v for k, v in selected.items() if not installed_status.get(k, False)} if not to_install: print("All selected modules are already installed.") continue print(f"\nInstalling: {', '.join(to_install.keys())}") results = [] for name, cfg in to_install.items(): try: results.append(execute_module(name, cfg, ctx)) print(f" ✓ {name} installed") except Exception as exc: print(f" ✗ {name} failed: {exc}") # Update status current_status = load_installed_status(ctx) for r in results: if r.get("status") == "success": current_status.setdefault("modules", {})[r["module"]] = r ctx["_did_install"] = True current_status["updated_at"] = datetime.now().isoformat() with Path(ctx["status_file"]).open("w", encoding="utf-8") as fh: json.dump(current_status, fh, indent=2, ensure_ascii=False) elif cmd == "u": # Uninstall selected = _parse_module_selection(args, modules, module_names) if selected: # Filter to only installed ones to_uninstall = {k: v for k, v in selected.items() if installed_status.get(k, False)} if not to_uninstall: print("None of the selected modules are installed.") continue print(f"\nUninstalling: {', '.join(to_uninstall.keys())}") confirm = input("Confirm? (y/N): ").strip().lower() if confirm != "y": print("Cancelled.") continue for name, cfg in to_uninstall.items(): try: uninstall_module(name, cfg, ctx) print(f" ✓ {name} uninstalled") except Exception as exc: print(f" ✗ {name} failed: {exc}") update_status_after_uninstall(list(to_uninstall.keys()), ctx) else: print(f"Unknown command: {cmd}. Use 'i', 'u', or 'q'.") def _parse_module_selection( args: str, modules: Dict[str, Any], module_names: List[str] ) -> Dict[str, Any]: """Parse module selection from user input.""" if not args: print("Please specify module number(s) or name(s).") return {} if args.lower() == "all": return dict(modules.items()) selected: Dict[str, Any] = {} parts = [p.strip() for p in args.replace(",", " ").split() if p.strip()] for part in parts: if part.isdigit(): idx = int(part) - 1 if 0 <= idx < len(module_names): name = module_names[idx] selected[name] = modules[name] else: print(f"Invalid number: {part}") return {} elif part in modules: selected[part] = modules[part] else: print(f"Module not found: '{part}'") return {} return selected def ensure_install_dir(path: Path) -> None: path = Path(path) if path.exists() and not path.is_dir(): raise NotADirectoryError(f"Install path exists and is not a directory: {path}") path.mkdir(parents=True, exist_ok=True) if not os.access(path, os.W_OK): raise PermissionError(f"No write permission for install dir: {path}") def execute_module(name: str, cfg: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]: result: Dict[str, Any] = { "module": name, "status": "success", "operations": [], "installed_at": datetime.now().isoformat(), } for op in cfg.get("operations", []): op_type = op.get("type") try: if op_type == "copy_dir": op_copy_dir(op, ctx) elif op_type == "copy_file": op_copy_file(op, ctx) elif op_type == "merge_dir": merged = op_merge_dir(op, ctx) if merged: result.setdefault("merge_dir_files", []).extend(merged) elif op_type == "merge_json": op_merge_json(op, ctx) elif op_type == "run_command": op_run_command(op, ctx) else: raise ValueError(f"Unknown operation type: {op_type}") result["operations"].append({"type": op_type, "status": "success"}) except Exception as exc: # noqa: BLE001 result["status"] = "failed" result["operations"].append( {"type": op_type, "status": "failed", "error": str(exc)} ) write_log( { "level": "ERROR", "message": f"Module {name} failed on {op_type}: {exc}", }, ctx, ) raise # Handle hooks: find and merge module hooks into settings.json hooks_results = find_module_hooks(name, cfg, ctx) if hooks_results: for hooks_config, plugin_root in hooks_results: try: merge_hooks_to_settings(name, hooks_config, ctx, plugin_root) result["operations"].append({"type": "merge_hooks", "status": "success"}) result["has_hooks"] = True except Exception as exc: write_log({"level": "WARNING", "message": f"Failed to merge hooks for {name}: {exc}"}, ctx) result["operations"].append({"type": "merge_hooks", "status": "failed", "error": str(exc)}) # Handle agents: merge module agent configs into ~/.codeagent/models.json module_agents = cfg.get("agents", {}) if module_agents: try: merge_agents_to_models(name, module_agents, ctx) result["operations"].append({"type": "merge_agents", "status": "success"}) result["has_agents"] = True except Exception as exc: write_log({"level": "WARNING", "message": f"Failed to merge agents for {name}: {exc}"}, ctx) result["operations"].append({"type": "merge_agents", "status": "failed", "error": str(exc)}) return result def _source_path(op: Dict[str, Any], ctx: Dict[str, Any]) -> Path: return (ctx["config_dir"] / op["source"]).expanduser().resolve() def _target_path(op: Dict[str, Any], ctx: Dict[str, Any]) -> Path: return (ctx["install_dir"] / op["target"]).expanduser().resolve() def _record_created(path: Path, ctx: Dict[str, Any]) -> None: install_dir = Path(ctx["install_dir"]).resolve() resolved = Path(path).resolve() if resolved == install_dir or install_dir not in resolved.parents: return applied = _ensure_list(ctx, "applied_paths") if resolved not in applied: applied.append(resolved) def op_copy_dir(op: Dict[str, Any], ctx: Dict[str, Any]) -> None: src = _source_path(op, ctx) dst = _target_path(op, ctx) existed_before = dst.exists() if existed_before and not ctx.get("force", False): write_log({"level": "INFO", "message": f"Skip existing dir: {dst}"}, ctx) return dst.parent.mkdir(parents=True, exist_ok=True) shutil.copytree(src, dst, dirs_exist_ok=True) if not existed_before: _record_created(dst, ctx) write_log({"level": "INFO", "message": f"Copied dir {src} -> {dst}"}, ctx) def op_merge_dir(op: Dict[str, Any], ctx: Dict[str, Any]) -> List[str]: """Merge source dir's subdirs (commands/, agents/, etc.) into install_dir.""" src = _source_path(op, ctx) install_dir = ctx["install_dir"] force = ctx.get("force", False) merged = [] for subdir in src.iterdir(): if not subdir.is_dir(): continue target_subdir = install_dir / subdir.name target_subdir.mkdir(parents=True, exist_ok=True) for f in subdir.iterdir(): if f.is_file(): dst = target_subdir / f.name if dst.exists() and not force: continue shutil.copy2(f, dst) merged.append(f"{subdir.name}/{f.name}") write_log({"level": "INFO", "message": f"Merged {src.name}: {', '.join(merged) or 'no files'}"}, ctx) return merged def op_copy_file(op: Dict[str, Any], ctx: Dict[str, Any]) -> None: src = _source_path(op, ctx) dst = _target_path(op, ctx) existed_before = dst.exists() if existed_before and not ctx.get("force", False): write_log({"level": "INFO", "message": f"Skip existing file: {dst}"}, ctx) return dst.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(src, dst) if not existed_before: _record_created(dst, ctx) write_log({"level": "INFO", "message": f"Copied file {src} -> {dst}"}, ctx) def op_merge_json(op: Dict[str, Any], ctx: Dict[str, Any]) -> None: """Merge JSON from source into target, supporting nested key paths.""" src = _source_path(op, ctx) dst = _target_path(op, ctx) merge_key = op.get("merge_key") if not src.exists(): raise FileNotFoundError(f"Source JSON not found: {src}") src_data = _load_json(src) dst.parent.mkdir(parents=True, exist_ok=True) if dst.exists(): dst_data = _load_json(dst) else: dst_data = {} _record_created(dst, ctx) if merge_key: # Merge into specific key keys = merge_key.split(".") target = dst_data for key in keys[:-1]: target = target.setdefault(key, {}) last_key = keys[-1] if isinstance(src_data, dict) and isinstance(target.get(last_key), dict): # Deep merge for dicts target[last_key] = {**target.get(last_key, {}), **src_data} else: target[last_key] = src_data else: # Merge at root level if isinstance(src_data, dict) and isinstance(dst_data, dict): dst_data = {**dst_data, **src_data} else: dst_data = src_data with dst.open("w", encoding="utf-8") as fh: json.dump(dst_data, fh, indent=2, ensure_ascii=False) fh.write("\n") write_log({"level": "INFO", "message": f"Merged JSON {src} -> {dst} (key: {merge_key or 'root'})"}, ctx) def op_run_command(op: Dict[str, Any], ctx: Dict[str, Any]) -> None: env = os.environ.copy() for key, value in op.get("env", {}).items(): env[key] = value.replace("${install_dir}", str(ctx["install_dir"])) command = op.get("command", "") if sys.platform == "win32" and command.strip() == "bash install.sh": command = "cmd /c install.bat" # Stream output in real-time while capturing for logging process = subprocess.Popen( command, shell=True, cwd=ctx["config_dir"], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) stdout_lines: List[str] = [] stderr_lines: List[str] = [] # Read stdout and stderr in real-time if sys.platform == "win32": # On Windows, use threads instead of selectors (pipes aren't selectable) import threading def read_output(pipe, lines, file=None): for line in iter(pipe.readline, ''): lines.append(line) print(line, end="", flush=True, file=file) pipe.close() stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_lines)) stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_lines, sys.stderr)) stdout_thread.start() stderr_thread.start() stdout_thread.join() stderr_thread.join() process.wait() else: # On Unix, use selectors for more efficient I/O import selectors sel = selectors.DefaultSelector() sel.register(process.stdout, selectors.EVENT_READ) # type: ignore[arg-type] sel.register(process.stderr, selectors.EVENT_READ) # type: ignore[arg-type] while process.poll() is None or sel.get_map(): for key, _ in sel.select(timeout=0.1): line = key.fileobj.readline() # type: ignore[union-attr] if not line: sel.unregister(key.fileobj) continue if key.fileobj == process.stdout: stdout_lines.append(line) print(line, end="", flush=True) else: stderr_lines.append(line) print(line, end="", file=sys.stderr, flush=True) sel.close() process.wait() write_log( { "level": "INFO", "message": f"Command: {command}", "stdout": "".join(stdout_lines), "stderr": "".join(stderr_lines), "returncode": process.returncode, }, ctx, ) if process.returncode != 0: raise RuntimeError(f"Command failed with code {process.returncode}: {command}") def write_log(entry: Dict[str, Any], ctx: Dict[str, Any]) -> None: log_path = Path(ctx["log_file"]) log_path.parent.mkdir(parents=True, exist_ok=True) ts = datetime.now().isoformat() level = entry.get("level", "INFO") message = entry.get("message", "") with log_path.open("a", encoding="utf-8") as fh: fh.write(f"[{ts}] {level}: {message}\n") for key in ("stdout", "stderr", "returncode"): if key in entry and entry[key] not in (None, ""): fh.write(f" {key}: {entry[key]}\n") # Terminal output when verbose if ctx.get("verbose"): prefix = {"INFO": "ℹ️ ", "WARNING": "⚠️ ", "ERROR": "❌"}.get(level, "") print(f"{prefix}[{level}] {message}") if entry.get("stdout"): print(f" stdout: {entry['stdout'][:500]}") if entry.get("stderr"): print(f" stderr: {entry['stderr'][:500]}", file=sys.stderr) if entry.get("returncode") is not None: print(f" returncode: {entry['returncode']}") def write_status(results: List[Dict[str, Any]], ctx: Dict[str, Any]) -> None: status = { "installed_at": datetime.now().isoformat(), "modules": {item["module"]: item for item in results}, } status_path = Path(ctx["status_file"]) status_path.parent.mkdir(parents=True, exist_ok=True) with status_path.open("w", encoding="utf-8") as fh: json.dump(status, fh, indent=2, ensure_ascii=False) def install_default_configs(ctx: Dict[str, Any]) -> None: """Copy default config files if they don't already exist. Best-effort: never raises.""" try: install_dir = ctx["install_dir"] config_dir = ctx["config_dir"] # Copy memorys/CLAUDE.md -> {install_dir}/CLAUDE.md claude_md_src = config_dir / "memorys" / "CLAUDE.md" claude_md_dst = install_dir / "CLAUDE.md" if not claude_md_dst.exists() and claude_md_src.exists(): shutil.copy2(claude_md_src, claude_md_dst) print(f" Installed CLAUDE.md to {claude_md_dst}") write_log({"level": "INFO", "message": f"Installed CLAUDE.md to {claude_md_dst}"}, ctx) except Exception as exc: print(f" Warning: could not install default configs: {exc}", file=sys.stderr) def print_post_install_info(ctx: Dict[str, Any]) -> None: """Print post-install verification and setup guidance.""" install_dir = ctx["install_dir"] # Check codeagent-wrapper version wrapper_bin = install_dir / "bin" / "codeagent-wrapper" wrapper_version = None try: result = subprocess.run( [str(wrapper_bin), "--version"], capture_output=True, text=True, timeout=5, ) if result.returncode == 0: wrapper_version = result.stdout.strip() except Exception: pass # Check PATH bin_dir = str(install_dir / "bin") env_path = os.environ.get("PATH", "") path_ok = any( os.path.realpath(p) == os.path.realpath(bin_dir) if os.path.exists(p) else p == bin_dir for p in env_path.split(os.pathsep) ) # Check backend CLIs backends = ["codex", "claude", "gemini", "opencode"] detected = {name: shutil.which(name) is not None for name in backends} print("\nSetup Complete!") v_mark = "✓" if wrapper_version else "✗" print(f" codeagent-wrapper: {wrapper_version or '(not found)'} {v_mark}") p_mark = "✓" if path_ok else "✗ (not in PATH)" print(f" PATH: {bin_dir} {p_mark}") print("\nBackend CLIs detected:") cli_parts = [f"{b} {'✓' if detected[b] else '✗'}" for b in backends] print(" " + " | ".join(cli_parts)) print("\nNext steps:") print(" 1. Configure API keys in ~/.codeagent/models.json") print(' 2. Try: /do "your first task"') print() def prepare_status_backup(ctx: Dict[str, Any]) -> None: status_path = Path(ctx["status_file"]) if status_path.exists(): backup = status_path.with_suffix(".json.bak") backup.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(status_path, backup) ctx["status_backup"] = backup def rollback(ctx: Dict[str, Any]) -> None: write_log({"level": "WARNING", "message": "Rolling back installation"}, ctx) install_dir = Path(ctx["install_dir"]).resolve() for path in reversed(ctx.get("applied_paths", [])): resolved = Path(path).resolve() try: if resolved == install_dir or install_dir not in resolved.parents: continue if resolved.is_dir(): shutil.rmtree(resolved, ignore_errors=True) else: resolved.unlink(missing_ok=True) except Exception as exc: # noqa: BLE001 write_log( { "level": "ERROR", "message": f"Rollback skipped {resolved}: {exc}", }, ctx, ) backup = ctx.get("status_backup") if backup and Path(backup).exists(): shutil.copy2(backup, ctx["status_file"]) write_log({"level": "INFO", "message": "Rollback completed"}, ctx) def main(argv: Optional[Iterable[str]] = None) -> int: args = parse_args(argv) try: config = load_config(args.config) except Exception as exc: # noqa: BLE001 print(f"Error loading config: {exc}", file=sys.stderr) return 1 ctx = resolve_paths(config, args) # Handle --list-modules if getattr(args, "list_modules", False): list_modules(config) return 0 # Handle --status if getattr(args, "status", False): list_modules_with_status(config, ctx) return 0 # Handle --uninstall if getattr(args, "uninstall", False): if not args.module: print("Error: --uninstall requires --module to specify which modules to uninstall") return 1 modules = config.get("modules", {}) installed = load_installed_status(ctx) installed_modules = installed.get("modules", {}) selected = select_modules(config, args.module) to_uninstall = {k: v for k, v in selected.items() if k in installed_modules} if not to_uninstall: print("None of the specified modules are installed.") return 0 print(f"Uninstalling {len(to_uninstall)} module(s): {', '.join(to_uninstall.keys())}") for name, cfg in to_uninstall.items(): try: uninstall_module(name, cfg, ctx) print(f" ✓ {name} uninstalled") except Exception as exc: print(f" ✗ {name} failed: {exc}", file=sys.stderr) update_status_after_uninstall(list(to_uninstall.keys()), ctx) print(f"\n✓ Uninstall complete") return 0 # Handle --update if getattr(args, "update", False): try: ensure_install_dir(ctx["install_dir"]) except Exception as exc: print(f"Failed to prepare install dir: {exc}", file=sys.stderr) return 1 installed_status = get_installed_modules(config, ctx) if args.module: selected = select_modules(config, args.module) modules = {k: v for k, v in selected.items() if installed_status.get(k, False)} else: modules = { k: v for k, v in config.get("modules", {}).items() if installed_status.get(k, False) } if not modules: print("No installed modules to update.") return 0 ctx["force"] = True prepare_status_backup(ctx) total = len(modules) print(f"Updating {total} module(s) in {ctx['install_dir']}...") results: List[Dict[str, Any]] = [] for idx, (name, cfg) in enumerate(modules.items(), 1): print(f"[{idx}/{total}] Updating module: {name}...") try: results.append(execute_module(name, cfg, ctx)) print(f" ✓ {name} updated successfully") except Exception as exc: # noqa: BLE001 print(f" ✗ {name} failed: {exc}", file=sys.stderr) rollback(ctx) if not args.force: return 1 results.append( { "module": name, "status": "failed", "operations": [], "installed_at": datetime.now().isoformat(), } ) break current_status = load_installed_status(ctx) for r in results: if r.get("status") == "success": current_status.setdefault("modules", {})[r["module"]] = r current_status["updated_at"] = datetime.now().isoformat() with Path(ctx["status_file"]).open("w", encoding="utf-8") as fh: json.dump(current_status, fh, indent=2, ensure_ascii=False) success = sum(1 for r in results if r.get("status") == "success") failed = len(results) - success if failed == 0: print(f"\n✓ Update complete: {success} module(s) updated") install_default_configs(ctx) print_post_install_info(ctx) else: print(f"\n⚠ Update finished with errors: {success} success, {failed} failed") if not args.force: return 1 return 0 # No --module specified: enter interactive management mode if not args.module: try: ensure_install_dir(ctx["install_dir"]) except Exception as exc: print(f"Failed to prepare install dir: {exc}", file=sys.stderr) return 1 result = interactive_manage(config, ctx) if result == 0 and ctx.get("_did_install"): install_default_configs(ctx) print_post_install_info(ctx) return result # Install specified modules modules = select_modules(config, args.module) try: ensure_install_dir(ctx["install_dir"]) except Exception as exc: # noqa: BLE001 print(f"Failed to prepare install dir: {exc}", file=sys.stderr) return 1 prepare_status_backup(ctx) total = len(modules) print(f"Installing {total} module(s) to {ctx['install_dir']}...") results: List[Dict[str, Any]] = [] for idx, (name, cfg) in enumerate(modules.items(), 1): print(f"[{idx}/{total}] Installing module: {name}...") try: results.append(execute_module(name, cfg, ctx)) print(f" ✓ {name} installed successfully") except Exception as exc: # noqa: BLE001 print(f" ✗ {name} failed: {exc}", file=sys.stderr) if not args.force: rollback(ctx) return 1 rollback(ctx) results.append( { "module": name, "status": "failed", "operations": [], "installed_at": datetime.now().isoformat(), } ) break # Merge with existing status current_status = load_installed_status(ctx) for r in results: if r.get("status") == "success": current_status.setdefault("modules", {})[r["module"]] = r current_status["updated_at"] = datetime.now().isoformat() with Path(ctx["status_file"]).open("w", encoding="utf-8") as fh: json.dump(current_status, fh, indent=2, ensure_ascii=False) # Summary success = sum(1 for r in results if r.get("status") == "success") failed = len(results) - success if failed == 0: print(f"\n✓ Installation complete: {success} module(s) installed") print(f" Log file: {ctx['log_file']}") else: print(f"\n⚠ Installation finished with errors: {success} success, {failed} failed") print(f" Check log file for details: {ctx['log_file']}") if not args.force: return 1 if failed == 0: install_default_configs(ctx) print_post_install_info(ctx) return 0 if __name__ == "__main__": # pragma: no cover sys.exit(main())