adding web flasher tool
This commit is contained in:
272
flash_tool/web_flasher.py
Normal file
272
flash_tool/web_flasher.py
Normal file
@@ -0,0 +1,272 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ESP32 Firmware Flash Web Server
|
||||
Web-based GUI interface for flashing firmware packages to ESP32 devices
|
||||
"""
|
||||
|
||||
from flask import Flask, render_template, request, jsonify, send_from_directory
|
||||
from flask_socketio import SocketIO, emit
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import zipfile
|
||||
import tempfile
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
import serial.tools.list_ports
|
||||
import webbrowser
|
||||
import socket
|
||||
import time
|
||||
import signal
|
||||
|
||||
app = Flask(__name__)
|
||||
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 # 100MB max file size
|
||||
app.config['SECRET_KEY'] = 'esp32-flasher-secret-key'
|
||||
socketio = SocketIO(app, cors_allowed_origins="*")
|
||||
|
||||
# Global state
|
||||
flash_status = {
|
||||
'running': False,
|
||||
'output': [],
|
||||
'success': None
|
||||
}
|
||||
|
||||
# Client connection tracking
|
||||
client_connected = False
|
||||
last_heartbeat = time.time()
|
||||
shutdown_timer = None
|
||||
|
||||
def get_free_port():
|
||||
"""Find a free port to run the server on"""
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.bind(('', 0))
|
||||
s.listen(1)
|
||||
port = s.getsockname()[1]
|
||||
return port
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
"""Serve the main page"""
|
||||
return render_template('index.html')
|
||||
|
||||
@app.route('/api/ports')
|
||||
def get_ports():
|
||||
"""Get list of available serial ports"""
|
||||
try:
|
||||
ports = [{'device': port.device, 'description': port.description}
|
||||
for port in serial.tools.list_ports.comports()]
|
||||
return jsonify({'success': True, 'ports': ports})
|
||||
except Exception as e:
|
||||
return jsonify({'success': False, 'error': str(e)})
|
||||
|
||||
@app.route('/api/flash', methods=['POST'])
|
||||
def flash_firmware():
|
||||
"""Flash firmware to ESP32"""
|
||||
global flash_status
|
||||
|
||||
if flash_status['running']:
|
||||
return jsonify({'success': False, 'error': 'Flash operation already in progress'})
|
||||
|
||||
# Get form data
|
||||
port = request.form.get('port')
|
||||
chip = request.form.get('chip', 'esp32')
|
||||
baud = request.form.get('baud', '460800')
|
||||
flash_mode = request.form.get('flash_mode', 'dio')
|
||||
flash_freq = request.form.get('flash_freq', '40m')
|
||||
flash_size = request.form.get('flash_size', '2MB')
|
||||
|
||||
# Get uploaded file
|
||||
if 'firmware' not in request.files:
|
||||
return jsonify({'success': False, 'error': 'No firmware file uploaded'})
|
||||
|
||||
firmware_file = request.files['firmware']
|
||||
if firmware_file.filename == '':
|
||||
return jsonify({'success': False, 'error': 'No firmware file selected'})
|
||||
|
||||
# Save and extract firmware
|
||||
try:
|
||||
# Create temp directory
|
||||
temp_dir = tempfile.mkdtemp(prefix="esp32_flash_")
|
||||
zip_path = os.path.join(temp_dir, 'firmware.zip')
|
||||
firmware_file.save(zip_path)
|
||||
|
||||
# Extract firmware
|
||||
with zipfile.ZipFile(zip_path, 'r') as zip_file:
|
||||
zip_file.extractall(temp_dir)
|
||||
|
||||
# Start flash in background thread
|
||||
flash_status = {'running': True, 'output': [], 'success': None}
|
||||
thread = threading.Thread(
|
||||
target=flash_worker,
|
||||
args=(temp_dir, port, chip, baud, flash_mode, flash_freq, flash_size)
|
||||
)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
return jsonify({'success': True, 'message': 'Flash started'})
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({'success': False, 'error': str(e)})
|
||||
|
||||
@app.route('/api/status')
|
||||
def get_status():
|
||||
"""Get current flash status"""
|
||||
return jsonify(flash_status)
|
||||
|
||||
@socketio.on('connect')
|
||||
def handle_connect():
|
||||
"""Handle client connection"""
|
||||
global client_connected, last_heartbeat, shutdown_timer
|
||||
client_connected = True
|
||||
last_heartbeat = time.time()
|
||||
|
||||
# Cancel any pending shutdown
|
||||
if shutdown_timer:
|
||||
shutdown_timer.cancel()
|
||||
shutdown_timer = None
|
||||
|
||||
print('Client connected')
|
||||
|
||||
@socketio.on('disconnect')
|
||||
def handle_disconnect():
|
||||
"""Handle client disconnection"""
|
||||
global client_connected, shutdown_timer
|
||||
client_connected = False
|
||||
print('Client disconnected - scheduling shutdown in 5 seconds...')
|
||||
|
||||
# Schedule shutdown after 5 seconds
|
||||
shutdown_timer = threading.Timer(5.0, shutdown_server)
|
||||
shutdown_timer.daemon = True
|
||||
shutdown_timer.start()
|
||||
|
||||
@socketio.on('heartbeat')
|
||||
def handle_heartbeat():
|
||||
"""Handle heartbeat from client"""
|
||||
global last_heartbeat
|
||||
last_heartbeat = time.time()
|
||||
emit('heartbeat_ack', {'timestamp': last_heartbeat})
|
||||
|
||||
def shutdown_server():
|
||||
"""Gracefully shutdown the server"""
|
||||
print('\nNo clients connected. Shutting down server...')
|
||||
|
||||
# Give a moment for any pending operations
|
||||
time.sleep(1)
|
||||
|
||||
# Shutdown Flask
|
||||
try:
|
||||
func = request.environ.get('werkzeug.server.shutdown')
|
||||
if func is None:
|
||||
# Alternative shutdown method
|
||||
os.kill(os.getpid(), signal.SIGTERM)
|
||||
else:
|
||||
func()
|
||||
except:
|
||||
# Force exit as last resort
|
||||
os._exit(0)
|
||||
|
||||
def flash_worker(temp_dir, port, chip, baud, flash_mode, flash_freq, flash_size):
|
||||
"""Background worker for flashing firmware"""
|
||||
global flash_status
|
||||
|
||||
try:
|
||||
# Define file paths
|
||||
bootloader = os.path.join(temp_dir, "bootloader.bin")
|
||||
firmware = os.path.join(temp_dir, "soundshot.bin")
|
||||
ota_initial = os.path.join(temp_dir, "ota_data_initial.bin")
|
||||
partition = os.path.join(temp_dir, "partition-table.bin")
|
||||
|
||||
# Verify files exist
|
||||
required_files = [bootloader, firmware, ota_initial, partition]
|
||||
for file_path in required_files:
|
||||
if not os.path.exists(file_path):
|
||||
flash_status['output'].append(f"ERROR: Missing file {os.path.basename(file_path)}")
|
||||
flash_status['success'] = False
|
||||
flash_status['running'] = False
|
||||
return
|
||||
|
||||
# Build esptool command
|
||||
cmd = [
|
||||
sys.executable, "-m", "esptool",
|
||||
"--chip", chip,
|
||||
"--port", port,
|
||||
"--baud", baud,
|
||||
"--before", "default_reset",
|
||||
"--after", "hard_reset",
|
||||
"write-flash",
|
||||
"--flash-mode", flash_mode,
|
||||
"--flash-freq", flash_freq,
|
||||
"--flash-size", flash_size,
|
||||
"0x1000", bootloader,
|
||||
"0x20000", firmware,
|
||||
"0x11000", ota_initial,
|
||||
"0x8000", partition
|
||||
]
|
||||
|
||||
flash_status['output'].append(f"Running: {' '.join(cmd)}")
|
||||
flash_status['output'].append("")
|
||||
|
||||
# Run esptool
|
||||
process = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
universal_newlines=True
|
||||
)
|
||||
|
||||
# Stream output
|
||||
for line in process.stdout:
|
||||
flash_status['output'].append(line.rstrip())
|
||||
|
||||
process.wait()
|
||||
|
||||
if process.returncode == 0:
|
||||
flash_status['output'].append("")
|
||||
flash_status['output'].append("✓ Flash completed successfully!")
|
||||
flash_status['success'] = True
|
||||
else:
|
||||
flash_status['output'].append("")
|
||||
flash_status['output'].append(f"✗ Flash failed with return code {process.returncode}")
|
||||
flash_status['success'] = False
|
||||
|
||||
except Exception as e:
|
||||
flash_status['output'].append(f"✗ Error: {str(e)}")
|
||||
flash_status['success'] = False
|
||||
finally:
|
||||
flash_status['running'] = False
|
||||
# Clean up temp directory
|
||||
try:
|
||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||
except:
|
||||
pass
|
||||
|
||||
def open_browser(port):
|
||||
"""Open browser to the application"""
|
||||
url = f'http://127.0.0.1:{port}'
|
||||
webbrowser.open(url)
|
||||
|
||||
def main():
|
||||
"""Run the web server"""
|
||||
port = get_free_port()
|
||||
print(f"Starting ESP32 Flasher on port {port}...")
|
||||
print(f"Open your browser to: http://127.0.0.1:{port}")
|
||||
print(f"Server will automatically shut down when browser is closed.\n")
|
||||
|
||||
# Open browser after short delay
|
||||
timer = threading.Timer(1.5, open_browser, args=[port])
|
||||
timer.daemon = True
|
||||
timer.start()
|
||||
|
||||
# Run Flask server with SocketIO
|
||||
try:
|
||||
socketio.run(app, host='127.0.0.1', port=port, debug=False, allow_unsafe_werkzeug=True)
|
||||
except KeyboardInterrupt:
|
||||
print('\nShutting down...')
|
||||
except SystemExit:
|
||||
print('Server stopped.')
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user