Add auxiliary monitor configuration and enhance trigger logic for KVM switching

This commit is contained in:
Lago
2026-03-27 15:36:37 +01:00
parent 208ad243a7
commit c8ab5ad9bc
7 changed files with 310 additions and 43 deletions
+1
View File
@@ -15,6 +15,7 @@ Monitor filtering:
Configure only one value per machine:
- this device Alienware target port: `DP1`, `DP2`, or `HDMI`
- optional auxiliary trigger monitor: selected in UI; if not set, auto-detection is used
Samsung trigger input codes mapped by port:
+30 -6
View File
@@ -18,13 +18,20 @@ SUPPORTED_TARGET_PORTS = {
@dataclass(slots=True)
class AppConfig:
device_port: str = "DP1"
auxiliary_monitor_id: str | None = None
@classmethod
def from_dict(cls, data: dict[str, object]) -> "AppConfig":
return cls(device_port=_coerce_port_name(data.get("device_port"), "DP1"))
return cls(
device_port=_coerce_port_name(data.get("device_port"), "DP1"),
auxiliary_monitor_id=_coerce_aux_monitor_id(data.get("auxiliary_monitor_id")),
)
def to_dict(self) -> dict[str, str]:
return {"device_port": self.device_port}
def to_dict(self) -> dict[str, str | None]:
return {
"device_port": self.device_port,
"auxiliary_monitor_id": self.auxiliary_monitor_id,
}
def validate(self) -> list[str]:
errors: list[str] = []
@@ -43,7 +50,10 @@ class ConfigStore:
def get(self) -> AppConfig:
with self._lock:
return AppConfig(device_port=self._config.device_port)
return AppConfig(
device_port=self._config.device_port,
auxiliary_monitor_id=self._config.auxiliary_monitor_id,
)
def save(self, config: AppConfig) -> AppConfig:
errors = config.validate()
@@ -55,8 +65,14 @@ class ConfigStore:
json.dumps(config.to_dict(), indent=2),
encoding="utf-8",
)
self._config = AppConfig(device_port=config.device_port)
return AppConfig(device_port=self._config.device_port)
self._config = AppConfig(
device_port=config.device_port,
auxiliary_monitor_id=config.auxiliary_monitor_id,
)
return AppConfig(
device_port=self._config.device_port,
auxiliary_monitor_id=self._config.auxiliary_monitor_id,
)
def _load_from_disk(self) -> AppConfig:
if not self.path.exists():
@@ -79,3 +95,11 @@ def _coerce_port_name(value: object, default: str) -> str:
if normalized in SUPPORTED_TARGET_PORTS:
return normalized
return default
def _coerce_aux_monitor_id(value: object) -> str | None:
if isinstance(value, str):
normalized = value.strip()
if normalized:
return normalized
return None
+50 -15
View File
@@ -10,14 +10,30 @@ from app.config import SUPPORTED_TARGET_PORTS
ALIENWARE_MODEL_TOKEN = "AW3423DWF"
@dataclass(slots=True)
class TriggerMonitorCandidate:
id: str
label: str
input_code: int | None = None
def to_dict(self) -> dict[str, int | str | None]:
return {
"id": self.id,
"label": self.label,
"input_code": self.input_code,
}
@dataclass(slots=True)
class HardwareScan:
samsung_present: bool = False
trigger_input_code: int | None = None
trigger_monitor_id: str | None = None
alienware_detected: bool = False
alienware_input_code: int | None = None
trigger_description: str | None = None
alienware_description: str | None = None
trigger_candidates: list[TriggerMonitorCandidate] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
@@ -68,36 +84,44 @@ class RealMonitorBackend:
trigger_description: str | None = None
trigger_input_code: int | None = None
trigger_monitor_id: str | None = None
trigger_candidates = (
samsung_named_candidates
if samsung_named_candidates
else (non_alienware_candidates if samsung_present else [])
)
if len(trigger_candidates) == 1:
trigger_monitor, trigger_description = trigger_candidates[0]
trigger_input_code = self._read_input_code(
trigger_monitor,
"Samsung trigger monitor",
errors,
)
elif not trigger_candidates:
errors.append("Samsung trigger monitor could not be identified through DDC/CI.")
else:
scanned_candidates: list[tuple[object, str, int | None]] = []
for monitor, description in trigger_candidates:
scanned_candidates: list[TriggerMonitorCandidate] = []
for index, (monitor, description) in enumerate(trigger_candidates, start=1):
local_errors: list[str] = []
input_code = self._read_input_code(
monitor,
"Samsung trigger candidate monitor",
local_errors,
)
scanned_candidates.append((monitor, description, input_code))
scanned_candidates.append(
TriggerMonitorCandidate(
id=_build_trigger_candidate_id(description, index),
label=description,
input_code=input_code,
)
)
if len(scanned_candidates) == 1:
selected = scanned_candidates[0]
trigger_description = selected.label
trigger_input_code = selected.input_code
trigger_monitor_id = selected.id
elif not scanned_candidates:
errors.append("Samsung trigger monitor could not be identified through DDC/CI.")
else:
selected_index = _select_trigger_candidate_index(
[(description, input_code) for _, description, input_code in scanned_candidates]
[(candidate.label, candidate.input_code) for candidate in scanned_candidates]
)
if selected_index is not None:
_, trigger_description, trigger_input_code = scanned_candidates[selected_index]
selected = scanned_candidates[selected_index]
trigger_description = selected.label
trigger_input_code = selected.input_code
trigger_monitor_id = selected.id
elif samsung_named_candidates:
errors.append("Multiple Samsung DDC monitors were detected; trigger monitor is ambiguous.")
else:
@@ -111,10 +135,12 @@ class RealMonitorBackend:
return HardwareScan(
samsung_present=samsung_present,
trigger_input_code=trigger_input_code,
trigger_monitor_id=trigger_monitor_id,
alienware_detected=len(alienware_candidates) == 1,
alienware_input_code=alienware_input_code,
trigger_description=trigger_description,
alienware_description=alienware_description,
trigger_candidates=scanned_candidates,
errors=errors,
)
@@ -222,3 +248,12 @@ def _select_trigger_candidate_index(candidates: list[tuple[str, int | None]]) ->
return generic[0]
return None
def _build_trigger_candidate_id(description: str, index: int) -> str:
normalized = description.strip().lower()
safe = "".join(ch if ch.isalnum() else "-" for ch in normalized).strip("-")
safe = "-".join(part for part in safe.split("-") if part)
if not safe:
safe = "monitor"
return f"{safe}-{index}"
+2
View File
@@ -19,6 +19,7 @@ INDEX_PATH = STATIC_DIR / "index.html"
class SettingsPayload(BaseModel):
device_port: str
auxiliary_monitor_id: str | None = None
def create_app(
@@ -59,6 +60,7 @@ def create_app(
try:
return active_service.save_settings(
device_port=payload.device_port,
auxiliary_monitor_id=payload.auxiliary_monitor_id,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
+71 -11
View File
@@ -11,7 +11,7 @@ from app.config import (
SUPPORTED_TARGET_PORTS,
)
from app.ddm import DDMBackend, RealDDMBackend
from app.hardware import HardwareScan, MonitorBackend, RealMonitorBackend
from app.hardware import HardwareScan, MonitorBackend, RealMonitorBackend, TriggerMonitorCandidate
@dataclass(slots=True)
@@ -24,8 +24,10 @@ class ServiceStatus:
samsung_session_attempt_count: int = 0
waiting_for_samsung_disconnect: bool = False
trigger_input_code: int | None = None
active_trigger_monitor_id: str | None = None
trigger_target_port: str | None = None
trigger_matches_device_port: bool = False
trigger_monitor_candidates: list[dict[str, int | str | None]] = field(default_factory=list)
alienware_detected: bool = False
alienware_input_code: int | None = None
ddm_slot: int | None = None
@@ -44,8 +46,10 @@ class ServiceStatus:
"samsung_session_attempt_count": self.samsung_session_attempt_count,
"waiting_for_samsung_disconnect": self.waiting_for_samsung_disconnect,
"trigger_input_code": self.trigger_input_code,
"active_trigger_monitor_id": self.active_trigger_monitor_id,
"trigger_target_port": self.trigger_target_port,
"trigger_matches_device_port": self.trigger_matches_device_port,
"trigger_monitor_candidates": list(self.trigger_monitor_candidates),
"alienware_detected": self.alienware_detected,
"alienware_input_code": self.alienware_input_code,
"ddm_slot": self.ddm_slot,
@@ -100,8 +104,11 @@ class KvmSwitcherService:
with self._state_lock:
return self._status.to_dict()
def save_settings(self, device_port: str) -> dict[str, Any]:
new_config = AppConfig(device_port=device_port)
def save_settings(self, device_port: str, auxiliary_monitor_id: str | None = None) -> dict[str, Any]:
new_config = AppConfig(
device_port=device_port,
auxiliary_monitor_id=auxiliary_monitor_id,
)
self.config_store.save(new_config)
return self.poll_once()
@@ -112,7 +119,11 @@ class KvmSwitcherService:
errors = list(scan.errors)
errors.extend(config.validate())
blocking_errors = [error for error in errors if _is_blocking_error(error)]
trigger_input_code, active_trigger_monitor_id, trigger_candidates = self._resolve_trigger_state(
config=config,
scan=scan,
errors=errors,
)
ddm_slot = self.ddm_backend.resolve_alienware_slot(force=False)
supports_monitor_targeting = bool(
@@ -125,9 +136,10 @@ class KvmSwitcherService:
errors.append("Alienware monitorcontrol backend is not ready.")
elif ddm_slot is None and not supports_monitor_targeting:
errors.append("Alienware monitorcontrol target could not be resolved.")
blocking_errors = [error for error in errors if _is_blocking_error(error)]
trigger_target_port = self._port_name_for_input_code(scan.trigger_input_code)
desired_port = self._resolve_desired_port(config, scan)
trigger_target_port = self._port_name_for_input_code(trigger_input_code)
desired_port = self._resolve_desired_port(config, trigger_input_code)
trigger_matches_device_port = desired_port is not None
last_switch_result = "idle"
with self._state_lock:
@@ -150,6 +162,7 @@ class KvmSwitcherService:
self._samsung_session_successful = True
else:
last_switch_result, last_switch_at, scan, errors = self._attempt_switch_sequence(
config=config,
ddm_slot=ddm_slot,
desired_port=desired_port,
scan=scan,
@@ -171,9 +184,11 @@ class KvmSwitcherService:
samsung_session_successful=self._samsung_session_successful,
samsung_session_attempt_count=self._samsung_session_attempt_count,
waiting_for_samsung_disconnect=self._samsung_session_successful or self._samsung_session_attempt_count >= 3,
trigger_input_code=scan.trigger_input_code,
trigger_input_code=trigger_input_code,
active_trigger_monitor_id=active_trigger_monitor_id,
trigger_target_port=trigger_target_port,
trigger_matches_device_port=trigger_matches_device_port,
trigger_monitor_candidates=[candidate.to_dict() for candidate in trigger_candidates],
alienware_detected=scan.alienware_detected,
alienware_input_code=scan.alienware_input_code,
ddm_slot=ddm_slot,
@@ -201,9 +216,8 @@ class KvmSwitcherService:
@staticmethod
def _resolve_desired_port(
config: AppConfig,
scan: HardwareScan,
trigger_input: int | None,
) -> str | None:
trigger_input = scan.trigger_input_code
if trigger_input is None:
return None
desired_codes = KvmSwitcherService._port_input_codes(config.device_port)
@@ -243,6 +257,7 @@ class KvmSwitcherService:
def _attempt_switch_sequence(
self,
config: AppConfig,
ddm_slot: int | None,
desired_port: str,
scan: HardwareScan,
@@ -274,8 +289,13 @@ class KvmSwitcherService:
if not verify_scan.samsung_present:
return "waiting_for_reconnect", last_switch_at, scan, errors
verify_trigger_input, _, _ = self._resolve_trigger_state(
config=config,
scan=verify_scan,
errors=errors,
)
desired_codes = self._port_input_codes(desired_port)
if verify_scan.trigger_input_code not in desired_codes:
if verify_trigger_input not in desired_codes:
return "waiting_for_trigger_match", last_switch_at, scan, errors
if verify_scan.alienware_input_code is None:
errors.append(
@@ -290,6 +310,39 @@ class KvmSwitcherService:
return "max_attempts_waiting_for_disconnect", last_switch_at, scan, errors
@staticmethod
def _resolve_trigger_state(
config: AppConfig,
scan: HardwareScan,
errors: list[str],
) -> tuple[int | None, str | None, list[TriggerMonitorCandidate]]:
trigger_input_code = scan.trigger_input_code
active_trigger_monitor_id = scan.trigger_monitor_id
candidates = list(scan.trigger_candidates)
configured_id = config.auxiliary_monitor_id
if configured_id is None:
return trigger_input_code, active_trigger_monitor_id, candidates
configured_candidate = next(
(candidate for candidate in candidates if candidate.id == configured_id),
None,
)
if configured_candidate is None:
errors.append(
f"Configured auxiliary monitor '{configured_id}' was not found among detected trigger monitors."
)
return trigger_input_code, active_trigger_monitor_id, candidates
errors[:] = [error for error in errors if not _is_trigger_ambiguity_error(error)]
trigger_input_code = configured_candidate.input_code
active_trigger_monitor_id = configured_candidate.id
if trigger_input_code is None:
errors.append(
f"Unable to read configured auxiliary monitor input source: {configured_candidate.label}."
)
return trigger_input_code, active_trigger_monitor_id, candidates
def _dedupe_errors(errors: list[str]) -> list[str]:
unique: list[str] = []
@@ -308,6 +361,13 @@ def _is_blocking_error(error: str) -> bool:
non_blocking_prefixes = (
"Unable to read Alienware target monitor input source:",
"Alienware input could not be read after switch; assuming success because switch command completed.",
"Samsung trigger monitor did not expose a Samsung model in DDC/CI; using the only non-Alienware DDC monitor as trigger.",
)
return not any(normalized.startswith(prefix) for prefix in non_blocking_prefixes)
def _is_trigger_ambiguity_error(error: str) -> bool:
normalized = error.strip()
return normalized in {
"Multiple Samsung DDC monitors were detected; trigger monitor is ambiguous.",
"Multiple non-Alienware DDC monitors were detected and Samsung did not expose model info; trigger monitor is ambiguous.",
}
+53 -2
View File
@@ -266,7 +266,13 @@
<option value="HDMI">HDMI</option>
</select>
</label>
<p class="hint">The selected port is saved locally in <code>config.json</code>.</p>
<label class="field" for="aux-monitor">
<span>Auxiliary Trigger Monitor</span>
<select id="aux-monitor">
<option value="">Auto-detect</option>
</select>
</label>
<p class="hint">Port and auxiliary monitor are saved locally in <code>config.json</code>.</p>
<div class="actions">
<button id="save-btn" type="submit">Save Settings</button>
</div>
@@ -287,6 +293,7 @@
const els = {
form: document.getElementById("settings-form"),
port: document.getElementById("device-port"),
auxMonitor: document.getElementById("aux-monitor"),
saveBtn: document.getElementById("save-btn"),
msg: document.getElementById("form-message"),
stats: document.getElementById("stats"),
@@ -303,16 +310,55 @@
return `<div class="stat"><span class="k">${label}</span><span class="v">${safe(value)}</span></div>`;
}
function escapeHtml(text) {
return String(text)
.replaceAll("&", "&amp;")
.replaceAll("<", "&lt;")
.replaceAll(">", "&gt;")
.replaceAll("\"", "&quot;")
.replaceAll("'", "&#39;");
}
function renderAuxiliaryMonitorOptions(payloadCandidates, configuredAuxId) {
const candidates = Array.isArray(payloadCandidates) ? payloadCandidates : [];
const currentValue = els.auxMonitor.value || "";
const desiredValue = formDirty ? currentValue : (configuredAuxId || "");
const options = ['<option value="">Auto-detect</option>'];
for (const candidate of candidates) {
if (!candidate || !candidate.id) {
continue;
}
const label = candidate.label || candidate.id;
const inputCode = candidate.input_code === null || candidate.input_code === undefined
? "n/a"
: String(candidate.input_code);
options.push(
`<option value="${escapeHtml(candidate.id)}">${escapeHtml(label)} (input: ${escapeHtml(inputCode)})</option>`
);
}
els.auxMonitor.innerHTML = options.join("");
const desiredExists = [...els.auxMonitor.options].some((opt) => opt.value === desiredValue);
els.auxMonitor.value = desiredExists ? desiredValue : "";
}
function renderStatus(payload) {
const config = payload.config || {};
const portVal = config.device_port;
const auxMonitorVal = config.auxiliary_monitor_id;
if (!formDirty && document.activeElement !== els.port) {
els.port.value = portVal ?? "DP1";
}
if (document.activeElement !== els.auxMonitor || !formDirty) {
renderAuxiliaryMonitorOptions(payload.trigger_monitor_candidates, auxMonitorVal);
}
els.stats.innerHTML = [
stat("This Device Port", config.device_port),
stat("Configured Aux Monitor", config.auxiliary_monitor_id),
stat("Active Trigger Monitor", payload.active_trigger_monitor_id),
stat("Samsung Present", payload.samsung_present ? "Yes" : "No"),
stat("Trigger Input", payload.trigger_input_code),
stat("Trigger Target Port", payload.trigger_target_port),
@@ -359,6 +405,7 @@
event.preventDefault();
const port = els.port.value;
const auxiliaryMonitorId = els.auxMonitor.value || null;
if (!port) {
els.msg.className = "message error";
@@ -376,6 +423,7 @@
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
device_port: port,
auxiliary_monitor_id: auxiliaryMonitorId,
}),
});
@@ -400,7 +448,7 @@
formDirty = false;
renderStatus(payload);
els.msg.className = "message ok";
els.msg.textContent = "Settings saved. Poller now uses this device port.";
els.msg.textContent = "Settings saved. Poller now uses this port and auxiliary monitor selection.";
} catch (error) {
els.msg.className = "message error";
els.msg.textContent = error.message;
@@ -412,6 +460,9 @@
els.port.addEventListener("change", () => {
formDirty = true;
});
els.auxMonitor.addEventListener("change", () => {
formDirty = true;
});
els.form.addEventListener("submit", saveSettings);
loadStatus(true);
setInterval(() => loadStatus(false), 1500);
+99 -5
View File
@@ -26,15 +26,17 @@ def test_config_store_missing_file_and_roundtrip_save() -> None:
config_path = Path(tempfile.mkdtemp()) / "config.json"
store = ConfigStore(config_path)
assert store.get() == AppConfig(device_port="DP1")
assert store.get() == AppConfig(device_port="DP1", auxiliary_monitor_id=None)
assert not config_path.exists()
saved = store.save(AppConfig(device_port="HDMI"))
saved = store.save(AppConfig(device_port="HDMI", auxiliary_monitor_id="generic-pnp-monitor-1"))
assert saved.device_port == "HDMI"
assert saved.auxiliary_monitor_id == "generic-pnp-monitor-1"
assert config_path.exists()
reloaded = ConfigStore(config_path).get()
assert reloaded.device_port == "HDMI"
assert reloaded.auxiliary_monitor_id == "generic-pnp-monitor-1"
def test_config_store_loads_legacy_config_with_device_role() -> None:
@@ -46,6 +48,7 @@ def test_config_store_loads_legacy_config_with_device_role() -> None:
loaded = ConfigStore(config_path).get()
assert loaded.device_port == "DP2"
assert loaded.auxiliary_monitor_id is None
def test_api_status_and_settings_endpoints_with_fake_service() -> None:
@@ -65,8 +68,10 @@ def test_api_status_and_settings_endpoints_with_fake_service() -> None:
"samsung_session_attempt_count": 0,
"waiting_for_samsung_disconnect": False,
"trigger_input_code": None,
"active_trigger_monitor_id": None,
"trigger_target_port": None,
"trigger_matches_device_port": False,
"trigger_monitor_candidates": [],
"alienware_detected": False,
"alienware_input_code": None,
"ddm_slot": None,
@@ -85,8 +90,15 @@ def test_api_status_and_settings_endpoints_with_fake_service() -> None:
def get_status(self) -> dict[str, object]:
return self.payload
def save_settings(self, device_port: str) -> dict[str, object]:
self.payload["config"] = {"device_port": device_port}
def save_settings(
self,
device_port: str,
auxiliary_monitor_id: str | None = None,
) -> dict[str, object]:
self.payload["config"] = {
"device_port": device_port,
"auxiliary_monitor_id": auxiliary_monitor_id,
}
self.payload["last_switch_result"] = "updated"
return self.payload
@@ -98,10 +110,15 @@ def test_api_status_and_settings_endpoints_with_fake_service() -> None:
save_response = client.post(
"/api/settings",
json={"device_port": "HDMI", "device_role": "legacy_ignored"},
json={
"device_port": "HDMI",
"auxiliary_monitor_id": "generic-pnp-monitor-1",
"device_role": "legacy_ignored",
},
)
assert save_response.status_code == 200
assert save_response.json()["config"]["device_port"] == "HDMI"
assert save_response.json()["config"]["auxiliary_monitor_id"] == "generic-pnp-monitor-1"
def test_polling_switches_when_trigger_matches_device_port() -> None:
@@ -172,6 +189,83 @@ def test_polling_switches_when_trigger_matches_device_port() -> None:
assert status["last_switch_result"] == "switched"
def test_polling_uses_configured_auxiliary_monitor() -> None:
service_module = _require_module("app.service")
hardware_module = _require_module("app.hardware")
ddm_module = _require_module("app.ddm")
KvmSwitcherService = getattr(service_module, "KvmSwitcherService", None)
HardwareScan = getattr(hardware_module, "HardwareScan", None)
TriggerMonitorCandidate = getattr(hardware_module, "TriggerMonitorCandidate", None)
DDMCommandResult = getattr(ddm_module, "DDMCommandResult", None)
if (
KvmSwitcherService is None
or HardwareScan is None
or TriggerMonitorCandidate is None
or DDMCommandResult is None
):
pytest.skip("Service backend interfaces are not available yet.")
class FakeMonitorBackend:
def __init__(self) -> None:
self.call_count = 0
def scan(self):
self.call_count += 1
alienware_input = 15 if self.call_count == 1 else 19
return HardwareScan(
samsung_present=True,
trigger_input_code=None,
trigger_monitor_id=None,
trigger_candidates=[
TriggerMonitorCandidate(id="dell-u2720q-1", label="DELL U2720Q", input_code=60),
TriggerMonitorCandidate(id="generic-pnp-monitor-2", label="Generic PnP Monitor", input_code=19),
],
alienware_detected=True,
alienware_input_code=alienware_input,
errors=[
"Multiple non-Alienware DDC monitors were detected and Samsung did not expose model info; trigger monitor is ambiguous."
],
)
class FakeDDMBackend:
def __init__(self) -> None:
self.calls: list[tuple[int, str]] = []
def is_available(self) -> bool:
return True
def resolve_alienware_slot(self, force: bool = False) -> int | None:
return 1
def invalidate_slot(self) -> None:
return None
def switch_to_port(self, slot: int, port_name: str):
self.calls.append((slot, port_name))
return DDMCommandResult(True, "ok")
config_path = Path(tempfile.mkdtemp()) / "config.json"
backend = FakeDDMBackend()
service = KvmSwitcherService(
config_store=ConfigStore(config_path),
monitor_backend=FakeMonitorBackend(),
ddm_backend=backend,
poll_interval_seconds=0.01,
retry_wait_seconds=0.0,
)
status = service.save_settings(
device_port="DP2",
auxiliary_monitor_id="generic-pnp-monitor-2",
)
assert status["config"]["auxiliary_monitor_id"] == "generic-pnp-monitor-2"
assert status["active_trigger_monitor_id"] == "generic-pnp-monitor-2"
assert status["trigger_input_code"] == 19
assert status["trigger_matches_device_port"] is True
assert status["last_switch_result"] == "switched"
assert not status["errors"]
def test_polling_does_not_switch_when_trigger_does_not_match_device_port() -> None:
service_module = _require_module("app.service")
hardware_module = _require_module("app.hardware")