#!/usr/bin/env python3 import math from typing import SupportsFloat from cereal import car, log import cereal.messaging as messaging from openpilot.common.conversions import Conversions as CV from openpilot.common.params import Params from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper from openpilot.common.swaglog import cloudlog import numpy as np from collections import deque from opendbc.car.car_helpers import interfaces from opendbc.car.vehicle_model import VehicleModel from openpilot.selfdrive.controls.lib.drive_helpers import clip_curvature, get_lag_adjusted_curvature from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque from openpilot.selfdrive.controls.lib.longcontrol import LongControl from openpilot.common.realtime import DT_CTRL, DT_MDL from openpilot.selfdrive.modeld.constants import ModelConstants from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N State = log.SelfdriveState.OpenpilotState LaneChangeState = log.LaneChangeState LaneChangeDirection = log.LaneChangeDirection ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys()) class Controls: def __init__(self) -> None: self.params = Params() cloudlog.info("controlsd is waiting for CarParams") self.CP = messaging.log_from_bytes(self.params.get("CarParams", block=True), car.CarParams) cloudlog.info("controlsd got CarParams") self.CI = interfaces[self.CP.carFingerprint](self.CP) self.disable_dm = False self.sm = messaging.SubMaster(['liveParameters', 'liveTorqueParameters', 'modelV2', 'selfdriveState', 'liveCalibration', 'liveLocationKalman', 'longitudinalPlan', 'carState', 'carOutput', 'liveDelay', 'carrotMan', 'lateralPlan', 'radarState', 'driverMonitoringState', 'onroadEvents', 'driverAssistance'], poll='selfdriveState') self.pm = messaging.PubMaster(['carControl', 'controlsState']) self.steer_limited_by_controls = False self.curvature = 0.0 self.desired_curvature = 0.0 self.yStd = 0.0 self.LoC = LongControl(self.CP) self.VM = VehicleModel(self.CP) self.LaC: LatControl if self.CP.steerControlType == car.CarParams.SteerControlType.angle: self.LaC = LatControlAngle(self.CP, self.CI) elif self.CP.lateralTuning.which() == 'pid': self.LaC = LatControlPID(self.CP, self.CI) elif self.CP.lateralTuning.which() == 'torque': self.LaC = LatControlTorque(self.CP, self.CI) def update(self): self.sm.update(15) def state_control(self): CS = self.sm['carState'] # Update VehicleModel lp = self.sm['liveParameters'] x = max(lp.stiffnessFactor, 0.1) sr = max(lp.steerRatio, 0.1) * self.params.get_float("SteerRatioRate") / 100.0 custom_sr = self.params.get_float("CustomSR") / 10.0 sr = max(custom_sr if custom_sr > 1.0 else sr, 0.1) self.VM.update_params(x, sr) steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg) self.curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll) # Update Torque Params if self.CP.lateralTuning.which() == 'torque': torque_params = self.sm['liveTorqueParameters'] if self.sm.all_checks(['liveTorqueParameters']) and torque_params.useParams: self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered, torque_params.frictionCoefficientFiltered) long_plan = self.sm['longitudinalPlan'] model_v2 = self.sm['modelV2'] CC = car.CarControl.new_message() CC.enabled = self.sm['selfdriveState'].enabled # carrot gear = car.CarState.GearShifter driving_gear = CS.gearShifter not in (gear.neutral, gear.park, gear.reverse, gear.unknown) lateral_enabled = driving_gear #self.soft_hold_active = CS.softHoldActive #car.OnroadEvent.EventName.softHold in [e.name for e in self.sm['onroadEvents']] # Check which actuators can be enabled standstill = abs(CS.vEgo) <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstill CC.latActive = ((self.sm['selfdriveState'].active or lateral_enabled) and CS.latEnabled and not CS.steerFaultTemporary and not CS.steerFaultPermanent and not standstill) CC.longActive = CC.enabled and not any(e.overrideLongitudinal for e in self.sm['onroadEvents']) and self.CP.openpilotLongitudinalControl actuators = CC.actuators actuators.longControlState = self.LoC.long_control_state # Enable blinkers while lane changing if model_v2.meta.laneChangeState != LaneChangeState.off: CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right if not CC.latActive: self.LaC.reset() if not CC.longActive: self.LoC.reset() # accel PID loop pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, CS.vCruise * CV.KPH_TO_MS) t_since_plan = (self.sm.frame - self.sm.recv_frame['longitudinalPlan']) * DT_CTRL accel, aTargetNow, jerk = self.LoC.update(CC.longActive, CS, long_plan, pid_accel_limits, t_since_plan, self.sm['radarState']) actuators.accel = float(accel) actuators.aTargetNow = float(aTargetNow) actuators.jerk = float(jerk) # Steering PID loop and lateral MPC lat_plan = self.sm['lateralPlan'] curve_speed_abs = abs(self.sm['carrotMan'].vTurnSpeed) self.lanefull_mode_enabled = (lat_plan.useLaneLines and self.params.get_int("UseLaneLineSpeedApply") > 0 and curve_speed_abs > self.params.get_int("UseLaneLineCurveSpeed")) lat_smooth_seconds = self.params.get_float("SteerSmoothSec") * 0.01 steer_actuator_delay = self.params.get_float("SteerActuatorDelay") * 0.01 if steer_actuator_delay == 0.0: steer_actuator_delay = self.sm['liveDelay'].lateralDelay if len(model_v2.position.yStd) > 0: yStd = np.interp(steer_actuator_delay + lat_smooth_seconds, ModelConstants.T_IDXS, model_v2.position.yStd) self.yStd = yStd * 0.1 + self.yStd * 0.9 else: self.yStd = 0.0 if not CC.latActive: new_desired_curvature = self.curvature elif self.lanefull_mode_enabled: if len(lat_plan.curvatures) == 0: new_desired_curvature = self.curvature else: def smooth_value(val, prev_val, tau): alpha = 1 - np.exp(-DT_CTRL / tau) if tau > 0 else 1 return alpha * val + (1 - alpha) * prev_val t_since_plan = (self.sm.frame - self.sm.recv_frame['lateralPlan']) * DT_CTRL curvature = np.interp(steer_actuator_delay + lat_smooth_seconds + t_since_plan, ModelConstants.T_IDXS[:CONTROL_N], lat_plan.curvatures) new_desired_curvature = smooth_value(curvature, self.desired_curvature, lat_smooth_seconds) else: new_desired_curvature = model_v2.action.desiredCurvature self.desired_curvature, curvature_limited = clip_curvature(CS.vEgo, self.desired_curvature, new_desired_curvature, lp.roll) actuators.curvature = float(self.desired_curvature) steer, steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp, self.steer_limited_by_controls, self.desired_curvature, self.sm['liveLocationKalman'], curvature_limited) # TODO what if not available actuators.torque = float(steer) actuators.steeringAngleDeg = float(steeringAngleDeg) actuators.yStd = float(self.yStd) # Ensure no NaNs/Infs for p in ACTUATOR_FIELDS: attr = getattr(actuators, p) if not isinstance(attr, SupportsFloat): continue if not math.isfinite(attr): cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}") setattr(actuators, p, 0.0) return CC, lac_log def publish(self, CC, lac_log): CS = self.sm['carState'] # Orientation and angle rates can be useful for carcontroller # Only calibrated (car) frame is relevant for the carcontroller orientation_value = list(self.sm['liveLocationKalman'].calibratedOrientationNED.value) if len(orientation_value) > 2: CC.orientationNED = orientation_value angular_rate_value = list(self.sm['liveLocationKalman'].angularVelocityCalibrated.value) if len(angular_rate_value) > 2: CC.angularVelocity = angular_rate_value CC.cruiseControl.override = CC.enabled and not CC.longActive and self.CP.openpilotLongitudinalControl CC.cruiseControl.cancel = CS.cruiseState.enabled and (not CC.enabled or not self.CP.pcmCruise) desired_kph = min(CS.vCruiseCluster, self.sm['carrotMan'].desiredSpeed) setSpeed = float(desired_kph * CV.KPH_TO_MS) speeds = self.sm['longitudinalPlan'].speeds if len(speeds): CC.cruiseControl.resume = CC.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1 vCluRatio = CS.vCluRatio if CS.vCluRatio > 0.5 else 1.0 setSpeed = speeds[-1] / vCluRatio hudControl = CC.hudControl hudControl.activeCarrot = self.sm['carrotMan'].activeCarrot lp = self.sm['longitudinalPlan'] if self.CP.pcmCruise: speed_from_pcm = self.params.get_int("SpeedFromPCM") if speed_from_pcm == 1: #toyota hudControl.setSpeed = float(CS.vCruiseCluster * CV.KPH_TO_MS) elif speed_from_pcm == 2: hudControl.setSpeed = float(max(30/3.6, desired_kph * CV.KPH_TO_MS)) elif speed_from_pcm == 3: # honda hudControl.setSpeed = setSpeed if lp.xState == 3 else float(desired_kph * CV.KPH_TO_MS) else: hudControl.setSpeed = float(max(30/3.6, setSpeed)) else: hudControl.setSpeed = setSpeed if lp.xState == 3 else float(desired_kph * CV.KPH_TO_MS) hudControl.speedVisible = CC.enabled hudControl.lanesVisible = CC.enabled hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead hudControl.leadDistanceBars = self.sm['selfdriveState'].personality.raw + 1 hudControl.visualAlert = self.sm['selfdriveState'].alertHudVisual leadOne = self.sm['radarState'].leadOne hudControl.leadDistance = leadOne.dRel if leadOne.status else 0 hudControl.leadRelSpeed = leadOne.vRel if leadOne.status else 0 hudControl.leadRadar = 1 if leadOne.radar else 0 hudControl.leadDPath = leadOne.dPath hudControl.modelDesire = 1 if self.sm['modelV2'].meta.desire == log.Desire.turnLeft else 2 if self.sm['modelV2'].meta.desire == log.Desire.turnRight else 0 hudControl.rightLaneVisible = True hudControl.leftLaneVisible = True if self.sm.valid['driverAssistance']: hudControl.leftLaneDepart = self.sm['driverAssistance'].leftLaneDeparture hudControl.rightLaneDepart = self.sm['driverAssistance'].rightLaneDeparture if self.sm['selfdriveState'].active: CO = self.sm['carOutput'] if self.CP.steerControlType == car.CarParams.SteerControlType.angle: self.steer_limited_by_controls = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \ STEER_ANGLE_SATURATION_THRESHOLD else: self.steer_limited_by_controls = abs(CC.actuators.torque - CO.actuatorsOutput.torque) > 1e-2 # TODO: both controlsState and carControl valids should be set by # sm.all_checks(), but this creates a circular dependency # controlsState dat = messaging.new_message('controlsState') dat.valid = CS.canValid cs = dat.controlsState cs.curvature = self.curvature cs.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan'] cs.lateralPlanMonoTime = self.sm.logMonoTime['modelV2'] cs.desiredCurvature = self.desired_curvature cs.longControlState = self.LoC.long_control_state cs.upAccelCmd = float(self.LoC.pid.p) cs.uiAccelCmd = float(self.LoC.pid.i) cs.ufAccelCmd = float(self.LoC.pid.f) cs.forceDecel = bool((self.sm['driverMonitoringState'].awarenessStatus < 0. and self.params.get_int("DisableDM") == 0) or (self.sm['selfdriveState'].state == State.softDisabling)) lat_tuning = self.CP.lateralTuning.which() if self.CP.steerControlType == car.CarParams.SteerControlType.angle: cs.lateralControlState.angleState = lac_log elif lat_tuning == 'pid': cs.lateralControlState.pidState = lac_log elif lat_tuning == 'torque': cs.lateralControlState.torqueState = lac_log cs.activeLaneLine = self.lanefull_mode_enabled self.pm.send('controlsState', dat) # carControl cc_send = messaging.new_message('carControl') cc_send.valid = CS.canValid cc_send.carControl = CC self.pm.send('carControl', cc_send) def run(self): rk = Ratekeeper(100, print_delay_threshold=None) while True: self.update() CC, lac_log = self.state_control() self.publish(CC, lac_log) rk.monitor_time() def main(): config_realtime_process(4, Priority.CTRL_HIGH) controls = Controls() controls.run() if __name__ == "__main__": main()