From 8bb508f3db92f515fc60c1f2dee206c3a6f1d21f Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Sat, 8 Apr 2023 08:53:28 +0100 Subject: [PATCH] Initial commit. --- watch.py | 102 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100755 watch.py diff --git a/watch.py b/watch.py new file mode 100755 index 0000000..17381ea --- /dev/null +++ b/watch.py @@ -0,0 +1,102 @@ +#!/usr/bin/python3 +"""Watch for HDMI connection state change and update screen layout.""" + +import json +import os +import subprocess +import sys +import time +from typing import Any + +import pyudev + +context = pyudev.Context() +monitor = pyudev.Monitor.from_netlink(context) + +# description of an HDMI connection state change +want = { + "action": "change", + "device_node": "/dev/dri/card0", + "device_type": "drm_minor", + "sys_path": "/sys/devices/pci0000:00/0000:00:02.0/drm/card0", + "sys_name": "card0", +} + + +def run_swaymsg() -> bytes: + """Run swaymsg to get list of outputs.""" + p = subprocess.run(["swaymsg", "-r", "-t", "get_outputs"], capture_output=True) + return p.stdout + + +def get_outputs(attempts: int = 10) -> dict[str, dict[str, Any]] | None: + """Ask sway for the current list of outputs.""" + for attempt in range(attempts): + outputs = run_swaymsg() + if outputs.strip() == b"": + return None + try: + json_data = json.loads(outputs) + break + except json.decoder.JSONDecodeError: + print("JSON parse error") + print(outputs) + if attempt == attempts - 1: + raise + time.sleep(1) + + return {o["name"]: o for o in json_data} + + +def is_sony_tv(output: dict[str, Any]) -> bool: + """Is a given output a Sony TV.""" + return bool(output["make"] == "Sony" and output["model"] == "SONY TV") + + +def sony_tv_connected() -> None: + """Run command when HDMI is connected.""" + subprocess.run([os.path.expanduser("~/bin/desk")]) + + +def hdmi_disconnected() -> None: + """Run command when HDMI is disconnected.""" + subprocess.run([os.path.expanduser("~/bin/unplugged")]) + + +def is_output_connected(name: str) -> bool: + """Is HDMI connected.""" + state = open(f"/sys/class/drm/card0-{name}/status").read() + states = {"connected\n": True, "disconnected\n": False} + if state not in states: + print(f"unknown state: [{state}]") + sys.exit(1) + return states[state] + + +def handle_state_change() -> str: + """Something happened, check and handle state change.""" + hdmi_name = "HDMI-A-1" + if not is_output_connected(hdmi_name): + hdmi_disconnected() + return "HDMI disconnected, switching to laptop screen." + + hdmi = get_outputs().get(hdmi_name) + + if hdmi and is_sony_tv(hdmi): + sony_tv_connected() + return "Sony TV connected to HDMI, switching to desk layout." + + return "Unknown device connected to HDMI, doing nothing." + + +def watch() -> None: + """Watch for state change and respond.""" + d: pyudev.Device + for d in iter(monitor.poll, None): + if all(getattr(d, key) == value for key, value in want.items()): + print(handle_state_change()) + + +if __name__ == "__main__": + handle_state_change() + watch()