Simplify trigger logic to port-only matching and improve Samsung detection

This commit is contained in:
Lago
2026-03-27 15:09:16 +01:00
parent 33e762c182
commit 37eb9e9fa0
8 changed files with 276 additions and 186 deletions
+11 -46
View File
@@ -2,17 +2,12 @@ from __future__ import annotations
from dataclasses import dataclass
import json
import os
from pathlib import Path
from threading import Lock
PROJECT_ROOT = Path(__file__).resolve().parent.parent
CONFIG_PATH = PROJECT_ROOT / "config.json"
DEVICE_ROLE_ENV_VAR = "KVM_DEVICE_ROLE"
TOWER_TRIGGER_CODE = 15
LAPTOP_TRIGGER_CODE = 19
SUPPORTED_DEVICE_ROLES = {"tower", "laptop"}
SUPPORTED_TARGET_PORTS = {
"DP1": {"ddm_input": "DP1", "input_codes": {15}},
"DP2": {"ddm_input": "DP2", "input_codes": {19}},
@@ -22,29 +17,17 @@ SUPPORTED_TARGET_PORTS = {
@dataclass(slots=True)
class AppConfig:
device_role: str | None = None
device_port: str = "DP1"
@classmethod
def from_dict(cls, data: dict[str, object], default_role: str | None = None) -> "AppConfig":
return cls(
device_role=_coerce_role_name(data.get("device_role"), default_role),
device_port=_coerce_port_name(data.get("device_port"), "DP1"),
)
def from_dict(cls, data: dict[str, object]) -> "AppConfig":
return cls(device_port=_coerce_port_name(data.get("device_port"), "DP1"))
def to_dict(self) -> dict[str, str | None]:
return {
"device_role": self.device_role,
"device_port": self.device_port,
}
def to_dict(self) -> dict[str, str]:
return {"device_port": self.device_port}
def validate(self) -> list[str]:
errors: list[str] = []
if self.device_role not in SUPPORTED_DEVICE_ROLES:
errors.append(
f"Device role is not configured. Set it in the UI, set {DEVICE_ROLE_ENV_VAR} to TOWER or LAPTOP, or add device_role to config.json."
)
supported = ", ".join(SUPPORTED_TARGET_PORTS)
if self.device_port not in SUPPORTED_TARGET_PORTS:
errors.append(f"Device Port must be one of: {supported}.")
@@ -56,15 +39,11 @@ class ConfigStore:
def __init__(self, path: Path = CONFIG_PATH):
self.path = path
self._lock = Lock()
self._default_role = _coerce_role_name(os.environ.get(DEVICE_ROLE_ENV_VAR), None)
self._config = self._load_from_disk()
def get(self) -> AppConfig:
with self._lock:
return AppConfig(
device_role=self._config.device_role,
device_port=self._config.device_port,
)
return AppConfig(device_port=self._config.device_port)
def save(self, config: AppConfig) -> AppConfig:
errors = config.validate()
@@ -76,28 +55,22 @@ class ConfigStore:
json.dumps(config.to_dict(), indent=2),
encoding="utf-8",
)
self._config = AppConfig(
device_role=config.device_role,
device_port=config.device_port,
)
return AppConfig(
device_role=self._config.device_role,
device_port=self._config.device_port,
)
self._config = AppConfig(device_port=config.device_port)
return AppConfig(device_port=self._config.device_port)
def _load_from_disk(self) -> AppConfig:
if not self.path.exists():
return AppConfig(device_role=self._default_role)
return AppConfig()
try:
data = json.loads(self.path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return AppConfig(device_role=self._default_role)
return AppConfig()
if not isinstance(data, dict):
return AppConfig(device_role=self._default_role)
return AppConfig()
return AppConfig.from_dict(data, default_role=self._default_role)
return AppConfig.from_dict(data)
def _coerce_port_name(value: object, default: str) -> str:
@@ -106,11 +79,3 @@ def _coerce_port_name(value: object, default: str) -> str:
if normalized in SUPPORTED_TARGET_PORTS:
return normalized
return default
def _coerce_role_name(value: object, default: str | None) -> str | None:
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in SUPPORTED_DEVICE_ROLES:
return normalized
return default
+35 -7
View File
@@ -36,15 +36,20 @@ class RealMonitorBackend:
return HardwareScan(samsung_present=samsung_present, errors=errors)
alienware_candidates: list[tuple[object, str]] = []
trigger_candidates: list[tuple[object, str]] = []
samsung_named_candidates: list[tuple[object, str]] = []
non_alienware_candidates: list[tuple[object, str]] = []
for monitor in monitors:
description = str(getattr(getattr(monitor, "vcp", None), "description", "") or "")
normalized = description.upper()
description = str(getattr(getattr(monitor, "vcp", None), "description", "") or str(monitor))
normalized = description.upper().strip()
if ALIENWARE_MODEL_TOKEN in normalized:
alienware_candidates.append((monitor, description))
else:
trigger_candidates.append((monitor, description or "Unknown DDC monitor"))
continue
candidate = (monitor, description or "Unknown DDC monitor")
non_alienware_candidates.append(candidate)
if _is_samsung_description(normalized):
samsung_named_candidates.append(candidate)
alienware_description: str | None = None
alienware_input_code: int | None = None
@@ -62,6 +67,11 @@ class RealMonitorBackend:
trigger_description: str | None = None
trigger_input_code: int | None = None
trigger_candidates = (
samsung_named_candidates
if samsung_named_candidates
else (non_alienware_candidates if samsung_present else [])
)
if len(trigger_candidates) == 1:
trigger_monitor, trigger_description = trigger_candidates[0]
trigger_input_code = self._read_input_code(
@@ -69,10 +79,19 @@ class RealMonitorBackend:
"Samsung trigger monitor",
errors,
)
if not samsung_named_candidates:
errors.append(
"Samsung trigger monitor did not expose a Samsung model in DDC/CI; using the only non-Alienware DDC monitor as trigger."
)
elif not trigger_candidates:
errors.append("A non-Alienware DDC monitor was not found for trigger-only logic.")
errors.append("Samsung trigger monitor could not be identified through DDC/CI.")
else:
errors.append("Multiple non-Alienware DDC monitors were detected; trigger monitor is ambiguous.")
if samsung_named_candidates:
errors.append("Multiple Samsung DDC monitors were detected; trigger monitor is ambiguous.")
else:
errors.append(
"Multiple non-Alienware DDC monitors were detected and Samsung did not expose model info; trigger monitor is ambiguous."
)
if not samsung_present:
errors.append("Samsung monitor could not be confirmed from Windows monitor metadata.")
@@ -144,3 +163,12 @@ def _decode_wmi_string(raw_values: object) -> str:
continue
chars.append(chr(number))
return "".join(chars).strip()
def _is_samsung_description(description: str) -> bool:
normalized = description.upper().strip()
if "SAMSUNG" in normalized:
return True
if normalized.startswith("SAM"):
return True
return " SAM " in f" {normalized} "
-2
View File
@@ -18,7 +18,6 @@ INDEX_PATH = STATIC_DIR / "index.html"
class SettingsPayload(BaseModel):
device_role: str
device_port: str
@@ -59,7 +58,6 @@ def create_app(
async def post_settings(payload: SettingsPayload) -> dict[str, Any]:
try:
return active_service.save_settings(
device_role=payload.device_role,
device_port=payload.device_port,
)
except ValueError as exc:
+55 -27
View File
@@ -8,9 +8,7 @@ from typing import Any
from app.config import (
AppConfig,
ConfigStore,
LAPTOP_TRIGGER_CODE,
SUPPORTED_TARGET_PORTS,
TOWER_TRIGGER_CODE,
)
from app.ddm import DDMBackend, RealDDMBackend
from app.hardware import HardwareScan, MonitorBackend, RealMonitorBackend
@@ -26,9 +24,10 @@ class ServiceStatus:
samsung_session_attempt_count: int = 0
waiting_for_samsung_disconnect: bool = False
trigger_input_code: int | None = None
trigger_target_port: str | None = None
trigger_matches_device_port: bool = False
alienware_detected: bool = False
alienware_input_code: int | None = None
resolved_target: str | None = None
ddm_slot: int | None = None
ddm_ready: bool = False
last_switch_result: str = "idle"
@@ -45,9 +44,10 @@ class ServiceStatus:
"samsung_session_attempt_count": self.samsung_session_attempt_count,
"waiting_for_samsung_disconnect": self.waiting_for_samsung_disconnect,
"trigger_input_code": self.trigger_input_code,
"trigger_target_port": self.trigger_target_port,
"trigger_matches_device_port": self.trigger_matches_device_port,
"alienware_detected": self.alienware_detected,
"alienware_input_code": self.alienware_input_code,
"resolved_target": self.resolved_target,
"ddm_slot": self.ddm_slot,
"ddm_ready": self.ddm_ready,
"last_switch_result": self.last_switch_result,
@@ -100,11 +100,8 @@ class KvmSwitcherService:
with self._state_lock:
return self._status.to_dict()
def save_settings(self, device_role: str, device_port: str) -> dict[str, Any]:
new_config = AppConfig(
device_role=device_role,
device_port=device_port,
)
def save_settings(self, device_port: str) -> dict[str, Any]:
new_config = AppConfig(device_port=device_port)
self.config_store.save(new_config)
return self.poll_once()
@@ -115,6 +112,7 @@ class KvmSwitcherService:
errors = list(scan.errors)
errors.extend(config.validate())
blocking_errors = [error for error in errors if _is_blocking_error(error)]
ddm_slot = self.ddm_backend.resolve_alienware_slot(force=False)
supports_monitor_targeting = bool(
@@ -128,15 +126,16 @@ class KvmSwitcherService:
elif ddm_slot is None and not supports_monitor_targeting:
errors.append("Alienware DDM slot could not be resolved.")
resolved_target, desired_port = self._resolve_target(config, scan)
trigger_target_port = self._port_name_for_input_code(scan.trigger_input_code)
desired_port = self._resolve_desired_port(config, scan)
trigger_matches_device_port = desired_port is not None
last_switch_result = "idle"
with self._state_lock:
last_switch_at = self._status.last_switch_at
should_attempt_switch = (
not errors
not blocking_errors
and desired_port is not None
and scan.alienware_input_code is not None
and ddm_ready
and scan.samsung_present
and not self._samsung_session_successful
@@ -145,7 +144,7 @@ class KvmSwitcherService:
if should_attempt_switch:
desired_codes = self._port_input_codes(desired_port)
if scan.alienware_input_code in desired_codes:
if scan.alienware_input_code is not None and scan.alienware_input_code in desired_codes:
last_switch_result = "noop"
self._samsung_session_attempted = True
self._samsung_session_successful = True
@@ -157,12 +156,12 @@ class KvmSwitcherService:
last_switch_at=last_switch_at,
errors=errors,
)
elif desired_port is not None and not config.validate():
last_switch_result = "blocked"
elif resolved_target == config.device_role and self._samsung_session_successful:
elif trigger_matches_device_port and self._samsung_session_successful:
last_switch_result = "waiting_for_disconnect"
elif resolved_target == config.device_role and self._samsung_session_attempt_count >= 3:
elif trigger_matches_device_port and self._samsung_session_attempt_count >= 3:
last_switch_result = "max_attempts_waiting_for_disconnect"
elif scan.samsung_present:
last_switch_result = "waiting_for_trigger_match"
status = ServiceStatus(
config=config,
@@ -173,9 +172,10 @@ class KvmSwitcherService:
samsung_session_attempt_count=self._samsung_session_attempt_count,
waiting_for_samsung_disconnect=self._samsung_session_successful or self._samsung_session_attempt_count >= 3,
trigger_input_code=scan.trigger_input_code,
trigger_target_port=trigger_target_port,
trigger_matches_device_port=trigger_matches_device_port,
alienware_detected=scan.alienware_detected,
alienware_input_code=scan.alienware_input_code,
resolved_target=resolved_target,
ddm_slot=ddm_slot,
ddm_ready=ddm_ready,
last_switch_result=last_switch_result,
@@ -199,19 +199,17 @@ class KvmSwitcherService:
self._stop_event.wait(self.poll_interval_seconds)
@staticmethod
def _resolve_target(
def _resolve_desired_port(
config: AppConfig,
scan: HardwareScan,
) -> tuple[str | None, str | None]:
) -> str | None:
trigger_input = scan.trigger_input_code
if trigger_input is None:
return None, None
if trigger_input == TOWER_TRIGGER_CODE:
return "tower", config.device_port if config.device_role == "tower" else None
if trigger_input == LAPTOP_TRIGGER_CODE:
return "laptop", config.device_port if config.device_role == "laptop" else None
return None, None
return None
desired_codes = KvmSwitcherService._port_input_codes(config.device_port)
if trigger_input in desired_codes:
return config.device_port
return None
@staticmethod
def _port_input_codes(port_name: str) -> set[int]:
@@ -219,6 +217,17 @@ class KvmSwitcherService:
raw_codes = port_spec.get("input_codes", set())
return {int(code) for code in raw_codes}
@staticmethod
def _port_name_for_input_code(input_code: int | None) -> str | None:
if input_code is None:
return None
for port_name in SUPPORTED_TARGET_PORTS:
port_codes = KvmSwitcherService._port_input_codes(port_name)
if input_code in port_codes:
return port_name
return None
def _update_samsung_session(self, samsung_present: bool) -> None:
if samsung_present:
if not self._samsung_session_active:
@@ -266,6 +275,15 @@ class KvmSwitcherService:
return "waiting_for_reconnect", last_switch_at, scan, errors
desired_codes = self._port_input_codes(desired_port)
if verify_scan.trigger_input_code not in desired_codes:
return "waiting_for_trigger_match", last_switch_at, scan, errors
if verify_scan.alienware_input_code is None:
errors.append(
"Alienware input could not be read after switch; assuming success because DDM command completed."
)
self._samsung_session_successful = True
return "switched_unverified", last_switch_at, scan, errors
if verify_scan.alienware_input_code in desired_codes:
self._samsung_session_successful = True
return last_result, last_switch_at, scan, errors
@@ -283,3 +301,13 @@ def _dedupe_errors(errors: list[str]) -> list[str]:
seen.add(normalized)
unique.append(normalized)
return unique
def _is_blocking_error(error: str) -> bool:
normalized = error.strip()
non_blocking_prefixes = (
"Unable to read Alienware target monitor input source:",
"Alienware input could not be read after switch; assuming success because DDM command completed.",
"Samsung trigger monitor did not expose a Samsung model in DDC/CI; using the only non-Alienware DDC monitor as trigger.",
)
return not any(normalized.startswith(prefix) for prefix in non_blocking_prefixes)