import serial import struct import numpy as np import os import binascii import time import cv2 # Added OpenCV for image processing # Configure serial port - this will connect to the USB serial port from the Pico PORT = '/dev/tty.usbmodem101' # Adjust for your system BAUD_RATE = 3000 # Reduced to match the Pico's baud rate for debugging TIMEOUT = 60 # Increased timeout for slower data transfer # Enable detailed debugging information DEBUG_MODE = True # Set to True for verbose output, False for normal operation # Frame constants - match the values in the C++ code FRAME_HEADER = b'\xDE\xAD\xBE\xEF' # Distinctive header FRAME_FOOTER = b'\xCA\xFE\xBA\xBE' # Distinctive footer BINARY_START_MARKER = "BINARY_START" BINARY_END_MARKER = "###BINARY_END###" # Updated to match the new format # Directory to save the frames output_dir = "frames" os.makedirs(output_dir, exist_ok=True) def debug_hex(data): """Print data in hexadecimal format for debugging.""" return binascii.hexlify(data).decode('ascii') def save_as_image(frame_data, filename_base, frame_size=(160, 120)): """Save the frame data as both PNG and raw file formats.""" try: # Convert the raw data to a numpy array img_array = np.frombuffer(frame_data, dtype=np.uint16) # Reshape to the expected dimensions width, height = frame_size img_array = img_array.reshape(height, width) # Convert from RGB565 to BGR format that OpenCV uses # This conversion depends on your exact pixel format # RGB565: 5 bits red, 6 bits green, 5 bits blue img_rgb = np.zeros((height, width, 3), dtype=np.uint8) # Extract R, G, B components from RGB565 r = ((img_array & 0xF800) >> 11) * 8 # 5 bits for red (0-31) scaled to 0-255 g = ((img_array & 0x07E0) >> 5) * 4 # 6 bits for green (0-63) scaled to 0-255 b = (img_array & 0x001F) * 8 # 5 bits for blue (0-31) scaled to 0-255 img_rgb[:,:,0] = b # OpenCV uses BGR order img_rgb[:,:,1] = g img_rgb[:,:,2] = r # Save as PNG png_filename = f"{filename_base}.png" cv2.imwrite(png_filename, img_rgb) print(f"Saved PNG image: {png_filename}") # Save as JPG with 90% quality jpg_filename = f"{filename_base}.jpg" cv2.imwrite(jpg_filename, img_rgb, [cv2.IMWRITE_JPEG_QUALITY, 90]) print(f"Saved JPG image: {jpg_filename}") # Also save the raw numpy array for further analysis if needed np_filename = f"{filename_base}.npy" np.save(np_filename, img_array) print(f"Saved NumPy array: {np_filename}") # Save raw binary data too raw_filename = f"{filename_base}.raw" with open(raw_filename, "wb") as f: f.write(frame_data) print(f"Saved raw data: {raw_filename}") return True except Exception as e: print(f"Error saving image: {e}") import traceback traceback.print_exc() return False def receive_frame(): """Receive a complete frame from the serial port.""" try: with serial.Serial(PORT, BAUD_RATE, timeout=TIMEOUT) as ser: print(f"Monitoring serial port: {ser.name}") line = "" in_binary_section = False binary_data = bytearray() # Safety check - maximum buffer size to prevent infinite collection MAX_BUFFER_SIZE = 500000 # 500KB should be more than enough for a 160x120 frame while True: # Read data from serial port if ser.in_waiting > 0: # Try to read a line first to check for markers c = ser.read(1) # If in binary section, collect binary data if in_binary_section: binary_data.append(c[0]) # Safety check - if buffer gets too large, abort if len(binary_data) > MAX_BUFFER_SIZE: print(f"WARNING: Binary data exceeded maximum buffer size ({MAX_BUFFER_SIZE})") print("Aborting binary collection to prevent memory issues") return None # Show progress for large binary sections if len(binary_data) % 1000 == 0: print(f"Collected {len(binary_data)} bytes of binary data") # Check if we've accumulated enough data to check for the end marker # Add the character to our text buffer as well for end marker detection if c.isascii() and not c.isspace(): line += c.decode('ascii', errors='replace') else: # For whitespace characters, we need special handling if c == b'\n': # If the current line contains our end marker if BINARY_END_MARKER in line: print(f"Binary end marker detected in line: {line}") in_binary_section = False print(f"Total binary data collected: {len(binary_data)} bytes") # Print binary data summary print("First 32 bytes:", debug_hex(binary_data[:32])) print("Last 32 bytes:", debug_hex(binary_data[-32:])) # Find the last occurrence of FRAME_FOOTER footer_pos = -1 for i in range(len(binary_data) - len(FRAME_FOOTER)): if binary_data[i:i+len(FRAME_FOOTER)] == FRAME_FOOTER: footer_pos = i if footer_pos >= 0: print(f"Frame footer found at position {footer_pos}: {debug_hex(FRAME_FOOTER)}") # Trim the binary data to end at the footer + footer length binary_data = binary_data[:footer_pos + len(FRAME_FOOTER)] else: print(f"WARNING: Frame footer {debug_hex(FRAME_FOOTER)} not found in binary data") # Process the binary data return process_binary_data(binary_data) line = "" elif c == b'\r': pass # Ignore CR characters else: # Normal text processing if c == b'\n': # Check if this line is the start marker if BINARY_START_MARKER in line: print("Binary start marker detected") in_binary_section = True binary_data = bytearray() line = "" else: print(f"Log: {line}") line = "" else: line += c.decode('ascii', errors='replace') else: # No data available, sleep briefly time.sleep(0.01) except serial.SerialException as e: print(f"Serial error: {e}") return None except Exception as e: print(f"Unexpected error: {e}") import traceback traceback.print_exc() return None def process_binary_data(data): """Process the binary data to extract the frame.""" print(f"Processing {len(data)} bytes of binary data") # Search for the frame header header_pos = -1 for i in range(len(data) - len(FRAME_HEADER)): if data[i:i+len(FRAME_HEADER)] == FRAME_HEADER: header_pos = i break if header_pos == -1: print("Error: Frame header not found") return None print(f"Frame header found at position {header_pos}") # Extract the frame size size_start = header_pos + len(FRAME_HEADER) size_bytes = data[size_start:size_start+4] if len(size_bytes) < 4: print("Error: Incomplete size bytes") return None frame_size = struct.unpack('