""" LoadCell Weight Monitor - 실시간 무게 측정 웹 GUI Arduino에서 시리얼로 JSON 데이터를 받아 웹 브라우저에 실시간 그래프로 표시 """ from flask import Flask, render_template from flask_socketio import SocketIO import serial import serial.tools.list_ports import json import threading import time import logging # Werkzeug HTTP 요청 로그 비활성화 (Socket.IO polling 로그 숨김) logging.getLogger('werkzeug').setLevel(logging.ERROR) app = Flask(__name__) app.config['SECRET_KEY'] = 'loadcell_secret_2025' socketio = SocketIO(app, cors_allowed_origins="*") # 전역 변수 serial_port = None serial_thread = None is_running = False # ===== Arduino 포트 자동 검색 ===== def find_arduino_port(): """사용 가능한 Arduino 포트를 자동으로 찾습니다""" ports = serial.tools.list_ports.comports() for port in ports: # Arduino가 연결된 포트 찾기 (USB Serial, CH340, Arduino 등) if 'USB' in port.description or 'Arduino' in port.description or 'CH340' in port.description: return port.device # 못 찾으면 사용 가능한 포트 목록 출력 if ports: print("\n사용 가능한 포트:") for port in ports: print(f" - {port.device}: {port.description}") return ports[0].device # 첫 번째 포트 반환 return None # ===== 시리얼 데이터 읽기 스레드 ===== def read_serial_data(): """Arduino로부터 시리얼 데이터를 읽어 WebSocket으로 전송""" global is_running, serial_port error_count = 0 max_errors = 5 # 연속 에러 허용 횟수 while is_running: try: if serial_port and serial_port.is_open: # in_waiting 호출을 안전하게 처리 try: waiting = serial_port.in_waiting except Exception: waiting = 0 if waiting > 0: line = serial_port.readline().decode('utf-8', errors='ignore').strip() # JSON 데이터 파싱 if line.startswith('{') and line.endswith('}'): try: data = json.loads(line) socketio.emit('weight_data', data) print(f"Data: {data}") error_count = 0 # 성공 시 에러 카운트 리셋 except json.JSONDecodeError: print(f"Invalid JSON: {line}") elif line: # 빈 줄이 아니면 socketio.emit('log_message', {'message': line}) print(f"Log: {line}") time.sleep(0.02) # CPU 사용률 낮추기 except Exception as e: error_count += 1 print(f"Serial error ({error_count}/{max_errors}): {e}") if error_count >= max_errors: print("연속 에러 발생. 포트 재연결 시도...") socketio.emit('log_message', {'message': '시리얼 재연결 시도 중...'}) # 포트 재연결 시도 try: if serial_port: serial_port.close() time.sleep(2) port = find_arduino_port() if port: serial_port = serial.Serial(port, 9600, timeout=1) time.sleep(10) print(f"포트 재연결 성공: {port}") socketio.emit('log_message', {'message': f'재연결 성공: {port}'}) else: print("Arduino 포트를 찾을 수 없습니다.") socketio.emit('log_message', {'message': 'Arduino 연결 실패'}) except Exception as reconnect_error: print(f"재연결 실패: {reconnect_error}") error_count = 0 # 재연결 시도 후 카운트 리셋 time.sleep(1) # ===== Flask 라우트 ===== @app.route('/') def loadcell_measure(): """메인 페이지""" return render_template('loadcell_measure.html') # ===== SocketIO 이벤트 ===== @socketio.on('connect') def handle_connect(): """클라이언트 연결""" print('Client connected') socketio.emit('log_message', {'message': 'Connected to server'}) @socketio.on('disconnect') def handle_disconnect(): """클라이언트 연결 해제""" print('Client disconnected') @socketio.on('send_command') def handle_command(data): """클라이언트에서 Arduino로 명령 전송""" global serial_port command = data.get('command', '') if serial_port and serial_port.is_open: try: serial_port.write(command.encode('utf-8')) print(f"Sent command: {command}") socketio.emit('log_message', {'message': f'Command sent: {command}'}) except Exception as e: print(f"Error sending command: {e}") socketio.emit('log_message', {'message': f'Error: {e}'}) else: socketio.emit('log_message', {'message': 'Serial port not connected'}) # ===== 메인 실행 ===== if __name__ == '__main__': # Arduino 포트 찾기 port = find_arduino_port() if port: print(f"\nArduino 포트 발견: {port}") try: # 시리얼 포트 열기 serial_port = serial.Serial(port, 9600, timeout=1) time.sleep(5) # Arduino 리셋 대기 print("시리얼 포트 연결 성공!") # 시리얼 읽기 스레드 시작 is_running = True serial_thread = threading.Thread(target=read_serial_data, daemon=True) serial_thread.start() # Flask 서버 시작 print("\n=================================") print("웹 서버 시작: http://localhost:5000") print("=================================\n") socketio.run(app, host='0.0.0.0', port=5000, debug=False) except serial.SerialException as e: print(f"시리얼 포트 오류: {e}") print("Arduino가 제대로 연결되어 있는지 확인하세요.") except KeyboardInterrupt: print("\n서버 종료 중...") finally: is_running = False if serial_port and serial_port.is_open: serial_port.close() print("서버 종료 완료") else: print("Arduino 포트를 찾을 수 없습니다!") print("Arduino가 연결되어 있는지 확인하세요.")