615 lines
22 KiB
Python
615 lines
22 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
import importlib
|
|
import sys
|
|
import tempfile
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
|
if str(PROJECT_ROOT) not in sys.path:
|
|
sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
from app.config import AppConfig, ConfigStore
|
|
|
|
|
|
def _require_module(module_name: str):
|
|
try:
|
|
return importlib.import_module(module_name)
|
|
except ModuleNotFoundError as exc:
|
|
pytest.skip(f"Module '{module_name}' is not available yet: {exc}")
|
|
|
|
|
|
def test_config_store_missing_file_and_roundtrip_save() -> None:
|
|
config_path = Path(tempfile.mkdtemp()) / "config.json"
|
|
store = ConfigStore(config_path)
|
|
|
|
assert store.get() == AppConfig(device_port="DP1", auxiliary_monitor_id=None)
|
|
assert not config_path.exists()
|
|
|
|
saved = store.save(AppConfig(device_port="HDMI", auxiliary_monitor_id="generic-pnp-monitor-1"))
|
|
assert saved.device_port == "HDMI"
|
|
assert saved.auxiliary_monitor_id == "generic-pnp-monitor-1"
|
|
assert config_path.exists()
|
|
|
|
reloaded = ConfigStore(config_path).get()
|
|
assert reloaded.device_port == "HDMI"
|
|
assert reloaded.auxiliary_monitor_id == "generic-pnp-monitor-1"
|
|
|
|
|
|
def test_config_store_loads_legacy_config_with_device_role() -> None:
|
|
config_path = Path(tempfile.mkdtemp()) / "config.json"
|
|
config_path.write_text(
|
|
'{\n "device_role": "laptop",\n "device_port": "DP2"\n}\n',
|
|
encoding="utf-8",
|
|
)
|
|
|
|
loaded = ConfigStore(config_path).get()
|
|
assert loaded.device_port == "DP2"
|
|
assert loaded.auxiliary_monitor_id is None
|
|
|
|
|
|
def test_api_status_and_settings_endpoints_with_fake_service() -> None:
|
|
app_main = _require_module("app.main")
|
|
create_app = getattr(app_main, "create_app", None)
|
|
if create_app is None:
|
|
pytest.skip("app.main.create_app is not available yet.")
|
|
|
|
class FakeService:
|
|
def __init__(self) -> None:
|
|
self.payload = {
|
|
"config": {"device_port": "DP1"},
|
|
"samsung_present": False,
|
|
"samsung_connected_session_active": False,
|
|
"samsung_session_attempted": False,
|
|
"samsung_session_successful": False,
|
|
"samsung_session_attempt_count": 0,
|
|
"waiting_for_samsung_disconnect": False,
|
|
"trigger_input_code": None,
|
|
"active_trigger_monitor_id": None,
|
|
"trigger_target_port": None,
|
|
"trigger_matches_device_port": False,
|
|
"trigger_monitor_candidates": [],
|
|
"alienware_detected": False,
|
|
"alienware_input_code": None,
|
|
"ddm_slot": None,
|
|
"ddm_ready": False,
|
|
"last_switch_result": "idle",
|
|
"last_switch_at": None,
|
|
"errors": [],
|
|
}
|
|
|
|
def start(self) -> None:
|
|
return None
|
|
|
|
def stop(self) -> None:
|
|
return None
|
|
|
|
def get_status(self) -> dict[str, object]:
|
|
return self.payload
|
|
|
|
def save_settings(
|
|
self,
|
|
device_port: str,
|
|
auxiliary_monitor_id: str | None = None,
|
|
) -> dict[str, object]:
|
|
self.payload["config"] = {
|
|
"device_port": device_port,
|
|
"auxiliary_monitor_id": auxiliary_monitor_id,
|
|
}
|
|
self.payload["last_switch_result"] = "updated"
|
|
return self.payload
|
|
|
|
app = create_app(service=FakeService(), manage_lifecycle=False)
|
|
with TestClient(app) as client:
|
|
status_response = client.get("/api/status")
|
|
assert status_response.status_code == 200
|
|
assert status_response.json()["config"]["device_port"] == "DP1"
|
|
|
|
save_response = client.post(
|
|
"/api/settings",
|
|
json={
|
|
"device_port": "HDMI",
|
|
"auxiliary_monitor_id": "generic-pnp-monitor-1",
|
|
"device_role": "legacy_ignored",
|
|
},
|
|
)
|
|
assert save_response.status_code == 200
|
|
assert save_response.json()["config"]["device_port"] == "HDMI"
|
|
assert save_response.json()["config"]["auxiliary_monitor_id"] == "generic-pnp-monitor-1"
|
|
|
|
|
|
def test_polling_switches_when_trigger_matches_device_port() -> None:
|
|
service_module = _require_module("app.service")
|
|
hardware_module = _require_module("app.hardware")
|
|
ddm_module = _require_module("app.ddm")
|
|
|
|
KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None)
|
|
HardwareScan = getattr(hardware_module, "HardwareScan", None)
|
|
DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None)
|
|
if KvmSwitcherService is None or HardwareScan is None or DDMCommandResult is None:
|
|
pytest.skip("Service backend interfaces are not available yet.")
|
|
|
|
class FakeMonitorBackend:
|
|
def __init__(self) -> None:
|
|
self.call_count = 0
|
|
|
|
def scan(self):
|
|
self.call_count += 1
|
|
if self.call_count == 1:
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=17,
|
|
alienware_detected=True,
|
|
alienware_input_code=15,
|
|
errors=[],
|
|
)
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=17,
|
|
alienware_detected=True,
|
|
alienware_input_code=17,
|
|
errors=[],
|
|
)
|
|
|
|
class FakeDDMBackend:
|
|
def __init__(self) -> None:
|
|
self.calls: list[tuple[int, str]] = []
|
|
self.slot = 1
|
|
|
|
def is_available(self) -> bool:
|
|
return True
|
|
|
|
def resolve_alienware_slot(self, force: bool = False) -> int | None:
|
|
return self.slot
|
|
|
|
def invalidate_slot(self) -> None:
|
|
self.slot = None
|
|
|
|
def switch_to_port(self, slot: int, port_name: str):
|
|
self.calls.append((slot, port_name))
|
|
return DDMCommandResult(True, "ok")
|
|
|
|
config_path = Path(tempfile.mkdtemp()) / "config.json"
|
|
service = KvmSwitcherService(
|
|
config_store=ConfigStore(config_path),
|
|
monitor_backend=FakeMonitorBackend(),
|
|
ddm_backend=FakeDDMBackend(),
|
|
poll_interval_seconds=0.01,
|
|
retry_wait_seconds=0.0,
|
|
)
|
|
status = service.save_settings(device_port="HDMI")
|
|
assert status["trigger_target_port"] == "HDMI"
|
|
assert status["trigger_matches_device_port"] is True
|
|
assert status["config"]["device_port"] == "HDMI"
|
|
assert status["samsung_session_attempted"] is True
|
|
assert status["samsung_session_successful"] is True
|
|
assert status["last_switch_result"] == "switched"
|
|
|
|
|
|
def test_polling_uses_configured_auxiliary_monitor() -> None:
|
|
service_module = _require_module("app.service")
|
|
hardware_module = _require_module("app.hardware")
|
|
ddm_module = _require_module("app.ddm")
|
|
|
|
KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None)
|
|
HardwareScan = getattr(hardware_module, "HardwareScan", None)
|
|
TriggerMonitorCandidate = getattr(hardware_module, "TriggerMonitorCandidate", None)
|
|
DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None)
|
|
if (
|
|
KvmSwitcherService is None
|
|
or HardwareScan is None
|
|
or TriggerMonitorCandidate is None
|
|
or DDMCommandResult is None
|
|
):
|
|
pytest.skip("Service backend interfaces are not available yet.")
|
|
|
|
class FakeMonitorBackend:
|
|
def __init__(self) -> None:
|
|
self.call_count = 0
|
|
|
|
def scan(self):
|
|
self.call_count += 1
|
|
alienware_input = 15 if self.call_count == 1 else 19
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=None,
|
|
trigger_monitor_id=None,
|
|
trigger_candidates=[
|
|
TriggerMonitorCandidate(id="dell-u2720q-1", label="DELL U2720Q", input_code=60),
|
|
TriggerMonitorCandidate(id="generic-pnp-monitor-2", label="Generic PnP Monitor", input_code=19),
|
|
],
|
|
alienware_detected=True,
|
|
alienware_input_code=alienware_input,
|
|
errors=[
|
|
"Multiple non-Alienware DDC monitors were detected and Samsung did not expose model info; trigger monitor is ambiguous."
|
|
],
|
|
)
|
|
|
|
class FakeDDMBackend:
|
|
def __init__(self) -> None:
|
|
self.calls: list[tuple[int, str]] = []
|
|
|
|
def is_available(self) -> bool:
|
|
return True
|
|
|
|
def resolve_alienware_slot(self, force: bool = False) -> int | None:
|
|
return 1
|
|
|
|
def invalidate_slot(self) -> None:
|
|
return None
|
|
|
|
def switch_to_port(self, slot: int, port_name: str):
|
|
self.calls.append((slot, port_name))
|
|
return DDMCommandResult(True, "ok")
|
|
|
|
config_path = Path(tempfile.mkdtemp()) / "config.json"
|
|
backend = FakeDDMBackend()
|
|
service = KvmSwitcherService(
|
|
config_store=ConfigStore(config_path),
|
|
monitor_backend=FakeMonitorBackend(),
|
|
ddm_backend=backend,
|
|
poll_interval_seconds=0.01,
|
|
retry_wait_seconds=0.0,
|
|
)
|
|
status = service.save_settings(
|
|
device_port="DP2",
|
|
auxiliary_monitor_id="generic-pnp-monitor-2",
|
|
)
|
|
assert status["config"]["auxiliary_monitor_id"] == "generic-pnp-monitor-2"
|
|
assert status["active_trigger_monitor_id"] == "generic-pnp-monitor-2"
|
|
assert status["trigger_input_code"] == 19
|
|
assert status["trigger_matches_device_port"] is True
|
|
assert status["last_switch_result"] == "switched"
|
|
assert not status["errors"]
|
|
|
|
|
|
def test_polling_switches_even_when_trigger_input_does_not_match_device_port() -> None:
|
|
service_module = _require_module("app.service")
|
|
hardware_module = _require_module("app.hardware")
|
|
ddm_module = _require_module("app.ddm")
|
|
|
|
KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None)
|
|
HardwareScan = getattr(hardware_module, "HardwareScan", None)
|
|
DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None)
|
|
if KvmSwitcherService is None or HardwareScan is None or DDMCommandResult is None:
|
|
pytest.skip("Service backend interfaces are not available yet.")
|
|
|
|
class FakeMonitorBackend:
|
|
def __init__(self) -> None:
|
|
self.call_count = 0
|
|
|
|
def scan(self):
|
|
self.call_count += 1
|
|
alienware_input = 15 if self.call_count == 1 else 19
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=15,
|
|
alienware_detected=True,
|
|
alienware_input_code=alienware_input,
|
|
errors=[],
|
|
)
|
|
|
|
class FakeDDMBackend:
|
|
def __init__(self) -> None:
|
|
self.calls: list[tuple[int, str]] = []
|
|
|
|
def is_available(self) -> bool:
|
|
return True
|
|
|
|
def resolve_alienware_slot(self, force: bool = False) -> int | None:
|
|
return 1
|
|
|
|
def invalidate_slot(self) -> None:
|
|
return None
|
|
|
|
def switch_to_port(self, slot: int, port_name: str):
|
|
self.calls.append((slot, port_name))
|
|
return DDMCommandResult(True, "ok")
|
|
|
|
config_path = Path(tempfile.mkdtemp()) / "config.json"
|
|
backend = FakeDDMBackend()
|
|
service = KvmSwitcherService(
|
|
config_store=ConfigStore(config_path),
|
|
monitor_backend=FakeMonitorBackend(),
|
|
ddm_backend=backend,
|
|
poll_interval_seconds=0.01,
|
|
retry_wait_seconds=0.0,
|
|
)
|
|
status = service.save_settings(device_port="DP2")
|
|
assert status["trigger_matches_device_port"] is True
|
|
assert status["last_switch_result"] == "switched"
|
|
assert backend.calls == [(1, "DP2")]
|
|
|
|
|
|
def test_polling_switches_with_monitor_targeting_when_slot_is_unavailable() -> None:
|
|
service_module = _require_module("app.service")
|
|
hardware_module = _require_module("app.hardware")
|
|
ddm_module = _require_module("app.ddm")
|
|
|
|
KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None)
|
|
HardwareScan = getattr(hardware_module, "HardwareScan", None)
|
|
DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None)
|
|
if KvmSwitcherService is None or HardwareScan is None or DDMCommandResult is None:
|
|
pytest.skip("Service backend interfaces are not available yet.")
|
|
|
|
class FakeMonitorBackend:
|
|
def __init__(self) -> None:
|
|
self.call_count = 0
|
|
|
|
def scan(self):
|
|
self.call_count += 1
|
|
if self.call_count == 1:
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=15,
|
|
alienware_detected=True,
|
|
alienware_input_code=19,
|
|
errors=[],
|
|
)
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=15,
|
|
alienware_detected=True,
|
|
alienware_input_code=15,
|
|
errors=[],
|
|
)
|
|
|
|
class FakeDDMBackend:
|
|
def __init__(self) -> None:
|
|
self.calls: list[tuple[int, str]] = []
|
|
|
|
def is_available(self) -> bool:
|
|
return True
|
|
|
|
def supports_monitor_targeting(self) -> bool:
|
|
return True
|
|
|
|
def resolve_alienware_slot(self, force: bool = False) -> int | None:
|
|
return None
|
|
|
|
def invalidate_slot(self) -> None:
|
|
return None
|
|
|
|
def switch_to_port(self, slot: int, port_name: str):
|
|
self.calls.append((slot, port_name))
|
|
return DDMCommandResult(True, "ok")
|
|
|
|
config_path = Path(tempfile.mkdtemp()) / "config.json"
|
|
backend = FakeDDMBackend()
|
|
service = KvmSwitcherService(
|
|
config_store=ConfigStore(config_path),
|
|
monitor_backend=FakeMonitorBackend(),
|
|
ddm_backend=backend,
|
|
poll_interval_seconds=0.01,
|
|
retry_wait_seconds=0.0,
|
|
)
|
|
status = service.save_settings(device_port="DP1")
|
|
assert status["ddm_ready"] is True
|
|
assert status["ddm_slot"] is None
|
|
assert status["last_switch_result"] == "switched"
|
|
assert backend.calls == [(0, "DP1")]
|
|
|
|
|
|
def test_polling_switches_when_alienware_input_is_unreadable() -> None:
|
|
service_module = _require_module("app.service")
|
|
hardware_module = _require_module("app.hardware")
|
|
ddm_module = _require_module("app.ddm")
|
|
|
|
KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None)
|
|
HardwareScan = getattr(hardware_module, "HardwareScan", None)
|
|
DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None)
|
|
if KvmSwitcherService is None or HardwareScan is None or DDMCommandResult is None:
|
|
pytest.skip("Service backend interfaces are not available yet.")
|
|
|
|
class FakeMonitorBackend:
|
|
def scan(self):
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=15,
|
|
alienware_detected=True,
|
|
alienware_input_code=None,
|
|
errors=[
|
|
"Unable to read Alienware target monitor input source: failed to get VCP feature."
|
|
],
|
|
)
|
|
|
|
class FakeDDMBackend:
|
|
def __init__(self) -> None:
|
|
self.calls: list[tuple[int, str]] = []
|
|
|
|
def is_available(self) -> bool:
|
|
return True
|
|
|
|
def resolve_alienware_slot(self, force: bool = False) -> int | None:
|
|
return 1
|
|
|
|
def invalidate_slot(self) -> None:
|
|
return None
|
|
|
|
def switch_to_port(self, slot: int, port_name: str):
|
|
self.calls.append((slot, port_name))
|
|
return DDMCommandResult(True, "ok")
|
|
|
|
config_path = Path(tempfile.mkdtemp()) / "config.json"
|
|
backend = FakeDDMBackend()
|
|
service = KvmSwitcherService(
|
|
config_store=ConfigStore(config_path),
|
|
monitor_backend=FakeMonitorBackend(),
|
|
ddm_backend=backend,
|
|
poll_interval_seconds=0.01,
|
|
retry_wait_seconds=0.0,
|
|
)
|
|
status = service.save_settings(device_port="DP1")
|
|
assert status["last_switch_result"] == "switched_unverified"
|
|
assert status["samsung_session_successful"] is True
|
|
assert backend.calls == [(1, "DP1")]
|
|
|
|
|
|
def test_polling_does_not_reswitch_after_manual_change_while_monitor_stays_connected() -> None:
|
|
service_module = _require_module("app.service")
|
|
hardware_module = _require_module("app.hardware")
|
|
ddm_module = _require_module("app.ddm")
|
|
|
|
KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None)
|
|
HardwareScan = getattr(hardware_module, "HardwareScan", None)
|
|
DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None)
|
|
if KvmSwitcherService is None or HardwareScan is None or DDMCommandResult is None:
|
|
pytest.skip("Service backend interfaces are not available yet.")
|
|
|
|
class FakeMonitorBackend:
|
|
def __init__(self) -> None:
|
|
self.call_count = 0
|
|
|
|
def scan(self):
|
|
self.call_count += 1
|
|
if self.call_count == 1:
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=19,
|
|
alienware_detected=True,
|
|
alienware_input_code=19,
|
|
errors=[],
|
|
)
|
|
if self.call_count == 2:
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=15,
|
|
alienware_detected=True,
|
|
alienware_input_code=15,
|
|
errors=[],
|
|
)
|
|
if self.call_count == 3:
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=19,
|
|
alienware_detected=True,
|
|
alienware_input_code=15,
|
|
errors=[],
|
|
)
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=19,
|
|
alienware_detected=True,
|
|
alienware_input_code=19,
|
|
errors=[],
|
|
)
|
|
|
|
class FakeDDMBackend:
|
|
def __init__(self) -> None:
|
|
self.calls: list[tuple[int, str]] = []
|
|
|
|
def is_available(self) -> bool:
|
|
return True
|
|
|
|
def resolve_alienware_slot(self, force: bool = False) -> int | None:
|
|
return 1
|
|
|
|
def invalidate_slot(self) -> None:
|
|
return None
|
|
|
|
def switch_to_port(self, slot: int, port_name: str):
|
|
self.calls.append((slot, port_name))
|
|
return DDMCommandResult(True, "ok")
|
|
|
|
config_path = Path(tempfile.mkdtemp()) / "config.json"
|
|
backend = FakeDDMBackend()
|
|
service = KvmSwitcherService(
|
|
config_store=ConfigStore(config_path),
|
|
monitor_backend=FakeMonitorBackend(),
|
|
ddm_backend=backend,
|
|
poll_interval_seconds=0.01,
|
|
retry_wait_seconds=0.0,
|
|
)
|
|
first = service.save_settings(device_port="DP2")
|
|
assert first["last_switch_result"] == "noop"
|
|
second = service.poll_once()
|
|
assert second["last_switch_result"] == "waiting_for_disconnect"
|
|
assert backend.calls == []
|
|
|
|
|
|
def test_polling_switches_again_after_aux_monitor_disconnect_and_reconnect() -> None:
|
|
service_module = _require_module("app.service")
|
|
hardware_module = _require_module("app.hardware")
|
|
ddm_module = _require_module("app.ddm")
|
|
|
|
KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None)
|
|
HardwareScan = getattr(hardware_module, "HardwareScan", None)
|
|
DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None)
|
|
if KvmSwitcherService is None or HardwareScan is None or DDMCommandResult is None:
|
|
pytest.skip("Service backend interfaces are not available yet.")
|
|
|
|
class FakeMonitorBackend:
|
|
def __init__(self) -> None:
|
|
self.call_count = 0
|
|
|
|
def scan(self):
|
|
self.call_count += 1
|
|
if self.call_count == 1:
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=19,
|
|
alienware_detected=True,
|
|
alienware_input_code=19,
|
|
errors=[],
|
|
)
|
|
if self.call_count == 2:
|
|
return HardwareScan(
|
|
samsung_present=False,
|
|
trigger_input_code=None,
|
|
alienware_detected=True,
|
|
alienware_input_code=15,
|
|
errors=[],
|
|
)
|
|
if self.call_count == 3:
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=19,
|
|
alienware_detected=True,
|
|
alienware_input_code=15,
|
|
errors=[],
|
|
)
|
|
return HardwareScan(
|
|
samsung_present=True,
|
|
trigger_input_code=19,
|
|
alienware_detected=True,
|
|
alienware_input_code=19,
|
|
errors=[],
|
|
)
|
|
|
|
class FakeDDMBackend:
|
|
def __init__(self) -> None:
|
|
self.calls: list[tuple[int, str]] = []
|
|
|
|
def is_available(self) -> bool:
|
|
return True
|
|
|
|
def resolve_alienware_slot(self, force: bool = False) -> int | None:
|
|
return 1
|
|
|
|
def invalidate_slot(self) -> None:
|
|
return None
|
|
|
|
def switch_to_port(self, slot: int, port_name: str):
|
|
self.calls.append((slot, port_name))
|
|
return DDMCommandResult(True, "ok")
|
|
|
|
config_path = Path(tempfile.mkdtemp()) / "config.json"
|
|
backend = FakeDDMBackend()
|
|
service = KvmSwitcherService(
|
|
config_store=ConfigStore(config_path),
|
|
monitor_backend=FakeMonitorBackend(),
|
|
ddm_backend=backend,
|
|
poll_interval_seconds=0.01,
|
|
retry_wait_seconds=0.0,
|
|
)
|
|
first = service.save_settings(device_port="DP2")
|
|
assert first["last_switch_result"] == "noop"
|
|
second = service.poll_once()
|
|
assert second["last_switch_result"] == "idle"
|
|
third = service.poll_once()
|
|
assert third["last_switch_result"] == "switched"
|
|
assert backend.calls == [(1, "DP2")]
|