an eink camera running on an rpi zero 2 w

feat: add auto reload with websockets

dunkirk.sh db2dea2c d794e531

verified
Changed files
+40
src
+40
src/camera_server.py
···
import http.server
import socketserver
import threading
+
import websockets
+
import asyncio
# Setup logging
logger = logging.getLogger('camera_server')
···
BUTTON_PIN = 17
PHOTO_DIR = "/home/kierank/photos"
WEB_PORT = 80
+
WS_PORT = 8765
PHOTO_RESOLUTION = (2592, 1944)
CAMERA_SETTLE_TIME = 1
DEBOUNCE_DELAY = 0.2
···
GPIO.setmode(GPIO.BCM)
GPIO.setup(Config.BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
+
# WebSocket clients set
+
connected_clients = set()
+
# Create a simple HTML gallery template - using triple quotes properly
HTML_TEMPLATE = """<!DOCTYPE html>
<html>
···
.photo img {{ width: 100%; height: auto; }}
.photo a {{ display: block; text-align: center; margin-top: 5px; }}
</style>
+
<script>
+
const ws = new WebSocket('ws://' + window.location.hostname + ':8765');
+
ws.onmessage = function(event) {{
+
if(event.data === 'reload') {{
+
window.location.reload();
+
}}
+
}};
+
</script>
</head>
<body>
<h1>Inkpress: Gallery</h1>
···
else:
super().do_GET()
+
async def websocket_handler(websocket, path):
+
connected_clients.add(websocket)
+
try:
+
await websocket.wait_closed()
+
finally:
+
connected_clients.remove(websocket)
+
+
async def notify_clients():
+
if connected_clients:
+
await asyncio.gather(
+
*[client.send('reload') for client in connected_clients]
+
)
+
def take_photo():
"""
Captures a photo using the Raspberry Pi camera.
···
picam2.capture_file(filename)
logger.info("Photo taken successfully")
+
+
# Notify websocket clients to reload
+
asyncio.run(notify_clients())
except IOError as e:
logger.error(f"IO Error while taking photo: {str(e)}")
except Exception as e:
···
server = None
try:
+
# Start HTTP server
server = socketserver.TCPServer(("", Config.WEB_PORT), PhotoHandler)
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()
+
+
# Start WebSocket server
+
ws_server = websockets.serve(websocket_handler, "0.0.0.0", Config.WS_PORT)
+
asyncio.get_event_loop().run_until_complete(ws_server)
+
ws_thread = threading.Thread(
+
target=asyncio.get_event_loop().run_forever,
+
daemon=True
+
)
+
ws_thread.start()
previous_state = GPIO.input(Config.BUTTON_PIN)
while True: