beagleboneblack embedded linux iot arm debian
Getting Started with BeagleBone Black: Embedded Linux
Complete guide to getting started with BeagleBone Black. Learn Linux embedded development, GPIO programming, and building IoT projects.
Por Jesus Velez
Getting Started with BeagleBone Black: Embedded Linux
El BeagleBone Black es una computadora de placa única potente que combina la flexibilidad de Linux con capacidades de tiempo real para proyectos embedded. Esta guía te llevará desde la configuración inicial hasta proyectos avanzados.
¿Qué es BeagleBone Black?
Especificaciones Técnicas
- Procesador: ARM Cortex-A8 1GHz
- RAM: 512MB DDR3
- Storage: 4GB eMMC + microSD slot
- GPIO: 65 pines digitales
- Analog: 7 entradas ADC de 12-bit
- Comunicación: UART, I2C, SPI, CAN
- Conectividad: Ethernet, USB Host/Device
- OS: Debian Linux personalizado
Ventajas sobre Raspberry Pi
BeagleBone Black:
✅ Más GPIOs (65 vs 40)
✅ ADC integrado (7 canales)
✅ Tiempo real con PRU
✅ CAN bus nativo
✅ Más robusto industrialmente
Raspberry Pi:
✅ Mayor comunidad
✅ Mejor multimedia
✅ Más RAM (modelos nuevos)
✅ WiFi/Bluetooth integrado
Configuración Inicial
1. Preparación de la MicroSD
# Descargar imagen oficial de Debian
wget https://debian.beagleboard.org/images/bone-debian-10.3-iot-armhf-2020-04-06-4gb.img.xz
# Descomprimir imagen
xz -d bone-debian-*.img.xz
# Flash a microSD (macOS/Linux)
sudo dd if=bone-debian-*.img of=/dev/diskX bs=1m
# Verificar integridad
sudo sync
2. Primera Conexión
# Conexión USB (aparece como dispositivo de red)
ssh debian@192.168.7.2
# Password por defecto: temppwd
# Conexión Ethernet (encontrar IP con nmap)
nmap -sn 192.168.1.0/24
ssh debian@[IP_ENCONTRADA]
# Cambiar password inmediatamente
passwd
# Actualizar sistema
sudo apt update && sudo apt upgrade -y
3. Configuración de Red WiFi (con adaptador USB)
# Instalar network-manager
sudo apt install network-manager
# Configurar WiFi
sudo nmcli device wifi list
sudo nmcli device wifi connect "YourWiFiName" password "YourPassword"
# Verificar conexión
ip addr show
ping google.com
Configuración del Entorno de Desarrollo
1. Instalación de Herramientas
# Herramientas de desarrollo
sudo apt install -y \
build-essential \
git \
python3-pip \
python3-dev \
nodejs \
npm \
vim \
htop \
tree
# Librerías para GPIO
sudo apt install -y \
python3-adafruit-gpio \
python3-adafruit-platformdetect \
python3-adafruit-circuitpython-busdevice
# Instalar Adafruit BeagleBone IO Python library
sudo pip3 install Adafruit-BBIO
2. Configuración de Device Tree
# Ver overlays disponibles
ls /opt/sources/bb.org-overlays/src/arm/
# Habilitar I2C, SPI, UART
sudo nano /boot/uEnv.txt
# Agregar líneas:
disable_uboot_overlay_emmc=1
disable_uboot_overlay_video=1
enable_uboot_overlays=1
enable_uboot_overlay_addr0=/lib/firmware/BB-I2C1-00A0.dtbo
enable_uboot_overlay_addr1=/lib/firmware/BB-SPI0-01-00A0.dtbo
enable_uboot_overlay_addr2=/lib/firmware/BB-UART1-00A0.dtbo
# Reiniciar para aplicar cambios
sudo reboot
GPIO Programming
1. Control Básico de LEDs
#!/usr/bin/env python3
# led_control.py - Control básico de LEDs
import Adafruit_BBIO.GPIO as GPIO
import time
import signal
import sys
# Definir pines
LED_PINS = ["P9_12", "P9_14", "P9_16", "P9_18"]
BUTTON_PIN = "P9_24"
def signal_handler(sig, frame):
"""Cleanup al recibir Ctrl+C"""
print("\nLimpiando GPIO...")
GPIO.cleanup()
sys.exit(0)
def setup_gpio():
"""Configurar pines GPIO"""
# Configurar LEDs como salida
for pin in LED_PINS:
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, GPIO.LOW)
# Configurar botón como entrada con pull-up
GPIO.setup(BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
print("GPIO configurado correctamente")
def knight_rider_effect():
"""Efecto Knight Rider con los LEDs"""
delay = 0.1
# Ida
for i in range(len(LED_PINS)):
GPIO.output(LED_PINS[i], GPIO.HIGH)
time.sleep(delay)
GPIO.output(LED_PINS[i], GPIO.LOW)
# Vuelta
for i in range(len(LED_PINS)-2, 0, -1):
GPIO.output(LED_PINS[i], GPIO.HIGH)
time.sleep(delay)
GPIO.output(LED_PINS[i], GPIO.LOW)
def button_controlled_leds():
"""Control de LEDs con botón"""
led_state = 0
last_button_state = GPIO.HIGH
while True:
current_button_state = GPIO.input(BUTTON_PIN)
# Detectar presión del botón (flanco descendente)
if last_button_state == GPIO.HIGH and current_button_state == GPIO.LOW:
# Apagar todos los LEDs
for pin in LED_PINS:
GPIO.output(pin, GPIO.LOW)
# Encender LED actual
if led_state < len(LED_PINS):
GPIO.output(LED_PINS[led_state], GPIO.HIGH)
led_state = (led_state + 1) % (len(LED_PINS) + 1)
print(f"Estado LED: {led_state}")
time.sleep(0.2) # Debounce
last_button_state = current_button_state
time.sleep(0.01)
def main():
print("BeagleBone Black - Control de LEDs")
# Configurar handler para Ctrl+C
signal.signal(signal.SIGINT, signal_handler)
# Configurar GPIO
setup_gpio()
try:
print("Modo 1: Efecto Knight Rider (10 segundos)")
start_time = time.time()
while time.time() - start_time < 10:
knight_rider_effect()
# Apagar todos los LEDs
for pin in LED_PINS:
GPIO.output(pin, GPIO.LOW)
print("\nModo 2: Control por botón")
print("Presiona el botón para cambiar LEDs")
print("Ctrl+C para salir")
button_controlled_leds()
except Exception as e:
print(f"Error: {e}")
finally:
GPIO.cleanup()
if __name__ == "__main__":
main()
2. Lectura de Sensores Analógicos
#!/usr/bin/env python3
# adc_sensors.py - Lectura de sensores analógicos
import Adafruit_BBIO.ADC as ADC
import time
import json
from datetime import datetime
# Configuración de ADC
ADC_PINS = {
"temperature": "P9_40", # AIN1
"light": "P9_38", # AIN3
"potentiometer": "P9_36", # AIN5
"moisture": "P9_35" # AIN6
}
class SensorReader:
def __init__(self):
# Inicializar ADC
ADC.setup()
self.calibration_data = {
"temperature": {"offset": 0.5, "scale": 100.0},
"light": {"min": 0, "max": 1.8},
"potentiometer": {"min": 0, "max": 1.8},
"moisture": {"dry": 1.6, "wet": 0.4}
}
def read_raw_voltage(self, pin):
"""Leer voltaje raw del ADC"""
# BeagleBone ADC: 12-bit, 0-1.8V
return ADC.read(pin) * 1.8
def read_temperature(self):
"""Leer sensor de temperatura LM35"""
voltage = self.read_raw_voltage(ADC_PINS["temperature"])
# LM35: 10mV/°C, offset para temperatura ambiente
temperature_c = (voltage - self.calibration_data["temperature"]["offset"]) * self.calibration_data["temperature"]["scale"]
temperature_f = temperature_c * 9.0 / 5.0 + 32.0
return {
"celsius": round(temperature_c, 2),
"fahrenheit": round(temperature_f, 2),
"voltage": round(voltage, 3)
}
def read_light_sensor(self):
"""Leer sensor de luz (LDR)"""
voltage = self.read_raw_voltage(ADC_PINS["light"])
cal = self.calibration_data["light"]
# Convertir a porcentaje (0-100%)
percentage = ((voltage - cal["min"]) / (cal["max"] - cal["min"])) * 100
percentage = max(0, min(100, percentage)) # Clamp 0-100
return {
"percentage": round(percentage, 1),
"voltage": round(voltage, 3),
"level": "bright" if percentage > 70 else "normal" if percentage > 30 else "dark"
}
def read_potentiometer(self):
"""Leer potenciómetro"""
voltage = self.read_raw_voltage(ADC_PINS["potentiometer"])
cal = self.calibration_data["potentiometer"]
percentage = ((voltage - cal["min"]) / (cal["max"] - cal["min"])) * 100
percentage = max(0, min(100, percentage))
return {
"percentage": round(percentage, 1),
"voltage": round(voltage, 3),
"position": int(percentage * 1023 / 100) # Simular valor de 10-bit
}
def read_moisture_sensor(self):
"""Leer sensor de humedad del suelo"""
voltage = self.read_raw_voltage(ADC_PINS["moisture"])
cal = self.calibration_data["moisture"]
# Invertir lógica: menor voltaje = más humedad
moisture = ((cal["dry"] - voltage) / (cal["dry"] - cal["wet"])) * 100
moisture = max(0, min(100, moisture))
return {
"percentage": round(moisture, 1),
"voltage": round(voltage, 3),
"level": "wet" if moisture > 70 else "moist" if moisture > 30 else "dry"
}
def read_all_sensors(self):
"""Leer todos los sensores"""
return {
"timestamp": datetime.now().isoformat(),
"temperature": self.read_temperature(),
"light": self.read_light_sensor(),
"potentiometer": self.read_potentiometer(),
"moisture": self.read_moisture_sensor()
}
def calibrate_sensor(self, sensor_name, **kwargs):
"""Calibrar un sensor específico"""
if sensor_name in self.calibration_data:
self.calibration_data[sensor_name].update(kwargs)
print(f"Sensor {sensor_name} calibrado: {self.calibration_data[sensor_name]}")
def main():
print("BeagleBone Black - Lectura de Sensores ADC")
# Crear instancia del lector
sensor_reader = SensorReader()
try:
print("Presiona Ctrl+C para detener\n")
while True:
# Leer todos los sensores
data = sensor_reader.read_all_sensors()
# Mostrar datos formateados
print(f"\n=== {datetime.now().strftime('%H:%M:%S')} ===")
temp_data = data["temperature"]
print(f"🌡️ Temperatura: {temp_data['celsius']}°C ({temp_data['fahrenheit']}°F)")
light_data = data["light"]
print(f"💡 Luz: {light_data['percentage']}% ({light_data['level']})")
pot_data = data["potentiometer"]
print(f"🎛️ Potenciómetro: {pot_data['percentage']}% (pos: {pot_data['position']})")
moisture_data = data["moisture"]
print(f"💧 Humedad: {moisture_data['percentage']}% ({moisture_data['level']})")
# Guardar en archivo JSON (opcional)
with open("/tmp/sensor_data.json", "w") as f:
json.dump(data, f, indent=2)
time.sleep(2)
except KeyboardInterrupt:
print("\n\nPrograma terminado por el usuario")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
Comunicación I2C y SPI
1. Sensor I2C (BME280)
#!/usr/bin/env python3
# bme280_i2c.py - Sensor BME280 via I2C
import smbus
import time
import struct
from dataclasses import dataclass
@dataclass
class BME280Data:
temperature: float
humidity: float
pressure: float
class BME280:
# Registros BME280
BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_CTRL_HUM = 0xF2
BME280_REGISTER_CTRL_MEAS = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
def __init__(self, address=0x76, bus=2):
self.address = address
self.bus = smbus.SMBus(bus) # I2C-2 en BeagleBone
# Verificar chip ID
chip_id = self.bus.read_byte_data(self.address, self.BME280_REGISTER_CHIPID)
if chip_id != 0x60:
raise ValueError(f"BME280 no encontrado. Chip ID: 0x{chip_id:02X}")
# Leer coeficientes de calibración
self._read_coefficients()
# Configurar sensor
self._configure_sensor()
print("BME280 inicializado correctamente")
def _read_coefficients(self):
"""Leer coeficientes de calibración"""
# Coeficientes de temperatura y presión (0x88-0x9F)
coeff = self.bus.read_i2c_block_data(self.address, 0x88, 24)
self.dig_T1 = struct.unpack('<H', bytes(coeff[0:2]))[0]
self.dig_T2 = struct.unpack('<h', bytes(coeff[2:4]))[0]
self.dig_T3 = struct.unpack('<h', bytes(coeff[4:6]))[0]
self.dig_P1 = struct.unpack('<H', bytes(coeff[6:8]))[0]
self.dig_P2 = struct.unpack('<h', bytes(coeff[8:10]))[0]
self.dig_P3 = struct.unpack('<h', bytes(coeff[10:12]))[0]
self.dig_P4 = struct.unpack('<h', bytes(coeff[12:14]))[0]
self.dig_P5 = struct.unpack('<h', bytes(coeff[14:16]))[0]
self.dig_P6 = struct.unpack('<h', bytes(coeff[16:18]))[0]
self.dig_P7 = struct.unpack('<h', bytes(coeff[18:20]))[0]
self.dig_P8 = struct.unpack('<h', bytes(coeff[20:22]))[0]
self.dig_P9 = struct.unpack('<h', bytes(coeff[22:24]))[0]
# Coeficientes de humedad
self.dig_H1 = self.bus.read_byte_data(self.address, 0xA1)
coeff_h = self.bus.read_i2c_block_data(self.address, 0xE1, 7)
self.dig_H2 = struct.unpack('<h', bytes(coeff_h[0:2]))[0]
self.dig_H3 = coeff_h[2]
self.dig_H4 = (coeff_h[3] << 4) | (coeff_h[4] & 0x0F)
self.dig_H5 = (coeff_h[5] << 4) | (coeff_h[4] >> 4)
self.dig_H6 = struct.unpack('<b', bytes([coeff_h[6]]))[0]
def _configure_sensor(self):
"""Configurar el sensor"""
# Oversampling: T=1x, P=1x, H=1x
self.bus.write_byte_data(self.address, self.BME280_REGISTER_CTRL_HUM, 0x01)
self.bus.write_byte_data(self.address, self.BME280_REGISTER_CTRL_MEAS, 0x27) # Mode normal
self.bus.write_byte_data(self.address, self.BME280_REGISTER_CONFIG, 0xA0) # Standby 1000ms
def read_data(self):
"""Leer datos del sensor"""
# Leer datos raw (presión, temperatura, humedad)
data = self.bus.read_i2c_block_data(self.address, self.BME280_REGISTER_PRESSURE_DATA, 8)
# Extraer valores raw
adc_p = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
adc_t = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
adc_h = (data[6] << 8) | data[7]
# Calcular temperatura compensada
t_fine, temperature = self._compensate_temperature(adc_t)
# Calcular presión compensada
pressure = self._compensate_pressure(adc_p, t_fine)
# Calcular humedad compensada
humidity = self._compensate_humidity(adc_h, t_fine)
return BME280Data(temperature, humidity, pressure)
def _compensate_temperature(self, adc_t):
"""Compensar lectura de temperatura"""
var1 = (adc_t / 16384.0 - self.dig_T1 / 1024.0) * self.dig_T2
var2 = ((adc_t / 131072.0 - self.dig_T1 / 8192.0) ** 2) * self.dig_T3
t_fine = var1 + var2
temperature = t_fine / 5120.0
return t_fine, temperature
def _compensate_pressure(self, adc_p, t_fine):
"""Compensar lectura de presión"""
var1 = (t_fine / 2.0) - 64000.0
var2 = var1 * var1 * self.dig_P6 / 32768.0
var2 = var2 + var1 * self.dig_P5 * 2.0
var2 = (var2 / 4.0) + (self.dig_P4 * 65536.0)
var1 = (self.dig_P3 * var1 * var1 / 524288.0 + self.dig_P2 * var1) / 524288.0
var1 = (1.0 + var1 / 32768.0) * self.dig_P1
if var1 == 0:
return 0
pressure = 1048576.0 - adc_p
pressure = (pressure - (var2 / 4096.0)) * 6250.0 / var1
var1 = self.dig_P9 * pressure * pressure / 2147483648.0
var2 = pressure * self.dig_P8 / 32768.0
pressure = pressure + (var1 + var2 + self.dig_P7) / 16.0
return pressure / 100.0 # Convert to hPa
def _compensate_humidity(self, adc_h, t_fine):
"""Compensar lectura de humedad"""
var_h = t_fine - 76800.0
if var_h == 0:
return 0
var_h = ((adc_h - (self.dig_H4 * 64.0 + self.dig_H5 / 16384.0 * var_h)) *
(self.dig_H2 / 65536.0 * (1.0 + self.dig_H6 / 67108864.0 * var_h *
(1.0 + self.dig_H3 / 67108864.0 * var_h))))
var_h = var_h * (1.0 - self.dig_H1 * var_h / 524288.0)
if var_h > 100.0:
var_h = 100.0
elif var_h < 0.0:
var_h = 0.0
return var_h
def main():
print("BeagleBone Black - BME280 I2C Sensor")
try:
# Inicializar sensor
sensor = BME280()
print("Presiona Ctrl+C para detener\n")
while True:
# Leer datos
data = sensor.read_data()
print(f"🌡️ Temperatura: {data.temperature:.2f}°C")
print(f"💧 Humedad: {data.humidity:.1f}%")
print(f"🌀 Presión: {data.pressure:.1f} hPa")
print("-" * 30)
time.sleep(2)
except KeyboardInterrupt:
print("\nPrograma terminado")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
2. Display SPI (SSD1306 OLED)
#!/usr/bin/env python3
# ssd1306_spi.py - Display OLED SSD1306 via SPI
import spidev
import RPi.GPIO as GPIO
import time
from PIL import Image, ImageDraw, ImageFont
class SSD1306_SPI:
# Comandos SSD1306
DISPLAYOFF = 0xAE
DISPLAYON = 0xAF
SETCONTRAST = 0x81
SETHIGHCOLUMN = 0x10
SETLOWCOLUMN = 0x00
SETMEMORYMODE = 0x20
SETSTARTLINE = 0x40
def __init__(self, dc_pin=18, rst_pin=22, spi_bus=0, spi_device=0):
self.dc_pin = dc_pin
self.rst_pin = rst_pin
# Configurar GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.dc_pin, GPIO.OUT)
GPIO.setup(self.rst_pin, GPIO.OUT)
# Configurar SPI
self.spi = spidev.SpiDev()
self.spi.open(spi_bus, spi_device)
self.spi.max_speed_hz = 8000000 # 8MHz
self.spi.mode = 0
# Dimensiones del display
self.width = 128
self.height = 64
# Buffer de display
self.buffer = [0] * (self.width * self.height // 8)
# Inicializar display
self._init_display()
def _init_display(self):
"""Inicializar el display SSD1306"""
# Reset
GPIO.output(self.rst_pin, GPIO.LOW)
time.sleep(0.01)
GPIO.output(self.rst_pin, GPIO.HIGH)
time.sleep(0.01)
# Secuencia de inicialización
init_sequence = [
self.DISPLAYOFF,
0xD5, 0x80, # Set display clock div
0xA8, 0x3F, # Set multiplex ratio
0xD3, 0x00, # Set display offset
0x40, # Set start line
0x8D, 0x14, # Enable charge pump
0x20, 0x00, # Memory mode horizontal
0xA1, # Set segment re-map
0xC8, # Set COM scan direction
0xDA, 0x12, # Set COM pins
0x81, 0xCF, # Set contrast
0xD9, 0xF1, # Set precharge
0xDB, 0x40, # Set VCOM detect
0xA4, # Display all on resume
0xA6, # Normal display
self.DISPLAYON
]
for cmd in init_sequence:
self._write_command(cmd)
# Limpiar display
self.clear()
self.display()
def _write_command(self, cmd):
"""Escribir comando al display"""
GPIO.output(self.dc_pin, GPIO.LOW) # Modo comando
self.spi.xfer2([cmd])
def _write_data(self, data):
"""Escribir datos al display"""
GPIO.output(self.dc_pin, GPIO.HIGH) # Modo datos
self.spi.xfer2(data)
def clear(self):
"""Limpiar buffer del display"""
self.buffer = [0] * len(self.buffer)
def display(self):
"""Enviar buffer al display"""
# Set column address
self._write_command(0x21) # COLUMNADDR
self._write_command(0) # Column start
self._write_command(self.width - 1) # Column end
# Set row address
self._write_command(0x22) # PAGEADDR
self._write_command(0) # Page start
self._write_command(7) # Page end
# Write buffer
chunk_size = 32
for i in range(0, len(self.buffer), chunk_size):
chunk = self.buffer[i:i + chunk_size]
self._write_data(chunk)
def set_pixel(self, x, y, color):
"""Establecer un pixel"""
if x >= self.width or y >= self.height or x < 0 or y < 0:
return
byte_pos = x + (y // 8) * self.width
bit_pos = y % 8
if color:
self.buffer[byte_pos] |= (1 << bit_pos)
else:
self.buffer[byte_pos] &= ~(1 << bit_pos)
def draw_text(self, x, y, text, font_size=12):
"""Dibujar texto usando PIL"""
# Crear imagen PIL
image = Image.new('1', (self.width, self.height))
draw = ImageDraw.Draw(image)
try:
font = ImageFont.truetype('/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf', font_size)
except:
font = ImageFont.load_default()
draw.text((x, y), text, font=font, fill=255)
# Convertir a buffer
for y_pos in range(self.height):
for x_pos in range(self.width):
pixel = image.getpixel((x_pos, y_pos))
self.set_pixel(x_pos, y_pos, pixel)
def draw_line(self, x0, y0, x1, y1, color=1):
"""Dibujar línea (algoritmo de Bresenham)"""
dx = abs(x1 - x0)
dy = abs(y1 - y0)
sx = 1 if x0 < x1 else -1
sy = 1 if y0 < y1 else -1
err = dx - dy
while True:
self.set_pixel(x0, y0, color)
if x0 == x1 and y0 == y1:
break
e2 = 2 * err
if e2 > -dy:
err -= dy
x0 += sx
if e2 < dx:
err += dx
y0 += sy
def draw_rectangle(self, x, y, width, height, color=1, fill=False):
"""Dibujar rectángulo"""
if fill:
for i in range(height):
self.draw_line(x, y + i, x + width - 1, y + i, color)
else:
self.draw_line(x, y, x + width - 1, y, color)
self.draw_line(x, y, x, y + height - 1, color)
self.draw_line(x + width - 1, y, x + width - 1, y + height - 1, color)
self.draw_line(x, y + height - 1, x + width - 1, y + height - 1, color)
def system_monitor_display():
"""Monitor del sistema en display OLED"""
import psutil
display = SSD1306_SPI()
try:
while True:
display.clear()
# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
display.draw_text(0, 0, f"CPU: {cpu_percent:.1f}%", 10)
# Memory usage
memory = psutil.virtual_memory()
mem_percent = memory.percent
display.draw_text(0, 12, f"RAM: {mem_percent:.1f}%", 10)
# Temperature (si está disponible)
try:
temp_sensors = psutil.sensors_temperatures()
if temp_sensors:
temp = list(temp_sensors.values())[0][0].current
display.draw_text(0, 24, f"Temp: {temp:.1f}C", 10)
except:
display.draw_text(0, 24, "Temp: N/A", 10)
# Network
net_io = psutil.net_io_counters()
net_mb_sent = net_io.bytes_sent / 1024 / 1024
display.draw_text(0, 36, f"TX: {net_mb_sent:.1f}MB", 10)
# Uptime
uptime = time.time() - psutil.boot_time()
uptime_hours = int(uptime / 3600)
display.draw_text(0, 48, f"Up: {uptime_hours}h", 10)
# Progress bars
display.draw_rectangle(70, 2, 50, 8)
display.draw_rectangle(71, 3, int(48 * cpu_percent / 100), 6, fill=True)
display.draw_rectangle(70, 14, 50, 8)
display.draw_rectangle(71, 15, int(48 * mem_percent / 100), 6, fill=True)
display.display()
time.sleep(2)
except KeyboardInterrupt:
display.clear()
display.draw_text(20, 25, "Goodbye!", 16)
display.display()
time.sleep(2)
display.clear()
display.display()
GPIO.cleanup()
if __name__ == "__main__":
print("BeagleBone Black - SSD1306 OLED Display")
system_monitor_display()
Proyecto Completo: Sistema de Monitoreo IoT
Arquitectura del Sistema
#!/usr/bin/env python3
# iot_monitoring_system.py - Sistema completo de monitoreo
import json
import time
import threading
import logging
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Dict, List
import paho.mqtt.client as mqtt
import Adafruit_BBIO.GPIO as GPIO
import Adafruit_BBIO.ADC as ADC
# Configuración de logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/iot_monitor.log'),
logging.StreamHandler()
]
)
@dataclass
class SensorReading:
timestamp: str
sensor_id: str
value: float
unit: str
location: str
@dataclass
class SystemStatus:
timestamp: str
cpu_temp: float
memory_usage: float
disk_usage: float
network_status: bool
active_sensors: int
class SensorManager:
def __init__(self):
self.sensors = {}
self.readings = []
self.running = False
ADC.setup()
def register_sensor(self, sensor_id: str, pin: str, sensor_type: str, location: str):
"""Registrar un nuevo sensor"""
self.sensors[sensor_id] = {
'pin': pin,
'type': sensor_type,
'location': location,
'last_reading': None
}
logging.info(f"Sensor registrado: {sensor_id} ({sensor_type}) en {location}")
def read_sensor(self, sensor_id: str) -> SensorReading:
"""Leer un sensor específico"""
if sensor_id not in self.sensors:
raise ValueError(f"Sensor {sensor_id} no registrado")
sensor = self.sensors[sensor_id]
raw_value = ADC.read(sensor['pin']) * 1.8 # Convertir a voltaje
# Procesar según tipo de sensor
if sensor['type'] == 'temperature':
# LM35: 10mV/°C
processed_value = raw_value * 100
unit = '°C'
elif sensor['type'] == 'humidity':
# Sensor de humedad del suelo
processed_value = ((1.6 - raw_value) / (1.6 - 0.4)) * 100
unit = '%'
elif sensor['type'] == 'light':
# LDR
processed_value = (raw_value / 1.8) * 100
unit = '%'
else:
processed_value = raw_value
unit = 'V'
reading = SensorReading(
timestamp=datetime.now().isoformat(),
sensor_id=sensor_id,
value=round(processed_value, 2),
unit=unit,
location=sensor['location']
)
sensor['last_reading'] = reading
self.readings.append(reading)
# Mantener solo las últimas 1000 lecturas
if len(self.readings) > 1000:
self.readings = self.readings[-1000:]
return reading
def read_all_sensors(self) -> List[SensorReading]:
"""Leer todos los sensores registrados"""
readings = []
for sensor_id in self.sensors:
try:
reading = self.read_sensor(sensor_id)
readings.append(reading)
except Exception as e:
logging.error(f"Error leyendo sensor {sensor_id}: {e}")
return readings
def start_monitoring(self, interval: int = 30):
"""Iniciar monitoreo continuo"""
self.running = True
def monitor_loop():
while self.running:
try:
readings = self.read_all_sensors()
for reading in readings:
logging.info(f"Sensor {reading.sensor_id}: {reading.value}{reading.unit}")
except Exception as e:
logging.error(f"Error en monitoreo: {e}")
time.sleep(interval)
monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
monitor_thread.start()
logging.info("Monitoreo de sensores iniciado")
def stop_monitoring(self):
"""Detener monitoreo"""
self.running = False
logging.info("Monitoreo de sensores detenido")
class MQTTManager:
def __init__(self, broker_host: str = "localhost", broker_port: int = 1883):
self.broker_host = broker_host
self.broker_port = broker_port
self.client = mqtt.Client()
self.connected = False
# Configurar callbacks
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.connected = True
logging.info(f"Conectado a MQTT broker {self.broker_host}:{self.broker_port}")
# Suscribirse a comandos
client.subscribe("beaglebone/commands")
else:
logging.error(f"Error conectando a MQTT: {rc}")
def _on_disconnect(self, client, userdata, rc):
self.connected = False
logging.info("Desconectado de MQTT broker")
def _on_message(self, client, userdata, msg):
try:
command = json.loads(msg.payload.decode())
logging.info(f"Comando recibido: {command}")
# Procesar comando
self._process_command(command)
except Exception as e:
logging.error(f"Error procesando mensaje MQTT: {e}")
def _process_command(self, command):
"""Procesar comandos recibidos via MQTT"""
cmd_type = command.get('type')
if cmd_type == 'read_sensor':
sensor_id = command.get('sensor_id')
# Implementar lectura de sensor específico
elif cmd_type == 'set_interval':
interval = command.get('interval', 30)
# Cambiar intervalo de monitoreo
# Agregar más comandos según necesidad
def connect(self):
"""Conectar al broker MQTT"""
try:
self.client.connect(self.broker_host, self.broker_port, 60)
self.client.loop_start()
except Exception as e:
logging.error(f"Error conectando a MQTT: {e}")
def disconnect(self):
"""Desconectar del broker MQTT"""
self.client.loop_stop()
self.client.disconnect()
def publish_sensor_data(self, reading: SensorReading):
"""Publicar datos de sensor"""
if not self.connected:
return False
topic = f"beaglebone/sensors/{reading.sensor_id}"
payload = json.dumps(asdict(reading))
try:
self.client.publish(topic, payload)
return True
except Exception as e:
logging.error(f"Error publicando datos: {e}")
return False
def publish_system_status(self, status: SystemStatus):
"""Publicar estado del sistema"""
if not self.connected:
return False
topic = "beaglebone/system/status"
payload = json.dumps(asdict(status))
try:
self.client.publish(topic, payload)
return True
except Exception as e:
logging.error(f"Error publicando estado: {e}")
return False
class AlertManager:
def __init__(self):
self.alert_pins = {
'warning': 'P9_12', # LED amarillo
'critical': 'P9_14', # LED rojo
'buzzer': 'P9_16' # Buzzer
}
# Configurar pines
for pin in self.alert_pins.values():
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, GPIO.LOW)
self.thresholds = {
'temperature_high': 35.0,
'temperature_low': 10.0,
'humidity_low': 20.0,
'light_low': 10.0
}
def check_alerts(self, readings: List[SensorReading]):
"""Verificar alertas basadas en las lecturas"""
alerts = []
for reading in readings:
if reading.sensor_id == 'temp_sensor':
if reading.value > self.thresholds['temperature_high']:
alerts.append(f"Temperatura alta: {reading.value}°C")
self._activate_alert('critical')
elif reading.value < self.thresholds['temperature_low']:
alerts.append(f"Temperatura baja: {reading.value}°C")
self._activate_alert('warning')
elif reading.sensor_id == 'humidity_sensor':
if reading.value < self.thresholds['humidity_low']:
alerts.append(f"Humedad baja: {reading.value}%")
self._activate_alert('warning')
elif reading.sensor_id == 'light_sensor':
if reading.value < self.thresholds['light_low']:
alerts.append(f"Poca luz: {reading.value}%")
self._activate_alert('warning')
if not alerts:
self._clear_alerts()
return alerts
def _activate_alert(self, level: str):
"""Activar alerta visual/sonora"""
if level == 'warning':
GPIO.output(self.alert_pins['warning'], GPIO.HIGH)
elif level == 'critical':
GPIO.output(self.alert_pins['critical'], GPIO.HIGH)
# Buzzer intermitente
for _ in range(3):
GPIO.output(self.alert_pins['buzzer'], GPIO.HIGH)
time.sleep(0.1)
GPIO.output(self.alert_pins['buzzer'], GPIO.LOW)
time.sleep(0.1)
def _clear_alerts(self):
"""Limpiar todas las alertas"""
for pin in self.alert_pins.values():
GPIO.output(pin, GPIO.LOW)
class IoTMonitoringSystem:
def __init__(self):
self.sensor_manager = SensorManager()
self.mqtt_manager = MQTTManager()
self.alert_manager = AlertManager()
self.running = False
def setup(self):
"""Configurar el sistema"""
# Registrar sensores
self.sensor_manager.register_sensor('temp_sensor', 'P9_40', 'temperature', 'greenhouse')
self.sensor_manager.register_sensor('humidity_sensor', 'P9_38', 'humidity', 'greenhouse')
self.sensor_manager.register_sensor('light_sensor', 'P9_36', 'light', 'greenhouse')
# Conectar MQTT
self.mqtt_manager.connect()
logging.info("Sistema de monitoreo IoT configurado")
def run(self):
"""Ejecutar el sistema principal"""
self.running = True
# Iniciar monitoreo de sensores
self.sensor_manager.start_monitoring(interval=30)
try:
while self.running:
# Leer todos los sensores
readings = self.sensor_manager.read_all_sensors()
# Verificar alertas
alerts = self.alert_manager.check_alerts(readings)
# Publicar datos via MQTT
for reading in readings:
self.mqtt_manager.publish_sensor_data(reading)
# Publicar estado del sistema
system_status = self._get_system_status()
self.mqtt_manager.publish_system_status(system_status)
# Log de actividad
if alerts:
for alert in alerts:
logging.warning(alert)
else:
logging.info("Sistema funcionando normalmente")
time.sleep(60) # Loop principal cada minuto
except KeyboardInterrupt:
logging.info("Sistema detenido por el usuario")
finally:
self.shutdown()
def _get_system_status(self) -> SystemStatus:
"""Obtener estado del sistema"""
import psutil
return SystemStatus(
timestamp=datetime.now().isoformat(),
cpu_temp=self._get_cpu_temperature(),
memory_usage=psutil.virtual_memory().percent,
disk_usage=psutil.disk_usage('/').percent,
network_status=self.mqtt_manager.connected,
active_sensors=len(self.sensor_manager.sensors)
)
def _get_cpu_temperature(self) -> float:
"""Obtener temperatura del CPU"""
try:
with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f:
temp = int(f.read()) / 1000.0
return temp
except:
return 0.0
def shutdown(self):
"""Apagar el sistema limpiamente"""
self.running = False
self.sensor_manager.stop_monitoring()
self.mqtt_manager.disconnect()
GPIO.cleanup()
logging.info("Sistema apagado correctamente")
def main():
print("BeagleBone Black - Sistema de Monitoreo IoT")
system = IoTMonitoringSystem()
try:
system.setup()
system.run()
except Exception as e:
logging.error(f"Error del sistema: {e}")
finally:
system.shutdown()
if __name__ == "__main__":
main()
Instalación como Servicio Systemd
# Crear archivo de servicio
sudo tee /etc/systemd/system/iot-monitor.service > /dev/null <<EOF
[Unit]
Description=IoT Monitoring System
After=network.target
[Service]
Type=simple
User=debian
WorkingDirectory=/home/debian/iot-monitor
ExecStart=/usr/bin/python3 /home/debian/iot-monitor/iot_monitoring_system.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
# Habilitar y iniciar servicio
sudo systemctl daemon-reload
sudo systemctl enable iot-monitor.service
sudo systemctl start iot-monitor.service
# Ver estado
sudo systemctl status iot-monitor.service
# Ver logs
sudo journalctl -u iot-monitor.service -f
Conclusión
BeagleBone Black ofrece una plataforma robusta para proyectos embedded con Linux:
- ✅ GPIO extenso con 65 pines digitales
- ✅ ADC integrado para sensores analógicos
- ✅ Comunicación completa I2C, SPI, UART, CAN
- ✅ Linux completo con acceso a ecosistema de software
- ✅ Tiempo real con unidades PRU
- ✅ Conectividad Ethernet y USB
Aplicaciones Ideales
- Automatización industrial
- Sistemas IoT complejos
- Data acquisition sistemas
- Control de procesos
- Monitoreo ambiental
- Robótica avanzada
Próximos Pasos
- Explorar unidades PRU para tiempo real
- Implementar interfaces web con Flask/Django
- Integrar bases de datos (SQLite/PostgreSQL)
- Configurar VPN para acceso remoto
- Desarrollar aplicaciones móviles complementarias
¡BeagleBone Black es perfecto para proyectos que requieren la potencia de Linux con capacidades embedded robustas!