Files
rapi/apiserver.py
2026-01-14 14:05:24 +09:00

154 lines
4.2 KiB
Python

#!/usr/bin/env python3
"""
FASTAPI + Socket.IO 서버 - LoadCell 실시간 데이터 중계
센서 데이터를 웹에 실시간 전달 (Socket.IO)
"""
import os
import socketio
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
# ===== Socket.IO 서버 생성 =====
sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
# ===== FastAPI 인스턴스 생성 =====
fastapi_app = FastAPI(
title="LoadCell API Server",
description="HX711 로드셀 측정 데이터 API + Socket.IO",
version="2.0.0",
)
# Socket.IO ASGI 앱 (FastAPI와 결합)
app = socketio.ASGIApp(sio, fastapi_app)
# 정적 파일 서빙
if os.path.exists("static"):
fastapi_app.mount("/static", StaticFiles(directory="static"), name="static")
# 공유 데이터 (main.py에서 set_shared_data()로 설정)
shared_data = None
def set_shared_data(data: dict):
"""main.py에서 호출하여 측정 데이터 연결"""
global shared_data
shared_data = data
# ===== 요청 모델 정의 =====
class CommandRequest(BaseModel):
"""명령어 요청 모델"""
command: str # 'tare', 'pause', 'resume'
# ===== FastAPI 라우트 =====
@fastapi_app.get("/")
def index():
"""메인 페이지"""
return FileResponse("web.html")
@fastapi_app.get("/uro/data")
def get_data():
"""REST API로 현재 데이터 조회 (폴백용)"""
if shared_data:
return shared_data
return {"volume_ml": 0, "time": 0, "status": "대기 중"}
@fastapi_app.post("/uro/command")
def handle_command(req: CommandRequest):
"""REST API로 명령어 처리"""
import rapimeasure
return process_command(req.command)
def process_command(command: str):
"""명령어 처리 공통 함수"""
import rapimeasure
if command == "tare":
rapimeasure.tare()
return {"success": True, "message": "영점 조정 완료"}
elif command == "pause":
rapimeasure.set_paused(True)
if shared_data:
shared_data["status"] = "일시정지"
return {"success": True, "message": "측정 일시 정지"}
elif command == "resume":
rapimeasure.set_paused(False)
if shared_data:
shared_data["status"] = "측정 중"
return {"success": True, "message": "측정 재시작"}
return {"success": False, "message": f"알 수 없는 명령: {command}"}
# ===== Socket.IO 이벤트 핸들러 =====
@sio.event
async def connect(sid, environ):
"""클라이언트 연결"""
print(f"[Socket.IO] Client connected: {sid}")
await sio.emit("log_message", {"message": "서버에 연결되었습니다."}, to=sid)
@sio.event
async def disconnect(sid):
"""클라이언트 연결 해제"""
print(f"[Socket.IO] Client disconnected: {sid}")
@sio.event
async def send_command(sid, data):
"""클라이언트에서 명령어 수신"""
command = data.get("command", "")
print(f"[Socket.IO] Command from {sid}: {command}")
result = process_command(command)
await sio.emit("command_result", result, to=sid)
@sio.event
async def set_density(sid, data):
"""밀도 설정"""
import rapimeasure
density = data.get("density", 1.0)
rapimeasure.set_density(density)
print(f"[Socket.IO] Density set to: {density}")
await sio.emit("log_message", {"message": f"밀도 설정: {density} g/mL"}, to=sid)
@sio.event
async def reset_measurement(sid):
"""측정 초기화"""
print(f"[Socket.IO] Reset measurement from {sid}")
await sio.emit("log_message", {"message": "측정 데이터 초기화"}, to=sid)
# ===== 외부에서 호출할 emit 함수 =====
async def emit_weight_data(data: dict):
"""측정 데이터를 모든 클라이언트에게 브로드캐스트"""
# 디버깅: 데이터 전송 로그 출력 (너무 많으면 주석 처리)
# print(f"[Socket.IO] Emitting data: {data['weight_g']}g")
await sio.emit("weight_data", data)
# ===== 직접 실행 (테스트용) =====
if __name__ == "__main__":
import uvicorn
# 더미 데이터
shared_data = {"volume_ml": 0, "time": 0, "status": "테스트 모드"}
print("API Server Test Mode")
uvicorn.run(app, host="0.0.0.0", port=8080)