feat:로드셀 3개 연결 로직

- 라즈베리파이 -> 아두이노 구동을 위한 코드 변경
- 하드웨어 구성을 위한 기본 코드 재작성
This commit is contained in:
2026-01-15 16:49:09 +09:00
commit bc2da5dd7f
4 changed files with 446 additions and 0 deletions

286
measure.py Normal file
View File

@@ -0,0 +1,286 @@
#!/usr/bin/env python3
"""
LoadCell - Raspberry Pi Python library for HX711 load cell amplifier
기능 :
1. 무게 측정 (양수 값으로 출력)
2. 키보드 명령어: 'c' 영점조절, 'p' 일시정지, 'r' 재시작
3. 시작 시 자동 캘리브레이션
하드웨어 연결:
- HX711 [VCC] -> Pi 5 [Pin 1] (3.3V)
- HX711 [GND] -> Pi 5 [Pin 6] (GND)
- HX711 [DT] -> Pi 5 [Pin 29] (GPIO5)
- HX711 [SCK] -> Pi 5 [Pin 31] (GPIO6)
"""
from HX711 import SimpleHX711, Rate
import time
import sys
import select
import tty
import termios
# HX711 #1 핀 설정
DT_PIN = 5 # GPIO5 -> 29번 핀
SCK_PIN = 6 # GPIO6 -> 31번 핀
# HX711 #2 핀 설정
DT_PIN_2 = 17 # GPIO17 -> 11번 핀
SCK_PIN_2 = 27 # GPIO27 -> 13번 핀
# HX711 #3 핀 설정
DT_PIN_3 = 22 # GPIO22 -> 15번 핀
SCK_PIN_3 = 23 # GPIO23 -> 16번 핀
# 설정 변수
CALIBRATION_FACTOR_1 = -411.17 # 캘리브레이션 팩터 #1
CALIBRATION_FACTOR_2 = -409.85 # 캘리브레이션 팩터 #2
CALIBRATION_FACTOR_3 = -410.20 # 캘리브레이션 팩터 #3
measure_delay = 0.1 # 측정 간격 (초) - 기본 100ms
DELAY_STEP = 0.01 # 측정 간격 조절 단위 (10ms)
density = 1.0 # 기본값: 물 (1.0 g/ml)
# 상태 변수
is_paused = False
is_calibrated = False
start_time = 0
offset_value_1 = 0
offset_value_2 = 0
offset_value_3 = 0
# HX711 인스턴스
hx1 = None
hx2 = None
hx3 = None
def init_hx711():
"""HX711 초기화"""
global hx1, hx2, hx3
print("HX711 초기화 중...")
try:
hx1 = SimpleHX711(DT_PIN, SCK_PIN, 1, 0, Rate.HZ_10)
print("HX711 #1 연결 성공!")
hx2 = SimpleHX711(DT_PIN_2, SCK_PIN_2, 1, 0, Rate.HZ_10)
print("HX711 #2 연결 성공!")
hx3 = SimpleHX711(DT_PIN_3, SCK_PIN_3, 1, 0, Rate.HZ_10)
print("HX711 #3 연결 성공!")
return True
except Exception as e:
print(f"HX711 연결 실패: {e}")
print("배선 확인 필요")
return False
def perform_calibration():
"""캘리브레이션 수행"""
global is_calibrated, offset_value_1, offset_value_2, offset_value_3, start_time
if is_calibrated:
return
print()
print("-" * 50)
print(">> 최초 캘리브레이션 진행")
print("-" * 50)
print("로드셀 위에 아무것도 올리지 마세요!")
print("2초 후 영점 조정을 시작합니다...")
time.sleep(2)
print("영점 조정 중...")
readings_1 = []
readings_2 = []
readings_3 = []
for _ in range(10):
readings_1.append(hx1.read())
time.sleep(0.1)
offset_value_1 = sum(readings_1) / len(readings_1)
for _ in range(10):
readings_2.append(hx2.read())
time.sleep(0.1)
offset_value_2 = sum(readings_2) / len(readings_2)
for _ in range(10):
readings_3.append(hx3.read())
time.sleep(0.1)
offset_value_3 = sum(readings_3) / len(readings_3)
print(">> 캘리브레이션 완료!")
print(f"현재 오프셋 값: {offset_value_1:.2f}, {offset_value_2:.2f}, {offset_value_3:.2f}")
print()
is_calibrated = True
start_time = time.time()
def tare():
"""영점 조정 함수"""
global offset_value_1, offset_value_2, offset_value_3
print("\n>> 영점 조정 중...")
print(" 로드셀 위에 아무것도 올리지 마세요")
time.sleep(1)
readings_1 = []
for _ in range(10):
readings_1.append(hx1.read())
time.sleep(0.1)
offset_value_1 = sum(readings_1) / len(readings_1)
readings_2 = []
for _ in range(10):
readings_2.append(hx2.read())
time.sleep(0.1)
offset_value_2 = sum(readings_2) / len(readings_2)
readings_3 = []
for _ in range(10):
readings_3.append(hx3.read())
time.sleep(0.1)
offset_value_3 = sum(readings_3) / len(readings_3)
print(">> 영점 조정 완료!\n")
def get_weight():
"""무게 측정 함수"""
raw_value_1 = hx1.read()
raw_value_2 = hx2.read()
raw_value_3 = hx3.read()
weight_g_1 = (raw_value_1 - offset_value_1) / CALIBRATION_FACTOR_1
weight_g_2 = (raw_value_2 - offset_value_2) / CALIBRATION_FACTOR_2
weight_g_3 = (raw_value_3 - offset_value_3) / CALIBRATION_FACTOR_3
weight_avg = (weight_g_1 + weight_g_2 + weight_g_3) / 3
volume_ml = (weight_avg / density)*3
return weight_avg, volume_ml, weight_g_1, weight_g_2, weight_g_3
def print_help():
"""도움말 출력"""
print()
print("-" * 50)
print("키보드 명령어:")
print(" 'c' : 영점 조절 (Calibration)")
print(" 'p' : 일시정지 (Pause)")
print(" 'r' : 재시작 (Resume)")
print(" 'h' : 도움말 보기")
print(" 'q' : 종료 (Quit)")
print("-" * 50)
print()
def check_input():
"""비동기 키보드 입력 확인"""
if select.select([sys.stdin], [], [], 0)[0]:
return sys.stdin.read(1)
return None
def process_command(cmd):
"""명령어 처리"""
global is_paused
cmd = cmd.lower()
if cmd == 'c':
tare()
elif cmd == 'p':
if not is_paused:
is_paused = True
print("\n>> 측정 일시정지됨")
print(" 'r'을 입력하면 다시 시작합니다.\n")
else:
print("\n>> 이미 일시정지 상태입니다.\n")
elif cmd == 'r':
if is_paused:
is_paused = False
print("\n>> 측정 재시작!\n")
else:
print("\n>> 이미 측정 중입니다.\n")
elif cmd == 'h':
print_help()
elif cmd == 'q':
print("\n>> 프로그램 종료\n")
return False
elif cmd not in ['\n', '\r', ' ']:
print(f">> 알 수 없는 명령어: {cmd}")
print(" 'h'를 입력하면 도움말을 볼 수 있습니다.")
return True
def set_paused(value):
global is_paused
is_paused = value
def set_density(value):
global density
density = value
def main():
global start_time
if not init_hx711():
return
perform_calibration()
print_help()
old_settings = termios.tcgetattr(sys.stdin)
try:
tty.setcbreak(sys.stdin.fileno())
print("측정 시작 (종료하려면 'q' 입력)...\n")
while True:
cmd = check_input()
if cmd:
if not process_command(cmd):
break
if is_paused:
time.sleep(0.1)
continue
weight_avg, volume_ml, weight_g_1, weight_g_2, weight_g_3 = get_weight()
elapsed_time = time.time() - start_time
# JSON 형식으로 출력
print(f'{{"volume_ml":{volume_ml:.1f},"w1":{weight_g_1:.1f},"w2":{weight_g_2:.1f},"w3":{weight_g_3:.1f},"time":{elapsed_time:.2f}}}')
print(f"─────────────────────")
print(f" volume_ml : {volume_ml:.1f} mL")
print(f" w1 : {weight_g_1:.1f} g")
print(f" w2 : {weight_g_2:.1f} g")
print(f" w3 : {weight_g_3:.1f} g")
print(f" time : {elapsed_time:.2f} s")
time.sleep(measure_delay)
except KeyboardInterrupt:
print("\n>> 프로그램 종료\n")
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
print("프로그램 종료됨.")
if __name__ == "__main__":
main()