diff --git a/README.md b/README.md index cc88dae..818bd02 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Monitor filtering: Configure only one value per machine: - this device Alienware target port: `DP1`, `DP2`, or `HDMI` +- optional auxiliary trigger monitor: selected in UI; if not set, auto-detection is used Samsung trigger input codes mapped by port: diff --git a/app/config.py b/app/config.py index ab20e10..4f5f28d 100644 --- a/app/config.py +++ b/app/config.py @@ -18,13 +18,20 @@ SUPPORTED_TARGET_PORTS = { @dataclass(slots=True) class AppConfig: device_port: str = "DP1" + auxiliary_monitor_id: str | None = None @classmethod def from_dict(cls, data: dict[str, object]) -> "AppConfig": - return cls(device_port=_coerce_port_name(data.get("device_port"), "DP1")) + return cls( + device_port=_coerce_port_name(data.get("device_port"), "DP1"), + auxiliary_monitor_id=_coerce_aux_monitor_id(data.get("auxiliary_monitor_id")), + ) - def to_dict(self) -> dict[str, str]: - return {"device_port": self.device_port} + def to_dict(self) -> dict[str, str | None]: + return { + "device_port": self.device_port, + "auxiliary_monitor_id": self.auxiliary_monitor_id, + } def validate(self) -> list[str]: errors: list[str] = [] @@ -43,7 +50,10 @@ class ConfigStore: def get(self) -> AppConfig: with self._lock: - return AppConfig(device_port=self._config.device_port) + return AppConfig( + device_port=self._config.device_port, + auxiliary_monitor_id=self._config.auxiliary_monitor_id, + ) def save(self, config: AppConfig) -> AppConfig: errors = config.validate() @@ -55,8 +65,14 @@ class ConfigStore: json.dumps(config.to_dict(), indent=2), encoding="utf-8", ) - self._config = AppConfig(device_port=config.device_port) - return AppConfig(device_port=self._config.device_port) + self._config = AppConfig( + device_port=config.device_port, + auxiliary_monitor_id=config.auxiliary_monitor_id, + ) + return AppConfig( + device_port=self._config.device_port, + auxiliary_monitor_id=self._config.auxiliary_monitor_id, + ) def _load_from_disk(self) -> AppConfig: if not self.path.exists(): @@ -79,3 +95,11 @@ def _coerce_port_name(value: object, default: str) -> str: if normalized in SUPPORTED_TARGET_PORTS: return normalized return default + + +def _coerce_aux_monitor_id(value: object) -> str | None: + if isinstance(value, str): + normalized = value.strip() + if normalized: + return normalized + return None diff --git a/app/hardware.py b/app/hardware.py index aa8a50c..2784685 100644 --- a/app/hardware.py +++ b/app/hardware.py @@ -10,14 +10,30 @@ from app.config import SUPPORTED_TARGET_PORTS ALIENWARE_MODEL_TOKEN = "AW3423DWF" +@dataclass(slots=True) +class TriggerMonitorCandidate: + id: str + label: str + input_code: int | None = None + + def to_dict(self) -> dict[str, int | str | None]: + return { + "id": self.id, + "label": self.label, + "input_code": self.input_code, + } + + @dataclass(slots=True) class HardwareScan: samsung_present: bool = False trigger_input_code: int | None = None + trigger_monitor_id: str | None = None alienware_detected: bool = False alienware_input_code: int | None = None trigger_description: str | None = None alienware_description: str | None = None + trigger_candidates: list[TriggerMonitorCandidate] = field(default_factory=list) errors: list[str] = field(default_factory=list) @@ -68,36 +84,44 @@ class RealMonitorBackend: trigger_description: str | None = None trigger_input_code: int | None = None + trigger_monitor_id: str | None = None trigger_candidates = ( samsung_named_candidates if samsung_named_candidates else (non_alienware_candidates if samsung_present else []) ) - if len(trigger_candidates) == 1: - trigger_monitor, trigger_description = trigger_candidates[0] - trigger_input_code = self._read_input_code( - trigger_monitor, - "Samsung trigger monitor", - errors, + scanned_candidates: list[TriggerMonitorCandidate] = [] + for index, (monitor, description) in enumerate(trigger_candidates, start=1): + local_errors: list[str] = [] + input_code = self._read_input_code( + monitor, + "Samsung trigger candidate monitor", + local_errors, ) - elif not trigger_candidates: + scanned_candidates.append( + TriggerMonitorCandidate( + id=_build_trigger_candidate_id(description, index), + label=description, + input_code=input_code, + ) + ) + + if len(scanned_candidates) == 1: + selected = scanned_candidates[0] + trigger_description = selected.label + trigger_input_code = selected.input_code + trigger_monitor_id = selected.id + elif not scanned_candidates: errors.append("Samsung trigger monitor could not be identified through DDC/CI.") else: - scanned_candidates: list[tuple[object, str, int | None]] = [] - for monitor, description in trigger_candidates: - local_errors: list[str] = [] - input_code = self._read_input_code( - monitor, - "Samsung trigger candidate monitor", - local_errors, - ) - scanned_candidates.append((monitor, description, input_code)) - selected_index = _select_trigger_candidate_index( - [(description, input_code) for _, description, input_code in scanned_candidates] + [(candidate.label, candidate.input_code) for candidate in scanned_candidates] ) if selected_index is not None: - _, trigger_description, trigger_input_code = scanned_candidates[selected_index] + selected = scanned_candidates[selected_index] + trigger_description = selected.label + trigger_input_code = selected.input_code + trigger_monitor_id = selected.id elif samsung_named_candidates: errors.append("Multiple Samsung DDC monitors were detected; trigger monitor is ambiguous.") else: @@ -111,10 +135,12 @@ class RealMonitorBackend: return HardwareScan( samsung_present=samsung_present, trigger_input_code=trigger_input_code, + trigger_monitor_id=trigger_monitor_id, alienware_detected=len(alienware_candidates) == 1, alienware_input_code=alienware_input_code, trigger_description=trigger_description, alienware_description=alienware_description, + trigger_candidates=scanned_candidates, errors=errors, ) @@ -222,3 +248,12 @@ def _select_trigger_candidate_index(candidates: list[tuple[str, int | None]]) -> return generic[0] return None + + +def _build_trigger_candidate_id(description: str, index: int) -> str: + normalized = description.strip().lower() + safe = "".join(ch if ch.isalnum() else "-" for ch in normalized).strip("-") + safe = "-".join(part for part in safe.split("-") if part) + if not safe: + safe = "monitor" + return f"{safe}-{index}" diff --git a/app/main.py b/app/main.py index b3071f8..47539b6 100644 --- a/app/main.py +++ b/app/main.py @@ -19,6 +19,7 @@ INDEX_PATH = STATIC_DIR / "index.html" class SettingsPayload(BaseModel): device_port: str + auxiliary_monitor_id: str | None = None def create_app( @@ -59,6 +60,7 @@ def create_app( try: return active_service.save_settings( device_port=payload.device_port, + auxiliary_monitor_id=payload.auxiliary_monitor_id, ) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc diff --git a/app/service.py b/app/service.py index f6e31db..3d8be37 100644 --- a/app/service.py +++ b/app/service.py @@ -11,7 +11,7 @@ from app.config import ( SUPPORTED_TARGET_PORTS, ) from app.ddm import DDMBackend, RealDDMBackend -from app.hardware import HardwareScan, MonitorBackend, RealMonitorBackend +from app.hardware import HardwareScan, MonitorBackend, RealMonitorBackend, TriggerMonitorCandidate @dataclass(slots=True) @@ -24,8 +24,10 @@ class ServiceStatus: samsung_session_attempt_count: int = 0 waiting_for_samsung_disconnect: bool = False trigger_input_code: int | None = None + active_trigger_monitor_id: str | None = None trigger_target_port: str | None = None trigger_matches_device_port: bool = False + trigger_monitor_candidates: list[dict[str, int | str | None]] = field(default_factory=list) alienware_detected: bool = False alienware_input_code: int | None = None ddm_slot: int | None = None @@ -44,8 +46,10 @@ class ServiceStatus: "samsung_session_attempt_count": self.samsung_session_attempt_count, "waiting_for_samsung_disconnect": self.waiting_for_samsung_disconnect, "trigger_input_code": self.trigger_input_code, + "active_trigger_monitor_id": self.active_trigger_monitor_id, "trigger_target_port": self.trigger_target_port, "trigger_matches_device_port": self.trigger_matches_device_port, + "trigger_monitor_candidates": list(self.trigger_monitor_candidates), "alienware_detected": self.alienware_detected, "alienware_input_code": self.alienware_input_code, "ddm_slot": self.ddm_slot, @@ -100,8 +104,11 @@ class KvmSwitcherService: with self._state_lock: return self._status.to_dict() - def save_settings(self, device_port: str) -> dict[str, Any]: - new_config = AppConfig(device_port=device_port) + def save_settings(self, device_port: str, auxiliary_monitor_id: str | None = None) -> dict[str, Any]: + new_config = AppConfig( + device_port=device_port, + auxiliary_monitor_id=auxiliary_monitor_id, + ) self.config_store.save(new_config) return self.poll_once() @@ -112,7 +119,11 @@ class KvmSwitcherService: errors = list(scan.errors) errors.extend(config.validate()) - blocking_errors = [error for error in errors if _is_blocking_error(error)] + trigger_input_code, active_trigger_monitor_id, trigger_candidates = self._resolve_trigger_state( + config=config, + scan=scan, + errors=errors, + ) ddm_slot = self.ddm_backend.resolve_alienware_slot(force=False) supports_monitor_targeting = bool( @@ -125,9 +136,10 @@ class KvmSwitcherService: errors.append("Alienware monitorcontrol backend is not ready.") elif ddm_slot is None and not supports_monitor_targeting: errors.append("Alienware monitorcontrol target could not be resolved.") + blocking_errors = [error for error in errors if _is_blocking_error(error)] - trigger_target_port = self._port_name_for_input_code(scan.trigger_input_code) - desired_port = self._resolve_desired_port(config, scan) + trigger_target_port = self._port_name_for_input_code(trigger_input_code) + desired_port = self._resolve_desired_port(config, trigger_input_code) trigger_matches_device_port = desired_port is not None last_switch_result = "idle" with self._state_lock: @@ -150,6 +162,7 @@ class KvmSwitcherService: self._samsung_session_successful = True else: last_switch_result, last_switch_at, scan, errors = self._attempt_switch_sequence( + config=config, ddm_slot=ddm_slot, desired_port=desired_port, scan=scan, @@ -171,9 +184,11 @@ class KvmSwitcherService: samsung_session_successful=self._samsung_session_successful, samsung_session_attempt_count=self._samsung_session_attempt_count, waiting_for_samsung_disconnect=self._samsung_session_successful or self._samsung_session_attempt_count >= 3, - trigger_input_code=scan.trigger_input_code, + trigger_input_code=trigger_input_code, + active_trigger_monitor_id=active_trigger_monitor_id, trigger_target_port=trigger_target_port, trigger_matches_device_port=trigger_matches_device_port, + trigger_monitor_candidates=[candidate.to_dict() for candidate in trigger_candidates], alienware_detected=scan.alienware_detected, alienware_input_code=scan.alienware_input_code, ddm_slot=ddm_slot, @@ -201,9 +216,8 @@ class KvmSwitcherService: @staticmethod def _resolve_desired_port( config: AppConfig, - scan: HardwareScan, + trigger_input: int | None, ) -> str | None: - trigger_input = scan.trigger_input_code if trigger_input is None: return None desired_codes = KvmSwitcherService._port_input_codes(config.device_port) @@ -243,6 +257,7 @@ class KvmSwitcherService: def _attempt_switch_sequence( self, + config: AppConfig, ddm_slot: int | None, desired_port: str, scan: HardwareScan, @@ -274,8 +289,13 @@ class KvmSwitcherService: if not verify_scan.samsung_present: return "waiting_for_reconnect", last_switch_at, scan, errors + verify_trigger_input, _, _ = self._resolve_trigger_state( + config=config, + scan=verify_scan, + errors=errors, + ) desired_codes = self._port_input_codes(desired_port) - if verify_scan.trigger_input_code not in desired_codes: + if verify_trigger_input not in desired_codes: return "waiting_for_trigger_match", last_switch_at, scan, errors if verify_scan.alienware_input_code is None: errors.append( @@ -290,6 +310,39 @@ class KvmSwitcherService: return "max_attempts_waiting_for_disconnect", last_switch_at, scan, errors + @staticmethod + def _resolve_trigger_state( + config: AppConfig, + scan: HardwareScan, + errors: list[str], + ) -> tuple[int | None, str | None, list[TriggerMonitorCandidate]]: + trigger_input_code = scan.trigger_input_code + active_trigger_monitor_id = scan.trigger_monitor_id + candidates = list(scan.trigger_candidates) + + configured_id = config.auxiliary_monitor_id + if configured_id is None: + return trigger_input_code, active_trigger_monitor_id, candidates + + configured_candidate = next( + (candidate for candidate in candidates if candidate.id == configured_id), + None, + ) + if configured_candidate is None: + errors.append( + f"Configured auxiliary monitor '{configured_id}' was not found among detected trigger monitors." + ) + return trigger_input_code, active_trigger_monitor_id, candidates + + errors[:] = [error for error in errors if not _is_trigger_ambiguity_error(error)] + trigger_input_code = configured_candidate.input_code + active_trigger_monitor_id = configured_candidate.id + if trigger_input_code is None: + errors.append( + f"Unable to read configured auxiliary monitor input source: {configured_candidate.label}." + ) + return trigger_input_code, active_trigger_monitor_id, candidates + def _dedupe_errors(errors: list[str]) -> list[str]: unique: list[str] = [] @@ -308,6 +361,13 @@ def _is_blocking_error(error: str) -> bool: non_blocking_prefixes = ( "Unable to read Alienware target monitor input source:", "Alienware input could not be read after switch; assuming success because switch command completed.", - "Samsung trigger monitor did not expose a Samsung model in DDC/CI; using the only non-Alienware DDC monitor as trigger.", ) return not any(normalized.startswith(prefix) for prefix in non_blocking_prefixes) + + +def _is_trigger_ambiguity_error(error: str) -> bool: + normalized = error.strip() + return normalized in { + "Multiple Samsung DDC monitors were detected; trigger monitor is ambiguous.", + "Multiple non-Alienware DDC monitors were detected and Samsung did not expose model info; trigger monitor is ambiguous.", + } diff --git a/static/index.html b/static/index.html index 497d69d..ee51617 100644 --- a/static/index.html +++ b/static/index.html @@ -266,7 +266,13 @@ -

The selected port is saved locally in config.json.

+ +

Port and auxiliary monitor are saved locally in config.json.

@@ -287,6 +293,7 @@ const els = { form: document.getElementById("settings-form"), port: document.getElementById("device-port"), + auxMonitor: document.getElementById("aux-monitor"), saveBtn: document.getElementById("save-btn"), msg: document.getElementById("form-message"), stats: document.getElementById("stats"), @@ -303,16 +310,55 @@ return `
${label}${safe(value)}
`; } + function escapeHtml(text) { + return String(text) + .replaceAll("&", "&") + .replaceAll("<", "<") + .replaceAll(">", ">") + .replaceAll("\"", """) + .replaceAll("'", "'"); + } + + function renderAuxiliaryMonitorOptions(payloadCandidates, configuredAuxId) { + const candidates = Array.isArray(payloadCandidates) ? payloadCandidates : []; + const currentValue = els.auxMonitor.value || ""; + const desiredValue = formDirty ? currentValue : (configuredAuxId || ""); + + const options = ['']; + for (const candidate of candidates) { + if (!candidate || !candidate.id) { + continue; + } + const label = candidate.label || candidate.id; + const inputCode = candidate.input_code === null || candidate.input_code === undefined + ? "n/a" + : String(candidate.input_code); + options.push( + `` + ); + } + els.auxMonitor.innerHTML = options.join(""); + + const desiredExists = [...els.auxMonitor.options].some((opt) => opt.value === desiredValue); + els.auxMonitor.value = desiredExists ? desiredValue : ""; + } + function renderStatus(payload) { const config = payload.config || {}; const portVal = config.device_port; + const auxMonitorVal = config.auxiliary_monitor_id; if (!formDirty && document.activeElement !== els.port) { els.port.value = portVal ?? "DP1"; } + if (document.activeElement !== els.auxMonitor || !formDirty) { + renderAuxiliaryMonitorOptions(payload.trigger_monitor_candidates, auxMonitorVal); + } els.stats.innerHTML = [ stat("This Device Port", config.device_port), + stat("Configured Aux Monitor", config.auxiliary_monitor_id), + stat("Active Trigger Monitor", payload.active_trigger_monitor_id), stat("Samsung Present", payload.samsung_present ? "Yes" : "No"), stat("Trigger Input", payload.trigger_input_code), stat("Trigger Target Port", payload.trigger_target_port), @@ -359,6 +405,7 @@ event.preventDefault(); const port = els.port.value; + const auxiliaryMonitorId = els.auxMonitor.value || null; if (!port) { els.msg.className = "message error"; @@ -376,6 +423,7 @@ headers: { "Content-Type": "application/json" }, body: JSON.stringify({ device_port: port, + auxiliary_monitor_id: auxiliaryMonitorId, }), }); @@ -400,7 +448,7 @@ formDirty = false; renderStatus(payload); els.msg.className = "message ok"; - els.msg.textContent = "Settings saved. Poller now uses this device port."; + els.msg.textContent = "Settings saved. Poller now uses this port and auxiliary monitor selection."; } catch (error) { els.msg.className = "message error"; els.msg.textContent = error.message; @@ -412,6 +460,9 @@ els.port.addEventListener("change", () => { formDirty = true; }); + els.auxMonitor.addEventListener("change", () => { + formDirty = true; + }); els.form.addEventListener("submit", saveSettings); loadStatus(true); setInterval(() => loadStatus(false), 1500); diff --git a/tests/test_app.py b/tests/test_app.py index ff7b661..22a6ed1 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -26,15 +26,17 @@ def test_config_store_missing_file_and_roundtrip_save() -> None: config_path = Path(tempfile.mkdtemp()) / "config.json" store = ConfigStore(config_path) - assert store.get() == AppConfig(device_port="DP1") + assert store.get() == AppConfig(device_port="DP1", auxiliary_monitor_id=None) assert not config_path.exists() - saved = store.save(AppConfig(device_port="HDMI")) + saved = store.save(AppConfig(device_port="HDMI", auxiliary_monitor_id="generic-pnp-monitor-1")) assert saved.device_port == "HDMI" + assert saved.auxiliary_monitor_id == "generic-pnp-monitor-1" assert config_path.exists() reloaded = ConfigStore(config_path).get() assert reloaded.device_port == "HDMI" + assert reloaded.auxiliary_monitor_id == "generic-pnp-monitor-1" def test_config_store_loads_legacy_config_with_device_role() -> None: @@ -46,6 +48,7 @@ def test_config_store_loads_legacy_config_with_device_role() -> None: loaded = ConfigStore(config_path).get() assert loaded.device_port == "DP2" + assert loaded.auxiliary_monitor_id is None def test_api_status_and_settings_endpoints_with_fake_service() -> None: @@ -65,8 +68,10 @@ def test_api_status_and_settings_endpoints_with_fake_service() -> None: "samsung_session_attempt_count": 0, "waiting_for_samsung_disconnect": False, "trigger_input_code": None, + "active_trigger_monitor_id": None, "trigger_target_port": None, "trigger_matches_device_port": False, + "trigger_monitor_candidates": [], "alienware_detected": False, "alienware_input_code": None, "ddm_slot": None, @@ -85,8 +90,15 @@ def test_api_status_and_settings_endpoints_with_fake_service() -> None: def get_status(self) -> dict[str, object]: return self.payload - def save_settings(self, device_port: str) -> dict[str, object]: - self.payload["config"] = {"device_port": device_port} + def save_settings( + self, + device_port: str, + auxiliary_monitor_id: str | None = None, + ) -> dict[str, object]: + self.payload["config"] = { + "device_port": device_port, + "auxiliary_monitor_id": auxiliary_monitor_id, + } self.payload["last_switch_result"] = "updated" return self.payload @@ -98,10 +110,15 @@ def test_api_status_and_settings_endpoints_with_fake_service() -> None: save_response = client.post( "/api/settings", - json={"device_port": "HDMI", "device_role": "legacy_ignored"}, + json={ + "device_port": "HDMI", + "auxiliary_monitor_id": "generic-pnp-monitor-1", + "device_role": "legacy_ignored", + }, ) assert save_response.status_code == 200 assert save_response.json()["config"]["device_port"] == "HDMI" + assert save_response.json()["config"]["auxiliary_monitor_id"] == "generic-pnp-monitor-1" def test_polling_switches_when_trigger_matches_device_port() -> None: @@ -172,6 +189,83 @@ def test_polling_switches_when_trigger_matches_device_port() -> None: assert status["last_switch_result"] == "switched" +def test_polling_uses_configured_auxiliary_monitor() -> None: + service_module = _require_module("app.service") + hardware_module = _require_module("app.hardware") + ddm_module = _require_module("app.ddm") + + KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None) + HardwareScan = getattr(hardware_module, "HardwareScan", None) + TriggerMonitorCandidate = getattr(hardware_module, "TriggerMonitorCandidate", None) + DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None) + if ( + KvmSwitcherService is None + or HardwareScan is None + or TriggerMonitorCandidate is None + or DDMCommandResult is None + ): + pytest.skip("Service backend interfaces are not available yet.") + + class FakeMonitorBackend: + def __init__(self) -> None: + self.call_count = 0 + + def scan(self): + self.call_count += 1 + alienware_input = 15 if self.call_count == 1 else 19 + return HardwareScan( + samsung_present=True, + trigger_input_code=None, + trigger_monitor_id=None, + trigger_candidates=[ + TriggerMonitorCandidate(id="dell-u2720q-1", label="DELL U2720Q", input_code=60), + TriggerMonitorCandidate(id="generic-pnp-monitor-2", label="Generic PnP Monitor", input_code=19), + ], + alienware_detected=True, + alienware_input_code=alienware_input, + errors=[ + "Multiple non-Alienware DDC monitors were detected and Samsung did not expose model info; trigger monitor is ambiguous." + ], + ) + + class FakeDDMBackend: + def __init__(self) -> None: + self.calls: list[tuple[int, str]] = [] + + def is_available(self) -> bool: + return True + + def resolve_alienware_slot(self, force: bool = False) -> int | None: + return 1 + + def invalidate_slot(self) -> None: + return None + + def switch_to_port(self, slot: int, port_name: str): + self.calls.append((slot, port_name)) + return DDMCommandResult(True, "ok") + + config_path = Path(tempfile.mkdtemp()) / "config.json" + backend = FakeDDMBackend() + service = KvmSwitcherService( + config_store=ConfigStore(config_path), + monitor_backend=FakeMonitorBackend(), + ddm_backend=backend, + poll_interval_seconds=0.01, + retry_wait_seconds=0.0, + ) + status = service.save_settings( + device_port="DP2", + auxiliary_monitor_id="generic-pnp-monitor-2", + ) + assert status["config"]["auxiliary_monitor_id"] == "generic-pnp-monitor-2" + assert status["active_trigger_monitor_id"] == "generic-pnp-monitor-2" + assert status["trigger_input_code"] == 19 + assert status["trigger_matches_device_port"] is True + assert status["last_switch_result"] == "switched" + assert not status["errors"] + + def test_polling_does_not_switch_when_trigger_does_not_match_device_port() -> None: service_module = _require_module("app.service") hardware_module = _require_module("app.hardware")