Static Analysis Semantic Graph Design Proposal
1. Background and Goals
1.1 Current Problems
The existing llm_enhancer.py analyzes code in isolation, one atomic unit at a time:
- Each function/class is treated as an independent unit
- Function call relationships cannot be identified
- Data flow cannot be traced
- Module dependencies are not understood
As a result, questions like the following cannot be answered:
- "Which modules are affected if I change this function?"
- "What is the complete data-flow path of this API?"
- "Find every write method that operates on the User entity"
- "Which functions depend on this configuration parameter?"
1.2 Design Goals
Build a Code Semantic Graph that provides:
- Call relationship analysis: a function/method call graph (Call Graph)
- Data flow tracing: variable definition, use, and propagation paths
- Dependency management: dependencies between modules/classes/packages
- Entity relationship mapping: identify data models and the methods that operate on them
- LLM-enhanced semantic understanding: combine static analysis with an LLM to understand the "intent" behind a call
2. Technical Architecture
2.1 Overall Architecture
Source Code Files
↓
[Static Analysis Layer]
├─ AST Parsing (tree-sitter)
├─ Call Graph Extraction
├─ Data Flow Analysis
└─ Dependency Resolution
↓
[Graph Construction Layer]
├─ Node Creation (Functions/Classes/Modules)
├─ Edge Creation (Calls/Imports/DataFlow)
└─ Graph Storage (SQLite/Neo4j)
↓
[LLM Enhancement Layer]
├─ Relationship Semantics
├─ Intent Analysis
└─ Pattern Recognition
↓
[Query & Reasoning Layer]
├─ Graph Traversal
├─ Impact Analysis
└─ Semantic Search Integration
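The four layers compose into a straightforward build pipeline. Below is a minimal orchestration sketch, assuming the ASTAnalyzer, CallGraphExtractor and CodeGraphStore classes defined in section 3; the build_graph function name, project_root argument and the *.py glob are illustrative, not part of the design.
from pathlib import Path

def build_graph(project_root: Path, db_path: Path):
    """Static-analysis pass: parse files, extract symbols and calls, persist them."""
    analyzer = ASTAnalyzer(language="python")
    extractor = CallGraphExtractor(analyzer)
    store = CodeGraphStore(db_path)
    for file_path in project_root.rglob("*.py"):
        content = file_path.read_text(encoding="utf-8")
        symbols = analyzer.extract_symbols(content, str(file_path))
        edges = extractor.extract_calls(content, str(file_path), symbols)
        for node in symbols:
            store.add_node(node)
        for edge in edges:
            store.add_edge(edge)
    # Name resolution and LLM enhancement run as separate passes over the stored graph
    return store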
2.2 Core Components
2.2.1 Graph Node Types (Nodes)
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Optional, Set

class NodeType(Enum):
    """Node types"""
    MODULE = "module"            # module/file
    CLASS = "class"              # class
    FUNCTION = "function"        # function
    METHOD = "method"            # method
    VARIABLE = "variable"        # variable
    PARAMETER = "parameter"      # parameter
    DATA_MODEL = "data_model"    # data model (recognized entity class)

@dataclass
class CodeNode:
    """Code graph node"""
    node_id: str                 # unique ID: file:line:name
    node_type: NodeType
    name: str
    qualified_name: str          # fully qualified name: module.class.method
    file_path: str
    start_line: int
    end_line: int
    # Static analysis metadata
    signature: Optional[str] = None                     # function/method signature
    docstring: Optional[str] = None
    modifiers: Set[str] = field(default_factory=set)    # public/private/static, etc.
    # LLM-generated semantic metadata
    summary: Optional[str] = None
    purpose: Optional[str] = None
    tags: List[str] = field(default_factory=list)       # e.g. crud, validation, auth
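For reference, a node for a hypothetical validate_token function in auth.py would look like this; all values are illustrative and only demonstrate the node_id and qualified_name conventions.
token_node = CodeNode(
    node_id="auth.py:45:validate_token",            # file:line:name
    node_type=NodeType.FUNCTION,
    name="validate_token",
    qualified_name="auth.validate_token",           # module.function
    file_path="auth.py",
    start_line=45,
    end_line=62,
    signature="validate_token(token: str) -> bool",
    docstring="Check that the token is valid and unexpired.",
    tags=["auth", "validation"],
)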
2.2.2 Graph Edge Types (Edges)
class EdgeType(Enum):
    """Edge types"""
    CALLS = "calls"                          # A calls B
    IMPORTS = "imports"                      # A imports B
    INHERITS = "inherits"                    # A inherits from B
    IMPLEMENTS = "implements"                # A implements B (interface)
    USES_VARIABLE = "uses_variable"          # A uses variable B
    DEFINES_VARIABLE = "defines_variable"    # A defines variable B
    PASSES_DATA = "passes_data"              # A passes data to B
    MODIFIES = "modifies"                    # A modifies B (e.g. database write)
    READS = "reads"                          # A reads B (e.g. database query)

@dataclass
class CodeEdge:
    """Code graph edge"""
    edge_id: str
    source_id: str                           # source node ID
    target_id: str                           # target node ID
    edge_type: EdgeType
    # Context for the edge
    context: Optional[str] = None            # code snippet where the call occurs
    line_number: Optional[int] = None        # line number of the call
    # LLM-generated semantics
    semantic_intent: Optional[str] = None    # e.g. "validates user permissions"
    confidence: float = 1.0                  # confidence score
3. Detailed Implementation Steps
3.1 Static Analysis Engine
3.1.1 AST Parsing and Symbol Extraction
from tree_sitter import Language, Parser
from pathlib import Path
from typing import Dict, List, Optional

class ASTAnalyzer:
    """tree-sitter based AST analyzer"""

    def __init__(self, language: str):
        self.language = language
        self.parser = Parser()
        # Load the language grammar here (e.g. via the tree-sitter-languages
        # package); omitted in this sketch since the loading API varies by version.

    def extract_symbols(self, content: str, file_path: str) -> List[CodeNode]:
        """Extract all symbol definitions"""
        tree = self.parser.parse(bytes(content, 'utf-8'))
        root = tree.root_node
        symbols = []
        self._traverse_definitions(root, content, file_path, symbols)
        return symbols
    def _traverse_definitions(
        self,
        node,
        content: str,
        file_path: str,
        result: List[CodeNode],
        parent_class: str = None
    ):
        """Recursively traverse the tree and collect definitions"""
        if node.type == 'function_definition':
            func_node = self._create_function_node(node, content, file_path)
            result.append(func_node)
        elif node.type == 'class_definition':
            class_node = self._create_class_node(node, content, file_path)
            result.append(class_node)
            # Walk the class body and collect its methods
            for child in node.children:
                if child.type == 'block':
                    for method in child.children:
                        if method.type == 'function_definition':
                            method_node = self._create_method_node(
                                method, content, file_path, class_node.name
                            )
                            result.append(method_node)
            # Methods were handled above; returning here avoids emitting each
            # method a second time as a top-level function during recursion.
            return
        # Recurse into child nodes
        for child in node.children:
            self._traverse_definitions(
                child, content, file_path, result, parent_class
            )
    def _create_function_node(self, node, content: str, file_path: str) -> CodeNode:
        """Create a function node"""
        name_node = node.child_by_field_name('name')
        func_name = content[name_node.start_byte:name_node.end_byte]
        # Extract the parameter list
        params_node = node.child_by_field_name('parameters')
        signature = content[params_node.start_byte:params_node.end_byte]
        # Extract the docstring
        docstring = self._extract_docstring(node, content)
        return CodeNode(
            node_id=f"{file_path}:{node.start_point[0]}:{func_name}",
            node_type=NodeType.FUNCTION,
            name=func_name,
            qualified_name=f"{Path(file_path).stem}.{func_name}",
            file_path=file_path,
            start_line=node.start_point[0] + 1,
            end_line=node.end_point[0] + 1,
            signature=f"{func_name}{signature}",
            docstring=docstring,
        )
    def _extract_docstring(self, node, content: str) -> Optional[str]:
        """Extract the docstring"""
        # Look at the first expression statement in the function body
        body = node.child_by_field_name('body')
        if not body:
            return None
        for child in body.children:
            if child.type == 'expression_statement':
                expr = child.children[0]
                if expr.type == 'string':
                    # Extract the string content (strip the quotes)
                    doc = content[expr.start_byte:expr.end_byte]
                    return doc.strip('"""').strip("'''").strip()
        return None
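A quick usage sketch; auth.py is an illustrative file, and it assumes the language grammar has actually been loaded in __init__.
analyzer = ASTAnalyzer(language="python")
source = Path("auth.py").read_text(encoding="utf-8")
symbols = analyzer.extract_symbols(source, "auth.py")
for sym in symbols:
    print(f"{sym.node_type.value:<10} {sym.qualified_name} (lines {sym.start_line}-{sym.end_line})")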
3.1.2 Call Graph Extraction
class CallGraphExtractor:
    """Call graph extractor"""

    def __init__(self, ast_analyzer: ASTAnalyzer):
        self.ast_analyzer = ast_analyzer

    def extract_calls(
        self,
        content: str,
        file_path: str,
        symbols: List[CodeNode]
    ) -> List[CodeEdge]:
        """Extract function call relationships"""
        tree = self.ast_analyzer.parser.parse(bytes(content, 'utf-8'))
        calls = []
        # Extract the calls made inside each function/method
        for symbol in symbols:
            if symbol.node_type in [NodeType.FUNCTION, NodeType.METHOD]:
                symbol_calls = self._extract_calls_in_function(
                    tree, symbol, content, file_path
                )
                calls.extend(symbol_calls)
        return calls

    def _extract_calls_in_function(
        self,
        tree,
        caller: CodeNode,
        content: str,
        file_path: str
    ) -> List[CodeEdge]:
        """Extract all calls made inside a single function"""
        # Locate the function's AST node
        func_node = self._find_node_by_line(tree.root_node, caller.start_line)
        if not func_node:
            return []
        calls = []
        self._traverse_calls(func_node, caller, content, file_path, calls)
        return calls
    def _traverse_calls(
        self,
        node,
        caller: CodeNode,
        content: str,
        file_path: str,
        result: List[CodeEdge]
    ):
        """Recursively look for call expressions"""
        if node.type == 'call':
            # Extract the name of the called function
            function_node = node.child_by_field_name('function')
            callee_name = content[function_node.start_byte:function_node.end_byte]
            # Extract the context of the call (the line it occurs on)
            call_line = node.start_point[0] + 1
            line_content = content.splitlines()[node.start_point[0]]
            edge = CodeEdge(
                edge_id=f"{caller.node_id}→{callee_name}:{call_line}",
                source_id=caller.node_id,
                target_id=callee_name,  # temporary: just the name, resolved later
                edge_type=EdgeType.CALLS,
                context=line_content.strip(),
                line_number=call_line,
            )
            result.append(edge)
        # Recurse into child nodes
        for child in node.children:
            self._traverse_calls(child, caller, content, file_path, result)

    def _find_node_by_line(self, node, target_line: int):
        """Find an AST node by line number"""
        if node.start_point[0] + 1 == target_line:
            return node
        for child in node.children:
            result = self._find_node_by_line(child, target_line)
            if result:
                return result
        return None
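Continuing the usage sketch above (reusing the analyzer, source and symbols variables), the extractor produces raw call edges whose targets are still plain names:
extractor = CallGraphExtractor(analyzer)
edges = extractor.extract_calls(source, "auth.py", symbols)
for edge in edges:
    # target_id is still the callee's textual name at this point
    print(f"{edge.source_id} --calls--> {edge.target_id} (line {edge.line_number})")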
3.1.3 Name Resolution
class NameResolver:
    """Resolve the name used at a call site to a concrete symbol definition"""

    def __init__(self, symbol_table: Dict[str, CodeNode]):
        """
        symbol_table: symbol table mapping qualified_name -> CodeNode
        """
        self.symbol_table = symbol_table

    def resolve_call_target(
        self,
        call_edge: CodeEdge,
        caller_context: CodeNode
    ) -> Optional[str]:
        """
        Resolve the call target to a full node_id.
        Strategy:
        1. Check for a local call (same file)
        2. Check for a function imported from another module
        3. Check for a method call (self.method)
        """
        callee_name = call_edge.target_id
        # Strategy 1: local call (same file)
        local_qualified = f"{Path(caller_context.file_path).stem}.{callee_name}"
        if local_qualified in self.symbol_table:
            return self.symbol_table[local_qualified].node_id
        # Strategy 2: method call (extract the receiver name)
        if '.' in callee_name:
            parts = callee_name.split('.')
            if parts[0] == 'self':
                # self.method_name -> look it up in the caller's class
                method_name = parts[1]
                # Find the class the caller belongs to
                class_name = self._find_containing_class(caller_context)
                if class_name:
                    class_qualified = f"{Path(caller_context.file_path).stem}.{class_name}.{method_name}"
                    if class_qualified in self.symbol_table:
                        return self.symbol_table[class_qualified].node_id
        # Strategy 3: imported function (requires scanning import statements)
        # TODO: implement cross-file import resolution
        return None

    def _find_containing_class(self, node: CodeNode) -> Optional[str]:
        """Find the class a function/method belongs to"""
        # Inferred from the qualified_name
        parts = node.qualified_name.split('.')
        if len(parts) > 2:  # module.class.method
            return parts[-2]
        return None
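The symbol table the resolver expects is simply a mapping from qualified names to the nodes produced by the ASTAnalyzer. A small sketch of resolving the raw call edges from the previous step (symbols and edges come from the sketches above; the 0.5 confidence for unresolved targets is an assumed convention):
symbol_table = {node.qualified_name: node for node in symbols}
node_index = {node.node_id: node for node in symbols}
resolver = NameResolver(symbol_table)

for edge in edges:
    caller = node_index.get(edge.source_id)
    target_id = resolver.resolve_call_target(edge, caller) if caller else None
    if target_id:
        edge.target_id = target_id   # replace the textual name with a real node_id
    else:
        edge.confidence = 0.5        # unresolved: keep the edge but mark it as uncertain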
3.2 Graph Storage and Indexing
3.2.1 Database Schema (SQLite)
-- Node table
CREATE TABLE code_nodes (
    node_id TEXT PRIMARY KEY,
    node_type TEXT NOT NULL,       -- module/class/function/method/variable
    name TEXT NOT NULL,
    qualified_name TEXT NOT NULL UNIQUE,
    file_path TEXT NOT NULL,
    start_line INTEGER NOT NULL,
    end_line INTEGER NOT NULL,
    -- Static analysis metadata
    signature TEXT,
    docstring TEXT,
    modifiers TEXT,                -- JSON array
    -- LLM semantic metadata
    summary TEXT,
    purpose TEXT,
    tags TEXT,                     -- JSON array
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Edge table
CREATE TABLE code_edges (
    edge_id TEXT PRIMARY KEY,
    source_id TEXT NOT NULL,
    target_id TEXT NOT NULL,
    edge_type TEXT NOT NULL,       -- calls/imports/inherits/uses_variable, etc.
    -- Context
    context TEXT,
    line_number INTEGER,
    -- LLM semantics
    semantic_intent TEXT,
    confidence REAL DEFAULT 1.0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (source_id) REFERENCES code_nodes(node_id) ON DELETE CASCADE,
    FOREIGN KEY (target_id) REFERENCES code_nodes(node_id) ON DELETE CASCADE
);
-- Indexes
CREATE INDEX idx_nodes_type ON code_nodes(node_type);
CREATE INDEX idx_nodes_file ON code_nodes(file_path);
CREATE INDEX idx_nodes_qualified ON code_nodes(qualified_name);
CREATE INDEX idx_edges_source ON code_edges(source_id);
CREATE INDEX idx_edges_target ON code_edges(target_id);
CREATE INDEX idx_edges_type ON code_edges(edge_type);
-- For fast call-relationship lookups
CREATE INDEX idx_edges_source_type ON code_edges(source_id, edge_type);
CREATE INDEX idx_edges_target_type ON code_edges(target_id, edge_type);
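The _create_tables helper that CodeGraphStore calls below is not spelled out; a minimal sketch, assuming the DDL above is kept in a schema_sql string (adding IF NOT EXISTS to the statements would make it safe to re-run):
import sqlite3

def create_tables(conn: sqlite3.Connection, schema_sql: str) -> None:
    """Apply the DDL above to a fresh SQLite connection."""
    conn.executescript(schema_sql)
    conn.execute("PRAGMA foreign_keys = ON")  # enforce the ON DELETE CASCADE constraints
    conn.commit()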
3.2.2 Graph Store Interface
import sqlite3
from typing import List, Optional, Set

class CodeGraphStore:
    """Code graph store"""

    def __init__(self, db_path: Path):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self._create_tables()

    def add_node(self, node: CodeNode):
        """Add a node"""
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO code_nodes (
                node_id, node_type, name, qualified_name,
                file_path, start_line, end_line,
                signature, docstring, summary, purpose
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            node.node_id, node.node_type.value, node.name,
            node.qualified_name, node.file_path,
            node.start_line, node.end_line,
            node.signature, node.docstring,
            node.summary, node.purpose
        ))
        self.conn.commit()

    def add_edge(self, edge: CodeEdge):
        """Add an edge"""
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO code_edges (
                edge_id, source_id, target_id, edge_type,
                context, line_number, semantic_intent, confidence
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            edge.edge_id, edge.source_id, edge.target_id,
            edge.edge_type.value, edge.context, edge.line_number,
            edge.semantic_intent, edge.confidence
        ))
        self.conn.commit()
    def get_node(self, node_id: str) -> Optional[CodeNode]:
        """Fetch a node"""
        cursor = self.conn.cursor()
        cursor.execute("SELECT * FROM code_nodes WHERE node_id = ?", (node_id,))
        row = cursor.fetchone()
        return self._row_to_node(row) if row else None

    def get_callers(self, node_id: str) -> List[CodeNode]:
        """Fetch all nodes that call this node (reverse lookup)"""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT n.* FROM code_nodes n
            JOIN code_edges e ON n.node_id = e.source_id
            WHERE e.target_id = ? AND e.edge_type = 'calls'
        """, (node_id,))
        return [self._row_to_node(row) for row in cursor.fetchall()]

    def get_callees(self, node_id: str) -> List[CodeNode]:
        """Fetch all nodes this node calls (forward lookup)"""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT n.* FROM code_nodes n
            JOIN code_edges e ON n.node_id = e.target_id
            WHERE e.source_id = ? AND e.edge_type = 'calls'
        """, (node_id,))
        return [self._row_to_node(row) for row in cursor.fetchall()]
    def get_call_chain(
        self,
        start_node_id: str,
        max_depth: int = 5
    ) -> List[List[CodeNode]]:
        """Collect call chains (DFS traversal)"""
        visited = set()
        chains = []

        def dfs(node_id: str, path: List[CodeNode], depth: int):
            if depth > max_depth or node_id in visited:
                return
            visited.add(node_id)
            node = self.get_node(node_id)
            if not node:
                return
            current_path = path + [node]
            callees = self.get_callees(node_id)
            if not callees:
                # Leaf node: record the complete path
                chains.append(current_path)
            else:
                for callee in callees:
                    dfs(callee.node_id, current_path, depth + 1)
            visited.remove(node_id)

        dfs(start_node_id, [], 0)
        return chains
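A usage sketch for the call-chain query; the database path and node ID are illustrative:
store = CodeGraphStore(Path("code_graph.db"))
chains = store.get_call_chain("api.py:120:create_user", max_depth=4)
for chain in chains:
    print(" -> ".join(node.name for node in chain))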
3.3 LLM Semantic Enhancement
3.3.1 Relationship Semantic Analysis
class RelationshipSemanticAnalyzer:
    """Generate semantic descriptions for code relationships"""

    def __init__(self, llm_enhancer: LLMEnhancer):
        self.llm_enhancer = llm_enhancer

    def analyze_call_intent(
        self,
        edge: CodeEdge,
        caller: CodeNode,
        callee: CodeNode
    ) -> str:
        """Analyze the intent of a function call"""
        # Build the prompt
        prompt = f"""
        PURPOSE: Analyze the intent of a function call relationship
        TASK: Describe in 1 sentence WHY function A calls function B and what purpose it serves
        CONTEXT:
        Caller Function: {caller.name}
        Caller Summary: {caller.summary or 'N/A'}
        Caller Code Snippet:
        {edge.context}
        Callee Function: {callee.name}
        Callee Summary: {callee.summary or 'N/A'}
        Callee Signature: {callee.signature or 'N/A'}
        OUTPUT: A concise semantic description of the call intent.
        Example: "validates user credentials before granting access"
        """
        response = self.llm_enhancer._invoke_ccw_cli(prompt, tool='gemini')
        if response['success']:
            intent = response['stdout'].strip()
            return intent
        return "unknown intent"
    def batch_analyze_calls(
        self,
        edges: List[CodeEdge],
        nodes_map: Dict[str, CodeNode]
    ) -> Dict[str, str]:
        """Analyze call intents in batch (fewer LLM round-trips)"""
        # Build a batched prompt
        call_descriptions = []
        for edge in edges:
            caller = nodes_map.get(edge.source_id)
            callee = nodes_map.get(edge.target_id)
            if not caller or not callee:
                continue
            desc = f"""
            [CALL {edge.edge_id}]
            From: {caller.name} ({caller.summary or 'no summary'})
            To: {callee.name} ({callee.summary or 'no summary'})
            Context: {edge.context}
            """
            call_descriptions.append(desc)
        prompt = f"""
        PURPOSE: Analyze multiple function call relationships and describe their intents
        TASK: For each call, provide a 1-sentence semantic description
        {chr(10).join(call_descriptions)}
        OUTPUT FORMAT (JSON):
        {{
            "edge_id_1": "intent description",
            "edge_id_2": "intent description",
            ...
        }}
        """
        response = self.llm_enhancer._invoke_ccw_cli(prompt, tool='gemini')
        if response['success']:
            import json
            intents = json.loads(self.llm_enhancer._extract_json(response['stdout']))
            return intents
        return {}
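The batch results can be written straight back onto the stored edges. A minimal sketch, assuming the call edges and a node map have already been loaded from the graph store and an llm_enhancer instance is available:
analyzer = RelationshipSemanticAnalyzer(llm_enhancer)
intents = analyzer.batch_analyze_calls(call_edges, nodes_map)
for edge in call_edges:
    intent = intents.get(edge.edge_id)
    if intent:
        edge.semantic_intent = intent
        graph_store.add_edge(edge)   # INSERT OR REPLACE updates the existing row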
3.3.2 Data Model Recognition
class DataModelRecognizer:
    """Recognize data models (entity classes) in the codebase"""

    def identify_data_models(
        self,
        class_nodes: List[CodeNode]
    ) -> List[CodeNode]:
        """Decide which classes are data models"""
        data_models = []
        for class_node in class_nodes:
            # Heuristic rules
            is_model = False
            # Rule 1: the class name contains Model/Entity/Schema
            if any(keyword in class_node.name for keyword in ['Model', 'Entity', 'Schema']):
                is_model = True
            # Rule 2: the class inherits from an ORM base class (requires inheritance analysis)
            # TODO: check for inheritance from db.Model, BaseModel, etc. (see the sketch below)
            # Rule 3: ask the LLM
            if not is_model:
                is_model = self._ask_llm_if_data_model(class_node)
            if is_model:
                class_node.tags = class_node.tags or []
                class_node.tags.append('data_model')
                data_models.append(class_node)
        return data_models
    def _ask_llm_if_data_model(self, class_node: CodeNode) -> bool:
        """Ask the LLM whether the class is a data model"""
        prompt = f"""
        Is this Python class a data model (an entity class representing a database table or data structure)?
        Class name: {class_node.name}
        Signature: {class_node.signature or 'N/A'}
        Docstring: {class_node.docstring or 'N/A'}
        Answer with: YES or NO
        """
        # Call the LLM here; simplified placeholder implementation
        return False
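Rule 2 (inheritance from an ORM base class) can be checked against the graph's INHERITS edges once they are stored. A minimal sketch; the allow-list of base-class names and the helper name are assumptions:
ORM_BASE_NAMES = {"db.Model", "Model", "Base", "BaseModel"}

def inherits_orm_base(graph_store: CodeGraphStore, class_node: CodeNode) -> bool:
    """True if the class has an 'inherits' edge pointing at a known ORM base class."""
    cursor = graph_store.conn.cursor()
    cursor.execute(
        "SELECT target_id FROM code_edges WHERE source_id = ? AND edge_type = 'inherits'",
        (class_node.node_id,),
    )
    targets = [row[0] for row in cursor.fetchall()]
    # target_id is either a raw name (unresolved, e.g. "db.Model") or a node_id ("file:line:Name")
    return any(t in ORM_BASE_NAMES or t.split(":")[-1] in ORM_BASE_NAMES for t in targets)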
3.4 Graph Query and Reasoning
3.4.1 Impact Analysis
from typing import Any, Dict, Set

class ImpactAnalyzer:
    """Code-change impact analysis"""

    def __init__(self, graph_store: CodeGraphStore):
        self.graph_store = graph_store

    def analyze_function_impact(
        self,
        function_id: str,
        max_depth: int = 10
    ) -> Dict[str, Any]:
        """Analyze the blast radius of changing a given function"""
        # Find all direct and indirect callers
        affected_functions = set()
        self._traverse_callers(function_id, affected_functions, 0, max_depth)
        # Find all affected files
        affected_files = set()
        for func_id in affected_functions:
            node = self.graph_store.get_node(func_id)
            if node:
                affected_files.add(node.file_path)
        return {
            'modified_function': function_id,
            'affected_functions': list(affected_functions),
            'affected_files': list(affected_files),
            'impact_scope': len(affected_functions),
        }

    def _traverse_callers(
        self,
        node_id: str,
        result: Set[str],
        current_depth: int,
        max_depth: int
    ):
        """Recursively traverse callers"""
        if current_depth >= max_depth or node_id in result:
            return
        callers = self.graph_store.get_callers(node_id)
        for caller in callers:
            result.add(caller.node_id)
            self._traverse_callers(caller.node_id, result, current_depth + 1, max_depth)
3.4.2 Data Flow Tracing
class DataFlowTracer:
    """Data-flow path tracing"""

    def __init__(self, graph_store: CodeGraphStore):
        self.graph_store = graph_store

    def trace_variable_flow(
        self,
        variable_name: str,
        start_function_id: str
    ) -> List[Dict]:
        """Trace the data flow of a variable"""
        # Find every edge that touches the variable
        cursor = self.graph_store.conn.cursor()
        cursor.execute("""
            SELECT * FROM code_edges
            WHERE edge_type IN ('uses_variable', 'defines_variable', 'passes_data')
            AND (source_id = ? OR target_id LIKE ?)
        """, (start_function_id, f"%{variable_name}%"))
        flow_path = []
        for row in cursor.fetchall():
            edge = self._row_to_edge(row)
            source = self.graph_store.get_node(edge.source_id)
            target = self.graph_store.get_node(edge.target_id)
            flow_path.append({
                'from': source.name if source else 'unknown',
                'to': target.name if target else 'unknown',
                'action': edge.edge_type.value,
                'context': edge.context,
            })
        return flow_path
    def find_crud_operations(
        self,
        entity_name: str
    ) -> Dict[str, List[CodeNode]]:
        """Find all CRUD operations on a given entity"""
        cursor = self.graph_store.conn.cursor()
        # Find every function that modifies the entity
        cursor.execute("""
            SELECT DISTINCT n.* FROM code_nodes n
            JOIN code_edges e ON n.node_id = e.source_id
            WHERE e.edge_type = 'modifies'
            AND e.target_id LIKE ?
        """, (f"%{entity_name}%",))
        writers = [self._row_to_node(row) for row in cursor.fetchall()]
        # Find every function that reads the entity
        cursor.execute("""
            SELECT DISTINCT n.* FROM code_nodes n
            JOIN code_edges e ON n.node_id = e.source_id
            WHERE e.edge_type = 'reads'
            AND e.target_id LIKE ?
        """, (f"%{entity_name}%",))
        readers = [self._row_to_node(row) for row in cursor.fetchall()]
        return {
            'create': [w for w in writers if 'create' in w.name.lower()],
            'read': readers,
            'update': [w for w in writers if 'update' in w.name.lower()],
            'delete': [w for w in writers if 'delete' in w.name.lower()],
        }
3.5 Integration with Semantic Search
3.5.1 Enhanced Search Results
class GraphEnhancedSearchEngine:
    """Graph-enhanced search"""

    def __init__(
        self,
        vector_search: VectorStore,
        graph_store: CodeGraphStore
    ):
        self.vector_search = vector_search
        self.graph_store = graph_store

    def search_with_graph_context(
        self,
        query: str,
        top_k: int = 10
    ) -> List[EnhancedSearchResult]:
        """Search with graph context attached"""
        # 1. Vector search
        vector_results = self.vector_search.search(query, top_k=top_k)
        # 2. Attach graph information to each result
        enhanced_results = []
        for result in vector_results:
            # Find the corresponding graph node
            node = self.graph_store.get_node(result.path)
            if not node:
                continue
            # Fetch call relationships
            callers = self.graph_store.get_callers(node.node_id)
            callees = self.graph_store.get_callees(node.node_id)
            enhanced = EnhancedSearchResult(
                **result.dict(),
                callers=[c.name for c in callers[:5]],
                callees=[c.name for c in callees[:5]],
                call_count_in=len(callers),
                call_count_out=len(callees),
            )
            enhanced_results.append(enhanced)
        return enhanced_results
    def search_by_relationship(
        self,
        query: str,
        relationship_type: str  # "calls", "called_by", "uses", etc.
    ) -> List[CodeNode]:
        """Relationship-based search"""
        # First find the node matching the query
        vector_results = self.vector_search.search(query, top_k=5)
        if not vector_results:
            return []
        target_node_id = vector_results[0].path
        # Look up related nodes according to the relationship type
        if relationship_type == "calls":
            return self.graph_store.get_callees(target_node_id)
        elif relationship_type == "called_by":
            return self.graph_store.get_callers(target_node_id)
        # Other relationship types...
        return []
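EnhancedSearchResult is used above but never defined. An assumed minimal shape that matches the fields consumed by search_with_graph_context (the path/score/snippet fields mirror a typical vector-search hit and are assumptions):
from dataclasses import dataclass, field
from typing import List

@dataclass
class EnhancedSearchResult:
    """Vector search hit enriched with call-graph context."""
    path: str                                   # node_id of the matched symbol
    score: float                                # vector similarity score
    snippet: str = ""                           # matched code/text excerpt
    callers: List[str] = field(default_factory=list)
    callees: List[str] = field(default_factory=list)
    call_count_in: int = 0
    call_count_out: int = 0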
4. Implementation Roadmap
Phase 1: Basic Static Analysis (3-4 weeks)
- Implement ASTAnalyzer (symbol extraction)
- Implement CallGraphExtractor (call graph extraction)
- Implement NameResolver (name resolution)
- Design the graph database schema
- Implement CodeGraphStore (basic CRUD)
- Unit tests
Phase 2: Multi-Language Support (2 weeks)
- Full Python support
- JavaScript/TypeScript support
- Java support (optional)
- Cross-language tests
Phase 3: LLM Semantic Enhancement (2-3 weeks)
- Implement RelationshipSemanticAnalyzer
- Implement DataModelRecognizer
- Batch-processing optimization
- Integration tests
Phase 4: Advanced Queries (2 weeks)
- Implement ImpactAnalyzer
- Implement DataFlowTracer
- Implement GraphEnhancedSearchEngine
- Performance optimization
Phase 5: Visualization and Tooling (2 weeks)
- Call graph visualization (Graphviz/D3.js)
- CLI command integration
- Web UI (optional)
Phase 6: Productionization (1-2 weeks)
- Incremental update mechanism
- Optimization for large codebases
- Documentation and examples
- Release
Total estimated time: 12-15 weeks
5. Technical Challenges and Solutions
5.1 Challenge: Cross-File Name Resolution
Problem: the target of a function call may live in a different file or module, so import statements have to be resolved.
Solution:
class ImportResolver:
    """Import statement resolver"""

    def extract_imports(self, tree, content: str) -> Dict[str, str]:
        """
        Extract all import statements and build an alias map.
        Returns: {alias -> actual module path}
        """
        imports = {}
        for node in tree.root_node.children:
            if node.type == 'import_statement':
                # from module import func
                # import module as alias
                pass  # parsing logic
        return imports

    def resolve_imported_symbol(
        self,
        symbol_name: str,
        imports: Dict[str, str],
        project_root: Path
    ) -> Optional[str]:
        """Resolve the actual location of an imported symbol"""
        if symbol_name in imports:
            module_path = imports[symbol_name]
            # Find the file path of that module
            # Look up the corresponding node in the graph
            pass
        return None
5.2 Challenge: Detecting Dynamic Calls
Problem: reflection, getattr, dynamic imports and other runtime behaviour cannot be fully captured by static analysis.
Solution:
- Use the LLM to infer likely call targets
- Mark the edge as a "dynamic call" and lower its confidence
- Supplement the graph with runtime logs
def handle_dynamic_call(edge: CodeEdge) -> CodeEdge:
    """Handle a dynamic call"""
    if edge.context and ('getattr' in edge.context or 'eval' in edge.context):
        edge.confidence = 0.5
        edge.semantic_intent = "dynamic call (runtime resolution required)"
    return edge
5.3 Challenge: Performance on Large Codebases
Problem: building the graph for a codebase with millions of lines can take a very long time.
Solution:
- Parallel processing: analyze different files in multiple processes
- Incremental updates: re-analyze only the files that changed (a sketch follows the parallel builder below)
- Lazy LLM usage: the initial build does static analysis only; LLM enhancement is triggered on demand
from multiprocessing import Pool

class ParallelGraphBuilder:
    """Parallel graph construction"""

    def build_graph_parallel(
        self,
        file_paths: List[Path],
        workers: int = 8
    ):
        """Analyze multiple files in parallel"""
        with Pool(workers) as pool:
            results = pool.map(self._analyze_single_file, file_paths)
        # Merge the results into the graph
        for nodes, edges in results:
            for node in nodes:
                self.graph_store.add_node(node)
            for edge in edges:
                self.graph_store.add_edge(edge)
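For the incremental-update strategy mentioned above, a per-file content hash is enough to decide what needs re-analysis between builds. A minimal sketch, assuming the previous hashes are persisted alongside the graph; stale nodes and edges of a changed file would be deleted before re-insertion (the ON DELETE CASCADE constraint on code_edges helps with that):
import hashlib
from pathlib import Path
from typing import Dict, List

def files_needing_reanalysis(file_paths: List[Path], previous_hashes: Dict[str, str]) -> List[Path]:
    """Return only the files whose content changed since the last build."""
    changed = []
    for path in file_paths:
        digest = hashlib.sha256(path.read_bytes()).hexdigest()
        if previous_hashes.get(str(path)) != digest:
            changed.append(path)
            previous_hashes[str(path)] = digest
    return changed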
6. Success Metrics
- Coverage: more than 90% of function call relationships are identified correctly
- Accuracy: name-resolution accuracy above 85%
- Performance: graph construction under 5 minutes for a 100k-line project
- Query speed: impact-analysis queries under 100 ms
- LLM enhancement value: usefulness rating of relationship semantic descriptions above 4/5
7. Example Application Scenarios
Scenario 1: Code Review Assistant
# Review a PR and analyze its impact scope
analyzer = ImpactAnalyzer(graph_store)
impact = analyzer.analyze_function_impact('auth.py:45:validate_token')
print(f"Changing this function affects {impact['impact_scope']} other functions")
print(f"Files involved: {', '.join(impact['affected_files'])}")
Scenario 2: Refactoring Planning
# Planning to refactor the User class: list all related operations
tracer = DataFlowTracer(graph_store)
crud = tracer.find_crud_operations('User')
print(f"Methods that create User: {[f.name for f in crud['create']]}")
print(f"Methods that read User: {[f.name for f in crud['read']]}")
Scenario 3: Knowledge-Graph Q&A
# "Which API endpoints are affected if the login logic changes?"
search_engine = GraphEnhancedSearchEngine(vector_store, graph_store)
# First find the login function
login_func = search_engine.search("user login authentication")[0]
# Trace the call chain
analyzer = ImpactAnalyzer(graph_store)
impact = analyzer.analyze_function_impact(login_func.node_id)
# Filter down to API endpoints
api_endpoints = [
    f for f in impact['affected_functions']
    if '@app.route' in graph_store.get_node(f).modifiers
]
8. References
- LLVM Call Graph
- Sourcegraph - Code Intelligence
- CodeQL - Semantic Code Analysis
- Neo4j Graph Database
- Tree-sitter AST Queries