Reset switch session on trigger changes and add DP2 force helper
This commit is contained in:
+21
-10
@@ -80,6 +80,7 @@ class KvmSwitcherService:
|
||||
self._samsung_session_attempted = False
|
||||
self._samsung_session_successful = False
|
||||
self._samsung_session_attempt_count = 0
|
||||
self._last_trigger_input_code: int | None = None
|
||||
self._state_lock = Lock()
|
||||
self._stop_event = Event()
|
||||
self._thread: Thread | None = None
|
||||
@@ -115,7 +116,6 @@ class KvmSwitcherService:
|
||||
def poll_once(self) -> dict[str, Any]:
|
||||
config = self.config_store.get()
|
||||
scan = self.monitor_backend.scan()
|
||||
self._update_samsung_session(scan.samsung_present)
|
||||
|
||||
errors = list(scan.errors)
|
||||
errors.extend(config.validate())
|
||||
@@ -124,6 +124,7 @@ class KvmSwitcherService:
|
||||
scan=scan,
|
||||
errors=errors,
|
||||
)
|
||||
self._update_samsung_session(scan.samsung_present, trigger_input_code)
|
||||
|
||||
ddm_slot = self.ddm_backend.resolve_alienware_slot(force=False)
|
||||
supports_monitor_targeting = bool(
|
||||
@@ -242,18 +243,28 @@ class KvmSwitcherService:
|
||||
return port_name
|
||||
return None
|
||||
|
||||
def _update_samsung_session(self, samsung_present: bool) -> None:
|
||||
if samsung_present:
|
||||
def _update_samsung_session(self, samsung_present: bool, trigger_input_code: int | None) -> None:
|
||||
if not samsung_present:
|
||||
self._samsung_session_active = False
|
||||
self._samsung_session_attempted = False
|
||||
self._samsung_session_successful = False
|
||||
self._samsung_session_attempt_count = 0
|
||||
self._last_trigger_input_code = None
|
||||
return
|
||||
|
||||
if not self._samsung_session_active:
|
||||
self._samsung_session_active = True
|
||||
self._samsung_session_attempted = False
|
||||
self._samsung_session_successful = False
|
||||
self._samsung_session_attempt_count = 0
|
||||
else:
|
||||
self._samsung_session_active = False
|
||||
self._last_trigger_input_code = trigger_input_code
|
||||
return
|
||||
|
||||
if trigger_input_code != self._last_trigger_input_code:
|
||||
self._samsung_session_attempted = False
|
||||
self._samsung_session_successful = False
|
||||
self._samsung_session_attempt_count = 0
|
||||
self._last_trigger_input_code = trigger_input_code
|
||||
|
||||
def _attempt_switch_sequence(
|
||||
self,
|
||||
@@ -284,16 +295,16 @@ class KvmSwitcherService:
|
||||
verify_scan = self.monitor_backend.scan()
|
||||
scan = verify_scan
|
||||
errors.extend(verify_scan.errors)
|
||||
self._update_samsung_session(verify_scan.samsung_present)
|
||||
|
||||
if not verify_scan.samsung_present:
|
||||
return "waiting_for_reconnect", last_switch_at, scan, errors
|
||||
|
||||
verify_trigger_input, _, _ = self._resolve_trigger_state(
|
||||
config=config,
|
||||
scan=verify_scan,
|
||||
errors=errors,
|
||||
)
|
||||
self._update_samsung_session(verify_scan.samsung_present, verify_trigger_input)
|
||||
|
||||
if not verify_scan.samsung_present:
|
||||
return "waiting_for_reconnect", last_switch_at, scan, errors
|
||||
|
||||
desired_codes = self._port_input_codes(desired_port)
|
||||
if verify_trigger_input not in desired_codes:
|
||||
return "waiting_for_trigger_match", last_switch_at, scan, errors
|
||||
|
||||
+19
-4
@@ -1,7 +1,22 @@
|
||||
from monitorcontrol import get_monitors, InputSource
|
||||
from monitorcontrol import get_monitors
|
||||
|
||||
|
||||
TARGET_MODEL = "AW3423DWF"
|
||||
DP2_CODE = 19
|
||||
|
||||
|
||||
for monitor in get_monitors():
|
||||
description = str(getattr(getattr(monitor, "vcp", None), "description", "") or str(monitor))
|
||||
if TARGET_MODEL not in description.upper():
|
||||
continue
|
||||
|
||||
try:
|
||||
with monitor:
|
||||
print(monitor.vcp.description)
|
||||
input_source_raw: int = monitor.get_input_source()
|
||||
print(InputSource(input_source_raw).name)
|
||||
monitor.set_input_source(DP2_CODE)
|
||||
print(f"Target: {description}")
|
||||
print(f"Forced DP2 code {DP2_CODE}.")
|
||||
except Exception as exc:
|
||||
print(f"Failed forcing DP2 on {description}: {exc}")
|
||||
break
|
||||
else:
|
||||
print(f"Monitor {TARGET_MODEL} not found.")
|
||||
|
||||
@@ -441,3 +441,87 @@ def test_polling_switches_when_alienware_input_is_unreadable() -> None:
|
||||
assert status["last_switch_result"] == "switched_unverified"
|
||||
assert status["samsung_session_successful"] is True
|
||||
assert backend.calls == [(1, "DP1")]
|
||||
|
||||
|
||||
def test_polling_resets_session_when_trigger_changes_without_disconnect() -> None:
|
||||
service_module = _require_module("app.service")
|
||||
hardware_module = _require_module("app.hardware")
|
||||
ddm_module = _require_module("app.ddm")
|
||||
|
||||
KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None)
|
||||
HardwareScan = getattr(hardware_module, "HardwareScan", None)
|
||||
DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None)
|
||||
if KvmSwitcherService is None or HardwareScan is None or DDMCommandResult is None:
|
||||
pytest.skip("Service backend interfaces are not available yet.")
|
||||
|
||||
class FakeMonitorBackend:
|
||||
def __init__(self) -> None:
|
||||
self.call_count = 0
|
||||
|
||||
def scan(self):
|
||||
self.call_count += 1
|
||||
if self.call_count == 1:
|
||||
return HardwareScan(
|
||||
samsung_present=True,
|
||||
trigger_input_code=19,
|
||||
alienware_detected=True,
|
||||
alienware_input_code=19,
|
||||
errors=[],
|
||||
)
|
||||
if self.call_count == 2:
|
||||
return HardwareScan(
|
||||
samsung_present=True,
|
||||
trigger_input_code=15,
|
||||
alienware_detected=True,
|
||||
alienware_input_code=15,
|
||||
errors=[],
|
||||
)
|
||||
if self.call_count == 3:
|
||||
return HardwareScan(
|
||||
samsung_present=True,
|
||||
trigger_input_code=19,
|
||||
alienware_detected=True,
|
||||
alienware_input_code=15,
|
||||
errors=[],
|
||||
)
|
||||
return HardwareScan(
|
||||
samsung_present=True,
|
||||
trigger_input_code=19,
|
||||
alienware_detected=True,
|
||||
alienware_input_code=19,
|
||||
errors=[],
|
||||
)
|
||||
|
||||
class FakeDDMBackend:
|
||||
def __init__(self) -> None:
|
||||
self.calls: list[tuple[int, str]] = []
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True
|
||||
|
||||
def resolve_alienware_slot(self, force: bool = False) -> int | None:
|
||||
return 1
|
||||
|
||||
def invalidate_slot(self) -> None:
|
||||
return None
|
||||
|
||||
def switch_to_port(self, slot: int, port_name: str):
|
||||
self.calls.append((slot, port_name))
|
||||
return DDMCommandResult(True, "ok")
|
||||
|
||||
config_path = Path(tempfile.mkdtemp()) / "config.json"
|
||||
backend = FakeDDMBackend()
|
||||
service = KvmSwitcherService(
|
||||
config_store=ConfigStore(config_path),
|
||||
monitor_backend=FakeMonitorBackend(),
|
||||
ddm_backend=backend,
|
||||
poll_interval_seconds=0.01,
|
||||
retry_wait_seconds=0.0,
|
||||
)
|
||||
first = service.save_settings(device_port="DP2")
|
||||
assert first["last_switch_result"] == "noop"
|
||||
second = service.poll_once()
|
||||
assert second["last_switch_result"] == "waiting_for_trigger_match"
|
||||
third = service.poll_once()
|
||||
assert third["last_switch_result"] == "switched"
|
||||
assert backend.calls == [(1, "DP2")]
|
||||
|
||||
Reference in New Issue
Block a user