Mirror of https://github.com/catlog22/Claude-Code-Workflow.git
Synced 2026-02-09 02:24:11 +08:00
feat(codex-lens): add unified reranker architecture and file watcher
Unified Reranker Architecture:
- Add BaseReranker ABC with factory pattern
- Implement 4 backends: ONNX (default), API, LiteLLM, Legacy
- Add .env configuration parsing for API credentials
- Migrate from sentence-transformers to optimum+onnxruntime

File Watcher Module:
- Add real-time file system monitoring with watchdog
- Implement IncrementalIndexer for single-file updates
- Add WatcherManager with signal handling and graceful shutdown
- Add 'codexlens watch' CLI command
- Event filtering, debouncing, and deduplication
- Thread-safe design with proper resource cleanup

Tests: 16 watcher tests + 5 reranker test files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
codex-lens/tests/test_api_reranker.py (new file)
@@ -0,0 +1,171 @@
"""Tests for APIReranker backend."""

from __future__ import annotations

import sys
import types
from typing import Any

import pytest

from codexlens.semantic.reranker import get_reranker
from codexlens.semantic.reranker.api_reranker import APIReranker


class DummyResponse:
    def __init__(
        self,
        *,
        status_code: int = 200,
        json_data: Any = None,
        text: str = "",
        headers: dict[str, str] | None = None,
    ) -> None:
        self.status_code = int(status_code)
        self._json_data = json_data
        self.text = text
        self.headers = headers or {}

    def json(self) -> Any:
        return self._json_data


class DummyClient:
    def __init__(self, *, base_url: str | None = None, headers: dict[str, str] | None = None, timeout: float | None = None) -> None:
        self.base_url = base_url
        self.headers = headers or {}
        self.timeout = timeout
        self.closed = False
        self.calls: list[dict[str, Any]] = []
        self._responses: list[DummyResponse] = []

    def queue(self, response: DummyResponse) -> None:
        self._responses.append(response)

    def post(self, endpoint: str, *, json: dict[str, Any] | None = None) -> DummyResponse:
        self.calls.append({"endpoint": endpoint, "json": json})
        if not self._responses:
            raise AssertionError("DummyClient has no queued responses")
        return self._responses.pop(0)

    def close(self) -> None:
        self.closed = True


@pytest.fixture
def httpx_clients(monkeypatch: pytest.MonkeyPatch) -> list[DummyClient]:
    clients: list[DummyClient] = []

    dummy_httpx = types.ModuleType("httpx")

    def Client(*, base_url: str | None = None, headers: dict[str, str] | None = None, timeout: float | None = None) -> DummyClient:
        client = DummyClient(base_url=base_url, headers=headers, timeout=timeout)
        clients.append(client)
        return client

    dummy_httpx.Client = Client
    monkeypatch.setitem(sys.modules, "httpx", dummy_httpx)

    return clients


def test_api_reranker_requires_api_key(
    monkeypatch: pytest.MonkeyPatch, httpx_clients: list[DummyClient]
) -> None:
    monkeypatch.delenv("RERANKER_API_KEY", raising=False)

    with pytest.raises(ValueError, match="Missing API key"):
        APIReranker()

    assert httpx_clients == []


def test_api_reranker_reads_api_key_from_env(
    monkeypatch: pytest.MonkeyPatch, httpx_clients: list[DummyClient]
) -> None:
    monkeypatch.setenv("RERANKER_API_KEY", "test-key")

    reranker = APIReranker()
    assert len(httpx_clients) == 1
    assert httpx_clients[0].headers["Authorization"] == "Bearer test-key"
    reranker.close()
    assert httpx_clients[0].closed is True


def test_api_reranker_scores_pairs_siliconflow(
    monkeypatch: pytest.MonkeyPatch, httpx_clients: list[DummyClient]
) -> None:
    monkeypatch.delenv("RERANKER_API_KEY", raising=False)

    reranker = APIReranker(api_key="k", provider="siliconflow")
    client = httpx_clients[0]

    client.queue(
        DummyResponse(
            json_data={
                "results": [
                    {"index": 0, "relevance_score": 0.9},
                    {"index": 1, "relevance_score": 0.1},
                ]
            }
        )
    )

    scores = reranker.score_pairs([("q", "d1"), ("q", "d2")])
    assert scores == pytest.approx([0.9, 0.1])

    assert client.calls[0]["endpoint"] == "/v1/rerank"
    payload = client.calls[0]["json"]
    assert payload["model"] == "BAAI/bge-reranker-v2-m3"
    assert payload["query"] == "q"
    assert payload["documents"] == ["d1", "d2"]
    assert payload["top_n"] == 2
    assert payload["return_documents"] is False


def test_api_reranker_retries_on_5xx(
    monkeypatch: pytest.MonkeyPatch, httpx_clients: list[DummyClient]
) -> None:
    monkeypatch.setenv("RERANKER_API_KEY", "k")

    from codexlens.semantic.reranker import api_reranker as api_reranker_module

    monkeypatch.setattr(api_reranker_module.time, "sleep", lambda *_args, **_kwargs: None)

    reranker = APIReranker(max_retries=1)
    client = httpx_clients[0]

    client.queue(DummyResponse(status_code=500, text="oops", json_data={"error": "oops"}))
    client.queue(
        DummyResponse(
            json_data={"results": [{"index": 0, "relevance_score": 0.7}]},
        )
    )

    scores = reranker.score_pairs([("q", "d")])
    assert scores == pytest.approx([0.7])
    assert len(client.calls) == 2


def test_api_reranker_unauthorized_raises(
    monkeypatch: pytest.MonkeyPatch, httpx_clients: list[DummyClient]
) -> None:
    monkeypatch.setenv("RERANKER_API_KEY", "k")

    reranker = APIReranker()
    client = httpx_clients[0]
    client.queue(DummyResponse(status_code=401, text="unauthorized"))

    with pytest.raises(RuntimeError, match="unauthorized"):
        reranker.score_pairs([("q", "d")])


def test_factory_api_backend_constructs_reranker(
    monkeypatch: pytest.MonkeyPatch, httpx_clients: list[DummyClient]
) -> None:
    monkeypatch.setenv("RERANKER_API_KEY", "k")

    reranker = get_reranker(backend="api")
    assert isinstance(reranker, APIReranker)
    assert len(httpx_clients) == 1
codex-lens/tests/test_hybrid_search_reranker_backend.py (new file)
@@ -0,0 +1,139 @@
"""Tests for HybridSearchEngine reranker backend selection."""

from __future__ import annotations

import pytest

from codexlens.config import Config
from codexlens.search.hybrid_search import HybridSearchEngine


def test_get_cross_encoder_reranker_uses_factory_backend_legacy(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path,
) -> None:
    calls: dict[str, object] = {}

    def fake_check_reranker_available(backend: str):
        calls["check_backend"] = backend
        return True, None

    sentinel = object()

    def fake_get_reranker(*, backend: str, model_name=None, device=None, **kwargs):
        calls["get_args"] = {
            "backend": backend,
            "model_name": model_name,
            "device": device,
            "kwargs": kwargs,
        }
        return sentinel

    monkeypatch.setattr(
        "codexlens.semantic.reranker.check_reranker_available",
        fake_check_reranker_available,
    )
    monkeypatch.setattr(
        "codexlens.semantic.reranker.get_reranker",
        fake_get_reranker,
    )

    config = Config(
        data_dir=tmp_path / "legacy",
        enable_reranking=True,
        enable_cross_encoder_rerank=True,
        reranker_backend="legacy",
        reranker_model="dummy-model",
    )
    engine = HybridSearchEngine(config=config)

    reranker = engine._get_cross_encoder_reranker()
    assert reranker is sentinel
    assert calls["check_backend"] == "legacy"

    get_args = calls["get_args"]
    assert isinstance(get_args, dict)
    assert get_args["backend"] == "legacy"
    assert get_args["model_name"] == "dummy-model"
    assert get_args["device"] is None


def test_get_cross_encoder_reranker_uses_factory_backend_onnx_gpu_flag(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path,
) -> None:
    calls: dict[str, object] = {}

    def fake_check_reranker_available(backend: str):
        calls["check_backend"] = backend
        return True, None

    sentinel = object()

    def fake_get_reranker(*, backend: str, model_name=None, device=None, **kwargs):
        calls["get_args"] = {
            "backend": backend,
            "model_name": model_name,
            "device": device,
            "kwargs": kwargs,
        }
        return sentinel

    monkeypatch.setattr(
        "codexlens.semantic.reranker.check_reranker_available",
        fake_check_reranker_available,
    )
    monkeypatch.setattr(
        "codexlens.semantic.reranker.get_reranker",
        fake_get_reranker,
    )

    config = Config(
        data_dir=tmp_path / "onnx",
        enable_reranking=True,
        enable_cross_encoder_rerank=True,
        reranker_backend="onnx",
        embedding_use_gpu=False,
    )
    engine = HybridSearchEngine(config=config)

    reranker = engine._get_cross_encoder_reranker()
    assert reranker is sentinel
    assert calls["check_backend"] == "onnx"

    get_args = calls["get_args"]
    assert isinstance(get_args, dict)
    assert get_args["backend"] == "onnx"
    assert get_args["model_name"] is None
    assert get_args["device"] is None
    assert get_args["kwargs"]["use_gpu"] is False


def test_get_cross_encoder_reranker_returns_none_when_backend_unavailable(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path,
) -> None:
    def fake_check_reranker_available(backend: str):
        return False, "missing deps"

    def fake_get_reranker(*args, **kwargs):
        raise AssertionError("get_reranker should not be called when backend is unavailable")

    monkeypatch.setattr(
        "codexlens.semantic.reranker.check_reranker_available",
        fake_check_reranker_available,
    )
    monkeypatch.setattr(
        "codexlens.semantic.reranker.get_reranker",
        fake_get_reranker,
    )

    config = Config(
        data_dir=tmp_path / "unavailable",
        enable_reranking=True,
        enable_cross_encoder_rerank=True,
        reranker_backend="onnx",
    )
    engine = HybridSearchEngine(config=config)

    assert engine._get_cross_encoder_reranker() is None
codex-lens/tests/test_litellm_reranker.py (new file)
@@ -0,0 +1,85 @@
"""Tests for LiteLLMReranker (LLM-based reranking)."""

from __future__ import annotations

import sys
import types
from dataclasses import dataclass

import pytest

from codexlens.semantic.reranker.litellm_reranker import LiteLLMReranker


def _install_dummy_ccw_litellm(
    monkeypatch: pytest.MonkeyPatch, *, responses: list[str]
) -> None:
    @dataclass(frozen=True, slots=True)
    class ChatMessage:
        role: str
        content: str

    class LiteLLMClient:
        def __init__(self, model: str = "default", **kwargs) -> None:
            self.model = model
            self.kwargs = kwargs
            self._responses = list(responses)
            self.calls: list[list[ChatMessage]] = []

        def chat(self, messages, **kwargs):
            self.calls.append(list(messages))
            content = self._responses.pop(0) if self._responses else ""
            return types.SimpleNamespace(content=content)

    dummy = types.ModuleType("ccw_litellm")
    dummy.ChatMessage = ChatMessage
    dummy.LiteLLMClient = LiteLLMClient
    monkeypatch.setitem(sys.modules, "ccw_litellm", dummy)


def test_score_pairs_parses_numbers_and_normalizes_scales(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    _install_dummy_ccw_litellm(monkeypatch, responses=["0.73", "7", "80"])

    reranker = LiteLLMReranker(model="dummy")
    scores = reranker.score_pairs([("q", "d1"), ("q", "d2"), ("q", "d3")])
    assert scores == pytest.approx([0.73, 0.7, 0.8])


def test_score_pairs_parses_json_score_field(monkeypatch: pytest.MonkeyPatch) -> None:
    _install_dummy_ccw_litellm(monkeypatch, responses=['{"score": 0.42}'])

    reranker = LiteLLMReranker(model="dummy")
    scores = reranker.score_pairs([("q", "d")])
    assert scores == pytest.approx([0.42])


def test_score_pairs_uses_default_score_on_parse_failure(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    _install_dummy_ccw_litellm(monkeypatch, responses=["N/A"])

    reranker = LiteLLMReranker(model="dummy", default_score=0.123)
    scores = reranker.score_pairs([("q", "d")])
    assert scores == pytest.approx([0.123])


def test_rate_limiting_sleeps_between_requests(monkeypatch: pytest.MonkeyPatch) -> None:
    _install_dummy_ccw_litellm(monkeypatch, responses=["0.1", "0.2"])

    reranker = LiteLLMReranker(model="dummy", min_interval_seconds=1.0)

    import codexlens.semantic.reranker.litellm_reranker as litellm_reranker_module

    sleeps: list[float] = []
    times = iter([100.0, 100.0, 100.1, 100.1])

    monkeypatch.setattr(litellm_reranker_module.time, "monotonic", lambda: next(times))
    monkeypatch.setattr(
        litellm_reranker_module.time, "sleep", lambda seconds: sleeps.append(seconds)
    )

    _ = reranker.score_pairs([("q", "d1"), ("q", "d2")])
    assert sleeps == pytest.approx([0.9])
codex-lens/tests/test_reranker_backends.py (new file)
@@ -0,0 +1,115 @@
"""Mocked smoke tests for all reranker backends."""

from __future__ import annotations

import sys
import types
from dataclasses import dataclass

import pytest


def test_reranker_backend_legacy_scores_pairs(monkeypatch: pytest.MonkeyPatch) -> None:
    from codexlens.semantic.reranker import legacy as legacy_module

    class DummyCrossEncoder:
        def __init__(self, model_name: str, *, device: str | None = None) -> None:
            self.model_name = model_name
            self.device = device
            self.calls: list[dict[str, object]] = []

        def predict(self, pairs: list[tuple[str, str]], *, batch_size: int = 32) -> list[float]:
            self.calls.append({"pairs": list(pairs), "batch_size": int(batch_size)})
            return [0.5 for _ in pairs]

    monkeypatch.setattr(legacy_module, "_CrossEncoder", DummyCrossEncoder)
    monkeypatch.setattr(legacy_module, "CROSS_ENCODER_AVAILABLE", True)
    monkeypatch.setattr(legacy_module, "_import_error", None)

    reranker = legacy_module.CrossEncoderReranker(model_name="dummy-model", device="cpu")
    scores = reranker.score_pairs([("q", "d1"), ("q", "d2")], batch_size=0)
    assert scores == pytest.approx([0.5, 0.5])


def test_reranker_backend_onnx_availability_check(monkeypatch: pytest.MonkeyPatch) -> None:
    from codexlens.semantic.reranker.onnx_reranker import check_onnx_reranker_available

    dummy_numpy = types.ModuleType("numpy")
    dummy_onnxruntime = types.ModuleType("onnxruntime")

    dummy_optimum = types.ModuleType("optimum")
    dummy_optimum.__path__ = []  # Mark as package for submodule imports.
    dummy_optimum_ort = types.ModuleType("optimum.onnxruntime")
    dummy_optimum_ort.ORTModelForSequenceClassification = object()

    dummy_transformers = types.ModuleType("transformers")
    dummy_transformers.AutoTokenizer = object()

    monkeypatch.setitem(sys.modules, "numpy", dummy_numpy)
    monkeypatch.setitem(sys.modules, "onnxruntime", dummy_onnxruntime)
    monkeypatch.setitem(sys.modules, "optimum", dummy_optimum)
    monkeypatch.setitem(sys.modules, "optimum.onnxruntime", dummy_optimum_ort)
    monkeypatch.setitem(sys.modules, "transformers", dummy_transformers)

    ok, err = check_onnx_reranker_available()
    assert ok is True
    assert err is None


def test_reranker_backend_api_constructs_with_dummy_httpx(monkeypatch: pytest.MonkeyPatch) -> None:
    from codexlens.semantic.reranker.api_reranker import APIReranker

    created: list[object] = []

    class DummyClient:
        def __init__(
            self,
            *,
            base_url: str | None = None,
            headers: dict[str, str] | None = None,
            timeout: float | None = None,
        ) -> None:
            self.base_url = base_url
            self.headers = headers or {}
            self.timeout = timeout
            self.closed = False
            created.append(self)

        def close(self) -> None:
            self.closed = True

    dummy_httpx = types.ModuleType("httpx")
    dummy_httpx.Client = DummyClient
    monkeypatch.setitem(sys.modules, "httpx", dummy_httpx)

    reranker = APIReranker(api_key="k", provider="siliconflow")
    assert reranker.provider == "siliconflow"
    assert len(created) == 1
    assert created[0].headers["Authorization"] == "Bearer k"
    reranker.close()
    assert created[0].closed is True


def test_reranker_backend_litellm_scores_pairs(monkeypatch: pytest.MonkeyPatch) -> None:
    from codexlens.semantic.reranker.litellm_reranker import LiteLLMReranker

    @dataclass(frozen=True, slots=True)
    class ChatMessage:
        role: str
        content: str

    class DummyLiteLLMClient:
        def __init__(self, model: str = "default", **_kwargs: object) -> None:
            self.model = model

        def chat(self, _messages: list[ChatMessage]) -> object:
            return types.SimpleNamespace(content="0.5")

    dummy_litellm = types.ModuleType("ccw_litellm")
    dummy_litellm.ChatMessage = ChatMessage
    dummy_litellm.LiteLLMClient = DummyLiteLLMClient
    monkeypatch.setitem(sys.modules, "ccw_litellm", dummy_litellm)

    reranker = LiteLLMReranker(model="dummy")
    assert reranker.score_pairs([("q", "d")]) == pytest.approx([0.5])
codex-lens/tests/test_reranker_factory.py (new file)
@@ -0,0 +1,315 @@
"""Tests for reranker factory and availability checks."""

from __future__ import annotations

import builtins
import math
import sys
import types

import pytest

from codexlens.semantic.reranker import (
    BaseReranker,
    ONNXReranker,
    check_reranker_available,
    get_reranker,
)
from codexlens.semantic.reranker import legacy as legacy_module


def test_public_imports_work() -> None:
    from codexlens.semantic.reranker import BaseReranker as ImportedBaseReranker
    from codexlens.semantic.reranker import get_reranker as imported_get_reranker

    assert ImportedBaseReranker is BaseReranker
    assert imported_get_reranker is get_reranker


def test_base_reranker_is_abstract() -> None:
    with pytest.raises(TypeError):
        BaseReranker()  # type: ignore[abstract]


def test_check_reranker_available_invalid_backend() -> None:
    ok, err = check_reranker_available("nope")
    assert ok is False
    assert "Invalid reranker backend" in (err or "")


def test_get_reranker_invalid_backend_raises_value_error() -> None:
    with pytest.raises(ValueError, match="Unknown backend"):
        get_reranker("nope")


def test_get_reranker_legacy_missing_dependency_raises_import_error(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    monkeypatch.setattr(legacy_module, "CROSS_ENCODER_AVAILABLE", False)
    monkeypatch.setattr(legacy_module, "_import_error", "missing sentence-transformers")

    with pytest.raises(ImportError, match="missing sentence-transformers"):
        get_reranker(backend="legacy", model_name="dummy-model")


def test_get_reranker_legacy_returns_cross_encoder_reranker(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    class DummyCrossEncoder:
        def __init__(self, model_name: str, *, device: str | None = None) -> None:
            self.model_name = model_name
            self.device = device
            self.last_batch_size: int | None = None

        def predict(self, pairs: list[tuple[str, str]], *, batch_size: int = 32) -> list[float]:
            self.last_batch_size = int(batch_size)
            return [0.5 for _ in pairs]

    monkeypatch.setattr(legacy_module, "_CrossEncoder", DummyCrossEncoder)
    monkeypatch.setattr(legacy_module, "CROSS_ENCODER_AVAILABLE", True)
    monkeypatch.setattr(legacy_module, "_import_error", None)

    reranker = get_reranker(backend=" LEGACY ", model_name="dummy-model", device="cpu")
    assert isinstance(reranker, legacy_module.CrossEncoderReranker)

    assert reranker.score_pairs([]) == []

    scores = reranker.score_pairs([("q", "d1"), ("q", "d2")], batch_size=0)
    assert scores == pytest.approx([0.5, 0.5])
    assert reranker._model is not None
    assert reranker._model.last_batch_size == 32


def test_check_reranker_available_onnx_missing_deps(monkeypatch: pytest.MonkeyPatch) -> None:
    real_import = builtins.__import__

    def fake_import(name: str, globals=None, locals=None, fromlist=(), level: int = 0):
        if name == "onnxruntime":
            raise ImportError("no onnxruntime")
        return real_import(name, globals, locals, fromlist, level)

    monkeypatch.setattr(builtins, "__import__", fake_import)

    ok, err = check_reranker_available("onnx")
    assert ok is False
    assert "onnxruntime not available" in (err or "")


def test_check_reranker_available_onnx_deps_present(monkeypatch: pytest.MonkeyPatch) -> None:
    dummy_onnxruntime = types.ModuleType("onnxruntime")
    dummy_optimum = types.ModuleType("optimum")
    dummy_optimum.__path__ = []  # Mark as package for submodule imports.
    dummy_optimum_ort = types.ModuleType("optimum.onnxruntime")
    dummy_optimum_ort.ORTModelForSequenceClassification = object()

    dummy_transformers = types.ModuleType("transformers")
    dummy_transformers.AutoTokenizer = object()

    monkeypatch.setitem(sys.modules, "onnxruntime", dummy_onnxruntime)
    monkeypatch.setitem(sys.modules, "optimum", dummy_optimum)
    monkeypatch.setitem(sys.modules, "optimum.onnxruntime", dummy_optimum_ort)
    monkeypatch.setitem(sys.modules, "transformers", dummy_transformers)

    ok, err = check_reranker_available("onnx")
    assert ok is True
    assert err is None


def test_check_reranker_available_litellm_missing_deps(monkeypatch: pytest.MonkeyPatch) -> None:
    real_import = builtins.__import__

    def fake_import(name: str, globals=None, locals=None, fromlist=(), level: int = 0):
        if name == "ccw_litellm":
            raise ImportError("no ccw-litellm")
        return real_import(name, globals, locals, fromlist, level)

    monkeypatch.setattr(builtins, "__import__", fake_import)

    ok, err = check_reranker_available("litellm")
    assert ok is False
    assert "ccw-litellm not available" in (err or "")


def test_check_reranker_available_litellm_deps_present(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    dummy_litellm = types.ModuleType("ccw_litellm")
    monkeypatch.setitem(sys.modules, "ccw_litellm", dummy_litellm)

    ok, err = check_reranker_available("litellm")
    assert ok is True
    assert err is None


def test_check_reranker_available_api_missing_deps(monkeypatch: pytest.MonkeyPatch) -> None:
    real_import = builtins.__import__

    def fake_import(name: str, globals=None, locals=None, fromlist=(), level: int = 0):
        if name == "httpx":
            raise ImportError("no httpx")
        return real_import(name, globals, locals, fromlist, level)

    monkeypatch.setattr(builtins, "__import__", fake_import)

    ok, err = check_reranker_available("api")
    assert ok is False
    assert "httpx not available" in (err or "")


def test_check_reranker_available_api_deps_present(monkeypatch: pytest.MonkeyPatch) -> None:
    dummy_httpx = types.ModuleType("httpx")
    monkeypatch.setitem(sys.modules, "httpx", dummy_httpx)

    ok, err = check_reranker_available("api")
    assert ok is True
    assert err is None


def test_get_reranker_litellm_returns_litellm_reranker(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    from dataclasses import dataclass

    @dataclass(frozen=True, slots=True)
    class ChatMessage:
        role: str
        content: str

    class DummyLiteLLMClient:
        def __init__(self, model: str = "default", **kwargs) -> None:
            self.model = model
            self.kwargs = kwargs

        def chat(self, messages, **kwargs):
            return types.SimpleNamespace(content="0.5")

    dummy_litellm = types.ModuleType("ccw_litellm")
    dummy_litellm.ChatMessage = ChatMessage
    dummy_litellm.LiteLLMClient = DummyLiteLLMClient
    monkeypatch.setitem(sys.modules, "ccw_litellm", dummy_litellm)

    reranker = get_reranker(backend="litellm", model_name="dummy-model")

    from codexlens.semantic.reranker.litellm_reranker import LiteLLMReranker

    assert isinstance(reranker, LiteLLMReranker)
    assert reranker.score_pairs([("q", "d")]) == pytest.approx([0.5])


def test_get_reranker_onnx_raises_import_error_with_dependency_hint(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    real_import = builtins.__import__

    def fake_import(name: str, globals=None, locals=None, fromlist=(), level: int = 0):
        if name == "onnxruntime":
            raise ImportError("no onnxruntime")
        return real_import(name, globals, locals, fromlist, level)

    monkeypatch.setattr(builtins, "__import__", fake_import)

    with pytest.raises(ImportError) as exc:
        get_reranker(backend="onnx", model_name="any")

    assert "onnxruntime" in str(exc.value)


def test_get_reranker_default_backend_is_onnx(monkeypatch: pytest.MonkeyPatch) -> None:
    dummy_onnxruntime = types.ModuleType("onnxruntime")
    dummy_optimum = types.ModuleType("optimum")
    dummy_optimum.__path__ = []  # Mark as package for submodule imports.
    dummy_optimum_ort = types.ModuleType("optimum.onnxruntime")
    dummy_optimum_ort.ORTModelForSequenceClassification = object()

    dummy_transformers = types.ModuleType("transformers")
    dummy_transformers.AutoTokenizer = object()

    monkeypatch.setitem(sys.modules, "onnxruntime", dummy_onnxruntime)
    monkeypatch.setitem(sys.modules, "optimum", dummy_optimum)
    monkeypatch.setitem(sys.modules, "optimum.onnxruntime", dummy_optimum_ort)
    monkeypatch.setitem(sys.modules, "transformers", dummy_transformers)

    reranker = get_reranker()
    assert isinstance(reranker, ONNXReranker)


def test_onnx_reranker_scores_pairs_with_sigmoid_normalization(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    import numpy as np

    dummy_onnxruntime = types.ModuleType("onnxruntime")

    dummy_optimum = types.ModuleType("optimum")
    dummy_optimum.__path__ = []  # Mark as package for submodule imports.
    dummy_optimum_ort = types.ModuleType("optimum.onnxruntime")

    class DummyModelOutput:
        def __init__(self, logits: np.ndarray) -> None:
            self.logits = logits

    class DummyModel:
        input_names = ["input_ids", "attention_mask"]

        def __init__(self) -> None:
            self.calls: list[int] = []
            self._next_logit = 0

        def __call__(self, **inputs):
            batch = int(inputs["input_ids"].shape[0])
            start = self._next_logit
            self._next_logit += batch
            self.calls.append(batch)
            logits = np.arange(start, start + batch, dtype=np.float32).reshape(batch, 1)
            return DummyModelOutput(logits=logits)

    class DummyORTModelForSequenceClassification:
        @classmethod
        def from_pretrained(cls, model_name: str, providers=None, **kwargs):
            _ = model_name, providers, kwargs
            return DummyModel()

    dummy_optimum_ort.ORTModelForSequenceClassification = DummyORTModelForSequenceClassification

    dummy_transformers = types.ModuleType("transformers")

    class DummyAutoTokenizer:
        model_max_length = 512

        @classmethod
        def from_pretrained(cls, model_name: str, **kwargs):
            _ = model_name, kwargs
            return cls()

        def __call__(self, *, text, text_pair, return_tensors, **kwargs):
            _ = text_pair, kwargs
            assert return_tensors == "np"
            batch = len(text)
            # Include token_type_ids to ensure input filtering is exercised.
            return {
                "input_ids": np.zeros((batch, 4), dtype=np.int64),
                "attention_mask": np.ones((batch, 4), dtype=np.int64),
                "token_type_ids": np.zeros((batch, 4), dtype=np.int64),
            }

    dummy_transformers.AutoTokenizer = DummyAutoTokenizer

    monkeypatch.setitem(sys.modules, "onnxruntime", dummy_onnxruntime)
    monkeypatch.setitem(sys.modules, "optimum", dummy_optimum)
    monkeypatch.setitem(sys.modules, "optimum.onnxruntime", dummy_optimum_ort)
    monkeypatch.setitem(sys.modules, "transformers", dummy_transformers)

    reranker = get_reranker(backend="onnx", model_name="dummy-model", use_gpu=False)
    assert isinstance(reranker, ONNXReranker)
    assert reranker._model is None

    pairs = [("q", f"d{idx}") for idx in range(5)]
    scores = reranker.score_pairs(pairs, batch_size=2)

    assert reranker._model is not None
    assert reranker._model.calls == [2, 2, 1]
    assert len(scores) == len(pairs)
    assert all(0.0 <= s <= 1.0 for s in scores)

    expected = [1.0 / (1.0 + math.exp(-float(i))) for i in range(len(pairs))]
    assert scores == pytest.approx(expected, rel=1e-6, abs=1e-6)
codex-lens/tests/test_watcher/__init__.py (new file)
@@ -0,0 +1 @@
"""Tests for watcher module."""
codex-lens/tests/test_watcher/conftest.py (new file)
@@ -0,0 +1,43 @@
"""Fixtures for watcher tests."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Generator

import pytest


@pytest.fixture
def temp_project() -> Generator[Path, None, None]:
    """Create a temporary project directory with sample files."""
    with tempfile.TemporaryDirectory() as tmpdir:
        project = Path(tmpdir)

        # Create sample Python file
        py_file = project / "main.py"
        py_file.write_text("def hello():\n    print('Hello')\n")

        # Create sample JavaScript file
        js_file = project / "app.js"
        js_file.write_text("function greet() {\n    console.log('Hi');\n}\n")

        # Create subdirectory with file
        sub_dir = project / "src"
        sub_dir.mkdir()
        (sub_dir / "utils.py").write_text("def add(a, b):\n    return a + b\n")

        # Create ignored directory
        git_dir = project / ".git"
        git_dir.mkdir()
        (git_dir / "config").write_text("[core]\n")

        yield project


@pytest.fixture
def watcher_config():
    """Create default watcher configuration."""
    from codexlens.watcher import WatcherConfig
    return WatcherConfig(debounce_ms=100)  # Short debounce for tests
codex-lens/tests/test_watcher/test_events.py (new file)
@@ -0,0 +1,103 @@
"""Tests for watcher event types."""

from __future__ import annotations

import time
from pathlib import Path

import pytest

from codexlens.watcher import ChangeType, FileEvent, WatcherConfig, IndexResult, WatcherStats


class TestChangeType:
    """Tests for ChangeType enum."""

    def test_change_types_exist(self):
        """Verify all change types are defined."""
        assert ChangeType.CREATED.value == "created"
        assert ChangeType.MODIFIED.value == "modified"
        assert ChangeType.DELETED.value == "deleted"
        assert ChangeType.MOVED.value == "moved"

    def test_change_type_count(self):
        """Verify we have exactly 4 change types."""
        assert len(ChangeType) == 4


class TestFileEvent:
    """Tests for FileEvent dataclass."""

    def test_create_event(self):
        """Test creating a file event."""
        event = FileEvent(
            path=Path("/test/file.py"),
            change_type=ChangeType.CREATED,
            timestamp=time.time(),
        )
        assert event.path == Path("/test/file.py")
        assert event.change_type == ChangeType.CREATED
        assert event.old_path is None

    def test_moved_event(self):
        """Test creating a moved event with old_path."""
        event = FileEvent(
            path=Path("/test/new.py"),
            change_type=ChangeType.MOVED,
            timestamp=time.time(),
            old_path=Path("/test/old.py"),
        )
        assert event.old_path == Path("/test/old.py")


class TestWatcherConfig:
    """Tests for WatcherConfig dataclass."""

    def test_default_config(self):
        """Test default configuration values."""
        config = WatcherConfig()
        assert config.debounce_ms == 1000
        assert ".git" in config.ignored_patterns
        assert "node_modules" in config.ignored_patterns
        assert "__pycache__" in config.ignored_patterns
        assert config.languages is None

    def test_custom_debounce(self):
        """Test custom debounce setting."""
        config = WatcherConfig(debounce_ms=500)
        assert config.debounce_ms == 500


class TestIndexResult:
    """Tests for IndexResult dataclass."""

    def test_default_result(self):
        """Test default result values."""
        result = IndexResult()
        assert result.files_indexed == 0
        assert result.files_removed == 0
        assert result.symbols_added == 0
        assert result.errors == []

    def test_custom_result(self):
        """Test creating result with values."""
        result = IndexResult(
            files_indexed=5,
            files_removed=2,
            symbols_added=50,
            errors=["error1"],
        )
        assert result.files_indexed == 5
        assert result.files_removed == 2


class TestWatcherStats:
    """Tests for WatcherStats dataclass."""

    def test_default_stats(self):
        """Test default stats values."""
        stats = WatcherStats()
        assert stats.files_watched == 0
        assert stats.events_processed == 0
        assert stats.last_event_time is None
        assert stats.is_running is False
codex-lens/tests/test_watcher/test_file_watcher.py (new file)
@@ -0,0 +1,124 @@
"""Tests for FileWatcher class."""

from __future__ import annotations

import time
from pathlib import Path
from typing import List

import pytest

from codexlens.watcher import FileWatcher, WatcherConfig, FileEvent, ChangeType


class TestFileWatcherInit:
    """Tests for FileWatcher initialization."""

    def test_init_with_valid_path(self, temp_project: Path, watcher_config: WatcherConfig):
        """Test initializing with valid path."""
        events: List[FileEvent] = []
        watcher = FileWatcher(temp_project, watcher_config, lambda e: events.extend(e))

        assert watcher.root_path == temp_project.resolve()
        assert watcher.config == watcher_config
        assert not watcher.is_running

    def test_start_with_invalid_path(self, watcher_config: WatcherConfig):
        """Test starting watcher with non-existent path."""
        events: List[FileEvent] = []
        watcher = FileWatcher(Path("/nonexistent/path"), watcher_config, lambda e: events.extend(e))

        with pytest.raises(ValueError, match="does not exist"):
            watcher.start()


class TestFileWatcherLifecycle:
    """Tests for FileWatcher start/stop lifecycle."""

    def test_start_stop(self, temp_project: Path, watcher_config: WatcherConfig):
        """Test basic start and stop."""
        events: List[FileEvent] = []
        watcher = FileWatcher(temp_project, watcher_config, lambda e: events.extend(e))

        watcher.start()
        assert watcher.is_running

        watcher.stop()
        assert not watcher.is_running

    def test_double_start(self, temp_project: Path, watcher_config: WatcherConfig):
        """Test calling start twice."""
        events: List[FileEvent] = []
        watcher = FileWatcher(temp_project, watcher_config, lambda e: events.extend(e))

        watcher.start()
        watcher.start()  # Should not raise
        assert watcher.is_running

        watcher.stop()

    def test_double_stop(self, temp_project: Path, watcher_config: WatcherConfig):
        """Test calling stop twice."""
        events: List[FileEvent] = []
        watcher = FileWatcher(temp_project, watcher_config, lambda e: events.extend(e))

        watcher.start()
        watcher.stop()
        watcher.stop()  # Should not raise
        assert not watcher.is_running


class TestFileWatcherEvents:
    """Tests for FileWatcher event detection."""

    def test_detect_file_creation(self, temp_project: Path, watcher_config: WatcherConfig):
        """Test detecting new file creation."""
        events: List[FileEvent] = []
        watcher = FileWatcher(temp_project, watcher_config, lambda e: events.extend(e))

        try:
            watcher.start()
            time.sleep(0.3)  # Let watcher start (longer for Windows)

            # Create new file
            new_file = temp_project / "new_file.py"
            new_file.write_text("# New file\n")

            # Wait for event with retries (watchdog timing varies by platform)
            max_wait = 2.0
            waited = 0.0
            while waited < max_wait:
                time.sleep(0.2)
                waited += 0.2
                # Windows may report MODIFIED instead of CREATED
                file_events = [e for e in events if e.change_type in (ChangeType.CREATED, ChangeType.MODIFIED)]
                if any(e.path.name == "new_file.py" for e in file_events):
                    break

            # Check event was detected (Windows may report MODIFIED instead of CREATED)
            relevant_events = [e for e in events if e.change_type in (ChangeType.CREATED, ChangeType.MODIFIED)]
            assert len(relevant_events) >= 1, f"Expected file event, got: {events}"
            assert any(e.path.name == "new_file.py" for e in relevant_events)
        finally:
            watcher.stop()

    def test_filter_ignored_directories(self, temp_project: Path, watcher_config: WatcherConfig):
        """Test that files in ignored directories are filtered."""
        events: List[FileEvent] = []
        watcher = FileWatcher(temp_project, watcher_config, lambda e: events.extend(e))

        try:
            watcher.start()
            time.sleep(0.1)

            # Create file in .git (should be ignored)
            git_file = temp_project / ".git" / "test.py"
            git_file.write_text("# In git\n")

            time.sleep(watcher_config.debounce_ms / 1000.0 + 0.2)

            # No events should be detected for .git files
            git_events = [e for e in events if ".git" in str(e.path)]
            assert len(git_events) == 0
        finally:
            watcher.stop()
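Taken together, the watcher tests pin down the public surface of codexlens.watcher. A minimal sketch of the wiring they imply follows; the constructor arguments, start/stop semantics, and the batch callback come from the tests, while the event loop and printing are illustrative, and the 'codexlens watch' command presumably wraps something similar.

    # Sketch only, assuming the signatures exercised by the tests above.
    import time
    from pathlib import Path

    from codexlens.watcher import FileEvent, FileWatcher, WatcherConfig

    def on_events(batch: list[FileEvent]) -> None:
        # Each event carries path, change_type, timestamp, and old_path for moves.
        for event in batch:
            print(event.change_type.value, event.path)

    config = WatcherConfig()  # debounce_ms=1000; .git, node_modules, __pycache__ ignored
    watcher = FileWatcher(Path("."), config, on_events)
    watcher.start()  # Raises ValueError if the root path does not exist.
    try:
        time.sleep(5.0)  # Observe events for a few seconds.
    finally:
        watcher.stop()  # Safe to call more than once.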