refactor: use a Led controller to allow async command #5

Merged
iGoX merged 3 commits from igox/refactor/controllers into main 2026-03-25 08:37:07 +01:00
Showing only changes of commit 7b4f0b1caa - Show all commits
+306 -86
View File
@@ -1,23 +1,66 @@
from microdot import Microdot, send_file
import machine, neopixel, time
import uasyncio as asyncio import uasyncio as asyncio
import time
from microdot import Microdot, Response app = Microdot()
from machine import Pin
from neopixel import NeoPixel
# ========================= # =========================
# LED CONTROLLER (NEW) # NeoPixel setup
# ========================= # =========================
class _LedController: nbPixels = 12 * 2
pinPixelStrip = 16
neoPixelStrip = neopixel.NeoPixel(machine.Pin(pinPixelStrip), nbPixels)
# =========================
# LED CONTROLLER
# =========================
class LedController:
def __init__(self, np): def __init__(self, np):
self._np = np self._np = np
self._queue = asyncio.Queue(5)
# async control
self._flag = asyncio.ThreadSafeFlag()
self._cmd = None
self._task = None self._task = None
# state
self.statusColors = {
'BUSY': (255, 0, 0),
'AVAILABLE': (0, 255, 0),
'AWAY': (246, 190, 0),
'OFF': (0, 0, 0),
'ON': (255, 255, 255)
}
self.color = self.statusColors['OFF']
self.status = 'off'
self.previousStatus = 'off'
self.brightness = 0.1
self.blinking = False
self.blinkFrequency = 0
self.blinkDuration = 0
self.blinkStart = 0
# ---------- internal ----------
def _apply_brightness(self, color):
r, g, b = color
return (
int(r * self.brightness),
int(g * self.brightness),
int(b * self.brightness),
)
def _write(self, color):
self._np.fill(self._apply_brightness(color))
self._np.write()
async def run(self): async def run(self):
while True: while True:
cmd = await self._queue.get() await self._flag.wait()
cmd = self._cmd
# cancel current animation
if self._task: if self._task:
self._task.cancel() self._task.cancel()
try: try:
@@ -35,114 +78,291 @@ class _LedController:
self._task = asyncio.create_task(self._off()) self._task = asyncio.create_task(self._off())
async def _solid(self, cmd): async def _solid(self, cmd):
self._np.fill(cmd["color"]) self.color = cmd["color"]
self._np.write() self.status = cmd.get("status", "colored")
self._write(self.color)
async def _off(self): async def _off(self):
self._np.fill((0, 0, 0)) self.color = (0, 0, 0)
self._np.write() self.status = "off"
self._write(self.color)
async def _blink(self, cmd): async def _blink(self, cmd):
try: try:
color = cmd["color"] self.blinking = True
freq = max(0.1, cmd["frequency"])
duration = cmd["duration"]
self.color = cmd["color"]
self.previousStatus = cmd["previousStatus"]
self.blinkFrequency = cmd["frequency"]
self.blinkDuration = cmd["duration"]
self.blinkStart = time.ticks_ms()
freq = max(0.1, self.blinkFrequency)
half_ms = int(500 / freq) half_ms = int(500 / freq)
start = time.ticks_ms()
while True: while True:
if duration != 0 and time.ticks_diff(time.ticks_ms(), start) > duration * 1000: if self.blinkDuration != 0:
elapsed = time.ticks_diff(time.ticks_ms(), self.blinkStart)
if elapsed > self.blinkDuration * 1000:
break break
# ON self._write(self.color)
self._np.fill(color)
self._np.write()
await asyncio.sleep_ms(half_ms) await asyncio.sleep_ms(half_ms)
# OFF self._write((0, 0, 0))
self._np.fill((0, 0, 0))
self._np.write()
await asyncio.sleep_ms(half_ms) await asyncio.sleep_ms(half_ms)
except asyncio.CancelledError: except asyncio.CancelledError:
# ensure LED is OFF when cancelled pass
self._np.fill((0, 0, 0))
self._np.write() self.blinking = False
raise self.status = self.previousStatus
self._write(self.color)
async def send(self, cmd): async def send(self, cmd):
await self._queue.put(cmd) self._cmd = cmd
self._flag.set()
# ---------- public API ----------
async def set_color(self, color, brightness=None):
if brightness is not None:
self.brightness = brightness
# ========================= await self.send({
# MAIN APPLICATION
# =========================
class BusyLight:
def __init__(self):
# ---- Hardware init (same as your original) ----
self.__np = NeoPixel(Pin(5), 1) # adapt if needed
# ---- NEW: controller ----
self.__led = _LedController(self.__np)
asyncio.create_task(self.__led.run())
# ---- Web app ----
self.app = Microdot()
Response.default_content_type = 'application/json'
self.__register_routes()
# =========================
# ROUTES (UNCHANGED API)
# =========================
def __register_routes(self):
@self.app.post('/api/blink')
async def blink(request):
data = request.json or {}
await self.__led.send({
"type": "blink",
"color": tuple(data.get("color", [255, 0, 0])),
"frequency": float(data.get("frequency", 1)),
"duration": float(data.get("duration", 0)),
})
return {"status": "ok"}
@self.app.post('/api/color')
async def color(request):
data = request.json or {}
await self.__led.send({
"type": "solid", "type": "solid",
"color": tuple(data.get("color", [255, 255, 255])) "color": color
}) })
return {"status": "ok"} async def set_status(self, status):
status = status.upper()
color = self.statusColors.get(status)
@self.app.post('/api/off') if color is None:
async def off(request): raise ValueError("invalid status")
await self.__led.send({
"type": "off" await self.send({
"type": "solid",
"color": color,
"status": status.lower()
}) })
return {"status": "ok"} async def blink(self, frequency, duration):
self.previousStatus = self.status
# ========================= await self.send({
# START SERVER "type": "blink",
# ========================= "color": self.color,
async def run(self): "frequency": frequency,
await self.app.start_server(host="0.0.0.0", port=80) "duration": duration,
"previousStatus": self.previousStatus
})
async def stop(self):
await self.send({"type": "off"})
def get_state(self):
if self.blinking and self.blinkDuration > 0:
elapsed = time.ticks_diff(time.ticks_ms(), self.blinkStart) / 1000
remains = max(0.0, self.blinkDuration - elapsed)
else:
remains = 0.0
return {
"status": self.status,
"color": self.color,
"brightness": self.brightness,
"isblinking": self.blinking,
"frequency": self.blinkFrequency,
"duration": self.blinkDuration,
"remains": remains
}
# ========================= # =========================
# ENTRYPOINT # INIT
# =========================
led = LedController(neoPixelStrip)
# =========================
# ROUTES
# =========================
@app.get('/static/<path:path>')
async def staticRoutes(request, path):
if '..' in path:
return {'error': '404 Not found'}, 404
return send_file('static/' + path)
@app.get('/')
async def getIndex(request):
return send_file('static/index.html')
@app.get('/api/brightness')
async def getBrightness(request):
return {'brightness': led.brightness}
@app.post('/api/brightness')
async def setBrightness(request):
brightness = request.json.get("brightness")
if brightness is None:
return {'error': 'missing brightness parameter'}, 400
if not isinstance(brightness, (int, float)):
return {'error': 'wrong brightness type'}, 400
if brightness < 0 or brightness > 1:
return {'error': 'brightness out of bound'}, 400
current_status = led.status
current_color = led.color
await led.set_color(current_color, brightness)
led.status = current_status
return {'brightness': led.brightness}
@app.post('/api/color')
async def setColor(request):
r = request.json.get("r")
g = request.json.get("g")
b = request.json.get("b")
if r is None or g is None or b is None:
return {'error': 'missing color'}, 400
if not all(isinstance(v, int) for v in (r, g, b)):
return {'error': 'wrong color type'}, 400
if not (0 <= r <= 255 and 0 <= g <= 255 and 0 <= b <= 255):
return {'error': 'color out of bound'}, 400
brightness = request.json.get("brightness")
await led.set_color((r, g, b), brightness)
return {'status': led.status}
@app.get('/api/color')
async def getColor(request):
r, g, b = led.color
return {'r': r, 'g': g, 'b': b, 'brightness': led.brightness}
@app.get('/api/status')
async def getStatus(request):
return {'status': led.status}
@app.route('/api/status/<status>', methods=['GET', 'POST'])
async def setStatus(request, status):
try:
await led.set_status(status)
except ValueError:
return {'error': 'unknown status'}, 404
return {'status': led.status}
@app.post('/api/blink')
async def setBlink(request):
freq = request.json.get('frequency')
duration = request.json.get('duration')
if freq is None or duration is None:
return {'error': 'missing frequency or duration'}, 400
if not isinstance(freq, (int, float)) or freq <= 0:
return {'error': 'invalid frequency'}, 400
if not isinstance(duration, (int, float)) or duration < 0:
return {'error': 'invalid duration'}, 400
await led.blink(freq, duration)
return {'status': 'blinking', 'frequency': freq, 'duration': duration}
@app.get('/api/blink')
async def getBlink(request):
state = led.get_state()
return {
'isblinking': state["isblinking"],
'frequency': state["frequency"],
'duration': state["duration"],
'remains': state["remains"]
}
@app.post('/api/blink/stop')
async def blinkStop(request):
await led.stop()
await led.set_status(led.previousStatus)
return {'status': led.status}
@app.get('/api/debug')
async def getDebugInfo(request):
r, g, b = led.color
dr, dg, db = led._np[0]
return {
'status': led.status,
'brightness': led.brightness,
'color': {'r': r, 'g': g, 'b': b},
'dimColor': {'r': dr, 'g': dg, 'b': db},
'blinking': led.blinking,
'blinkFrequency': led.blinkFrequency,
'blinkDuration': led.blinkDuration
}
@app.post('/api/mutedeck-webhook')
async def mutedeckWebhook(request):
if request.json.get('control') != 'system':
if request.json.get('call') == 'active':
isMuted = request.json.get('mute') == 'active'
if isMuted:
await led.set_status('away')
else:
await led.set_status('busy')
else:
await led.set_status('available')
return {'status': led.status}
@app.post('/shutdown')
async def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
# =========================
# START
# ========================= # =========================
async def main(): async def main():
app = BusyLight() asyncio.create_task(led.run())
await app.run()
# startup animation
await led.set_status('busy')
await asyncio.sleep_ms(200)
await led.set_status('away')
await asyncio.sleep_ms(300)
await led.set_status('available')
await asyncio.sleep_ms(500)
await led.set_status('off')
await app.start_server(port=80)
asyncio.run(main()) asyncio.run(main())