314 lines
13 KiB
Python
314 lines
13 KiB
Python
import serial
|
|
import struct
|
|
import numpy as np
|
|
import os
|
|
import binascii
|
|
import time
|
|
import cv2 # Added OpenCV for image processing
|
|
|
|
# Configure serial port - this will connect to the USB serial port from the Pico
|
|
PORT = '/dev/tty.usbmodem101' # Adjust for your system
|
|
BAUD_RATE = 3000 # Reduced to match the Pico's baud rate for debugging
|
|
TIMEOUT = 60 # Increased timeout for slower data transfer
|
|
|
|
# Enable detailed debugging information
|
|
DEBUG_MODE = True # Set to True for verbose output, False for normal operation
|
|
|
|
# Frame constants - match the values in the C++ code
|
|
FRAME_HEADER = b'\xDE\xAD\xBE\xEF' # Distinctive header
|
|
FRAME_FOOTER = b'\xCA\xFE\xBA\xBE' # Distinctive footer
|
|
BINARY_START_MARKER = "BINARY_START"
|
|
BINARY_END_MARKER = "###BINARY_END###" # Updated to match the new format
|
|
|
|
# Directory to save the frames
|
|
output_dir = "frames"
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
def debug_hex(data):
|
|
"""Print data in hexadecimal format for debugging."""
|
|
return binascii.hexlify(data).decode('ascii')
|
|
|
|
def save_as_image(frame_data, filename_base, frame_size=(160, 120)):
|
|
"""Save the frame data as both PNG and raw file formats."""
|
|
try:
|
|
# Convert the raw data to a numpy array
|
|
img_array = np.frombuffer(frame_data, dtype=np.uint16)
|
|
|
|
# Reshape to the expected dimensions
|
|
width, height = frame_size
|
|
img_array = img_array.reshape(height, width)
|
|
|
|
# Convert from RGB565 to BGR format that OpenCV uses
|
|
# This conversion depends on your exact pixel format
|
|
# RGB565: 5 bits red, 6 bits green, 5 bits blue
|
|
img_rgb = np.zeros((height, width, 3), dtype=np.uint8)
|
|
|
|
# Extract R, G, B components from RGB565
|
|
r = ((img_array & 0xF800) >> 11) * 8 # 5 bits for red (0-31) scaled to 0-255
|
|
g = ((img_array & 0x07E0) >> 5) * 4 # 6 bits for green (0-63) scaled to 0-255
|
|
b = (img_array & 0x001F) * 8 # 5 bits for blue (0-31) scaled to 0-255
|
|
|
|
img_rgb[:,:,0] = b # OpenCV uses BGR order
|
|
img_rgb[:,:,1] = g
|
|
img_rgb[:,:,2] = r
|
|
|
|
# Save as PNG
|
|
png_filename = f"{filename_base}.png"
|
|
cv2.imwrite(png_filename, img_rgb)
|
|
print(f"Saved PNG image: {png_filename}")
|
|
|
|
# Save as JPG with 90% quality
|
|
jpg_filename = f"{filename_base}.jpg"
|
|
cv2.imwrite(jpg_filename, img_rgb, [cv2.IMWRITE_JPEG_QUALITY, 90])
|
|
print(f"Saved JPG image: {jpg_filename}")
|
|
|
|
# Also save the raw numpy array for further analysis if needed
|
|
np_filename = f"{filename_base}.npy"
|
|
np.save(np_filename, img_array)
|
|
print(f"Saved NumPy array: {np_filename}")
|
|
|
|
# Save raw binary data too
|
|
raw_filename = f"{filename_base}.raw"
|
|
with open(raw_filename, "wb") as f:
|
|
f.write(frame_data)
|
|
print(f"Saved raw data: {raw_filename}")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"Error saving image: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
def receive_frame():
|
|
"""Receive a complete frame from the serial port."""
|
|
try:
|
|
with serial.Serial(PORT, BAUD_RATE, timeout=TIMEOUT) as ser:
|
|
print(f"Monitoring serial port: {ser.name}")
|
|
|
|
line = ""
|
|
in_binary_section = False
|
|
binary_data = bytearray()
|
|
|
|
# Safety check - maximum buffer size to prevent infinite collection
|
|
MAX_BUFFER_SIZE = 500000 # 500KB should be more than enough for a 160x120 frame
|
|
|
|
while True:
|
|
# Read data from serial port
|
|
if ser.in_waiting > 0:
|
|
# Try to read a line first to check for markers
|
|
c = ser.read(1)
|
|
|
|
# If in binary section, collect binary data
|
|
if in_binary_section:
|
|
binary_data.append(c[0])
|
|
|
|
# Safety check - if buffer gets too large, abort
|
|
if len(binary_data) > MAX_BUFFER_SIZE:
|
|
print(f"WARNING: Binary data exceeded maximum buffer size ({MAX_BUFFER_SIZE})")
|
|
print("Aborting binary collection to prevent memory issues")
|
|
return None
|
|
|
|
# Show progress for large binary sections
|
|
if len(binary_data) % 1000 == 0:
|
|
print(f"Collected {len(binary_data)} bytes of binary data")
|
|
|
|
# Check if we've accumulated enough data to check for the end marker
|
|
# Add the character to our text buffer as well for end marker detection
|
|
if c.isascii() and not c.isspace():
|
|
line += c.decode('ascii', errors='replace')
|
|
else:
|
|
# For whitespace characters, we need special handling
|
|
if c == b'\n':
|
|
# If the current line contains our end marker
|
|
if BINARY_END_MARKER in line:
|
|
print(f"Binary end marker detected in line: {line}")
|
|
in_binary_section = False
|
|
print(f"Total binary data collected: {len(binary_data)} bytes")
|
|
|
|
# Print binary data summary
|
|
print("First 32 bytes:", debug_hex(binary_data[:32]))
|
|
print("Last 32 bytes:", debug_hex(binary_data[-32:]))
|
|
|
|
# Find the last occurrence of FRAME_FOOTER
|
|
footer_pos = -1
|
|
for i in range(len(binary_data) - len(FRAME_FOOTER)):
|
|
if binary_data[i:i+len(FRAME_FOOTER)] == FRAME_FOOTER:
|
|
footer_pos = i
|
|
|
|
if footer_pos >= 0:
|
|
print(f"Frame footer found at position {footer_pos}: {debug_hex(FRAME_FOOTER)}")
|
|
# Trim the binary data to end at the footer + footer length
|
|
binary_data = binary_data[:footer_pos + len(FRAME_FOOTER)]
|
|
else:
|
|
print(f"WARNING: Frame footer {debug_hex(FRAME_FOOTER)} not found in binary data")
|
|
|
|
# Process the binary data
|
|
return process_binary_data(binary_data)
|
|
line = ""
|
|
elif c == b'\r':
|
|
pass # Ignore CR characters
|
|
else:
|
|
# Normal text processing
|
|
if c == b'\n':
|
|
# Check if this line is the start marker
|
|
if BINARY_START_MARKER in line:
|
|
print("Binary start marker detected")
|
|
in_binary_section = True
|
|
binary_data = bytearray()
|
|
line = ""
|
|
else:
|
|
print(f"Log: {line}")
|
|
line = ""
|
|
else:
|
|
line += c.decode('ascii', errors='replace')
|
|
else:
|
|
# No data available, sleep briefly
|
|
time.sleep(0.01)
|
|
|
|
except serial.SerialException as e:
|
|
print(f"Serial error: {e}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Unexpected error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
def process_binary_data(data):
|
|
"""Process the binary data to extract the frame."""
|
|
print(f"Processing {len(data)} bytes of binary data")
|
|
|
|
# Search for the frame header
|
|
header_pos = -1
|
|
for i in range(len(data) - len(FRAME_HEADER)):
|
|
if data[i:i+len(FRAME_HEADER)] == FRAME_HEADER:
|
|
header_pos = i
|
|
break
|
|
|
|
if header_pos == -1:
|
|
print("Error: Frame header not found")
|
|
return None
|
|
|
|
print(f"Frame header found at position {header_pos}")
|
|
|
|
# Extract the frame size
|
|
size_start = header_pos + len(FRAME_HEADER)
|
|
size_bytes = data[size_start:size_start+4]
|
|
if len(size_bytes) < 4:
|
|
print("Error: Incomplete size bytes")
|
|
return None
|
|
|
|
frame_size = struct.unpack('<I', size_bytes)[0]
|
|
print(f"Frame size: {frame_size} bytes")
|
|
|
|
# Extract the frame data
|
|
data_start = size_start + 4
|
|
frame_data = data[data_start:data_start+frame_size]
|
|
|
|
if len(frame_data) != frame_size:
|
|
print(f"Warning: Incomplete frame data (got {len(frame_data)} of {frame_size} bytes)")
|
|
|
|
# Save the incomplete frame data for debugging
|
|
incomplete_filename = os.path.join(output_dir, "incomplete_frame.raw")
|
|
with open(incomplete_filename, "wb") as f:
|
|
f.write(frame_data)
|
|
print(f"Saved incomplete frame data: {incomplete_filename}")
|
|
|
|
return None
|
|
|
|
# Extract and verify checksum
|
|
checksum_start = data_start + frame_size
|
|
checksum_bytes = data[checksum_start:checksum_start+4]
|
|
if len(checksum_bytes) < 4:
|
|
print("Error: Incomplete checksum bytes")
|
|
return None
|
|
|
|
received_checksum = struct.unpack('<I', checksum_bytes)[0]
|
|
calculated_checksum = sum(frame_data) & 0xFFFFFFFF
|
|
print(f"Checksum: received={received_checksum}, calculated={calculated_checksum}")
|
|
|
|
if received_checksum != calculated_checksum:
|
|
print("Error: Checksum mismatch")
|
|
#return None
|
|
|
|
# Verify frame footer
|
|
footer_start = checksum_start + 4
|
|
footer = data[footer_start:footer_start+len(FRAME_FOOTER)]
|
|
if footer != FRAME_FOOTER:
|
|
print("Error: Invalid frame footer")
|
|
#return None
|
|
|
|
print("Frame successfully extracted and verified!")
|
|
return frame_data
|
|
|
|
def find_serial_port():
|
|
"""Find the Pico's serial port by scanning available ports."""
|
|
import serial.tools.list_ports
|
|
|
|
# Look for common Pico identifiers
|
|
pico_identifiers = ['usbmodem', 'ttyACM']
|
|
|
|
ports = list(serial.tools.list_ports.comports())
|
|
for port in ports:
|
|
for identifier in pico_identifiers:
|
|
if identifier in port.device.lower():
|
|
print(f"Found likely Pico device at {port.device}")
|
|
return port.device
|
|
|
|
# If no specific Pico port was found, return the first available port if any
|
|
if ports:
|
|
print(f"No specific Pico device found. Using first available port: {ports[0].device}")
|
|
return ports[0].device
|
|
|
|
return None
|
|
|
|
def main():
|
|
"""Main function to receive and save frames."""
|
|
frame_count = 0
|
|
max_retries = 5
|
|
|
|
while True:
|
|
try:
|
|
print("\n====== Waiting for new frame ======")
|
|
# Try to find a serial port if not already connected
|
|
try:
|
|
port = find_serial_port()
|
|
if port:
|
|
print(f"Using serial port: {port}")
|
|
global PORT
|
|
PORT = port
|
|
else:
|
|
print("No serial port found. Retrying in 5 seconds...")
|
|
time.sleep(5)
|
|
continue
|
|
|
|
frame_data = receive_frame()
|
|
|
|
if frame_data is not None:
|
|
# Create base filename for this frame
|
|
base_filename = os.path.join(output_dir, f"frame_{frame_count:04d}")
|
|
|
|
# Save the frame data as images
|
|
if save_as_image(frame_data, base_filename):
|
|
print(f"Successfully saved frame {frame_count}")
|
|
frame_count += 1
|
|
|
|
except serial.SerialException as e:
|
|
print(f"Serial connection error: {e}")
|
|
print("Waiting 5 seconds before retrying...")
|
|
time.sleep(5)
|
|
continue
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nProcess interrupted by user. Exiting...")
|
|
break
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
time.sleep(5) # Wait before retrying
|
|
|
|
if __name__ == "__main__":
|
|
main() |