#!/usr/bin/env python3 """ 메인 실행 파일 - 백그라운드 스레드: 센서 데이터 측정 - 메인 스레드(비동기): FastAPI + Socket.IO 서버 실행 """ import asyncio import threading import time import rapimeasure import apiserver import uvicorn # 측정 데이터를 저장할 공유 dict 기본값 shared_data = {"volume_ml": 0 , "time": 0 , "status": "Initializing" } # 이벤트 루프 참조 (Socket.IO emit용) main_loop = None def measurement_loop(): """ 백그라운드 스레드에서 실행되는 측정 무한 루프 센서 데이터를 읽고 Socket.IO로 브로드캐스트 """ global main_loop print("[measure Thread] start") # 센서 초기화 rapimeasure.init_hx711() shared_data["status"] = "Measuring" measurement_count = 0 while True: try: # 일시정지 체크 if rapimeasure.is_paused: time.sleep(0.1) continue # 센서 데이터 읽기 _, volume_ml = rapimeasure.get_weight() # 공유 데이터 업데이트 shared_data["volume_ml"] = round(volume_ml, 2) shared_data["time"] = measurement_count # Socket.IO로 실시간 브로드캐스트 if main_loop is not None: try: # Thread-safe하게 비동기 emit 함수 호출 asyncio.run_coroutine_threadsafe( apiserver.emit_weight_data(shared_data.copy()), main_loop ) except Exception: pass # emit 실패 시 무시 (연결 종료 등) measurement_count += 1 # 200ms 간격으로 측정 (5Hz) time.sleep(0.2) except Exception as e: print(f"[measure error] {e}") shared_data["status"] = f"error: {e}" time.sleep(1) async def main(): """비동기 메인 함수""" global main_loop main_loop = asyncio.get_running_loop() print("=" * 50) print(" LoadCell Dashboard + Socket.IO") print(" access address:") print(" - local: http://localhost:8080") print(" - network: http://192.168.0.191:8080") print(" Swagger document: http://localhost:8080/docs") print("=" * 50) # 1. 공유 데이터를 API 서버에 연결 apiserver.set_shared_data(shared_data) # 2. 측정 스레드 시작 measure_thread = threading.Thread(target=measurement_loop, daemon=True) measure_thread.start() print("[main] measure thread started") print("[main] FastAPI + Socket.IO server starting...") # 3. uvicorn 서버 실행 config = uvicorn.Config( apiserver.app, host="0.0.0.0", port=8080, log_level="warning", # info -> warning으로 줄여서 로그 깔끔하게 ) server = uvicorn.Server(config) await server.serve() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\nProgram terminated.")