feat: Implement executor assignment and clustering optimizations for session management

This commit is contained in:
catlog22
2025-12-20 11:29:16 +08:00
parent e1cac5dd50
commit 4de4db3c69
3 changed files with 194 additions and 33 deletions

View File

@@ -301,13 +301,65 @@ export class SessionClusteringService {
return intersection.size / union.size;
}
/**
* Find the most relevant existing cluster for a set of session IDs
* Returns the cluster with highest session overlap
*/
private findExistingClusterForSessions(sessionIds: string[]): SessionCluster | null {
if (sessionIds.length === 0) return null;
const clusterCounts = new Map<string, number>();
let maxCount = 0;
let bestClusterId: string | null = null;
for (const sessionId of sessionIds) {
const clusters = this.coreMemoryStore.getSessionClusters(sessionId);
for (const cluster of clusters) {
if (cluster.status !== 'active') continue;
const count = (clusterCounts.get(cluster.id) || 0) + 1;
clusterCounts.set(cluster.id, count);
if (count > maxCount) {
maxCount = count;
bestClusterId = cluster.id;
}
}
}
if (bestClusterId) {
return this.coreMemoryStore.getCluster(bestClusterId);
}
return null;
}
/**
* Determine if a new cluster should merge with an existing one
* Based on 70% session overlap threshold
*/
private shouldMergeWithExisting(newClusterSessions: SessionMetadataCache[], existingCluster: SessionCluster): boolean {
const MERGE_THRESHOLD = 0.7;
const existingMembers = this.coreMemoryStore.getClusterMembers(existingCluster.id);
const newSessionIds = new Set(newClusterSessions.map(s => s.session_id));
const existingSessionIds = new Set(existingMembers.map(m => m.session_id));
if (newSessionIds.size === 0) return false;
const intersection = new Set([...newSessionIds].filter(id => existingSessionIds.has(id)));
const overlapRatio = intersection.size / newSessionIds.size;
return overlapRatio > MERGE_THRESHOLD;
}
/**
* Run auto-clustering algorithm
* Optimized to prevent duplicate clusters by checking existing clusters first
*/
async autocluster(options?: ClusteringOptions): Promise<ClusteringResult> {
// 1. Collect sessions
const sessions = await this.collectSessions(options);
console.log(`[Clustering] Collected ${sessions.length} sessions`);
// 1. Collect only unclustered sessions to prevent re-clustering
const sessions = await this.collectSessions({ ...options, scope: 'unclustered' });
console.log(`[Clustering] Collected ${sessions.length} unclustered sessions`);
// 2. Update metadata cache
for (const session of sessions) {
@@ -327,43 +379,88 @@ export class SessionClusteringService {
}
// 4. Agglomerative clustering
const clusters = this.agglomerativeClustering(sessions, relevanceMatrix, CLUSTER_THRESHOLD);
console.log(`[Clustering] Generated ${clusters.length} clusters`);
const minClusterSize = options?.minClusterSize || 2;
// 5. Create session_clusters
// Early return if not enough sessions
if (sessions.length < minClusterSize) {
console.log('[Clustering] Not enough unclustered sessions to form new clusters');
return { clustersCreated: 0, sessionsProcessed: sessions.length, sessionsClustered: 0 };
}
const newPotentialClusters = this.agglomerativeClustering(sessions, relevanceMatrix, CLUSTER_THRESHOLD);
console.log(`[Clustering] Generated ${newPotentialClusters.length} potential clusters`);
// 5. Process clusters: create new or merge with existing
let clustersCreated = 0;
let clustersMerged = 0;
let sessionsClustered = 0;
for (const cluster of clusters) {
if (cluster.length < (options?.minClusterSize || 2)) {
for (const clusterSessions of newPotentialClusters) {
if (clusterSessions.length < minClusterSize) {
continue; // Skip small clusters
}
const clusterName = this.generateClusterName(cluster);
const clusterIntent = this.generateClusterIntent(cluster);
const sessionIds = clusterSessions.map(s => s.session_id);
const existingCluster = this.findExistingClusterForSessions(sessionIds);
const clusterRecord = this.coreMemoryStore.createCluster({
name: clusterName,
description: `Auto-generated cluster with ${cluster.length} sessions`,
intent: clusterIntent,
status: 'active'
});
// Check if we should merge with an existing cluster
if (existingCluster && this.shouldMergeWithExisting(clusterSessions, existingCluster)) {
const existingMembers = this.coreMemoryStore.getClusterMembers(existingCluster.id);
const existingSessionIds = new Set(existingMembers.map(m => m.session_id));
// Add members
cluster.forEach((session, index) => {
this.coreMemoryStore.addClusterMember({
cluster_id: clusterRecord.id,
session_id: session.session_id,
session_type: session.session_type as 'core_memory' | 'workflow' | 'cli_history' | 'native',
sequence_order: index + 1,
relevance_score: 1.0 // TODO: Calculate based on centrality
// Only add sessions not already in the cluster
const newSessions = clusterSessions.filter(s => !existingSessionIds.has(s.session_id));
if (newSessions.length > 0) {
newSessions.forEach((session, index) => {
this.coreMemoryStore.addClusterMember({
cluster_id: existingCluster.id,
session_id: session.session_id,
session_type: session.session_type as 'core_memory' | 'workflow' | 'cli_history' | 'native',
sequence_order: existingMembers.length + index + 1,
relevance_score: 1.0
});
});
// Update cluster description
this.coreMemoryStore.updateCluster(existingCluster.id, {
description: `Auto-generated cluster with ${existingMembers.length + newSessions.length} sessions`
});
clustersMerged++;
sessionsClustered += newSessions.length;
console.log(`[Clustering] Merged ${newSessions.length} sessions into existing cluster '${existingCluster.name}'`);
}
} else {
// Create new cluster
const clusterName = this.generateClusterName(clusterSessions);
const clusterIntent = this.generateClusterIntent(clusterSessions);
const clusterRecord = this.coreMemoryStore.createCluster({
name: clusterName,
description: `Auto-generated cluster with ${clusterSessions.length} sessions`,
intent: clusterIntent,
status: 'active'
});
});
clustersCreated++;
sessionsClustered += cluster.length;
// Add members
clusterSessions.forEach((session, index) => {
this.coreMemoryStore.addClusterMember({
cluster_id: clusterRecord.id,
session_id: session.session_id,
session_type: session.session_type as 'core_memory' | 'workflow' | 'cli_history' | 'native',
sequence_order: index + 1,
relevance_score: 1.0
});
});
clustersCreated++;
sessionsClustered += clusterSessions.length;
}
}
console.log(`[Clustering] Summary: ${clustersCreated} created, ${clustersMerged} merged`);
return {
clustersCreated,
sessionsProcessed: sessions.length,