#!/usr/bin/env python3
"""
ESP32 Firmware Flash Web Server

Web-based GUI interface for flashing firmware packages to ESP32 devices.

The server binds to 127.0.0.1 on a free port, opens the default browser on
it, accepts a firmware ZIP upload, flashes the contained images with
``esptool`` in a background thread, and terminates itself shortly after the
last browser client disconnects.
"""

from flask import Flask, render_template, request, jsonify, send_from_directory
from flask_socketio import SocketIO, emit
import subprocess
import sys
import threading
import zipfile
import tempfile
import os
import re
import shutil
from pathlib import Path
import serial.tools.list_ports
import webbrowser
import socket
import time
import signal

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024  # 100MB max file size
# NOTE(review): hard-coded secret; acceptable only while no session data is
# stored and the server stays bound to localhost - confirm.
app.config['SECRET_KEY'] = 'esp32-flasher-secret-key'
socketio = SocketIO(app, cors_allowed_origins="*")

# Seconds to wait after the last client disconnects before shutting down.
SHUTDOWN_DELAY_SECONDS = 5.0

# Files that must be present in the uploaded firmware package.
BOOTLOADER_FILE = "bootloader.bin"
FIRMWARE_FILE = "soundshot.bin"
OTA_INITIAL_FILE = "ota_data_initial.bin"
PARTITION_FILE = "partition-table.bin"

# Regex to parse progress lines
# Example: "Writing at 0x0011325c... (85 %)"
# or "Writing at 0x0011325c [========================>     ] 85.3% 622592/730050 bytes..."
PROGRESS_PATTERN = re.compile(r'(\d+(?:\.\d+)?)\s*%')

# Global state
flash_status = {
    'running': False,
    'output': [],
    'success': None,
    'progress': 0
}

# Client connection tracking
client_connected = False   # True while at least one Socket.IO client is connected
connected_clients = 0      # number of currently connected Socket.IO clients
last_heartbeat = time.time()
shutdown_timer = None


def get_free_port():
    """Find a free port to run the server on.

    Binds a throwaway socket to port 0 so the OS picks an unused port, then
    releases it again. The port is only *likely* to still be free when the
    server binds it a moment later.

    Returns:
        The port number chosen by the OS.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('', 0))
        return s.getsockname()[1]


@app.route('/')
def index():
    """Serve the main page."""
    return render_template('index.html')


@app.route('/api/ports')
def get_ports():
    """Get list of available serial ports.

    Returns:
        JSON ``{'success': True, 'ports': [{'device', 'description'}, ...]}``
        or ``{'success': False, 'error': <message>}`` if enumeration fails.
    """
    try:
        ports = [
            {'device': port.device, 'description': port.description}
            for port in serial.tools.list_ports.comports()
        ]
        return jsonify({'success': True, 'ports': ports})
    except Exception as e:
        # API boundary: report the failure to the browser instead of a 500.
        return jsonify({'success': False, 'error': str(e)})


@app.route('/api/flash', methods=['POST'])
def flash_firmware():
    """Flash firmware to ESP32.

    Reads the flashing options and the uploaded ``firmware`` ZIP from the
    form, extracts the ZIP into a fresh temp directory and starts
    ``flash_worker`` in a daemon thread. Progress is then available through
    ``/api/status``.

    Returns:
        JSON ``{'success': True, 'message': 'Flash started'}`` once the
        worker is running, otherwise ``{'success': False, 'error': ...}``.
    """
    global flash_status

    if flash_status['running']:
        return jsonify({'success': False, 'error': 'Flash operation already in progress'})

    # Get form data
    port = request.form.get('port')
    chip = request.form.get('chip', 'esp32')
    baud = request.form.get('baud', '460800')
    flash_mode = request.form.get('flash_mode', 'dio')
    flash_freq = request.form.get('flash_freq', '40m')
    flash_size = request.form.get('flash_size', '2MB')

    # Without a port the esptool command line cannot be built; fail early
    # with a clear message instead of an obscure error inside the worker.
    if not port:
        return jsonify({'success': False, 'error': 'No serial port selected'})

    # Get uploaded file
    if 'firmware' not in request.files:
        return jsonify({'success': False, 'error': 'No firmware file uploaded'})

    firmware_file = request.files['firmware']
    if firmware_file.filename == '':
        return jsonify({'success': False, 'error': 'No firmware file selected'})

    # Save and extract firmware
    temp_dir = None
    try:
        # Create temp directory
        temp_dir = tempfile.mkdtemp(prefix="esp32_flash_")
        zip_path = os.path.join(temp_dir, 'firmware.zip')
        firmware_file.save(zip_path)

        # Extract firmware
        with zipfile.ZipFile(zip_path, 'r') as zip_file:
            zip_file.extractall(temp_dir)

        # Start flash in background thread
        flash_status = {'running': True, 'output': [], 'success': None, 'progress': 0}
        thread = threading.Thread(
            target=flash_worker,
            args=(temp_dir, port, chip, baud, flash_mode, flash_freq, flash_size),
            daemon=True,
        )
        thread.start()
    except Exception as e:
        # The worker never started, so it will not clean up: release the
        # "running" flag and remove the temp directory here.
        flash_status['running'] = False
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        return jsonify({'success': False, 'error': str(e)})

    # From here on the worker owns temp_dir and removes it when done.
    return jsonify({'success': True, 'message': 'Flash started'})


@app.route('/api/status')
def get_status():
    """Get current flash status."""
    return jsonify(flash_status)


def _schedule_shutdown(delay=SHUTDOWN_DELAY_SECONDS):
    """Arm the shutdown timer, replacing any timer that is already pending."""
    global shutdown_timer

    if shutdown_timer:
        shutdown_timer.cancel()
    shutdown_timer = threading.Timer(delay, shutdown_server)
    shutdown_timer.daemon = True
    shutdown_timer.start()


@socketio.on('connect')
def handle_connect():
    """Handle client connection and cancel any pending shutdown."""
    global client_connected, connected_clients, last_heartbeat, shutdown_timer

    connected_clients += 1
    client_connected = True
    last_heartbeat = time.time()

    # Cancel any pending shutdown
    if shutdown_timer:
        shutdown_timer.cancel()
        shutdown_timer = None

    print('Client connected')


@socketio.on('disconnect')
def handle_disconnect():
    """Handle client disconnection.

    The shutdown timer is armed only when the last client has gone, so
    closing one of several open tabs does not stop the server.
    """
    global client_connected, connected_clients

    connected_clients = max(0, connected_clients - 1)
    client_connected = connected_clients > 0

    if client_connected:
        print('Client disconnected')
        return

    print('Client disconnected - scheduling shutdown in 5 seconds...')
    _schedule_shutdown()


@socketio.on('heartbeat')
def handle_heartbeat():
    """Handle heartbeat from client and acknowledge it with the timestamp."""
    global last_heartbeat
    last_heartbeat = time.time()
    emit('heartbeat_ack', {'timestamp': last_heartbeat})


def shutdown_server():
    """Shut the server process down once no client is connected.

    Runs on the ``threading.Timer`` thread armed by ``handle_disconnect``.
    Flask's ``request`` object is not available on that thread, so the
    process is stopped with SIGTERM rather than through a request-scoped
    shutdown hook.
    """
    # Give a moment for any pending operations
    time.sleep(1)

    # A client may have (re)connected while we were sleeping, e.g. on a
    # page reload; in that case the shutdown is no longer wanted.
    if client_connected:
        return

    # Never kill the process in the middle of a flash; check again later.
    if flash_status['running']:
        print('Flash in progress - postponing shutdown...')
        _schedule_shutdown()
        return

    print('\nNo clients connected. Shutting down server...')
    try:
        os.kill(os.getpid(), signal.SIGTERM)
    except OSError:
        # Force exit as last resort
        os._exit(0)


def flash_worker(temp_dir, port, chip, baud, flash_mode, flash_freq, flash_size):
    """Background worker for flashing firmware.

    Verifies that the extracted package in ``temp_dir`` contains all
    required images, runs ``esptool write-flash`` and mirrors its output and
    progress percentage into ``flash_status``. ``temp_dir`` is always
    removed and ``flash_status['running']`` cleared when the worker ends.

    Args:
        temp_dir: Directory holding the extracted firmware package.
        port: Serial port device passed to ``--port``.
        chip: Chip type passed to ``--chip``.
        baud: Baud rate (string) passed to ``--baud``.
        flash_mode: Value for ``--flash-mode``.
        flash_freq: Value for ``--flash-freq``.
        flash_size: Value for ``--flash-size``.
    """
    # flash_firmware rebinds the global before starting this thread, so this
    # is the same dict that /api/status serves.
    status = flash_status
    log = status['output'].append

    try:
        # Define file paths
        bootloader = os.path.join(temp_dir, BOOTLOADER_FILE)
        firmware = os.path.join(temp_dir, FIRMWARE_FILE)
        ota_initial = os.path.join(temp_dir, OTA_INITIAL_FILE)
        partition = os.path.join(temp_dir, PARTITION_FILE)

        # Verify files exist; report every missing file, not just the first.
        missing = [
            path for path in (bootloader, firmware, ota_initial, partition)
            if not os.path.exists(path)
        ]
        if missing:
            for path in missing:
                log(f"ERROR: Missing file {os.path.basename(path)}")
            status['success'] = False
            return

        # Build esptool command
        cmd = [
            sys.executable, "-m", "esptool",
            "--chip", chip,
            "--port", port,
            "--baud", baud,
            "--before", "default_reset",
            "--after", "hard_reset",
            "write-flash",
            "--flash-mode", flash_mode,
            "--flash-freq", flash_freq,
            "--flash-size", flash_size,
            "0x1000", bootloader,
            "0x20000", firmware,
            "0x11000", ota_initial,
            "0x8000", partition,
        ]

        log(f"Running: {' '.join(cmd)}")
        log("")

        # Run esptool. errors='replace' keeps one undecodable byte in the
        # tool's output from aborting the stream while it is still flashing.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors='replace',
            bufsize=1,
        ) as process:
            # Stream output
            for line in process.stdout:
                line_stripped = line.rstrip()
                log(line_stripped)

                # Try to extract progress percentage
                match = PROGRESS_PATTERN.search(line_stripped)
                if match:
                    status['progress'] = float(match.group(1))

        # Leaving the with-block has closed the pipe and waited for exit.
        if process.returncode == 0:
            log("")
            log("✓ Flash completed successfully!")
            status['success'] = True
            status['progress'] = 100
        else:
            log("")
            log(f"✗ Flash failed with return code {process.returncode}")
            status['success'] = False

    except Exception as e:
        # Thread boundary: surface any failure to the UI instead of dying
        # silently in the background.
        log(f"✗ Error: {str(e)}")
        status['success'] = False

    finally:
        status['running'] = False
        # Clean up temp directory
        shutil.rmtree(temp_dir, ignore_errors=True)


def open_browser(port):
    """Open browser to the application."""
    url = f'http://127.0.0.1:{port}'
    webbrowser.open(url)


def main():
    """Run the web server."""
    port = get_free_port()

    print(f"Starting ESP32 Flasher on port {port}...")
    print(f"Open your browser to: http://127.0.0.1:{port}")
    print("Server will automatically shut down when browser is closed.\n")

    # Open browser after short delay
    timer = threading.Timer(1.5, open_browser, args=[port])
    timer.daemon = True
    timer.start()

    # Run Flask server with SocketIO
    try:
        socketio.run(app, host='127.0.0.1', port=port, debug=False, allow_unsafe_werkzeug=True)
    except KeyboardInterrupt:
        print('\nShutting down...')
    except SystemExit:
        print('Server stopped.')


if __name__ == "__main__":
    main()