Files
Claude-Code-Workflow/codex-lens/docs/SEMANTIC_GRAPH_DESIGN.md
catlog22 3ffb907a6f feat: add semantic graph design for static code analysis
- Introduced a comprehensive design document for a Code Semantic Graph aimed at enhancing static analysis capabilities.
- Defined the architecture, core components, and implementation steps for analyzing function calls, data flow, and dependencies.
- Included detailed specifications for nodes and edges in the graph, along with database schema for storage.
- Outlined phases for implementation, technical challenges, success metrics, and application scenarios.
2025-12-15 09:47:18 +08:00

32 KiB
Raw Permalink Blame History

静态分析语义图谱设计方案

1. 背景与目标

1.1 当前问题

现有的 llm_enhancer.py 对代码的分析是孤立的、原子化的

  • 每个函数/类被视为独立单元
  • 无法识别函数调用关系
  • 无法追踪数据流
  • 无法理解模块依赖

这导致无法回答以下类型的问题:

  • "修改这个函数会影响哪些模块?"
  • "这个API的完整数据流路径是什么"
  • "找出所有操作User实体的写入方法"
  • "哪些函数依赖这个配置参数?"

1.2 设计目标

构建代码语义图谱Code Semantic Graph实现

  1. 调用关系分析:函数/方法调用图Call Graph
  2. 数据流追踪:变量定义、使用、传递路径
  3. 依赖关系管理:模块/类/包之间的依赖
  4. 实体关系映射:识别数据模型及其操作方法
  5. LLM增强的语义理解结合静态分析和LLM理解调用的"意图"

2. 技术架构

2.1 整体架构

Source Code Files
      ↓
[Static Analysis Layer]
   ├─ AST Parsing (tree-sitter)
   ├─ Call Graph Extraction
   ├─ Data Flow Analysis
   └─ Dependency Resolution
      ↓
[Graph Construction Layer]
   ├─ Node Creation (Functions/Classes/Modules)
   ├─ Edge Creation (Calls/Imports/DataFlow)
   └─ Graph Storage (SQLite/Neo4j)
      ↓
[LLM Enhancement Layer]
   ├─ Relationship Semantics
   ├─ Intent Analysis
   └─ Pattern Recognition
      ↓
[Query & Reasoning Layer]
   ├─ Graph Traversal
   ├─ Impact Analysis
   └─ Semantic Search Integration

2.2 核心组件

2.2.1 图节点类型Nodes

from enum import Enum
from dataclasses import dataclass
from typing import List, Optional, Set

class NodeType(Enum):
    """节点类型"""
    MODULE = "module"           # 模块/文件
    CLASS = "class"             # 类
    FUNCTION = "function"       # 函数
    METHOD = "method"           # 方法
    VARIABLE = "variable"       # 变量
    PARAMETER = "parameter"     # 参数
    DATA_MODEL = "data_model"   # 数据模型(识别出的实体类)

@dataclass
class CodeNode:
    """代码图节点"""
    node_id: str                # 唯一标识file:line:name
    node_type: NodeType
    name: str
    qualified_name: str         # 完全限定名module.class.method
    file_path: str
    start_line: int
    end_line: int

    # 静态分析元数据
    signature: Optional[str] = None      # 函数/方法签名
    docstring: Optional[str] = None
    modifiers: Set[str] = None          # public/private/static等

    # LLM生成的语义元数据
    summary: Optional[str] = None
    purpose: Optional[str] = None
    tags: List[str] = None              # 如crud, validation, auth

2.2.2 图边类型Edges

class EdgeType(Enum):
    """边类型"""
    CALLS = "calls"                     # A调用B
    IMPORTS = "imports"                 # A导入B
    INHERITS = "inherits"               # A继承B
    IMPLEMENTS = "implements"           # A实现B接口
    USES_VARIABLE = "uses_variable"     # A使用变量B
    DEFINES_VARIABLE = "defines_variable"  # A定义变量B
    PASSES_DATA = "passes_data"         # A向B传递数据
    MODIFIES = "modifies"               # A修改B如数据库写入
    READS = "reads"                     # A读取B如数据库查询

@dataclass
class CodeEdge:
    """代码图边"""
    edge_id: str
    source_id: str              # 源节点ID
    target_id: str              # 目标节点ID
    edge_type: EdgeType

    # 边的上下文信息
    context: Optional[str] = None       # 调用发生的代码片段
    line_number: Optional[int] = None   # 调用所在行号

    # LLM生成的语义
    semantic_intent: Optional[str] = None  # 如"验证用户权限"
    confidence: float = 1.0              # 置信度

3. 详细实现步骤

3.1 静态分析引擎

3.1.1 AST解析与符号提取

from tree_sitter import Language, Parser
from pathlib import Path
from typing import Dict, List

class ASTAnalyzer:
    """基于tree-sitter的AST分析器"""

    def __init__(self, language: str):
        self.language = language
        self.parser = Parser()
        # 加载语言grammar

    def extract_symbols(self, content: str, file_path: str) -> List[CodeNode]:
        """提取所有符号定义"""

        tree = self.parser.parse(bytes(content, 'utf-8'))
        root = tree.root_node

        symbols = []
        self._traverse_definitions(root, content, file_path, symbols)
        return symbols

    def _traverse_definitions(
        self,
        node,
        content: str,
        file_path: str,
        result: List[CodeNode],
        parent_class: str = None
    ):
        """递归遍历提取定义"""

        if node.type == 'function_definition':
            func_node = self._create_function_node(node, content, file_path)
            result.append(func_node)

        elif node.type == 'class_definition':
            class_node = self._create_class_node(node, content, file_path)
            result.append(class_node)

            # 遍历类内部的方法
            for child in node.children:
                if child.type == 'block':
                    for method in child.children:
                        if method.type == 'function_definition':
                            method_node = self._create_method_node(
                                method, content, file_path, class_node.name
                            )
                            result.append(method_node)

        # 递归遍历子节点
        for child in node.children:
            self._traverse_definitions(
                child, content, file_path, result, parent_class
            )

    def _create_function_node(self, node, content: str, file_path: str) -> CodeNode:
        """创建函数节点"""

        name_node = node.child_by_field_name('name')
        func_name = content[name_node.start_byte:name_node.end_byte]

        # 提取参数列表
        params_node = node.child_by_field_name('parameters')
        signature = content[params_node.start_byte:params_node.end_byte]

        # 提取docstring
        docstring = self._extract_docstring(node, content)

        return CodeNode(
            node_id=f"{file_path}:{node.start_point[0]}:{func_name}",
            node_type=NodeType.FUNCTION,
            name=func_name,
            qualified_name=f"{Path(file_path).stem}.{func_name}",
            file_path=file_path,
            start_line=node.start_point[0] + 1,
            end_line=node.end_point[0] + 1,
            signature=f"{func_name}{signature}",
            docstring=docstring,
        )

    def _extract_docstring(self, node, content: str) -> Optional[str]:
        """提取docstring"""

        # 查找函数体的第一个表达式语句
        body = node.child_by_field_name('body')
        if not body:
            return None

        for child in body.children:
            if child.type == 'expression_statement':
                expr = child.children[0]
                if expr.type == 'string':
                    # 提取字符串内容(去掉引号)
                    doc = content[expr.start_byte:expr.end_byte]
                    return doc.strip('"""').strip("'''").strip()

        return None

3.1.2 调用图提取

class CallGraphExtractor:
    """调用图提取器"""

    def __init__(self, ast_analyzer: ASTAnalyzer):
        self.ast_analyzer = ast_analyzer

    def extract_calls(
        self,
        content: str,
        file_path: str,
        symbols: List[CodeNode]
    ) -> List[CodeEdge]:
        """提取函数调用关系"""

        tree = self.ast_analyzer.parser.parse(bytes(content, 'utf-8'))
        calls = []

        # 为每个函数/方法提取其内部的调用
        for symbol in symbols:
            if symbol.node_type in [NodeType.FUNCTION, NodeType.METHOD]:
                symbol_calls = self._extract_calls_in_function(
                    tree, symbol, content, file_path
                )
                calls.extend(symbol_calls)

        return calls

    def _extract_calls_in_function(
        self,
        tree,
        caller: CodeNode,
        content: str,
        file_path: str
    ) -> List[CodeEdge]:
        """提取单个函数内的所有调用"""

        # 定位到函数的AST节点
        func_node = self._find_node_by_line(tree.root_node, caller.start_line)
        if not func_node:
            return []

        calls = []
        self._traverse_calls(func_node, caller, content, file_path, calls)
        return calls

    def _traverse_calls(
        self,
        node,
        caller: CodeNode,
        content: str,
        file_path: str,
        result: List[CodeEdge]
    ):
        """递归遍历查找call表达式"""

        if node.type == 'call':
            # 提取被调用的函数名
            function_node = node.child_by_field_name('function')
            callee_name = content[function_node.start_byte:function_node.end_byte]

            # 提取调用的上下文(所在行)
            call_line = node.start_point[0] + 1
            line_content = content.splitlines()[node.start_point[0]]

            edge = CodeEdge(
                edge_id=f"{caller.node_id}{callee_name}:{call_line}",
                source_id=caller.node_id,
                target_id=callee_name,  # 暂时用名称,后续需要解析
                edge_type=EdgeType.CALLS,
                context=line_content.strip(),
                line_number=call_line,
            )
            result.append(edge)

        # 递归遍历
        for child in node.children:
            self._traverse_calls(child, caller, content, file_path, result)

    def _find_node_by_line(self, node, target_line: int):
        """根据行号查找AST节点"""

        if node.start_point[0] + 1 == target_line:
            return node

        for child in node.children:
            result = self._find_node_by_line(child, target_line)
            if result:
                return result

        return None

3.1.3 名称解析Name Resolution

class NameResolver:
    """将函数调用的名称解析为具体的符号定义"""

    def __init__(self, symbol_table: Dict[str, CodeNode]):
        """
        symbol_table: 符号表,映射 qualified_name -> CodeNode
        """
        self.symbol_table = symbol_table

    def resolve_call_target(
        self,
        call_edge: CodeEdge,
        caller_context: CodeNode
    ) -> Optional[str]:
        """
        解析调用目标的完整node_id

        策略:
        1. 检查是否是本地函数调用(同文件)
        2. 检查是否是导入的模块函数
        3. 检查是否是方法调用self.method
        """

        callee_name = call_edge.target_id

        # 策略1: 本地调用(同文件)
        local_qualified = f"{Path(caller_context.file_path).stem}.{callee_name}"
        if local_qualified in self.symbol_table:
            return self.symbol_table[local_qualified].node_id

        # 策略2: 方法调用(提取对象名)
        if '.' in callee_name:
            parts = callee_name.split('.')
            if parts[0] == 'self':
                # self.method_name -> 在当前类中查找
                method_name = parts[1]
                # 需要找到caller所属的类
                class_name = self._find_containing_class(caller_context)
                if class_name:
                    class_qualified = f"{Path(caller_context.file_path).stem}.{class_name}.{method_name}"
                    if class_qualified in self.symbol_table:
                        return self.symbol_table[class_qualified].node_id

        # 策略3: 导入的函数需要扫描import语句
        # TODO: 实现跨文件的导入解析

        return None

    def _find_containing_class(self, node: CodeNode) -> Optional[str]:
        """找到函数/方法所属的类"""
        # 通过qualified_name推断
        parts = node.qualified_name.split('.')
        if len(parts) > 2:  # module.class.method
            return parts[-2]
        return None

3.2 图存储与索引

3.2.1 数据库SchemaSQLite版本

-- 节点表
CREATE TABLE code_nodes (
    node_id TEXT PRIMARY KEY,
    node_type TEXT NOT NULL,  -- module/class/function/method/variable
    name TEXT NOT NULL,
    qualified_name TEXT NOT NULL UNIQUE,
    file_path TEXT NOT NULL,
    start_line INTEGER NOT NULL,
    end_line INTEGER NOT NULL,

    -- 静态分析元数据
    signature TEXT,
    docstring TEXT,
    modifiers TEXT,  -- JSON数组

    -- LLM语义元数据
    summary TEXT,
    purpose TEXT,
    tags TEXT,  -- JSON数组

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 边表
CREATE TABLE code_edges (
    edge_id TEXT PRIMARY KEY,
    source_id TEXT NOT NULL,
    target_id TEXT NOT NULL,
    edge_type TEXT NOT NULL,  -- calls/imports/inherits/uses_variable等

    -- 上下文
    context TEXT,
    line_number INTEGER,

    -- LLM语义
    semantic_intent TEXT,
    confidence REAL DEFAULT 1.0,

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (source_id) REFERENCES code_nodes(node_id) ON DELETE CASCADE,
    FOREIGN KEY (target_id) REFERENCES code_nodes(node_id) ON DELETE CASCADE
);

-- 索引
CREATE INDEX idx_nodes_type ON code_nodes(node_type);
CREATE INDEX idx_nodes_file ON code_nodes(file_path);
CREATE INDEX idx_nodes_qualified ON code_nodes(qualified_name);

CREATE INDEX idx_edges_source ON code_edges(source_id);
CREATE INDEX idx_edges_target ON code_edges(target_id);
CREATE INDEX idx_edges_type ON code_edges(edge_type);

-- 用于快速查找调用关系
CREATE INDEX idx_edges_source_type ON code_edges(source_id, edge_type);
CREATE INDEX idx_edges_target_type ON code_edges(target_id, edge_type);

3.2.2 图存储接口

import sqlite3
from typing import List, Optional, Set

class CodeGraphStore:
    """代码图谱存储"""

    def __init__(self, db_path: Path):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self._create_tables()

    def add_node(self, node: CodeNode):
        """添加节点"""
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO code_nodes (
                node_id, node_type, name, qualified_name,
                file_path, start_line, end_line,
                signature, docstring, summary, purpose
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            node.node_id, node.node_type.value, node.name,
            node.qualified_name, node.file_path,
            node.start_line, node.end_line,
            node.signature, node.docstring,
            node.summary, node.purpose
        ))
        self.conn.commit()

    def add_edge(self, edge: CodeEdge):
        """添加边"""
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO code_edges (
                edge_id, source_id, target_id, edge_type,
                context, line_number, semantic_intent, confidence
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            edge.edge_id, edge.source_id, edge.target_id,
            edge.edge_type.value, edge.context, edge.line_number,
            edge.semantic_intent, edge.confidence
        ))
        self.conn.commit()

    def get_node(self, node_id: str) -> Optional[CodeNode]:
        """获取节点"""
        cursor = self.conn.cursor()
        cursor.execute("SELECT * FROM code_nodes WHERE node_id = ?", (node_id,))
        row = cursor.fetchone()
        return self._row_to_node(row) if row else None

    def get_callers(self, node_id: str) -> List[CodeNode]:
        """获取所有调用该节点的节点(反向查询)"""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT n.* FROM code_nodes n
            JOIN code_edges e ON n.node_id = e.source_id
            WHERE e.target_id = ? AND e.edge_type = 'calls'
        """, (node_id,))

        return [self._row_to_node(row) for row in cursor.fetchall()]

    def get_callees(self, node_id: str) -> List[CodeNode]:
        """获取该节点调用的所有节点(正向查询)"""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT n.* FROM code_nodes n
            JOIN code_edges e ON n.node_id = e.target_id
            WHERE e.source_id = ? AND e.edge_type = 'calls'
        """, (node_id,))

        return [self._row_to_node(row) for row in cursor.fetchall()]

    def get_call_chain(
        self,
        start_node_id: str,
        max_depth: int = 5
    ) -> List[List[CodeNode]]:
        """获取调用链DFS遍历"""

        visited = set()
        chains = []

        def dfs(node_id: str, path: List[CodeNode], depth: int):
            if depth > max_depth or node_id in visited:
                return

            visited.add(node_id)
            node = self.get_node(node_id)
            if not node:
                return

            current_path = path + [node]
            callees = self.get_callees(node_id)

            if not callees:
                # 叶子节点,记录完整路径
                chains.append(current_path)
            else:
                for callee in callees:
                    dfs(callee.node_id, current_path, depth + 1)

            visited.remove(node_id)

        dfs(start_node_id, [], 0)
        return chains

3.3 LLM语义增强

3.3.1 关系语义分析

class RelationshipSemanticAnalyzer:
    """为代码关系生成语义描述"""

    def __init__(self, llm_enhancer: LLMEnhancer):
        self.llm_enhancer = llm_enhancer

    def analyze_call_intent(
        self,
        edge: CodeEdge,
        caller: CodeNode,
        callee: CodeNode
    ) -> str:
        """分析函数调用的意图"""

        # 构建提示词
        prompt = f"""
PURPOSE: Analyze the intent of a function call relationship
TASK: Describe in 1 sentence WHY function A calls function B and what purpose it serves

CONTEXT:
Caller Function: {caller.name}
Caller Summary: {caller.summary or 'N/A'}
Caller Code Snippet:

{edge.context}


Callee Function: {callee.name}
Callee Summary: {callee.summary or 'N/A'}
Callee Signature: {callee.signature or 'N/A'}

OUTPUT: A concise semantic description of the call intent.
Example: "validates user credentials before granting access"
"""

        response = self.llm_enhancer._invoke_ccw_cli(prompt, tool='gemini')
        if response['success']:
            intent = response['stdout'].strip()
            return intent

        return "unknown intent"

    def batch_analyze_calls(
        self,
        edges: List[CodeEdge],
        nodes_map: Dict[str, CodeNode]
    ) -> Dict[str, str]:
        """批量分析调用意图优化LLM调用"""

        # 构建批量prompt
        call_descriptions = []
        for edge in edges:
            caller = nodes_map.get(edge.source_id)
            callee = nodes_map.get(edge.target_id)
            if not caller or not callee:
                continue

            desc = f"""
[CALL {edge.edge_id}]
From: {caller.name} ({caller.summary or 'no summary'})
To: {callee.name} ({callee.summary or 'no summary'})
Context: {edge.context}
"""
            call_descriptions.append(desc)

        prompt = f"""
PURPOSE: Analyze multiple function call relationships and describe their intents
TASK: For each call, provide a 1-sentence semantic description

{chr(10).join(call_descriptions)}

OUTPUT FORMAT (JSON):
{{
    "edge_id_1": "intent description",
    "edge_id_2": "intent description",
    ...
}}
"""

        response = self.llm_enhancer._invoke_ccw_cli(prompt, tool='gemini')
        if response['success']:
            import json
            intents = json.loads(self.llm_enhancer._extract_json(response['stdout']))
            return intents

        return {}

3.3.2 数据模型识别

class DataModelRecognizer:
    """识别代码中的数据模型(实体类)"""

    def identify_data_models(
        self,
        class_nodes: List[CodeNode]
    ) -> List[CodeNode]:
        """识别哪些类是数据模型"""

        data_models = []

        for class_node in class_nodes:
            # 启发式规则
            is_model = False

            # 规则1: 类名包含Model/Entity/Schema
            if any(keyword in class_node.name for keyword in ['Model', 'Entity', 'Schema']):
                is_model = True

            # 规则2: 继承自ORM基类需要分析继承关系
            # TODO: 检查是否继承 db.Model, BaseModel等

            # 规则3: 让LLM判断
            if not is_model:
                is_model = self._ask_llm_if_data_model(class_node)

            if is_model:
                class_node.tags = class_node.tags or []
                class_node.tags.append('data_model')
                data_models.append(class_node)

        return data_models

    def _ask_llm_if_data_model(self, class_node: CodeNode) -> bool:
        """让LLM判断是否为数据模型"""

        prompt = f"""
Is this Python class a data model (entity class representing database table or data structure)?

Class Definition:
```python
{class_node.docstring or ''}
class {class_node.name}:
    # ... (signature: {class_node.signature})

Answer with: YES or NO """

    # 调用LLM...
    # 简化实现
    return False

### 3.4 图查询与推理

#### 3.4.1 影响分析Impact Analysis

```python
class ImpactAnalyzer:
    """代码变更影响分析"""

    def __init__(self, graph_store: CodeGraphStore):
        self.graph_store = graph_store

    def analyze_function_impact(
        self,
        function_id: str,
        max_depth: int = 10
    ) -> Dict[str, any]:
        """分析修改某个函数的影响范围"""

        # 找到所有直接和间接调用者
        affected_functions = set()
        self._traverse_callers(function_id, affected_functions, 0, max_depth)

        # 找到所有受影响的文件
        affected_files = set()
        for func_id in affected_functions:
            node = self.graph_store.get_node(func_id)
            if node:
                affected_files.add(node.file_path)

        return {
            'modified_function': function_id,
            'affected_functions': list(affected_functions),
            'affected_files': list(affected_files),
            'impact_scope': len(affected_functions),
        }

    def _traverse_callers(
        self,
        node_id: str,
        result: Set[str],
        current_depth: int,
        max_depth: int
    ):
        """递归遍历调用者"""

        if current_depth >= max_depth or node_id in result:
            return

        callers = self.graph_store.get_callers(node_id)
        for caller in callers:
            result.add(caller.node_id)
            self._traverse_callers(caller.node_id, result, current_depth + 1, max_depth)

3.4.2 数据流追踪

class DataFlowTracer:
    """数据流路径追踪"""

    def __init__(self, graph_store: CodeGraphStore):
        self.graph_store = graph_store

    def trace_variable_flow(
        self,
        variable_name: str,
        start_function_id: str
    ) -> List[Dict]:
        """追踪变量的数据流"""

        # 查找所有使用该变量的边
        cursor = self.graph_store.conn.cursor()
        cursor.execute("""
            SELECT * FROM code_edges
            WHERE edge_type IN ('uses_variable', 'defines_variable', 'passes_data')
            AND (source_id = ? OR target_id LIKE ?)
        """, (start_function_id, f"%{variable_name}%"))

        flow_path = []
        for row in cursor.fetchall():
            edge = self._row_to_edge(row)
            source = self.graph_store.get_node(edge.source_id)
            target = self.graph_store.get_node(edge.target_id)

            flow_path.append({
                'from': source.name if source else 'unknown',
                'to': target.name if target else 'unknown',
                'action': edge.edge_type.value,
                'context': edge.context,
            })

        return flow_path

    def find_crud_operations(
        self,
        entity_name: str
    ) -> Dict[str, List[CodeNode]]:
        """找到对某个实体的所有CRUD操作"""

        cursor = self.graph_store.conn.cursor()

        # 查找所有修改该实体的函数
        cursor.execute("""
            SELECT DISTINCT n.* FROM code_nodes n
            JOIN code_edges e ON n.node_id = e.source_id
            WHERE e.edge_type = 'modifies'
            AND e.target_id LIKE ?
        """, (f"%{entity_name}%",))

        writers = [self._row_to_node(row) for row in cursor.fetchall()]

        # 查找所有读取该实体的函数
        cursor.execute("""
            SELECT DISTINCT n.* FROM code_nodes n
            JOIN code_edges e ON n.node_id = e.source_id
            WHERE e.edge_type = 'reads'
            AND e.target_id LIKE ?
        """, (f"%{entity_name}%",))

        readers = [self._row_to_node(row) for row in cursor.fetchall()]

        return {
            'create': [w for w in writers if 'create' in w.name.lower()],
            'read': readers,
            'update': [w for w in writers if 'update' in w.name.lower()],
            'delete': [w for w in writers if 'delete' in w.name.lower()],
        }

3.5 与语义搜索集成

3.5.1 增强的搜索结果

class GraphEnhancedSearchEngine:
    """结合图谱的增强搜索"""

    def __init__(
        self,
        vector_search: VectorStore,
        graph_store: CodeGraphStore
    ):
        self.vector_search = vector_search
        self.graph_store = graph_store

    def search_with_graph_context(
        self,
        query: str,
        top_k: int = 10
    ) -> List[EnhancedSearchResult]:
        """带图谱上下文的搜索"""

        # 1. 向量搜索
        vector_results = self.vector_search.search(query, top_k=top_k)

        # 2. 为每个结果添加图谱信息
        enhanced_results = []
        for result in vector_results:
            # 找到对应的图节点
            node = self.graph_store.get_node(result.path)
            if not node:
                continue

            # 获取调用关系
            callers = self.graph_store.get_callers(node.node_id)
            callees = self.graph_store.get_callees(node.node_id)

            enhanced = EnhancedSearchResult(
                **result.dict(),
                callers=[c.name for c in callers[:5]],
                callees=[c.name for c in callees[:5]],
                call_count_in=len(callers),
                call_count_out=len(callees),
            )
            enhanced_results.append(enhanced)

        return enhanced_results

    def search_by_relationship(
        self,
        query: str,
        relationship_type: str  # "calls", "called_by", "uses", etc.
    ) -> List[CodeNode]:
        """基于关系的搜索"""

        # 先找到查询匹配的节点
        vector_results = self.vector_search.search(query, top_k=5)
        if not vector_results:
            return []

        target_node_id = vector_results[0].path

        # 根据关系类型查找相关节点
        if relationship_type == "calls":
            return self.graph_store.get_callees(target_node_id)
        elif relationship_type == "called_by":
            return self.graph_store.get_callers(target_node_id)
        # 其他关系类型...

        return []

4. 实施路线图

Phase 1: 基础静态分析3-4周

  • 实现ASTAnalyzer符号提取
  • 实现CallGraphExtractor调用图提取
  • 实现NameResolver名称解析
  • 设计图数据库schema
  • 实现CodeGraphStore基础CRUD
  • 单元测试

Phase 2: 多语言支持2周

  • Python完整支持
  • JavaScript/TypeScript支持
  • Java支持可选
  • 跨语言测试

Phase 3: LLM语义增强2-3周

  • 实现RelationshipSemanticAnalyzer
  • 实现DataModelRecognizer
  • 批量处理优化
  • 集成测试

Phase 4: 高级查询2周

  • 实现ImpactAnalyzer
  • 实现DataFlowTracer
  • 实现GraphEnhancedSearchEngine
  • 性能优化

Phase 5: 可视化与工具2周

  • 调用图可视化Graphviz/D3.js
  • CLI命令集成
  • Web UI可选

Phase 6: 生产化1-2周

  • 增量更新机制
  • 大规模项目优化
  • 文档和示例
  • 发布

总计预估时间12-15周

5. 技术挑战与解决方案

5.1 挑战:跨文件名称解析

问题:函数调用的目标可能在不同文件/模块中需要解析import语句。

解决方案

class ImportResolver:
    """导入语句解析器"""

    def extract_imports(self, tree, content: str) -> Dict[str, str]:
        """
        提取所有import语句构建别名映射

        返回: {别名 -> 实际模块路径}
        """
        imports = {}

        for node in tree.root_node.children:
            if node.type == 'import_statement':
                # from module import func
                # import module as alias
                pass  # 解析逻辑

        return imports

    def resolve_imported_symbol(
        self,
        symbol_name: str,
        imports: Dict[str, str],
        project_root: Path
    ) -> Optional[str]:
        """解析导入符号的实际位置"""

        if symbol_name in imports:
            module_path = imports[symbol_name]
            # 查找该模块的文件路径
            # 在图谱中查找对应的节点
            pass

        return None

5.2 挑战:动态调用识别

问题反射、getattr、动态导入等运行时行为无法通过静态分析完全捕获。

解决方案

  • 使用LLM推断可能的调用目标
  • 标记为"动态调用",降低置信度
  • 结合运行时日志补充
def handle_dynamic_call(edge: CodeEdge) -> CodeEdge:
    """处理动态调用"""

    if 'getattr' in edge.context or 'eval' in edge.context:
        edge.confidence = 0.5
        edge.semantic_intent = "dynamic call (runtime resolution required)"

    return edge

5.3 挑战:大型代码库性能

问题:对百万行级别的代码库构建图谱可能耗时很长。

解决方案

  • 并行处理:多进程分析不同文件
  • 增量更新:只重新分析变更的文件
  • 延迟LLM初次构建只做静态分析LLM增强按需触发
from multiprocessing import Pool

class ParallelGraphBuilder:
    """并行图谱构建"""

    def build_graph_parallel(
        self,
        file_paths: List[Path],
        workers: int = 8
    ):
        """并行分析多个文件"""

        with Pool(workers) as pool:
            results = pool.map(self._analyze_single_file, file_paths)

        # 合并结果到图谱
        for nodes, edges in results:
            for node in nodes:
                self.graph_store.add_node(node)
            for edge in edges:
                self.graph_store.add_edge(edge)

6. 成功指标

  1. 覆盖率90%以上的函数调用关系被正确识别
  2. 准确率:名称解析准确率>85%
  3. 性能10万行代码的项目图谱构建<5分钟
  4. 查询速度:影响分析查询<100ms
  5. LLM增强价值:关系语义描述的有用性评分>4/5

7. 应用场景示例

场景1代码审查助手

# 审查一个PR分析影响范围
analyzer = ImpactAnalyzer(graph_store)
impact = analyzer.analyze_function_impact('auth.py:45:validate_token')

print(f"修改此函数将影响 {impact['impact_scope']} 个其他函数")
print(f"涉及文件: {', '.join(impact['affected_files'])}")

场景2重构规划

# 计划重构User类查看所有相关操作
tracer = DataFlowTracer(graph_store)
crud = tracer.find_crud_operations('User')

print(f"创建User的方法: {[f.name for f in crud['create']]}")
print(f"读取User的方法: {[f.name for f in crud['read']]}")

场景3知识图谱问答

# "修改登录逻辑会影响哪些API端点"
search_engine = GraphEnhancedSearchEngine(vector_store, graph_store)

# 先找到登录函数
login_func = search_engine.search("user login authentication")[0]

# 追踪调用链
analyzer = ImpactAnalyzer(graph_store)
impact = analyzer.analyze_function_impact(login_func.node_id)

# 筛选出API端点
api_endpoints = [
    f for f in impact['affected_functions']
    if '@app.route' in graph_store.get_node(f).modifiers
]

8. 参考资料