refactor: use a Led controller to allow async command #5

Merged
iGoX merged 3 commits from igox/refactor/controllers into main 2026-03-25 08:37:07 +01:00
Showing only changes of commit 6fe45026b0 - Show all commits
+5 -3
View File
@@ -25,7 +25,7 @@ class LedController:
self._task = None self._task = None
# state # state
self.statusColors = { self.states = {
'BUSY': (255, 0, 0), 'BUSY': (255, 0, 0),
'AVAILABLE': (0, 255, 0), 'AVAILABLE': (0, 255, 0),
'AWAY': (246, 190, 0), 'AWAY': (246, 190, 0),
@@ -33,7 +33,7 @@ class LedController:
'ON': (255, 255, 255) 'ON': (255, 255, 255)
} }
self.color = self.statusColors['OFF'] self.color = self.states['OFF']
self.status = 'off' self.status = 'off'
self.previousStatus = 'off' self.previousStatus = 'off'
self.brightness = 0.1 self.brightness = 0.1
@@ -61,6 +61,7 @@ class LedController:
await self._flag.wait() await self._flag.wait()
cmd = self._cmd cmd = self._cmd
# Cancel any ongoing task before starting a new one
if self._task: if self._task:
self._task.cancel() self._task.cancel()
try: try:
@@ -68,6 +69,7 @@ class LedController:
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
# Process the new command
t = cmd["type"] t = cmd["type"]
if t == "blink": if t == "blink":
@@ -135,7 +137,7 @@ class LedController:
async def set_status(self, status): async def set_status(self, status):
status = status.upper() status = status.upper()
color = self.statusColors.get(status) color = self.states.get(status)
if color is None: if color is None:
raise ValueError("invalid status") raise ValueError("invalid status")