From 3f7c5a0677a4d07848f830e8262a27b70a75694a Mon Sep 17 00:00:00 2001 From: Lago Date: Fri, 27 Mar 2026 15:53:16 +0100 Subject: [PATCH] Reset switch session on trigger changes and add DP2 force helper --- app/service.py | 39 +++++++++++++------- monitorcontrol_main.py | 25 ++++++++++--- tests/test_app.py | 84 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 19 deletions(-) diff --git a/app/service.py b/app/service.py index 3d8be37..f2e4306 100644 --- a/app/service.py +++ b/app/service.py @@ -80,6 +80,7 @@ class KvmSwitcherService: self._samsung_session_attempted = False self._samsung_session_successful = False self._samsung_session_attempt_count = 0 + self._last_trigger_input_code: int | None = None self._state_lock = Lock() self._stop_event = Event() self._thread: Thread | None = None @@ -115,7 +116,6 @@ class KvmSwitcherService: def poll_once(self) -> dict[str, Any]: config = self.config_store.get() scan = self.monitor_backend.scan() - self._update_samsung_session(scan.samsung_present) errors = list(scan.errors) errors.extend(config.validate()) @@ -124,6 +124,7 @@ class KvmSwitcherService: scan=scan, errors=errors, ) + self._update_samsung_session(scan.samsung_present, trigger_input_code) ddm_slot = self.ddm_backend.resolve_alienware_slot(force=False) supports_monitor_targeting = bool( @@ -242,18 +243,28 @@ class KvmSwitcherService: return port_name return None - def _update_samsung_session(self, samsung_present: bool) -> None: - if samsung_present: - if not self._samsung_session_active: - self._samsung_session_active = True - self._samsung_session_attempted = False - self._samsung_session_successful = False - self._samsung_session_attempt_count = 0 - else: + def _update_samsung_session(self, samsung_present: bool, trigger_input_code: int | None) -> None: + if not samsung_present: self._samsung_session_active = False self._samsung_session_attempted = False self._samsung_session_successful = False self._samsung_session_attempt_count = 0 + self._last_trigger_input_code = None + return + + if not self._samsung_session_active: + self._samsung_session_active = True + self._samsung_session_attempted = False + self._samsung_session_successful = False + self._samsung_session_attempt_count = 0 + self._last_trigger_input_code = trigger_input_code + return + + if trigger_input_code != self._last_trigger_input_code: + self._samsung_session_attempted = False + self._samsung_session_successful = False + self._samsung_session_attempt_count = 0 + self._last_trigger_input_code = trigger_input_code def _attempt_switch_sequence( self, @@ -284,16 +295,16 @@ class KvmSwitcherService: verify_scan = self.monitor_backend.scan() scan = verify_scan errors.extend(verify_scan.errors) - self._update_samsung_session(verify_scan.samsung_present) - - if not verify_scan.samsung_present: - return "waiting_for_reconnect", last_switch_at, scan, errors - verify_trigger_input, _, _ = self._resolve_trigger_state( config=config, scan=verify_scan, errors=errors, ) + self._update_samsung_session(verify_scan.samsung_present, verify_trigger_input) + + if not verify_scan.samsung_present: + return "waiting_for_reconnect", last_switch_at, scan, errors + desired_codes = self._port_input_codes(desired_port) if verify_trigger_input not in desired_codes: return "waiting_for_trigger_match", last_switch_at, scan, errors diff --git a/monitorcontrol_main.py b/monitorcontrol_main.py index 4d3c431..bdda47e 100644 --- a/monitorcontrol_main.py +++ b/monitorcontrol_main.py @@ -1,7 +1,22 @@ -from monitorcontrol import get_monitors, InputSource +from monitorcontrol import get_monitors + + +TARGET_MODEL = "AW3423DWF" +DP2_CODE = 19 + for monitor in get_monitors(): - with monitor: - print(monitor.vcp.description) - input_source_raw: int = monitor.get_input_source() - print(InputSource(input_source_raw).name) \ No newline at end of file + description = str(getattr(getattr(monitor, "vcp", None), "description", "") or str(monitor)) + if TARGET_MODEL not in description.upper(): + continue + + try: + with monitor: + monitor.set_input_source(DP2_CODE) + print(f"Target: {description}") + print(f"Forced DP2 code {DP2_CODE}.") + except Exception as exc: + print(f"Failed forcing DP2 on {description}: {exc}") + break +else: + print(f"Monitor {TARGET_MODEL} not found.") diff --git a/tests/test_app.py b/tests/test_app.py index 22a6ed1..58a7a85 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -441,3 +441,87 @@ def test_polling_switches_when_alienware_input_is_unreadable() -> None: assert status["last_switch_result"] == "switched_unverified" assert status["samsung_session_successful"] is True assert backend.calls == [(1, "DP1")] + + +def test_polling_resets_session_when_trigger_changes_without_disconnect() -> None: + service_module = _require_module("app.service") + hardware_module = _require_module("app.hardware") + ddm_module = _require_module("app.ddm") + + KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None) + HardwareScan = getattr(hardware_module, "HardwareScan", None) + DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None) + if KvmSwitcherService is None or HardwareScan is None or DDMCommandResult is None: + pytest.skip("Service backend interfaces are not available yet.") + + class FakeMonitorBackend: + def __init__(self) -> None: + self.call_count = 0 + + def scan(self): + self.call_count += 1 + if self.call_count == 1: + return HardwareScan( + samsung_present=True, + trigger_input_code=19, + alienware_detected=True, + alienware_input_code=19, + errors=[], + ) + if self.call_count == 2: + return HardwareScan( + samsung_present=True, + trigger_input_code=15, + alienware_detected=True, + alienware_input_code=15, + errors=[], + ) + if self.call_count == 3: + return HardwareScan( + samsung_present=True, + trigger_input_code=19, + alienware_detected=True, + alienware_input_code=15, + errors=[], + ) + return HardwareScan( + samsung_present=True, + trigger_input_code=19, + alienware_detected=True, + alienware_input_code=19, + errors=[], + ) + + class FakeDDMBackend: + def __init__(self) -> None: + self.calls: list[tuple[int, str]] = [] + + def is_available(self) -> bool: + return True + + def resolve_alienware_slot(self, force: bool = False) -> int | None: + return 1 + + def invalidate_slot(self) -> None: + return None + + def switch_to_port(self, slot: int, port_name: str): + self.calls.append((slot, port_name)) + return DDMCommandResult(True, "ok") + + config_path = Path(tempfile.mkdtemp()) / "config.json" + backend = FakeDDMBackend() + service = KvmSwitcherService( + config_store=ConfigStore(config_path), + monitor_backend=FakeMonitorBackend(), + ddm_backend=backend, + poll_interval_seconds=0.01, + retry_wait_seconds=0.0, + ) + first = service.save_settings(device_port="DP2") + assert first["last_switch_result"] == "noop" + second = service.poll_once() + assert second["last_switch_result"] == "waiting_for_trigger_match" + third = service.poll_once() + assert third["last_switch_result"] == "switched" + assert backend.calls == [(1, "DP2")]