#!/usr/bin/env python3 """ LoadCell - Raspberry Pi Python library for HX711 load cell amplifier 기능 : 1. 무게 측정 (양수 값으로 출력) 2. 키보드 명령어: 'c' 영점조절, 'p' 일시정지, 'r' 재시작 3. 시작 시 자동 캘리브레이션 하드웨어 연결: - HX711 [VCC] -> Pi 5 [Pin 1] (3.3V) - HX711 [GND] -> Pi 5 [Pin 6] (GND) - HX711 [DT] -> Pi 5 [Pin 29] (GPIO5) - HX711 [SCK] -> Pi 5 [Pin 31] (GPIO6) """ from HX711 import SimpleHX711, Rate import time import sys import select import tty import termios # HX711 #1 핀 설정 DT_PIN = 5 # GPIO5 -> 29번 핀 SCK_PIN = 6 # GPIO6 -> 31번 핀 # HX711 #2 핀 설정 DT_PIN_2 = 17 # GPIO17 -> 11번 핀 SCK_PIN_2 = 27 # GPIO27 -> 13번 핀 # HX711 #3 핀 설정 DT_PIN_3 = 22 # GPIO22 -> 15번 핀 SCK_PIN_3 = 23 # GPIO23 -> 16번 핀 # 설정 변수 CALIBRATION_FACTOR_1 = -411.17 # 캘리브레이션 팩터 #1 CALIBRATION_FACTOR_2 = -409.85 # 캘리브레이션 팩터 #2 CALIBRATION_FACTOR_3 = -410.20 # 캘리브레이션 팩터 #3 measure_delay = 0.1 # 측정 간격 (초) - 기본 100ms DELAY_STEP = 0.01 # 측정 간격 조절 단위 (10ms) density = 1.0 # 기본값: 물 (1.0 g/ml) # 상태 변수 is_paused = False is_calibrated = False start_time = 0 offset_value_1 = 0 offset_value_2 = 0 offset_value_3 = 0 # HX711 인스턴스 hx1 = None hx2 = None hx3 = None def init_hx711(): """HX711 초기화""" global hx1, hx2, hx3 print("HX711 초기화 중...") try: hx1 = SimpleHX711(DT_PIN, SCK_PIN, 1, 0, Rate.HZ_10) print("HX711 #1 연결 성공!") hx2 = SimpleHX711(DT_PIN_2, SCK_PIN_2, 1, 0, Rate.HZ_10) print("HX711 #2 연결 성공!") hx3 = SimpleHX711(DT_PIN_3, SCK_PIN_3, 1, 0, Rate.HZ_10) print("HX711 #3 연결 성공!") return True except Exception as e: print(f"HX711 연결 실패: {e}") print("배선 확인 필요") return False def perform_calibration(): """캘리브레이션 수행""" global is_calibrated, offset_value_1, offset_value_2, offset_value_3, start_time if is_calibrated: return print() print("-" * 50) print(">> 최초 캘리브레이션 진행") print("-" * 50) print("로드셀 위에 아무것도 올리지 마세요!") print("2초 후 영점 조정을 시작합니다...") time.sleep(2) print("영점 조정 중...") readings_1 = [] readings_2 = [] readings_3 = [] for _ in range(10): readings_1.append(hx1.read()) time.sleep(0.1) offset_value_1 = sum(readings_1) / len(readings_1) for _ in range(10): readings_2.append(hx2.read()) time.sleep(0.1) offset_value_2 = sum(readings_2) / len(readings_2) for _ in range(10): readings_3.append(hx3.read()) time.sleep(0.1) offset_value_3 = sum(readings_3) / len(readings_3) print(">> 캘리브레이션 완료!") print(f"현재 오프셋 값: {offset_value_1:.2f}, {offset_value_2:.2f}, {offset_value_3:.2f}") print() is_calibrated = True start_time = time.time() def tare(): """영점 조정 함수""" global offset_value_1, offset_value_2, offset_value_3 print("\n>> 영점 조정 중...") print(" 로드셀 위에 아무것도 올리지 마세요") time.sleep(1) readings_1 = [] for _ in range(10): readings_1.append(hx1.read()) time.sleep(0.1) offset_value_1 = sum(readings_1) / len(readings_1) readings_2 = [] for _ in range(10): readings_2.append(hx2.read()) time.sleep(0.1) offset_value_2 = sum(readings_2) / len(readings_2) readings_3 = [] for _ in range(10): readings_3.append(hx3.read()) time.sleep(0.1) offset_value_3 = sum(readings_3) / len(readings_3) print(">> 영점 조정 완료!\n") def get_weight(): """무게 측정 함수""" raw_value_1 = hx1.read() raw_value_2 = hx2.read() raw_value_3 = hx3.read() weight_g_1 = (raw_value_1 - offset_value_1) / CALIBRATION_FACTOR_1 weight_g_2 = (raw_value_2 - offset_value_2) / CALIBRATION_FACTOR_2 weight_g_3 = (raw_value_3 - offset_value_3) / CALIBRATION_FACTOR_3 weight_avg = (weight_g_1 + weight_g_2 + weight_g_3) / 3 volume_ml = (weight_avg / density)*3 return weight_avg, volume_ml, weight_g_1, weight_g_2, weight_g_3 def print_help(): """도움말 출력""" print() print("-" * 50) print("키보드 명령어:") print(" 'c' : 영점 조절 (Calibration)") print(" 'p' : 일시정지 (Pause)") print(" 'r' : 재시작 (Resume)") print(" 'h' : 도움말 보기") print(" 'q' : 종료 (Quit)") print("-" * 50) print() def check_input(): """비동기 키보드 입력 확인""" if select.select([sys.stdin], [], [], 0)[0]: return sys.stdin.read(1) return None def process_command(cmd): """명령어 처리""" global is_paused cmd = cmd.lower() if cmd == 'c': tare() elif cmd == 'p': if not is_paused: is_paused = True print("\n>> 측정 일시정지됨") print(" 'r'을 입력하면 다시 시작합니다.\n") else: print("\n>> 이미 일시정지 상태입니다.\n") elif cmd == 'r': if is_paused: is_paused = False print("\n>> 측정 재시작!\n") else: print("\n>> 이미 측정 중입니다.\n") elif cmd == 'h': print_help() elif cmd == 'q': print("\n>> 프로그램 종료\n") return False elif cmd not in ['\n', '\r', ' ']: print(f">> 알 수 없는 명령어: {cmd}") print(" 'h'를 입력하면 도움말을 볼 수 있습니다.") return True def set_paused(value): global is_paused is_paused = value def set_density(value): global density density = value def main(): global start_time if not init_hx711(): return perform_calibration() print_help() old_settings = termios.tcgetattr(sys.stdin) try: tty.setcbreak(sys.stdin.fileno()) print("측정 시작 (종료하려면 'q' 입력)...\n") while True: cmd = check_input() if cmd: if not process_command(cmd): break if is_paused: time.sleep(0.1) continue weight_avg, volume_ml, weight_g_1, weight_g_2, weight_g_3 = get_weight() elapsed_time = time.time() - start_time # JSON 형식으로 출력 print(f'{{"volume_ml":{volume_ml:.1f},"w1":{weight_g_1:.1f},"w2":{weight_g_2:.1f},"w3":{weight_g_3:.1f},"time":{elapsed_time:.2f}}}') print(f"─────────────────────") print(f" volume_ml : {volume_ml:.1f} mL") print(f" w1 : {weight_g_1:.1f} g") print(f" w2 : {weight_g_2:.1f} g") print(f" w3 : {weight_g_3:.1f} g") print(f" time : {elapsed_time:.2f} s") time.sleep(measure_delay) except KeyboardInterrupt: print("\n>> 프로그램 종료\n") finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) print("프로그램 종료됨.") if __name__ == "__main__": main()