From 7b4f0b1caaf2b3b4d32d33fee484a1585127068a Mon Sep 17 00:00:00 2001 From: iGoX Date: Wed, 25 Mar 2026 01:52:53 +0100 Subject: [PATCH] Refactor controlers --- ESP32/main.py | 404 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 312 insertions(+), 92 deletions(-) diff --git a/ESP32/main.py b/ESP32/main.py index 58021be..3aa716c 100644 --- a/ESP32/main.py +++ b/ESP32/main.py @@ -1,23 +1,66 @@ +from microdot import Microdot, send_file +import machine, neopixel, time import uasyncio as asyncio -import time -from microdot import Microdot, Response -from machine import Pin -from neopixel import NeoPixel + +app = Microdot() # ========================= -# LED CONTROLLER (NEW) +# NeoPixel setup # ========================= -class _LedController: +nbPixels = 12 * 2 +pinPixelStrip = 16 +neoPixelStrip = neopixel.NeoPixel(machine.Pin(pinPixelStrip), nbPixels) + + +# ========================= +# LED CONTROLLER +# ========================= +class LedController: def __init__(self, np): self._np = np - self._queue = asyncio.Queue(5) + + # async control + self._flag = asyncio.ThreadSafeFlag() + self._cmd = None self._task = None + # state + self.statusColors = { + 'BUSY': (255, 0, 0), + 'AVAILABLE': (0, 255, 0), + 'AWAY': (246, 190, 0), + 'OFF': (0, 0, 0), + 'ON': (255, 255, 255) + } + + self.color = self.statusColors['OFF'] + self.status = 'off' + self.previousStatus = 'off' + self.brightness = 0.1 + + self.blinking = False + self.blinkFrequency = 0 + self.blinkDuration = 0 + self.blinkStart = 0 + + # ---------- internal ---------- + def _apply_brightness(self, color): + r, g, b = color + return ( + int(r * self.brightness), + int(g * self.brightness), + int(b * self.brightness), + ) + + def _write(self, color): + self._np.fill(self._apply_brightness(color)) + self._np.write() + async def run(self): while True: - cmd = await self._queue.get() + await self._flag.wait() + cmd = self._cmd - # cancel current animation if self._task: self._task.cancel() try: @@ -35,114 +78,291 @@ class _LedController: self._task = asyncio.create_task(self._off()) async def _solid(self, cmd): - self._np.fill(cmd["color"]) - self._np.write() + self.color = cmd["color"] + self.status = cmd.get("status", "colored") + self._write(self.color) async def _off(self): - self._np.fill((0, 0, 0)) - self._np.write() + self.color = (0, 0, 0) + self.status = "off" + self._write(self.color) async def _blink(self, cmd): try: - color = cmd["color"] - freq = max(0.1, cmd["frequency"]) - duration = cmd["duration"] + self.blinking = True + self.color = cmd["color"] + self.previousStatus = cmd["previousStatus"] + self.blinkFrequency = cmd["frequency"] + self.blinkDuration = cmd["duration"] + self.blinkStart = time.ticks_ms() + + freq = max(0.1, self.blinkFrequency) half_ms = int(500 / freq) - start = time.ticks_ms() while True: - if duration != 0 and time.ticks_diff(time.ticks_ms(), start) > duration * 1000: - break + if self.blinkDuration != 0: + elapsed = time.ticks_diff(time.ticks_ms(), self.blinkStart) + if elapsed > self.blinkDuration * 1000: + break - # ON - self._np.fill(color) - self._np.write() + self._write(self.color) await asyncio.sleep_ms(half_ms) - # OFF - self._np.fill((0, 0, 0)) - self._np.write() + self._write((0, 0, 0)) await asyncio.sleep_ms(half_ms) except asyncio.CancelledError: - # ensure LED is OFF when cancelled - self._np.fill((0, 0, 0)) - self._np.write() - raise + pass + + self.blinking = False + self.status = self.previousStatus + self._write(self.color) async def send(self, cmd): - await self._queue.put(cmd) + self._cmd = cmd + self._flag.set() + + # ---------- public API ---------- + async def set_color(self, color, brightness=None): + if brightness is not None: + self.brightness = brightness + + await self.send({ + "type": "solid", + "color": color + }) + + async def set_status(self, status): + status = status.upper() + color = self.statusColors.get(status) + + if color is None: + raise ValueError("invalid status") + + await self.send({ + "type": "solid", + "color": color, + "status": status.lower() + }) + + async def blink(self, frequency, duration): + self.previousStatus = self.status + + await self.send({ + "type": "blink", + "color": self.color, + "frequency": frequency, + "duration": duration, + "previousStatus": self.previousStatus + }) + + async def stop(self): + await self.send({"type": "off"}) + + def get_state(self): + if self.blinking and self.blinkDuration > 0: + elapsed = time.ticks_diff(time.ticks_ms(), self.blinkStart) / 1000 + remains = max(0.0, self.blinkDuration - elapsed) + else: + remains = 0.0 + + return { + "status": self.status, + "color": self.color, + "brightness": self.brightness, + "isblinking": self.blinking, + "frequency": self.blinkFrequency, + "duration": self.blinkDuration, + "remains": remains + } # ========================= -# MAIN APPLICATION +# INIT # ========================= -class BusyLight: - - def __init__(self): - # ---- Hardware init (same as your original) ---- - self.__np = NeoPixel(Pin(5), 1) # adapt if needed - - # ---- NEW: controller ---- - self.__led = _LedController(self.__np) - asyncio.create_task(self.__led.run()) - - # ---- Web app ---- - self.app = Microdot() - Response.default_content_type = 'application/json' - - self.__register_routes() - - # ========================= - # ROUTES (UNCHANGED API) - # ========================= - def __register_routes(self): - - @self.app.post('/api/blink') - async def blink(request): - data = request.json or {} - - await self.__led.send({ - "type": "blink", - "color": tuple(data.get("color", [255, 0, 0])), - "frequency": float(data.get("frequency", 1)), - "duration": float(data.get("duration", 0)), - }) - - return {"status": "ok"} - - @self.app.post('/api/color') - async def color(request): - data = request.json or {} - - await self.__led.send({ - "type": "solid", - "color": tuple(data.get("color", [255, 255, 255])) - }) - - return {"status": "ok"} - - @self.app.post('/api/off') - async def off(request): - await self.__led.send({ - "type": "off" - }) - - return {"status": "ok"} - - # ========================= - # START SERVER - # ========================= - async def run(self): - await self.app.start_server(host="0.0.0.0", port=80) +led = LedController(neoPixelStrip) # ========================= -# ENTRYPOINT +# ROUTES +# ========================= + +@app.get('/static/') +async def staticRoutes(request, path): + if '..' in path: + return {'error': '404 Not found'}, 404 + return send_file('static/' + path) + + +@app.get('/') +async def getIndex(request): + return send_file('static/index.html') + + +@app.get('/api/brightness') +async def getBrightness(request): + return {'brightness': led.brightness} + + +@app.post('/api/brightness') +async def setBrightness(request): + brightness = request.json.get("brightness") + + if brightness is None: + return {'error': 'missing brightness parameter'}, 400 + + if not isinstance(brightness, (int, float)): + return {'error': 'wrong brightness type'}, 400 + + if brightness < 0 or brightness > 1: + return {'error': 'brightness out of bound'}, 400 + + current_status = led.status + current_color = led.color + + await led.set_color(current_color, brightness) + + led.status = current_status + + return {'brightness': led.brightness} + + +@app.post('/api/color') +async def setColor(request): + r = request.json.get("r") + g = request.json.get("g") + b = request.json.get("b") + + if r is None or g is None or b is None: + return {'error': 'missing color'}, 400 + + if not all(isinstance(v, int) for v in (r, g, b)): + return {'error': 'wrong color type'}, 400 + + if not (0 <= r <= 255 and 0 <= g <= 255 and 0 <= b <= 255): + return {'error': 'color out of bound'}, 400 + + brightness = request.json.get("brightness") + + await led.set_color((r, g, b), brightness) + + return {'status': led.status} + + +@app.get('/api/color') +async def getColor(request): + r, g, b = led.color + return {'r': r, 'g': g, 'b': b, 'brightness': led.brightness} + + +@app.get('/api/status') +async def getStatus(request): + return {'status': led.status} + + +@app.route('/api/status/', methods=['GET', 'POST']) +async def setStatus(request, status): + try: + await led.set_status(status) + except ValueError: + return {'error': 'unknown status'}, 404 + + return {'status': led.status} + + +@app.post('/api/blink') +async def setBlink(request): + freq = request.json.get('frequency') + duration = request.json.get('duration') + + if freq is None or duration is None: + return {'error': 'missing frequency or duration'}, 400 + + if not isinstance(freq, (int, float)) or freq <= 0: + return {'error': 'invalid frequency'}, 400 + + if not isinstance(duration, (int, float)) or duration < 0: + return {'error': 'invalid duration'}, 400 + + await led.blink(freq, duration) + + return {'status': 'blinking', 'frequency': freq, 'duration': duration} + + +@app.get('/api/blink') +async def getBlink(request): + state = led.get_state() + return { + 'isblinking': state["isblinking"], + 'frequency': state["frequency"], + 'duration': state["duration"], + 'remains': state["remains"] + } + + +@app.post('/api/blink/stop') +async def blinkStop(request): + await led.stop() + await led.set_status(led.previousStatus) + return {'status': led.status} + + +@app.get('/api/debug') +async def getDebugInfo(request): + r, g, b = led.color + dr, dg, db = led._np[0] + + return { + 'status': led.status, + 'brightness': led.brightness, + 'color': {'r': r, 'g': g, 'b': b}, + 'dimColor': {'r': dr, 'g': dg, 'b': db}, + 'blinking': led.blinking, + 'blinkFrequency': led.blinkFrequency, + 'blinkDuration': led.blinkDuration + } + + +@app.post('/api/mutedeck-webhook') +async def mutedeckWebhook(request): + + if request.json.get('control') != 'system': + if request.json.get('call') == 'active': + isMuted = request.json.get('mute') == 'active' + + if isMuted: + await led.set_status('away') + else: + await led.set_status('busy') + else: + await led.set_status('available') + + return {'status': led.status} + + +@app.post('/shutdown') +async def shutdown(request): + request.app.shutdown() + return 'The server is shutting down...' + + +# ========================= +# START # ========================= async def main(): - app = BusyLight() - await app.run() + asyncio.create_task(led.run()) + + # startup animation + await led.set_status('busy') + await asyncio.sleep_ms(200) + await led.set_status('away') + await asyncio.sleep_ms(300) + await led.set_status('available') + await asyncio.sleep_ms(500) + await led.set_status('off') + + await app.start_server(port=80) + asyncio.run(main()) \ No newline at end of file