Files
soundshot/flash_tool/web_flasher.py
2025-10-14 09:50:35 -05:00

297 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""
ESP32 Firmware Flash Web Server
Web-based GUI interface for flashing firmware packages to ESP32 devices
"""
from flask import Flask, render_template, request, jsonify, send_from_directory
from flask_socketio import SocketIO, emit
import subprocess
import sys
import threading
import zipfile
import tempfile
import os
import shutil
from pathlib import Path
import serial.tools.list_ports
import webbrowser
import socket
import time
import signal
# Flask application and Socket.IO transport shared by every route and event
# handler in this module.
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB max file size
# Use a random per-process key rather than a constant committed to source
# control. The server is started fresh for every flashing session, so nothing
# relies on the key surviving a restart.
app.config['SECRET_KEY'] = os.urandom(32).hex()
# NOTE(review): "*" allows any web origin to open a Socket.IO connection to
# this local server. Consider restricting it to the server's own origin.
socketio = SocketIO(app, cors_allowed_origins="*")

# Global state of the current/most recent flash operation. It is replaced by
# the /api/flash route when a flash starts and mutated by flash_worker.
flash_status = {
    'running': False,   # True while a flash worker is active
    'output': [],       # Log lines collected from the worker / esptool
    'success': None,    # None until the operation finishes, then True/False
    'progress': 0,      # Last percentage parsed from esptool output
}

# Client connection tracking (maintained by the Socket.IO event handlers).
client_connected = False
last_heartbeat = time.time()
# Pending threading.Timer that will stop the server, or None.
shutdown_timer = None
def get_free_port():
    """Ask the operating system for an unused TCP port and return its number.

    A throwaway socket is bound to port 0 so the OS picks a free port; the
    socket is closed again before returning, so the port is only *likely*
    to still be free when the caller binds it.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(('', 0))
        probe.listen(1)
        _, free_port = probe.getsockname()
        return free_port
    finally:
        probe.close()
@app.route('/')
def index():
    """Render the application's main page from the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Send *filename* from the ``static`` directory next to this module."""
    module_dir = os.path.dirname(__file__)
    return send_from_directory(os.path.join(module_dir, 'static'), filename)
@app.route('/api/ports')
def get_ports():
    """Return the available serial ports as JSON.

    On success the payload is ``{'success': True, 'ports': [...]}`` where each
    entry carries the port's ``device`` and ``description``. Any exception is
    reported as ``{'success': False, 'error': <message>}``.
    """
    try:
        found = []
        for info in serial.tools.list_ports.comports():
            found.append({'device': info.device, 'description': info.description})
        return jsonify({'success': True, 'ports': found})
    except Exception as exc:
        return jsonify({'success': False, 'error': str(exc)})
@app.route('/api/flash', methods=['POST'])
def flash_firmware():
    """Accept a firmware package upload and start flashing it in the background.

    Form fields: ``port`` (required), ``chip``, ``baud``, ``flash_mode``,
    ``flash_freq`` and ``flash_size`` (optional, with defaults). The uploaded
    ``firmware`` file is saved as a zip archive into a fresh temporary
    directory and extracted there; ``flash_worker`` then runs on a daemon
    thread and removes that directory when it finishes.

    Returns:
        JSON ``{'success': True, 'message': 'Flash started'}`` once the worker
        thread is running, otherwise ``{'success': False, 'error': <message>}``.
    """
    global flash_status
    if flash_status['running']:
        return jsonify({'success': False, 'error': 'Flash operation already in progress'})

    # Get form data
    port = request.form.get('port')
    chip = request.form.get('chip', 'esp32')
    baud = request.form.get('baud', '460800')
    flash_mode = request.form.get('flash_mode', 'dio')
    flash_freq = request.form.get('flash_freq', '40m')
    flash_size = request.form.get('flash_size', '2MB')

    # Reject a missing port here; otherwise it only surfaces later as an
    # opaque error inside the background worker.
    if not port:
        return jsonify({'success': False, 'error': 'No serial port selected'})

    # Get uploaded file
    if 'firmware' not in request.files:
        return jsonify({'success': False, 'error': 'No firmware file uploaded'})
    firmware_file = request.files['firmware']
    if firmware_file.filename == '':
        return jsonify({'success': False, 'error': 'No firmware file selected'})

    temp_dir = None
    new_status = None
    worker_started = False
    try:
        # Save and extract firmware into a private temp directory
        temp_dir = tempfile.mkdtemp(prefix="esp32_flash_")
        zip_path = os.path.join(temp_dir, 'firmware.zip')
        firmware_file.save(zip_path)
        with zipfile.ZipFile(zip_path, 'r') as zip_file:
            zip_file.extractall(temp_dir)

        # Start flash in background thread
        new_status = {'running': True, 'output': [], 'success': None, 'progress': 0}
        thread = threading.Thread(
            target=flash_worker,
            args=(temp_dir, port, chip, baud, flash_mode, flash_freq, flash_size),
            daemon=True,
        )
        flash_status = new_status
        thread.start()
        worker_started = True
        return jsonify({'success': True, 'message': 'Flash started'})
    except Exception as e:
        if not worker_started:
            # The worker owns cleanup only once it is running; until then the
            # temp directory and the 'running' flag must be released here.
            if new_status is not None and flash_status is new_status:
                flash_status['running'] = False
            if temp_dir is not None:
                shutil.rmtree(temp_dir, ignore_errors=True)
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/status')
def get_status():
    """Return the module-level ``flash_status`` dictionary as JSON."""
    snapshot = flash_status
    return jsonify(snapshot)
@socketio.on('connect')
def handle_connect():
    """Record a Socket.IO client connection and abort any pending shutdown."""
    global client_connected, last_heartbeat, shutdown_timer
    client_connected = True
    last_heartbeat = time.time()

    # A client is back, so a shutdown scheduled by an earlier disconnect
    # must not fire.
    pending = shutdown_timer
    if pending:
        pending.cancel()
        shutdown_timer = None

    print('Client connected')
@socketio.on('disconnect')
def handle_disconnect():
    """Record a Socket.IO client disconnection and schedule a server shutdown.

    The shutdown is delayed by one second so that a client which reconnects
    right away (for example on a page refresh) can cancel it from the
    ``connect`` handler.
    """
    global client_connected, shutdown_timer
    # NOTE(review): a single flag is used, so one client disconnecting marks
    # the server as client-less even if another client is still connected.
    client_connected = False
    print('Client disconnected - scheduling shutdown in 1 second...')

    # Cancel a still-pending timer before replacing it. Otherwise the old
    # timer becomes unreachable and a later reconnect cannot cancel it.
    if shutdown_timer is not None:
        shutdown_timer.cancel()

    # Schedule shutdown after 1 second
    shutdown_timer = threading.Timer(1.0, shutdown_server)
    shutdown_timer.daemon = True
    shutdown_timer.start()
@socketio.on('heartbeat')
def handle_heartbeat():
    """Refresh the heartbeat timestamp and acknowledge it to the client."""
    global last_heartbeat
    now = time.time()
    last_heartbeat = now
    emit('heartbeat_ack', {'timestamp': now})
def shutdown_server():
    """Stop the server process once no client is connected and no flash runs.

    Called from a ``threading.Timer`` thread, not from a request handler, so
    Flask's ``request`` object is not usable here; the process is ended
    directly instead.

    After a one-second grace period the function:
    * returns without doing anything if a client has reconnected;
    * re-arms the shutdown timer if a flash is still running, so that
      esptool is never cut off in the middle of a write;
    * otherwise flushes the standard streams and exits with status 0.
    """
    global shutdown_timer

    # Give a moment for any pending operations (and for a quick reconnect)
    time.sleep(1)

    if client_connected:
        # A client came back after this timer had already fired.
        return

    if flash_status['running']:
        # Defer: try again shortly, and keep the timer cancellable on connect.
        shutdown_timer = threading.Timer(1.0, shutdown_server)
        shutdown_timer.daemon = True
        shutdown_timer.start()
        return

    print('\nNo clients connected. Shutting down server...')
    # os._exit() skips interpreter cleanup, so flush buffered output first.
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except Exception:
            pass  # Best effort only; exiting matters more than the flush.
    os._exit(0)
# Flash offset and file name of every image expected in an extracted firmware
# package, in the order they are verified and handed to esptool.
_FLASH_IMAGES = (
    ("0x1000", "bootloader.bin"),
    ("0x20000", "soundshot.bin"),
    ("0x11000", "ota_data_initial.bin"),
    ("0x8000", "partition-table.bin"),
)


def _build_esptool_command(image_paths, port, chip, baud, flash_mode, flash_freq, flash_size):
    """Return the ``esptool write-flash`` argument list for *image_paths*.

    *image_paths* is a sequence of ``(offset, path)`` pairs.
    """
    cmd = [
        sys.executable, "-m", "esptool",
        "--chip", chip,
        "--port", port,
        "--baud", baud,
        "--before", "default_reset",
        "--after", "hard_reset",
        "write-flash",
        "--flash-mode", flash_mode,
        "--flash-freq", flash_freq,
        "--flash-size", flash_size,
    ]
    for offset, path in image_paths:
        cmd += [offset, path]
    return cmd


def flash_worker(temp_dir, port, chip, baud, flash_mode, flash_freq, flash_size):
    """Background worker that flashes an extracted firmware package.

    Runs ``python -m esptool ... write-flash`` on the images found in
    *temp_dir* and reports through the module-level ``flash_status`` dict:
    every output line is appended to ``'output'``, the last percentage seen
    in a line is stored in ``'progress'``, and ``'success'`` is set to
    True/False when the run ends. Nothing is returned or raised; failures are
    recorded in ``flash_status``.

    Args:
        temp_dir: Directory holding the extracted images. It is always
            deleted before the worker returns.
        port: Serial port passed to esptool's ``--port``.
        chip: Value for ``--chip``.
        baud: Value for ``--baud`` (string).
        flash_mode: Value for ``--flash-mode``.
        flash_freq: Value for ``--flash-freq``.
        flash_size: Value for ``--flash-size``.
    """
    import re

    try:
        image_paths = [
            (offset, os.path.join(temp_dir, name)) for offset, name in _FLASH_IMAGES
        ]

        # Verify files exist
        for _, file_path in image_paths:
            if not os.path.exists(file_path):
                flash_status['output'].append(f"ERROR: Missing file {os.path.basename(file_path)}")
                flash_status['success'] = False
                return

        # Without a port the command list would contain None and fail later
        # with an unhelpful TypeError.
        if not port:
            flash_status['output'].append("ERROR: No serial port specified")
            flash_status['success'] = False
            return

        cmd = _build_esptool_command(
            image_paths, port, chip, baud, flash_mode, flash_freq, flash_size
        )
        flash_status['output'].append(f"Running: {' '.join(cmd)}")
        flash_status['output'].append("")

        # Regex to parse progress lines
        # Example: "Writing at 0x0011325c... (85 %)"
        # or "Writing at 0x0011325c [========================> ] 85.3% 622592/730050 bytes..."
        progress_pattern = re.compile(r'(\d+(?:\.\d+)?)\s*%')

        # Run esptool. The context manager closes the pipe and waits for the
        # process even if streaming fails. errors='replace' keeps a stray
        # undecodable byte in the tool's output from aborting the stream.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors='replace',
            bufsize=1,
        ) as process:
            # Stream output line by line
            for line in process.stdout:
                line_stripped = line.rstrip()
                flash_status['output'].append(line_stripped)
                # Try to extract progress percentage
                match = progress_pattern.search(line_stripped)
                if match:
                    flash_status['progress'] = float(match.group(1))

        if process.returncode == 0:
            flash_status['output'].append("")
            flash_status['output'].append("✓ Flash completed successfully!")
            flash_status['success'] = True
            flash_status['progress'] = 100
        else:
            flash_status['output'].append("")
            flash_status['output'].append(f"✗ Flash failed with return code {process.returncode}")
            flash_status['success'] = False
    except Exception as e:
        flash_status['output'].append(f"✗ Error: {str(e)}")
        flash_status['success'] = False
    finally:
        flash_status['running'] = False
        # Clean up temp directory (errors are already ignored by rmtree)
        shutil.rmtree(temp_dir, ignore_errors=True)
def open_browser(port):
    """Point the default web browser at the local server on *port*."""
    webbrowser.open('http://127.0.0.1:{}'.format(port))
def main():
    """Pick a free port, open the browser shortly after, and serve the app."""
    port = get_free_port()

    banner = (
        f"Starting ESP32 Flasher on port {port}...",
        f"Open your browser to: http://127.0.0.1:{port}",
        "Server will automatically shut down when browser is closed.\n",
    )
    for message in banner:
        print(message)

    # Launch the browser slightly later so the server is already listening.
    browser_timer = threading.Timer(1.5, open_browser, args=[port])
    browser_timer.daemon = True
    browser_timer.start()

    # Blocks here until the Flask/SocketIO server stops.
    try:
        socketio.run(
            app,
            host='127.0.0.1',
            port=port,
            debug=False,
            allow_unsafe_werkzeug=True,
        )
    except KeyboardInterrupt:
        print('\nShutting down...')
    except SystemExit:
        print('Server stopped.')


if __name__ == "__main__":
    main()