#!/usr/bin/env python3
"""
ESP32 Firmware Flash Web Server

Web-based GUI for flashing firmware packages to ESP32 devices.
"""
import os
import secrets
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import threading
import time
import webbrowser
import zipfile
from pathlib import Path

import serial.tools.list_ports
from flask import Flask, render_template, request, jsonify, send_from_directory
from flask_socketio import SocketIO, emit
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB max file size
# Use a fresh random key on every start instead of a constant that is
# readable by anyone who has a copy of this file.
app.config['SECRET_KEY'] = secrets.token_hex(32)
# NOTE(review): "*" accepts Socket.IO connections from any web origin.
# The UI is served from this same app, so the wildcard may not be needed -
# confirm before tightening.
socketio = SocketIO(app, cors_allowed_origins="*")

# Global state
# Shared between the HTTP handlers and the background flash thread:
#   running  - True while a flash thread is active
#   output   - list of text lines shown to the user
#   success  - None until a flash finishes, then True/False
#   progress - last percentage parsed from the flasher output
flash_status = {
    'running': False,
    'output': [],
    'success': None,
    'progress': 0,
}

# Client connection tracking
client_connected = False        # set by the Socket.IO connect/disconnect handlers
last_heartbeat = time.time()    # time of the most recent connect or heartbeat event
shutdown_timer = None           # pending threading.Timer for auto-shutdown, if any
def get_free_port():
    """Return a TCP port number that is currently unused on the loopback interface.

    Binding to port 0 asks the operating system to choose any free port.
    The probe socket is closed before returning, so the port is only
    *likely* to still be free when the caller binds it.

    Returns:
        int: the port number chosen by the operating system.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        # Probe on 127.0.0.1, the address the server is started on, instead
        # of on every interface; no listen() is needed just to read the port.
        probe.bind(('127.0.0.1', 0))
        return probe.getsockname()[1]
@app.route('/')
def index():
    """Serve the main page by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Serve ``filename`` from the ``static`` directory next to this module."""
    module_dir = os.path.dirname(__file__)
    return send_from_directory(os.path.join(module_dir, 'static'), filename)
@app.route('/api/ports')
def get_ports():
    """List the serial ports found on this machine as JSON.

    Responds with ``{'success': True, 'ports': [...]}`` where each entry has
    ``device`` and ``description`` keys, or ``{'success': False, 'error': ...}``
    if enumeration raises.
    """
    try:
        found = []
        for info in serial.tools.list_ports.comports():
            found.append({'device': info.device, 'description': info.description})
        payload = {'success': True, 'ports': found}
        return jsonify(payload)
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/flash', methods=['POST'])
def flash_firmware():
    """Accept a firmware zip upload and start flashing it in a background thread.

    Form fields read: ``port`` (required), ``chip``, ``baud``, ``flash_mode``,
    ``flash_freq``, ``flash_size`` (each with a default), plus the uploaded
    file ``firmware``.

    The zip is saved and extracted into a fresh temporary directory, which is
    handed to ``flash_worker``; the worker removes it when it finishes. If
    anything fails before the worker thread is running, the directory is
    removed here instead.

    Returns:
        A JSON response: ``{'success': True, 'message': ...}`` once the
        thread has started, otherwise ``{'success': False, 'error': ...}``.
    """
    global flash_status

    if flash_status['running']:
        return jsonify({'success': False, 'error': 'Flash operation already in progress'})

    # Get form data
    port = request.form.get('port')
    chip = request.form.get('chip', 'esp32')
    baud = request.form.get('baud', '460800')
    flash_mode = request.form.get('flash_mode', 'dio')
    flash_freq = request.form.get('flash_freq', '40m')
    flash_size = request.form.get('flash_size', '2MB')

    # Reject a missing port up front; otherwise it only surfaces later as an
    # obscure error from the background thread.
    if not port:
        return jsonify({'success': False, 'error': 'No serial port selected'})

    # Get uploaded file
    if 'firmware' not in request.files:
        return jsonify({'success': False, 'error': 'No firmware file uploaded'})

    firmware_file = request.files['firmware']
    if firmware_file.filename == '':
        return jsonify({'success': False, 'error': 'No firmware file selected'})

    temp_dir = None
    try:
        # Save and extract firmware into a private temp directory
        temp_dir = tempfile.mkdtemp(prefix="esp32_flash_")
        zip_path = os.path.join(temp_dir, 'firmware.zip')
        firmware_file.save(zip_path)

        with zipfile.ZipFile(zip_path, 'r') as zip_file:
            zip_file.extractall(temp_dir)

        # Start flash in background thread
        worker = threading.Thread(
            target=flash_worker,
            args=(temp_dir, port, chip, baud, flash_mode, flash_freq, flash_size),
            daemon=True,
        )
        flash_status = {'running': True, 'output': [], 'success': None, 'progress': 0}
        try:
            worker.start()
        except Exception:
            # The worker never ran, so nothing else will clear the flag.
            flash_status['running'] = False
            raise
    except Exception as e:
        # Reaching here means the worker was not started, so this function
        # still owns the temp directory and must remove it.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        return jsonify({'success': False, 'error': str(e)})

    return jsonify({'success': True, 'message': 'Flash started'})
@app.route('/api/status')
def get_status():
    """Return the current ``flash_status`` dict as a JSON response."""
    snapshot = flash_status
    return jsonify(snapshot)
@socketio.on('connect')
def handle_connect():
    """Record that a client is connected and cancel any scheduled shutdown."""
    global client_connected, last_heartbeat, shutdown_timer

    client_connected, last_heartbeat = True, time.time()

    # A timer left over from an earlier disconnect must not fire now.
    pending = shutdown_timer
    if pending:
        pending.cancel()
        shutdown_timer = None

    print('Client connected')
@socketio.on('disconnect')
def handle_disconnect():
    """Mark the client as gone and schedule a server shutdown one second later.

    The timer is stored in ``shutdown_timer`` so that a later connect event
    can cancel it.
    """
    global client_connected, shutdown_timer
    client_connected = False
    print('Client disconnected - scheduling shutdown in 1 second...')

    # Cancel a timer left by an earlier disconnect before replacing it;
    # otherwise the old one could no longer be reached to cancel it.
    if shutdown_timer is not None:
        shutdown_timer.cancel()

    # Schedule shutdown after 1 second
    shutdown_timer = threading.Timer(1.0, shutdown_server)
    shutdown_timer.daemon = True
    shutdown_timer.start()
@socketio.on('heartbeat')
def handle_heartbeat():
    """Record the time of a client heartbeat and acknowledge it."""
    global last_heartbeat
    now = time.time()
    last_heartbeat = now
    emit('heartbeat_ack', {'timestamp': now})
def shutdown_server():
    """Terminate the server process once no client is connected.

    Intended to run as the callback of the shutdown timer. It waits one
    second, waits for any flash that is still running to finish, and then
    exits the process - unless a client has reconnected in the meantime, in
    which case it returns without doing anything.
    """
    # Give a moment for any pending operations
    time.sleep(1)

    # Exiting while a flash is still writing would cut it off part-way, so
    # wait for the worker to clear its 'running' flag first.
    while flash_status['running']:
        time.sleep(0.5)

    # Timer.cancel() cannot stop a callback that has already started, so a
    # client that reconnected during the waits above must be honoured here.
    if client_connected:
        print('Client reconnected. Shutdown cancelled.')
        return

    # flush=True because os._exit() does not flush buffered output.
    print('\nNo clients connected. Shutting down server...', flush=True)

    # This function runs on a timer thread, outside any Flask request, so the
    # request-based werkzeug shutdown hook is not reachable from here. Exit
    # the process directly instead.
    os._exit(0)
def flash_worker(temp_dir, port, chip, baud, flash_mode, flash_freq, flash_size):
    """Background worker that flashes the extracted firmware with esptool.

    Expects ``temp_dir`` to contain ``bootloader.bin``, ``soundshot.bin``,
    ``ota_data_initial.bin`` and ``partition-table.bin``. Progress and output
    lines are reported through the module-level ``flash_status`` dict, and
    ``flash_status['success']`` is set to True or False at the end.

    Args:
        temp_dir: directory holding the extracted firmware files; it is
            deleted before this function returns.
        port: serial port to flash through.
        chip, baud, flash_mode, flash_freq, flash_size: strings passed
            through unchanged as the matching esptool options.

    Returns:
        None. All results are reported via ``flash_status``.
    """
    import re
    from contextlib import redirect_stdout, redirect_stderr

    try:
        if not port:
            flash_status['output'].append("ERROR: No serial port specified")
            flash_status['success'] = False
            return

        # Define file paths
        bootloader = os.path.join(temp_dir, "bootloader.bin")
        firmware = os.path.join(temp_dir, "soundshot.bin")
        ota_initial = os.path.join(temp_dir, "ota_data_initial.bin")
        partition = os.path.join(temp_dir, "partition-table.bin")

        # Verify files exist, reporting every missing file rather than only the first
        required_files = [bootloader, firmware, ota_initial, partition]
        missing_files = [path for path in required_files if not os.path.exists(path)]
        if missing_files:
            for file_path in missing_files:
                flash_status['output'].append(
                    f"ERROR: Missing file {os.path.basename(file_path)}"
                )
            flash_status['success'] = False
            return

        # Build esptool command arguments
        esptool_args = [
            "--chip", chip,
            "--port", port,
            "--baud", baud,
            "--before", "default_reset",
            "--after", "hard_reset",
            "write-flash",
            "--flash-mode", flash_mode,
            "--flash-freq", flash_freq,
            "--flash-size", flash_size,
            "0x1000", bootloader,
            "0x20000", firmware,
            "0x11000", ota_initial,
            "0x8000", partition,
        ]

        flash_status['output'].append(f"Running esptool with args: {' '.join(esptool_args)}")
        flash_status['output'].append("")

        # Import and run esptool directly (works in both script and EXE)
        import esptool

        # Fix esptool data path for PyInstaller
        if getattr(sys, 'frozen', False):
            # Running as compiled executable
            bundle_dir = sys._MEIPASS
            esptool_targets = os.path.join(bundle_dir, 'esptool', 'targets')
            if os.path.exists(esptool_targets):
                # Override esptool's resource path
                esptool.RESOURCES_DIR = bundle_dir

        # Regex to parse progress lines
        # Example: "Writing at 0x0011325c... (85 %)"
        # or "Writing at 0x0011325c [========================> ] 85.3% 622592/730050 bytes..."
        progress_pattern = re.compile(r'(\d+(?:\.\d+)?)\s*%')
        capture = _FlashOutputCapture(progress_pattern)

        # esptool.main() reads its arguments from sys.argv, so swap ours in
        # for the duration of the call.
        original_argv = sys.argv
        sys.argv = ['esptool.py'] + esptool_args
        return_code = 0
        try:
            # NOTE: this replaces sys.stdout/sys.stderr for the whole process,
            # so prints from other threads during the flash are captured too.
            # The context managers restore both streams on exit.
            with redirect_stdout(capture), redirect_stderr(capture):
                esptool.main()
        except SystemExit as e:
            # A SystemExit carries the exit status; None means success.
            return_code = e.code if e.code is not None else 0
        finally:
            sys.argv = original_argv

        flash_status['output'].append("")
        if return_code == 0:
            flash_status['output'].append("✓ Flash completed successfully!")
            flash_status['success'] = True
            flash_status['progress'] = 100
        else:
            flash_status['output'].append(f"✗ Flash failed with return code {return_code}")
            flash_status['success'] = False

    except Exception as e:
        flash_status['output'].append(f"✗ Error: {str(e)}")
        flash_status['success'] = False
    finally:
        flash_status['running'] = False
        # Clean up temp directory (ignore_errors already suppresses failures)
        shutil.rmtree(temp_dir, ignore_errors=True)


class _FlashOutputCapture:
    """Write-only text stream that copies flasher output into ``flash_status``."""

    def __init__(self, progress_pattern):
        # Every captured line, in order.
        self.buffer = []
        # Compiled regex whose first group is a percentage.
        self._progress_pattern = progress_pattern

    def write(self, text):
        """Record each line of ``text`` and update the progress percentage."""
        if text and text.strip():
            for line in text.rstrip().split('\n'):
                flash_status['output'].append(line)
                self.buffer.append(line)

                # The pattern only matches numeric text, so float() is safe.
                match = self._progress_pattern.search(line)
                if match:
                    flash_status['progress'] = float(match.group(1))
        return len(text)

    def flush(self):
        """Nothing is buffered, so there is nothing to flush."""

    def isatty(self):
        """Report a non-interactive stream to callers that probe for a terminal."""
        return False
def open_browser(port):
    """Open the web browser at the application's local address on ``port``."""
    target = f'http://127.0.0.1:{port}'
    webbrowser.open(target)
def main():
    """Pick a free port, open the browser shortly after, and run the server."""
    import logging

    # Disable Flask request logging to avoid colorama issues with stdout redirection
    logging.getLogger('werkzeug').setLevel(logging.ERROR)

    port = get_free_port()
    print(f"Starting ESP32 Flasher on port {port}...")
    print(f"Open your browser to: http://127.0.0.1:{port}")
    print("Server will automatically shut down when browser is closed.\n")

    # Open browser after short delay
    browser_timer = threading.Timer(1.5, open_browser, args=[port])
    browser_timer.daemon = True
    browser_timer.start()

    # Run Flask server with SocketIO
    server_options = dict(
        host='127.0.0.1',
        port=port,
        debug=False,
        allow_unsafe_werkzeug=True,
        log_output=False,
    )
    try:
        socketio.run(app, **server_options)
    except KeyboardInterrupt:
        print('\nShutting down...')
    except SystemExit:
        print('Server stopped.')


if __name__ == "__main__":
    main()