119 lines
3.2 KiB
Python
119 lines
3.2 KiB
Python
"""Socket client for acer-rgbd daemon communication."""
|
|
|
|
import socket
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
SOCKET_PATH = "/run/acer-rgbd.sock"
|
|
|
|
EFFECTS = [
|
|
("static", "Static"),
|
|
("breathing", "Breathing"),
|
|
("neon", "Neon"),
|
|
("wave", "Wave"),
|
|
("ripple", "Ripple"),
|
|
("zoom", "Zoom"),
|
|
("snake", "Snake"),
|
|
("disco", "Disco"),
|
|
("shifting", "Shifting"),
|
|
]
|
|
|
|
DEVICES = ["keyboard", "lid", "button"]
|
|
|
|
|
|
@dataclass
|
|
class RGBState:
|
|
device: str = "keyboard"
|
|
hidraw: str = "/dev/hidraw4"
|
|
effect: str = "static"
|
|
brightness: int = 100
|
|
speed: int = 0
|
|
direction: str = "none"
|
|
r: int = 255
|
|
g: int = 255
|
|
b: int = 255
|
|
zone: str = "all"
|
|
|
|
def to_command(self) -> str:
|
|
return (
|
|
f"SET dev={self.device} hidraw={self.hidraw} "
|
|
f"effect={self.effect} bright={self.brightness} "
|
|
f"speed={self.speed} dir={self.direction} "
|
|
f"r={self.r} g={self.g} b={self.b} zone={self.zone}"
|
|
)
|
|
|
|
@classmethod
|
|
def from_line(cls, line: str) -> "RGBState":
|
|
state = cls()
|
|
for pair in line.split():
|
|
if "=" not in pair:
|
|
continue
|
|
key, val = pair.split("=", 1)
|
|
if key == "dev":
|
|
state.device = val
|
|
elif key == "hidraw":
|
|
state.hidraw = val
|
|
elif key == "effect":
|
|
state.effect = val
|
|
elif key == "bright":
|
|
state.brightness = int(val)
|
|
elif key == "speed":
|
|
state.speed = int(val)
|
|
elif key == "dir":
|
|
state.direction = val
|
|
elif key == "r":
|
|
state.r = int(val)
|
|
elif key == "g":
|
|
state.g = int(val)
|
|
elif key == "b":
|
|
state.b = int(val)
|
|
elif key == "zone":
|
|
state.zone = val
|
|
return state
|
|
|
|
|
|
def send_command(cmd: str) -> str:
|
|
"""Send command to daemon and return response."""
|
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
sock.connect(SOCKET_PATH)
|
|
sock.sendall(cmd.encode() + b"\n")
|
|
response = sock.recv(8192).decode()
|
|
sock.close()
|
|
return response
|
|
|
|
|
|
def is_daemon_running() -> bool:
|
|
"""Check if daemon is available."""
|
|
try:
|
|
send_command("GET")
|
|
return True
|
|
except (FileNotFoundError, ConnectionRefusedError):
|
|
return False
|
|
|
|
|
|
def get_states() -> dict[str, RGBState]:
|
|
"""Get current state for all devices."""
|
|
response = send_command("GET")
|
|
states = {}
|
|
for line in response.strip().split("\n"):
|
|
if line.startswith("SET"):
|
|
state = RGBState.from_line(line)
|
|
states[state.device] = state
|
|
return states
|
|
|
|
|
|
def set_rgb(state: RGBState) -> tuple[bool, str]:
|
|
"""Send RGB command to daemon. Returns (success, message)."""
|
|
try:
|
|
response = send_command(state.to_command())
|
|
if response.strip().startswith("OK"):
|
|
return True, "OK"
|
|
else:
|
|
return False, response.strip()
|
|
except FileNotFoundError:
|
|
return False, "Daemon not running (socket not found)"
|
|
except ConnectionRefusedError:
|
|
return False, "Cannot connect to daemon"
|
|
except Exception as e:
|
|
return False, str(e)
|