Files
KVM_Switch/app/hardware.py
T

229 lines
8.4 KiB
Python

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol
from monitorcontrol import get_monitors
from app.config import SUPPORTED_TARGET_PORTS
# Model string that marks a DDC monitor as the Alienware target.
# RealMonitorBackend.scan looks for it as a substring of the upper-cased
# monitor description.
ALIENWARE_MODEL_TOKEN = "AW3423DWF"
@dataclass(slots=True)
class HardwareScan:
samsung_present: bool = False
trigger_input_code: int | None = None
alienware_detected: bool = False
alienware_input_code: int | None = None
trigger_description: str | None = None
alienware_description: str | None = None
errors: list[str] = field(default_factory=list)
class MonitorBackend(Protocol):
    """Structural interface for anything that can produce a ``HardwareScan``."""

    def scan(self) -> HardwareScan:
        """Inspect the attached monitors and return the findings."""
class RealMonitorBackend:
    """Monitor backend that inspects real hardware.

    DDC monitors are enumerated with ``get_monitors`` and classified by their
    description text; Samsung presence is additionally checked through a WMI
    ``WmiMonitorID`` query. Failures are never raised to the caller: they are
    collected as messages in ``HardwareScan.errors``.
    """

    def scan(self) -> HardwareScan:
        """Enumerate monitors and read the Alienware and trigger input codes.

        Returns:
            A ``HardwareScan`` describing what was found. If DDC enumeration
            itself fails, only ``samsung_present`` and ``errors`` are filled.
        """
        errors: list[str] = []
        samsung_present = self._is_samsung_present(errors)
        try:
            monitors = list(get_monitors())
        except Exception as exc:  # pragma: no cover - hardware path
            errors.append(f"Unable to enumerate DDC monitors: {exc}")
            return HardwareScan(samsung_present=samsung_present, errors=errors)

        # Sort every DDC monitor into: the Alienware target, everything else,
        # and (as a subset of "everything else") monitors whose description
        # looks like a Samsung.
        alienware_candidates: list[tuple[object, str]] = []
        samsung_named_candidates: list[tuple[object, str]] = []
        non_alienware_candidates: list[tuple[object, str]] = []
        for monitor in monitors:
            # Prefer the VCP description; fall back to str(monitor) when it is
            # missing or empty.
            description = str(getattr(getattr(monitor, "vcp", None), "description", "") or str(monitor))
            normalized = description.upper().strip()
            if ALIENWARE_MODEL_TOKEN in normalized:
                alienware_candidates.append((monitor, description))
                continue
            candidate = (monitor, description or "Unknown DDC monitor")
            non_alienware_candidates.append(candidate)
            if _is_samsung_description(normalized):
                samsung_named_candidates.append(candidate)

        # The Alienware input is only read when exactly one such monitor exists.
        alienware_description: str | None = None
        alienware_input_code: int | None = None
        if len(alienware_candidates) == 1:
            alienware_monitor, alienware_description = alienware_candidates[0]
            alienware_input_code = self._read_input_code(
                alienware_monitor,
                "Alienware target monitor",
                errors,
            )
        elif not alienware_candidates:
            errors.append("Alienware AW3423DWF could not be identified through DDC/CI.")
        else:
            errors.append("Multiple Alienware AW3423DWF monitors were detected; refusing to switch.")

        # Trigger candidates: Samsung-named monitors if any; otherwise every
        # non-Alienware monitor, but only when WMI confirmed a Samsung.
        trigger_description: str | None = None
        trigger_input_code: int | None = None
        trigger_candidates = (
            samsung_named_candidates
            if samsung_named_candidates
            else (non_alienware_candidates if samsung_present else [])
        )
        if len(trigger_candidates) == 1:
            trigger_monitor, trigger_description = trigger_candidates[0]
            trigger_input_code = self._read_input_code(
                trigger_monitor,
                "Samsung trigger monitor",
                errors,
            )
            if not samsung_named_candidates:
                # The single candidate came from the fallback list; say so.
                errors.append(
                    "Samsung trigger monitor did not expose a Samsung model in DDC/CI; using the only non-Alienware DDC monitor as trigger."
                )
        elif not trigger_candidates:
            errors.append("Samsung trigger monitor could not be identified through DDC/CI.")
        else:
            # Several candidates: read each input code and let
            # _select_trigger_candidate_index choose. Read failures go into a
            # throwaway list per candidate and are not added to ``errors``.
            scanned_candidates: list[tuple[object, str, int | None]] = []
            for monitor, description in trigger_candidates:
                local_errors: list[str] = []
                input_code = self._read_input_code(
                    monitor,
                    "Samsung trigger candidate monitor",
                    local_errors,
                )
                scanned_candidates.append((monitor, description, input_code))
            selected_index = _select_trigger_candidate_index(
                [(description, input_code) for _, description, input_code in scanned_candidates]
            )
            if selected_index is not None:
                _, trigger_description, trigger_input_code = scanned_candidates[selected_index]
            elif samsung_named_candidates:
                errors.append("Multiple Samsung DDC monitors were detected; trigger monitor is ambiguous.")
            else:
                errors.append(
                    "Multiple non-Alienware DDC monitors were detected and Samsung did not expose model info; trigger monitor is ambiguous."
                )

        if not samsung_present:
            errors.append("Samsung monitor could not be confirmed from Windows monitor metadata.")
        return HardwareScan(
            samsung_present=samsung_present,
            trigger_input_code=trigger_input_code,
            alienware_detected=len(alienware_candidates) == 1,
            alienware_input_code=alienware_input_code,
            trigger_description=trigger_description,
            alienware_description=alienware_description,
            errors=errors,
        )

    def _is_samsung_present(self, errors: list[str]) -> bool:
        """Report whether any ``WmiMonitorID`` row mentions ``SAM``.

        The instance name, manufacturer name and user-friendly name of each
        row are joined and upper-cased before the substring check.

        Args:
            errors: List that receives a message when pywin32 cannot be
                imported or the WMI query fails.

        Returns:
            True on the first matching row; False when nothing matches or
            when the lookup could not be performed.
        """
        try:
            import win32com.client  # pyright: ignore[reportMissingImports]
        except ImportError as exc:  # pragma: no cover - dependency issue
            errors.append(f"pywin32 is unavailable for WMI monitor detection: {exc}")
            return False
        try:
            locator = win32com.client.Dispatch("WbemScripting.SWbemLocator")
            service = locator.ConnectServer(".", "root\\wmi")
            query = "SELECT InstanceName, ManufacturerName, UserFriendlyName FROM WmiMonitorID"
            for row in service.ExecQuery(query):
                instance_name = str(getattr(row, "InstanceName", "") or "")
                manufacturer = _decode_wmi_string(getattr(row, "ManufacturerName", []))
                friendly_name = _decode_wmi_string(getattr(row, "UserFriendlyName", []))
                haystack = " ".join([instance_name, manufacturer, friendly_name]).upper()
                if "SAM" in haystack:
                    return True
        except Exception as exc:  # pragma: no cover - hardware path
            errors.append(f"Unable to query Windows monitor metadata: {exc}")
            return False
        return False

    def _read_input_code(
        self,
        monitor: object,
        label: str,
        errors: list[str],
    ) -> int | None:
        """Read a monitor's current input source as an integer code.

        Args:
            monitor: Object usable as a context manager that offers
                ``get_input_source()``.
            label: Human-readable monitor name used in the error message.
            errors: List that receives a message when the read fails.

        Returns:
            The input code, or None when reading or converting it failed.
        """
        try:
            with monitor:
                source = monitor.get_input_source()
            # get_input_source() may hand back an enum member rather than a
            # number; int() rejects a plain Enum, so unwrap ``.value`` first.
            # Plain ints have no ``.value`` and pass through unchanged.
            return int(getattr(source, "value", source))
        except Exception as exc:  # pragma: no cover - hardware path
            errors.append(f"Unable to read {label} input source: {exc}")
            return None
def _decode_wmi_string(raw_values: object) -> str:
    """Turn a sequence of character codes into a stripped string.

    Args:
        raw_values: Usually an iterable of integer character codes. ``None``
            yields ``""``; a ``str`` is returned stripped; any other
            non-iterable is returned as ``str(raw_values)``.

    Returns:
        The decoded text with surrounding whitespace removed. Zero codes,
        entries that are not integers, and codes outside the Unicode range
        are skipped.
    """
    if raw_values is None:
        return ""
    if isinstance(raw_values, str):
        # Already text. Iterating it would push each character through int()
        # and either drop it or reinterpret digits as code points.
        return raw_values.strip()
    try:
        values = list(raw_values)
    except TypeError:
        return str(raw_values)

    chars: list[str] = []
    for value in values:
        try:
            number = int(value)
        except (TypeError, ValueError, OverflowError):
            continue
        # Skip 0 and anything chr() would reject, so one bad element cannot
        # abort decoding of the whole string.
        if not 0 < number <= 0x10FFFF:
            continue
        chars.append(chr(number))
    return "".join(chars).strip()
def _is_samsung_description(description: str) -> bool:
    """Return True when a monitor description looks like a Samsung.

    After upper-casing and stripping, the text matches if it contains
    ``SAMSUNG``, begins with ``SAM``, or has ``SAM`` as a space-delimited word.
    """
    text = description.upper().strip()
    padded = f" {text} "
    return "SAMSUNG" in text or text.startswith("SAM") or " SAM " in padded
def _is_generic_description(description: str) -> bool:
    """Return True when the description contains ``GENERIC`` or ``UNKNOWN``.

    The comparison is done on the upper-cased, stripped text.
    """
    text = description.upper().strip()
    return any(marker in text for marker in ("GENERIC", "UNKNOWN"))
def _trigger_input_codes() -> set[int]:
    """Collect every input code listed by any entry of ``SUPPORTED_TARGET_PORTS``.

    Each port spec is read with ``.get("input_codes", set())``, so a spec
    without that key contributes nothing. Codes are coerced with ``int``.
    """
    return {
        int(raw_code)
        for spec in SUPPORTED_TARGET_PORTS.values()
        for raw_code in spec.get("input_codes", set())
    }
def _select_trigger_candidate_index(candidates: list[tuple[str, int | None]]) -> int | None:
    """Choose which candidate is the trigger monitor, if that can be decided.

    Args:
        candidates: ``(description, input_code)`` pairs; ``input_code`` is
            ``None`` when it could not be read.

    Returns:
        The index of the chosen candidate, or ``None`` when the list is empty
        or no rule singles out exactly one candidate.
    """
    if not candidates:
        return None
    trigger_codes = _trigger_input_codes()
    # Indices of candidates whose input code was read and is one of the codes
    # returned by _trigger_input_codes().
    matching = [
        index
        for index, (_, input_code) in enumerate(candidates)
        if input_code is not None and int(input_code) in trigger_codes
    ]
    if len(matching) == 1:
        return matching[0]
    if len(matching) > 1:
        # Tie-break among code matches: accept a sole generic-described one.
        generic_matches = [index for index in matching if _is_generic_description(candidates[index][0])]
        if len(generic_matches) == 1:
            return generic_matches[0]
    # Last resort: a sole generic-described candidate, regardless of its code.
    # NOTE(review): this fallback is assumed to sit at function level, so it
    # also runs when no candidate matched a trigger code. Confirm the nesting
    # against the original file.
    generic = [index for index, (description, _) in enumerate(candidates) if _is_generic_description(description)]
    if len(generic) == 1:
        return generic[0]
    return None