225 lines
8.2 KiB
Python
225 lines
8.2 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Protocol
|
|
|
|
from monitorcontrol import get_monitors
|
|
from app.config import SUPPORTED_TARGET_PORTS
|
|
|
|
|
|
ALIENWARE_MODEL_TOKEN = "AW3423DWF"
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class HardwareScan:
|
|
samsung_present: bool = False
|
|
trigger_input_code: int | None = None
|
|
alienware_detected: bool = False
|
|
alienware_input_code: int | None = None
|
|
trigger_description: str | None = None
|
|
alienware_description: str | None = None
|
|
errors: list[str] = field(default_factory=list)
|
|
|
|
|
|
class MonitorBackend(Protocol):
|
|
def scan(self) -> HardwareScan: ...
|
|
|
|
|
|
class RealMonitorBackend:
|
|
def scan(self) -> HardwareScan:
|
|
errors: list[str] = []
|
|
samsung_present = self._is_samsung_present(errors)
|
|
|
|
try:
|
|
monitors = list(get_monitors())
|
|
except Exception as exc: # pragma: no cover - hardware path
|
|
errors.append(f"Unable to enumerate DDC monitors: {exc}")
|
|
return HardwareScan(samsung_present=samsung_present, errors=errors)
|
|
|
|
alienware_candidates: list[tuple[object, str]] = []
|
|
samsung_named_candidates: list[tuple[object, str]] = []
|
|
non_alienware_candidates: list[tuple[object, str]] = []
|
|
|
|
for monitor in monitors:
|
|
description = str(getattr(getattr(monitor, "vcp", None), "description", "") or str(monitor))
|
|
normalized = description.upper().strip()
|
|
if ALIENWARE_MODEL_TOKEN in normalized:
|
|
alienware_candidates.append((monitor, description))
|
|
continue
|
|
|
|
candidate = (monitor, description or "Unknown DDC monitor")
|
|
non_alienware_candidates.append(candidate)
|
|
if _is_samsung_description(normalized):
|
|
samsung_named_candidates.append(candidate)
|
|
|
|
alienware_description: str | None = None
|
|
alienware_input_code: int | None = None
|
|
if len(alienware_candidates) == 1:
|
|
alienware_monitor, alienware_description = alienware_candidates[0]
|
|
alienware_input_code = self._read_input_code(
|
|
alienware_monitor,
|
|
"Alienware target monitor",
|
|
errors,
|
|
)
|
|
elif not alienware_candidates:
|
|
errors.append("Alienware AW3423DWF could not be identified through DDC/CI.")
|
|
else:
|
|
errors.append("Multiple Alienware AW3423DWF monitors were detected; refusing to switch.")
|
|
|
|
trigger_description: str | None = None
|
|
trigger_input_code: int | None = None
|
|
trigger_candidates = (
|
|
samsung_named_candidates
|
|
if samsung_named_candidates
|
|
else (non_alienware_candidates if samsung_present else [])
|
|
)
|
|
if len(trigger_candidates) == 1:
|
|
trigger_monitor, trigger_description = trigger_candidates[0]
|
|
trigger_input_code = self._read_input_code(
|
|
trigger_monitor,
|
|
"Samsung trigger monitor",
|
|
errors,
|
|
)
|
|
elif not trigger_candidates:
|
|
errors.append("Samsung trigger monitor could not be identified through DDC/CI.")
|
|
else:
|
|
scanned_candidates: list[tuple[object, str, int | None]] = []
|
|
for monitor, description in trigger_candidates:
|
|
local_errors: list[str] = []
|
|
input_code = self._read_input_code(
|
|
monitor,
|
|
"Samsung trigger candidate monitor",
|
|
local_errors,
|
|
)
|
|
scanned_candidates.append((monitor, description, input_code))
|
|
|
|
selected_index = _select_trigger_candidate_index(
|
|
[(description, input_code) for _, description, input_code in scanned_candidates]
|
|
)
|
|
if selected_index is not None:
|
|
_, trigger_description, trigger_input_code = scanned_candidates[selected_index]
|
|
elif samsung_named_candidates:
|
|
errors.append("Multiple Samsung DDC monitors were detected; trigger monitor is ambiguous.")
|
|
else:
|
|
errors.append(
|
|
"Multiple non-Alienware DDC monitors were detected and Samsung did not expose model info; trigger monitor is ambiguous."
|
|
)
|
|
|
|
if not samsung_present:
|
|
errors.append("Samsung monitor could not be confirmed from Windows monitor metadata.")
|
|
|
|
return HardwareScan(
|
|
samsung_present=samsung_present,
|
|
trigger_input_code=trigger_input_code,
|
|
alienware_detected=len(alienware_candidates) == 1,
|
|
alienware_input_code=alienware_input_code,
|
|
trigger_description=trigger_description,
|
|
alienware_description=alienware_description,
|
|
errors=errors,
|
|
)
|
|
|
|
def _is_samsung_present(self, errors: list[str]) -> bool:
|
|
try:
|
|
import win32com.client # pyright: ignore[reportMissingImports]
|
|
except ImportError as exc: # pragma: no cover - dependency issue
|
|
errors.append(f"pywin32 is unavailable for WMI monitor detection: {exc}")
|
|
return False
|
|
|
|
try:
|
|
locator = win32com.client.Dispatch("WbemScripting.SWbemLocator")
|
|
service = locator.ConnectServer(".", "root\\wmi")
|
|
query = "SELECT InstanceName, ManufacturerName, UserFriendlyName FROM WmiMonitorID"
|
|
for row in service.ExecQuery(query):
|
|
instance_name = str(getattr(row, "InstanceName", "") or "")
|
|
manufacturer = _decode_wmi_string(getattr(row, "ManufacturerName", []))
|
|
friendly_name = _decode_wmi_string(getattr(row, "UserFriendlyName", []))
|
|
haystack = " ".join([instance_name, manufacturer, friendly_name]).upper()
|
|
if "SAM" in haystack:
|
|
return True
|
|
except Exception as exc: # pragma: no cover - hardware path
|
|
errors.append(f"Unable to query Windows monitor metadata: {exc}")
|
|
return False
|
|
|
|
return False
|
|
|
|
def _read_input_code(
|
|
self,
|
|
monitor: object,
|
|
label: str,
|
|
errors: list[str],
|
|
) -> int | None:
|
|
try:
|
|
with monitor:
|
|
return int(monitor.get_input_source())
|
|
except Exception as exc: # pragma: no cover - hardware path
|
|
errors.append(f"Unable to read {label} input source: {exc}")
|
|
return None
|
|
|
|
|
|
def _decode_wmi_string(raw_values: object) -> str:
|
|
if raw_values is None:
|
|
return ""
|
|
|
|
try:
|
|
values = list(raw_values)
|
|
except TypeError:
|
|
return str(raw_values)
|
|
|
|
chars: list[str] = []
|
|
for value in values:
|
|
try:
|
|
number = int(value)
|
|
except (TypeError, ValueError):
|
|
continue
|
|
if number == 0:
|
|
continue
|
|
chars.append(chr(number))
|
|
return "".join(chars).strip()
|
|
|
|
|
|
def _is_samsung_description(description: str) -> bool:
|
|
normalized = description.upper().strip()
|
|
if "SAMSUNG" in normalized:
|
|
return True
|
|
if normalized.startswith("SAM"):
|
|
return True
|
|
return " SAM " in f" {normalized} "
|
|
|
|
|
|
def _is_generic_description(description: str) -> bool:
|
|
normalized = description.upper().strip()
|
|
return "GENERIC" in normalized or "UNKNOWN" in normalized
|
|
|
|
|
|
def _trigger_input_codes() -> set[int]:
|
|
codes: set[int] = set()
|
|
for port_spec in SUPPORTED_TARGET_PORTS.values():
|
|
input_codes = port_spec.get("input_codes", set())
|
|
codes.update(int(code) for code in input_codes)
|
|
return codes
|
|
|
|
|
|
def _select_trigger_candidate_index(candidates: list[tuple[str, int | None]]) -> int | None:
|
|
if not candidates:
|
|
return None
|
|
|
|
trigger_codes = _trigger_input_codes()
|
|
matching = [
|
|
index
|
|
for index, (_, input_code) in enumerate(candidates)
|
|
if input_code is not None and int(input_code) in trigger_codes
|
|
]
|
|
if len(matching) == 1:
|
|
return matching[0]
|
|
|
|
if len(matching) > 1:
|
|
generic_matches = [index for index in matching if _is_generic_description(candidates[index][0])]
|
|
if len(generic_matches) == 1:
|
|
return generic_matches[0]
|
|
|
|
generic = [index for index, (description, _) in enumerate(candidates) if _is_generic_description(description)]
|
|
if len(generic) == 1:
|
|
return generic[0]
|
|
|
|
return None
|