import numpy as np from opendbc.can.packer import CANPacker from opendbc.car import Bus, DT_CTRL, apply_driver_steer_torque_limits, common_fault_avoidance, make_tester_present_msg, structs from opendbc.car.common.conversions import Conversions as CV from opendbc.car.hyundai import hyundaicanfd, hyundaican from opendbc.car.hyundai.carstate import CarState from opendbc.car.hyundai.hyundaicanfd import CanBus from opendbc.car.hyundai.values import HyundaiFlags, Buttons, CarControllerParams, CAR from opendbc.car.interfaces import CarControllerBase VisualAlert = structs.CarControl.HUDControl.VisualAlert LongCtrlState = structs.CarControl.Actuators.LongControlState # EPS faults if you apply torque while the steering angle is above 90 degrees for more than 1 second # All slightly below EPS thresholds to avoid fault MAX_ANGLE = 85 MAX_ANGLE_FRAMES = 89 MAX_ANGLE_CONSECUTIVE_FRAMES = 2 def process_hud_alert(enabled, fingerprint, hud_control): sys_warning = (hud_control.visualAlert in (VisualAlert.steerRequired, VisualAlert.ldw)) # initialize to no line visible # TODO: this is not accurate for all cars sys_state = 1 if hud_control.leftLaneVisible and hud_control.rightLaneVisible or sys_warning: # HUD alert only display when LKAS status is active sys_state = 3 if enabled or sys_warning else 4 elif hud_control.leftLaneVisible: sys_state = 5 elif hud_control.rightLaneVisible: sys_state = 6 # initialize to no warnings left_lane_warning = 0 right_lane_warning = 0 if hud_control.leftLaneDepart: left_lane_warning = 1 if fingerprint in (CAR.GENESIS_G90, CAR.GENESIS_G80) else 2 if hud_control.rightLaneDepart: right_lane_warning = 1 if fingerprint in (CAR.GENESIS_G90, CAR.GENESIS_G80) else 2 return sys_warning, sys_state, left_lane_warning, right_lane_warning class CarController(CarControllerBase): def __init__(self, dbc_names, CP): super().__init__(dbc_names, CP) self.CAN = CanBus(CP) self.params = CarControllerParams(CP) self.packer = CANPacker(dbc_names[Bus.pt]) self.angle_limit_counter = 0 self.accel_last = 0 self.apply_torque_last = 0 self.car_fingerprint = CP.carFingerprint self.last_button_frame = 0 def update(self, CC, CS, now_nanos): actuators = CC.actuators hud_control = CC.hudControl # steering torque new_torque = int(round(actuators.torque * self.params.STEER_MAX)) apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.params) # >90 degree steering fault prevention self.angle_limit_counter, apply_steer_req = common_fault_avoidance(abs(CS.out.steeringAngleDeg) >= MAX_ANGLE, CC.latActive, self.angle_limit_counter, MAX_ANGLE_FRAMES, MAX_ANGLE_CONSECUTIVE_FRAMES) if not CC.latActive: apply_torque = 0 # Hold torque with induced temporary fault when cutting the actuation bit torque_fault = CC.latActive and not apply_steer_req self.apply_torque_last = apply_torque # accel + longitudinal accel = float(np.clip(actuators.accel, CarControllerParams.ACCEL_MIN, CarControllerParams.ACCEL_MAX)) stopping = actuators.longControlState == LongCtrlState.stopping set_speed_in_units = hud_control.setSpeed * (CV.MS_TO_KPH if CS.is_metric else CV.MS_TO_MPH) # HUD messages sys_warning, sys_state, left_lane_warning, right_lane_warning = process_hud_alert(CC.enabled, self.car_fingerprint, hud_control) can_sends = [] # *** common hyundai stuff *** # tester present - w/ no response (keeps relevant ECU disabled) if self.frame % 100 == 0 and not (self.CP.flags & HyundaiFlags.CANFD_CAMERA_SCC) and self.CP.openpilotLongitudinalControl: # for longitudinal control, either radar or ADAS driving ECU addr, bus = 0x7d0, self.CAN.ECAN if self.CP.flags & HyundaiFlags.CANFD else 0 if self.CP.flags & HyundaiFlags.CANFD_LKA_STEERING.value: addr, bus = 0x730, self.CAN.ECAN can_sends.append(make_tester_present_msg(addr, bus, suppress_response=True)) # for blinkers if self.CP.flags & HyundaiFlags.ENABLE_BLINKERS: can_sends.append(make_tester_present_msg(0x7b1, self.CAN.ECAN, suppress_response=True)) # CAN-FD platforms if self.CP.flags & HyundaiFlags.CANFD: lka_steering = self.CP.flags & HyundaiFlags.CANFD_LKA_STEERING lka_steering_long = lka_steering and self.CP.openpilotLongitudinalControl # steering control can_sends.extend(hyundaicanfd.create_steering_messages(self.packer, self.CP, self.CAN, CC.enabled, apply_steer_req, apply_torque)) # prevent LFA from activating on LKA steering cars by sending "no lane lines detected" to ADAS ECU if self.frame % 5 == 0 and lka_steering: can_sends.append(hyundaicanfd.create_suppress_lfa(self.packer, self.CAN, CS.lfa_block_msg, self.CP.flags & HyundaiFlags.CANFD_LKA_STEERING_ALT)) # LFA and HDA icons if self.frame % 5 == 0 and (not lka_steering or lka_steering_long): can_sends.append(hyundaicanfd.create_lfahda_cluster(self.packer, self.CAN, CC.enabled)) # blinkers if lka_steering and self.CP.flags & HyundaiFlags.ENABLE_BLINKERS: can_sends.extend(hyundaicanfd.create_spas_messages(self.packer, self.CAN, self.frame, CC.leftBlinker, CC.rightBlinker)) if self.CP.openpilotLongitudinalControl: if lka_steering: can_sends.extend(hyundaicanfd.create_adrv_messages(self.packer, self.CAN, self.frame)) else: can_sends.extend(hyundaicanfd.create_fca_warning_light(self.packer, self.CAN, self.frame)) if self.frame % 2 == 0: can_sends.append(hyundaicanfd.create_acc_control(self.packer, self.CAN, CC.enabled, self.accel_last, accel, stopping, CC.cruiseControl.override, set_speed_in_units, hud_control)) self.accel_last = accel else: # button presses can_sends.extend(self.create_button_messages(CC, CS, use_clu11=False)) else: can_sends.append(hyundaican.create_lkas11(self.packer, self.frame, self.CP, apply_torque, apply_steer_req, torque_fault, CS.lkas11, sys_warning, sys_state, CC.enabled, hud_control.leftLaneVisible, hud_control.rightLaneVisible, left_lane_warning, right_lane_warning)) if not self.CP.openpilotLongitudinalControl: can_sends.extend(self.create_button_messages(CC, CS, use_clu11=True)) if self.frame % 2 == 0 and self.CP.openpilotLongitudinalControl: # TODO: unclear if this is needed jerk = 3.0 if actuators.longControlState == LongCtrlState.pid else 1.0 use_fca = self.CP.flags & HyundaiFlags.USE_FCA.value can_sends.extend(hyundaican.create_acc_commands(self.packer, CC.enabled, accel, jerk, int(self.frame / 2), hud_control, set_speed_in_units, stopping, CC.cruiseControl.override, use_fca, self.CP)) # 20 Hz LFA MFA message if self.frame % 5 == 0 and self.CP.flags & HyundaiFlags.SEND_LFA.value: can_sends.append(hyundaican.create_lfahda_mfc(self.packer, CC.enabled)) # 5 Hz ACC options if self.frame % 20 == 0 and self.CP.openpilotLongitudinalControl: can_sends.extend(hyundaican.create_acc_opt(self.packer, self.CP)) # 2 Hz front radar options if self.frame % 50 == 0 and self.CP.openpilotLongitudinalControl: can_sends.append(hyundaican.create_frt_radar_opt(self.packer)) new_actuators = actuators.as_builder() new_actuators.torque = apply_torque / self.params.STEER_MAX new_actuators.torqueOutputCan = apply_torque new_actuators.accel = accel self.frame += 1 return new_actuators, can_sends def create_button_messages(self, CC: structs.CarControl, CS: CarState, use_clu11: bool): can_sends = [] if use_clu11: if CC.cruiseControl.cancel: can_sends.append(hyundaican.create_clu11(self.packer, self.frame, CS.clu11, Buttons.CANCEL, self.CP)) elif CC.cruiseControl.resume: # send resume at a max freq of 10Hz if (self.frame - self.last_button_frame) * DT_CTRL > 0.1: # send 25 messages at a time to increases the likelihood of resume being accepted can_sends.extend([hyundaican.create_clu11(self.packer, self.frame, CS.clu11, Buttons.RES_ACCEL, self.CP)] * 25) if (self.frame - self.last_button_frame) * DT_CTRL >= 0.15: self.last_button_frame = self.frame else: if (self.frame - self.last_button_frame) * DT_CTRL > 0.25: # cruise cancel if CC.cruiseControl.cancel: if self.CP.flags & HyundaiFlags.CANFD_ALT_BUTTONS: can_sends.append(hyundaicanfd.create_acc_cancel(self.packer, self.CP, self.CAN, CS.cruise_info)) self.last_button_frame = self.frame else: for _ in range(20): can_sends.append(hyundaicanfd.create_buttons(self.packer, self.CP, self.CAN, CS.buttons_counter + 1, Buttons.CANCEL)) self.last_button_frame = self.frame # cruise standstill resume elif CC.cruiseControl.resume: if self.CP.flags & HyundaiFlags.CANFD_ALT_BUTTONS: # TODO: resume for alt button cars pass else: for _ in range(20): can_sends.append(hyundaicanfd.create_buttons(self.packer, self.CP, self.CAN, CS.buttons_counter + 1, Buttons.RES_ACCEL)) self.last_button_frame = self.frame return can_sends