From 5361ca4db780652f3c5939c90c1ded2f2da45b78 Mon Sep 17 00:00:00 2001 From: iGoX Date: Wed, 25 Mar 2026 08:37:07 +0100 Subject: [PATCH] refactor: use a Led controller to allow async command (#5) Reviewed-on: https://code.igox.org/iGoX/busylight/pulls/5 --- ESP32/main.py | 545 ++++++++++++++++++++++++++++---------------------- 1 file changed, 308 insertions(+), 237 deletions(-) diff --git a/ESP32/main.py b/ESP32/main.py index 53b10e0..aefe8e8 100644 --- a/ESP32/main.py +++ b/ESP32/main.py @@ -1,276 +1,346 @@ from microdot import Microdot, send_file -import machine, sys, neopixel, time, asyncio +import machine, neopixel, time +import uasyncio as asyncio app = Microdot() -# Difine NeoPixel object -nbPixels = 12*2 -pinPixelStrip = 16 # ESP32 D1 Mini +# ========================= +# NeoPixel setup +# ========================= +nbPixels = 12 * 2 +pinPixelStrip = 16 neoPixelStrip = neopixel.NeoPixel(machine.Pin(pinPixelStrip), nbPixels) -# Define status colors -statusColors = { - 'BUSY': (255,0,0), # red - 'AVAILABLE': (0,255,0), # green - 'AWAY': (246,190,0), # cyan - 'OFF': (0, 0, 0), # off - 'ON': (255, 255, 255) # white - } -# Store BusyLight default global status -blColor = statusColors.get('OFF') -blStatus = 'off' -blPreviousStatus='off' -blBrightness = 0.1 # Adjust the brightness (0.0 - 1.0) -blBlinking = False -blBlinkFrequency = 0 -blBlinkDuration = 0 -blBlinkStartTime = 0 - -def __setColor(color): - r, g , b = color - r = int(r * blBrightness) - g = int(g * blBrightness) - b = int(b * blBrightness) - return (r, g, b) +# ========================= +# LED CONTROLLER +# ========================= +class LedController: + def __init__(self, np): + self._np = np -def __setBusyLightColor(color, brightness): - global blBrightness - blBrightness = brightness - global blColor - blColor = color - neoPixelStrip.fill(__setColor(color)) - neoPixelStrip.write() - - global blStatus - blStatus = 'colored' + # async control + self._flag = asyncio.ThreadSafeFlag() + self._cmd = None + self._task = None -def __setBusyLightStatus(status): - status = status.upper() - color = statusColors.get(status) - __setBusyLightColor(color, blBrightness) - - global blStatus - blStatus = status.lower() + # state + self.states = { + 'BUSY': (255, 0, 0), + 'AVAILABLE': (0, 255, 0), + 'AWAY': (246, 190, 0), + 'OFF': (0, 0, 0), + 'ON': (255, 255, 255) + } -# Microdot APP routes + self.color = self.states['OFF'] + self.status = 'off' + self.previousStatus = 'off' + self.brightness = 0.1 + + self.blinking = False + self.blinkFrequency = 0 + self.blinkDuration = 0 + self.blinkStart = 0 + + # ---------- internal ---------- + def _apply_brightness(self, color): + r, g, b = color + return ( + int(r * self.brightness), + int(g * self.brightness), + int(b * self.brightness), + ) + + def _write(self, color): + self._np.fill(self._apply_brightness(color)) + self._np.write() + + async def run(self): + while True: + await self._flag.wait() + cmd = self._cmd + + # Cancel any ongoing task before starting a new one + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + # Process the new command + t = cmd["type"] + + if t == "blink": + self._task = asyncio.create_task(self._blink(cmd)) + elif t == "solid": + self._task = asyncio.create_task(self._solid(cmd)) + elif t == "off": + self._task = asyncio.create_task(self._off()) + + async def _solid(self, cmd): + self.color = cmd["color"] + self.status = cmd.get("status", "colored") + self._write(self.color) + + async def _off(self): + self.color = (0, 0, 0) + self.status = "off" + self._write(self.color) + + async def _blink(self, cmd): + try: + self.blinking = True + + self.color = cmd["color"] + self.previousStatus = cmd["previousStatus"] + self.blinkFrequency = cmd["frequency"] + self.blinkDuration = cmd["duration"] + self.blinkStart = time.ticks_ms() + + freq = max(0.1, self.blinkFrequency) + half_ms = int(500 / freq) + + while True: + if self.blinkDuration != 0: + elapsed = time.ticks_diff(time.ticks_ms(), self.blinkStart) + if elapsed > self.blinkDuration * 1000: + break + + self._write(self.color) + await asyncio.sleep_ms(half_ms) + + self._write((0, 0, 0)) + await asyncio.sleep_ms(half_ms) + + except asyncio.CancelledError: + pass + + self.blinking = False + self.status = self.previousStatus + self._write(self.color) + + async def send(self, cmd): + self._cmd = cmd + self._flag.set() + + # ---------- public API ---------- + async def set_color(self, color, brightness=None): + if brightness is not None: + self.brightness = brightness + + await self.send({ + "type": "solid", + "color": color + }) + + async def set_status(self, status): + status = status.upper() + color = self.states.get(status) + + if color is None: + raise ValueError("invalid status") + + await self.send({ + "type": "solid", + "color": color, + "status": status.lower() + }) + + async def blink(self, frequency, duration): + self.previousStatus = self.status + + await self.send({ + "type": "blink", + "color": self.color, + "frequency": frequency, + "duration": duration, + "previousStatus": self.previousStatus + }) + + async def stop(self): + await self.send({"type": "off"}) + + def get_state(self): + if self.blinking and self.blinkDuration > 0: + elapsed = time.ticks_diff(time.ticks_ms(), self.blinkStart) / 1000 + remains = max(0.0, self.blinkDuration - elapsed) + else: + remains = 0.0 + + return { + "status": self.status, + "color": self.color, + "brightness": self.brightness, + "isblinking": self.blinking, + "frequency": self.blinkFrequency, + "duration": self.blinkDuration, + "remains": remains + } + + +# ========================= +# INIT +# ========================= +led = LedController(neoPixelStrip) + + +# ========================= +# ROUTES +# ========================= @app.get('/static/') async def staticRoutes(request, path): if '..' in path: - # directory traversal is not allowed - return {'error': '4040 Not found'}, 404 + return {'error': '404 Not found'}, 404 return send_file('static/' + path) + @app.get('/') async def getIndex(request): return send_file('static/index.html') + @app.get('/api/brightness') async def getBrightness(request): - return {'brightness': blBrightness} + return {'brightness': led.brightness} + @app.post('/api/brightness') async def setBrightness(request): brightness = request.json.get("brightness") - + if brightness is None: return {'error': 'missing brightness parameter'}, 400 - - if type(brightness) is float \ - or type(brightness) is int: - if brightness < 0 or brightness > 1: - return {'error': 'brigthness out of bound (0.0 - 1.0)'}, 400 - else: - return {'error': 'wrong brigthness type (float)'}, 400 - - # Save blStatus - global blStatus - status = blStatus - - # Apply new brightness to current color - color = blColor - __setBusyLightColor(color, brightness) - - # Restore global status - blStatus = status - - global blBrightness - blBrightness = brightness - - return {'brightness': blBrightness} - + + if not isinstance(brightness, (int, float)): + return {'error': 'wrong brightness type'}, 400 + + if brightness < 0 or brightness > 1: + return {'error': 'brightness out of bound'}, 400 + + current_status = led.status + current_color = led.color + + await led.set_color(current_color, brightness) + + led.status = current_status + + return {'brightness': led.brightness} + @app.post('/api/color') async def setColor(request): - r = request.json.get("r") g = request.json.get("g") b = request.json.get("b") - if bool(r is None or g is None or b is None): + if r is None or g is None or b is None: return {'error': 'missing color'}, 400 - else: - if type(r) is int \ - and type(g) is int \ - and type(b) is int: - color = (r, g, b) - else: - return {'error': 'wrong color type (int)'}, 400 - - if (r < 0 or r > 255) \ - or (g < 0 or g > 255) \ - or (b < 0 or b > 255): - return {'error': 'color out of bound (0 - 255)'}, 400 - + + if not all(isinstance(v, int) for v in (r, g, b)): + return {'error': 'wrong color type'}, 400 + + if not (0 <= r <= 255 and 0 <= g <= 255 and 0 <= b <= 255): + return {'error': 'color out of bound'}, 400 + brightness = request.json.get("brightness") - - if not brightness is None: - if type(brightness) is float \ - or type(brightness) is int: - if brightness < 0 or brightness > 1: - return {'error': 'brightness out of bound (0.0 - 1.0)'}, 400 - else: - return {'error': 'wrong brightness type (float)'}, 400 - __setBusyLightColor(color, brightness) - - __setBusyLightColor(color, blBrightness) - return {'status': blStatus} + await led.set_color((r, g, b), brightness) -@app.route('/api/status/', methods=['GET', 'POST']) -async def setStatus(request, status): - lStatus = status.lower() - if lStatus == 'on': - __setBusyLightStatus('ON') - elif lStatus == 'off': - __setBusyLightStatus('OFF') - elif lStatus == 'available': - __setBusyLightStatus('AVAILABLE') - elif lStatus == 'away': - __setBusyLightStatus('AWAY') - elif lStatus == 'busy': - __setBusyLightStatus('BUSY') - else: - return {'error': 'unknown /api/status/' + lStatus + ' route'}, 404 - - return {'status': blStatus} + return {'status': led.status} -async def __blinkTask(color, brightness, frequency, duration): - global blBlinking, blStatus, blColor, blBrightness - blBlinking = True - interval = 1.0 / (frequency * 2) # half period in seconds - elapsed = 0.0 - while blBlinking: - neoPixelStrip.fill(__setColor(color)) - neoPixelStrip.write() - await asyncio.sleep(interval) - neoPixelStrip.fill((0, 0, 0)) - neoPixelStrip.write() - await asyncio.sleep(interval) - if duration > 0: - elapsed += interval * 2 - if elapsed >= duration: - break - # Restore previous state - blBlinking = False - __setBusyLightColor(color, brightness) - blStatus = blPreviousStatus - -@app.post('/api/blink/stop') -async def blinkStop(request): - global blBlinking - blBlinking = False - return {'status': blStatus} - -@app.post('/api/blink') -async def setBlink(request): - frequency = request.json.get('frequency') - duration = request.json.get('duration') - - if frequency is None or duration is None: - return {'error': 'missing frequency or duration parameter'}, 400 - - if not isinstance(frequency, int) or frequency <= 0: - return {'error': 'frequency must be a positive integer (Hz)'}, 400 - - if not (isinstance(duration, (int, float))) or duration < 0: - return {'error': 'duration must be a positive float in seconds (0 = endless)'}, 400 - - # Save current state - global blPreviousStatus, blBlinking, blBlinkFrequency, blBlinkDuration, blBlinkStartTime - blPreviousStatus = blStatus - savedColor = blColor - savedBrightness = blBrightness - - # Stop any ongoing blink - blBlinking = False - await asyncio.sleep(0) - - # Save blink params - blBlinkFrequency = frequency - blBlinkDuration = duration - blBlinkStartTime = time.ticks_ms() - - # Launch blink task - asyncio.create_task(__blinkTask(savedColor, savedBrightness, frequency, duration)) - - return {'status': 'blinking', 'frequency': frequency, 'duration': duration} - -@app.get('/api/blink') -async def getBlink(request): - if blBlinking and blBlinkDuration > 0: - elapsed = time.ticks_diff(time.ticks_ms(), blBlinkStartTime) / 1000.0 - remains = max(0.0, blBlinkDuration - elapsed) - else: - remains = 0.0 - - return { - 'isblinking': blBlinking, - 'frequency': blBlinkFrequency, - 'duration': blBlinkDuration, - 'remains': remains - } @app.get('/api/color') async def getColor(request): -# r, g, b = neoPixelStrip.__getitem__(0) - r, g, b = blColor - return {'r': r, 'g': g, 'b': b, 'brightness': blBrightness} + r, g, b = led.color + return {'r': r, 'g': g, 'b': b, 'brightness': led.brightness} + @app.get('/api/status') async def getStatus(request): - return {'status': blStatus} + return {'status': led.status} + + +@app.route('/api/status/', methods=['GET', 'POST']) +async def setStatus(request, status): + try: + await led.set_status(status) + except ValueError: + return {'error': 'unknown status'}, 404 + + return {'status': led.status} + + +@app.post('/api/blink') +async def setBlink(request): + freq = request.json.get('frequency') + duration = request.json.get('duration') + + if freq is None or duration is None: + return {'error': 'missing frequency or duration'}, 400 + + if not isinstance(freq, (int, float)) or freq <= 0: + return {'error': 'invalid frequency'}, 400 + + if not isinstance(duration, (int, float)) or duration < 0: + return {'error': 'invalid duration'}, 400 + + await led.blink(freq, duration) + + return {'status': 'blinking', 'frequency': freq, 'duration': duration} + + +@app.get('/api/blink') +async def getBlink(request): + state = led.get_state() + return { + 'isblinking': state["isblinking"], + 'frequency': state["frequency"], + 'duration': state["duration"], + 'remains': state["remains"] + } + + +@app.post('/api/blink/stop') +async def blinkStop(request): + await led.stop() + await led.set_status(led.previousStatus) + return {'status': led.status} + @app.get('/api/debug') async def getDebugInfo(request): - r, g, b = blColor - dr, dg, db = neoPixelStrip.__getitem__(0) - return {'status': blStatus, 'brightness': blBrightness, 'color': {'r': r, 'g': g, 'b': b}, 'dimColor': {'r': dr, 'g': dg, 'b': db}} + r, g, b = led.color + dr, dg, db = led._np[0] + + return { + 'status': led.status, + 'brightness': led.brightness, + 'color': {'r': r, 'g': g, 'b': b}, + 'dimColor': {'r': dr, 'g': dg, 'b': db}, + 'blinking': led.blinking, + 'blinkFrequency': led.blinkFrequency, + 'blinkDuration': led.blinkDuration + } + @app.post('/api/mutedeck-webhook') async def mutedeckWebhook(request): - + if request.json.get('control') != 'system': if request.json.get('call') == 'active': - if request.json.get('mute') == 'active': - isMuted = True - else: - isMuted = False - - if request.json.get('video') == 'active': - isVideoOn = True - else: - isVideoOn = False - - if isMuted: - __setBusyLightStatus('away') - else: - __setBusyLightStatus('busy') - else: - __setBusyLightStatus('available') + isMuted = request.json.get('mute') == 'active' - return {'status': blStatus} + if isMuted: + await led.set_status('away') + else: + await led.set_status('busy') + else: + await led.set_status('available') + + return {'status': led.status} @app.post('/shutdown') @@ -278,22 +348,23 @@ async def shutdown(request): request.app.shutdown() return 'The server is shutting down...' -# Startup effect -def startUpSeq(): - print('Start seq begins') - __setBusyLightColor(statusColors.get('OFF'), 0.1) - time.sleep_ms(100) - __setBusyLightStatus('BUSY') - time.sleep_ms(200) - __setBusyLightStatus('AWAY') - time.sleep_ms(300) - __setBusyLightStatus('AVAILABLE') - time.sleep_ms(500) - __setBusyLightStatus('OFF') - print('Start seq is ended') - -startUpSeq() - -# Start API webserver -if __name__ == '__main__': - app.run(port=80, debug=True) \ No newline at end of file + +# ========================= +# START +# ========================= +async def main(): + asyncio.create_task(led.run()) + + # startup animation + await led.set_status('busy') + await asyncio.sleep_ms(200) + await led.set_status('away') + await asyncio.sleep_ms(300) + await led.set_status('available') + await asyncio.sleep_ms(500) + await led.set_status('off') + + await app.start_server(port=80) + + +asyncio.run(main()) \ No newline at end of file