refactor: use a Led controller to allow async command (#5)

Reviewed-on: #5
This commit was merged in pull request #5.
This commit is contained in:
2026-03-25 08:37:07 +01:00
parent 1096bf5ce3
commit 5361ca4db7
+308 -237
View File
@@ -1,276 +1,346 @@
from microdot import Microdot, send_file
import machine, sys, neopixel, time, asyncio
import machine, neopixel, time
import uasyncio as asyncio
app = Microdot()
# Difine NeoPixel object
nbPixels = 12*2
pinPixelStrip = 16 # ESP32 D1 Mini
# =========================
# NeoPixel setup
# =========================
nbPixels = 12 * 2
pinPixelStrip = 16
neoPixelStrip = neopixel.NeoPixel(machine.Pin(pinPixelStrip), nbPixels)
# Define status colors
statusColors = {
'BUSY': (255,0,0), # red
'AVAILABLE': (0,255,0), # green
'AWAY': (246,190,0), # cyan
'OFF': (0, 0, 0), # off
'ON': (255, 255, 255) # white
}
# Store BusyLight default global status
blColor = statusColors.get('OFF')
blStatus = 'off'
blPreviousStatus='off'
blBrightness = 0.1 # Adjust the brightness (0.0 - 1.0)
blBlinking = False
blBlinkFrequency = 0
blBlinkDuration = 0
blBlinkStartTime = 0
def __setColor(color):
r, g , b = color
r = int(r * blBrightness)
g = int(g * blBrightness)
b = int(b * blBrightness)
return (r, g, b)
# =========================
# LED CONTROLLER
# =========================
class LedController:
def __init__(self, np):
self._np = np
def __setBusyLightColor(color, brightness):
global blBrightness
blBrightness = brightness
global blColor
blColor = color
neoPixelStrip.fill(__setColor(color))
neoPixelStrip.write()
global blStatus
blStatus = 'colored'
# async control
self._flag = asyncio.ThreadSafeFlag()
self._cmd = None
self._task = None
def __setBusyLightStatus(status):
status = status.upper()
color = statusColors.get(status)
__setBusyLightColor(color, blBrightness)
global blStatus
blStatus = status.lower()
# state
self.states = {
'BUSY': (255, 0, 0),
'AVAILABLE': (0, 255, 0),
'AWAY': (246, 190, 0),
'OFF': (0, 0, 0),
'ON': (255, 255, 255)
}
# Microdot APP routes
self.color = self.states['OFF']
self.status = 'off'
self.previousStatus = 'off'
self.brightness = 0.1
self.blinking = False
self.blinkFrequency = 0
self.blinkDuration = 0
self.blinkStart = 0
# ---------- internal ----------
def _apply_brightness(self, color):
r, g, b = color
return (
int(r * self.brightness),
int(g * self.brightness),
int(b * self.brightness),
)
def _write(self, color):
self._np.fill(self._apply_brightness(color))
self._np.write()
async def run(self):
while True:
await self._flag.wait()
cmd = self._cmd
# Cancel any ongoing task before starting a new one
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
# Process the new command
t = cmd["type"]
if t == "blink":
self._task = asyncio.create_task(self._blink(cmd))
elif t == "solid":
self._task = asyncio.create_task(self._solid(cmd))
elif t == "off":
self._task = asyncio.create_task(self._off())
async def _solid(self, cmd):
self.color = cmd["color"]
self.status = cmd.get("status", "colored")
self._write(self.color)
async def _off(self):
self.color = (0, 0, 0)
self.status = "off"
self._write(self.color)
async def _blink(self, cmd):
try:
self.blinking = True
self.color = cmd["color"]
self.previousStatus = cmd["previousStatus"]
self.blinkFrequency = cmd["frequency"]
self.blinkDuration = cmd["duration"]
self.blinkStart = time.ticks_ms()
freq = max(0.1, self.blinkFrequency)
half_ms = int(500 / freq)
while True:
if self.blinkDuration != 0:
elapsed = time.ticks_diff(time.ticks_ms(), self.blinkStart)
if elapsed > self.blinkDuration * 1000:
break
self._write(self.color)
await asyncio.sleep_ms(half_ms)
self._write((0, 0, 0))
await asyncio.sleep_ms(half_ms)
except asyncio.CancelledError:
pass
self.blinking = False
self.status = self.previousStatus
self._write(self.color)
async def send(self, cmd):
self._cmd = cmd
self._flag.set()
# ---------- public API ----------
async def set_color(self, color, brightness=None):
if brightness is not None:
self.brightness = brightness
await self.send({
"type": "solid",
"color": color
})
async def set_status(self, status):
status = status.upper()
color = self.states.get(status)
if color is None:
raise ValueError("invalid status")
await self.send({
"type": "solid",
"color": color,
"status": status.lower()
})
async def blink(self, frequency, duration):
self.previousStatus = self.status
await self.send({
"type": "blink",
"color": self.color,
"frequency": frequency,
"duration": duration,
"previousStatus": self.previousStatus
})
async def stop(self):
await self.send({"type": "off"})
def get_state(self):
if self.blinking and self.blinkDuration > 0:
elapsed = time.ticks_diff(time.ticks_ms(), self.blinkStart) / 1000
remains = max(0.0, self.blinkDuration - elapsed)
else:
remains = 0.0
return {
"status": self.status,
"color": self.color,
"brightness": self.brightness,
"isblinking": self.blinking,
"frequency": self.blinkFrequency,
"duration": self.blinkDuration,
"remains": remains
}
# =========================
# INIT
# =========================
led = LedController(neoPixelStrip)
# =========================
# ROUTES
# =========================
@app.get('/static/<path:path>')
async def staticRoutes(request, path):
if '..' in path:
# directory traversal is not allowed
return {'error': '4040 Not found'}, 404
return {'error': '404 Not found'}, 404
return send_file('static/' + path)
@app.get('/')
async def getIndex(request):
return send_file('static/index.html')
@app.get('/api/brightness')
async def getBrightness(request):
return {'brightness': blBrightness}
return {'brightness': led.brightness}
@app.post('/api/brightness')
async def setBrightness(request):
brightness = request.json.get("brightness")
if brightness is None:
return {'error': 'missing brightness parameter'}, 400
if type(brightness) is float \
or type(brightness) is int:
if brightness < 0 or brightness > 1:
return {'error': 'brigthness out of bound (0.0 - 1.0)'}, 400
else:
return {'error': 'wrong brigthness type (float)'}, 400
# Save blStatus
global blStatus
status = blStatus
# Apply new brightness to current color
color = blColor
__setBusyLightColor(color, brightness)
# Restore global status
blStatus = status
global blBrightness
blBrightness = brightness
return {'brightness': blBrightness}
if not isinstance(brightness, (int, float)):
return {'error': 'wrong brightness type'}, 400
if brightness < 0 or brightness > 1:
return {'error': 'brightness out of bound'}, 400
current_status = led.status
current_color = led.color
await led.set_color(current_color, brightness)
led.status = current_status
return {'brightness': led.brightness}
@app.post('/api/color')
async def setColor(request):
r = request.json.get("r")
g = request.json.get("g")
b = request.json.get("b")
if bool(r is None or g is None or b is None):
if r is None or g is None or b is None:
return {'error': 'missing color'}, 400
else:
if type(r) is int \
and type(g) is int \
and type(b) is int:
color = (r, g, b)
else:
return {'error': 'wrong color type (int)'}, 400
if (r < 0 or r > 255) \
or (g < 0 or g > 255) \
or (b < 0 or b > 255):
return {'error': 'color out of bound (0 - 255)'}, 400
if not all(isinstance(v, int) for v in (r, g, b)):
return {'error': 'wrong color type'}, 400
if not (0 <= r <= 255 and 0 <= g <= 255 and 0 <= b <= 255):
return {'error': 'color out of bound'}, 400
brightness = request.json.get("brightness")
if not brightness is None:
if type(brightness) is float \
or type(brightness) is int:
if brightness < 0 or brightness > 1:
return {'error': 'brightness out of bound (0.0 - 1.0)'}, 400
else:
return {'error': 'wrong brightness type (float)'}, 400
__setBusyLightColor(color, brightness)
__setBusyLightColor(color, blBrightness)
return {'status': blStatus}
await led.set_color((r, g, b), brightness)
@app.route('/api/status/<status>', methods=['GET', 'POST'])
async def setStatus(request, status):
lStatus = status.lower()
if lStatus == 'on':
__setBusyLightStatus('ON')
elif lStatus == 'off':
__setBusyLightStatus('OFF')
elif lStatus == 'available':
__setBusyLightStatus('AVAILABLE')
elif lStatus == 'away':
__setBusyLightStatus('AWAY')
elif lStatus == 'busy':
__setBusyLightStatus('BUSY')
else:
return {'error': 'unknown /api/status/' + lStatus + ' route'}, 404
return {'status': blStatus}
return {'status': led.status}
async def __blinkTask(color, brightness, frequency, duration):
global blBlinking, blStatus, blColor, blBrightness
blBlinking = True
interval = 1.0 / (frequency * 2) # half period in seconds
elapsed = 0.0
while blBlinking:
neoPixelStrip.fill(__setColor(color))
neoPixelStrip.write()
await asyncio.sleep(interval)
neoPixelStrip.fill((0, 0, 0))
neoPixelStrip.write()
await asyncio.sleep(interval)
if duration > 0:
elapsed += interval * 2
if elapsed >= duration:
break
# Restore previous state
blBlinking = False
__setBusyLightColor(color, brightness)
blStatus = blPreviousStatus
@app.post('/api/blink/stop')
async def blinkStop(request):
global blBlinking
blBlinking = False
return {'status': blStatus}
@app.post('/api/blink')
async def setBlink(request):
frequency = request.json.get('frequency')
duration = request.json.get('duration')
if frequency is None or duration is None:
return {'error': 'missing frequency or duration parameter'}, 400
if not isinstance(frequency, int) or frequency <= 0:
return {'error': 'frequency must be a positive integer (Hz)'}, 400
if not (isinstance(duration, (int, float))) or duration < 0:
return {'error': 'duration must be a positive float in seconds (0 = endless)'}, 400
# Save current state
global blPreviousStatus, blBlinking, blBlinkFrequency, blBlinkDuration, blBlinkStartTime
blPreviousStatus = blStatus
savedColor = blColor
savedBrightness = blBrightness
# Stop any ongoing blink
blBlinking = False
await asyncio.sleep(0)
# Save blink params
blBlinkFrequency = frequency
blBlinkDuration = duration
blBlinkStartTime = time.ticks_ms()
# Launch blink task
asyncio.create_task(__blinkTask(savedColor, savedBrightness, frequency, duration))
return {'status': 'blinking', 'frequency': frequency, 'duration': duration}
@app.get('/api/blink')
async def getBlink(request):
if blBlinking and blBlinkDuration > 0:
elapsed = time.ticks_diff(time.ticks_ms(), blBlinkStartTime) / 1000.0
remains = max(0.0, blBlinkDuration - elapsed)
else:
remains = 0.0
return {
'isblinking': blBlinking,
'frequency': blBlinkFrequency,
'duration': blBlinkDuration,
'remains': remains
}
@app.get('/api/color')
async def getColor(request):
# r, g, b = neoPixelStrip.__getitem__(0)
r, g, b = blColor
return {'r': r, 'g': g, 'b': b, 'brightness': blBrightness}
r, g, b = led.color
return {'r': r, 'g': g, 'b': b, 'brightness': led.brightness}
@app.get('/api/status')
async def getStatus(request):
return {'status': blStatus}
return {'status': led.status}
@app.route('/api/status/<status>', methods=['GET', 'POST'])
async def setStatus(request, status):
try:
await led.set_status(status)
except ValueError:
return {'error': 'unknown status'}, 404
return {'status': led.status}
@app.post('/api/blink')
async def setBlink(request):
freq = request.json.get('frequency')
duration = request.json.get('duration')
if freq is None or duration is None:
return {'error': 'missing frequency or duration'}, 400
if not isinstance(freq, (int, float)) or freq <= 0:
return {'error': 'invalid frequency'}, 400
if not isinstance(duration, (int, float)) or duration < 0:
return {'error': 'invalid duration'}, 400
await led.blink(freq, duration)
return {'status': 'blinking', 'frequency': freq, 'duration': duration}
@app.get('/api/blink')
async def getBlink(request):
state = led.get_state()
return {
'isblinking': state["isblinking"],
'frequency': state["frequency"],
'duration': state["duration"],
'remains': state["remains"]
}
@app.post('/api/blink/stop')
async def blinkStop(request):
await led.stop()
await led.set_status(led.previousStatus)
return {'status': led.status}
@app.get('/api/debug')
async def getDebugInfo(request):
r, g, b = blColor
dr, dg, db = neoPixelStrip.__getitem__(0)
return {'status': blStatus, 'brightness': blBrightness, 'color': {'r': r, 'g': g, 'b': b}, 'dimColor': {'r': dr, 'g': dg, 'b': db}}
r, g, b = led.color
dr, dg, db = led._np[0]
return {
'status': led.status,
'brightness': led.brightness,
'color': {'r': r, 'g': g, 'b': b},
'dimColor': {'r': dr, 'g': dg, 'b': db},
'blinking': led.blinking,
'blinkFrequency': led.blinkFrequency,
'blinkDuration': led.blinkDuration
}
@app.post('/api/mutedeck-webhook')
async def mutedeckWebhook(request):
if request.json.get('control') != 'system':
if request.json.get('call') == 'active':
if request.json.get('mute') == 'active':
isMuted = True
else:
isMuted = False
if request.json.get('video') == 'active':
isVideoOn = True
else:
isVideoOn = False
if isMuted:
__setBusyLightStatus('away')
else:
__setBusyLightStatus('busy')
else:
__setBusyLightStatus('available')
isMuted = request.json.get('mute') == 'active'
return {'status': blStatus}
if isMuted:
await led.set_status('away')
else:
await led.set_status('busy')
else:
await led.set_status('available')
return {'status': led.status}
@app.post('/shutdown')
@@ -278,22 +348,23 @@ async def shutdown(request):
request.app.shutdown()
return 'The server is shutting down...'
# Startup effect
def startUpSeq():
print('Start seq begins')
__setBusyLightColor(statusColors.get('OFF'), 0.1)
time.sleep_ms(100)
__setBusyLightStatus('BUSY')
time.sleep_ms(200)
__setBusyLightStatus('AWAY')
time.sleep_ms(300)
__setBusyLightStatus('AVAILABLE')
time.sleep_ms(500)
__setBusyLightStatus('OFF')
print('Start seq is ended')
startUpSeq()
# Start API webserver
if __name__ == '__main__':
app.run(port=80, debug=True)
# =========================
# START
# =========================
async def main():
asyncio.create_task(led.run())
# startup animation
await led.set_status('busy')
await asyncio.sleep_ms(200)
await led.set_status('away')
await asyncio.sleep_ms(300)
await led.set_status('available')
await asyncio.sleep_ms(500)
await led.set_status('off')
await app.start_server(port=80)
asyncio.run(main())