Files
2026-01-02 15:46:19 +09:00

181 lines
6.5 KiB
Python

"""
LoadCell Weight Monitor - 실시간 무게 측정 웹 GUI
Arduino에서 시리얼로 JSON 데이터를 받아 웹 브라우저에 실시간 그래프로 표시
"""
from flask import Flask, render_template
from flask_socketio import SocketIO
import serial
import serial.tools.list_ports
import json
import threading
import time
import logging
# Werkzeug HTTP 요청 로그 비활성화 (Socket.IO polling 로그 숨김)
logging.getLogger('werkzeug').setLevel(logging.ERROR)
app = Flask(__name__)
app.config['SECRET_KEY'] = 'loadcell_secret_2025'
socketio = SocketIO(app, cors_allowed_origins="*")
# 전역 변수
serial_port = None
serial_thread = None
is_running = False
# ===== Arduino 포트 자동 검색 =====
def find_arduino_port():
"""사용 가능한 Arduino 포트를 자동으로 찾습니다"""
ports = serial.tools.list_ports.comports()
for port in ports:
# Arduino가 연결된 포트 찾기 (USB Serial, CH340, Arduino 등)
if 'USB' in port.description or 'Arduino' in port.description or 'CH340' in port.description:
return port.device
# 못 찾으면 사용 가능한 포트 목록 출력
if ports:
print("\n사용 가능한 포트:")
for port in ports:
print(f" - {port.device}: {port.description}")
return ports[0].device # 첫 번째 포트 반환
return None
# ===== 시리얼 데이터 읽기 스레드 =====
def read_serial_data():
"""Arduino로부터 시리얼 데이터를 읽어 WebSocket으로 전송"""
global is_running, serial_port
error_count = 0
max_errors = 5 # 연속 에러 허용 횟수
while is_running:
try:
if serial_port and serial_port.is_open:
# in_waiting 호출을 안전하게 처리
try:
waiting = serial_port.in_waiting
except Exception:
waiting = 0
if waiting > 0:
line = serial_port.readline().decode('utf-8', errors='ignore').strip()
# JSON 데이터 파싱
if line.startswith('{') and line.endswith('}'):
try:
data = json.loads(line)
socketio.emit('weight_data', data)
print(f"Data: {data}")
error_count = 0 # 성공 시 에러 카운트 리셋
except json.JSONDecodeError:
print(f"Invalid JSON: {line}")
elif line: # 빈 줄이 아니면
socketio.emit('log_message', {'message': line})
print(f"Log: {line}")
time.sleep(0.02) # CPU 사용률 낮추기
except Exception as e:
error_count += 1
print(f"Serial error ({error_count}/{max_errors}): {e}")
if error_count >= max_errors:
print("연속 에러 발생. 포트 재연결 시도...")
socketio.emit('log_message', {'message': '시리얼 재연결 시도 중...'})
# 포트 재연결 시도
try:
if serial_port:
serial_port.close()
time.sleep(2)
port = find_arduino_port()
if port:
serial_port = serial.Serial(port, 9600, timeout=1)
time.sleep(10)
print(f"포트 재연결 성공: {port}")
socketio.emit('log_message', {'message': f'재연결 성공: {port}'})
else:
print("Arduino 포트를 찾을 수 없습니다.")
socketio.emit('log_message', {'message': 'Arduino 연결 실패'})
except Exception as reconnect_error:
print(f"재연결 실패: {reconnect_error}")
error_count = 0 # 재연결 시도 후 카운트 리셋
time.sleep(1)
# ===== Flask 라우트 =====
@app.route('/')
def loadcell_measure():
"""메인 페이지"""
return render_template('loadcell_measure.html')
# ===== SocketIO 이벤트 =====
@socketio.on('connect')
def handle_connect():
"""클라이언트 연결"""
print('Client connected')
socketio.emit('log_message', {'message': 'Connected to server'})
@socketio.on('disconnect')
def handle_disconnect():
"""클라이언트 연결 해제"""
print('Client disconnected')
@socketio.on('send_command')
def handle_command(data):
"""클라이언트에서 Arduino로 명령 전송"""
global serial_port
command = data.get('command', '')
if serial_port and serial_port.is_open:
try:
serial_port.write(command.encode('utf-8'))
print(f"Sent command: {command}")
socketio.emit('log_message', {'message': f'Command sent: {command}'})
except Exception as e:
print(f"Error sending command: {e}")
socketio.emit('log_message', {'message': f'Error: {e}'})
else:
socketio.emit('log_message', {'message': 'Serial port not connected'})
# ===== 메인 실행 =====
if __name__ == '__main__':
# Arduino 포트 찾기
port = find_arduino_port()
if port:
print(f"\nArduino 포트 발견: {port}")
try:
# 시리얼 포트 열기
serial_port = serial.Serial(port, 9600, timeout=1)
time.sleep(5) # Arduino 리셋 대기
print("시리얼 포트 연결 성공!")
# 시리얼 읽기 스레드 시작
is_running = True
serial_thread = threading.Thread(target=read_serial_data, daemon=True)
serial_thread.start()
# Flask 서버 시작
print("\n=================================")
print("웹 서버 시작: http://localhost:5000")
print("=================================\n")
socketio.run(app, host='0.0.0.0', port=5000, debug=False)
except serial.SerialException as e:
print(f"시리얼 포트 오류: {e}")
print("Arduino가 제대로 연결되어 있는지 확인하세요.")
except KeyboardInterrupt:
print("\n서버 종료 중...")
finally:
is_running = False
if serial_port and serial_port.is_open:
serial_port.close()
print("서버 종료 완료")
else:
print("Arduino 포트를 찾을 수 없습니다!")
print("Arduino가 연결되어 있는지 확인하세요.")