Add internal KVM switch dashboard and service
This commit is contained in:
+146
@@ -0,0 +1,146 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Protocol
|
||||
|
||||
from monitorcontrol import get_monitors
|
||||
|
||||
|
||||
ALIENWARE_MODEL_TOKEN = "AW3423DWF"
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class HardwareScan:
|
||||
samsung_present: bool = False
|
||||
trigger_input_code: int | None = None
|
||||
alienware_detected: bool = False
|
||||
alienware_input_code: int | None = None
|
||||
trigger_description: str | None = None
|
||||
alienware_description: str | None = None
|
||||
errors: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class MonitorBackend(Protocol):
|
||||
def scan(self) -> HardwareScan: ...
|
||||
|
||||
|
||||
class RealMonitorBackend:
|
||||
def scan(self) -> HardwareScan:
|
||||
errors: list[str] = []
|
||||
samsung_present = self._is_samsung_present(errors)
|
||||
|
||||
try:
|
||||
monitors = list(get_monitors())
|
||||
except Exception as exc: # pragma: no cover - hardware path
|
||||
errors.append(f"Unable to enumerate DDC monitors: {exc}")
|
||||
return HardwareScan(samsung_present=samsung_present, errors=errors)
|
||||
|
||||
alienware_candidates: list[tuple[object, str]] = []
|
||||
trigger_candidates: list[tuple[object, str]] = []
|
||||
|
||||
for monitor in monitors:
|
||||
description = str(getattr(getattr(monitor, "vcp", None), "description", "") or "")
|
||||
normalized = description.upper()
|
||||
if ALIENWARE_MODEL_TOKEN in normalized:
|
||||
alienware_candidates.append((monitor, description))
|
||||
else:
|
||||
trigger_candidates.append((monitor, description or "Unknown DDC monitor"))
|
||||
|
||||
alienware_description: str | None = None
|
||||
alienware_input_code: int | None = None
|
||||
if len(alienware_candidates) == 1:
|
||||
alienware_monitor, alienware_description = alienware_candidates[0]
|
||||
alienware_input_code = self._read_input_code(
|
||||
alienware_monitor,
|
||||
"Alienware target monitor",
|
||||
errors,
|
||||
)
|
||||
elif not alienware_candidates:
|
||||
errors.append("Alienware AW3423DWF could not be identified through DDC/CI.")
|
||||
else:
|
||||
errors.append("Multiple Alienware AW3423DWF monitors were detected; refusing to switch.")
|
||||
|
||||
trigger_description: str | None = None
|
||||
trigger_input_code: int | None = None
|
||||
if len(trigger_candidates) == 1:
|
||||
trigger_monitor, trigger_description = trigger_candidates[0]
|
||||
trigger_input_code = self._read_input_code(
|
||||
trigger_monitor,
|
||||
"Samsung trigger monitor",
|
||||
errors,
|
||||
)
|
||||
elif not trigger_candidates:
|
||||
errors.append("A non-Alienware DDC monitor was not found for trigger-only logic.")
|
||||
else:
|
||||
errors.append("Multiple non-Alienware DDC monitors were detected; trigger monitor is ambiguous.")
|
||||
|
||||
if not samsung_present:
|
||||
errors.append("Samsung monitor could not be confirmed from Windows monitor metadata.")
|
||||
|
||||
return HardwareScan(
|
||||
samsung_present=samsung_present,
|
||||
trigger_input_code=trigger_input_code,
|
||||
alienware_detected=len(alienware_candidates) == 1,
|
||||
alienware_input_code=alienware_input_code,
|
||||
trigger_description=trigger_description,
|
||||
alienware_description=alienware_description,
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
def _is_samsung_present(self, errors: list[str]) -> bool:
|
||||
try:
|
||||
import win32com.client # pyright: ignore[reportMissingImports]
|
||||
except ImportError as exc: # pragma: no cover - dependency issue
|
||||
errors.append(f"pywin32 is unavailable for WMI monitor detection: {exc}")
|
||||
return False
|
||||
|
||||
try:
|
||||
locator = win32com.client.Dispatch("WbemScripting.SWbemLocator")
|
||||
service = locator.ConnectServer(".", "root\\wmi")
|
||||
query = "SELECT InstanceName, ManufacturerName, UserFriendlyName FROM WmiMonitorID"
|
||||
for row in service.ExecQuery(query):
|
||||
instance_name = str(getattr(row, "InstanceName", "") or "")
|
||||
manufacturer = _decode_wmi_string(getattr(row, "ManufacturerName", []))
|
||||
friendly_name = _decode_wmi_string(getattr(row, "UserFriendlyName", []))
|
||||
haystack = " ".join([instance_name, manufacturer, friendly_name]).upper()
|
||||
if "SAM" in haystack:
|
||||
return True
|
||||
except Exception as exc: # pragma: no cover - hardware path
|
||||
errors.append(f"Unable to query Windows monitor metadata: {exc}")
|
||||
return False
|
||||
|
||||
return False
|
||||
|
||||
def _read_input_code(
|
||||
self,
|
||||
monitor: object,
|
||||
label: str,
|
||||
errors: list[str],
|
||||
) -> int | None:
|
||||
try:
|
||||
with monitor:
|
||||
return int(monitor.get_input_source())
|
||||
except Exception as exc: # pragma: no cover - hardware path
|
||||
errors.append(f"Unable to read {label} input source: {exc}")
|
||||
return None
|
||||
|
||||
|
||||
def _decode_wmi_string(raw_values: object) -> str:
|
||||
if raw_values is None:
|
||||
return ""
|
||||
|
||||
try:
|
||||
values = list(raw_values)
|
||||
except TypeError:
|
||||
return str(raw_values)
|
||||
|
||||
chars: list[str] = []
|
||||
for value in values:
|
||||
try:
|
||||
number = int(value)
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
if number == 0:
|
||||
continue
|
||||
chars.append(chr(number))
|
||||
return "".join(chars).strip()
|
||||
Reference in New Issue
Block a user