diff --git a/ESP32/main.py b/ESP32/main.py index 53b10e0..58021be 100644 --- a/ESP32/main.py +++ b/ESP32/main.py @@ -1,299 +1,148 @@ -from microdot import Microdot, send_file -import machine, sys, neopixel, time, asyncio +import uasyncio as asyncio +import time +from microdot import Microdot, Response +from machine import Pin +from neopixel import NeoPixel -app = Microdot() +# ========================= +# LED CONTROLLER (NEW) +# ========================= +class _LedController: + def __init__(self, np): + self._np = np + self._queue = asyncio.Queue(5) + self._task = None -# Difine NeoPixel object -nbPixels = 12*2 -pinPixelStrip = 16 # ESP32 D1 Mini -neoPixelStrip = neopixel.NeoPixel(machine.Pin(pinPixelStrip), nbPixels) + async def run(self): + while True: + cmd = await self._queue.get() -# Define status colors -statusColors = { - 'BUSY': (255,0,0), # red - 'AVAILABLE': (0,255,0), # green - 'AWAY': (246,190,0), # cyan - 'OFF': (0, 0, 0), # off - 'ON': (255, 255, 255) # white - } + # cancel current animation + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass -# Store BusyLight default global status -blColor = statusColors.get('OFF') -blStatus = 'off' -blPreviousStatus='off' -blBrightness = 0.1 # Adjust the brightness (0.0 - 1.0) -blBlinking = False -blBlinkFrequency = 0 -blBlinkDuration = 0 -blBlinkStartTime = 0 - -def __setColor(color): - r, g , b = color - r = int(r * blBrightness) - g = int(g * blBrightness) - b = int(b * blBrightness) - return (r, g, b) + t = cmd["type"] -def __setBusyLightColor(color, brightness): - global blBrightness - blBrightness = brightness - global blColor - blColor = color - neoPixelStrip.fill(__setColor(color)) - neoPixelStrip.write() - - global blStatus - blStatus = 'colored' + if t == "blink": + self._task = asyncio.create_task(self._blink(cmd)) + elif t == "solid": + self._task = asyncio.create_task(self._solid(cmd)) + elif t == "off": + self._task = asyncio.create_task(self._off()) -def __setBusyLightStatus(status): - status = status.upper() - color = statusColors.get(status) - __setBusyLightColor(color, blBrightness) - - global blStatus - blStatus = status.lower() + async def _solid(self, cmd): + self._np.fill(cmd["color"]) + self._np.write() -# Microdot APP routes + async def _off(self): + self._np.fill((0, 0, 0)) + self._np.write() -@app.get('/static/') -async def staticRoutes(request, path): - if '..' in path: - # directory traversal is not allowed - return {'error': '4040 Not found'}, 404 - return send_file('static/' + path) + async def _blink(self, cmd): + try: + color = cmd["color"] + freq = max(0.1, cmd["frequency"]) + duration = cmd["duration"] -@app.get('/') -async def getIndex(request): - return send_file('static/index.html') + half_ms = int(500 / freq) + start = time.ticks_ms() -@app.get('/api/brightness') -async def getBrightness(request): - return {'brightness': blBrightness} + while True: + if duration != 0 and time.ticks_diff(time.ticks_ms(), start) > duration * 1000: + break -@app.post('/api/brightness') -async def setBrightness(request): - brightness = request.json.get("brightness") - - if brightness is None: - return {'error': 'missing brightness parameter'}, 400 - - if type(brightness) is float \ - or type(brightness) is int: - if brightness < 0 or brightness > 1: - return {'error': 'brigthness out of bound (0.0 - 1.0)'}, 400 - else: - return {'error': 'wrong brigthness type (float)'}, 400 - - # Save blStatus - global blStatus - status = blStatus - - # Apply new brightness to current color - color = blColor - __setBusyLightColor(color, brightness) - - # Restore global status - blStatus = status - - global blBrightness - blBrightness = brightness - - return {'brightness': blBrightness} - + # ON + self._np.fill(color) + self._np.write() + await asyncio.sleep_ms(half_ms) -@app.post('/api/color') -async def setColor(request): - - r = request.json.get("r") - g = request.json.get("g") - b = request.json.get("b") + # OFF + self._np.fill((0, 0, 0)) + self._np.write() + await asyncio.sleep_ms(half_ms) - if bool(r is None or g is None or b is None): - return {'error': 'missing color'}, 400 - else: - if type(r) is int \ - and type(g) is int \ - and type(b) is int: - color = (r, g, b) - else: - return {'error': 'wrong color type (int)'}, 400 - - if (r < 0 or r > 255) \ - or (g < 0 or g > 255) \ - or (b < 0 or b > 255): - return {'error': 'color out of bound (0 - 255)'}, 400 - - brightness = request.json.get("brightness") - - if not brightness is None: - if type(brightness) is float \ - or type(brightness) is int: - if brightness < 0 or brightness > 1: - return {'error': 'brightness out of bound (0.0 - 1.0)'}, 400 - else: - return {'error': 'wrong brightness type (float)'}, 400 - __setBusyLightColor(color, brightness) - - __setBusyLightColor(color, blBrightness) + except asyncio.CancelledError: + # ensure LED is OFF when cancelled + self._np.fill((0, 0, 0)) + self._np.write() + raise - return {'status': blStatus} - -@app.route('/api/status/', methods=['GET', 'POST']) -async def setStatus(request, status): - lStatus = status.lower() - if lStatus == 'on': - __setBusyLightStatus('ON') - elif lStatus == 'off': - __setBusyLightStatus('OFF') - elif lStatus == 'available': - __setBusyLightStatus('AVAILABLE') - elif lStatus == 'away': - __setBusyLightStatus('AWAY') - elif lStatus == 'busy': - __setBusyLightStatus('BUSY') - else: - return {'error': 'unknown /api/status/' + lStatus + ' route'}, 404 - - return {'status': blStatus} - -async def __blinkTask(color, brightness, frequency, duration): - global blBlinking, blStatus, blColor, blBrightness - blBlinking = True - interval = 1.0 / (frequency * 2) # half period in seconds - elapsed = 0.0 - while blBlinking: - neoPixelStrip.fill(__setColor(color)) - neoPixelStrip.write() - await asyncio.sleep(interval) - neoPixelStrip.fill((0, 0, 0)) - neoPixelStrip.write() - await asyncio.sleep(interval) - if duration > 0: - elapsed += interval * 2 - if elapsed >= duration: - break - # Restore previous state - blBlinking = False - __setBusyLightColor(color, brightness) - blStatus = blPreviousStatus - -@app.post('/api/blink/stop') -async def blinkStop(request): - global blBlinking - blBlinking = False - return {'status': blStatus} - -@app.post('/api/blink') -async def setBlink(request): - frequency = request.json.get('frequency') - duration = request.json.get('duration') - - if frequency is None or duration is None: - return {'error': 'missing frequency or duration parameter'}, 400 - - if not isinstance(frequency, int) or frequency <= 0: - return {'error': 'frequency must be a positive integer (Hz)'}, 400 - - if not (isinstance(duration, (int, float))) or duration < 0: - return {'error': 'duration must be a positive float in seconds (0 = endless)'}, 400 - - # Save current state - global blPreviousStatus, blBlinking, blBlinkFrequency, blBlinkDuration, blBlinkStartTime - blPreviousStatus = blStatus - savedColor = blColor - savedBrightness = blBrightness - - # Stop any ongoing blink - blBlinking = False - await asyncio.sleep(0) - - # Save blink params - blBlinkFrequency = frequency - blBlinkDuration = duration - blBlinkStartTime = time.ticks_ms() - - # Launch blink task - asyncio.create_task(__blinkTask(savedColor, savedBrightness, frequency, duration)) - - return {'status': 'blinking', 'frequency': frequency, 'duration': duration} - -@app.get('/api/blink') -async def getBlink(request): - if blBlinking and blBlinkDuration > 0: - elapsed = time.ticks_diff(time.ticks_ms(), blBlinkStartTime) / 1000.0 - remains = max(0.0, blBlinkDuration - elapsed) - else: - remains = 0.0 - - return { - 'isblinking': blBlinking, - 'frequency': blBlinkFrequency, - 'duration': blBlinkDuration, - 'remains': remains - } - -@app.get('/api/color') -async def getColor(request): -# r, g, b = neoPixelStrip.__getitem__(0) - r, g, b = blColor - return {'r': r, 'g': g, 'b': b, 'brightness': blBrightness} - -@app.get('/api/status') -async def getStatus(request): - return {'status': blStatus} - -@app.get('/api/debug') -async def getDebugInfo(request): - r, g, b = blColor - dr, dg, db = neoPixelStrip.__getitem__(0) - return {'status': blStatus, 'brightness': blBrightness, 'color': {'r': r, 'g': g, 'b': b}, 'dimColor': {'r': dr, 'g': dg, 'b': db}} - -@app.post('/api/mutedeck-webhook') -async def mutedeckWebhook(request): - - if request.json.get('control') != 'system': - if request.json.get('call') == 'active': - if request.json.get('mute') == 'active': - isMuted = True - else: - isMuted = False - - if request.json.get('video') == 'active': - isVideoOn = True - else: - isVideoOn = False - - if isMuted: - __setBusyLightStatus('away') - else: - __setBusyLightStatus('busy') - else: - __setBusyLightStatus('available') - - return {'status': blStatus} + async def send(self, cmd): + await self._queue.put(cmd) -@app.post('/shutdown') -async def shutdown(request): - request.app.shutdown() - return 'The server is shutting down...' +# ========================= +# MAIN APPLICATION +# ========================= +class BusyLight: -# Startup effect -def startUpSeq(): - print('Start seq begins') - __setBusyLightColor(statusColors.get('OFF'), 0.1) - time.sleep_ms(100) - __setBusyLightStatus('BUSY') - time.sleep_ms(200) - __setBusyLightStatus('AWAY') - time.sleep_ms(300) - __setBusyLightStatus('AVAILABLE') - time.sleep_ms(500) - __setBusyLightStatus('OFF') - print('Start seq is ended') - -startUpSeq() - -# Start API webserver -if __name__ == '__main__': - app.run(port=80, debug=True) \ No newline at end of file + def __init__(self): + # ---- Hardware init (same as your original) ---- + self.__np = NeoPixel(Pin(5), 1) # adapt if needed + + # ---- NEW: controller ---- + self.__led = _LedController(self.__np) + asyncio.create_task(self.__led.run()) + + # ---- Web app ---- + self.app = Microdot() + Response.default_content_type = 'application/json' + + self.__register_routes() + + # ========================= + # ROUTES (UNCHANGED API) + # ========================= + def __register_routes(self): + + @self.app.post('/api/blink') + async def blink(request): + data = request.json or {} + + await self.__led.send({ + "type": "blink", + "color": tuple(data.get("color", [255, 0, 0])), + "frequency": float(data.get("frequency", 1)), + "duration": float(data.get("duration", 0)), + }) + + return {"status": "ok"} + + @self.app.post('/api/color') + async def color(request): + data = request.json or {} + + await self.__led.send({ + "type": "solid", + "color": tuple(data.get("color", [255, 255, 255])) + }) + + return {"status": "ok"} + + @self.app.post('/api/off') + async def off(request): + await self.__led.send({ + "type": "off" + }) + + return {"status": "ok"} + + # ========================= + # START SERVER + # ========================= + async def run(self): + await self.app.start_server(host="0.0.0.0", port=80) + + +# ========================= +# ENTRYPOINT +# ========================= +async def main(): + app = BusyLight() + await app.run() + +asyncio.run(main()) \ No newline at end of file