diff --git a/selfdrive/app/commalisten.py b/selfdrive/app/commalisten.py new file mode 100644 index 0000000..9de9f4d --- /dev/null +++ b/selfdrive/app/commalisten.py @@ -0,0 +1,536 @@ +#!/usr/bin/env python3 +import json +import socket +import sys +import threading +import time +from datetime import datetime +import argparse +import traceback + +# 处理Windows环境 +try: + import curses +except ImportError: + try: + # Windows平台上尝试导入windows-curses + import windows_curses as curses + except ImportError: + print("无法导入curses模块。在Windows上需要安装windows-curses包。") + print("请运行: pip install windows-curses") + sys.exit(1) + +class CommaListener: + def __init__(self, port=8088): + """初始化接收器""" + self.port = port + self.data = {} + self.last_update = 0 + self.running = True + self.device_ip = None + + def start_listening(self): + """启动UDP监听""" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + sock.bind(('0.0.0.0', self.port)) + print(f"正在监听端口 {self.port} 的广播数据...") + + while self.running: + try: + data, addr = sock.recvfrom(4096) + self.device_ip = addr[0] + try: + self.data = json.loads(data.decode('utf-8')) + self.last_update = time.time() + except json.JSONDecodeError: + print(f"接收到无效的JSON数据: {data[:100]}...") + except Exception as e: + print(f"接收数据时出错: {e}") + except Exception as e: + print(f"无法绑定到端口 {self.port}: {e}") + finally: + sock.close() + +class UI: + def __init__(self, listener): + """初始化UI""" + self.listener = listener + self.stdscr = None + self.running = True + self.current_page = 0 + self.pages = ["车辆信息", "设备信息", "位置信息", "扩展车辆信息", "原始数据"] + self.max_pages = len(self.pages) + + # 颜色配对 + self.COLOR_NORMAL = 1 + self.COLOR_HIGHLIGHT = 2 + self.COLOR_STATUS_OK = 3 + self.COLOR_STATUS_WARNING = 4 + self.COLOR_STATUS_ERROR = 5 + self.COLOR_HEADER = 6 + self.COLOR_DATA = 7 + self.COLOR_TITLE = 8 + + # 布局参数 + self.max_height = 0 + self.max_width = 0 + + def run(self): + """运行UI""" + curses.wrapper(self.main) + + def main(self, stdscr): + """主UI循环""" + self.stdscr = stdscr + self.init_colors() + + curses.curs_set(0) # 隐藏光标 + self.stdscr.timeout(100) # 设置getch非阻塞超时 + + while self.running: + self.stdscr.clear() + self.max_height, self.max_width = self.stdscr.getmaxyx() + + # 处理按键 + self.handle_input() + + # 绘制界面 + self.draw_header() + self.draw_status() + + # 根据当前页面绘制内容 + if self.current_page == 0: + self.draw_car_info() + elif self.current_page == 1: + self.draw_device_info() + elif self.current_page == 2: + self.draw_location_info() + elif self.current_page == 3: + self.draw_extended_car_info() + elif self.current_page == 4: + self.draw_raw_data() + + self.draw_footer() + + self.stdscr.refresh() + time.sleep(0.1) + + def init_colors(self): + """初始化颜色配对""" + curses.start_color() + curses.use_default_colors() + curses.init_pair(self.COLOR_NORMAL, curses.COLOR_WHITE, -1) + curses.init_pair(self.COLOR_HIGHLIGHT, curses.COLOR_BLACK, curses.COLOR_WHITE) + curses.init_pair(self.COLOR_STATUS_OK, curses.COLOR_GREEN, -1) + curses.init_pair(self.COLOR_STATUS_WARNING, curses.COLOR_YELLOW, -1) + curses.init_pair(self.COLOR_STATUS_ERROR, curses.COLOR_RED, -1) + curses.init_pair(self.COLOR_HEADER, curses.COLOR_CYAN, -1) + curses.init_pair(self.COLOR_DATA, curses.COLOR_GREEN, -1) + curses.init_pair(self.COLOR_TITLE, curses.COLOR_MAGENTA, -1) + + def handle_input(self): + """处理按键""" + key = self.stdscr.getch() + if key == ord('q'): + self.running = False + elif key == ord('n') or key == curses.KEY_RIGHT: + self.current_page = (self.current_page + 1) % self.max_pages + elif key == ord('p') or key == curses.KEY_LEFT: + self.current_page = (self.current_page - 1) % self.max_pages + + def draw_header(self): + """绘制顶部标题栏""" + title = "CommaAssist 数据监视器" + x = max(0, (self.max_width - len(title)) // 2) + self.stdscr.attron(curses.color_pair(self.COLOR_HEADER) | curses.A_BOLD) + self.safe_addstr(0, x, title) + self.stdscr.attroff(curses.color_pair(self.COLOR_HEADER) | curses.A_BOLD) + + # 绘制页面标签 + self.safe_addstr(1, 0, " " * (self.max_width - 1), curses.color_pair(self.COLOR_HIGHLIGHT)) + x = 2 + for i, page in enumerate(self.pages): + if i == self.current_page: + self.safe_addstr(1, x, f" {page} ", curses.color_pair(self.COLOR_HIGHLIGHT) | curses.A_BOLD) + else: + self.safe_addstr(1, x, f" {page} ", curses.color_pair(self.COLOR_NORMAL)) + x += len(page) + 3 + + def draw_status(self): + """绘制状态栏""" + current_time = time.time() + status_line = 2 + + if not self.listener.data: + self.safe_addstr(status_line, 0, "等待数据...", curses.color_pair(self.COLOR_STATUS_WARNING)) + return + + time_diff = current_time - self.listener.last_update + if time_diff > 5: + self.safe_addstr(status_line, 0, f"数据已过期! 上次更新: {datetime.fromtimestamp(self.listener.last_update).strftime('%H:%M:%S')}", + curses.color_pair(self.COLOR_STATUS_ERROR)) + else: + self.safe_addstr(status_line, 0, f"已连接到 {self.listener.device_ip} - 最后更新: {datetime.fromtimestamp(self.listener.last_update).strftime('%H:%M:%S')}", + curses.color_pair(self.COLOR_STATUS_OK)) + + def draw_footer(self): + """绘制底部控制栏""" + footer = "操作: [q]退出 [←/p]上一页 [→/n]下一页" + y = self.max_height - 1 + x = max(0, (self.max_width - len(footer)) // 2) + + # 防止超出屏幕边界 + try: + # 修复:确保不超出屏幕边界 + if y > 0 and self.max_width > 0: + # 使用safe_addstr方法避免边界问题 + self.safe_addstr(y, 0, " " * (self.max_width - 1), curses.color_pair(self.COLOR_HIGHLIGHT)) + self.safe_addstr(y, x, footer[:self.max_width - x - 1], curses.color_pair(self.COLOR_HIGHLIGHT)) + except curses.error: + # 忽略curses边界错误 + pass + + def safe_addstr(self, y, x, text, attr=0): + """安全地添加字符串,避免边界问题""" + try: + # 确保y和x是有效坐标 + height, width = self.stdscr.getmaxyx() + if y < 0 or y >= height or x < 0: + return + + # 计算可以显示的最大长度 + max_len = min(len(text), width - x - 1) + if max_len <= 0: + return + + # 确保不会写入最后一个字符位置 + self.stdscr.addstr(y, x, text[:max_len], attr) + except curses.error: + # 捕获并忽略curses错误 + pass + + def draw_car_info(self): + """绘制车辆信息""" + if not self.listener.data or 'car' not in self.listener.data: + self.draw_no_data("车辆数据不可用") + return + + car = self.listener.data.get('car', {}) + + self.draw_section_title("基本车辆信息", 4) + + # 速度和档位 + data = [ + ("当前速度", f"{car.get('speed', 0):.1f} km/h"), + ("巡航速度", f"{car.get('cruise_speed', 0):.1f} km/h"), + ("档位", car.get('gear_shifter', 'Unknown')), + ("车门状态", "打开" if car.get('door_open', False) else "关闭"), + ] + self.draw_data_section(data, 5, 0, self.max_width // 2) + + # 方向盘和踏板 + data = [ + ("方向盘角度", f"{car.get('steering_angle', 0):.1f}°"), + ("转向力矩", f"{car.get('steering_torque', 0):.1f} Nm"), + ("制动踏板", "已踩下" if car.get('brake_pressed', False) else "释放"), + ("油门踏板", "已踩下" if car.get('gas_pressed', False) else "释放"), + ] + self.draw_data_section(data, 5, self.max_width // 2, self.max_width // 2) + + # 转向灯状态 + self.draw_section_title("信号灯状态", 10) + left_blinker = car.get('left_blinker', False) + right_blinker = car.get('right_blinker', False) + + blinker_status = "无" + if left_blinker and right_blinker: + blinker_status = "双闪" + elif left_blinker: + blinker_status = "左转" + elif right_blinker: + blinker_status = "右转" + + self.safe_addstr(11, 2, f"转向灯: {blinker_status}", curses.color_pair(self.COLOR_DATA)) + + def draw_device_info(self): + """绘制设备信息""" + if not self.listener.data or 'device' not in self.listener.data: + self.draw_no_data("设备数据不可用") + return + + device = self.listener.data.get('device', {}) + + self.draw_section_title("设备状态", 4) + + # 设备基本信息 + data = [ + ("IP地址", device.get('ip', 'Unknown')), + ("内存使用", f"{device.get('mem_usage', 0):.1f}%"), + ("CPU温度", f"{device.get('cpu_temp', 0):.1f}°C"), + ("可用空间", f"{device.get('free_space', 0):.1f}%"), + ] + self.draw_data_section(data, 5, 0, self.max_width // 2) + + # 电池信息 + battery = device.get('battery', {}) + data = [ + ("电池电量", f"{battery.get('percent', 0)}%"), + ("电池电压", f"{battery.get('voltage', 0):.2f}V"), + ("电池电流", f"{battery.get('status', 0):.2f}A"), + ("充电状态", "充电中" if not battery.get('charging', False) else "未充电"), + ] + self.draw_data_section(data, 5, self.max_width // 2, self.max_width // 2) + + def draw_location_info(self): + """绘制位置信息""" + if not self.listener.data or 'location' not in self.listener.data: + self.draw_no_data("位置数据不可用") + return + + location = self.listener.data.get('location', {}) + + if not location.get('gps_valid', False): + self.draw_section_title("GPS状态", 4) + self.safe_addstr(5, 2, "GPS信号无效或未获取", curses.color_pair(self.COLOR_STATUS_ERROR)) + return + + self.draw_section_title("GPS位置", 4) + + # GPS基本信息 + data = [ + ("纬度", f"{location.get('latitude', 0):.6f}"), + ("经度", f"{location.get('longitude', 0):.6f}"), + ("海拔", f"{location.get('altitude', 0):.1f}m"), + ("GPS精度", f"{location.get('accuracy', 0):.1f}m"), + ] + self.draw_data_section(data, 5, 0, self.max_width // 2) + + # 运动信息 + data = [ + ("方向", f"{location.get('bearing', 0):.1f}°"), + ("GPS速度", f"{location.get('speed', 0):.1f}km/h"), + ] + self.draw_data_section(data, 5, self.max_width // 2, self.max_width // 2) + + # 导航信息 + if 'navigation' in self.listener.data: + nav = self.listener.data.get('navigation', {}) + + self.draw_section_title("导航信息", 10) + + # 格式化距离 + dist = nav.get('distance_remaining', 0) + if dist > 1000: + dist_str = f"{dist/1000:.1f}km" + else: + dist_str = f"{dist:.0f}m" + + # 格式化时间 + time_sec = nav.get('time_remaining', 0) + minutes = int(time_sec // 60) + seconds = int(time_sec % 60) + time_str = f"{minutes}分{seconds}秒" + + data = [ + ("剩余距离", dist_str), + ("剩余时间", time_str), + ("道路限速", f"{nav.get('speed_limit', 0):.1f}km/h"), + ] + self.draw_data_section(data, 11, 0, self.max_width // 2) + + if nav.get('maneuver_distance', 0) > 0: + data = [ + ("下一动作", nav.get('maneuver_text', '')), + ("动作距离", f"{nav.get('maneuver_distance', 0)}m"), + ] + self.draw_data_section(data, 11, self.max_width // 2, self.max_width // 2) + + def draw_extended_car_info(self): + """绘制扩展车辆信息""" + if not self.listener.data or 'car_info' not in self.listener.data: + self.draw_no_data("扩展车辆数据不可用") + return + + car_info = self.listener.data.get('car_info', {}) + + # 绘制基本信息 + if 'basic' in car_info: + basic = car_info.get('basic', {}) + self.draw_section_title("车辆基本信息", 4) + + data = [ + ("车型", basic.get('car_model', 'Unknown')), + ("车辆指纹", basic.get('fingerprint', 'Unknown')), + ("重量", basic.get('weight', 'Unknown')), + ("轴距", basic.get('wheelbase', 'Unknown')), + ("转向比", basic.get('steering_ratio', 'Unknown')), + ] + self.draw_data_section(data, 5, 0, self.max_width) + + # 绘制详细车辆信息 + if 'details' not in car_info: + return + + details = car_info.get('details', {}) + row = 10 + + # 巡航控制信息 + if 'cruise' in details: + cruise = details.get('cruise', {}) + self.draw_section_title("巡航控制", row) + row += 1 + + data = [ + ("巡航状态", "开启" if cruise.get('enabled', False) else "关闭"), + ("自适应巡航", "可用" if cruise.get('available', False) else "不可用"), + ("设定速度", f"{cruise.get('speed', 0):.1f}km/h"), + ] + + if 'gap' in cruise: + data.append(("跟车距离", str(cruise.get('gap', 0)))) + + self.draw_data_section(data, row, 0, self.max_width) + row += len(data) + 1 + + # 车轮速度 + if 'wheel_speeds' in details: + ws = details.get('wheel_speeds', {}) + self.draw_section_title("车轮速度", row) + row += 1 + + data = [ + ("左前", f"{ws.get('fl', 0):.1f}km/h"), + ("右前", f"{ws.get('fr', 0):.1f}km/h"), + ("左后", f"{ws.get('rl', 0):.1f}km/h"), + ("右后", f"{ws.get('rr', 0):.1f}km/h"), + ] + self.draw_data_section(data, row, 0, self.max_width // 2) + row += len(data) + 1 + + # 安全系统 + if 'safety_systems' in details and details['safety_systems']: + ss = details.get('safety_systems', {}) + self.draw_section_title("安全系统", row) + row += 1 + + data = [] + if 'esp_disabled' in ss: + data.append(("ESP状态", "禁用" if ss.get('esp_disabled', False) else "正常")) + if 'abs_active' in ss: + data.append(("ABS状态", "激活" if ss.get('abs_active', False) else "正常")) + if 'tcs_active' in ss: + data.append(("牵引力控制", "激活" if ss.get('tcs_active', False) else "正常")) + if 'collision_warning' in ss: + data.append(("碰撞警告", "警告" if ss.get('collision_warning', False) else "正常")) + + if data: + self.draw_data_section(data, row, 0, self.max_width // 2) + row += len(data) + 1 + + # 盲点检测 + if 'blind_spot' in details and details['blind_spot']: + bs = details.get('blind_spot', {}) + self.draw_section_title("盲点监测", row) + row += 1 + + data = [] + if 'left' in bs: + data.append(("左侧", "检测到车辆" if bs.get('left', False) else "无车辆")) + if 'right' in bs: + data.append(("右侧", "检测到车辆" if bs.get('right', False) else "无车辆")) + + if data: + self.draw_data_section(data, row, 0, self.max_width // 2) + row += len(data) + 1 + + # 其他信息 + if 'other' in details and details['other']: + other = details.get('other', {}) + self.draw_section_title("其他信息", row) + row += 1 + + data = [] + if 'outside_temp' in other: + data.append(("车外温度", f"{other.get('outside_temp', 0):.1f}°C")) + if 'fuel_range' in other: + data.append(("续航里程", f"{other.get('fuel_range', 0):.1f}km")) + if 'odometer' in other: + data.append(("里程表", f"{other.get('odometer', 0):.1f}km")) + if 'fuel_consumption' in other: + data.append(("油耗", f"{other.get('fuel_consumption', 0):.1f}L/100km")) + + if data: + self.draw_data_section(data, row, 0, self.max_width // 2) + + def draw_raw_data(self): + """绘制原始数据""" + if not self.listener.data: + self.draw_no_data("没有数据") + return + + self.draw_section_title("原始JSON数据", 4) + + try: + json_str = json.dumps(self.listener.data, indent=2) + lines = json_str.split('\n') + + for i, line in enumerate(lines): + if 4 + i + 1 >= self.max_height - 1: # 保留底部状态栏 + self.safe_addstr(4 + i, 0, "... (内容过多无法完全显示)", curses.color_pair(self.COLOR_STATUS_WARNING)) + break + if len(line) > self.max_width: + line = line[:self.max_width - 3] + "..." + self.safe_addstr(4 + i + 1, 0, line) + except Exception as e: + self.safe_addstr(5, 0, f"无法显示JSON数据: {e}", curses.color_pair(self.COLOR_STATUS_ERROR)) + + def draw_no_data(self, message): + """绘制无数据消息""" + y = self.max_height // 2 + x = max(0, (self.max_width - len(message)) // 2) + self.safe_addstr(y, x, message, curses.color_pair(self.COLOR_STATUS_WARNING)) + + def draw_section_title(self, title, row): + """绘制区域标题""" + self.safe_addstr(row, 0, title, curses.color_pair(self.COLOR_TITLE) | curses.A_BOLD) + + def draw_data_section(self, data_list, start_row, start_col, width): + """绘制数据区域""" + for i, (label, value) in enumerate(data_list): + row = start_row + i + + if row >= self.max_height - 1: # 避免超出屏幕底部 + break + + self.safe_addstr(row, start_col + 2, f"{label}: ", curses.color_pair(self.COLOR_NORMAL)) + try: + self.stdscr.addstr(value, curses.color_pair(self.COLOR_DATA)) + except curses.error: + pass + +def main(): + """主函数""" + parser = argparse.ArgumentParser(description='CommaAssist 终端监控器') + parser.add_argument('-p', '--port', type=int, default=8088, help='监听端口(默认: 8088)') + args = parser.parse_args() + + # 创建并启动监听器 + listener = CommaListener(port=args.port) + listener_thread = threading.Thread(target=listener.start_listening) + listener_thread.daemon = True + listener_thread.start() + + # 创建并运行UI + ui = UI(listener) + try: + ui.run() + except KeyboardInterrupt: + pass + finally: + listener.running = False + print("正在退出...") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/selfdrive/app/commassit.py b/selfdrive/app/commassit.py new file mode 100644 index 0000000..faa6c81 --- /dev/null +++ b/selfdrive/app/commassit.py @@ -0,0 +1,498 @@ +#!/usr/bin/env python3 +import fcntl +import json +import math +import os +import socket +import struct +import threading +import time +import traceback +import zmq +from datetime import datetime + +import cereal.messaging as messaging +from openpilot.common.realtime import Ratekeeper +from openpilot.common.params import Params +from openpilot.system.hardware import PC, TICI +try: + from selfdrive.car.car_helpers import interfaces + HAS_CAR_INTERFACES = True +except ImportError: + HAS_CAR_INTERFACES = False + interfaces = None + +class CommaAssist: + def __init__(self): + print("初始化 CommaAssist 服务...") + # 初始化参数 + self.params = Params() + + # 订阅需要的数据源 + self.sm = messaging.SubMaster(['deviceState', 'carState', 'controlsState', + 'longitudinalPlan', 'liveLocationKalman', + 'navInstruction', 'modelV2']) + + # 网络相关配置 + self.broadcast_ip = self.get_broadcast_address() + if self.broadcast_ip is None: + self.broadcast_ip = "255.255.255.255" # 使用通用广播地址作为备选 + self.broadcast_port = 8088 + self.ip_address = "0.0.0.0" + self.is_running = True + + # 获取车辆信息 + self.car_info = {} + self.load_car_info() + + # 启动广播线程 + threading.Thread(target=self.broadcast_data).start() + + def load_car_info(self): + """加载车辆基本信息""" + try: + # 获取车辆型号信息 + try: + car_model = self.params.get("CarModel", encoding='utf8') + self.car_info["car_name"] = car_model if car_model else "Unknown" + except Exception as e: + print(f"无法获取CarModel参数: {e}") + try: + # 尝试获取其他可能的车辆参数 + car_params = self.params.get("CarParamsCache", encoding='utf8') + self.car_info["car_name"] = "通过CarParamsCache获取" if car_params else "Unknown" + except Exception: + self.car_info["car_name"] = "Unknown Model" + car_model = None + + # 检查车辆接口是否可用 + if not HAS_CAR_INTERFACES: + print("车辆接口模块不可用") + elif not car_model: + print("无有效的车型信息") + elif not isinstance(interfaces, list): + print("车辆接口不是列表类型,尝试转换...") + # 尝试获取车辆接口的具体实现 + if hasattr(interfaces, '__call__'): + # 如果interfaces是一个函数,尝试直接获取车辆指纹 + try: + self.car_info["car_fingerprint"] = f"直接从车型{car_model}获取" + print(f"直接从车型识别: {car_model}") + except Exception as e: + print(f"无法从车型直接获取指纹: {e}") + else: + # 正常遍历接口列表 + print("尝试从车辆接口中获取指纹信息...") + for interface in interfaces: + if not hasattr(interface, 'CHECKSUM'): + continue + + try: + if isinstance(interface.CHECKSUM, dict) and 'pt' in interface.CHECKSUM: + if car_model in interface.CHECKSUM["pt"]: + platform = interface + self.car_info["car_fingerprint"] = platform.config.platform_str + + # 获取车辆规格参数 + specs = platform.config.specs + if specs: + if hasattr(specs, 'mass'): + self.car_info["mass"] = specs.mass + if hasattr(specs, 'wheelbase'): + self.car_info["wheelbase"] = specs.wheelbase + if hasattr(specs, 'steerRatio'): + self.car_info["steerRatio"] = specs.steerRatio + break + except Exception as e: + print(f"处理特定车辆接口异常: {e}") + + except Exception as e: + print(f"加载车辆信息失败: {e}") + traceback.print_exc() + + # 确保基本字段存在,避免后续访问出错 + if "car_name" not in self.car_info: + self.car_info["car_name"] = "Unknown Model" + if "car_fingerprint" not in self.car_info: + self.car_info["car_fingerprint"] = "Unknown Fingerprint" + + print(f"车辆信息加载完成: {self.car_info}") + + def get_broadcast_address(self): + """获取广播地址""" + try: + if PC: + iface = b'br0' + else: + iface = b'wlan0' + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + ip = fcntl.ioctl( + s.fileno(), + 0x8919, # SIOCGIFADDR + struct.pack('256s', iface) + )[20:24] + ip_str = socket.inet_ntoa(ip) + print(f"获取到IP地址: {ip_str}") + # 从IP地址构造广播地址 + ip_parts = ip_str.split('.') + return f"{ip_parts[0]}.{ip_parts[1]}.{ip_parts[2]}.255" + except (OSError, Exception) as e: + print(f"获取广播地址失败: {e}") + return None + + def get_local_ip(self): + """获取本地IP地址""" + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("8.8.8.8", 80)) + return s.getsockname()[0] + except Exception as e: + print(f"获取本地IP失败: {e}") + return "127.0.0.1" + + def make_data_message(self): + """构建广播消息内容""" + # 基本消息结构 + message = { + "timestamp": int(time.time()), + "device": { + "ip": self.ip_address, + "battery": {}, + "mem_usage": 0, + "cpu_temp": 0, + "free_space": 0 + }, + "car": { + "speed": 0, + "cruise_speed": 0, + "gear_shifter": "unknown", + "steering_angle": 0, + "steering_torque": 0, + "brake_pressed": False, + "gas_pressed": False, + "door_open": False, + "left_blinker": False, + "right_blinker": False + }, + "location": { + "latitude": 0, + "longitude": 0, + "bearing": 0, + "speed": 0, + "altitude": 0, + "accuracy": 0, + "gps_valid": False + }, + "car_info": { + "basic": { + "car_model": self.car_info.get("car_name", "Unknown"), + "fingerprint": self.car_info.get("car_fingerprint", "Unknown"), + "weight": f"{self.car_info.get('mass', 0):.0f} kg" if 'mass' in self.car_info else "Unknown", + "wheelbase": f"{self.car_info.get('wheelbase', 0):.3f} m" if 'wheelbase' in self.car_info else "Unknown", + "steering_ratio": f"{self.car_info.get('steerRatio', 0):.1f}" if 'steerRatio' in self.car_info else "Unknown" + } + } + } + + # 安全地获取设备信息 + try: + if self.sm.updated['deviceState'] and self.sm.valid['deviceState']: + device_state = self.sm['deviceState'] + + # 获取设备信息 - 使用getattr安全地访问属性 + message["device"]["mem_usage"] = getattr(device_state, 'memoryUsagePercent', 0) + message["device"]["free_space"] = getattr(device_state, 'freeSpacePercent', 0) + + # CPU温度 + cpu_temps = getattr(device_state, 'cpuTempC', []) + if isinstance(cpu_temps, list) and len(cpu_temps) > 0: + message["device"]["cpu_temp"] = cpu_temps[0] + + # 电池信息 + try: # 额外的错误处理,因为这是常见错误点 + message["device"]["battery"]["percent"] = getattr(device_state, 'batteryPercent', 0) + message["device"]["battery"]["status"] = getattr(device_state, 'batteryCurrent', 0) + message["device"]["battery"]["voltage"] = getattr(device_state, 'batteryVoltage', 0) + message["device"]["battery"]["charging"] = getattr(device_state, 'chargingError', False) + except Exception as e: + print(f"获取电池信息失败: {e}") + except Exception as e: + print(f"获取设备状态出错: {e}") + + # 安全地获取车辆信息 + try: + if self.sm.updated['carState'] and self.sm.valid['carState']: + CS = self.sm['carState'] + + # 基本车辆信息 + message["car"]["speed"] = getattr(CS, 'vEgo', 0) * 3.6 # m/s转km/h + message["car"]["gear_shifter"] = str(getattr(CS, 'gearShifter', "unknown")) + message["car"]["steering_angle"] = getattr(CS, 'steeringAngleDeg', 0) + message["car"]["steering_torque"] = getattr(CS, 'steeringTorque', 0) + message["car"]["brake_pressed"] = getattr(CS, 'brakePressed', False) + message["car"]["gas_pressed"] = getattr(CS, 'gasPressed', False) + message["car"]["door_open"] = getattr(CS, 'doorOpen', False) + message["car"]["left_blinker"] = getattr(CS, 'leftBlinker', False) + message["car"]["right_blinker"] = getattr(CS, 'rightBlinker', False) + + # 扩展的车辆状态信息 + is_car_started = getattr(CS, 'vEgo', 0) > 0.1 + is_car_engaged = False + + # 详细车辆信息 + car_details = {} + + # 车辆状态 + status = { + "running_status": "Moving" if is_car_started else "Stopped", + "door_open": getattr(CS, 'doorOpen', False), + "seatbelt_unlatched": getattr(CS, 'seatbeltUnlatched', False), + } + + # 引擎信息 + engine_info = {} + if hasattr(CS, 'engineRpm') and CS.engineRpm > 0: + engine_info["rpm"] = f"{CS.engineRpm:.0f}" + car_details["engine"] = engine_info + + # 巡航控制 + cruise_info = {} + if hasattr(CS, 'cruiseState'): + is_car_engaged = getattr(CS.cruiseState, 'enabled', False) + cruise_info["enabled"] = getattr(CS.cruiseState, 'enabled', False) + cruise_info["available"] = getattr(CS.cruiseState, 'available', False) + cruise_info["speed"] = getattr(CS.cruiseState, 'speed', 0) * 3.6 + + if hasattr(CS, 'pcmCruiseGap'): + cruise_info["gap"] = CS.pcmCruiseGap + + status["cruise_engaged"] = is_car_engaged + car_details["cruise"] = cruise_info + + # 车轮速度 + wheel_speeds = {} + if hasattr(CS, 'wheelSpeeds'): + ws = CS.wheelSpeeds + wheel_speeds["fl"] = getattr(ws, 'fl', 0) * 3.6 + wheel_speeds["fr"] = getattr(ws, 'fr', 0) * 3.6 + wheel_speeds["rl"] = getattr(ws, 'rl', 0) * 3.6 + wheel_speeds["rr"] = getattr(ws, 'rr', 0) * 3.6 + car_details["wheel_speeds"] = wheel_speeds + + # 方向盘信息 + steering = { + "angle": getattr(CS, 'steeringAngleDeg', 0), + "torque": getattr(CS, 'steeringTorque', 0), + } + if hasattr(CS, 'steeringRateDeg'): + steering["rate"] = CS.steeringRateDeg + car_details["steering"] = steering + + # 踏板状态 + pedals = { + "gas_pressed": getattr(CS, 'gasPressed', False), + "brake_pressed": getattr(CS, 'brakePressed', False), + } + if hasattr(CS, 'gas'): + pedals["throttle_position"] = CS.gas * 100 + if hasattr(CS, 'brake'): + pedals["brake_pressure"] = CS.brake * 100 + car_details["pedals"] = pedals + + # 安全系统 + safety_systems = {} + if hasattr(CS, 'espDisabled'): + safety_systems["esp_disabled"] = CS.espDisabled + if hasattr(CS, 'absActive'): + safety_systems["abs_active"] = CS.absActive + if hasattr(CS, 'tcsActive'): + safety_systems["tcs_active"] = CS.tcsActive + if hasattr(CS, 'collisionWarning'): + safety_systems["collision_warning"] = CS.collisionWarning + car_details["safety_systems"] = safety_systems + + # 车门状态 + doors = { + "driver": getattr(CS, 'doorOpen', False) + } + if hasattr(CS, 'passengerDoorOpen'): + doors["passenger"] = CS.passengerDoorOpen + if hasattr(CS, 'trunkOpen'): + doors["trunk"] = CS.trunkOpen + if hasattr(CS, 'hoodOpen'): + doors["hood"] = CS.hoodOpen + car_details["doors"] = doors + + # 灯光状态 + lights = { + "left_blinker": getattr(CS, 'leftBlinker', False), + "right_blinker": getattr(CS, 'rightBlinker', False), + } + if hasattr(CS, 'genericToggle'): + lights["high_beam"] = CS.genericToggle + if hasattr(CS, 'lowBeamOn'): + lights["low_beam"] = CS.lowBeamOn + car_details["lights"] = lights + + # 盲点监测 + blind_spot = {} + if hasattr(CS, 'leftBlindspot'): + blind_spot["left"] = CS.leftBlindspot + if hasattr(CS, 'rightBlindspot'): + blind_spot["right"] = CS.rightBlindspot + if blind_spot: + car_details["blind_spot"] = blind_spot + + # 其他可选信息 + other_info = {} + if hasattr(CS, 'outsideTemp'): + other_info["outside_temp"] = CS.outsideTemp + if hasattr(CS, 'fuelGauge'): + other_info["fuel_range"] = CS.fuelGauge + if hasattr(CS, 'odometer'): + other_info["odometer"] = CS.odometer + if hasattr(CS, 'instantFuelConsumption'): + other_info["fuel_consumption"] = CS.instantFuelConsumption + if other_info: + car_details["other"] = other_info + + # 更新状态和详细信息 + message["car_info"]["status"] = status + message["car_info"]["details"] = car_details + + if self.sm.updated['controlsState'] and self.sm.valid['controlsState']: + controls_state = self.sm['controlsState'] + message["car"]["cruise_speed"] = getattr(controls_state, 'vCruise', 0) + + # 额外的控制状态信息 + controls_info = {} + if hasattr(controls_state, 'enabled'): + controls_info["enabled"] = controls_state.enabled + if hasattr(controls_state, 'active'): + controls_info["active"] = controls_state.active + if hasattr(controls_state, 'alertText1'): + controls_info["alert_text"] = controls_state.alertText1 + if controls_info: + message["car_info"]["controls"] = controls_info + + except Exception as e: + print(f"获取车辆信息出错: {e}") + traceback.print_exc() + + # 安全地获取GPS位置信息 + try: + if self.sm.updated['liveLocationKalman'] and self.sm.valid['liveLocationKalman']: + location = self.sm['liveLocationKalman'] + + # 检查GPS是否有效 + location_status = getattr(location, 'status', -1) + position_valid = False + if hasattr(location, 'positionGeodetic'): + position_valid = getattr(location.positionGeodetic, 'valid', False) + + gps_valid = (location_status == 0) and position_valid + message["location"]["gps_valid"] = gps_valid + + if gps_valid and hasattr(location, 'positionGeodetic') and hasattr(location.positionGeodetic, 'value'): + # 获取位置信息 + pos_value = location.positionGeodetic.value + if len(pos_value) >= 3: + message["location"]["latitude"] = pos_value[0] + message["location"]["longitude"] = pos_value[1] + message["location"]["altitude"] = pos_value[2] + + # 获取精度信息 + if hasattr(location, 'positionGeodeticStd') and hasattr(location.positionGeodeticStd, 'value'): + std_value = location.positionGeodeticStd.value + if len(std_value) > 0: + message["location"]["accuracy"] = std_value[0] + + # 获取方向信息 + if hasattr(location, 'calibratedOrientationNED') and hasattr(location.calibratedOrientationNED, 'value'): + orientation = location.calibratedOrientationNED.value + if len(orientation) > 2: + message["location"]["bearing"] = math.degrees(orientation[2]) + + # 设置速度信息 + car_state = self.sm['carState'] if self.sm.valid['carState'] else None + if car_state and hasattr(car_state, 'vEgo'): + message["location"]["speed"] = car_state.vEgo * 3.6 + except Exception as e: + print(f"获取位置信息出错: {e}") + + # 如果有导航指令,添加导航信息 + try: + if self.sm.valid['navInstruction']: + nav_instruction = self.sm['navInstruction'] + + nav_info = {} + nav_info["distance_remaining"] = getattr(nav_instruction, 'distanceRemaining', 0) + nav_info["time_remaining"] = getattr(nav_instruction, 'timeRemaining', 0) + nav_info["speed_limit"] = getattr(nav_instruction, 'speedLimit', 0) * 3.6 + nav_info["maneuver_distance"] = getattr(nav_instruction, 'maneuverDistance', 0) + nav_info["maneuver_type"] = getattr(nav_instruction, 'maneuverType', "") + nav_info["maneuver_modifier"] = getattr(nav_instruction, 'maneuverModifier', "") + nav_info["maneuver_text"] = getattr(nav_instruction, 'maneuverPrimaryText', "") + + message["navigation"] = nav_info + except Exception as e: + print(f"获取导航信息出错: {e}") + + try: + return json.dumps(message) + except Exception as e: + print(f"序列化消息出错: {e}") + return "{}" + + def broadcast_data(self): + """定期发送数据到广播地址""" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + rk = Ratekeeper(10, print_delay_threshold=None) # 10Hz广播频率 + + print(f"开始广播数据到 {self.broadcast_ip}:{self.broadcast_port}") + + while self.is_running: + try: + # 更新数据 + self.sm.update(0) + + # 更新IP地址 + ip_address = self.get_local_ip() + if ip_address != self.ip_address: + self.ip_address = ip_address + print(f"IP地址已更新: {ip_address}") + + # 构建并发送消息 + msg = self.make_data_message() + dat = msg.encode('utf-8') + sock.sendto(dat, (self.broadcast_ip, self.broadcast_port)) + + # 减少日志输出频率 + if rk.frame % 50 == 0: # 每5秒打印一次日志 + print(f"广播数据: {self.broadcast_ip}:{self.broadcast_port}") + + rk.keep_time() + except Exception as e: + print(f"广播数据错误: {e}") + traceback.print_exc() + time.sleep(1) + +def main(gctx=None): + """主函数 + 支持作为独立程序运行或由process_config启动 + gctx参数用于与openpilot进程管理系统兼容 + """ + comma_assist = CommaAssist() + + # 保持主线程运行 + try: + while True: + time.sleep(10) # 主线程休眠 + except KeyboardInterrupt: + comma_assist.is_running = False + print("CommaAssist服务已停止") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/selfdrive/app/commawebview.py b/selfdrive/app/commawebview.py new file mode 100644 index 0000000..61f6291 --- /dev/null +++ b/selfdrive/app/commawebview.py @@ -0,0 +1,858 @@ +#!/usr/bin/env python3 +import json +import socket +import threading +import time +import traceback +from datetime import datetime +import argparse +from flask import Flask, render_template_string, jsonify, render_template + +class CommaWebListener: + def __init__(self, port=8088): + """初始化接收器""" + self.port = port + self.data = {} + self.last_update = 0 + self.running = True + self.device_ip = None + + def start_listening(self): + """启动UDP监听""" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + sock.bind(('0.0.0.0', self.port)) + print(f"正在监听端口 {self.port} 的广播数据...") + + while self.running: + try: + data, addr = sock.recvfrom(4096) + self.device_ip = addr[0] + try: + self.data = json.loads(data.decode('utf-8')) + self.last_update = time.time() + except json.JSONDecodeError: + print(f"接收到无效的JSON数据: {data[:100]}...") + except Exception as e: + print(f"接收数据时出错: {e}") + except Exception as e: + print(f"无法绑定到端口 {self.port}: {e}") + finally: + sock.close() + +# HTML模板 +HTML_TEMPLATE = """ + + + + + + CommaAssist 数据监视器 + + + + +
+
+

CommaAssist 数据监视器

+
+ +
+ 等待来自comma3的数据... +
+ +
+ + +
+
+ + + +
+ +
+
+
+
+
基本车辆信息
+
+
+
等待车辆数据...
+
+
+
+ +
+
方向盘与控制系统
+
+
+
等待车辆数据...
+
+
+
+ +
+
详细车辆信息
+
+
+
等待车辆数据...
+
+
+
+
+ +
+
+
踏板与制动系统
+
+
+
等待车辆数据...
+
+
+
+ +
+
车门与信号灯
+
+
+
等待车辆数据...
+
+
+
+ +
+
巡航控制
+
+
+
等待车辆数据...
+
+
+
+
+
+
+ + +
+
+
设备状态
+
+
+
等待设备数据...
+
+
+
+ +
+
系统资源
+
+
+
等待设备数据...
+
+
+
+
+ + +
+
+
+
+
GPS位置
+
+
+
等待GPS数据...
+
+
+
+ +
+
导航信息
+
+ +
+
+
+ +
+
+
地图
+
+
+
+
+
+
+
+ + +
+
+
原始JSON数据
+
+
等待数据...
+
+
+
+
+
+ + + + + + + + +""" + +def create_app(listener): + app = Flask(__name__) + + @app.route('/') + def index(): + """主页""" + return render_template_string(HTML_TEMPLATE) + + @app.route('/api/data') + def get_data(): + """提供JSON数据API""" + return jsonify({ + 'data': listener.data, + 'last_update': listener.last_update, + 'device_ip': listener.device_ip + }) + + return app + +def main(): + """主函数""" + parser = argparse.ArgumentParser(description='CommaAssist Web监视器') + parser.add_argument('-p', '--port', type=int, default=8088, help='监听comma设备的UDP端口(默认: 8088)') + parser.add_argument('-w', '--web-port', type=int, default=5000, help='Web服务器端口(默认: 5000)') + parser.add_argument('--host', default='0.0.0.0', help='Web服务器监听地址(默认: 0.0.0.0)') + args = parser.parse_args() + + # 创建监听器 + listener = CommaWebListener(port=args.port) + + # 启动接收线程 + receiver_thread = threading.Thread(target=listener.start_listening) + receiver_thread.daemon = True + receiver_thread.start() + + # 创建Flask应用 + app = create_app(listener) + + # 启动Web服务器 + try: + print(f"Web服务已启动,请访问 http://{args.host}:{args.web_port}/") + app.run(host=args.host, port=args.web_port) + except KeyboardInterrupt: + print("\n退出...") + finally: + listener.running = False + receiver_thread.join(timeout=1.0) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/selfdrive/manager/process_config.py b/selfdrive/manager/process_config.py index 41d581d..2202306 100644 --- a/selfdrive/manager/process_config.py +++ b/selfdrive/manager/process_config.py @@ -101,6 +101,9 @@ procs = [ PythonProcess("fleet_manager", "selfdrive.frogpilot.fleetmanager.fleet_manager", always_run), PythonProcess("frogpilot_process", "selfdrive.frogpilot.frogpilot_process", always_run), PythonProcess("mapd", "selfdrive.frogpilot.navigation.mapd", always_run), + + # CommaAssist process + PythonProcess("commassist", "selfdrive.app.commassit", always_run), ] managed_processes = {p.name: p for p in procs}