feat: Add comprehensive tests for contentPattern and glob pattern matching

- Implemented final verification tests for contentPattern to validate behavior with empty strings, dangerous patterns, and normal patterns.
- Created glob pattern matching tests to verify regex conversion and matching functionality.
- Developed infinite loop risk tests using Worker threads to isolate potential blocking operations.
- Introduced optimized contentPattern tests to validate improvements in the findMatches function.
- Added verification tests to assess the effectiveness of contentPattern optimizations.
- Conducted safety tests for contentPattern to identify edge cases and potential vulnerabilities.
- Implemented unrestricted loop tests to analyze infinite loop risks without match limits.
- Developed tests for zero-width pattern detection logic to ensure proper handling of dangerous regex patterns.
Author: catlog22
Date: 2026-02-09 11:13:01 +08:00
Parent: dfe153778c
Commit: 964292ebdb
62 changed files with 7588 additions and 374 deletions


@@ -1101,6 +1101,140 @@ def lsp_status(
        console.print(f" Initialized: {probe.get('initialized')}")


@app.command(name="reranker-status")
def reranker_status(
    probe: bool = typer.Option(
        False,
        "--probe",
        help="Send a small rerank request to validate connectivity and credentials.",
    ),
    provider: Optional[str] = typer.Option(
        None,
        "--provider",
        help="Reranker provider: siliconflow | cohere | jina (default: from env, else siliconflow).",
    ),
    api_base: Optional[str] = typer.Option(
        None,
        "--api-base",
        help="Override API base URL (e.g. https://api.siliconflow.cn or https://api.cohere.ai).",
    ),
    model: Optional[str] = typer.Option(
        None,
        "--model",
        help="Override reranker model name (provider-specific).",
    ),
    query: str = typer.Option("ping", "--query", help="Probe query text (used with --probe)."),
    document: str = typer.Option("pong", "--document", help="Probe document text (used with --probe)."),
    json_mode: bool = typer.Option(False, "--json", help="Output JSON response."),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable debug logging."),
) -> None:
    """Show reranker configuration and optionally probe the API backend.

    This is the fastest way to confirm that reranking can actually execute end-to-end.
    """
    _configure_logging(verbose, json_mode)

    import time

    from codexlens.env_config import load_global_env
    from codexlens.semantic.reranker.api_reranker import (
        APIReranker,
        _normalize_api_base_for_endpoint,
    )

    env = load_global_env()

    def _env_get(key: str) -> Optional[str]:
        return (
            os.environ.get(key)
            or os.environ.get(f"CODEXLENS_{key}")
            or env.get(key)
            or env.get(f"CODEXLENS_{key}")
        )

    effective_provider = (provider or _env_get("RERANKER_PROVIDER") or "siliconflow").strip()
    effective_api_base = (api_base or _env_get("RERANKER_API_BASE") or "").strip() or None
    effective_model = (model or _env_get("RERANKER_MODEL") or "").strip() or None
    # Do not leak secrets; only report whether a key is configured.
    key_present = bool((_env_get("RERANKER_API_KEY") or "").strip())

    provider_key = effective_provider.strip().lower()
    defaults = getattr(APIReranker, "_PROVIDER_DEFAULTS", {}).get(provider_key, {})
    endpoint = defaults.get("endpoint", "/v1/rerank")
    configured_base = effective_api_base or defaults.get("api_base") or ""
    normalized_base = _normalize_api_base_for_endpoint(api_base=configured_base, endpoint=endpoint)

    payload: Dict[str, Any] = {
        "provider": effective_provider,
        "api_base": effective_api_base,
        "endpoint": endpoint,
        "normalized_api_base": normalized_base or None,
        "request_url": f"{normalized_base}{endpoint}" if normalized_base else None,
        "model": effective_model,
        "api_key_configured": key_present,
        "probe": None,
    }

    if probe:
        t0 = time.perf_counter()
        try:
            reranker = APIReranker(
                provider=effective_provider,
                api_base=effective_api_base,
                model_name=effective_model,
            )
            try:
                scores = reranker.score_pairs([(query, document)])
            finally:
                reranker.close()
            resolved_base = getattr(reranker, "api_base", None)
            resolved_endpoint = getattr(reranker, "endpoint", None)
            request_url = (
                f"{resolved_base}{resolved_endpoint}"
                if resolved_base and resolved_endpoint
                else None
            )
            payload["probe"] = {
                "ok": True,
                "latency_ms": (time.perf_counter() - t0) * 1000.0,
                "score": float(scores[0]) if scores else None,
                "normalized_api_base": resolved_base,
                "request_url": request_url,
            }
        except Exception as exc:
            payload["probe"] = {
                "ok": False,
                "latency_ms": (time.perf_counter() - t0) * 1000.0,
                "error": f"{type(exc).__name__}: {exc}",
            }

    if json_mode:
        print_json(success=True, result=payload)
        return

    console.print("[bold]CodexLens Reranker Status[/bold]")
    console.print(f" Provider: {payload['provider']}")
    console.print(f" API Base: {payload['api_base'] or '(default)'}")
    if payload.get("normalized_api_base"):
        console.print(f" API Base (normalized): {payload['normalized_api_base']}")
    console.print(f" Endpoint: {payload.get('endpoint')}")
    if payload.get("request_url"):
        console.print(f" Request URL: {payload['request_url']}")
    console.print(f" Model: {payload['model'] or '(default)'}")
    console.print(f" API Key: {'set' if key_present else 'missing'}")

    if payload["probe"] is not None:
        probe_payload = payload["probe"]
        console.print("\n[bold]Probe:[/bold]")
        if probe_payload.get("ok"):
            console.print(f" ✓ OK ({probe_payload.get('latency_ms'):.1f}ms)")
            console.print(f" Score: {probe_payload.get('score')}")
        else:
            console.print(f" ✗ Failed ({probe_payload.get('latency_ms'):.1f}ms)")
            console.print(f" {probe_payload.get('error')}")


@app.command()
def projects(
    action: str = typer.Argument("list", help="Action: list, show, remove"),
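
For a quick programmatic check of the same path that --probe exercises, a minimal sketch (assuming default provider settings resolved from env/.env; "ping"/"pong" mirror the command's built-in probe texts):

    import time
    from codexlens.semantic.reranker.api_reranker import APIReranker

    # Sketch only: None arguments fall back to env/.env values or provider defaults.
    reranker = APIReranker(provider="siliconflow", api_base=None, model_name=None)
    t0 = time.perf_counter()
    try:
        scores = reranker.score_pairs([("ping", "pong")])
    finally:
        reranker.close()
    print({"latency_ms": (time.perf_counter() - t0) * 1000.0,
           "score": float(scores[0]) if scores else None})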


@@ -79,11 +79,33 @@ class HDBSCANStrategy(BaseClusteringStrategy):
            # Return each result as its own singleton cluster
            return [[i] for i in range(n_results)]

        metric = self.config.metric
        data = embeddings
        # Some hdbscan builds do not recognize metric="cosine" even though it's a
        # common need for embedding clustering. In that case, compute a precomputed
        # cosine distance matrix and run HDBSCAN with metric="precomputed".
        if metric == "cosine":
            try:
                from sklearn.metrics import pairwise_distances

                data = pairwise_distances(embeddings, metric="cosine")
                # Some hdbscan builds are strict about dtype for precomputed distances.
                # Ensure float64 to avoid Buffer dtype mismatch errors.
                try:
                    data = data.astype("float64", copy=False)
                except Exception:
                    pass
                metric = "precomputed"
            except Exception:
                # If we cannot compute distances, fall back to euclidean over raw vectors.
                metric = "euclidean"

        # Configure HDBSCAN clusterer
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=self.config.min_cluster_size,
            min_samples=self.config.min_samples,
            metric=self.config.metric,
            metric=metric,
            cluster_selection_epsilon=self.config.cluster_selection_epsilon,
            allow_single_cluster=self.config.allow_single_cluster,
            prediction_data=self.config.prediction_data,
@@ -91,7 +113,7 @@ class HDBSCANStrategy(BaseClusteringStrategy):
        # Fit and get cluster labels
        # Labels: -1 = noise, 0+ = cluster index
        labels = clusterer.fit_predict(embeddings)
        labels = clusterer.fit_predict(data)

        # Group indices by cluster label
        cluster_map: dict[int, list[int]] = {}

@@ -22,16 +22,52 @@ logger = logging.getLogger(__name__)
_DEFAULT_ENV_API_KEY = "RERANKER_API_KEY"


def _normalize_api_base_for_endpoint(*, api_base: str, endpoint: str) -> str:
    """Normalize api_base to avoid duplicated version paths (e.g. /v1/v1/...).

    httpx joins base_url paths with request paths even when the request path
    starts with a leading slash. This means:

        base_url="https://host/v1" + endpoint="/v1/rerank"
        -> "https://host/v1/v1/rerank"

    Many users configure OpenAI-style bases with a trailing "/v1", so we
    defensively strip that suffix when the endpoint already includes "/v1/".
    """
    cleaned = (api_base or "").strip().rstrip("/")
    if not cleaned:
        return cleaned

    endpoint_clean = endpoint or ""
    # If api_base already includes the endpoint suffix (e.g. api_base ends with "/v1/rerank"),
    # strip it so we don't end up with ".../v1/rerank/v1/rerank".
    if endpoint_clean.startswith("/") and cleaned.lower().endswith(endpoint_clean.lower()):
        return cleaned[: -len(endpoint_clean)]

    # Strip a trailing "/v1" if endpoint already includes "/v1/...".
    if endpoint_clean.startswith("/v1/") and cleaned.lower().endswith("/v1"):
        return cleaned[:-3]

    return cleaned


def _get_env_with_fallback(key: str, workspace_root: Path | None = None) -> str | None:
    """Get environment variable with .env file fallback."""
    # Check os.environ first
    if key in os.environ:
        return os.environ[key]
    prefixed_key = f"CODEXLENS_{key}"
    if prefixed_key in os.environ:
        return os.environ[prefixed_key]

    # Try loading from .env files
    try:
        from codexlens.env_config import get_env

        return get_env(key, workspace_root=workspace_root)
        value = get_env(key, workspace_root=workspace_root)
        if value is not None:
            return value
        return get_env(prefixed_key, workspace_root=workspace_root)
    except ImportError:
        return None
@@ -99,8 +135,11 @@ class APIReranker(BaseReranker):
        # Load api_base from env with .env fallback
        env_api_base = _get_env_with_fallback("RERANKER_API_BASE", self._workspace_root)
        self.api_base = (api_base or env_api_base or defaults["api_base"]).strip().rstrip("/")
        self.endpoint = defaults["endpoint"]
        self.api_base = _normalize_api_base_for_endpoint(
            api_base=(api_base or env_api_base or defaults["api_base"]),
            endpoint=self.endpoint,
        )

        # Load model from env with .env fallback
        env_model = _get_env_with_fallback("RERANKER_MODEL", self._workspace_root)
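
For reference, the normalization that the constructor now routes through behaves as follows on typical bases (a sketch; the hosts are taken from the CLI help text and used purely as examples):

    from codexlens.semantic.reranker.api_reranker import _normalize_api_base_for_endpoint

    # OpenAI-style base with a trailing /v1: the duplicate version segment is stripped.
    assert _normalize_api_base_for_endpoint(
        api_base="https://api.siliconflow.cn/v1", endpoint="/v1/rerank"
    ) == "https://api.siliconflow.cn"

    # Base that already ends with the full endpoint: the endpoint suffix is stripped.
    assert _normalize_api_base_for_endpoint(
        api_base="https://host/v1/rerank", endpoint="/v1/rerank"
    ) == "https://host"

    # Plain base: returned unchanged (minus any trailing slash).
    assert _normalize_api_base_for_endpoint(
        api_base="https://api.cohere.ai/", endpoint="/v1/rerank"
    ) == "https://api.cohere.ai"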