From aa70bf73276d552a88effb5e85da3144139fd097 Mon Sep 17 00:00:00 2001 From: iGoX Date: Wed, 25 Mar 2026 00:57:20 +0100 Subject: [PATCH 1/3] Factor with controlers --- ESP32/main.py | 413 ++++++++++++++++---------------------------------- 1 file changed, 131 insertions(+), 282 deletions(-) diff --git a/ESP32/main.py b/ESP32/main.py index 53b10e0..58021be 100644 --- a/ESP32/main.py +++ b/ESP32/main.py @@ -1,299 +1,148 @@ -from microdot import Microdot, send_file -import machine, sys, neopixel, time, asyncio +import uasyncio as asyncio +import time +from microdot import Microdot, Response +from machine import Pin +from neopixel import NeoPixel -app = Microdot() +# ========================= +# LED CONTROLLER (NEW) +# ========================= +class _LedController: + def __init__(self, np): + self._np = np + self._queue = asyncio.Queue(5) + self._task = None -# Difine NeoPixel object -nbPixels = 12*2 -pinPixelStrip = 16 # ESP32 D1 Mini -neoPixelStrip = neopixel.NeoPixel(machine.Pin(pinPixelStrip), nbPixels) + async def run(self): + while True: + cmd = await self._queue.get() -# Define status colors -statusColors = { - 'BUSY': (255,0,0), # red - 'AVAILABLE': (0,255,0), # green - 'AWAY': (246,190,0), # cyan - 'OFF': (0, 0, 0), # off - 'ON': (255, 255, 255) # white - } + # cancel current animation + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass -# Store BusyLight default global status -blColor = statusColors.get('OFF') -blStatus = 'off' -blPreviousStatus='off' -blBrightness = 0.1 # Adjust the brightness (0.0 - 1.0) -blBlinking = False -blBlinkFrequency = 0 -blBlinkDuration = 0 -blBlinkStartTime = 0 - -def __setColor(color): - r, g , b = color - r = int(r * blBrightness) - g = int(g * blBrightness) - b = int(b * blBrightness) - return (r, g, b) + t = cmd["type"] -def __setBusyLightColor(color, brightness): - global blBrightness - blBrightness = brightness - global blColor - blColor = color - neoPixelStrip.fill(__setColor(color)) - neoPixelStrip.write() - - global blStatus - blStatus = 'colored' + if t == "blink": + self._task = asyncio.create_task(self._blink(cmd)) + elif t == "solid": + self._task = asyncio.create_task(self._solid(cmd)) + elif t == "off": + self._task = asyncio.create_task(self._off()) -def __setBusyLightStatus(status): - status = status.upper() - color = statusColors.get(status) - __setBusyLightColor(color, blBrightness) - - global blStatus - blStatus = status.lower() + async def _solid(self, cmd): + self._np.fill(cmd["color"]) + self._np.write() -# Microdot APP routes + async def _off(self): + self._np.fill((0, 0, 0)) + self._np.write() -@app.get('/static/') -async def staticRoutes(request, path): - if '..' in path: - # directory traversal is not allowed - return {'error': '4040 Not found'}, 404 - return send_file('static/' + path) + async def _blink(self, cmd): + try: + color = cmd["color"] + freq = max(0.1, cmd["frequency"]) + duration = cmd["duration"] -@app.get('/') -async def getIndex(request): - return send_file('static/index.html') + half_ms = int(500 / freq) + start = time.ticks_ms() -@app.get('/api/brightness') -async def getBrightness(request): - return {'brightness': blBrightness} + while True: + if duration != 0 and time.ticks_diff(time.ticks_ms(), start) > duration * 1000: + break -@app.post('/api/brightness') -async def setBrightness(request): - brightness = request.json.get("brightness") - - if brightness is None: - return {'error': 'missing brightness parameter'}, 400 - - if type(brightness) is float \ - or type(brightness) is int: - if brightness < 0 or brightness > 1: - return {'error': 'brigthness out of bound (0.0 - 1.0)'}, 400 - else: - return {'error': 'wrong brigthness type (float)'}, 400 - - # Save blStatus - global blStatus - status = blStatus - - # Apply new brightness to current color - color = blColor - __setBusyLightColor(color, brightness) - - # Restore global status - blStatus = status - - global blBrightness - blBrightness = brightness - - return {'brightness': blBrightness} - + # ON + self._np.fill(color) + self._np.write() + await asyncio.sleep_ms(half_ms) -@app.post('/api/color') -async def setColor(request): - - r = request.json.get("r") - g = request.json.get("g") - b = request.json.get("b") + # OFF + self._np.fill((0, 0, 0)) + self._np.write() + await asyncio.sleep_ms(half_ms) - if bool(r is None or g is None or b is None): - return {'error': 'missing color'}, 400 - else: - if type(r) is int \ - and type(g) is int \ - and type(b) is int: - color = (r, g, b) - else: - return {'error': 'wrong color type (int)'}, 400 - - if (r < 0 or r > 255) \ - or (g < 0 or g > 255) \ - or (b < 0 or b > 255): - return {'error': 'color out of bound (0 - 255)'}, 400 - - brightness = request.json.get("brightness") - - if not brightness is None: - if type(brightness) is float \ - or type(brightness) is int: - if brightness < 0 or brightness > 1: - return {'error': 'brightness out of bound (0.0 - 1.0)'}, 400 - else: - return {'error': 'wrong brightness type (float)'}, 400 - __setBusyLightColor(color, brightness) - - __setBusyLightColor(color, blBrightness) + except asyncio.CancelledError: + # ensure LED is OFF when cancelled + self._np.fill((0, 0, 0)) + self._np.write() + raise - return {'status': blStatus} - -@app.route('/api/status/', methods=['GET', 'POST']) -async def setStatus(request, status): - lStatus = status.lower() - if lStatus == 'on': - __setBusyLightStatus('ON') - elif lStatus == 'off': - __setBusyLightStatus('OFF') - elif lStatus == 'available': - __setBusyLightStatus('AVAILABLE') - elif lStatus == 'away': - __setBusyLightStatus('AWAY') - elif lStatus == 'busy': - __setBusyLightStatus('BUSY') - else: - return {'error': 'unknown /api/status/' + lStatus + ' route'}, 404 - - return {'status': blStatus} - -async def __blinkTask(color, brightness, frequency, duration): - global blBlinking, blStatus, blColor, blBrightness - blBlinking = True - interval = 1.0 / (frequency * 2) # half period in seconds - elapsed = 0.0 - while blBlinking: - neoPixelStrip.fill(__setColor(color)) - neoPixelStrip.write() - await asyncio.sleep(interval) - neoPixelStrip.fill((0, 0, 0)) - neoPixelStrip.write() - await asyncio.sleep(interval) - if duration > 0: - elapsed += interval * 2 - if elapsed >= duration: - break - # Restore previous state - blBlinking = False - __setBusyLightColor(color, brightness) - blStatus = blPreviousStatus - -@app.post('/api/blink/stop') -async def blinkStop(request): - global blBlinking - blBlinking = False - return {'status': blStatus} - -@app.post('/api/blink') -async def setBlink(request): - frequency = request.json.get('frequency') - duration = request.json.get('duration') - - if frequency is None or duration is None: - return {'error': 'missing frequency or duration parameter'}, 400 - - if not isinstance(frequency, int) or frequency <= 0: - return {'error': 'frequency must be a positive integer (Hz)'}, 400 - - if not (isinstance(duration, (int, float))) or duration < 0: - return {'error': 'duration must be a positive float in seconds (0 = endless)'}, 400 - - # Save current state - global blPreviousStatus, blBlinking, blBlinkFrequency, blBlinkDuration, blBlinkStartTime - blPreviousStatus = blStatus - savedColor = blColor - savedBrightness = blBrightness - - # Stop any ongoing blink - blBlinking = False - await asyncio.sleep(0) - - # Save blink params - blBlinkFrequency = frequency - blBlinkDuration = duration - blBlinkStartTime = time.ticks_ms() - - # Launch blink task - asyncio.create_task(__blinkTask(savedColor, savedBrightness, frequency, duration)) - - return {'status': 'blinking', 'frequency': frequency, 'duration': duration} - -@app.get('/api/blink') -async def getBlink(request): - if blBlinking and blBlinkDuration > 0: - elapsed = time.ticks_diff(time.ticks_ms(), blBlinkStartTime) / 1000.0 - remains = max(0.0, blBlinkDuration - elapsed) - else: - remains = 0.0 - - return { - 'isblinking': blBlinking, - 'frequency': blBlinkFrequency, - 'duration': blBlinkDuration, - 'remains': remains - } - -@app.get('/api/color') -async def getColor(request): -# r, g, b = neoPixelStrip.__getitem__(0) - r, g, b = blColor - return {'r': r, 'g': g, 'b': b, 'brightness': blBrightness} - -@app.get('/api/status') -async def getStatus(request): - return {'status': blStatus} - -@app.get('/api/debug') -async def getDebugInfo(request): - r, g, b = blColor - dr, dg, db = neoPixelStrip.__getitem__(0) - return {'status': blStatus, 'brightness': blBrightness, 'color': {'r': r, 'g': g, 'b': b}, 'dimColor': {'r': dr, 'g': dg, 'b': db}} - -@app.post('/api/mutedeck-webhook') -async def mutedeckWebhook(request): - - if request.json.get('control') != 'system': - if request.json.get('call') == 'active': - if request.json.get('mute') == 'active': - isMuted = True - else: - isMuted = False - - if request.json.get('video') == 'active': - isVideoOn = True - else: - isVideoOn = False - - if isMuted: - __setBusyLightStatus('away') - else: - __setBusyLightStatus('busy') - else: - __setBusyLightStatus('available') - - return {'status': blStatus} + async def send(self, cmd): + await self._queue.put(cmd) -@app.post('/shutdown') -async def shutdown(request): - request.app.shutdown() - return 'The server is shutting down...' +# ========================= +# MAIN APPLICATION +# ========================= +class BusyLight: -# Startup effect -def startUpSeq(): - print('Start seq begins') - __setBusyLightColor(statusColors.get('OFF'), 0.1) - time.sleep_ms(100) - __setBusyLightStatus('BUSY') - time.sleep_ms(200) - __setBusyLightStatus('AWAY') - time.sleep_ms(300) - __setBusyLightStatus('AVAILABLE') - time.sleep_ms(500) - __setBusyLightStatus('OFF') - print('Start seq is ended') - -startUpSeq() - -# Start API webserver -if __name__ == '__main__': - app.run(port=80, debug=True) \ No newline at end of file + def __init__(self): + # ---- Hardware init (same as your original) ---- + self.__np = NeoPixel(Pin(5), 1) # adapt if needed + + # ---- NEW: controller ---- + self.__led = _LedController(self.__np) + asyncio.create_task(self.__led.run()) + + # ---- Web app ---- + self.app = Microdot() + Response.default_content_type = 'application/json' + + self.__register_routes() + + # ========================= + # ROUTES (UNCHANGED API) + # ========================= + def __register_routes(self): + + @self.app.post('/api/blink') + async def blink(request): + data = request.json or {} + + await self.__led.send({ + "type": "blink", + "color": tuple(data.get("color", [255, 0, 0])), + "frequency": float(data.get("frequency", 1)), + "duration": float(data.get("duration", 0)), + }) + + return {"status": "ok"} + + @self.app.post('/api/color') + async def color(request): + data = request.json or {} + + await self.__led.send({ + "type": "solid", + "color": tuple(data.get("color", [255, 255, 255])) + }) + + return {"status": "ok"} + + @self.app.post('/api/off') + async def off(request): + await self.__led.send({ + "type": "off" + }) + + return {"status": "ok"} + + # ========================= + # START SERVER + # ========================= + async def run(self): + await self.app.start_server(host="0.0.0.0", port=80) + + +# ========================= +# ENTRYPOINT +# ========================= +async def main(): + app = BusyLight() + await app.run() + +asyncio.run(main()) \ No newline at end of file -- 2.52.0 From 7b4f0b1caaf2b3b4d32d33fee484a1585127068a Mon Sep 17 00:00:00 2001 From: iGoX Date: Wed, 25 Mar 2026 01:52:53 +0100 Subject: [PATCH 2/3] Refactor controlers --- ESP32/main.py | 404 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 312 insertions(+), 92 deletions(-) diff --git a/ESP32/main.py b/ESP32/main.py index 58021be..3aa716c 100644 --- a/ESP32/main.py +++ b/ESP32/main.py @@ -1,23 +1,66 @@ +from microdot import Microdot, send_file +import machine, neopixel, time import uasyncio as asyncio -import time -from microdot import Microdot, Response -from machine import Pin -from neopixel import NeoPixel + +app = Microdot() # ========================= -# LED CONTROLLER (NEW) +# NeoPixel setup # ========================= -class _LedController: +nbPixels = 12 * 2 +pinPixelStrip = 16 +neoPixelStrip = neopixel.NeoPixel(machine.Pin(pinPixelStrip), nbPixels) + + +# ========================= +# LED CONTROLLER +# ========================= +class LedController: def __init__(self, np): self._np = np - self._queue = asyncio.Queue(5) + + # async control + self._flag = asyncio.ThreadSafeFlag() + self._cmd = None self._task = None + # state + self.statusColors = { + 'BUSY': (255, 0, 0), + 'AVAILABLE': (0, 255, 0), + 'AWAY': (246, 190, 0), + 'OFF': (0, 0, 0), + 'ON': (255, 255, 255) + } + + self.color = self.statusColors['OFF'] + self.status = 'off' + self.previousStatus = 'off' + self.brightness = 0.1 + + self.blinking = False + self.blinkFrequency = 0 + self.blinkDuration = 0 + self.blinkStart = 0 + + # ---------- internal ---------- + def _apply_brightness(self, color): + r, g, b = color + return ( + int(r * self.brightness), + int(g * self.brightness), + int(b * self.brightness), + ) + + def _write(self, color): + self._np.fill(self._apply_brightness(color)) + self._np.write() + async def run(self): while True: - cmd = await self._queue.get() + await self._flag.wait() + cmd = self._cmd - # cancel current animation if self._task: self._task.cancel() try: @@ -35,114 +78,291 @@ class _LedController: self._task = asyncio.create_task(self._off()) async def _solid(self, cmd): - self._np.fill(cmd["color"]) - self._np.write() + self.color = cmd["color"] + self.status = cmd.get("status", "colored") + self._write(self.color) async def _off(self): - self._np.fill((0, 0, 0)) - self._np.write() + self.color = (0, 0, 0) + self.status = "off" + self._write(self.color) async def _blink(self, cmd): try: - color = cmd["color"] - freq = max(0.1, cmd["frequency"]) - duration = cmd["duration"] + self.blinking = True + self.color = cmd["color"] + self.previousStatus = cmd["previousStatus"] + self.blinkFrequency = cmd["frequency"] + self.blinkDuration = cmd["duration"] + self.blinkStart = time.ticks_ms() + + freq = max(0.1, self.blinkFrequency) half_ms = int(500 / freq) - start = time.ticks_ms() while True: - if duration != 0 and time.ticks_diff(time.ticks_ms(), start) > duration * 1000: - break + if self.blinkDuration != 0: + elapsed = time.ticks_diff(time.ticks_ms(), self.blinkStart) + if elapsed > self.blinkDuration * 1000: + break - # ON - self._np.fill(color) - self._np.write() + self._write(self.color) await asyncio.sleep_ms(half_ms) - # OFF - self._np.fill((0, 0, 0)) - self._np.write() + self._write((0, 0, 0)) await asyncio.sleep_ms(half_ms) except asyncio.CancelledError: - # ensure LED is OFF when cancelled - self._np.fill((0, 0, 0)) - self._np.write() - raise + pass + + self.blinking = False + self.status = self.previousStatus + self._write(self.color) async def send(self, cmd): - await self._queue.put(cmd) + self._cmd = cmd + self._flag.set() + + # ---------- public API ---------- + async def set_color(self, color, brightness=None): + if brightness is not None: + self.brightness = brightness + + await self.send({ + "type": "solid", + "color": color + }) + + async def set_status(self, status): + status = status.upper() + color = self.statusColors.get(status) + + if color is None: + raise ValueError("invalid status") + + await self.send({ + "type": "solid", + "color": color, + "status": status.lower() + }) + + async def blink(self, frequency, duration): + self.previousStatus = self.status + + await self.send({ + "type": "blink", + "color": self.color, + "frequency": frequency, + "duration": duration, + "previousStatus": self.previousStatus + }) + + async def stop(self): + await self.send({"type": "off"}) + + def get_state(self): + if self.blinking and self.blinkDuration > 0: + elapsed = time.ticks_diff(time.ticks_ms(), self.blinkStart) / 1000 + remains = max(0.0, self.blinkDuration - elapsed) + else: + remains = 0.0 + + return { + "status": self.status, + "color": self.color, + "brightness": self.brightness, + "isblinking": self.blinking, + "frequency": self.blinkFrequency, + "duration": self.blinkDuration, + "remains": remains + } # ========================= -# MAIN APPLICATION +# INIT # ========================= -class BusyLight: - - def __init__(self): - # ---- Hardware init (same as your original) ---- - self.__np = NeoPixel(Pin(5), 1) # adapt if needed - - # ---- NEW: controller ---- - self.__led = _LedController(self.__np) - asyncio.create_task(self.__led.run()) - - # ---- Web app ---- - self.app = Microdot() - Response.default_content_type = 'application/json' - - self.__register_routes() - - # ========================= - # ROUTES (UNCHANGED API) - # ========================= - def __register_routes(self): - - @self.app.post('/api/blink') - async def blink(request): - data = request.json or {} - - await self.__led.send({ - "type": "blink", - "color": tuple(data.get("color", [255, 0, 0])), - "frequency": float(data.get("frequency", 1)), - "duration": float(data.get("duration", 0)), - }) - - return {"status": "ok"} - - @self.app.post('/api/color') - async def color(request): - data = request.json or {} - - await self.__led.send({ - "type": "solid", - "color": tuple(data.get("color", [255, 255, 255])) - }) - - return {"status": "ok"} - - @self.app.post('/api/off') - async def off(request): - await self.__led.send({ - "type": "off" - }) - - return {"status": "ok"} - - # ========================= - # START SERVER - # ========================= - async def run(self): - await self.app.start_server(host="0.0.0.0", port=80) +led = LedController(neoPixelStrip) # ========================= -# ENTRYPOINT +# ROUTES +# ========================= + +@app.get('/static/') +async def staticRoutes(request, path): + if '..' in path: + return {'error': '404 Not found'}, 404 + return send_file('static/' + path) + + +@app.get('/') +async def getIndex(request): + return send_file('static/index.html') + + +@app.get('/api/brightness') +async def getBrightness(request): + return {'brightness': led.brightness} + + +@app.post('/api/brightness') +async def setBrightness(request): + brightness = request.json.get("brightness") + + if brightness is None: + return {'error': 'missing brightness parameter'}, 400 + + if not isinstance(brightness, (int, float)): + return {'error': 'wrong brightness type'}, 400 + + if brightness < 0 or brightness > 1: + return {'error': 'brightness out of bound'}, 400 + + current_status = led.status + current_color = led.color + + await led.set_color(current_color, brightness) + + led.status = current_status + + return {'brightness': led.brightness} + + +@app.post('/api/color') +async def setColor(request): + r = request.json.get("r") + g = request.json.get("g") + b = request.json.get("b") + + if r is None or g is None or b is None: + return {'error': 'missing color'}, 400 + + if not all(isinstance(v, int) for v in (r, g, b)): + return {'error': 'wrong color type'}, 400 + + if not (0 <= r <= 255 and 0 <= g <= 255 and 0 <= b <= 255): + return {'error': 'color out of bound'}, 400 + + brightness = request.json.get("brightness") + + await led.set_color((r, g, b), brightness) + + return {'status': led.status} + + +@app.get('/api/color') +async def getColor(request): + r, g, b = led.color + return {'r': r, 'g': g, 'b': b, 'brightness': led.brightness} + + +@app.get('/api/status') +async def getStatus(request): + return {'status': led.status} + + +@app.route('/api/status/', methods=['GET', 'POST']) +async def setStatus(request, status): + try: + await led.set_status(status) + except ValueError: + return {'error': 'unknown status'}, 404 + + return {'status': led.status} + + +@app.post('/api/blink') +async def setBlink(request): + freq = request.json.get('frequency') + duration = request.json.get('duration') + + if freq is None or duration is None: + return {'error': 'missing frequency or duration'}, 400 + + if not isinstance(freq, (int, float)) or freq <= 0: + return {'error': 'invalid frequency'}, 400 + + if not isinstance(duration, (int, float)) or duration < 0: + return {'error': 'invalid duration'}, 400 + + await led.blink(freq, duration) + + return {'status': 'blinking', 'frequency': freq, 'duration': duration} + + +@app.get('/api/blink') +async def getBlink(request): + state = led.get_state() + return { + 'isblinking': state["isblinking"], + 'frequency': state["frequency"], + 'duration': state["duration"], + 'remains': state["remains"] + } + + +@app.post('/api/blink/stop') +async def blinkStop(request): + await led.stop() + await led.set_status(led.previousStatus) + return {'status': led.status} + + +@app.get('/api/debug') +async def getDebugInfo(request): + r, g, b = led.color + dr, dg, db = led._np[0] + + return { + 'status': led.status, + 'brightness': led.brightness, + 'color': {'r': r, 'g': g, 'b': b}, + 'dimColor': {'r': dr, 'g': dg, 'b': db}, + 'blinking': led.blinking, + 'blinkFrequency': led.blinkFrequency, + 'blinkDuration': led.blinkDuration + } + + +@app.post('/api/mutedeck-webhook') +async def mutedeckWebhook(request): + + if request.json.get('control') != 'system': + if request.json.get('call') == 'active': + isMuted = request.json.get('mute') == 'active' + + if isMuted: + await led.set_status('away') + else: + await led.set_status('busy') + else: + await led.set_status('available') + + return {'status': led.status} + + +@app.post('/shutdown') +async def shutdown(request): + request.app.shutdown() + return 'The server is shutting down...' + + +# ========================= +# START # ========================= async def main(): - app = BusyLight() - await app.run() + asyncio.create_task(led.run()) + + # startup animation + await led.set_status('busy') + await asyncio.sleep_ms(200) + await led.set_status('away') + await asyncio.sleep_ms(300) + await led.set_status('available') + await asyncio.sleep_ms(500) + await led.set_status('off') + + await app.start_server(port=80) + asyncio.run(main()) \ No newline at end of file -- 2.52.0 From 6fe45026b05668d1604fe23ef9d867b0be0c801c Mon Sep 17 00:00:00 2001 From: iGoX Date: Wed, 25 Mar 2026 08:35:40 +0100 Subject: [PATCH 3/3] Refactor controlers --- ESP32/main.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ESP32/main.py b/ESP32/main.py index 3aa716c..aefe8e8 100644 --- a/ESP32/main.py +++ b/ESP32/main.py @@ -25,7 +25,7 @@ class LedController: self._task = None # state - self.statusColors = { + self.states = { 'BUSY': (255, 0, 0), 'AVAILABLE': (0, 255, 0), 'AWAY': (246, 190, 0), @@ -33,7 +33,7 @@ class LedController: 'ON': (255, 255, 255) } - self.color = self.statusColors['OFF'] + self.color = self.states['OFF'] self.status = 'off' self.previousStatus = 'off' self.brightness = 0.1 @@ -61,6 +61,7 @@ class LedController: await self._flag.wait() cmd = self._cmd + # Cancel any ongoing task before starting a new one if self._task: self._task.cancel() try: @@ -68,6 +69,7 @@ class LedController: except asyncio.CancelledError: pass + # Process the new command t = cmd["type"] if t == "blink": @@ -135,7 +137,7 @@ class LedController: async def set_status(self, status): status = status.upper() - color = self.statusColors.get(status) + color = self.states.get(status) if color is None: raise ValueError("invalid status") -- 2.52.0