Files
rapi/main.py
2026-01-14 14:05:24 +09:00

113 lines
3.0 KiB
Python

#!/usr/bin/env python3
"""
메인 실행 파일
- 백그라운드 스레드: 센서 데이터 측정
- 메인 스레드(비동기): FastAPI + Socket.IO 서버 실행
"""
import asyncio
import threading
import time
import rapimeasure
import apiserver
import uvicorn
# 측정 데이터를 저장할 공유 dict 기본값
shared_data = {"volume_ml": 0
, "time": 0
, "status": "Initializing"
}
# 이벤트 루프 참조 (Socket.IO emit용)
main_loop = None
def measurement_loop():
"""
백그라운드 스레드에서 실행되는 측정 무한 루프
센서 데이터를 읽고 Socket.IO로 브로드캐스트
"""
global main_loop
print("[measure Thread] start")
# 센서 초기화
rapimeasure.init_hx711()
shared_data["status"] = "Measuring"
measurement_count = 0
while True:
try:
# 일시정지 체크
if rapimeasure.is_paused:
time.sleep(0.1)
continue
# 센서 데이터 읽기
_, volume_ml = rapimeasure.get_weight()
# 공유 데이터 업데이트
shared_data["volume_ml"] = round(volume_ml, 2)
shared_data["time"] = measurement_count
# Socket.IO로 실시간 브로드캐스트
if main_loop is not None:
try:
# Thread-safe하게 비동기 emit 함수 호출
asyncio.run_coroutine_threadsafe(
apiserver.emit_weight_data(shared_data.copy()), main_loop
)
except Exception:
pass # emit 실패 시 무시 (연결 종료 등)
measurement_count += 1
# 200ms 간격으로 측정 (5Hz)
time.sleep(0.2)
except Exception as e:
print(f"[measure error] {e}")
shared_data["status"] = f"error: {e}"
time.sleep(1)
async def main():
"""비동기 메인 함수"""
global main_loop
main_loop = asyncio.get_running_loop()
print("=" * 50)
print(" LoadCell Dashboard + Socket.IO")
print(" access address:")
print(" - local: http://localhost:8080")
print(" - network: http://192.168.0.191:8080")
print(" Swagger document: http://localhost:8080/docs")
print("=" * 50)
# 1. 공유 데이터를 API 서버에 연결
apiserver.set_shared_data(shared_data)
# 2. 측정 스레드 시작
measure_thread = threading.Thread(target=measurement_loop, daemon=True)
measure_thread.start()
print("[main] measure thread started")
print("[main] FastAPI + Socket.IO server starting...")
# 3. uvicorn 서버 실행
config = uvicorn.Config(
apiserver.app,
host="0.0.0.0",
port=8080,
log_level="warning", # info -> warning으로 줄여서 로그 깔끔하게
)
server = uvicorn.Server(config)
await server.serve()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nProgram terminated.")