Files
busylight/ESP32/main.py
T
2026-03-25 00:57:20 +01:00

148 lines
3.9 KiB
Python

import uasyncio as asyncio
import time
from microdot import Microdot, Response
from machine import Pin
from neopixel import NeoPixel
# =========================
# LED CONTROLLER (NEW)
# =========================
class _LedController:
    """Serialise LED commands so that only one animation runs at a time.

    A command is a dict with a ``"type"`` key (``"blink"``, ``"solid"`` or
    ``"off"``) plus the parameters of that animation.  ``send`` enqueues a
    command; ``run`` consumes commands in FIFO order, stopping whatever
    animation is active before starting the next one.
    """

    # Maximum number of commands waiting to be processed; ``send`` blocks
    # while the backlog is full.
    _MAX_PENDING = 5

    def __init__(self, np):
        """Create a controller for *np*.

        np: pixel driver; only its ``fill(color)`` and ``write()`` methods
            are used.
        """
        self._np = np
        # Stock MicroPython asyncio provides Event/Lock but no Queue class,
        # so the bounded FIFO is built from a list and two events.
        self._pending = []
        self._has_cmd = asyncio.Event()
        self._has_room = asyncio.Event()
        self._task = None

    async def run(self):
        """Process queued commands forever; meant to run as its own task."""
        while True:
            cmd = await self._next_command()
            await self._stop_current()
            kind = cmd.get("type")
            if kind == "blink":
                self._task = asyncio.create_task(self._blink(cmd))
            elif kind == "solid":
                self._task = asyncio.create_task(self._solid(cmd))
            elif kind == "off":
                self._task = asyncio.create_task(self._off())
            # Any other type only stops the current animation.

    async def _next_command(self):
        """Wait for, remove and return the oldest pending command."""
        while not self._pending:
            self._has_cmd.clear()
            await self._has_cmd.wait()
        cmd = self._pending.pop(0)
        self._has_room.set()
        return cmd

    async def _stop_current(self):
        """Cancel the active animation task, if any, and wait for it to end."""
        task = self._task
        self._task = None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            # Awaiting a task that already failed re-raises its exception
            # here.  Report it instead of letting it terminate run(), which
            # would leave every later command unprocessed.
            print("LED animation failed:", repr(exc))

    async def _solid(self, cmd):
        """Show ``cmd["color"]`` steadily."""
        self._np.fill(cmd["color"])
        self._np.write()

    async def _off(self):
        """Switch the LED off."""
        self._np.fill((0, 0, 0))
        self._np.write()

    async def _blink(self, cmd):
        """Blink ``cmd["color"]`` at ``cmd["frequency"]`` Hz.

        Runs for ``cmd["duration"]`` seconds, or until cancelled when the
        duration is 0.  The duration is checked once per on/off cycle.
        """
        try:
            color = cmd["color"]
            freq = max(0.1, cmd["frequency"])
            duration_ms = cmd["duration"] * 1000
            # Never let the half period truncate to 0 ms, which would turn
            # the loop into a zero-delay spin at very high frequencies.
            half_ms = max(1, int(500 / freq))
            start = time.ticks_ms()
            while True:
                elapsed = time.ticks_diff(time.ticks_ms(), start)
                if duration_ms != 0 and elapsed > duration_ms:
                    break
                # ON
                self._np.fill(color)
                self._np.write()
                await asyncio.sleep_ms(half_ms)
                # OFF
                self._np.fill((0, 0, 0))
                self._np.write()
                await asyncio.sleep_ms(half_ms)
        except asyncio.CancelledError:
            # Make sure the LED is dark when the animation is interrupted.
            self._np.fill((0, 0, 0))
            self._np.write()
            raise

    async def send(self, cmd):
        """Enqueue *cmd*, waiting while ``_MAX_PENDING`` commands are queued."""
        while len(self._pending) >= self._MAX_PENDING:
            self._has_room.clear()
            await self._has_room.wait()
        self._pending.append(cmd)
        self._has_cmd.set()
# =========================
# MAIN APPLICATION
# =========================
class BusyLight:
    """HTTP-controlled status light: a Microdot app driving one NeoPixel.

    Endpoints (all POST, JSON body optional):
      /api/blink  {"color": [r, g, b], "frequency": Hz, "duration": seconds}
      /api/color  {"color": [r, g, b]}
      /api/off
    Each answers ``{"status": "ok"}``; a malformed body is answered with
    HTTP 400 and an ``"error"`` message.
    """

    def __init__(self):
        # Hardware: one NeoPixel on GPIO 5 (adapt to the actual wiring).
        self.__np = NeoPixel(Pin(5), 1)
        # LED controller; keep a reference to its background task for the
        # lifetime of the application.
        self.__led = _LedController(self.__np)
        self.__led_task = asyncio.create_task(self.__led.run())
        # Web app
        self.app = Microdot()
        Response.default_content_type = 'application/json'
        self.__register_routes()

    @staticmethod
    def _json_object(request):
        """Return the request's JSON body as a dict ({} when absent).

        Raises ValueError when the body is JSON but not an object.
        """
        data = request.json or {}
        if not isinstance(data, dict):
            raise ValueError("JSON body must be an object")
        return data

    @staticmethod
    def _parse_color(value):
        """Return *value* as an ``(r, g, b)`` tuple of ints in 0..255.

        Raises ValueError for anything that is not a 3-element list/tuple
        of in-range numbers.
        """
        message = "color must be a list of three integers in 0..255"
        if not isinstance(value, (list, tuple)) or len(value) != 3:
            raise ValueError(message)
        try:
            channels = tuple(int(channel) for channel in value)
        except (TypeError, ValueError):
            raise ValueError(message)
        for channel in channels:
            if not 0 <= channel <= 255:
                raise ValueError(message)
        return channels

    @staticmethod
    def _parse_number(value, name):
        """Return *value* as a float, or raise ValueError naming the field."""
        try:
            return float(value)
        except (TypeError, ValueError):
            raise ValueError(name + " must be a number")

    # =========================
    # ROUTES
    # =========================
    def __register_routes(self):
        """Attach the three POST endpoints to ``self.app``."""

        @self.app.post('/api/blink')
        async def blink(request):
            try:
                data = self._json_object(request)
                cmd = {
                    "type": "blink",
                    "color": self._parse_color(
                        data.get("color", [255, 0, 0])),
                    "frequency": self._parse_number(
                        data.get("frequency", 1), "frequency"),
                    "duration": self._parse_number(
                        data.get("duration", 0), "duration"),
                }
            except ValueError as exc:
                return {"status": "error", "error": str(exc)}, 400
            await self.__led.send(cmd)
            return {"status": "ok"}

        @self.app.post('/api/color')
        async def color(request):
            try:
                data = self._json_object(request)
                cmd = {
                    "type": "solid",
                    "color": self._parse_color(
                        data.get("color", [255, 255, 255])),
                }
            except ValueError as exc:
                return {"status": "error", "error": str(exc)}, 400
            await self.__led.send(cmd)
            return {"status": "ok"}

        @self.app.post('/api/off')
        async def off(request):
            await self.__led.send({
                "type": "off"
            })
            return {"status": "ok"}

    # =========================
    # START SERVER
    # =========================
    async def run(self):
        """Serve the HTTP API on port 80 of all interfaces."""
        await self.app.start_server(host="0.0.0.0", port=80)
# =========================
# ENTRYPOINT
# =========================
async def main():
    """Build the busy-light application and serve it until the server stops."""
    await BusyLight().run()
# Script entry point: start the asyncio event loop and run main() until it returns.
asyncio.run(main())