Initial commit.
This commit is contained in:
commit
8bb508f3db
102
watch.py
Executable file
102
watch.py
Executable file
|
@ -0,0 +1,102 @@
|
|||
#!/usr/bin/python3
|
||||
"""Watch for HDMI connection state change and update screen layout."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import pyudev
|
||||
|
||||
context = pyudev.Context()
|
||||
monitor = pyudev.Monitor.from_netlink(context)
|
||||
|
||||
# description of an HDMI connection state change
|
||||
want = {
|
||||
"action": "change",
|
||||
"device_node": "/dev/dri/card0",
|
||||
"device_type": "drm_minor",
|
||||
"sys_path": "/sys/devices/pci0000:00/0000:00:02.0/drm/card0",
|
||||
"sys_name": "card0",
|
||||
}
|
||||
|
||||
|
||||
def run_swaymsg() -> bytes:
|
||||
"""Run swaymsg to get list of outputs."""
|
||||
p = subprocess.run(["swaymsg", "-r", "-t", "get_outputs"], capture_output=True)
|
||||
return p.stdout
|
||||
|
||||
|
||||
def get_outputs(attempts: int = 10) -> dict[str, dict[str, Any]] | None:
|
||||
"""Ask sway for the current list of outputs."""
|
||||
for attempt in range(attempts):
|
||||
outputs = run_swaymsg()
|
||||
if outputs.strip() == b"":
|
||||
return None
|
||||
try:
|
||||
json_data = json.loads(outputs)
|
||||
break
|
||||
except json.decoder.JSONDecodeError:
|
||||
print("JSON parse error")
|
||||
print(outputs)
|
||||
if attempt == attempts - 1:
|
||||
raise
|
||||
time.sleep(1)
|
||||
|
||||
return {o["name"]: o for o in json_data}
|
||||
|
||||
|
||||
def is_sony_tv(output: dict[str, Any]) -> bool:
|
||||
"""Is a given output a Sony TV."""
|
||||
return bool(output["make"] == "Sony" and output["model"] == "SONY TV")
|
||||
|
||||
|
||||
def sony_tv_connected() -> None:
|
||||
"""Run command when HDMI is connected."""
|
||||
subprocess.run([os.path.expanduser("~/bin/desk")])
|
||||
|
||||
|
||||
def hdmi_disconnected() -> None:
|
||||
"""Run command when HDMI is disconnected."""
|
||||
subprocess.run([os.path.expanduser("~/bin/unplugged")])
|
||||
|
||||
|
||||
def is_output_connected(name: str) -> bool:
|
||||
"""Is HDMI connected."""
|
||||
state = open(f"/sys/class/drm/card0-{name}/status").read()
|
||||
states = {"connected\n": True, "disconnected\n": False}
|
||||
if state not in states:
|
||||
print(f"unknown state: [{state}]")
|
||||
sys.exit(1)
|
||||
return states[state]
|
||||
|
||||
|
||||
def handle_state_change() -> str:
|
||||
"""Something happened, check and handle state change."""
|
||||
hdmi_name = "HDMI-A-1"
|
||||
if not is_output_connected(hdmi_name):
|
||||
hdmi_disconnected()
|
||||
return "HDMI disconnected, switching to laptop screen."
|
||||
|
||||
hdmi = get_outputs().get(hdmi_name)
|
||||
|
||||
if hdmi and is_sony_tv(hdmi):
|
||||
sony_tv_connected()
|
||||
return "Sony TV connected to HDMI, switching to desk layout."
|
||||
|
||||
return "Unknown device connected to HDMI, doing nothing."
|
||||
|
||||
|
||||
def watch() -> None:
|
||||
"""Watch for state change and respond."""
|
||||
d: pyudev.Device
|
||||
for d in iter(monitor.poll, None):
|
||||
if all(getattr(d, key) == value for key, value in want.items()):
|
||||
print(handle_state_change())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
handle_state_change()
|
||||
watch()
|
Loading…
Reference in a new issue