113 lines
3 KiB
Python
Executable file
113 lines
3 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""Watch for HDMI connection state change and update screen layout."""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from typing import Any
|
|
|
|
import pyudev
|
|
|
|
# Module-level udev handles shared by watch(): one pyudev context and a
# netlink monitor created from it.
context: pyudev.Context = pyudev.Context()
monitor: pyudev.Monitor = pyudev.Monitor.from_netlink(context)
|
|
|
|
# Attribute values a udev device event must carry to be treated as an HDMI
# connection state change; watch() compares each key against the attribute
# of the same name on the polled device.
want = dict(
    action="change",
    device_node="/dev/dri/card0",
    device_type="drm_minor",
    sys_path="/sys/devices/pci0000:00/0000:00:02.0/drm/card0",
    sys_name="card0",
)
|
|
|
|
|
|
def run_swaymsg() -> bytes:
    """Invoke ``swaymsg -r -t get_outputs`` and return its raw stdout.

    The exit status is not inspected; whatever the command wrote to stdout
    (possibly nothing) is handed back as bytes for the caller to interpret.
    """
    command = ["swaymsg", "-r", "-t", "get_outputs"]
    completed = subprocess.run(command, capture_output=True)
    return completed.stdout
|
|
|
|
|
|
def parse_json(outputs: bytes) -> dict[str, dict[str, Any]]:
    """Decode get_outputs JSON and index the output entries by name.

    Args:
        outputs: JSON text holding a sequence of objects, each with a
            ``"name"`` key.

    Returns:
        A mapping from each entry's ``"name"`` to the entry itself. When two
        entries share a name, the later one wins.

    Raises:
        json.decoder.JSONDecodeError: If ``outputs`` is not valid JSON. The
            offending payload is printed before the error is re-raised.
    """
    try:
        decoded = json.loads(outputs)
    except json.decoder.JSONDecodeError:
        # Show the raw payload so the failure can be diagnosed, then let the
        # caller decide whether to retry.
        print("JSON parse error")
        print(outputs)
        raise

    by_name: dict[str, dict[str, Any]] = {}
    for entry in decoded:
        by_name[entry["name"]] = entry
    return by_name
|
|
|
|
|
|
def get_outputs(attempts: int = 10) -> dict[str, dict[str, Any]] | None:
    """Query sway for its outputs, retrying when the reply is not valid JSON.

    Args:
        attempts: Maximum number of times to run swaymsg.

    Returns:
        The outputs keyed by name, or ``None`` if swaymsg produced only
        whitespace (no retry in that case) or ``attempts`` is not positive.

    Raises:
        json.decoder.JSONDecodeError: If the final attempt still yields
            unparseable JSON.
    """
    final_attempt = attempts - 1
    for attempt in range(attempts):
        raw = run_swaymsg()
        if not raw.strip():
            # Empty reply: give up immediately rather than retrying.
            return None
        try:
            parsed = parse_json(raw)
        except json.decoder.JSONDecodeError:
            if attempt == final_attempt:
                raise
        else:
            return parsed
        # Parsing failed and retries remain; pause before asking again.
        time.sleep(1)
    return None
|
|
|
|
|
|
def is_sony_tv(output: dict[str, Any]) -> bool:
    """Report whether an output entry describes the Sony TV.

    The entry matches when its ``"make"`` is ``"Sony"`` and its ``"model"``
    is ``"SONY TV"``. ``"model"`` is only looked up once the make matches.
    """
    make_matches = output["make"] == "Sony"
    if not make_matches:
        return False
    return bool(output["model"] == "SONY TV")
|
|
|
|
|
|
def sony_tv_connected() -> None:
    """Run the user's ``~/bin/desk`` command for the Sony TV being attached.

    The command's exit status is not checked.
    """
    desk_command = os.path.expanduser("~/bin/desk")
    subprocess.run([desk_command])
|
|
|
|
|
|
def hdmi_disconnected() -> None:
    """Run the user's ``~/bin/unplugged`` command for HDMI being detached.

    The command's exit status is not checked.
    """
    unplugged_command = os.path.expanduser("~/bin/unplugged")
    subprocess.run([unplugged_command])
|
|
|
|
|
|
def is_output_connected(name: str, drm_root: str = "/sys/class/drm") -> bool:
    """Report whether the named connector on card0 is connected.

    Reads ``<drm_root>/card0-<name>/status`` and maps its exact contents to a
    boolean.

    Args:
        name: Connector name as used in the sysfs directory, e.g.
            ``"HDMI-A-1"``.
        drm_root: Directory holding the per-connector ``card0-*`` entries.
            Defaults to the real sysfs location; override to point at a
            different tree.

    Returns:
        ``True`` for ``"connected\\n"``, ``False`` for ``"disconnected\\n"``.

    Raises:
        SystemExit: With status 1 if the file holds any other value.
        OSError: If the status file cannot be opened or read.
    """
    status_path = os.path.join(drm_root, f"card0-{name}", "status")
    # Use a context manager so the file handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(status_path) as status_file:
        state = status_file.read()

    states = {"connected\n": True, "disconnected\n": False}
    if state not in states:
        print(f"unknown state: [{state}]")
        sys.exit(1)
    return states[state]
|
|
|
|
|
|
def handle_state_change() -> str:
    """Inspect the HDMI-A-1 connector and apply the matching screen layout.

    Returns:
        A human-readable message describing what was detected and which
        action, if any, was taken.
    """
    connector = "HDMI-A-1"

    if is_output_connected(connector):
        known_outputs = get_outputs()
        # get_outputs() may return None (or an empty mapping); treat both as
        # "no information about the HDMI output".
        hdmi_output = known_outputs.get(connector) if known_outputs else None

        if hdmi_output and is_sony_tv(hdmi_output):
            sony_tv_connected()
            return "Sony TV connected to HDMI, switching to desk layout."

        return "Unknown device connected to HDMI, doing nothing."

    hdmi_disconnected()
    return "HDMI disconnected, switching to laptop screen."
|
|
|
|
|
|
def watch() -> None:
    """Poll the udev monitor and react to events matching ``want``.

    Each polled device is compared attribute-by-attribute against ``want``;
    on a full match the state change is handled and the resulting message is
    printed. The loop ends only if ``monitor.poll`` returns ``None``.
    """
    event_device: pyudev.Device
    for event_device in iter(monitor.poll, None):
        mismatch = any(
            getattr(event_device, attribute) != expected
            for attribute, expected in want.items()
        )
        if mismatch:
            # Not the HDMI change event we care about.
            continue
        print(handle_state_change())
|
|
|
|
|
|
if __name__ == "__main__":
    # Bring the layout in line with the current HDMI state once at startup;
    # the returned message is intentionally not printed here.
    handle_state_change()
    # Then keep reacting to udev events.
    watch()
|